Back to Integrations

Webhook Integration for LLMs - Complete Guide

Build robust event-driven LLM applications with webhooks. This guide covers async processing patterns, security implementation, platform integrations, and production-ready scaling strategies.

Quick Start

Set up a webhook receiver for LLM events:

npm init -y
npm install express body-parser crypto dotenv
npm install @types/node typescript nodemon ts-node -D
npm install bull redis # For queue management

1. Webhook Architecture for LLMs

System Overview

// src/types/webhook.types.ts
export interface WebhookConfig {
  url: string
  secret: string
  events: WebhookEvent[]
  retryConfig: RetryConfig
  headers?: Record<string, string>
  timeout?: number
}

export interface WebhookEvent {
  type: 'llm.completion' | 'llm.error' | 'llm.streaming.start' | 'llm.streaming.end'
  filters?: Record<string, any>
}

export interface WebhookPayload {
  id: string
  timestamp: string
  event: string
  data: {
    requestId: string
    conversationId?: string
    userId: string
    model: string
    prompt: string
    completion?: string
    tokens?: {
      prompt: number
      completion: number
      total: number
    }
    error?: {
      code: string
      message: string
    }
    metadata?: Record<string, any>
  }
}

export interface RetryConfig {
  maxAttempts: number
  initialDelay: number
  maxDelay: number
  backoffMultiplier: number
}

Webhook Registry Service

// src/services/webhook-registry.ts
import { WebhookConfig } from '../types/webhook.types'
import { Redis } from 'ioredis'

export class WebhookRegistry {
  private redis: Redis
  
  constructor(redis: Redis) {
    this.redis = redis
  }
  
  async register(userId: string, webhook: WebhookConfig): Promise<string> {
    const webhookId = this.generateWebhookId()
    const key = `webhook:${userId}:${webhookId}`
    
    await this.redis.set(key, JSON.stringify({
      ...webhook,
      id: webhookId,
      userId,
      createdAt: new Date().toISOString(),
      active: true,
    }))
    
    // Add to user's webhook list
    await this.redis.sadd(`user:${userId}:webhooks`, webhookId)
    
    // Index by event type for efficient filtering
    for (const event of webhook.events) {
      await this.redis.sadd(`webhooks:event:${event.type}`, key)
    }
    
    return webhookId
  }
  
  async getWebhooksForEvent(eventType: string): Promise<WebhookConfig[]> {
    const keys = await this.redis.smembers(`webhooks:event:${eventType}`)
    const webhooks: WebhookConfig[] = []
    
    for (const key of keys) {
      const data = await this.redis.get(key)
      if (data) {
        const webhook = JSON.parse(data)
        if (webhook.active) {
          webhooks.push(webhook)
        }
      }
    }
    
    return webhooks
  }
  
  async deactivate(userId: string, webhookId: string): Promise<void> {
    const key = `webhook:${userId}:${webhookId}`
    const data = await this.redis.get(key)
    
    if (data) {
      const webhook = JSON.parse(data)
      webhook.active = false
      await this.redis.set(key, JSON.stringify(webhook))
    }
  }
  
  private generateWebhookId(): string {
    return `whk_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
  }
}

2. Async LLM Processing Patterns

Async LLM Service with Callbacks

// src/services/async-llm.service.ts
import { Queue, Worker } from 'bullmq'
import { LLMService } from './llm.service'
import { WebhookDispatcher } from './webhook-dispatcher'
import { WebhookPayload } from '../types/webhook.types'

export class AsyncLLMService {
  private llmQueue: Queue
  private llmService: LLMService
  private webhookDispatcher: WebhookDispatcher
  
  constructor(redis: any) {
    this.llmQueue = new Queue('llm-processing', {
      connection: redis,
      defaultJobOptions: {
        removeOnComplete: true,
        removeOnFail: false,
      },
    })
    
    this.llmService = new LLMService()
    this.webhookDispatcher = new WebhookDispatcher(redis)
    
    this.setupWorker(redis)
  }
  
  async submitRequest(request: {
    userId: string
    prompt: string
    model: string
    webhookUrl?: string
    metadata?: any
  }): Promise<{ requestId: string }> {
    const job = await this.llmQueue.add('process-llm', {
      ...request,
      requestId: this.generateRequestId(),
      timestamp: new Date().toISOString(),
    })
    
    return { requestId: job.data.requestId }
  }
  
  private setupWorker(redis: any) {
    const worker = new Worker(
      'llm-processing',
      async (job) => {
        const { userId, prompt, model, webhookUrl, metadata, requestId } = job.data
        
        try {
          // Notify webhook of start
          await this.webhookDispatcher.dispatch({
            id: this.generateEventId(),
            timestamp: new Date().toISOString(),
            event: 'llm.streaming.start',
            data: {
              requestId,
              userId,
              model,
              prompt,
              metadata,
            },
          }, userId)
          
          // Process LLM request
          const startTime = Date.now()
          const response = await this.llmService.complete({
            prompt,
            model,
            stream: false,
          })
          const duration = Date.now() - startTime
          
          // Prepare webhook payload
          const payload: WebhookPayload = {
            id: this.generateEventId(),
            timestamp: new Date().toISOString(),
            event: 'llm.completion',
            data: {
              requestId,
              userId,
              model,
              prompt,
              completion: response.text,
              tokens: response.tokens,
              metadata: {
                ...metadata,
                duration,
                provider: 'parrotrouter',
              },
            },
          }
          
          // Dispatch to registered webhooks
          await this.webhookDispatcher.dispatch(payload, userId)
          
          // If callback URL provided, send there too
          if (webhookUrl) {
            await this.webhookDispatcher.sendToUrl(webhookUrl, payload)
          }
          
          return payload
        } catch (error) {
          // Handle error
          const errorPayload: WebhookPayload = {
            id: this.generateEventId(),
            timestamp: new Date().toISOString(),
            event: 'llm.error',
            data: {
              requestId,
              userId,
              model,
              prompt,
              error: {
                code: error.code || 'UNKNOWN_ERROR',
                message: error.message,
              },
              metadata,
            },
          }
          
          await this.webhookDispatcher.dispatch(errorPayload, userId)
          throw error
        }
      },
      {
        connection: redis,
        concurrency: 10,
      }
    )
    
    worker.on('completed', (job) => {
      console.log(`LLM job ${job.id} completed`)
    })
    
    worker.on('failed', (job, err) => {
      console.error(`LLM job ${job?.id} failed:`, err)
    })
  }
  
  private generateRequestId(): string {
    return `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
  }
  
  private generateEventId(): string {
    return `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
  }
}

Streaming with Webhooks

// src/services/streaming-webhook.ts
export class StreamingWebhookService {
  private activeStreams = new Map<string, NodeJS.Timer>()
  
  async streamWithWebhooks(
    request: StreamRequest,
    onToken: (token: string) => void
  ): Promise<void> {
    const streamId = this.generateStreamId()
    let buffer = ''
    let chunkCount = 0
    
    // Send updates every N tokens or M milliseconds
    const flushBuffer = async () => {
      if (buffer.length > 0) {
        await this.webhookDispatcher.dispatch({
          id: this.generateEventId(),
          timestamp: new Date().toISOString(),
          event: 'llm.streaming.chunk',
          data: {
            requestId: request.requestId,
            streamId,
            chunkIndex: chunkCount++,
            content: buffer,
          },
        }, request.userId)
        
        buffer = ''
      }
    }
    
    // Set up periodic flush
    const flushInterval = setInterval(flushBuffer, 1000)
    this.activeStreams.set(streamId, flushInterval)
    
    try {
      await this.llmService.stream(request, async (token) => {
        buffer += token
        onToken(token)
        
        // Flush if buffer gets too large
        if (buffer.length > 100) {
          await flushBuffer()
        }
      })
      
      // Final flush
      await flushBuffer()
      
      // Send completion event
      await this.webhookDispatcher.dispatch({
        id: this.generateEventId(),
        timestamp: new Date().toISOString(),
        event: 'llm.streaming.end',
        data: {
          requestId: request.requestId,
          streamId,
          totalChunks: chunkCount,
        },
      }, request.userId)
    } finally {
      clearInterval(flushInterval)
      this.activeStreams.delete(streamId)
    }
  }
}

3. Security & Validation

HMAC Signature Implementation

// src/security/webhook-security.ts
import crypto from 'crypto'

export class WebhookSecurity {
  static generateSignature(
    payload: string | Buffer,
    secret: string,
    algorithm: string = 'sha256'
  ): string {
    const hmac = crypto.createHmac(algorithm, secret)
    hmac.update(payload)
    return hmac.digest('hex')
  }
  
  static verifySignature(
    payload: string | Buffer,
    signature: string,
    secret: string,
    algorithm: string = 'sha256'
  ): boolean {
    const expectedSignature = this.generateSignature(payload, secret, algorithm)
    
    // Use timing-safe comparison
    return crypto.timingSafeEqual(
      Buffer.from(expectedSignature, 'hex'),
      Buffer.from(signature, 'hex')
    )
  }
  
  static generateWebhookHeaders(
    payload: string,
    secret: string
  ): Record<string, string> {
    const timestamp = Math.floor(Date.now() / 1000).toString()
    const signaturePayload = `${timestamp}.${payload}`
    const signature = this.generateSignature(signaturePayload, secret)
    
    return {
      'X-Webhook-Signature': signature,
      'X-Webhook-Timestamp': timestamp,
      'X-Webhook-Algorithm': 'sha256',
    }
  }
  
  static validateWebhookRequest(
    body: string,
    headers: Record<string, string>,
    secret: string,
    maxAgeSeconds: number = 300
  ): { valid: boolean; error?: string } {
    const signature = headers['x-webhook-signature']
    const timestamp = headers['x-webhook-timestamp']
    const algorithm = headers['x-webhook-algorithm'] || 'sha256'
    
    if (!signature || !timestamp) {
      return { valid: false, error: 'Missing signature or timestamp' }
    }
    
    // Check timestamp to prevent replay attacks
    const currentTime = Math.floor(Date.now() / 1000)
    const webhookTime = parseInt(timestamp, 10)
    
    if (isNaN(webhookTime)) {
      return { valid: false, error: 'Invalid timestamp' }
    }
    
    if (currentTime - webhookTime > maxAgeSeconds) {
      return { valid: false, error: 'Request too old' }
    }
    
    // Verify signature
    const signaturePayload = `${timestamp}.${body}`
    const isValid = this.verifySignature(
      signaturePayload,
      signature,
      secret,
      algorithm
    )
    
    return { valid: isValid, error: isValid ? undefined : 'Invalid signature' }
  }
}

// Middleware for Express
export function webhookAuthMiddleware(getSecret: (req: any) => string) {
  return async (req: any, res: any, next: any) => {
    const secret = getSecret(req)
    if (!secret) {
      return res.status(401).json({ error: 'Webhook not configured' })
    }
    
    const validation = WebhookSecurity.validateWebhookRequest(
      JSON.stringify(req.body),
      req.headers,
      secret
    )
    
    if (!validation.valid) {
      return res.status(401).json({ error: validation.error })
    }
    
    next()
  }
}

Request Validation

// src/validation/webhook-validation.ts
import Joi from 'joi'

export const webhookConfigSchema = Joi.object({
  url: Joi.string().uri().required(),
  secret: Joi.string().min(32).required(),
  events: Joi.array().items(
    Joi.object({
      type: Joi.string().valid(
        'llm.completion',
        'llm.error',
        'llm.streaming.start',
        'llm.streaming.chunk',
        'llm.streaming.end'
      ).required(),
      filters: Joi.object().optional(),
    })
  ).min(1).required(),
  retryConfig: Joi.object({
    maxAttempts: Joi.number().min(1).max(10).default(3),
    initialDelay: Joi.number().min(100).max(10000).default(1000),
    maxDelay: Joi.number().min(1000).max(60000).default(30000),
    backoffMultiplier: Joi.number().min(1).max(5).default(2),
  }).optional(),
  headers: Joi.object().pattern(Joi.string(), Joi.string()).optional(),
  timeout: Joi.number().min(1000).max(30000).default(10000),
})

export function validateWebhookPayload(payload: any): boolean {
  // Ensure required fields exist
  if (!payload.id || !payload.timestamp || !payload.event || !payload.data) {
    return false
  }
  
  // Validate timestamp is recent (within last hour)
  const timestamp = new Date(payload.timestamp).getTime()
  const now = Date.now()
  if (isNaN(timestamp) || Math.abs(now - timestamp) > 3600000) {
    return false
  }
  
  // Validate event data based on type
  switch (payload.event) {
    case 'llm.completion':
      return !!(
        payload.data.requestId &&
        payload.data.prompt &&
        payload.data.completion &&
        payload.data.model
      )
    case 'llm.error':
      return !!(
        payload.data.requestId &&
        payload.data.error?.code &&
        payload.data.error?.message
      )
    default:
      return true
  }
}

4. Reliability & Retry Logic

Webhook Dispatcher with Retry

// src/services/webhook-dispatcher.ts
import axios, { AxiosError } from 'axios'
import { Queue, Worker } from 'bullmq'
import { WebhookRegistry } from './webhook-registry'
import { WebhookSecurity } from '../security/webhook-security'
import { WebhookPayload, WebhookConfig } from '../types/webhook.types'

export class WebhookDispatcher {
  private webhookQueue: Queue
  private registry: WebhookRegistry
  
  constructor(redis: any) {
    this.webhookQueue = new Queue('webhook-delivery', {
      connection: redis,
    })
    
    this.registry = new WebhookRegistry(redis)
    this.setupWorker(redis)
  }
  
  async dispatch(payload: WebhookPayload, userId: string): Promise<void> {
    // Get all active webhooks for this event
    const webhooks = await this.registry.getWebhooksForEvent(payload.event)
    
    // Filter by user and any additional filters
    const userWebhooks = webhooks.filter(w => w.userId === userId)
    
    // Queue delivery jobs
    for (const webhook of userWebhooks) {
      if (this.matchesFilters(payload, webhook.filters)) {
        await this.queueDelivery(webhook, payload)
      }
    }
  }
  
  async sendToUrl(url: string, payload: WebhookPayload): Promise<void> {
    const tempWebhook: WebhookConfig = {
      url,
      secret: process.env.DEFAULT_WEBHOOK_SECRET || 'temp-secret',
      events: [{ type: payload.event as any }],
      retryConfig: {
        maxAttempts: 3,
        initialDelay: 1000,
        maxDelay: 30000,
        backoffMultiplier: 2,
      },
    }
    
    await this.queueDelivery(tempWebhook, payload)
  }
  
  private async queueDelivery(
    webhook: WebhookConfig,
    payload: WebhookPayload
  ): Promise<void> {
    await this.webhookQueue.add(
      'deliver',
      {
        webhook,
        payload,
        attempt: 0,
      },
      {
        attempts: webhook.retryConfig.maxAttempts,
        backoff: {
          type: 'exponential',
          delay: webhook.retryConfig.initialDelay,
        },
        removeOnComplete: true,
        removeOnFail: false,
      }
    )
  }
  
  private setupWorker(redis: any) {
    const worker = new Worker(
      'webhook-delivery',
      async (job) => {
        const { webhook, payload, attempt } = job.data
        
        try {
          const payloadStr = JSON.stringify(payload)
          const headers = {
            'Content-Type': 'application/json',
            ...WebhookSecurity.generateWebhookHeaders(payloadStr, webhook.secret),
            ...webhook.headers,
          }
          
          const response = await axios.post(webhook.url, payload, {
            headers,
            timeout: webhook.timeout || 10000,
            validateStatus: (status) => status >= 200 && status < 300,
          })
          
          // Log successful delivery
          console.log(`Webhook delivered to ${webhook.url}: ${response.status}`)
          
          return {
            status: response.status,
            deliveredAt: new Date().toISOString(),
          }
        } catch (error) {
          const axiosError = error as AxiosError
          
          // Determine if error is retryable
          const isRetryable = this.isRetryableError(axiosError)
          
          if (!isRetryable) {
            // Move to DLQ for non-retryable errors
            await this.moveToDeadLetterQueue(webhook, payload, error)
            throw new Error(`Non-retryable error: ${axiosError.message}`)
          }
          
          // Log retry
          console.error(
            `Webhook delivery failed (attempt ${attempt + 1}/${webhook.retryConfig.maxAttempts}): ${axiosError.message}`
          )
          
          throw error
        }
      },
      {
        connection: redis,
        concurrency: 20,
      }
    )
    
    worker.on('failed', async (job, err) => {
      if (job && job.attemptsMade >= job.opts.attempts!) {
        // Max retries reached, move to DLQ
        await this.moveToDeadLetterQueue(
          job.data.webhook,
          job.data.payload,
          err
        )
      }
    })
  }
  
  private isRetryableError(error: AxiosError): boolean {
    if (!error.response) {
      // Network errors are retryable
      return true
    }
    
    // Retry on 5xx errors and specific 4xx errors
    const status = error.response.status
    return status >= 500 || status === 429 || status === 408
  }
  
  private async moveToDeadLetterQueue(
    webhook: WebhookConfig,
    payload: WebhookPayload,
    error: any
  ): Promise<void> {
    const dlqQueue = new Queue('webhook-dlq', {
      connection: this.webhookQueue.opts.connection,
    })
    
    await dlqQueue.add('failed-delivery', {
      webhook,
      payload,
      error: {
        message: error.message,
        code: error.code,
        status: error.response?.status,
      },
      failedAt: new Date().toISOString(),
    })
  }
  
  private matchesFilters(
    payload: WebhookPayload,
    filters?: Record<string, any>
  ): boolean {
    if (!filters) return true
    
    for (const [key, value] of Object.entries(filters)) {
      const payloadValue = this.getNestedValue(payload, key)
      if (payloadValue !== value) {
        return false
      }
    }
    
    return true
  }
  
  private getNestedValue(obj: any, path: string): any {
    return path.split('.').reduce((curr, part) => curr?.[part], obj)
  }
}

Reliability Best Practices

  • • Implement idempotency keys to prevent duplicate processing
  • • Use exponential backoff with jitter for retries
  • • Set reasonable timeouts (10-30 seconds)
  • • Monitor retry rates and adjust accordingly
  • • Implement circuit breakers for consistently failing endpoints

5. Event-Driven Architecture

Event Bus Implementation

// src/events/event-bus.ts
import { EventEmitter } from 'events'
import { Redis } from 'ioredis'

export interface LLMEvent {
  id: string
  type: string
  timestamp: string
  data: any
  metadata?: Record<string, any>
}

export class EventBus extends EventEmitter {
  private redis: Redis
  private subscriber: Redis
  
  constructor(redis: Redis) {
    super()
    this.redis = redis
    this.subscriber = redis.duplicate()
    this.setupSubscriptions()
  }
  
  async publish(event: LLMEvent): Promise<void> {
    // Emit locally
    this.emit(event.type, event)
    
    // Publish to Redis for distributed systems
    await this.redis.publish(
      `llm:events:${event.type}`,
      JSON.stringify(event)
    )
    
    // Store event for audit/replay
    await this.storeEvent(event)
  }
  
  async subscribe(pattern: string, handler: (event: LLMEvent) => void): Promise<void> {
    // Local subscription
    this.on(pattern, handler)
    
    // Redis subscription for distributed events
    await this.subscriber.psubscribe(`llm:events:${pattern}`)
  }
  
  private setupSubscriptions() {
    this.subscriber.on('pmessage', (pattern, channel, message) => {
      try {
        const event = JSON.parse(message)
        const eventType = channel.replace('llm:events:', '')
        this.emit(eventType, event)
      } catch (error) {
        console.error('Failed to process event:', error)
      }
    })
  }
  
  private async storeEvent(event: LLMEvent): Promise<void> {
    const key = `events:${event.type}:${event.timestamp}`
    await this.redis.setex(key, 86400 * 7, JSON.stringify(event)) // 7 days TTL
    
    // Add to sorted set for time-based queries
    await this.redis.zadd(
      `events:timeline`,
      new Date(event.timestamp).getTime(),
      event.id
    )
  }
  
  async replayEvents(
    from: Date,
    to: Date,
    filter?: (event: LLMEvent) => boolean
  ): Promise<LLMEvent[]> {
    const eventIds = await this.redis.zrangebyscore(
      'events:timeline',
      from.getTime(),
      to.getTime()
    )
    
    const events: LLMEvent[] = []
    for (const id of eventIds) {
      const eventData = await this.redis.get(`events:*:${id}`)
      if (eventData) {
        const event = JSON.parse(eventData)
        if (!filter || filter(event)) {
          events.push(event)
        }
      }
    }
    
    return events
  }
}

// Event handlers
export class LLMEventHandlers {
  constructor(
    private eventBus: EventBus,
    private webhookDispatcher: WebhookDispatcher
  ) {
    this.setupHandlers()
  }
  
  private setupHandlers() {
    // Handle completion events
    this.eventBus.subscribe('llm.completion', async (event) => {
      // Trigger webhooks
      await this.webhookDispatcher.dispatch({
        id: event.id,
        timestamp: event.timestamp,
        event: 'llm.completion',
        data: event.data,
      }, event.data.userId)
      
      // Update analytics
      await this.updateAnalytics(event)
      
      // Trigger dependent workflows
      await this.triggerWorkflows(event)
    })
    
    // Handle error events
    this.eventBus.subscribe('llm.error', async (event) => {
      // Alert monitoring
      await this.alertMonitoring(event)
      
      // Trigger error webhooks
      await this.webhookDispatcher.dispatch({
        id: event.id,
        timestamp: event.timestamp,
        event: 'llm.error',
        data: event.data,
      }, event.data.userId)
    })
  }
  
  private async updateAnalytics(event: LLMEvent): Promise<void> {
    // Implement analytics updates
  }
  
  private async triggerWorkflows(event: LLMEvent): Promise<void> {
    // Implement workflow triggers
  }
  
  private async alertMonitoring(event: LLMEvent): Promise<void> {
    // Implement monitoring alerts
  }
}

6. Queue Integration

Multi-Queue Architecture

// src/queues/queue-manager.ts
import { Queue, QueueScheduler, Worker, Job } from 'bullmq'
import { Redis } from 'ioredis'

export class QueueManager {
  private queues: Map<string, Queue> = new Map()
  private workers: Map<string, Worker> = new Map()
  private schedulers: Map<string, QueueScheduler> = new Map()
  
  constructor(private redis: Redis) {
    this.initializeQueues()
  }
  
  private initializeQueues() {
    // LLM processing queue
    this.createQueue('llm-processing', {
      defaultJobOptions: {
        removeOnComplete: 100,
        removeOnFail: 1000,
      },
    })
    
    // Webhook delivery queue
    this.createQueue('webhook-delivery', {
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
      },
    })
    
    // Priority queue for premium users
    this.createQueue('llm-priority', {
      defaultJobOptions: {
        priority: 1,
      },
    })
    
    // Batch processing queue
    this.createQueue('llm-batch', {
      defaultJobOptions: {
        delay: 0,
      },
    })
  }
  
  private createQueue(name: string, options: any) {
    const queue = new Queue(name, {
      connection: this.redis,
      ...options,
    })
    
    const scheduler = new QueueScheduler(name, {
      connection: this.redis,
    })
    
    this.queues.set(name, queue)
    this.schedulers.set(name, scheduler)
  }
  
  async addJob(
    queueName: string,
    jobName: string,
    data: any,
    options?: any
  ): Promise<Job> {
    const queue = this.queues.get(queueName)
    if (!queue) {
      throw new Error(`Queue ${queueName} not found`)
    }
    
    return queue.add(jobName, data, options)
  }
  
  createWorker(
    queueName: string,
    processor: (job: Job) => Promise<any>,
    concurrency: number = 5
  ): Worker {
    const worker = new Worker(queueName, processor, {
      connection: this.redis,
      concurrency,
    })
    
    this.workers.set(queueName, worker)
    return worker
  }
  
  async getQueueMetrics(queueName: string) {
    const queue = this.queues.get(queueName)
    if (!queue) {
      throw new Error(`Queue ${queueName} not found`)
    }
    
    const [
      waiting,
      active,
      completed,
      failed,
      delayed,
    ] = await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
      queue.getDelayedCount(),
    ])
    
    return {
      waiting,
      active,
      completed,
      failed,
      delayed,
      total: waiting + active + delayed,
    }
  }
  
  async gracefulShutdown(): Promise<void> {
    // Close all workers
    for (const worker of this.workers.values()) {
      await worker.close()
    }
    
    // Close all schedulers
    for (const scheduler of this.schedulers.values()) {
      await scheduler.close()
    }
    
    // Close all queues
    for (const queue of this.queues.values()) {
      await queue.close()
    }
  }
}

// Priority queue implementation
export class PriorityQueueService {
  constructor(private queueManager: QueueManager) {}
  
  async submitHighPriority(data: any): Promise<Job> {
    return this.queueManager.addJob('llm-priority', 'process', data, {
      priority: 1,
    })
  }
  
  async submitNormalPriority(data: any): Promise<Job> {
    return this.queueManager.addJob('llm-processing', 'process', data, {
      priority: 10,
    })
  }
  
  async submitBatch(items: any[]): Promise<Job[]> {
    const jobs = items.map((item, index) => ({
      name: 'process-batch-item',
      data: { ...item, batchIndex: index },
      opts: { priority: 20 },
    }))
    
    const queue = this.queueManager['queues'].get('llm-batch')!
    return queue.addBulk(jobs)
  }
}

7. Platform-Specific Webhooks

Platform Webhook Adapters

// src/adapters/platform-adapters.ts
import { WebhookPayload } from '../types/webhook.types'

export interface PlatformAdapter {
  name: string
  transformPayload(payload: WebhookPayload): any
  validateConfig(config: any): boolean
  sendWebhook(url: string, transformedPayload: any): Promise<void>
}

// Slack Adapter
export class SlackAdapter implements PlatformAdapter {
  name = 'slack'
  
  transformPayload(payload: WebhookPayload): any {
    const { data } = payload
    
    return {
      text: `LLM Processing Complete`,
      blocks: [
        {
          type: 'header',
          text: {
            type: 'plain_text',
            text: '🤖 AI Response Ready',
          },
        },
        {
          type: 'section',
          fields: [
            {
              type: 'mrkdwn',
              text: `*Request ID:*\n${data.requestId}`,
            },
            {
              type: 'mrkdwn',
              text: `*Model:*\n${data.model}`,
            },
          ],
        },
        {
          type: 'section',
          text: {
            type: 'mrkdwn',
            text: `*Prompt:*\n\`\`\`${data.prompt.slice(0, 200)}...\`\`\``,
          },
        },
        {
          type: 'section',
          text: {
            type: 'mrkdwn',
            text: `*Response:*\n${data.completion?.slice(0, 500)}...`,
          },
        },
        {
          type: 'context',
          elements: [
            {
              type: 'mrkdwn',
              text: `Tokens: ${data.tokens?.total || 'N/A'} | Time: ${new Date(payload.timestamp).toLocaleString()}`,
            },
          ],
        },
      ],
    }
  }
  
  validateConfig(config: any): boolean {
    return config.url && config.url.includes('hooks.slack.com')
  }
  
  async sendWebhook(url: string, payload: any): Promise<void> {
    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
    })
    
    if (!response.ok) {
      throw new Error(`Slack webhook failed: ${response.statusText}`)
    }
  }
}

// Discord Adapter
export class DiscordAdapter implements PlatformAdapter {
  name = 'discord'
  
  transformPayload(payload: WebhookPayload): any {
    const { data } = payload
    
    return {
      username: 'ParrotRouter AI',
      avatar_url: 'https://parrotrouter.com/logo.png',
      embeds: [
        {
          title: '🤖 AI Response Ready',
          color: 0x00ff00,
          fields: [
            {
              name: 'Request ID',
              value: data.requestId,
              inline: true,
            },
            {
              name: 'Model',
              value: data.model,
              inline: true,
            },
            {
              name: 'Tokens Used',
              value: data.tokens?.total?.toString() || 'N/A',
              inline: true,
            },
            {
              name: 'Prompt',
              value: `\`\`${data.prompt.slice(0, 1000)}\`\``,
            },
            {
              name: 'Response',
              value: data.completion?.slice(0, 1000) || 'No response',
            },
          ],
          timestamp: payload.timestamp,
          footer: {
            text: 'Powered by ParrotRouter',
          },
        },
      ],
    }
  }
  
  validateConfig(config: any): boolean {
    return config.url && config.url.includes('discord.com/api/webhooks')
  }
  
  async sendWebhook(url: string, payload: any): Promise<void> {
    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
    })
    
    if (!response.ok) {
      throw new Error(`Discord webhook failed: ${response.statusText}`)
    }
  }
}

// Microsoft Teams Adapter
export class TeamsAdapter implements PlatformAdapter {
  name = 'teams'
  
  transformPayload(payload: WebhookPayload): any {
    const { data } = payload
    
    return {
      '@type': 'MessageCard',
      '@context': 'http://schema.org/extensions',
      themeColor: '0076D7',
      summary: 'AI Response Ready',
      sections: [
        {
          activityTitle: '🤖 ParrotRouter AI Response',
          facts: [
            {
              name: 'Request ID',
              value: data.requestId,
            },
            {
              name: 'Model',
              value: data.model,
            },
            {
              name: 'Tokens',
              value: data.tokens?.total?.toString() || 'N/A',
            },
          ],
        },
        {
          title: 'Prompt',
          text: data.prompt.slice(0, 500),
        },
        {
          title: 'Response',
          text: data.completion?.slice(0, 1000) || 'No response',
        },
      ],
      potentialAction: [
        {
          '@type': 'OpenUri',
          name: 'View in ParrotRouter',
          targets: [
            {
              os: 'default',
              uri: `https://parrotrouter.com/requests/${data.requestId}`,
            },
          ],
        },
      ],
    }
  }
  
  validateConfig(config: any): boolean {
    return config.url && config.url.includes('webhook.office.com')
  }
  
  async sendWebhook(url: string, payload: any): Promise<void> {
    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
    })
    
    if (!response.ok) {
      throw new Error(`Teams webhook failed: ${response.statusText}`)
    }
  }
}

// Adapter Manager
export class PlatformAdapterManager {
  private adapters = new Map<string, PlatformAdapter>()
  
  constructor() {
    this.registerAdapter(new SlackAdapter())
    this.registerAdapter(new DiscordAdapter())
    this.registerAdapter(new TeamsAdapter())
  }
  
  registerAdapter(adapter: PlatformAdapter) {
    this.adapters.set(adapter.name, adapter)
  }
  
  getAdapter(platform: string): PlatformAdapter | undefined {
    return this.adapters.get(platform)
  }
  
  async sendToPlatform(
    platform: string,
    url: string,
    payload: WebhookPayload
  ): Promise<void> {
    const adapter = this.getAdapter(platform)
    if (!adapter) {
      throw new Error(`No adapter found for platform: ${platform}`)
    }
    
    const transformedPayload = adapter.transformPayload(payload)
    await adapter.sendWebhook(url, transformedPayload)
  }
}

8. Error Handling & Dead Letter Queues

DLQ Management

// src/queues/dlq-manager.ts
import { Queue, Worker } from 'bullmq'
import { Redis } from 'ioredis'

export interface DLQEntry {
  id: string
  originalQueue: string
  payload: any
  error: {
    message: string
    code?: string
    stack?: string
  }
  attempts: number
  firstFailedAt: string
  lastFailedAt: string
  metadata?: Record<string, any>
}

export class DeadLetterQueueManager {
  private dlqQueue: Queue
  private dlqWorker?: Worker
  
  constructor(private redis: Redis) {
    this.dlqQueue = new Queue('dead-letter-queue', {
      connection: redis,
    })
  }
  
  async addToDeadLetter(
    originalQueue: string,
    job: any,
    error: Error
  ): Promise<void> {
    const entry: DLQEntry = {
      id: `dlq_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
      originalQueue,
      payload: job.data,
      error: {
        message: error.message,
        code: (error as any).code,
        stack: error.stack,
      },
      attempts: job.attemptsMade || 1,
      firstFailedAt: job.timestamp ? new Date(job.timestamp).toISOString() : new Date().toISOString(),
      lastFailedAt: new Date().toISOString(),
      metadata: {
        jobId: job.id,
        jobName: job.name,
      },
    }
    
    await this.dlqQueue.add('failed-job', entry)
    
    // Store in Redis for querying
    await this.redis.hset(
      `dlq:entries:${originalQueue}`,
      entry.id,
      JSON.stringify(entry)
    )
    
    // Emit metric
    await this.incrementDLQMetric(originalQueue, error)
  }
  
  async getDeadLetterEntries(
    queue?: string,
    limit: number = 100
  ): Promise<DLQEntry[]> {
    if (queue) {
      const entries = await this.redis.hgetall(`dlq:entries:${queue}`)
      return Object.values(entries)
        .map(e => JSON.parse(e))
        .sort((a, b) => new Date(b.lastFailedAt).getTime() - new Date(a.lastFailedAt).getTime())
        .slice(0, limit)
    }
    
    // Get from all queues
    const keys = await this.redis.keys('dlq:entries:*')
    const allEntries: DLQEntry[] = []
    
    for (const key of keys) {
      const entries = await this.redis.hgetall(key)
      allEntries.push(...Object.values(entries).map(e => JSON.parse(e)))
    }
    
    return allEntries
      .sort((a, b) => new Date(b.lastFailedAt).getTime() - new Date(a.lastFailedAt).getTime())
      .slice(0, limit)
  }
  
  async retryDeadLetterEntry(entryId: string): Promise<boolean> {
    // Find the entry
    const keys = await this.redis.keys('dlq:entries:*')
    let entry: DLQEntry | null = null
    let sourceKey: string | null = null
    
    for (const key of keys) {
      const entryData = await this.redis.hget(key, entryId)
      if (entryData) {
        entry = JSON.parse(entryData)
        sourceKey = key
        break
      }
    }
    
    if (!entry || !sourceKey) {
      return false
    }
    
    // Re-queue to original queue
    const originalQueue = new Queue(entry.originalQueue, {
      connection: this.redis,
    })
    
    await originalQueue.add(
      entry.metadata?.jobName || 'retry-from-dlq',
      entry.payload,
      {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000,
        },
      }
    )
    
    // Remove from DLQ
    await this.redis.hdel(sourceKey, entryId)
    
    return true
  }
  
  async purgeDeadLetterQueue(olderThan?: Date): Promise<number> {
    const entries = await this.getDeadLetterEntries()
    let purged = 0
    
    for (const entry of entries) {
      if (!olderThan || new Date(entry.lastFailedAt) < olderThan) {
        // Remove from Redis
        await this.redis.hdel(`dlq:entries:${entry.originalQueue}`, entry.id)
        purged++
      }
    }
    
    return purged
  }
  
  private async incrementDLQMetric(queue: string, error: Error): Promise<void> {
    const errorType = error.constructor.name
    const today = new Date().toISOString().split('T')[0]
    
    await this.redis.hincrby(`metrics:dlq:${today}`, queue, 1)
    await this.redis.hincrby(`metrics:dlq:${today}`, `${queue}:${errorType}`, 1)
    
    // Set expiry for metrics
    await this.redis.expire(`metrics:dlq:${today}`, 86400 * 30) // 30 days
  }
  
  async getDLQMetrics(date?: string): Promise<Record<string, number>> {
    const targetDate = date || new Date().toISOString().split('T')[0]
    const metrics = await this.redis.hgetall(`metrics:dlq:${targetDate}`)
    
    const result: Record<string, number> = {}
    for (const [key, value] of Object.entries(metrics)) {
      result[key] = parseInt(value, 10)
    }
    
    return result
  }
}

// DLQ Monitor Service
export class DLQMonitorService {
  constructor(
    private dlqManager: DeadLetterQueueManager,
    private alerting: AlertingService
  ) {}
  
  async startMonitoring(intervalMs: number = 60000) {
    setInterval(async () => {
      await this.checkDLQHealth()
    }, intervalMs)
  }
  
  private async checkDLQHealth() {
    const metrics = await this.dlqManager.getDLQMetrics()
    
    // Check for high failure rates
    for (const [queue, count] of Object.entries(metrics)) {
      if (count > 100) {
        await this.alerting.sendAlert({
          severity: 'high',
          title: `High DLQ rate for queue: ${queue}`,
          message: `${count} failures in the last 24 hours`,
          metadata: { queue, count },
        })
      }
    }
    
    // Check for old entries
    const oldEntries = await this.dlqManager.getDeadLetterEntries()
    const oneWeekAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
    
    const staleEntries = oldEntries.filter(
      e => new Date(e.lastFailedAt) < oneWeekAgo
    )
    
    if (staleEntries.length > 0) {
      await this.alerting.sendAlert({
        severity: 'medium',
        title: 'Stale entries in DLQ',
        message: `${staleEntries.length} entries older than 7 days`,
        metadata: { count: staleEntries.length },
      })
    }
  }
}

9. Monitoring & Observability

Webhook Monitoring

// src/monitoring/webhook-monitoring.ts
import { Histogram, Counter, Gauge, Registry } from 'prom-client'

export class WebhookMonitoring {
  private registry: Registry
  
  // Metrics
  private deliveryDuration: Histogram<string>
  private deliveryCounter: Counter<string>
  private failureCounter: Counter<string>
  private activeWebhooks: Gauge<string>
  private queueSize: Gauge<string>
  
  constructor() {
    this.registry = new Registry()
    
    // Initialize metrics
    this.deliveryDuration = new Histogram({
      name: 'webhook_delivery_duration_seconds',
      help: 'Duration of webhook delivery attempts',
      labelNames: ['webhook_url', 'event_type', 'status'],
      buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
      registers: [this.registry],
    })
    
    this.deliveryCounter = new Counter({
      name: 'webhook_deliveries_total',
      help: 'Total number of webhook deliveries',
      labelNames: ['webhook_url', 'event_type', 'status'],
      registers: [this.registry],
    })
    
    this.failureCounter = new Counter({
      name: 'webhook_failures_total',
      help: 'Total number of webhook failures',
      labelNames: ['webhook_url', 'event_type', 'error_type'],
      registers: [this.registry],
    })
    
    this.activeWebhooks = new Gauge({
      name: 'active_webhooks_total',
      help: 'Number of active webhooks',
      labelNames: ['event_type'],
      registers: [this.registry],
    })
    
    this.queueSize = new Gauge({
      name: 'webhook_queue_size',
      help: 'Current size of webhook delivery queue',
      labelNames: ['queue_name'],
      registers: [this.registry],
    })
  }
  
  recordDelivery(
    url: string,
    eventType: string,
    status: number,
    duration: number
  ) {
    const labels = {
      webhook_url: this.sanitizeUrl(url),
      event_type: eventType,
      status: status.toString(),
    }
    
    this.deliveryDuration.observe(labels, duration / 1000)
    this.deliveryCounter.inc(labels)
  }
  
  recordFailure(
    url: string,
    eventType: string,
    errorType: string
  ) {
    this.failureCounter.inc({
      webhook_url: this.sanitizeUrl(url),
      event_type: eventType,
      error_type: errorType,
    })
  }
  
  updateActiveWebhooks(eventType: string, count: number) {
    this.activeWebhooks.set({ event_type: eventType }, count)
  }
  
  updateQueueSize(queueName: string, size: number) {
    this.queueSize.set({ queue_name: queueName }, size)
  }
  
  async getMetrics(): Promise<string> {
    return this.registry.metrics()
  }
  
  private sanitizeUrl(url: string): string {
    // Remove sensitive parts from URL for metrics
    try {
      const parsed = new URL(url)
      return `${parsed.protocol}//${parsed.hostname}${parsed.pathname}`
    } catch {
      return 'invalid_url'
    }
  }
}

// Distributed Tracing
export class WebhookTracing {
  private spans = new Map<string, any>()
  
  startSpan(requestId: string, operation: string): string {
    const spanId = `${requestId}-${operation}-${Date.now()}`
    
    this.spans.set(spanId, {
      requestId,
      operation,
      startTime: Date.now(),
      events: [],
    })
    
    return spanId
  }
  
  addEvent(spanId: string, event: string, attributes?: any) {
    const span = this.spans.get(spanId)
    if (span) {
      span.events.push({
        name: event,
        timestamp: Date.now(),
        attributes,
      })
    }
  }
  
  endSpan(spanId: string, status: 'ok' | 'error', error?: any) {
    const span = this.spans.get(spanId)
    if (span) {
      span.endTime = Date.now()
      span.duration = span.endTime - span.startTime
      span.status = status
      span.error = error
      
      // Send to tracing backend
      this.exportSpan(span)
      
      this.spans.delete(spanId)
    }
  }
  
  private exportSpan(span: any) {
    // Export to Jaeger, Zipkin, etc.
    console.log('Trace:', span)
  }
}

// Health Check Service
export class WebhookHealthCheck {
  constructor(
    private monitoring: WebhookMonitoring,
    private queueManager: QueueManager,
    private dlqManager: DeadLetterQueueManager
  ) {}
  
  async getHealth(): Promise<{
    status: 'healthy' | 'degraded' | 'unhealthy'
    checks: Record<string, any>
  }> {
    const checks: Record<string, any> = {}
    let status: 'healthy' | 'degraded' | 'unhealthy' = 'healthy'
    
    // Check queue health
    try {
      const queueMetrics = await this.queueManager.getQueueMetrics('webhook-delivery')
      checks.queue = {
        status: 'ok',
        metrics: queueMetrics,
      }
      
      if (queueMetrics.failed > 100) {
        status = 'degraded'
        checks.queue.status = 'high_failure_rate'
      }
      
      if (queueMetrics.waiting > 1000) {
        status = 'unhealthy'
        checks.queue.status = 'backlogged'
      }
    } catch (error) {
      checks.queue = { status: 'error', error: error.message }
      status = 'unhealthy'
    }
    
    // Check DLQ
    try {
      const dlqEntries = await this.dlqManager.getDeadLetterEntries(undefined, 10)
      checks.dlq = {
        status: 'ok',
        count: dlqEntries.length,
      }
      
      if (dlqEntries.length > 50) {
        status = 'degraded'
        checks.dlq.status = 'high_dlq_count'
      }
    } catch (error) {
      checks.dlq = { status: 'error', error: error.message }
    }
    
    return { status, checks }
  }
}

10. Scaling Strategies

Horizontal Scaling

// src/scaling/webhook-scaler.ts
import { Cluster } from 'cluster'
import * as os from 'os'

export class WebhookScaler {
  static setupCluster() {
    if (Cluster.isPrimary) {
      const numWorkers = process.env.WORKER_COUNT 
        ? parseInt(process.env.WORKER_COUNT) 
        : os.cpus().length
      
      console.log(`Starting ${numWorkers} webhook workers...`)
      
      // Fork workers
      for (let i = 0; i < numWorkers; i++) {
        Cluster.fork()
      }
      
      // Handle worker deaths
      Cluster.on('exit', (worker, code, signal) => {
        console.log(`Worker ${worker.process.pid} died`)
        console.log('Starting a new worker...')
        Cluster.fork()
      })
      
      // Graceful shutdown
      process.on('SIGTERM', () => {
        console.log('SIGTERM received, shutting down cluster...')
        for (const id in Cluster.workers) {
          Cluster.workers[id]?.kill()
        }
      })
    } else {
      // Worker process
      require('./worker')
    }
  }
}

// Load Balancer
export class WebhookLoadBalancer {
  private workers: Worker[] = []
  private currentWorker = 0
  
  addWorker(worker: Worker) {
    this.workers.push(worker)
  }
  
  getNextWorker(): Worker {
    const worker = this.workers[this.currentWorker]
    this.currentWorker = (this.currentWorker + 1) % this.workers.length
    return worker
  }
  
  async distributeJob(job: any) {
    const worker = this.getNextWorker()
    await worker.process(job)
  }
}

// Rate Limiter for Scaling
export class ScalableRateLimiter {
  private limits = new Map<string, { count: number; reset: number }>()
  
  constructor(
    private maxRequests: number,
    private windowMs: number
  ) {}
  
  async checkLimit(key: string): Promise<boolean> {
    const now = Date.now()
    const limit = this.limits.get(key)
    
    if (!limit || now > limit.reset) {
      this.limits.set(key, {
        count: 1,
        reset: now + this.windowMs,
      })
      return true
    }
    
    if (limit.count >= this.maxRequests) {
      return false
    }
    
    limit.count++
    return true
  }
  
  // Distributed rate limiting with Redis
  async checkDistributedLimit(redis: any, key: string): Promise<boolean> {
    const multi = redis.multi()
    const now = Date.now()
    const window = Math.floor(now / this.windowMs)
    const redisKey = `rate_limit:${key}:${window}`
    
    multi.incr(redisKey)
    multi.expire(redisKey, Math.ceil(this.windowMs / 1000))
    
    const results = await multi.exec()
    const count = results[0][1]
    
    return count <= this.maxRequests
  }
}

// Auto-scaling based on metrics
export class AutoScaler {
  constructor(
    private monitoring: WebhookMonitoring,
    private minWorkers: number = 2,
    private maxWorkers: number = 20
  ) {}
  
  async evaluateScaling(): Promise<{
    action: 'scale_up' | 'scale_down' | 'maintain'
    currentWorkers: number
    targetWorkers: number
  }> {
    const metrics = await this.getScalingMetrics()
    const currentWorkers = this.getCurrentWorkerCount()
    
    let targetWorkers = currentWorkers
    let action: 'scale_up' | 'scale_down' | 'maintain' = 'maintain'
    
    // Scale up conditions
    if (metrics.queueDepth > 1000 || metrics.avgResponseTime > 5000) {
      targetWorkers = Math.min(currentWorkers + 2, this.maxWorkers)
      action = 'scale_up'
    }
    // Scale down conditions
    else if (metrics.queueDepth < 100 && metrics.avgResponseTime < 1000) {
      targetWorkers = Math.max(currentWorkers - 1, this.minWorkers)
      action = 'scale_down'
    }
    
    return { action, currentWorkers, targetWorkers }
  }
  
  private async getScalingMetrics() {
    // Get metrics from monitoring
    return {
      queueDepth: 500,
      avgResponseTime: 2000,
      errorRate: 0.01,
    }
  }
  
  private getCurrentWorkerCount(): number {
    return Object.keys(Cluster.workers || {}).length
  }
}

✓ Production Deployment Checklist

  • ☐ Implement webhook signature verification
  • ☐ Set up retry logic with exponential backoff
  • ☐ Configure dead letter queues
  • ☐ Enable distributed tracing
  • ☐ Set up monitoring and alerting
  • ☐ Implement rate limiting
  • ☐ Configure auto-scaling policies
  • ☐ Test webhook endpoints under load
  • ☐ Document webhook payload formats
  • ☐ Set up webhook management UI

Complete Example: Production Webhook System

// src/index.ts - Complete webhook system
import express from 'express'
import { Redis } from 'ioredis'
import { WebhookRegistry } from './services/webhook-registry'
import { WebhookDispatcher } from './services/webhook-dispatcher'
import { AsyncLLMService } from './services/async-llm.service'
import { QueueManager } from './queues/queue-manager'
import { DeadLetterQueueManager } from './queues/dlq-manager'
import { WebhookMonitoring } from './monitoring/webhook-monitoring'
import { WebhookSecurity } from './security/webhook-security'
import { EventBus } from './events/event-bus'

async function startWebhookSystem() {
  const app = express()
  app.use(express.json())
  
  // Initialize Redis
  const redis = new Redis({
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379'),
  })
  
  // Initialize services
  const registry = new WebhookRegistry(redis)
  const dispatcher = new WebhookDispatcher(redis)
  const llmService = new AsyncLLMService(redis)
  const queueManager = new QueueManager(redis)
  const dlqManager = new DeadLetterQueueManager(redis)
  const monitoring = new WebhookMonitoring()
  const eventBus = new EventBus(redis)
  
  // API Routes
  
  // Register webhook
  app.post('/webhooks', async (req, res) => {
    try {
      const { url, events, secret } = req.body
      
      const webhookId = await registry.register(req.user.id, {
        url,
        secret: secret || WebhookSecurity.generateSecret(),
        events,
        retryConfig: {
          maxAttempts: 3,
          initialDelay: 1000,
          maxDelay: 30000,
          backoffMultiplier: 2,
        },
      })
      
      res.json({ webhookId, message: 'Webhook registered successfully' })
    } catch (error) {
      res.status(400).json({ error: error.message })
    }
  })
  
  // Submit LLM request with webhook callback
  app.post('/llm/submit', async (req, res) => {
    try {
      const { prompt, model, webhookUrl, metadata } = req.body
      
      const result = await llmService.submitRequest({
        userId: req.user.id,
        prompt,
        model: model || 'gpt-3.5-turbo',
        webhookUrl,
        metadata,
      })
      
      res.json(result)
    } catch (error) {
      res.status(500).json({ error: error.message })
    }
  })
  
  // Webhook receiver endpoint (for testing)
  app.post('/webhook-test', 
    webhookAuthMiddleware((req) => process.env.TEST_WEBHOOK_SECRET!),
    async (req, res) => {
      console.log('Received webhook:', req.body)
      res.status(200).json({ received: true })
    }
  )
  
  // Health check
  app.get('/health', async (req, res) => {
    const health = await getSystemHealth()
    res.status(health.status === 'healthy' ? 200 : 503).json(health)
  })
  
  // Metrics endpoint
  app.get('/metrics', async (req, res) => {
    const metrics = await monitoring.getMetrics()
    res.set('Content-Type', 'text/plain')
    res.send(metrics)
  })
  
  // Start server
  const PORT = process.env.PORT || 3000
  app.listen(PORT, () => {
    console.log(`Webhook system listening on port ${PORT}`)
  })
  
  // Graceful shutdown
  process.on('SIGTERM', async () => {
    console.log('SIGTERM received, shutting down gracefully...')
    await queueManager.gracefulShutdown()
    await redis.quit()
    process.exit(0)
  })
}

// Start the system
if (require.main === module) {
  startWebhookSystem().catch(console.error)
}

export { startWebhookSystem }

References & Citations

Ready to Build Event-Driven LLM Apps?

Implement robust webhook patterns for your LLM applications with ParrotRouter's unified API gateway.

References
  1. [1] AWS. "Lambda Documentation" (2024)
  2. [2] Vercel. "Streaming Responses" (2024)
  3. [3] GitHub. "OpenAI Node.js Library" (2024)