Quick Start
Create the project and install the dependencies used by the webhook services in this guide:
# Create the project
npm init -y
# Runtime dependencies used by the code in this guide.
# `crypto` is built into Node.js and must not be installed from npm;
# the queue code imports BullMQ (`bullmq`) on top of `ioredis`.
npm install express body-parser dotenv bullmq ioredis axios joi prom-client
# Development tooling
npm install -D typescript ts-node nodemon @types/node
1. Webhook Architecture for LLMs
System Overview
// src/types/webhook.types.ts

/**
 * Every event type a webhook can subscribe to. Mirrors the event types
 * accepted by `webhookConfigSchema` (src/validation/webhook-validation.ts),
 * including 'llm.streaming.chunk', which StreamingWebhookService emits.
 */
export type WebhookEventType =
  | 'llm.completion'
  | 'llm.error'
  | 'llm.streaming.start'
  | 'llm.streaming.chunk'
  | 'llm.streaming.end'

/** A webhook subscription: where to deliver, how to sign, and which events to send. */
export interface WebhookConfig {
  /** Assigned by WebhookRegistry.register(); absent before registration. */
  id?: string
  /** Owning user; set by WebhookRegistry.register() and compared by the dispatcher. */
  userId?: string
  /** ISO-8601 registration time, set by WebhookRegistry.register(). */
  createdAt?: string
  /** Set to true on registration and false by WebhookRegistry.deactivate(). */
  active?: boolean
  /** Delivery endpoint. */
  url: string
  /** Shared HMAC secret used to sign every delivery. */
  secret: string
  /** Subscribed event types. */
  events: WebhookEvent[]
  /** Retry policy applied to failed deliveries. */
  retryConfig: RetryConfig
  /** Extra HTTP headers sent with each delivery. */
  headers?: Record<string, string>
  /** Per-request timeout in milliseconds (the dispatcher falls back to 10000). */
  timeout?: number
}

/** One subscribed event type, optionally narrowed by payload filters. */
export interface WebhookEvent {
  type: WebhookEventType
  /**
   * Intended as dot-path → expected-value pairs (e.g. `{ 'data.model': 'gpt-4o' }`)
   * compared against the delivered payload.
   */
  filters?: Record<string, any>
}

/** Body POSTed to webhook endpoints. */
export interface WebhookPayload {
  /** Unique event id; receivers can use it as an idempotency key. */
  id: string
  /** ISO-8601 creation time of the event. */
  timestamp: string
  /** Event type, e.g. 'llm.completion'. */
  event: string
  data: {
    requestId: string
    conversationId?: string
    userId: string
    model: string
    prompt: string
    completion?: string
    tokens?: {
      prompt: number
      completion: number
      total: number
    }
    error?: {
      code: string
      message: string
    }
    /** Streaming events: identifies the stream a chunk belongs to. */
    streamId?: string
    /** Streaming chunk events: zero-based position of this chunk in its stream. */
    chunkIndex?: number
    /** Streaming chunk events: text buffered since the previous chunk. */
    content?: string
    /** Stream end event: number of chunk events that were sent. */
    totalChunks?: number
    metadata?: Record<string, any>
  }
}

/** Retry policy for webhook delivery; delays are in milliseconds. */
export interface RetryConfig {
  /** Passed to BullMQ as the job's `attempts`. */
  maxAttempts: number
  /** Delay before the first retry. */
  initialDelay: number
  /** Upper bound for any single retry delay. */
  maxDelay: number
  /** Growth factor applied to the delay after each failed attempt. */
  backoffMultiplier: number
}
Webhook Registry Service
// src/services/webhook-registry.ts
import type { WebhookConfig } from '../types/webhook.types'
import type { Redis } from 'ioredis'

/**
 * Stores webhook subscriptions in Redis.
 *
 * Keys:
 * - `webhook:{userId}:{webhookId}` — JSON of the config plus `id`, `userId`, `createdAt`, `active`
 * - `user:{userId}:webhooks`       — set of the user's webhook ids
 * - `webhooks:event:{eventType}`   — set of `webhook:*` keys subscribed to that event type
 */
export class WebhookRegistry {
  constructor(private redis: Redis) {}

  /**
   * Persists `webhook` for `userId` and indexes it under each subscribed event type.
   * @returns the generated webhook id (`whk_<epochMs>_<random>`).
   */
  async register(userId: string, webhook: WebhookConfig): Promise<string> {
    const webhookId = this.generateWebhookId()
    const key = `webhook:${userId}:${webhookId}`
    const record = {
      ...webhook,
      id: webhookId,
      userId,
      createdAt: new Date().toISOString(),
      active: true,
    }

    // Write the record before indexing it, so an index entry never points at nothing.
    await this.redis.set(key, JSON.stringify(record))
    // The index updates are independent of each other; issue them concurrently.
    await Promise.all([
      this.redis.sadd(`user:${userId}:webhooks`, webhookId),
      ...webhook.events.map((event) => this.redis.sadd(`webhooks:event:${event.type}`, key)),
    ])

    return webhookId
  }

  /**
   * Returns every active webhook (across all users) subscribed to `eventType`.
   * Index entries whose record is missing or unparsable are skipped.
   */
  async getWebhooksForEvent(eventType: string): Promise<WebhookConfig[]> {
    const keys = await this.redis.smembers(`webhooks:event:${eventType}`)
    // Fetch all records concurrently instead of one round trip after another.
    const records = await Promise.all(keys.map((key) => this.redis.get(key)))

    const webhooks: WebhookConfig[] = []
    for (let i = 0; i < records.length; i++) {
      const data = records[i]
      if (!data) continue
      try {
        const webhook = JSON.parse(data) as WebhookConfig & { active?: boolean }
        if (webhook.active) {
          webhooks.push(webhook)
        }
      } catch (err) {
        // One corrupt record must not block delivery for every other subscriber.
        console.warn(`Skipping unparsable webhook record ${keys[i]}:`, err)
      }
    }
    return webhooks
  }

  /**
   * Marks the webhook inactive and removes it from the per-event indexes, so
   * later lookups skip it without fetching the record. The webhook stays in the
   * user's webhook set. Does nothing if the webhook does not exist.
   */
  async deactivate(userId: string, webhookId: string): Promise<void> {
    const key = `webhook:${userId}:${webhookId}`
    const data = await this.redis.get(key)
    if (!data) return

    const webhook = JSON.parse(data) as WebhookConfig & { active?: boolean }
    webhook.active = false
    await this.redis.set(key, JSON.stringify(webhook))
    await Promise.all(
      (webhook.events ?? []).map((event) => this.redis.srem(`webhooks:event:${event.type}`, key))
    )
  }

  /** `whk_<epochMs>_<up to 9 base-36 chars>`. */
  private generateWebhookId(): string {
    return `whk_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
  }
}
2. Async LLM Processing Patterns
Async LLM Service with Callbacks
// src/services/async-llm.service.ts
import { Queue, Worker } from 'bullmq'
import { LLMService } from './llm.service'
import { WebhookDispatcher } from './webhook-dispatcher'
import { WebhookPayload } from '../types/webhook.types'

/**
 * Queues LLM requests on the BullMQ `llm-processing` queue, processes them on
 * an in-process worker (concurrency 10), and reports start, completion and
 * failure through WebhookDispatcher.
 */
export class AsyncLLMService {
  private llmQueue: Queue
  private llmService: LLMService
  private webhookDispatcher: WebhookDispatcher
  private worker: Worker

  /** @param redis connection passed straight to BullMQ for the queue and worker. */
  constructor(redis: any) {
    this.llmQueue = new Queue('llm-processing', {
      connection: redis,
      defaultJobOptions: {
        removeOnComplete: true,
        removeOnFail: false,
      },
    })
    this.llmService = new LLMService()
    this.webhookDispatcher = new WebhookDispatcher(redis)
    this.worker = this.setupWorker(redis)
  }

  /**
   * Enqueues a request and returns without waiting for the model.
   * @returns the generated request id, echoed in every webhook for this request.
   */
  async submitRequest(request: {
    userId: string
    prompt: string
    model: string
    webhookUrl?: string
    metadata?: any
  }): Promise<{ requestId: string }> {
    const requestId = this.generateRequestId()
    await this.llmQueue.add('process-llm', {
      ...request,
      requestId,
      timestamp: new Date().toISOString(),
    })
    return { requestId }
  }

  /** Closes the worker, then the queue. */
  async close(): Promise<void> {
    await this.worker.close()
    await this.llmQueue.close()
  }

  /** Creates the worker that runs LLM jobs and emits their webhooks. */
  private setupWorker(redis: any): Worker {
    const worker = new Worker(
      'llm-processing',
      async (job) => {
        const { userId, prompt, model, webhookUrl, metadata, requestId } = job.data

        try {
          // Announce that processing has started.
          await this.webhookDispatcher.dispatch(
            {
              id: this.generateEventId(),
              timestamp: new Date().toISOString(),
              event: 'llm.streaming.start',
              data: {
                requestId,
                userId,
                model,
                prompt,
                metadata,
              },
            },
            userId
          )

          const startTime = Date.now()
          const response = await this.llmService.complete({
            prompt,
            model,
            stream: false,
          })
          const duration = Date.now() - startTime

          const payload: WebhookPayload = {
            id: this.generateEventId(),
            timestamp: new Date().toISOString(),
            event: 'llm.completion',
            data: {
              requestId,
              userId,
              model,
              prompt,
              completion: response.text,
              tokens: response.tokens,
              metadata: {
                ...metadata,
                duration,
                provider: 'parrotrouter',
              },
            },
          }

          // Registered webhooks for this user.
          await this.webhookDispatcher.dispatch(payload, userId)

          // Per-request callback URL, if the caller supplied one.
          if (webhookUrl) {
            await this.webhookDispatcher.sendToUrl(webhookUrl, payload)
          }

          return payload
        } catch (error: unknown) {
          const { code, message } = this.describeError(error)
          const errorPayload: WebhookPayload = {
            id: this.generateEventId(),
            timestamp: new Date().toISOString(),
            event: 'llm.error',
            data: {
              requestId,
              userId,
              model,
              prompt,
              error: { code, message },
              metadata,
            },
          }

          try {
            await this.webhookDispatcher.dispatch(errorPayload, userId)
          } catch (dispatchError) {
            // A failed notification must not replace the error that failed the job.
            console.error(`Failed to dispatch llm.error for ${requestId}:`, dispatchError)
          }
          throw error
        }
      },
      {
        connection: redis,
        concurrency: 10,
      }
    )

    worker.on('completed', (job) => {
      console.log(`LLM job ${job.id} completed`)
    })

    worker.on('failed', (job, err) => {
      console.error(`LLM job ${job?.id} failed:`, err)
    })

    return worker
  }

  /** Extracts a string code and a message from whatever was thrown. */
  private describeError(error: unknown): { code: string; message: string } {
    const fields = (typeof error === 'object' && error !== null ? error : {}) as {
      code?: unknown
      message?: unknown
    }
    return {
      code: fields.code ? String(fields.code) : 'UNKNOWN_ERROR',
      message: typeof fields.message === 'string' ? fields.message : String(error),
    }
  }

  private generateRequestId(): string {
    return `req_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
  }

  private generateEventId(): string {
    return `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
  }
}
Streaming with Webhooks
// src/services/streaming-webhook.ts

/**
 * Request shape this service reads (`requestId`, `userId`). Other fields are
 * passed through untouched to the LLM client — presumably prompt/model; confirm
 * against the LLM service.
 */
export interface StreamRequest {
  requestId: string
  userId: string
  [key: string]: unknown
}

/** The part of the webhook dispatcher's API this service uses. */
export interface StreamingEventDispatcher {
  dispatch(
    payload: { id: string; timestamp: string; event: string; data: Record<string, unknown> },
    userId: string
  ): Promise<void>
}

/** LLM client that pushes tokens to a callback and resolves when the stream ends. */
export interface TokenStreamingLLM {
  stream(request: StreamRequest, onToken: (token: string) => void | Promise<void>): Promise<void>
}

/** Tuning knobs for chunk batching. */
export interface StreamingWebhookOptions {
  /** Flush once more than this many characters are buffered (default 100). */
  maxBufferChars?: number
  /** Also flush whatever is buffered on this interval, in ms (default 1000). */
  flushIntervalMs?: number
}

/**
 * Streams an LLM response to the caller token by token, while batching the
 * tokens into ordered `llm.streaming.chunk` webhooks and finishing with
 * `llm.streaming.end`.
 */
export class StreamingWebhookService {
  private activeStreams = new Map<string, ReturnType<typeof setInterval>>()
  private readonly maxBufferChars: number
  private readonly flushIntervalMs: number

  constructor(
    private readonly webhookDispatcher: StreamingEventDispatcher,
    private readonly llmService: TokenStreamingLLM,
    options: StreamingWebhookOptions = {}
  ) {
    this.maxBufferChars = options.maxBufferChars ?? 100
    this.flushIntervalMs = options.flushIntervalMs ?? 1000
  }

  /**
   * Runs `request` through the LLM, calling `onToken` for every token.
   *
   * Chunk webhooks are sent strictly in order, and the end event is sent only
   * after every chunk dispatch has settled. If a periodic flush fails, the
   * failure is logged and also rethrown from the final flush.
   */
  async streamWithWebhooks(
    request: StreamRequest,
    onToken: (token: string) => void
  ): Promise<void> {
    const streamId = this.generateStreamId()
    let buffer = ''
    let chunkCount = 0
    // Tail of the serialized dispatch chain.
    let inFlight: Promise<void> = Promise.resolve()

    const flushBuffer = (): Promise<void> => {
      if (buffer.length > 0) {
        // Snapshot and clear synchronously: tokens that arrive while a dispatch is
        // pending go into the next chunk instead of being wiped by a late reset.
        const content = buffer
        const chunkIndex = chunkCount++
        buffer = ''
        const send = () =>
          this.webhookDispatcher.dispatch(
            {
              id: this.generateEventId(),
              timestamp: new Date().toISOString(),
              event: 'llm.streaming.chunk',
              data: {
                requestId: request.requestId,
                streamId,
                chunkIndex,
                content,
              },
            },
            request.userId
          )
        // Start after the previous send settles (either way) so chunks stay ordered.
        inFlight = inFlight.then(send, send)
      }
      return inFlight
    }

    const flushInterval = setInterval(() => {
      flushBuffer().catch((err: unknown) => {
        console.error(`Periodic chunk flush failed for stream ${streamId}:`, err)
      })
    }, this.flushIntervalMs)
    this.activeStreams.set(streamId, flushInterval)

    try {
      await this.llmService.stream(request, async (token) => {
        buffer += token
        onToken(token)

        if (buffer.length > this.maxBufferChars) {
          await flushBuffer()
        }
      })

      // Send the remainder and wait for every queued chunk.
      await flushBuffer()

      await this.webhookDispatcher.dispatch(
        {
          id: this.generateEventId(),
          timestamp: new Date().toISOString(),
          event: 'llm.streaming.end',
          data: {
            requestId: request.requestId,
            streamId,
            totalChunks: chunkCount,
          },
        },
        request.userId
      )
    } finally {
      clearInterval(flushInterval)
      this.activeStreams.delete(streamId)
    }
  }

  private generateStreamId(): string {
    return `stream_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
  }

  private generateEventId(): string {
    return `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
  }
}
3. Security & Validation
HMAC Signature Implementation
// src/security/webhook-security.ts
import crypto from 'crypto'

/** Incoming header map; Node/Express may supply arrays for repeated headers. */
type HeaderMap = Record<string, string | string[] | undefined>

/** Case-insensitive header lookup; for repeated headers the first value wins. */
function readHeader(headers: HeaderMap, name: string): string | undefined {
  const wanted = name.toLowerCase()
  for (const [key, value] of Object.entries(headers)) {
    if (key.toLowerCase() === wanted) {
      return Array.isArray(value) ? value[0] : value
    }
  }
  return undefined
}

/**
 * HMAC signing and verification for webhook deliveries.
 *
 * A delivery is signed over `${timestamp}.${body}` and carries the headers
 * X-Webhook-Signature (hex digest), X-Webhook-Timestamp (unix seconds) and
 * X-Webhook-Algorithm.
 */
export class WebhookSecurity {
  /** Digests a receiver accepts in X-Webhook-Algorithm; anything else is rejected. */
  static readonly SUPPORTED_ALGORITHMS: readonly string[] = ['sha256', 'sha512']

  /** Hex-encoded HMAC of `payload` under `secret`. */
  static generateSignature(
    payload: string | Buffer,
    secret: string,
    algorithm: string = 'sha256'
  ): string {
    const hmac = crypto.createHmac(algorithm, secret)
    hmac.update(payload)
    return hmac.digest('hex')
  }

  /**
   * Constant-time check that `signature` is the hex HMAC of `payload`.
   * Returns false (rather than throwing) for signatures of the wrong length or
   * containing non-hex characters.
   */
  static verifySignature(
    payload: string | Buffer,
    signature: string,
    secret: string,
    algorithm: string = 'sha256'
  ): boolean {
    const expected = WebhookSecurity.generateSignature(payload, secret, algorithm)
    // timingSafeEqual throws when lengths differ, and Buffer.from(..., 'hex') silently
    // stops at the first invalid character, so validate the shape first.
    if (signature.length !== expected.length || !/^[0-9a-f]+$/i.test(signature)) {
      return false
    }
    return crypto.timingSafeEqual(Buffer.from(expected, 'hex'), Buffer.from(signature, 'hex'))
  }

  /** Signature headers for an outgoing delivery of `payload`, timestamped now. */
  static generateWebhookHeaders(
    payload: string,
    secret: string
  ): Record<string, string> {
    const timestamp = Math.floor(Date.now() / 1000).toString()
    const signaturePayload = `${timestamp}.${payload}`
    const signature = WebhookSecurity.generateSignature(signaturePayload, secret)

    return {
      'X-Webhook-Signature': signature,
      'X-Webhook-Timestamp': timestamp,
      'X-Webhook-Algorithm': 'sha256',
    }
  }

  /**
   * Validates an incoming delivery against its signature headers.
   *
   * @param body the exact received body (string or raw Buffer)
   * @param headers request headers; names are matched case-insensitively
   * @param maxAgeSeconds accepted clock distance between sender timestamp and now
   */
  static validateWebhookRequest(
    body: string | Buffer,
    headers: HeaderMap,
    secret: string,
    maxAgeSeconds: number = 300
  ): { valid: boolean; error?: string } {
    const signature = readHeader(headers, 'x-webhook-signature')
    const timestamp = readHeader(headers, 'x-webhook-timestamp')
    const algorithm = (readHeader(headers, 'x-webhook-algorithm') || 'sha256').toLowerCase()

    if (!signature || !timestamp) {
      return { valid: false, error: 'Missing signature or timestamp' }
    }

    // The algorithm header is sender-controlled: allow-list it so a request cannot
    // pick a weaker digest or make createHmac throw on an unknown name.
    if (!WebhookSecurity.SUPPORTED_ALGORITHMS.includes(algorithm)) {
      return { valid: false, error: 'Unsupported signature algorithm' }
    }

    if (!/^\d+$/.test(timestamp)) {
      return { valid: false, error: 'Invalid timestamp' }
    }

    // Bound the timestamp on both sides, so a signed request is only accepted
    // within maxAgeSeconds of the time it claims to have been sent.
    const age = Math.floor(Date.now() / 1000) - Number(timestamp)
    if (age > maxAgeSeconds) {
      return { valid: false, error: 'Request too old' }
    }
    if (-age > maxAgeSeconds) {
      return { valid: false, error: 'Request timestamp is in the future' }
    }

    // Verify over the exact received bytes; Buffers avoid a lossy string round trip.
    const signaturePayload =
      typeof body === 'string'
        ? `${timestamp}.${body}`
        : Buffer.concat([Buffer.from(`${timestamp}.`), body])
    const isValid = WebhookSecurity.verifySignature(signaturePayload, signature, secret, algorithm)

    return { valid: isValid, error: isValid ? undefined : 'Invalid signature' }
  }
}

/**
 * Express middleware that answers 401 unless the request's webhook signature verifies.
 *
 * Verification uses `req.rawBody` when present. Capture it before JSON parsing, e.g.
 * `express.json({ verify: (req, _res, buf) => { (req as any).rawBody = buf } })`.
 * Without it, the parsed body is re-serialized with JSON.stringify. That only matches
 * when the sender's JSON formatting is byte-identical.
 */
export function webhookAuthMiddleware(
  getSecret: (req: any) => string | undefined | Promise<string | undefined>
) {
  return async (req: any, res: any, next: any) => {
    let secret: string | undefined
    try {
      secret = await getSecret(req)
    } catch (err) {
      // Express 4 ignores rejected middleware promises; forward so the error handler runs.
      return next(err)
    }

    if (!secret) {
      return res.status(401).json({ error: 'Webhook not configured' })
    }

    const rawBody: string | Buffer = req.rawBody ?? JSON.stringify(req.body)
    const validation = WebhookSecurity.validateWebhookRequest(rawBody, req.headers ?? {}, secret)

    if (!validation.valid) {
      return res.status(401).json({ error: validation.error })
    }

    next()
  }
}
Request Validation
// src/validation/webhook-validation.ts
import Joi from 'joi'

/**
 * Joi schema for webhook registrations submitted by clients.
 * Joi rejects unknown object keys by default.
 */
export const webhookConfigSchema = Joi.object({
  // Deliveries are HTTP POSTs; reject other schemes (file:, ftp:, javascript:, ...).
  url: Joi.string().uri({ scheme: ['http', 'https'] }).required(),
  secret: Joi.string().min(32).required(),
  events: Joi.array()
    .items(
      Joi.object({
        type: Joi.string()
          .valid(
            'llm.completion',
            'llm.error',
            'llm.streaming.start',
            'llm.streaming.chunk',
            'llm.streaming.end'
          )
          .required(),
        filters: Joi.object().optional(),
      })
    )
    .min(1)
    .required(),
  // Always present after validation: default() with no argument builds the object
  // from its children's defaults. Delivery code reads retryConfig unconditionally.
  retryConfig: Joi.object({
    maxAttempts: Joi.number().min(1).max(10).default(3),
    initialDelay: Joi.number().min(100).max(10000).default(1000),
    maxDelay: Joi.number().min(1000).max(60000).default(30000),
    backoffMultiplier: Joi.number().min(1).max(5).default(2),
  }).default(),
  // Custom headers may not shadow the X-Webhook-* signature headers or Content-Type.
  headers: Joi.object()
    .pattern(Joi.string().pattern(/^(?!x-webhook-|content-type$)/i), Joi.string())
    .optional(),
  timeout: Joi.number().min(1000).max(30000).default(10000),
})

/**
 * Structural sanity check for an incoming webhook payload.
 *
 * Requires `id`, `timestamp`, `event` and an object `data`. The timestamp must be
 * within one hour of now, in either direction. `llm.completion`, `llm.error` and
 * `llm.streaming.chunk` get per-type field checks; every other event type passes.
 */
export function validateWebhookPayload(payload: unknown): boolean {
  if (typeof payload !== 'object' || payload === null) {
    return false
  }
  const { id, timestamp, event, data } = payload as Record<string, any>
  if (!id || !timestamp || !event || typeof data !== 'object' || data === null) {
    return false
  }

  const sentAt = new Date(timestamp).getTime()
  if (isNaN(sentAt) || Math.abs(Date.now() - sentAt) > 60 * 60 * 1000) {
    return false
  }

  switch (event) {
    case 'llm.completion':
      // An empty completion is a valid model result, so check its type, not truthiness.
      return !!(data.requestId && data.prompt && data.model) && typeof data.completion === 'string'
    case 'llm.error':
      return !!(data.requestId && data.error?.code && data.error?.message)
    case 'llm.streaming.chunk':
      return !!data.requestId && typeof data.content === 'string'
    default:
      return true
  }
}
4. Reliability & Retry Logic
Webhook Dispatcher with Retry
// src/services/webhook-dispatcher.ts
import axios, { AxiosError } from 'axios'
import { Queue, UnrecoverableError, Worker } from 'bullmq'
import { WebhookRegistry } from './webhook-registry'
import { WebhookSecurity } from '../security/webhook-security'
import { WebhookPayload, WebhookConfig, WebhookEvent, RetryConfig } from '../types/webhook.types'

/** A webhook as stored by WebhookRegistry: the client config plus its owner. */
type StoredWebhook = WebhookConfig & { userId?: string }

/** Retry policy for ad-hoc callbacks and for stored configs without one. */
const DEFAULT_RETRY_CONFIG: RetryConfig = {
  maxAttempts: 3,
  initialDelay: 1000,
  maxDelay: 30000,
  backoffMultiplier: 2,
}

/**
 * Delivers webhook payloads through the BullMQ `webhook-delivery` queue.
 *
 * Each delivery is HMAC-signed. Network errors, 5xx, 408 and 429 are retried
 * with capped exponential backoff plus jitter. A job that fails for good is
 * recorded once on the `webhook-dlq` queue.
 */
export class WebhookDispatcher {
  private webhookQueue: Queue
  private dlqQueue: Queue
  private registry: WebhookRegistry
  private worker: Worker

  constructor(redis: any) {
    this.webhookQueue = new Queue('webhook-delivery', {
      connection: redis,
    })
    // One long-lived DLQ producer instead of a new Queue per failure.
    this.dlqQueue = new Queue('webhook-dlq', {
      connection: redis,
    })
    this.registry = new WebhookRegistry(redis)
    this.worker = this.setupWorker(redis)
  }

  /**
   * Queues one delivery per active webhook that is owned by `userId`, subscribes
   * to `payload.event`, and whose filters for that event match the payload.
   */
  async dispatch(payload: WebhookPayload, userId: string): Promise<void> {
    const webhooks = (await this.registry.getWebhooksForEvent(payload.event)) as StoredWebhook[]
    const targets = webhooks.filter(
      (webhook) =>
        webhook.userId === userId &&
        this.matchesFilters(payload, this.filtersFor(webhook, payload.event))
    )
    await Promise.all(targets.map((webhook) => this.queueDelivery(webhook, payload)))
  }

  /**
   * Queues a one-off signed delivery to `url`, signed with DEFAULT_WEBHOOK_SECRET.
   * @throws Error when DEFAULT_WEBHOOK_SECRET is not set
   */
  async sendToUrl(url: string, payload: WebhookPayload): Promise<void> {
    const secret = process.env.DEFAULT_WEBHOOK_SECRET
    if (!secret) {
      // A hard-coded fallback secret is public knowledge and would let anyone forge callbacks.
      throw new Error('DEFAULT_WEBHOOK_SECRET must be set to send callback webhooks')
    }
    const tempWebhook: StoredWebhook = {
      url,
      secret,
      events: [{ type: payload.event as WebhookEvent['type'] }],
      retryConfig: DEFAULT_RETRY_CONFIG,
    }

    await this.queueDelivery(tempWebhook, payload)
  }

  /** Closes the delivery worker, then both queues. */
  async close(): Promise<void> {
    await this.worker.close()
    await Promise.all([this.webhookQueue.close(), this.dlqQueue.close()])
  }

  private async queueDelivery(
    webhook: StoredWebhook,
    payload: WebhookPayload
  ): Promise<void> {
    const retry = webhook.retryConfig ?? DEFAULT_RETRY_CONFIG
    await this.webhookQueue.add(
      'deliver',
      { webhook, payload },
      {
        attempts: retry.maxAttempts,
        // Delay comes from the worker's backoffStrategy (computeBackoff).
        backoff: { type: 'custom' },
        removeOnComplete: true,
        removeOnFail: false,
      }
    )
  }

  private setupWorker(redis: any): Worker {
    const worker = new Worker(
      'webhook-delivery',
      async (job) => {
        const { webhook, payload } = job.data as { webhook: StoredWebhook; payload: WebhookPayload }
        const payloadStr = JSON.stringify(payload)
        const headers = {
          // Custom headers first, so they can never replace Content-Type or the signature.
          ...webhook.headers,
          'Content-Type': 'application/json',
          ...WebhookSecurity.generateWebhookHeaders(payloadStr, webhook.secret),
        }

        try {
          // Post the exact string that was signed instead of letting axios re-serialize.
          const response = await axios.post(webhook.url, payloadStr, {
            headers,
            timeout: webhook.timeout || 10000,
            validateStatus: (status) => status >= 200 && status < 300,
          })

          console.log(`Webhook delivered to ${webhook.url}: ${response.status}`)

          return {
            status: response.status,
            deliveredAt: new Date().toISOString(),
          }
        } catch (error) {
          const axiosError = error as AxiosError

          if (!this.isRetryableError(axiosError)) {
            await this.moveToDeadLetterQueue(webhook, payload, error)
            // UnrecoverableError makes BullMQ fail the job now rather than retrying it.
            throw new UnrecoverableError(`Non-retryable error: ${axiosError.message}`)
          }

          const maxAttempts = job.opts.attempts ?? 1
          console.error(
            `Webhook delivery failed (attempt ${job.attemptsMade + 1}/${maxAttempts}): ${axiosError.message}`
          )
          throw error
        }
      },
      {
        connection: redis,
        concurrency: 20,
        settings: {
          backoffStrategy: (attemptsMade: number, _type?: string, _err?: Error, job?: { data?: any }) =>
            this.computeBackoff(attemptsMade, job?.data?.webhook?.retryConfig ?? DEFAULT_RETRY_CONFIG),
        },
      }
    )

    worker.on('failed', async (job, err) => {
      if (!job) return
      // Non-retryable failures were already dead-lettered by the processor.
      if (err instanceof UnrecoverableError) return
      if (job.attemptsMade >= (job.opts.attempts ?? 1)) {
        try {
          await this.moveToDeadLetterQueue(job.data.webhook, job.data.payload, err)
        } catch (dlqError) {
          console.error(`Failed to dead-letter webhook job ${job.id}:`, dlqError)
        }
      }
    })

    return worker
  }

  /**
   * Retry delay for `attemptsMade`, as BullMQ reports it (1 after the first failure).
   * The base is initialDelay * backoffMultiplier^(attemptsMade - 1), capped at maxDelay.
   * Then "equal jitter" is applied: half fixed, half random, so retries spread out.
   */
  private computeBackoff(attemptsMade: number, retry: RetryConfig): number {
    const exponential =
      retry.initialDelay * Math.pow(retry.backoffMultiplier, Math.max(0, attemptsMade - 1))
    const capped = Math.min(retry.maxDelay, exponential)
    return Math.round(capped / 2 + Math.random() * (capped / 2))
  }

  /** Network errors, 5xx, 408 and 429 are worth retrying; other 4xx are not. */
  private isRetryableError(error: AxiosError): boolean {
    if (!error.response) {
      return true
    }
    const status = error.response.status
    return status >= 500 || status === 429 || status === 408
  }

  private async moveToDeadLetterQueue(
    webhook: WebhookConfig,
    payload: WebhookPayload,
    error: any
  ): Promise<void> {
    await this.dlqQueue.add('failed-delivery', {
      webhook,
      payload,
      error: {
        message: error?.message,
        code: error?.code,
        status: error?.response?.status,
      },
      failedAt: new Date().toISOString(),
    })
  }

  /** Filters declared on the webhook's subscription entry for `eventType`, if any. */
  private filtersFor(webhook: WebhookConfig, eventType: string): Record<string, any> | undefined {
    return webhook.events?.find((event) => event.type === eventType)?.filters
  }

  /** True when every dot-path filter strictly equals the corresponding payload value. */
  private matchesFilters(
    payload: WebhookPayload,
    filters?: Record<string, any>
  ): boolean {
    if (!filters) return true

    for (const [key, value] of Object.entries(filters)) {
      if (this.getNestedValue(payload, key) !== value) {
        return false
      }
    }
    return true
  }

  private getNestedValue(obj: any, path: string): any {
    return path.split('.').reduce((curr, part) => curr?.[part], obj)
  }
}
Reliability Best Practices
- Send an idempotency key with every delivery (the event `id` works) so receivers can discard duplicates
- Use exponential backoff with jitter for retries
- Set reasonable request timeouts (10–30 seconds)
- Monitor retry rates and tune retry limits and backoff accordingly
- Add circuit breakers for endpoints that fail consistently
5. Event-Driven Architecture
Event Bus Implementation
// src/events/event-bus.ts
import { EventEmitter } from 'events'
import type { Redis } from 'ioredis'

export interface LLMEvent {
  id: string
  type: string
  timestamp: string
  data: any
  metadata?: Record<string, any>
}

/** How long stored events stay replayable, in seconds (7 days). */
const EVENT_TTL_SECONDS = 86400 * 7
/** Cap on remembered own-event ids used to drop Redis echoes. */
const MAX_TRACKED_OWN_EVENTS = 10000
const CHANNEL_PREFIX = 'llm:events:'

/**
 * In-process plus Redis pub/sub event bus for LLM events.
 *
 * publish() emits to local listeners, broadcasts on `llm:events:{type}`, and
 * stores the event for replay. Events published by other processes arrive on a
 * dedicated subscriber connection and are re-emitted locally. This instance's
 * own events, echoed back by Redis, are dropped, so each listener runs once.
 *
 * NOTE: listeners are registered under the literal `pattern` string. Only exact
 * event types (no Redis glob characters) therefore reach handlers.
 */
export class EventBus extends EventEmitter {
  private redis: Redis
  private subscriber: Redis
  // Ids this instance published; Redis delivers them back to our own subscriber.
  private readonly ownEventIds = new Set<string>()

  constructor(redis: Redis) {
    super()
    this.redis = redis
    this.subscriber = redis.duplicate()
    this.setupSubscriptions()
  }

  /** Emits locally, broadcasts via Redis, then stores the event for replay. */
  async publish(event: LLMEvent): Promise<void> {
    this.rememberOwnEvent(event.id)
    this.emit(event.type, event)

    await this.redis.publish(`${CHANNEL_PREFIX}${event.type}`, JSON.stringify(event))

    await this.storeEvent(event)
  }

  /**
   * Registers `handler` for `pattern` locally and subscribes to it in Redis.
   * A throwing or rejecting handler is logged; it cannot break publish()
   * or surface as an unhandled rejection.
   */
  async subscribe(
    pattern: string,
    handler: (event: LLMEvent) => void | Promise<void>
  ): Promise<void> {
    this.on(pattern, (event: LLMEvent) => {
      let result: void | Promise<void>
      try {
        result = handler(event)
      } catch (err) {
        console.error(`Handler for ${pattern} failed:`, err)
        return
      }
      if (result instanceof Promise) {
        result.catch((err: unknown) => console.error(`Handler for ${pattern} failed:`, err))
      }
    })

    await this.subscriber.psubscribe(`${CHANNEL_PREFIX}${pattern}`)
  }

  /**
   * Returns stored events whose timestamps fall within [from, to], optionally filtered.
   * Events whose stored body has expired are skipped.
   */
  async replayEvents(
    from: Date,
    to: Date,
    filter?: (event: LLMEvent) => boolean
  ): Promise<LLMEvent[]> {
    const eventIds = await this.redis.zrangebyscore('events:timeline', from.getTime(), to.getTime())
    if (eventIds.length === 0) return []

    const bodies = await Promise.all(eventIds.map((id) => this.redis.get(`events:data:${id}`)))

    const events: LLMEvent[] = []
    for (const body of bodies) {
      if (!body) continue
      const event = JSON.parse(body) as LLMEvent
      if (!filter || filter(event)) {
        events.push(event)
      }
    }
    return events
  }

  /** Removes local listeners and closes the subscriber connection. */
  async close(): Promise<void> {
    this.removeAllListeners()
    await this.subscriber.quit()
  }

  private setupSubscriptions() {
    this.subscriber.on('pmessage', (_pattern: string, channel: string, message: string) => {
      try {
        const event = JSON.parse(message) as LLMEvent
        // publish() already emitted our own events locally; drop the Redis echo.
        if (this.ownEventIds.has(event.id)) return
        const eventType = channel.startsWith(CHANNEL_PREFIX)
          ? channel.slice(CHANNEL_PREFIX.length)
          : channel
        this.emit(eventType, event)
      } catch (error) {
        console.error('Failed to process event:', error)
      }
    })
  }

  private rememberOwnEvent(id: string): void {
    this.ownEventIds.add(id)
    if (this.ownEventIds.size > MAX_TRACKED_OWN_EVENTS) {
      // Sets iterate in insertion order, so the first value is the oldest id.
      const oldest = this.ownEventIds.values().next().value
      if (oldest !== undefined) this.ownEventIds.delete(oldest)
    }
  }

  /**
   * Stores the event body under `events:data:{id}` with a 7-day TTL and indexes it by
   * timestamp in `events:timeline`. Keying by id rather than type+timestamp keeps two
   * events from overwriting each other.
   */
  private async storeEvent(event: LLMEvent): Promise<void> {
    await this.redis.setex(`events:data:${event.id}`, EVENT_TTL_SECONDS, JSON.stringify(event))
    await this.redis.zadd('events:timeline', new Date(event.timestamp).getTime(), event.id)
    // Drop timeline entries older than the body TTL so the index stays bounded.
    await this.redis.zremrangebyscore(
      'events:timeline',
      '-inf',
      Date.now() - EVENT_TTL_SECONDS * 1000
    )
  }
}

/** Wires LLM events to webhooks, analytics, workflows and monitoring. */
export class LLMEventHandlers {
  constructor(
    private eventBus: EventBus,
    private webhookDispatcher: WebhookDispatcher
  ) {
    this.setupHandlers()
  }

  private setupHandlers() {
    const logSubscribeFailure = (type: string) => (err: unknown) =>
      console.error(`Failed to subscribe to ${type}:`, err)

    this.eventBus
      .subscribe('llm.completion', async (event) => {
        await this.webhookDispatcher.dispatch({
          id: event.id,
          timestamp: event.timestamp,
          event: 'llm.completion',
          data: event.data,
        }, event.data.userId)

        await this.updateAnalytics(event)

        await this.triggerWorkflows(event)
      })
      .catch(logSubscribeFailure('llm.completion'))

    this.eventBus
      .subscribe('llm.error', async (event) => {
        await this.alertMonitoring(event)

        await this.webhookDispatcher.dispatch({
          id: event.id,
          timestamp: event.timestamp,
          event: 'llm.error',
          data: event.data,
        }, event.data.userId)
      })
      .catch(logSubscribeFailure('llm.error'))
  }

  private async updateAnalytics(event: LLMEvent): Promise<void> {
    // Implement analytics updates
  }

  private async triggerWorkflows(event: LLMEvent): Promise<void> {
    // Implement workflow triggers
  }

  private async alertMonitoring(event: LLMEvent): Promise<void> {
    // Implement monitoring alerts
  }
}
6. Queue Integration
Multi-Queue Architecture
// src/queues/queue-manager.ts
import { Queue, Worker, Job } from 'bullmq'
import { Redis } from 'ioredis'

/**
 * Owns the application's BullMQ queues and the workers attached to them.
 *
 * NOTE: no QueueScheduler is created. It is deprecated since BullMQ 2.0, where
 * workers handle delayed and stalled jobs themselves, and later major versions
 * no longer ship it.
 */
export class QueueManager {
  private queues: Map<string, Queue> = new Map()
  // A queue may have several workers; keep every one so shutdown closes them all.
  private workers: Map<string, Worker[]> = new Map()

  constructor(private redis: Redis) {
    this.initializeQueues()
  }

  private initializeQueues() {
    // LLM processing queue
    this.createQueue('llm-processing', {
      defaultJobOptions: {
        removeOnComplete: 100,
        removeOnFail: 1000,
      },
    })

    // Webhook delivery queue
    this.createQueue('webhook-delivery', {
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
      },
    })

    // Priority queue for premium users
    this.createQueue('llm-priority', {
      defaultJobOptions: {
        priority: 1,
      },
    })

    // Batch processing queue
    this.createQueue('llm-batch', {
      defaultJobOptions: {
        delay: 0,
      },
    })
  }

  private createQueue(name: string, options: any) {
    const queue = new Queue(name, {
      connection: this.redis,
      ...options,
    })
    this.queues.set(name, queue)
  }

  /**
   * Returns the queue registered under `queueName`.
   * @throws Error when no such queue exists
   */
  getQueue(queueName: string): Queue {
    const queue = this.queues.get(queueName)
    if (!queue) {
      throw new Error(`Queue ${queueName} not found`)
    }
    return queue
  }

  /** Adds a job to a managed queue; throws if the queue is unknown. */
  async addJob(
    queueName: string,
    jobName: string,
    data: any,
    options?: any
  ): Promise<Job> {
    return this.getQueue(queueName).add(jobName, data, options)
  }

  /** Starts a worker on `queueName`; additional workers for the same queue are kept too. */
  createWorker(
    queueName: string,
    processor: (job: Job) => Promise<any>,
    concurrency: number = 5
  ): Worker {
    const worker = new Worker(queueName, processor, {
      connection: this.redis,
      concurrency,
    })
    const existing = this.workers.get(queueName) ?? []
    this.workers.set(queueName, [...existing, worker])
    return worker
  }

  /**
   * Job counts for a queue. `total` counts jobs not yet finished
   * (waiting + active + delayed).
   */
  async getQueueMetrics(queueName: string) {
    const queue = this.getQueue(queueName)

    const [waiting, active, completed, failed, delayed] = await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
      queue.getDelayedCount(),
    ])

    return {
      waiting,
      active,
      completed,
      failed,
      delayed,
      total: waiting + active + delayed,
    }
  }

  /** Closes every worker first, then every queue. */
  async gracefulShutdown(): Promise<void> {
    const workers = [...this.workers.values()].flat()
    await Promise.all(workers.map((worker) => worker.close()))
    this.workers.clear()

    await Promise.all([...this.queues.values()].map((queue) => queue.close()))
  }
}

/** Routes work to the priority, normal and batch LLM queues. */
export class PriorityQueueService {
  constructor(private queueManager: QueueManager) {}

  async submitHighPriority(data: any): Promise<Job> {
    return this.queueManager.addJob('llm-priority', 'process', data, {
      priority: 1,
    })
  }

  async submitNormalPriority(data: any): Promise<Job> {
    return this.queueManager.addJob('llm-processing', 'process', data, {
      priority: 10,
    })
  }

  /** Adds every item to `llm-batch` in one bulk call, tagging each with its index. */
  async submitBatch(items: any[]): Promise<Job[]> {
    const jobs = items.map((item, index) => ({
      name: 'process-batch-item',
      data: { ...item, batchIndex: index },
      opts: { priority: 20 },
    }))

    return this.queueManager.getQueue('llm-batch').addBulk(jobs)
  }
}
7. Platform-Specific Webhooks
Platform Webhook Adapters
// src/adapters/platform-adapters.ts
import type { WebhookPayload } from '../types/webhook.types'

export interface PlatformAdapter {
  name: string
  transformPayload(payload: WebhookPayload): any
  validateConfig(config: any): boolean
  sendWebhook(url: string, transformedPayload: any): Promise<void>
}

/** Timeout for each call to a platform webhook, in ms. */
const PLATFORM_REQUEST_TIMEOUT_MS = 10000

/** Cuts `text` to `max` characters, appending '...' only when something was removed. */
function truncate(text: string | undefined, max: number): string {
  if (!text) return ''
  return text.length > max ? `${text.slice(0, max)}...` : text
}

/** Completion text, or a description of the error, or 'No response'. */
function resultText(data: WebhookPayload['data'], max: number): string {
  if (data.completion) return truncate(data.completion, max)
  if (data.error) return truncate(`Error ${data.error.code}: ${data.error.message}`, max)
  return 'No response'
}

/** Escapes the characters Slack treats as control sequences (&, <, >), e.g. `<!channel>`. */
function escapeSlack(text: string): string {
  return text.replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;')
}

/** Parses `value` as an https URL; returns undefined for anything else. */
function parseHttpsUrl(value: unknown): URL | undefined {
  if (typeof value !== 'string') return undefined
  try {
    const url = new URL(value)
    return url.protocol === 'https:' ? url : undefined
  } catch {
    return undefined
  }
}

/** POSTs `payload` as JSON with a timeout, throwing on a non-2xx response. */
async function postJson(url: string, payload: unknown, platform: string): Promise<void> {
  const response = await fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
    signal: AbortSignal.timeout(PLATFORM_REQUEST_TIMEOUT_MS),
  })
  if (!response.ok) {
    // statusText can be empty (e.g. over HTTP/2), so include the numeric status as well.
    throw new Error(`${platform} webhook failed: ${response.status} ${response.statusText}`.trimEnd())
  }
}

// Slack Adapter
export class SlackAdapter implements PlatformAdapter {
  name = 'slack'

  /** Block Kit message; user/LLM text is escaped so it cannot inject mentions or links. */
  transformPayload(payload: WebhookPayload): any {
    const { data } = payload
    return {
      text: `LLM Processing Complete`,
      blocks: [
        {
          type: 'header',
          text: {
            type: 'plain_text',
            text: '🤖 AI Response Ready',
          },
        },
        {
          type: 'section',
          fields: [
            {
              type: 'mrkdwn',
              text: `*Request ID:*\n${escapeSlack(data.requestId)}`,
            },
            {
              type: 'mrkdwn',
              text: `*Model:*\n${escapeSlack(data.model)}`,
            },
          ],
        },
        {
          type: 'section',
          text: {
            type: 'mrkdwn',
            text: `*Prompt:*\n\`\`\`${escapeSlack(truncate(data.prompt, 200))}\`\`\``,
          },
        },
        {
          type: 'section',
          text: {
            type: 'mrkdwn',
            text: `*Response:*\n${escapeSlack(resultText(data, 500))}`,
          },
        },
        {
          type: 'context',
          elements: [
            {
              type: 'mrkdwn',
              text: `Tokens: ${data.tokens?.total ?? 'N/A'} | Time: ${new Date(payload.timestamp).toLocaleString()}`,
            },
          ],
        },
      ],
    }
  }

  /** Accepts only https URLs on hooks.slack.com. */
  validateConfig(config: any): boolean {
    return parseHttpsUrl(config?.url)?.hostname === 'hooks.slack.com'
  }

  async sendWebhook(url: string, payload: any): Promise<void> {
    await postJson(url, payload, 'Slack')
  }
}

/** Hosts that serve Discord webhook endpoints. */
const DISCORD_HOSTS = new Set(['discord.com', 'ptb.discord.com', 'canary.discord.com', 'discordapp.com'])

// Discord Adapter
export class DiscordAdapter implements PlatformAdapter {
  name = 'discord'

  /** Single embed; field values stay within Discord's 1024-character limit. */
  transformPayload(payload: WebhookPayload): any {
    const { data } = payload
    return {
      username: 'ParrotRouter AI',
      avatar_url: 'https://parrotrouter.com/logo.png',
      embeds: [
        {
          title: '🤖 AI Response Ready',
          color: 0x00ff00,
          fields: [
            {
              name: 'Request ID',
              value: data.requestId,
              inline: true,
            },
            {
              name: 'Model',
              value: data.model,
              inline: true,
            },
            {
              name: 'Tokens Used',
              value: data.tokens?.total?.toString() || 'N/A',
              inline: true,
            },
            {
              name: 'Prompt',
              value: `\`\`${truncate(data.prompt, 1000)}\`\``,
            },
            {
              name: 'Response',
              value: resultText(data, 1000),
            },
          ],
          timestamp: payload.timestamp,
          footer: {
            text: 'Powered by ParrotRouter',
          },
        },
      ],
    }
  }

  /** Accepts only https Discord hosts with an /api/webhooks/ path. */
  validateConfig(config: any): boolean {
    const url = parseHttpsUrl(config?.url)
    return !!url && DISCORD_HOSTS.has(url.hostname) && url.pathname.startsWith('/api/webhooks/')
  }

  async sendWebhook(url: string, payload: any): Promise<void> {
    await postJson(url, payload, 'Discord')
  }
}

// Microsoft Teams Adapter
export class TeamsAdapter implements PlatformAdapter {
  name = 'teams'

  /** Legacy Office 365 connector MessageCard. */
  transformPayload(payload: WebhookPayload): any {
    const { data } = payload
    return {
      '@type': 'MessageCard',
      '@context': 'http://schema.org/extensions',
      themeColor: '0076D7',
      summary: 'AI Response Ready',
      sections: [
        {
          activityTitle: '🤖 ParrotRouter AI Response',
          facts: [
            {
              name: 'Request ID',
              value: data.requestId,
            },
            {
              name: 'Model',
              value: data.model,
            },
            {
              name: 'Tokens',
              value: data.tokens?.total?.toString() || 'N/A',
            },
          ],
        },
        {
          title: 'Prompt',
          text: truncate(data.prompt, 500),
        },
        {
          title: 'Response',
          text: resultText(data, 1000),
        },
      ],
      potentialAction: [
        {
          '@type': 'OpenUri',
          name: 'View in ParrotRouter',
          targets: [
            {
              os: 'default',
              uri: `https://parrotrouter.com/requests/${encodeURIComponent(data.requestId)}`,
            },
          ],
        },
      ],
    }
  }

  /** Accepts only https URLs on webhook.office.com or one of its subdomains. */
  validateConfig(config: any): boolean {
    const host = parseHttpsUrl(config?.url)?.hostname
    return host === 'webhook.office.com' || (host?.endsWith('.webhook.office.com') ?? false)
  }

  async sendWebhook(url: string, payload: any): Promise<void> {
    await postJson(url, payload, 'Teams')
  }
}

/** Looks up a platform adapter by name and delivers payloads through it. */
export class PlatformAdapterManager {
  private adapters = new Map<string, PlatformAdapter>()

  constructor() {
    this.registerAdapter(new SlackAdapter())
    this.registerAdapter(new DiscordAdapter())
    this.registerAdapter(new TeamsAdapter())
  }

  registerAdapter(adapter: PlatformAdapter) {
    this.adapters.set(adapter.name, adapter)
  }

  getAdapter(platform: string): PlatformAdapter | undefined {
    return this.adapters.get(platform)
  }

  /**
   * Transforms `payload` for `platform` and posts it to `url`.
   * @throws Error if the platform is unknown or `url` is not a valid endpoint for it
   */
  async sendToPlatform(
    platform: string,
    url: string,
    payload: WebhookPayload
  ): Promise<void> {
    const adapter = this.getAdapter(platform)
    if (!adapter) {
      throw new Error(`No adapter found for platform: ${platform}`)
    }
    // Never post LLM output to a URL that is not the platform's own webhook host.
    if (!adapter.validateConfig({ url })) {
      throw new Error(`URL is not a valid ${platform} webhook endpoint`)
    }

    const transformedPayload = adapter.transformPayload(payload)
    await adapter.sendWebhook(url, transformedPayload)
  }
}
8. Error Handling & Dead Letter Queues
DLQ Management
// src/queues/dlq-manager.ts
import { Queue, Worker } from 'bullmq'
import { Redis } from 'ioredis'

/** A failed job as recorded in the dead-letter store. */
export interface DLQEntry {
  id: string
  originalQueue: string
  payload: any
  error: {
    message: string
    code?: string
    stack?: string
  }
  attempts: number
  firstFailedAt: string
  lastFailedAt: string
  metadata?: Record<string, any>
}

/** Newest-first ordering by lastFailedAt. */
function byLastFailedDesc(a: DLQEntry, b: DLQEntry): number {
  return new Date(b.lastFailedAt).getTime() - new Date(a.lastFailedAt).getTime()
}

/**
 * Records permanently failed jobs and lets operators list, retry and purge them.
 *
 * Entries go to the BullMQ `dead-letter-queue` queue and to the Redis hash
 * `dlq:entries:{originalQueue}` (field = entry id). List, retry and purge read
 * the hashes. Daily counters live in `metrics:dlq:{YYYY-MM-DD}` (UTC date) and
 * are kept for 30 days.
 */
export class DeadLetterQueueManager {
  private dlqQueue: Queue
  private dlqWorker?: Worker

  constructor(private redis: Redis) {
    this.dlqQueue = new Queue('dead-letter-queue', {
      connection: redis,
    })
  }

  /** Records `job` (a BullMQ job or a job-like object) as dead-lettered from `originalQueue`. */
  async addToDeadLetter(
    originalQueue: string,
    job: any,
    error: Error
  ): Promise<void> {
    const entry: DLQEntry = {
      id: `dlq_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
      originalQueue,
      payload: job.data,
      error: {
        message: error.message,
        code: (error as any).code,
        stack: error.stack,
      },
      attempts: job.attemptsMade || 1,
      firstFailedAt: job.timestamp ? new Date(job.timestamp).toISOString() : new Date().toISOString(),
      lastFailedAt: new Date().toISOString(),
      metadata: {
        jobId: job.id,
        jobName: job.name,
      },
    }

    await this.dlqQueue.add('failed-job', entry)

    await this.redis.hset(
      `dlq:entries:${originalQueue}`,
      entry.id,
      JSON.stringify(entry)
    )

    await this.incrementDLQMetric(originalQueue, error)
  }

  /**
   * Entries for one queue, or for all queues when `queue` is omitted, newest first.
   * Pass `Infinity` as `limit` to get every entry.
   */
  async getDeadLetterEntries(
    queue?: string,
    limit: number = 100
  ): Promise<DLQEntry[]> {
    const keys = queue ? [`dlq:entries:${queue}`] : await this.scanKeys('dlq:entries:*')

    const entries: DLQEntry[] = []
    for (const key of keys) {
      const hash = await this.redis.hgetall(key)
      for (const raw of Object.values(hash)) {
        entries.push(JSON.parse(raw) as DLQEntry)
      }
    }

    return entries.sort(byLastFailedDesc).slice(0, limit)
  }

  /**
   * Re-queues an entry on its original queue (3 attempts, exponential backoff from 2s)
   * and removes it from the DLQ hash.
   * @returns false when no entry with `entryId` exists
   */
  async retryDeadLetterEntry(entryId: string): Promise<boolean> {
    let entry: DLQEntry | null = null
    let sourceKey: string | null = null

    for (const key of await this.scanKeys('dlq:entries:*')) {
      const entryData = await this.redis.hget(key, entryId)
      if (entryData) {
        entry = JSON.parse(entryData) as DLQEntry
        sourceKey = key
        break
      }
    }

    if (!entry || !sourceKey) {
      return false
    }

    // Short-lived producer for the original queue; closed afterwards so it does not leak.
    const originalQueue = new Queue(entry.originalQueue, {
      connection: this.redis,
    })
    try {
      await originalQueue.add(
        entry.metadata?.jobName || 'retry-from-dlq',
        entry.payload,
        {
          attempts: 3,
          backoff: {
            type: 'exponential',
            delay: 2000,
          },
        }
      )
    } finally {
      await originalQueue.close()
    }

    await this.redis.hdel(sourceKey, entryId)
    return true
  }

  /**
   * Deletes DLQ hash entries whose lastFailedAt is before `olderThan`, or all
   * entries when it is omitted.
   * @returns number of entries actually removed
   */
  async purgeDeadLetterQueue(olderThan?: Date): Promise<number> {
    // Look at every entry, not just the newest 100; the oldest ones are the ones to purge.
    const entries = await this.getDeadLetterEntries(undefined, Infinity)
    let purged = 0

    for (const entry of entries) {
      if (!olderThan || new Date(entry.lastFailedAt) < olderThan) {
        purged += await this.redis.hdel(`dlq:entries:${entry.originalQueue}`, entry.id)
      }
    }

    return purged
  }

  /** Closes the optional DLQ worker and the DLQ queue. */
  async close(): Promise<void> {
    await this.dlqWorker?.close()
    await this.dlqQueue.close()
  }

  /** Collects keys matching `pattern` with SCAN, which does not block Redis as KEYS does. */
  private async scanKeys(pattern: string): Promise<string[]> {
    // SCAN may return the same key more than once, so de-duplicate.
    const found = new Set<string>()
    let cursor = '0'
    do {
      const [next, batch] = await this.redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100)
      cursor = next
      for (const key of batch) found.add(key)
    } while (cursor !== '0')
    return [...found]
  }

  /** Bumps today's per-queue and per-queue-per-error-class counters. */
  private async incrementDLQMetric(queue: string, error: Error): Promise<void> {
    const errorType = error.constructor.name
    const today = new Date().toISOString().split('T')[0]

    await this.redis.hincrby(`metrics:dlq:${today}`, queue, 1)
    await this.redis.hincrby(`metrics:dlq:${today}`, `${queue}:${errorType}`, 1)

    await this.redis.expire(`metrics:dlq:${today}`, 86400 * 30)
  }

  /**
   * Counters for `date` (YYYY-MM-DD, UTC; default today). Keys are `<queue>`
   * totals and `<queue>:<ErrorClass>` breakdowns.
   */
  async getDLQMetrics(date?: string): Promise<Record<string, number>> {
    const targetDate = date || new Date().toISOString().split('T')[0]
    const metrics = await this.redis.hgetall(`metrics:dlq:${targetDate}`)

    const result: Record<string, number> = {}
    for (const [key, value] of Object.entries(metrics)) {
      result[key] = parseInt(value, 10)
    }

    return result
  }
}

/** Periodically checks DLQ volume and age and raises alerts. */
export class DLQMonitorService {
  private timer?: ReturnType<typeof setInterval>

  constructor(
    private dlqManager: DeadLetterQueueManager,
    private alerting: AlertingService
  ) {}

  /** Starts (or restarts) the periodic health check. */
  async startMonitoring(intervalMs: number = 60000) {
    // Restart rather than stack a second interval if called twice.
    this.stopMonitoring()
    this.timer = setInterval(() => {
      this.checkDLQHealth().catch((err: unknown) => {
        console.error('DLQ health check failed:', err)
      })
    }, intervalMs)
  }

  /** Stops the periodic health check, if running. */
  stopMonitoring(): void {
    if (this.timer) {
      clearInterval(this.timer)
      this.timer = undefined
    }
  }

  private async checkDLQHealth() {
    const metrics = await this.dlqManager.getDLQMetrics()

    for (const [queue, count] of Object.entries(metrics)) {
      // The hash also holds `<queue>:<ErrorClass>` breakdowns; alert on per-queue totals only.
      // Assumes queue names themselves contain no ':' — confirm if that changes.
      if (queue.includes(':')) continue
      if (count > 100) {
        await this.alerting.sendAlert({
          severity: 'high',
          title: `High DLQ rate for queue: ${queue}`,
          message: `${count} failures today (UTC)`,
          metadata: { queue, count },
        })
      }
    }

    // Consider every entry: the stale ones are the oldest, past the default newest-100 cut.
    const entries = await this.dlqManager.getDeadLetterEntries(undefined, Infinity)
    const oneWeekAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
    const staleEntries = entries.filter(
      e => new Date(e.lastFailedAt) < oneWeekAgo
    )

    if (staleEntries.length > 0) {
      await this.alerting.sendAlert({
        severity: 'medium',
        title: 'Stale entries in DLQ',
        message: `${staleEntries.length} entries older than 7 days`,
        metadata: { count: staleEntries.length },
      })
    }
  }
}
9. Monitoring & Observability
Webhook Monitoring
// src/monitoring/webhook-monitoring.ts
import { Histogram, Counter, Gauge, Registry } from 'prom-client'

/**
 * Prometheus metrics for webhook delivery, kept in a dedicated Registry that
 * getMetrics() serialises.
 */
export class WebhookMonitoring {
  private readonly registry: Registry

  // Metrics
  private readonly deliveryDuration: Histogram<string>
  private readonly deliveryCounter: Counter<string>
  private readonly failureCounter: Counter<string>
  private readonly activeWebhooks: Gauge<string>
  private readonly queueSize: Gauge<string>

  constructor() {
    this.registry = new Registry()

    // Initialize metrics
    this.deliveryDuration = new Histogram({
      name: 'webhook_delivery_duration_seconds',
      help: 'Duration of webhook delivery attempts',
      labelNames: ['webhook_url', 'event_type', 'status'],
      buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
      registers: [this.registry],
    })

    this.deliveryCounter = new Counter({
      name: 'webhook_deliveries_total',
      help: 'Total number of webhook deliveries',
      labelNames: ['webhook_url', 'event_type', 'status'],
      registers: [this.registry],
    })

    this.failureCounter = new Counter({
      name: 'webhook_failures_total',
      help: 'Total number of webhook failures',
      labelNames: ['webhook_url', 'event_type', 'error_type'],
      registers: [this.registry],
    })

    this.activeWebhooks = new Gauge({
      name: 'active_webhooks_total',
      help: 'Number of active webhooks',
      labelNames: ['event_type'],
      registers: [this.registry],
    })

    this.queueSize = new Gauge({
      name: 'webhook_queue_size',
      help: 'Current size of webhook delivery queue',
      labelNames: ['queue_name'],
      registers: [this.registry],
    })
  }

  /**
   * Records one delivery attempt.
   * @param url webhook target; reduced to scheme + host before use as a label
   * @param eventType event type label
   * @param status HTTP status of the attempt
   * @param duration attempt duration in milliseconds (stored in seconds)
   */
  recordDelivery(
    url: string,
    eventType: string,
    status: number,
    duration: number
  ) {
    const labels = {
      webhook_url: this.sanitizeUrl(url),
      event_type: eventType,
      status: status.toString(),
    }

    this.deliveryDuration.observe(labels, duration / 1000)
    this.deliveryCounter.inc(labels)
  }

  /** Counts one failed delivery, classified by `errorType`. */
  recordFailure(
    url: string,
    eventType: string,
    errorType: string
  ) {
    this.failureCounter.inc({
      webhook_url: this.sanitizeUrl(url),
      event_type: eventType,
      error_type: errorType,
    })
  }

  /** Sets the number of active webhooks subscribed to `eventType`. */
  updateActiveWebhooks(eventType: string, count: number) {
    this.activeWebhooks.set({ event_type: eventType }, count)
  }

  /** Sets the current size of the queue named `queueName`. */
  updateQueueSize(queueName: string, size: number) {
    this.queueSize.set({ queue_name: queueName }, size)
  }

  /** Serialises every metric in this instance's registry. */
  async getMetrics(): Promise<string> {
    return this.registry.metrics()
  }

  /**
   * Reduces a URL to `scheme://hostname` for use as a metric label.
   * The path is dropped on purpose for two reasons. Many webhook providers
   * embed credentials in it (e.g. Slack and Discord webhook URLs). Per-path
   * labels also create unbounded Prometheus label cardinality.
   */
  private sanitizeUrl(url: string): string {
    try {
      const parsed = new URL(url)
      return `${parsed.protocol}//${parsed.hostname}`
    } catch {
      return 'invalid_url'
    }
  }
}

/** One span tracked by WebhookTracing. */
interface TraceSpan {
  requestId: string
  operation: string
  startTime: number
  events: Array<{ name: string; timestamp: number; attributes?: unknown }>
  endTime?: number
  duration?: number
  status?: 'ok' | 'error'
  error?: unknown
}

// Distributed Tracing
/**
 * Minimal in-process span tracker. Spans live in memory from startSpan()
 * until endSpan() hands them to exportSpan().
 */
export class WebhookTracing {
  private readonly spans = new Map<string, TraceSpan>()
  private sequence = 0

  /**
   * @param maxOpenSpans cap on spans started but not yet ended. Past the cap
   *   the oldest open span is discarded, so missing endSpan() calls cannot
   *   grow memory without bound.
   */
  constructor(private readonly maxOpenSpans: number = 10000) {}

  /** Opens a span and returns its opaque ID. */
  startSpan(requestId: string, operation: string): string {
    // The sequence suffix keeps IDs unique when the same request/operation
    // pair starts twice within one millisecond.
    const spanId = `${requestId}-${operation}-${Date.now()}-${++this.sequence}`

    this.spans.set(spanId, {
      requestId,
      operation,
      startTime: Date.now(),
      events: [],
    })

    if (this.spans.size > this.maxOpenSpans) {
      // Map iterates in insertion order, so the first key is the oldest span.
      const oldest = this.spans.keys().next().value
      if (oldest !== undefined) {
        this.spans.delete(oldest)
      }
    }

    return spanId
  }

  /** Appends a timestamped event to an open span; unknown IDs are ignored. */
  addEvent(spanId: string, event: string, attributes?: unknown) {
    const span = this.spans.get(spanId)
    if (span) {
      span.events.push({
        name: event,
        timestamp: Date.now(),
        attributes,
      })
    }
  }

  /** Closes a span, exports it and forgets it; unknown IDs are ignored. */
  endSpan(spanId: string, status: 'ok' | 'error', error?: unknown) {
    const span = this.spans.get(spanId)
    if (span) {
      span.endTime = Date.now()
      span.duration = span.endTime - span.startTime
      span.status = status
      span.error = error

      // Send to tracing backend
      this.exportSpan(span)
      this.spans.delete(spanId)
    }
  }

  private exportSpan(span: TraceSpan) {
    // Export to Jaeger, Zipkin, etc.
    console.log('Trace:', span)
  }
}

type HealthStatus = 'healthy' | 'degraded' | 'unhealthy'

// Health Check Service
/** Aggregates queue and DLQ checks into one health report. */
export class WebhookHealthCheck {
  constructor(
    private monitoring: WebhookMonitoring,
    private queueManager: QueueManager,
    private dlqManager: DeadLetterQueueManager
  ) {}

  /**
   * Runs every check. The overall status is the most severe status any check
   * produced; a later check can never downgrade an earlier result.
   */
  async getHealth(): Promise<{
    status: 'healthy' | 'degraded' | 'unhealthy'
    checks: Record<string, any>
  }> {
    const checks: Record<string, any> = {}
    let status: HealthStatus = 'healthy'

    // Check queue health
    try {
      const queueMetrics = await this.queueManager.getQueueMetrics('webhook-delivery')
      checks.queue = {
        status: 'ok',
        metrics: queueMetrics,
      }

      if (queueMetrics.failed > 100) {
        status = WebhookHealthCheck.escalate(status, 'degraded')
        checks.queue.status = 'high_failure_rate'
      }

      if (queueMetrics.waiting > 1000) {
        status = WebhookHealthCheck.escalate(status, 'unhealthy')
        checks.queue.status = 'backlogged'
      }
    } catch (error) {
      checks.queue = { status: 'error', error: WebhookHealthCheck.messageOf(error) }
      status = 'unhealthy'
    }

    // Check DLQ
    const dlqDegradedThreshold = 50
    try {
      // NOTE(review): assumes the second argument caps the number of entries
      // returned. The original passed 10, which made the `> 50` check
      // unreachable under that reading. Fetching threshold + 1 keeps it
      // reachable, so `count` is capped at 51. Confirm against
      // DeadLetterQueueManager.
      const dlqEntries = await this.dlqManager.getDeadLetterEntries(
        undefined,
        dlqDegradedThreshold + 1
      )
      checks.dlq = {
        status: 'ok',
        count: dlqEntries.length,
      }

      if (dlqEntries.length > dlqDegradedThreshold) {
        status = WebhookHealthCheck.escalate(status, 'degraded')
        checks.dlq.status = 'high_dlq_count'
      }
    } catch (error) {
      checks.dlq = { status: 'error', error: WebhookHealthCheck.messageOf(error) }
      // The DLQ state is unknown, so the system cannot be reported as fully healthy.
      status = WebhookHealthCheck.escalate(status, 'degraded')
    }

    return { status, checks }
  }

  /** Returns whichever of the two statuses is more severe. */
  private static escalate(current: HealthStatus, next: HealthStatus): HealthStatus {
    const severity: Record<HealthStatus, number> = { healthy: 0, degraded: 1, unhealthy: 2 }
    return severity[next] > severity[current] ? next : current
  }

  /** Message of an unknown thrown value (catch variables are `unknown` under strict). */
  private static messageOf(error: unknown): string {
    return error instanceof Error ? error.message : String(error)
  }
}
10. Scaling Strategies
Horizontal Scaling
// src/scaling/webhook-scaler.ts
import cluster from 'cluster'
import * as os from 'os'

/**
 * Runs the webhook service as a Node.js cluster: a primary process that
 * supervises worker processes, each of which loads ./worker.
 */
export class WebhookScaler {
  /**
   * In the primary process, forks WORKER_COUNT workers (default: one per
   * CPU). It replaces workers that exit unexpectedly and stops all workers on
   * SIGTERM. In a worker process, loads ./worker.
   */
  static setupCluster() {
    // `Cluster` exported by @types/node is only an interface; the runtime
    // module object is the default export.
    if (cluster.isPrimary) {
      const numWorkers = WebhookScaler.resolveWorkerCount()

      console.log(`Starting ${numWorkers} webhook workers...`)

      // Fork workers
      for (let i = 0; i < numWorkers; i++) {
        cluster.fork()
      }

      // Set once SIGTERM arrives, so deliberate exits are not treated as crashes.
      let shuttingDown = false

      // Handle worker deaths
      cluster.on('exit', (worker) => {
        if (shuttingDown) {
          // Stopped on purpose during shutdown; do not respawn.
          return
        }
        console.log(`Worker ${worker.process.pid} died`)
        console.log('Starting a new worker...')
        cluster.fork()
      })

      // Graceful shutdown
      process.on('SIGTERM', () => {
        console.log('SIGTERM received, shutting down cluster...')
        shuttingDown = true
        for (const worker of Object.values(cluster.workers ?? {})) {
          worker?.kill()
        }
      })
    } else {
      // Worker process
      require('./worker')
    }
  }

  /** WORKER_COUNT when it parses to a positive integer, else the CPU count (at least 1). */
  private static resolveWorkerCount(): number {
    const configured = Number.parseInt(process.env.WORKER_COUNT ?? '', 10)
    if (Number.isInteger(configured) && configured > 0) {
      return configured
    }
    return Math.max(1, os.cpus().length)
  }
}

// Load Balancer
/** Round-robin dispatcher over registered workers. */
export class WebhookLoadBalancer {
  private workers: Worker[] = []
  private currentWorker = 0

  addWorker(worker: Worker) {
    this.workers.push(worker)
  }

  /**
   * Returns the next worker in round-robin order.
   * @throws Error when no worker has been registered.
   */
  getNextWorker(): Worker {
    if (this.workers.length === 0) {
      throw new Error('No workers registered with the load balancer')
    }
    const index = this.currentWorker % this.workers.length
    this.currentWorker = (index + 1) % this.workers.length
    return this.workers[index]
  }

  async distributeJob(job: any) {
    const worker = this.getNextWorker()
    await worker.process(job)
  }
}

// Rate Limiter for Scaling
/** Fixed-window rate limiter: at most `maxRequests` per `windowMs` per key. */
export class ScalableRateLimiter {
  private limits = new Map<string, { count: number; reset: number }>()

  constructor(
    private maxRequests: number,
    private windowMs: number
  ) {}

  /** In-process check; returns true when the request for `key` is allowed. */
  async checkLimit(key: string): Promise<boolean> {
    const now = Date.now()
    const limit = this.limits.get(key)

    if (!limit || now > limit.reset) {
      this.limits.set(key, {
        count: 1,
        reset: now + this.windowMs,
      })
      return true
    }

    if (limit.count >= this.maxRequests) {
      return false
    }

    limit.count++
    return true
  }

  /**
   * Distributed rate limiting with Redis: all processes share one counter
   * per key and window.
   * @throws when Redis aborts the transaction or the INCR fails, rather than
   *   silently treating the failure as "over the limit".
   */
  async checkDistributedLimit(redis: any, key: string): Promise<boolean> {
    const multi = redis.multi()
    const now = Date.now()
    // Every request in the same windowMs slice shares this counter key.
    const window = Math.floor(now / this.windowMs)
    const redisKey = `rate_limit:${key}:${window}`

    multi.incr(redisKey)
    // The key only needs to outlive its own window.
    multi.expire(redisKey, Math.ceil(this.windowMs / 1000))

    // ioredis resolves exec() to [error, result] pairs, or null when the
    // transaction was aborted.
    const results = await multi.exec()
    if (!results) {
      throw new Error(`Rate limit transaction for ${redisKey} was aborted`)
    }
    const [incrError, count] = results[0]
    if (incrError) {
      throw incrError
    }

    return Number(count) <= this.maxRequests
  }
}

// Auto-scaling based on metrics
/** Recommends a worker count inside [minWorkers, maxWorkers]. */
export class AutoScaler {
  constructor(
    private monitoring: WebhookMonitoring,
    private minWorkers: number = 2,
    private maxWorkers: number = 20
  ) {}

  /**
   * Proposes a target worker count from queue depth and response time.
   * The target is always clamped to [minWorkers, maxWorkers]. `action` is
   * derived from the actual change, so it is never `scale_up` or
   * `scale_down` while the count stays the same.
   */
  async evaluateScaling(): Promise<{
    action: 'scale_up' | 'scale_down' | 'maintain'
    currentWorkers: number
    targetWorkers: number
  }> {
    const metrics = await this.getScalingMetrics()
    const currentWorkers = this.getCurrentWorkerCount()

    let desiredWorkers = currentWorkers
    if (metrics.queueDepth > 1000 || metrics.avgResponseTime > 5000) {
      // Scale up conditions
      desiredWorkers = currentWorkers + 2
    } else if (metrics.queueDepth < 100 && metrics.avgResponseTime < 1000) {
      // Scale down conditions
      desiredWorkers = currentWorkers - 1
    }

    const targetWorkers = Math.min(
      Math.max(desiredWorkers, this.minWorkers),
      this.maxWorkers
    )

    let action: 'scale_up' | 'scale_down' | 'maintain' = 'maintain'
    if (targetWorkers > currentWorkers) {
      action = 'scale_up'
    } else if (targetWorkers < currentWorkers) {
      action = 'scale_down'
    }

    return { action, currentWorkers, targetWorkers }
  }

  private async getScalingMetrics() {
    // Placeholder values; should be read from monitoring.
    return {
      queueDepth: 500,
      avgResponseTime: 2000,
      errorRate: 0.01,
    }
  }

  private getCurrentWorkerCount(): number {
    // cluster.workers is only populated in the primary process.
    return Object.keys(cluster.workers ?? {}).length
  }
}
✓ Production Deployment Checklist
- ☐ Implement webhook signature verification
- ☐ Set up retry logic with exponential backoff
- ☐ Configure dead letter queues
- ☐ Enable distributed tracing
- ☐ Set up monitoring and alerting
- ☐ Implement rate limiting
- ☐ Configure auto-scaling policies
- ☐ Test webhook endpoints under load
- ☐ Document webhook payload formats
- ☐ Set up webhook management UI
Complete Example: Production Webhook System
// src/index.ts - Complete webhook system
import express from 'express'
import { Redis } from 'ioredis'
import { WebhookRegistry } from './services/webhook-registry'
import { WebhookDispatcher } from './services/webhook-dispatcher'
import { AsyncLLMService } from './services/async-llm.service'
import { QueueManager } from './queues/queue-manager'
import { DeadLetterQueueManager } from './queues/dlq-manager'
import { WebhookHealthCheck, WebhookMonitoring } from './monitoring/webhook-monitoring'
import { WebhookSecurity } from './security/webhook-security'
import { EventBus } from './events/event-bus'
// NOTE(review): `webhookAuthMiddleware` (used by /webhook-test) is not
// imported in this example; import it from the module that defines it.

/** Message of an unknown thrown value (catch variables are `unknown` under strict). */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error)
}

/** True when `value` is a string that parses as an absolute http: or https: URL. */
function isHttpUrl(value: unknown): value is string {
  if (typeof value !== 'string') {
    return false
  }
  try {
    const { protocol } = new URL(value)
    return protocol === 'http:' || protocol === 'https:'
  } catch {
    return false
  }
}

/**
 * Wires up Redis, the webhook/LLM services and the HTTP API, then listens on
 * PORT (default 3000). SIGTERM closes the server, the queues and Redis.
 */
async function startWebhookSystem() {
  const app = express()
  app.use(express.json())

  // Initialize Redis
  const redis = new Redis({
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379', 10),
  })

  // Initialize services
  // NOTE(review): `dispatcher` and `eventBus` are not referenced below; they
  // are kept as in the original in case their constructors register listeners.
  const registry = new WebhookRegistry(redis)
  const dispatcher = new WebhookDispatcher(redis)
  const llmService = new AsyncLLMService(redis)
  const queueManager = new QueueManager(redis)
  const dlqManager = new DeadLetterQueueManager(redis)
  const monitoring = new WebhookMonitoring()
  const eventBus = new EventBus(redis)
  const healthCheck = new WebhookHealthCheck(monitoring, queueManager, dlqManager)

  // API Routes
  // NOTE(review): `req.user` assumes an authentication middleware that is not
  // shown in this example.

  // Register webhook
  app.post('/webhooks', async (req, res) => {
    try {
      const { url, events, secret } = req.body ?? {}

      // Untrusted input: reject anything that is not a plain http(s) URL or
      // that subscribes to nothing.
      if (!isHttpUrl(url)) {
        res.status(400).json({ error: 'url must be an absolute http(s) URL' })
        return
      }
      if (!Array.isArray(events) || events.length === 0) {
        res.status(400).json({ error: 'events must be a non-empty array' })
        return
      }

      const signingSecret: string = secret || WebhookSecurity.generateSecret()

      const webhookId = await registry.register(req.user.id, {
        url,
        secret: signingSecret,
        events,
        retryConfig: {
          maxAttempts: 3,
          initialDelay: 1000,
          maxDelay: 30000,
          backoffMultiplier: 2,
        },
      })

      res.json({
        webhookId,
        // Return a server-generated secret once: the receiver needs it to
        // verify deliveries and has no other way to learn it.
        ...(secret ? {} : { secret: signingSecret }),
        message: 'Webhook registered successfully',
      })
    } catch (error) {
      res.status(400).json({ error: errorMessage(error) })
    }
  })

  // Submit LLM request with webhook callback
  app.post('/llm/submit', async (req, res) => {
    try {
      const { prompt, model, webhookUrl, metadata } = req.body

      const result = await llmService.submitRequest({
        userId: req.user.id,
        prompt,
        model: model || 'gpt-3.5-turbo',
        webhookUrl,
        metadata,
      })

      res.json(result)
    } catch (error) {
      res.status(500).json({ error: errorMessage(error) })
    }
  })

  // Webhook receiver endpoint (for testing)
  // NOTE(review): express.json() has already parsed the body here; if the
  // middleware verifies an HMAC over the raw bytes, it needs the raw body.
  app.post('/webhook-test',
    webhookAuthMiddleware((req) => process.env.TEST_WEBHOOK_SECRET!),
    async (req, res) => {
      console.log('Received webhook:', req.body)
      res.status(200).json({ received: true })
    }
  )

  // Health check
  app.get('/health', async (req, res) => {
    try {
      const health = await healthCheck.getHealth()
      res.status(health.status === 'healthy' ? 200 : 503).json(health)
    } catch (error) {
      res.status(503).json({ status: 'unhealthy', error: errorMessage(error) })
    }
  })

  // Metrics endpoint (Prometheus text exposition format)
  app.get('/metrics', async (req, res) => {
    const metrics = await monitoring.getMetrics()
    res.set('Content-Type', 'text/plain; version=0.0.4; charset=utf-8')
    res.send(metrics)
  })

  // Start server
  const PORT = process.env.PORT || 3000
  const server = app.listen(PORT, () => {
    console.log(`Webhook system listening on port ${PORT}`)
  })

  // Graceful shutdown
  process.on('SIGTERM', async () => {
    console.log('SIGTERM received, shutting down gracefully...')
    // Stop accepting new connections before tearing down what requests use.
    server.close()
    try {
      await queueManager.gracefulShutdown()
      await redis.quit()
      process.exit(0)
    } catch (error) {
      console.error('Error during shutdown:', error)
      process.exit(1)
    }
  })
}

// Start the system
if (require.main === module) {
  startWebhookSystem().catch((error) => {
    console.error(error)
    // Do not linger half-initialised (e.g. holding an open Redis connection).
    process.exit(1)
  })
}

export { startWebhookSystem }
References & Citations
Ready to Build Event-Driven LLM Apps?
Implement robust webhook patterns for your LLM applications with ParrotRouter's unified API gateway.
References
- [1] AWS. "Lambda Documentation" (2024)
- [2] Vercel. "Streaming Responses" (2024)
- [3] GitHub. "OpenAI Node.js Library" (2024)