Quick Start
Set up a webhook receiver for LLM events:
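npm init -y
npm install express body-parser dotenv axios joi prom-client
npm install bullmq ioredis # Queue management (the code below imports bullmq and ioredis)
npm install -D typescript ts-node nodemon @types/node @types/express
# Note: crypto is built into Node.js — do not install the deprecated "crypto" npm package.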
1. Webhook Architecture for LLMs
System Overview
// src/types/webhook.types.ts
/**
 * Configuration for one outbound webhook subscription.
 *
 * The optional fields at the bottom (`id`, `userId`, `createdAt`, `active`)
 * are not supplied by callers: `WebhookRegistry.register` adds them when the
 * config is persisted, so records read back from the registry carry them.
 */
export interface WebhookConfig {
  /** Endpoint that receives each signed POST. */
  url: string
  /** Shared secret used to HMAC-sign every delivery. */
  secret: string
  /** Event types this webhook subscribes to, each with optional payload filters. */
  events: WebhookEvent[]
  /** Retry/backoff policy for failed deliveries. */
  retryConfig: RetryConfig
  /** Extra HTTP headers sent with every delivery. */
  headers?: Record<string, string>
  /** Per-request delivery timeout in milliseconds. */
  timeout?: number
  /** Registry-assigned id (`whk_…`). */
  id?: string
  /** Owner of the webhook; events are routed only to the owning user's webhooks. */
  userId?: string
  /** ISO-8601 time the webhook was registered. */
  createdAt?: string
  /** `false` once deactivated; inactive webhooks are skipped when events fire. */
  active?: boolean
}
/** One event subscription on a webhook. */
export interface WebhookEvent {
  /**
   * Event name. `llm.streaming.chunk` is emitted by the streaming service and
   * accepted by `webhookConfigSchema`, so it must be representable here too.
   */
  type:
    | 'llm.completion'
    | 'llm.error'
    | 'llm.streaming.start'
    | 'llm.streaming.chunk'
    | 'llm.streaming.end'
  /**
   * Optional payload filters: each key is a dot-separated path into the
   * payload, and the value the payload must strictly equal at that path for
   * the event to be delivered.
   */
  filters?: Record<string, any>
}
/**
 * Body POSTed to webhook endpoints (serialized as JSON and HMAC-signed).
 *
 * `data` carries request details plus event-specific fields:
 * `completion`/`tokens` for completions, `error` for failures, and the
 * `stream*`/`chunk*`/`content`/`totalChunks` fields for streaming events.
 */
export interface WebhookPayload {
  /** Unique event id (`evt_…`). */
  id: string
  /** ISO-8601 time the event was created. */
  timestamp: string
  /** Event name, e.g. `llm.completion`. */
  event: string
  data: {
    requestId: string
    conversationId?: string
    userId: string
    model: string
    prompt: string
    completion?: string
    tokens?: {
      prompt: number
      completion: number
      total: number
    }
    error?: {
      code: string
      message: string
    }
    /** Streaming only: id shared by every chunk of one stream. */
    streamId?: string
    /** Streaming only: zero-based position of this chunk within the stream. */
    chunkIndex?: number
    /** Streaming only: text carried by this chunk. */
    content?: string
    /** Streaming end event only: number of chunks that were sent. */
    totalChunks?: number
    metadata?: Record<string, any>
  }
}
/**
 * Retry/backoff policy for webhook deliveries.
 * Delays are in milliseconds.
 */
export interface RetryConfig {
  /** Delay before the first retry. */
  initialDelay: number
  /** Upper bound for any single retry delay. */
  maxDelay: number
  /** Factor applied to the delay after each failed attempt. */
  backoffMultiplier: number
  /** Total number of delivery attempts, including the first one. */
  maxAttempts: number
}

// Webhook Registry Service
// src/services/webhook-registry.ts
import { WebhookConfig } from '../types/webhook.types'
import { Redis } from 'ioredis'
/**
 * Redis-backed store of webhook subscriptions.
 *
 * Layout:
 * - `webhook:<userId>:<webhookId>` — JSON record (config + id/userId/createdAt/active)
 * - `user:<userId>:webhooks`       — set of the user's webhook ids
 * - `webhooks:event:<type>`        — set of record keys subscribed to an event type
 */
export class WebhookRegistry {
  private redis: Redis

  constructor(redis: Redis) {
    this.redis = redis
  }

  /**
   * Persist a webhook for `userId` and index it by user and by event type.
   *
   * All writes go through a single MULTI/EXEC so a failure midway cannot leave
   * a stored record that is missing from its indexes (or vice versa).
   *
   * @returns the generated webhook id (`whk_…`)
   * @throws the first Redis error reported by the transaction
   */
  async register(userId: string, webhook: WebhookConfig): Promise<string> {
    const webhookId = this.generateWebhookId()
    const key = `webhook:${userId}:${webhookId}`
    const record = {
      ...webhook,
      id: webhookId,
      userId,
      createdAt: new Date().toISOString(),
      active: true,
    }

    const tx = this.redis
      .multi()
      .set(key, JSON.stringify(record))
      // Add to user's webhook list
      .sadd(`user:${userId}:webhooks`, webhookId)
    // Index by event type for efficient filtering (one SADD per distinct type)
    for (const type of new Set(webhook.events.map((event) => event.type))) {
      tx.sadd(`webhooks:event:${type}`, key)
    }
    await WebhookRegistry.execOrThrow(tx)

    return webhookId
  }

  /**
   * Return every active webhook subscribed to `eventType`.
   * Records are fetched with one MGET instead of one GET per key; index
   * entries whose record no longer exists are ignored.
   */
  async getWebhooksForEvent(eventType: string): Promise<WebhookConfig[]> {
    const keys = await this.redis.smembers(`webhooks:event:${eventType}`)
    if (keys.length === 0) {
      // MGET with zero keys is a Redis argument error.
      return []
    }

    const records = await this.redis.mget(keys)
    const webhooks: WebhookConfig[] = []
    for (const raw of records) {
      if (!raw) continue
      const webhook = JSON.parse(raw)
      if (webhook.active) {
        webhooks.push(webhook)
      }
    }
    return webhooks
  }

  /**
   * Mark a webhook inactive. The record and its index entries are kept;
   * `getWebhooksForEvent` skips inactive records. No-op if the webhook is unknown.
   */
  async deactivate(userId: string, webhookId: string): Promise<void> {
    const key = `webhook:${userId}:${webhookId}`
    const data = await this.redis.get(key)
    if (data) {
      const webhook = JSON.parse(data)
      webhook.active = false
      await this.redis.set(key, JSON.stringify(webhook))
    }
  }

  /** `whk_<epoch ms>_<random base36>`; not cryptographically random. */
  private generateWebhookId(): string {
    return `whk_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
  }

  /** Run a MULTI and surface per-command errors, which `exec()` reports instead of throwing. */
  private static async execOrThrow(tx: {
    exec(): Promise<[Error | null, unknown][] | null>
  }): Promise<void> {
    const results = await tx.exec()
    if (!results) {
      throw new Error('Redis transaction aborted')
    }
    const failed = results.find(([err]) => err)
    if (failed) {
      throw failed[0]
    }
  }
}

// 2. Async LLM Processing Patterns
Async LLM Service with Callbacks
// src/services/async-llm.service.ts
import { Queue, Worker } from 'bullmq'
import { LLMService } from './llm.service'
import { WebhookDispatcher } from './webhook-dispatcher'
import { WebhookPayload } from '../types/webhook.types'
/**
 * Queues LLM requests and processes them in a BullMQ worker, reporting
 * progress and results through webhooks.
 *
 * Each job emits `llm.streaming.start`, then either `llm.completion` or
 * `llm.error`. Completion and error payloads go to the user's registered
 * webhooks and, when the request supplied one, to its `webhookUrl`.
 */
export class AsyncLLMService {
  private llmQueue: Queue
  private llmService: LLMService
  private webhookDispatcher: WebhookDispatcher
  private worker?: Worker

  constructor(redis: any) {
    this.llmQueue = new Queue('llm-processing', {
      connection: redis,
      defaultJobOptions: {
        removeOnComplete: true,
        removeOnFail: false,
      },
    })
    this.llmService = new LLMService()
    this.webhookDispatcher = new WebhookDispatcher(redis)
    this.setupWorker(redis)
  }

  /**
   * Enqueue an LLM request for background processing.
   * @returns the generated request id; results arrive via webhooks
   */
  async submitRequest(request: {
    userId: string
    prompt: string
    model: string
    webhookUrl?: string
    metadata?: any
  }): Promise<{ requestId: string }> {
    const job = await this.llmQueue.add('process-llm', {
      ...request,
      requestId: this.generateRequestId(),
      timestamp: new Date().toISOString(),
    })
    return { requestId: job.data.requestId }
  }

  /** Stop the worker (letting active jobs finish) and close the queue. */
  async close(): Promise<void> {
    await this.worker?.close()
    await this.llmQueue.close()
  }

  private setupWorker(redis: any) {
    const worker = new Worker('llm-processing', (job) => this.processJob(job.data), {
      connection: redis,
      concurrency: 10,
    })

    worker.on('completed', (job) => {
      console.log(`LLM job ${job.id} completed`)
    })

    worker.on('failed', (job, err) => {
      console.error(`LLM job ${job?.id} failed:`, err)
    })

    // Keep a handle so close() can shut the worker down.
    this.worker = worker
  }

  /**
   * Process one queued request: notify start, run the completion, deliver the
   * result. On failure an `llm.error` event is delivered and the original
   * error is rethrown so BullMQ marks the job failed.
   */
  private async processJob(jobData: any): Promise<WebhookPayload> {
    const { userId, prompt, model, webhookUrl, metadata, requestId } = jobData

    try {
      // Notify webhook of start
      await this.webhookDispatcher.dispatch(
        {
          id: this.generateEventId(),
          timestamp: new Date().toISOString(),
          event: 'llm.streaming.start',
          data: { requestId, userId, model, prompt, metadata },
        },
        userId
      )

      // Process LLM request
      const startTime = Date.now()
      const response = await this.llmService.complete({ prompt, model, stream: false })
      const duration = Date.now() - startTime

      const payload: WebhookPayload = {
        id: this.generateEventId(),
        timestamp: new Date().toISOString(),
        event: 'llm.completion',
        data: {
          requestId,
          userId,
          model,
          prompt,
          completion: response.text,
          tokens: response.tokens,
          metadata: {
            ...metadata,
            duration,
            provider: 'parrotrouter',
          },
        },
      }

      // Dispatch to registered webhooks, then to the per-request callback URL if given
      await this.webhookDispatcher.dispatch(payload, userId)
      if (webhookUrl) {
        await this.webhookDispatcher.sendToUrl(webhookUrl, payload)
      }

      return payload
    } catch (error) {
      const errorPayload: WebhookPayload = {
        id: this.generateEventId(),
        timestamp: new Date().toISOString(),
        event: 'llm.error',
        data: {
          requestId,
          userId,
          model,
          prompt,
          error: AsyncLLMService.describeError(error),
          metadata,
        },
      }
      await this.notifyFailure(errorPayload, userId, webhookUrl)
      throw error
    }
  }

  /**
   * Best-effort delivery of an `llm.error` payload. Failures are logged rather
   * than thrown so they never mask the original processing error.
   */
  private async notifyFailure(
    payload: WebhookPayload,
    userId: string,
    webhookUrl?: string
  ): Promise<void> {
    try {
      await this.webhookDispatcher.dispatch(payload, userId)
      if (webhookUrl) {
        await this.webhookDispatcher.sendToUrl(webhookUrl, payload)
      }
    } catch (notifyError) {
      console.error(
        `Failed to deliver llm.error webhook for ${payload.data.requestId}:`,
        notifyError
      )
    }
  }

  /** Extract a `{ code, message }` pair from an arbitrary thrown value. */
  private static describeError(error: unknown): { code: string; message: string } {
    const candidate = (typeof error === 'object' && error !== null ? error : {}) as {
      code?: unknown
      message?: unknown
    }
    return {
      code: candidate.code ? String(candidate.code) : 'UNKNOWN_ERROR',
      message: typeof candidate.message === 'string' ? candidate.message : String(error),
    }
  }

  private generateRequestId(): string {
    return `req_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
  }

  private generateEventId(): string {
    return `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
  }
}

// Streaming with Webhooks
// src/services/streaming-webhook.ts
/**
 * Streams an LLM response to the caller while forwarding it to webhooks in
 * `llm.streaming.chunk` events, followed by one `llm.streaming.end` event.
 *
 * NOTE(review): this snippet uses `this.webhookDispatcher`, `this.llmService`,
 * `this.generateStreamId()`, `this.generateEventId()` and the `StreamRequest`
 * type without declaring them; they are assumed to be provided elsewhere —
 * confirm before use. The chunk/end payloads also omit `userId`, `model` and
 * `prompt`, which `WebhookPayload` declares as required.
 */
export class StreamingWebhookService {
  private activeStreams = new Map<string, ReturnType<typeof setInterval>>()

  /**
   * Stream `request`, calling `onToken` for each token and batching the text
   * into chunk webhooks. A chunk is flushed when the buffer exceeds
   * `maxBufferChars` and on every `flushIntervalMs` tick.
   *
   * Flushes are serialized, so chunks are dispatched in `chunkIndex` order and
   * the end event is dispatched only after the last chunk.
   *
   * @param request  stream request; `requestId` and `userId` are read from it
   * @param onToken  invoked for every token as it arrives
   * @param options  `flushIntervalMs` (default 1000), `maxBufferChars` (default 100)
   */
  async streamWithWebhooks(
    request: StreamRequest,
    onToken: (token: string) => void,
    options: { flushIntervalMs?: number; maxBufferChars?: number } = {}
  ): Promise<void> {
    const { flushIntervalMs = 1000, maxBufferChars = 100 } = options
    const streamId = this.generateStreamId()
    let buffer = ''
    let chunkCount = 0
    let flushChain: Promise<void> = Promise.resolve()

    const sendChunk = async (): Promise<void> => {
      if (buffer.length === 0) return
      // Take ownership of the buffered text *before* awaiting: tokens that
      // arrive during the dispatch stay in `buffer` for the next chunk instead
      // of being wiped by a late `buffer = ''`.
      const content = buffer
      const chunkIndex = chunkCount++
      buffer = ''
      try {
        await this.webhookDispatcher.dispatch({
          id: this.generateEventId(),
          timestamp: new Date().toISOString(),
          event: 'llm.streaming.chunk',
          data: {
            requestId: request.requestId,
            streamId,
            chunkIndex,
            content,
ﾠ          },
        }, request.userId)
      } catch (error) {
        // Put the text back so a later flush can resend it. Flushes are
        // serialized, so no other chunk has claimed an index in the meantime.
        buffer = content + buffer
        chunkCount = chunkIndex
        throw error
      }
    }

    // Chain every flush onto the previous one; the chain itself swallows
    // rejections so one failed flush does not block later ones.
    const flushBuffer = (): Promise<void> => {
      const run = flushChain.then(sendChunk)
      flushChain = run.catch(() => undefined)
      return run
    }

    // Periodic flush; errors are logged because nothing awaits this callback.
    const flushInterval = setInterval(() => {
      flushBuffer().catch((error) =>
        console.error(`Stream ${streamId}: chunk webhook failed`, error)
      )
    }, flushIntervalMs)
    this.activeStreams.set(streamId, flushInterval)

    try {
      await this.llmService.stream(request, async (token) => {
        buffer += token
        onToken(token)
        // Flush if buffer gets too large
        if (buffer.length > maxBufferChars) {
          await flushBuffer()
        }
      })

      // Final flush (waits for any in-flight interval flush first)
      await flushBuffer()

      await this.webhookDispatcher.dispatch({
        id: this.generateEventId(),
        timestamp: new Date().toISOString(),
        event: 'llm.streaming.end',
        data: {
          requestId: request.requestId,
          streamId,
          totalChunks: chunkCount,
        },
      }, request.userId)
    } finally {
      clearInterval(flushInterval)
      this.activeStreams.delete(streamId)
    }
  }
}

// 3. Security & Validation
HMAC Signature Implementation
// src/security/webhook-security.ts
import crypto from 'crypto'
/**
 * HMAC signing and verification for webhook deliveries.
 *
 * The signed string is `${unixSeconds}.${body}`. The signature, timestamp and
 * algorithm are sent in `X-Webhook-Signature`, `X-Webhook-Timestamp` and
 * `X-Webhook-Algorithm`.
 */
export class WebhookSecurity {
  /**
   * Digests a receiver accepts from the `X-Webhook-Algorithm` header. Senders
   * must not be able to pick an arbitrary (weak or unknown) algorithm.
   */
  private static readonly ALLOWED_ALGORITHMS = new Set(['sha256', 'sha384', 'sha512'])

  /** Hex-encoded HMAC of `payload` under `secret`. */
  static generateSignature(
    payload: string | Buffer,
    secret: string,
    algorithm: string = 'sha256'
  ): string {
    return crypto.createHmac(algorithm, secret).update(payload).digest('hex')
  }

  /**
   * Constant-time check that `signature` is the hex HMAC of `payload`.
   * Returns false (never throws) for malformed or wrong-length signatures.
   */
  static verifySignature(
    payload: string | Buffer,
    signature: string,
    secret: string,
    algorithm: string = 'sha256'
  ): boolean {
    const expectedSignature = this.generateSignature(payload, secret, algorithm)

    // Reject anything that is not exactly a hex digest of the expected length.
    // timingSafeEqual throws on length mismatch, and Buffer.from(str, 'hex')
    // silently stops at the first non-hex character, so `valid + "zz"` would
    // otherwise decode to the valid digest and verify.
    if (
      typeof signature !== 'string' ||
      signature.length !== expectedSignature.length ||
      !/^[0-9a-f]+$/i.test(signature)
    ) {
      return false
    }

    // Use timing-safe comparison
    return crypto.timingSafeEqual(
      Buffer.from(expectedSignature, 'hex'),
      Buffer.from(signature, 'hex')
    )
  }

  /** Headers a sender attaches to a delivery of `payload` (sha256, current time). */
  static generateWebhookHeaders(
    payload: string,
    secret: string
  ): Record<string, string> {
    const timestamp = Math.floor(Date.now() / 1000).toString()
    const signature = this.generateSignature(`${timestamp}.${payload}`, secret)

    return {
      'X-Webhook-Signature': signature,
      'X-Webhook-Timestamp': timestamp,
      'X-Webhook-Algorithm': 'sha256',
    }
  }

  /**
   * Validate an incoming delivery.
   *
   * @param body          raw request body exactly as received
   * @param headers       request headers with lower-case names (as Node provides them)
   * @param secret        shared secret for this webhook
   * @param maxAgeSeconds allowed clock difference in either direction (default 300)
   */
  static validateWebhookRequest(
    body: string,
    headers: Record<string, string>,
    secret: string,
    maxAgeSeconds: number = 300
  ): { valid: boolean; error?: string } {
    // Node may deliver repeated headers as arrays; treat anything non-string as absent.
    const header = (name: string): string | undefined => {
      const value = headers[name] as unknown
      return typeof value === 'string' ? value : undefined
    }
    const signature = header('x-webhook-signature')
    const timestamp = header('x-webhook-timestamp')
    const algorithm = (header('x-webhook-algorithm') || 'sha256').toLowerCase()

    if (!signature || !timestamp) {
      return { valid: false, error: 'Missing signature or timestamp' }
    }

    // Strict digits only: parseInt would accept values like "123abc".
    if (!/^\d+$/.test(timestamp)) {
      return { valid: false, error: 'Invalid timestamp' }
    }

    // Check timestamp to prevent replay attacks. The window is symmetric: a
    // far-future timestamp would otherwise stay replayable indefinitely.
    const currentTime = Math.floor(Date.now() / 1000)
    const webhookTime = Number(timestamp)
    if (currentTime - webhookTime > maxAgeSeconds) {
      return { valid: false, error: 'Request too old' }
    }
    if (webhookTime - currentTime > maxAgeSeconds) {
      return { valid: false, error: 'Request timestamp too far in the future' }
    }

    if (!this.ALLOWED_ALGORITHMS.has(algorithm)) {
      return { valid: false, error: 'Unsupported signature algorithm' }
    }

    const isValid = this.verifySignature(`${timestamp}.${body}`, signature, secret, algorithm)
    return { valid: isValid, error: isValid ? undefined : 'Invalid signature' }
  }
}
// Middleware for Express
/**
 * Express middleware that rejects requests whose webhook signature does not
 * verify (401), and otherwise calls `next()`.
 *
 * @param getSecret resolves the shared secret for the request (sync or async);
 *                  a falsy result yields 401 "Webhook not configured".
 *
 * Signatures are computed over the exact request bytes. Capture them in the
 * body parser, e.g. `express.json({ verify: (req, _res, buf) => { req.rawBody = buf } })`.
 * Without `req.rawBody` this falls back to re-serializing `req.body`. That can
 * differ from what the sender signed (key order, whitespace, escaping) and
 * reject valid requests.
 */
export function webhookAuthMiddleware(
  getSecret: (req: any) => string | Promise<string>
) {
  return async (req: any, res: any, next: any) => {
    let validation: { valid: boolean; error?: string }
    try {
      const secret = await getSecret(req)
      if (!secret) {
        return res.status(401).json({ error: 'Webhook not configured' })
      }

      const raw = req.rawBody
      const body = Buffer.isBuffer(raw)
        ? raw.toString('utf8')
        : typeof raw === 'string'
          ? raw
          : JSON.stringify(req.body)

      validation = WebhookSecurity.validateWebhookRequest(body, req.headers, secret)
    } catch (error) {
      // Forward to Express' error handler instead of leaving a rejected promise.
      return next(error)
    }

    if (!validation.valid) {
      return res.status(401).json({ error: validation.error })
    }
    next()
  }
}

// Request Validation
// src/validation/webhook-validation.ts
import Joi from 'joi'
/**
 * Joi schema for webhook registration requests (shape of `WebhookConfig`).
 *
 * - `url` must be http(s); other URI schemes make no sense for a POST webhook.
 * - `events` must be non-empty with no duplicate `type`.
 * - `retryConfig` is optional in input but always present after validation.
 *   Downstream code reads `retryConfig.maxAttempts` unconditionally, and
 *   child defaults only apply when the object itself is supplied.
 */
export const webhookConfigSchema = Joi.object({
  url: Joi.string().uri({ scheme: ['http', 'https'] }).required(),
  secret: Joi.string().min(32).required(),
  events: Joi.array().items(
    Joi.object({
      type: Joi.string().valid(
        'llm.completion',
        'llm.error',
        'llm.streaming.start',
        'llm.streaming.chunk',
        'llm.streaming.end'
      ).required(),
      filters: Joi.object().optional(),
    })
  ).min(1).unique('type').required(),
  retryConfig: Joi.object({
    maxAttempts: Joi.number().min(1).max(10).default(3),
    initialDelay: Joi.number().min(100).max(10000).default(1000),
    maxDelay: Joi.number().min(1000).max(60000).default(30000),
    backoffMultiplier: Joi.number().min(1).max(5).default(2),
  }).default(() => ({
    // A factory, so each validated config gets its own object.
    maxAttempts: 3,
    initialDelay: 1000,
    maxDelay: 30000,
    backoffMultiplier: 2,
  })),
  headers: Joi.object().pattern(Joi.string(), Joi.string()).optional(),
  timeout: Joi.number().min(1000).max(30000).default(10000),
})
/**
 * Structural sanity check for a received webhook payload.
 *
 * Requires truthy `id`, `timestamp`, `event` and `data`, and a parseable
 * `timestamp` within `maxAgeMs` of `now` (either direction). It also requires
 * event-specific fields: for `llm.completion` requestId/prompt/completion/model,
 * and for `llm.error` requestId/error.code/error.message. Other events pass once
 * the common checks succeed. Never throws; non-object input returns false.
 *
 * @param payload  parsed JSON body of the request
 * @param maxAgeMs allowed clock difference in milliseconds (default 1 hour)
 * @param now      reference time in epoch ms (default `Date.now()`)
 */
export function validateWebhookPayload(
  payload: unknown,
  maxAgeMs: number = 3600000,
  now: number = Date.now()
): boolean {
  if (payload === null || typeof payload !== 'object') {
    return false
  }
  const { id, timestamp, event, data } = payload as Record<string, any>

  // Ensure required fields exist
  if (!id || !timestamp || !event || !data) {
    return false
  }

  // Validate timestamp is recent
  const sentAt = new Date(timestamp).getTime()
  if (Number.isNaN(sentAt) || Math.abs(now - sentAt) > maxAgeMs) {
    return false
  }

  // Validate event data based on type
  switch (event) {
    case 'llm.completion':
      return Boolean(data.requestId && data.prompt && data.completion && data.model)
    case 'llm.error':
      return Boolean(data.requestId && data.error?.code && data.error?.message)
    default:
      return true
  }
}

// 4. Reliability & Retry Logic
Webhook Dispatcher with Retry
// src/services/webhook-dispatcher.ts
import axios, { AxiosError } from 'axios'
import { Queue, Worker } from 'bullmq'
import { WebhookRegistry } from './webhook-registry'
import { WebhookSecurity } from '../security/webhook-security'
import { WebhookPayload, WebhookConfig } from '../types/webhook.types'
/** A webhook as stored by the registry: the config plus registry-assigned fields. */
type DispatchableWebhook = WebhookConfig & { id?: string; userId?: string }

/**
 * Fans events out to matching webhooks and delivers them through a BullMQ
 * queue with retries. Deliveries that are non-retryable or that exhaust their
 * attempts are copied to the `webhook-dlq` queue exactly once.
 */
export class WebhookDispatcher {
  /** Policy used when a webhook record has no `retryConfig`. */
  private static readonly DEFAULT_RETRY_CONFIG: WebhookConfig['retryConfig'] = {
    maxAttempts: 3,
    initialDelay: 1000,
    maxDelay: 30000,
    backoffMultiplier: 2,
  }

  /** `Error.name` marking failures that must go straight to the DLQ. */
  private static readonly NON_RETRYABLE = 'NonRetryableWebhookError'

  private webhookQueue: Queue
  private dlqQueue: Queue
  private registry: WebhookRegistry
  private worker?: Worker

  constructor(redis: any) {
    this.webhookQueue = new Queue('webhook-delivery', {
      connection: redis,
    })
    // One long-lived DLQ producer instead of a new Queue per failed delivery.
    this.dlqQueue = new Queue('webhook-dlq', {
      connection: redis,
    })
    this.registry = new WebhookRegistry(redis)
    this.setupWorker(redis)
  }

  /**
   * Queue `payload` for every active webhook of `userId` that subscribes to
   * `payload.event` and whose per-event filters match.
   */
  async dispatch(payload: WebhookPayload, userId: string): Promise<void> {
    const webhooks: DispatchableWebhook[] = await this.registry.getWebhooksForEvent(payload.event)

    const deliveries = webhooks
      .filter((webhook) => webhook.userId === userId)
      .filter((webhook) => this.matchesFilters(payload, this.filtersFor(webhook, payload.event)))
      .map((webhook) => this.queueDelivery(webhook, payload))

    await Promise.all(deliveries)
  }

  /** Queue `payload` for an ad-hoc callback URL that is not in the registry. */
  async sendToUrl(url: string, payload: WebhookPayload): Promise<void> {
    const tempWebhook: WebhookConfig = {
      url,
      // NOTE(review): without DEFAULT_WEBHOOK_SECRET every ad-hoc delivery is
      // signed with a well-known value, so receivers cannot trust it.
      secret: process.env.DEFAULT_WEBHOOK_SECRET || 'temp-secret',
      events: [{ type: payload.event as any }],
      retryConfig: { ...WebhookDispatcher.DEFAULT_RETRY_CONFIG },
    }
    await this.queueDelivery(tempWebhook, payload)
  }

  /** Stop the delivery worker and close both queues. */
  async close(): Promise<void> {
    await this.worker?.close()
    await Promise.all([this.webhookQueue.close(), this.dlqQueue.close()])
  }

  /**
   * Add one delivery job.
   * NOTE(review): BullMQ's built-in 'exponential' backoff uses `initialDelay`
   * only; `maxDelay` and `backoffMultiplier` are not enforced here.
   */
  private async queueDelivery(
    webhook: WebhookConfig,
    payload: WebhookPayload
  ): Promise<void> {
    const retry = webhook.retryConfig ?? WebhookDispatcher.DEFAULT_RETRY_CONFIG
    await this.webhookQueue.add(
      'deliver',
      { webhook, payload },
      {
        attempts: retry.maxAttempts,
        backoff: {
          type: 'exponential',
          delay: retry.initialDelay,
        },
        removeOnComplete: true,
        removeOnFail: false,
      }
    )
  }

  private setupWorker(redis: any) {
    const worker = new Worker(
      'webhook-delivery',
      async (job) => {
        const { webhook, payload } = job.data as { webhook: WebhookConfig; payload: WebhookPayload }
        const maxAttempts = job.opts.attempts ?? 1

        try {
          // Serialize once and send exactly this string, so the bytes the
          // receiver verifies are the bytes that were signed.
          const body = JSON.stringify(payload)
          const headers = {
            ...webhook.headers,
            'Content-Type': 'application/json',
            // Last, so custom headers cannot overwrite the signature headers.
            ...WebhookSecurity.generateWebhookHeaders(body, webhook.secret),
          }

          const response = await axios.post(webhook.url, body, {
            headers,
            timeout: webhook.timeout || 10000,
            validateStatus: (status) => status >= 200 && status < 300,
          })

          console.log(`Webhook delivered to ${webhook.url}: ${response.status}`)
          return {
            status: response.status,
            deliveredAt: new Date().toISOString(),
          }
        } catch (error) {
          const message = error instanceof Error ? error.message : String(error)

          if (!this.isRetryableError(error)) {
            // Stop BullMQ from scheduling more attempts; the 'failed' listener
            // below records the job in the DLQ (once).
            job.discard()
            throw WebhookDispatcher.nonRetryable(error, message)
          }

          console.error(
            `Webhook delivery failed (attempt ${job.attemptsMade + 1}/${maxAttempts}): ${message}`
          )
          throw error
        }
      },
      {
        connection: redis,
        concurrency: 20,
      }
    )

    // Single place that writes to the DLQ: after the last attempt, or immediately
    // for non-retryable failures.
    worker.on('failed', async (job, err) => {
      if (!job) return
      const exhausted = job.attemptsMade >= (job.opts.attempts ?? 1)
      if (!exhausted && err.name !== WebhookDispatcher.NON_RETRYABLE) return
      try {
        await this.moveToDeadLetterQueue(job.data.webhook, job.data.payload, err)
      } catch (dlqError) {
        // Event listeners' rejections are not handled by BullMQ; log instead.
        console.error(`Failed to move webhook job ${job.id} to DLQ:`, dlqError)
      }
    })

    this.worker = worker
  }

  /** Wrap a non-retryable failure, keeping the code/status the DLQ record stores. */
  private static nonRetryable(cause: unknown, message: string): Error {
    const error = new Error(`Non-retryable error: ${message}`) as Error & {
      code?: string
      status?: number
    }
    error.name = WebhookDispatcher.NON_RETRYABLE
    if (axios.isAxiosError(cause)) {
      error.code = cause.code
      error.status = cause.response?.status
    }
    return error
  }

  /** Network errors/timeouts (no response), 5xx, 429 and 408 are retried. */
  private isRetryableError(error: unknown): boolean {
    if (!axios.isAxiosError(error) || !error.response) {
      return true
    }
    const status = error.response.status
    return status >= 500 || status === 429 || status === 408
  }

  private async moveToDeadLetterQueue(
    webhook: WebhookConfig,
    payload: WebhookPayload,
    error: any
  ): Promise<void> {
    await this.dlqQueue.add('failed-delivery', {
      webhook,
      payload,
      error: {
        message: error.message,
        code: error.code,
        status: error.response?.status ?? error.status,
      },
      failedAt: new Date().toISOString(),
    })
  }

  /** Filters declared on the webhook's subscription to `eventType` (filters live on each event entry). */
  private filtersFor(webhook: WebhookConfig, eventType: string): Record<string, any> | undefined {
    return webhook.events?.find((event) => event.type === eventType)?.filters
  }

  /** True when every `path: value` filter strictly equals the payload's value at that dot path. */
  private matchesFilters(
    payload: WebhookPayload,
    filters?: Record<string, any>
  ): boolean {
    if (!filters) return true
    return Object.entries(filters).every(
      ([path, expected]) => this.getNestedValue(payload, path) === expected
    )
  }

  private getNestedValue(obj: any, path: string): any {
    return path.split('.').reduce((curr, part) => curr?.[part], obj)
  }
}

// Reliability Best Practices
- Implement idempotency keys to prevent duplicate processing
- Use exponential backoff with jitter for retries
- Set reasonable timeouts (10–30 seconds)
- Monitor retry rates and adjust accordingly
- Implement circuit breakers for consistently failing endpoints
5. Event-Driven Architecture
Event Bus Implementation
// src/events/event-bus.ts
import { EventEmitter } from 'events'
import { Redis } from 'ioredis'
/**
 * An event carried on the EventBus (locally and across processes via Redis).
 */
export interface LLMEvent {
  /** Unique event id; also used as the storage / replay key. */
  id: string
  /** Event name such as `llm.completion`; used as the channel suffix. */
  type: string
  /** ISO-8601 time; used as the score on the replay timeline. */
  timestamp: string
  /** Event body; its shape depends on `type`. */
  data: any
  /** Free-form extra context. */
  metadata?: Record<string, any>
}
/**
 * Local EventEmitter bridged to Redis pub/sub, with 7-day event storage for replay.
 *
 * - `publish` emits synchronously to local listeners, publishes to
 *   `llm:events:<type>` for other processes, and stores the event.
 * - `subscribe(pattern)` registers a local listener under `pattern` and
 *   PSUBSCRIBEs to `llm:events:<pattern>`. Redis messages are emitted under
 *   the subscribed pattern, so glob patterns such as `llm.*` work for events
 *   from other processes.
 * - A process ignores the Redis echo of its own publishes, so local listeners
 *   never run twice. Consequence: locally published events reach only
 *   listeners registered under the exact event type.
 */
export class EventBus extends EventEmitter {
  private static readonly CHANNEL_PREFIX = 'llm:events:'
  private static readonly EVENT_TTL_SECONDS = 86400 * 7
  private static readonly REPLAY_BATCH_SIZE = 500

  private redis: Redis
  private subscriber: Redis
  /** Tags outgoing messages so this instance can recognise its own echoes. */
  private readonly instanceId =
    `bus_${process.pid}_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 10)}`

  constructor(redis: Redis) {
    super()
    this.redis = redis
    // Pub/sub needs a dedicated connection.
    this.subscriber = redis.duplicate()
    this.setupSubscriptions()
  }

  async publish(event: LLMEvent): Promise<void> {
    // Emit locally
    this.emit(event.type, event)

    // Publish to Redis for distributed systems. `_origin` is an extra field
    // that receivers without this logic simply ignore.
    await this.redis.publish(
      `${EventBus.CHANNEL_PREFIX}${event.type}`,
      JSON.stringify({ ...event, _origin: this.instanceId })
    )

    // Store event for audit/replay
    await this.storeEvent(event)
  }

  async subscribe(pattern: string, handler: (event: LLMEvent) => void): Promise<void> {
    // Local subscription
    this.on(pattern, handler)
    // Redis subscription for distributed events
    await this.subscriber.psubscribe(`${EventBus.CHANNEL_PREFIX}${pattern}`)
  }

  private setupSubscriptions() {
    this.subscriber.on('pmessage', (pattern: string, _channel: string, message: string) => {
      try {
        const { _origin, ...event } = JSON.parse(message)
        if (_origin === this.instanceId) {
          return // already delivered locally by publish()
        }
        this.emit(pattern.slice(EventBus.CHANNEL_PREFIX.length), event)
      } catch (error) {
        console.error('Failed to process event:', error)
      }
    })
  }

  /**
   * Store the event under `events:<id>` (7-day TTL) and index it on
   * `events:timeline`. The index is trimmed in the same transaction, so it
   * does not outgrow the payloads it points to.
   */
  private async storeEvent(event: LLMEvent): Promise<void> {
    const ttl = EventBus.EVENT_TTL_SECONDS
    const cutoff = Date.now() - ttl * 1000
    const results = await this.redis
      .multi()
      .setex(`events:${event.id}`, ttl, JSON.stringify(event))
      .zadd('events:timeline', new Date(event.timestamp).getTime(), event.id)
      .zremrangebyscore('events:timeline', '-inf', cutoff)
      .exec()
    const failed = results?.find(([err]) => err)
    if (failed) {
      throw failed[0]
    }
  }

  /**
   * Stored events with timestamps in [from, to], oldest first, optionally
   * filtered. Events whose payload has already expired are skipped.
   */
  async replayEvents(
    from: Date,
    to: Date,
    filter?: (event: LLMEvent) => boolean
  ): Promise<LLMEvent[]> {
    const eventIds = await this.redis.zrangebyscore(
      'events:timeline',
      from.getTime(),
      to.getTime()
    )

    const events: LLMEvent[] = []
    // Fetch in batches: one MGET per batch instead of one GET per event.
    for (let start = 0; start < eventIds.length; start += EventBus.REPLAY_BATCH_SIZE) {
      const batch = eventIds.slice(start, start + EventBus.REPLAY_BATCH_SIZE)
      const records = await this.redis.mget(batch.map((id) => `events:${id}`))
      for (const raw of records) {
        if (!raw) continue
        const event: LLMEvent = JSON.parse(raw)
        if (!filter || filter(event)) {
          events.push(event)
        }
      }
    }
    return events
  }
}
// Event handlers
/**
 * Wires EventBus events to side effects: webhooks, analytics, workflows and
 * monitoring.
 *
 * Every step is isolated. A failing step is logged and does not skip the
 * steps after it. No handler or subscription rejection goes unhandled (an
 * async EventEmitter listener's rejection is otherwise an unhandled
 * rejection).
 *
 * NOTE(review): `WebhookDispatcher` is not imported in this snippet; it is
 * assumed to come from the dispatcher module — confirm.
 */
export class LLMEventHandlers {
  constructor(
    private eventBus: EventBus,
    private webhookDispatcher: WebhookDispatcher
  ) {
    this.setupHandlers()
  }

  private setupHandlers() {
    // Handle completion events
    this.listen('llm.completion', (event) =>
      this.runSteps(event, [
        ['webhook dispatch', () => this.webhookDispatcher.dispatch({
          id: event.id,
          timestamp: event.timestamp,
          event: 'llm.completion',
          data: event.data,
        }, event.data.userId)],
        ['analytics update', () => this.updateAnalytics(event)],
        ['workflow trigger', () => this.triggerWorkflows(event)],
      ])
    )

    // Handle error events
    this.listen('llm.error', (event) =>
      this.runSteps(event, [
        ['monitoring alert', () => this.alertMonitoring(event)],
        ['webhook dispatch', () => this.webhookDispatcher.dispatch({
          id: event.id,
          timestamp: event.timestamp,
          event: 'llm.error',
          data: event.data,
        }, event.data.userId)],
      ])
    )
  }

  /** Subscribe `handler` to `type`, logging (not throwing) handler and subscription failures. */
  private listen(type: string, handler: (event: LLMEvent) => Promise<void>): void {
    this.eventBus
      .subscribe(type, (event) => {
        handler(event).catch((error) =>
          console.error(`Handler for ${type} failed (event ${event.id}):`, error)
        )
      })
      .catch((error) => console.error(`Failed to subscribe to ${type}:`, error))
  }

  /** Run steps in order; each failure is logged and later steps still run. */
  private async runSteps(
    event: LLMEvent,
    steps: Array<[string, () => Promise<unknown>]>
  ): Promise<void> {
    for (const [name, step] of steps) {
      try {
        await step()
      } catch (error) {
        console.error(`${name} failed for event ${event.id}:`, error)
      }
    }
  }

  private async updateAnalytics(event: LLMEvent): Promise<void> {
    // Implement analytics updates
  }

  private async triggerWorkflows(event: LLMEvent): Promise<void> {
    // Implement workflow triggers
  }

  private async alertMonitoring(event: LLMEvent): Promise<void> {
    // Implement monitoring alerts
  }
}

// 6. Queue Integration
Multi-Queue Architecture
// src/queues/queue-manager.ts
import { Queue, QueueScheduler, Worker, Job } from 'bullmq'
import { Redis } from 'ioredis'
/**
 * Owns the application's BullMQ queues, their schedulers and their workers,
 * and shuts them all down together.
 */
export class QueueManager {
  private queues: Map<string, Queue> = new Map()
  // A queue can have several workers; keep every one so shutdown closes them all.
  private workers: Map<string, Worker[]> = new Map()
  private schedulers: Map<string, QueueScheduler> = new Map()

  constructor(private redis: Redis) {
    this.initializeQueues()
  }

  private initializeQueues() {
    // LLM processing queue
    this.createQueue('llm-processing', {
      defaultJobOptions: {
        removeOnComplete: 100,
        removeOnFail: 1000,
      },
    })

    // Webhook delivery queue
    this.createQueue('webhook-delivery', {
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
      },
    })

    // Priority queue for premium users
    this.createQueue('llm-priority', {
      defaultJobOptions: {
        priority: 1,
      },
    })

    // Batch processing queue
    this.createQueue('llm-batch', {
      defaultJobOptions: {
        delay: 0,
      },
    })
  }

  private createQueue(name: string, options: any) {
    const queue = new Queue(name, {
      connection: this.redis,
      ...options,
    })

    // NOTE(review): QueueScheduler is only required by BullMQ < 2.0. Later major
    // versions handle delayed/retried jobs without it and deprecated/removed the
    // class. Drop this (and its import) if the project runs a newer BullMQ —
    // confirm the version.
    const scheduler = new QueueScheduler(name, {
      connection: this.redis,
    })

    this.queues.set(name, queue)
    this.schedulers.set(name, scheduler)
  }

  /** The named queue, or undefined if it was never created. */
  getQueue(queueName: string): Queue | undefined {
    return this.queues.get(queueName)
  }

  /** @throws Error if the queue does not exist */
  async addJob(
    queueName: string,
    jobName: string,
    data: any,
    options?: any
  ): Promise<Job> {
    return this.requireQueue(queueName).add(jobName, data, options)
  }

  /** Start a worker on `queueName`; it is closed by `gracefulShutdown`. */
  createWorker(
    queueName: string,
    processor: (job: Job) => Promise<any>,
    concurrency: number = 5
  ): Worker {
    const worker = new Worker(queueName, processor, {
      connection: this.redis,
      concurrency,
    })
    const existing = this.workers.get(queueName) ?? []
    existing.push(worker)
    this.workers.set(queueName, existing)
    return worker
  }

  /**
   * Job counts for a queue. `total` counts jobs not yet finished
   * (waiting + active + delayed); completed and failed are excluded.
   * @throws Error if the queue does not exist
   */
  async getQueueMetrics(queueName: string) {
    const queue = this.requireQueue(queueName)

    const [waiting, active, completed, failed, delayed] = await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
      queue.getDelayedCount(),
    ])

    return {
      waiting,
      active,
      completed,
      failed,
      delayed,
      total: waiting + active + delayed,
    }
  }

  /**
   * Close workers first (so no new jobs start), then schedulers, then queues.
   * Each group is closed in parallel.
   */
  async gracefulShutdown(): Promise<void> {
    await Promise.all([...this.workers.values()].flat().map((worker) => worker.close()))
    await Promise.all([...this.schedulers.values()].map((scheduler) => scheduler.close()))
    await Promise.all([...this.queues.values()].map((queue) => queue.close()))
  }

  private requireQueue(queueName: string): Queue {
    const queue = this.queues.get(queueName)
    if (!queue) {
      throw new Error(`Queue ${queueName} not found`)
    }
    return queue
  }
}
// Priority queue implementation
/**
 * Convenience submitters that choose the queue and the BullMQ priority
 * (lower number = higher priority).
 *
 * NOTE(review): priorities only order jobs within a single queue. High and
 * normal priority jobs go to different queues, so their relative order depends
 * on how workers are allocated to those queues.
 */
export class PriorityQueueService {
  constructor(private queueManager: QueueManager) {}

  async submitHighPriority(data: any): Promise<Job> {
    return this.queueManager.addJob('llm-priority', 'process', data, {
      priority: 1,
    })
  }

  async submitNormalPriority(data: any): Promise<Job> {
    return this.queueManager.addJob('llm-processing', 'process', data, {
      priority: 10,
    })
  }

  /**
   * Add all `items` to `llm-batch` in one bulk call. Each job's data gets a
   * `batchIndex` equal to the item's position.
   * @param priority BullMQ priority for every job (default 20)
   * @throws Error if the `llm-batch` queue does not exist
   */
  async submitBatch(items: any[], priority: number = 20): Promise<Job[]> {
    // Bracket access reaches QueueManager's private `queues` map.
    const queue = this.queueManager['queues'].get('llm-batch')
    if (!queue) {
      throw new Error('Queue llm-batch not found')
    }

    const jobs = items.map((item, index) => ({
      name: 'process-batch-item',
      data: { ...item, batchIndex: index },
      opts: { priority },
    }))
    return queue.addBulk(jobs)
  }
}

// 7. Platform-Specific Webhooks
Platform Webhook Adapters
// src/adapters/platform-adapters.ts
import { WebhookPayload } from '../types/webhook.types'
/**
 * Contract for delivering a WebhookPayload to a chat platform: reshape it
 * into the platform's message format, then POST it.
 */
export interface PlatformAdapter {
  /** Registry key, e.g. 'slack'. */
  name: string
  /** Whether `config` (e.g. `{ url }`) is acceptable for this platform. */
  validateConfig(config: any): boolean
  /** Convert a generic payload into the platform-specific message body. */
  transformPayload(payload: WebhookPayload): any
  /** POST an already-transformed message to `url`. */
  sendWebhook(url: string, transformedPayload: any): Promise<void>
}
// Slack Adapter
/**
 * Slack incoming-webhook adapter (Block Kit message).
 *
 * All interpolated payload text is escaped for Slack mrkdwn (`&`, `<`, `>`).
 * Without this, a prompt or completion containing `<!channel>` or `<@U123>`
 * would mention users, and `<url|text>` would render as a disguised link.
 */
export class SlackAdapter implements PlatformAdapter {
  name = 'slack'

  transformPayload(payload: WebhookPayload): any {
    const { data } = payload
    const esc = SlackAdapter.escape
    const clip = SlackAdapter.truncate

    return {
      text: `LLM Processing Complete`,
      blocks: [
        {
          type: 'header',
          text: {
            type: 'plain_text',
            text: '🤖 AI Response Ready',
          },
        },
        {
          type: 'section',
          fields: [
            {
              type: 'mrkdwn',
              text: `*Request ID:*\n${esc(String(data.requestId))}`,
            },
            {
              type: 'mrkdwn',
              text: `*Model:*\n${esc(String(data.model))}`,
            },
          ],
        },
        {
          type: 'section',
          text: {
            type: 'mrkdwn',
            text: `*Prompt:*\n\`\`\`${esc(clip(data.prompt ?? '', 200))}\`\`\``,
          },
        },
        {
          type: 'section',
          text: {
            type: 'mrkdwn',
            // Same fallback as the other adapters instead of "undefined...".
            text: `*Response:*\n${esc(clip(data.completion ?? '', 500) || 'No response')}`,
          },
        },
        {
          type: 'context',
          elements: [
            {
              type: 'mrkdwn',
              text: `Tokens: ${data.tokens?.total || 'N/A'} | Time: ${new Date(payload.timestamp).toLocaleString()}`,
            },
          ],
        },
      ],
    }
  }

  /** Accepts only `https://hooks.slack.com/...` URLs (hostname check, not substring). */
  validateConfig(config: any): boolean {
    const url = SlackAdapter.parseHttpsUrl(config?.url)
    return url !== null && url.hostname === 'hooks.slack.com'
  }

  async sendWebhook(url: string, payload: any): Promise<void> {
    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
    })

    if (!response.ok) {
      throw new Error(`Slack webhook failed: ${response.status} ${response.statusText}`)
    }
  }

  /** Escape the three characters Slack treats as control sequences in mrkdwn. */
  private static escape(text: string): string {
    return text.replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;')
  }

  /** First `max` chars plus "..." — the ellipsis only when something was cut. */
  private static truncate(text: string, max: number): string {
    return text.length > max ? `${text.slice(0, max)}...` : text
  }

  /** Parsed URL if `value` is an https URL string, otherwise null. */
  private static parseHttpsUrl(value: unknown): URL | null {
    if (typeof value !== 'string') return null
    try {
      const url = new URL(value)
      return url.protocol === 'https:' ? url : null
    } catch {
      return null
    }
  }
}
// Discord Adapter
/**
 * Discord webhook adapter (single embed). Prompt and response are clipped to
 * 1000 characters to stay within Discord's per-field value limit.
 */
export class DiscordAdapter implements PlatformAdapter {
  name = 'discord'

  transformPayload(payload: WebhookPayload): any {
    const { data } = payload

    return {
      username: 'ParrotRouter AI',
      avatar_url: 'https://parrotrouter.com/logo.png',
      embeds: [
        {
          title: '🤖 AI Response Ready',
          color: 0x00ff00,
          fields: [
            {
              name: 'Request ID',
              value: data.requestId,
              inline: true,
            },
            {
              name: 'Model',
              value: data.model,
              inline: true,
            },
            {
              name: 'Tokens Used',
              value: data.tokens?.total?.toString() || 'N/A',
              inline: true,
            },
            {
              name: 'Prompt',
              // Double-backtick inline code allows single backticks inside the prompt.
              value: `\`\`${(data.prompt ?? '').slice(0, 1000)}\`\``,
            },
            {
              name: 'Response',
              value: data.completion?.slice(0, 1000) || 'No response',
            },
          ],
          timestamp: payload.timestamp,
          footer: {
            text: 'Powered by ParrotRouter',
          },
        },
      ],
    }
  }

  /**
   * Accepts only https URLs on discord.com (or a subdomain such as
   * ptb.discord.com) under /api/webhooks/. Uses real URL parsing, so the
   * string cannot merely contain the domain somewhere in its query or path.
   */
  validateConfig(config: any): boolean {
    const url = DiscordAdapter.parseHttpsUrl(config?.url)
    if (!url) return false
    const hostOk = url.hostname === 'discord.com' || url.hostname.endsWith('.discord.com')
    return hostOk && url.pathname.startsWith('/api/webhooks/')
  }

  async sendWebhook(url: string, payload: any): Promise<void> {
    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
    })

    if (!response.ok) {
      throw new Error(`Discord webhook failed: ${response.status} ${response.statusText}`)
    }
  }

  /** Parsed URL if `value` is an https URL string, otherwise null. */
  private static parseHttpsUrl(value: unknown): URL | null {
    if (typeof value !== 'string') return null
    try {
      const url = new URL(value)
      return url.protocol === 'https:' ? url : null
    } catch {
      return null
    }
  }
}
// Microsoft Teams Adapter
/** Microsoft Teams incoming-webhook adapter (legacy MessageCard format). */
export class TeamsAdapter implements PlatformAdapter {
  name = 'teams'

  transformPayload(payload: WebhookPayload): any {
    const { data } = payload

    return {
      '@type': 'MessageCard',
      '@context': 'http://schema.org/extensions',
      themeColor: '0076D7',
      summary: 'AI Response Ready',
      sections: [
        {
          activityTitle: '🤖 ParrotRouter AI Response',
          facts: [
            {
              name: 'Request ID',
              value: data.requestId,
            },
            {
              name: 'Model',
              value: data.model,
            },
            {
              name: 'Tokens',
              value: data.tokens?.total?.toString() || 'N/A',
            },
          ],
        },
        {
          title: 'Prompt',
          text: (data.prompt ?? '').slice(0, 500),
        },
        {
          title: 'Response',
          text: data.completion?.slice(0, 1000) || 'No response',
        },
      ],
      potentialAction: [
        {
          '@type': 'OpenUri',
          name: 'View in ParrotRouter',
          targets: [
            {
              os: 'default',
              // Encode so an unusual request id cannot alter the link's path or query.
              uri: `https://parrotrouter.com/requests/${encodeURIComponent(data.requestId)}`,
            },
          ],
        },
      ],
    }
  }

  /** Accepts only https URLs on webhook.office.com or one of its subdomains. */
  validateConfig(config: any): boolean {
    const url = TeamsAdapter.parseHttpsUrl(config?.url)
    if (!url) return false
    return url.hostname === 'webhook.office.com' || url.hostname.endsWith('.webhook.office.com')
  }

  async sendWebhook(url: string, payload: any): Promise<void> {
    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
    })

    if (!response.ok) {
      throw new Error(`Teams webhook failed: ${response.status} ${response.statusText}`)
    }
  }

  /** Parsed URL if `value` is an https URL string, otherwise null. */
  private static parseHttpsUrl(value: unknown): URL | null {
    if (typeof value !== 'string') return null
    try {
      const url = new URL(value)
      return url.protocol === 'https:' ? url : null
    } catch {
      return null
    }
  }
}
// Adapter Manager
/** Registry of platform adapters, keyed by `adapter.name`. */
export class PlatformAdapterManager {
  private adapters = new Map<string, PlatformAdapter>()

  constructor() {
    this.registerAdapter(new SlackAdapter())
    this.registerAdapter(new DiscordAdapter())
    this.registerAdapter(new TeamsAdapter())
  }

  /** Add or replace the adapter registered under `adapter.name`. */
  registerAdapter(adapter: PlatformAdapter) {
    this.adapters.set(adapter.name, adapter)
  }

  getAdapter(platform: string): PlatformAdapter | undefined {
    return this.adapters.get(platform)
  }

  /**
   * Transform `payload` for `platform` and POST it to `url`.
   *
   * The URL is checked with the adapter's `validateConfig` first, so this
   * method cannot be used to POST to arbitrary hosts.
   *
   * @throws Error if no adapter is registered, the URL is rejected, or delivery fails
   */
  async sendToPlatform(
    platform: string,
    url: string,
    payload: WebhookPayload
  ): Promise<void> {
    const adapter = this.getAdapter(platform)
    if (!adapter) {
      throw new Error(`No adapter found for platform: ${platform}`)
    }

    // The URL is left out of the message on purpose: webhook URLs embed secret tokens.
    if (!adapter.validateConfig({ url })) {
      throw new Error(`Invalid webhook URL for platform: ${platform}`)
    }

    const transformedPayload = adapter.transformPayload(payload)
    await adapter.sendWebhook(url, transformedPayload)
  }
}

// 8. Error Handling & Dead Letter Queues
DLQ Management
// src/queues/dlq-manager.ts
import { Queue, Worker } from 'bullmq'
import { Redis } from 'ioredis'
/**
 * A failed job recorded by DeadLetterQueueManager. Entries are stored as
 * JSON in the `dlq:entries:<originalQueue>` hash, keyed by `id`.
 */
export interface DLQEntry {
  /** Entry id (`dlq_…`). */
  id: string
  /** Name of the queue the failed job came from (used when retrying). */
  originalQueue: string
  /** The failed job's data, re-queued verbatim on retry. */
  payload: any
  /** Number of attempts made before the job was dead-lettered. */
  attempts: number
  /** ISO-8601 timestamps for the failure window. */
  firstFailedAt: string
  lastFailedAt: string
  /** The error that ended the job. */
  error: {
    message: string
    code?: string
    stack?: string
  }
  /** Extra context such as the original job id and name. */
  metadata?: Record<string, any>
}
/**
 * Dead-letter storage for failed jobs.
 *
 * Each entry is written to the BullMQ `dead-letter-queue` and to the
 * `dlq:entries:<queue>` hash, which backs queries, retry and purge. Its BullMQ
 * job id is kept in `metadata.dlqJobId`, so retry and purge remove both
 * copies. Key enumeration uses SCAN, not the blocking KEYS command.
 */
export class DeadLetterQueueManager {
  private dlqQueue: Queue
  private dlqWorker?: Worker

  constructor(private redis: Redis) {
    this.dlqQueue = new Queue('dead-letter-queue', {
      connection: redis,
    })
  }

  async addToDeadLetter(
    originalQueue: string,
    job: any,
    error: Error
  ): Promise<void> {
    const failedAt = new Date().toISOString()
    const entry: DLQEntry = {
      id: `dlq_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
      originalQueue,
      payload: job.data,
      error: {
        message: error.message,
        code: (error as any).code,
        stack: error.stack,
      },
      attempts: job.attemptsMade || 1,
      // NOTE(review): for BullMQ jobs `job.timestamp` is the creation time, not
      // the first failure — confirm this is the intended meaning.
      firstFailedAt: job.timestamp ? new Date(job.timestamp).toISOString() : failedAt,
      lastFailedAt: failedAt,
      metadata: {
        jobId: job.id,
        jobName: job.name,
      },
    }

    const dlqJob = await this.dlqQueue.add('failed-job', entry)
    // Record the BullMQ job id so retry/purge can remove it too. Nothing in this
    // class consumes the queue, so the job would otherwise remain indefinitely.
    entry.metadata = { ...entry.metadata, dlqJobId: dlqJob.id }

    // Store in Redis for querying
    await this.redis.hset(
      DeadLetterQueueManager.entriesKey(originalQueue),
      entry.id,
      JSON.stringify(entry)
    )

    // Emit metric
    await this.incrementDLQMetric(originalQueue, error)
  }

  /** Newest-first entries for one queue (or all queues), at most `limit`. */
  async getDeadLetterEntries(
    queue?: string,
    limit: number = 100
  ): Promise<DLQEntry[]> {
    const keys = queue
      ? [DeadLetterQueueManager.entriesKey(queue)]
      : await this.scanKeys('dlq:entries:*')

    const all: DLQEntry[] = []
    for (const key of keys) {
      for (const entry of await this.readEntries(key)) {
        all.push(entry)
      }
    }
    return all.sort(DeadLetterQueueManager.newestFirst).slice(0, limit)
  }

  /**
   * Re-queue an entry's payload to its original queue and remove the entry.
   * @returns false if no entry has `entryId`
   */
  async retryDeadLetterEntry(entryId: string): Promise<boolean> {
    const located = await this.findEntry(entryId)
    if (!located) {
      return false
    }
    const { entry, key } = located

    // Re-queue to original queue. Close the producer afterwards instead of
    // leaking one Queue instance per retry.
    const originalQueue = new Queue(entry.originalQueue, {
      connection: this.redis,
    })
    try {
      await originalQueue.add(
        entry.metadata?.jobName || 'retry-from-dlq',
        entry.payload,
        {
          attempts: 3,
          backoff: {
            type: 'exponential',
            delay: 2000,
          },
        }
      )
    } finally {
      await originalQueue.close()
    }

    // Remove from DLQ only after the re-queue succeeded
    await this.redis.hdel(key, entryId)
    await this.removeDlqJob(entry)
    return true
  }

  /**
   * Delete every entry (or only those whose `lastFailedAt` is before
   * `olderThan`) across all queues.
   * @returns number of entries deleted
   */
  async purgeDeadLetterQueue(olderThan?: Date): Promise<number> {
    let purged = 0

    for (const key of await this.scanKeys('dlq:entries:*')) {
      const doomed = (await this.readEntries(key)).filter(
        (entry) => !olderThan || new Date(entry.lastFailedAt) < olderThan
      )
      if (doomed.length === 0) continue

      purged += await this.redis.hdel(key, ...doomed.map((entry) => entry.id))
      for (const entry of doomed) {
        await this.removeDlqJob(entry)
      }
    }

    return purged
  }

  private async incrementDLQMetric(queue: string, error: Error): Promise<void> {
    const errorType = error.constructor.name
    const today = new Date().toISOString().split('T')[0]
    const key = `metrics:dlq:${today}`

    // One round trip; the daily hash expires after 30 days.
    await this.redis
      .multi()
      .hincrby(key, queue, 1)
      .hincrby(key, `${queue}:${errorType}`, 1)
      .expire(key, 86400 * 30)
      .exec()
  }

  /**
   * Counters for one UTC day (default today). Keys are `<queue>` and
   * `<queue>:<ErrorClassName>`.
   */
  async getDLQMetrics(date?: string): Promise<Record<string, number>> {
    const targetDate = date || new Date().toISOString().split('T')[0]
    const metrics = await this.redis.hgetall(`metrics:dlq:${targetDate}`)

    const result: Record<string, number> = {}
    for (const [key, value] of Object.entries(metrics)) {
      result[key] = parseInt(value, 10)
    }
    return result
  }

  private static entriesKey(queue: string): string {
    return `dlq:entries:${queue}`
  }

  private static newestFirst(a: DLQEntry, b: DLQEntry): number {
    return new Date(b.lastFailedAt).getTime() - new Date(a.lastFailedAt).getTime()
  }

  /** All entries stored in one `dlq:entries:*` hash. */
  private async readEntries(key: string): Promise<DLQEntry[]> {
    const raw = await this.redis.hgetall(key)
    return Object.values(raw).map((value) => JSON.parse(value) as DLQEntry)
  }

  /** Locate an entry by id across all queues. */
  private async findEntry(entryId: string): Promise<{ entry: DLQEntry; key: string } | null> {
    for (const key of await this.scanKeys('dlq:entries:*')) {
      const raw = await this.redis.hget(key, entryId)
      if (raw) {
        return { entry: JSON.parse(raw), key }
      }
    }
    return null
  }

  /** Remove the entry's mirror job from the BullMQ DLQ, if one was recorded. */
  private async removeDlqJob(entry: DLQEntry): Promise<void> {
    const jobId = entry.metadata?.dlqJobId
    if (jobId) {
      await this.dlqQueue.remove(String(jobId))
    }
  }

  /** Non-blocking key enumeration (SCAN may repeat keys, hence the Set). */
  private async scanKeys(pattern: string): Promise<string[]> {
    const found = new Set<string>()
    let cursor = '0'
    do {
      const [next, batch] = await this.redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100)
      for (const key of batch) {
        found.add(key)
      }
      cursor = next
    } while (cursor !== '0')
    return [...found]
  }
}
// DLQ Monitor Service
/**
 * Periodically inspects DLQ metrics and entries and raises alerts.
 *
 * NOTE(review): `AlertingService` is not declared in this snippet; it is
 * assumed to expose `sendAlert({ severity, title, message, metadata })`.
 */
export class DLQMonitorService {
  private timer?: ReturnType<typeof setInterval>

  /**
   * @param alertThreshold daily failures per queue above which a 'high' alert is sent (default 100)
   * @param staleAfterMs   age after which a DLQ entry counts as stale (default 7 days)
   */
  constructor(
    private dlqManager: DeadLetterQueueManager,
    private alerting: AlertingService,
    private alertThreshold: number = 100,
    private staleAfterMs: number = 7 * 24 * 60 * 60 * 1000
  ) {}

  /**
   * Run a health check every `intervalMs`. A second call while running is a
   * no-op, so intervals never stack. Check failures are logged, not thrown:
   * nothing awaits the interval callback.
   */
  async startMonitoring(intervalMs: number = 60000) {
    if (this.timer) return
    this.timer = setInterval(() => {
      this.checkDLQHealth().catch((error) => console.error('DLQ health check failed:', error))
    }, intervalMs)
  }

  /** Stop the periodic check started by `startMonitoring`. */
  stopMonitoring(): void {
    if (this.timer) {
      clearInterval(this.timer)
      this.timer = undefined
    }
  }

  private async checkDLQHealth() {
    const metrics = await this.dlqManager.getDLQMetrics()

    // Check for high failure rates. Metrics also contain per-error-type counters
    // keyed `<queue>:<ErrorType>`; skip those so each queue alerts once.
    // NOTE(review): assumes queue names themselves contain no ':'.
    for (const [queue, count] of Object.entries(metrics)) {
      if (queue.includes(':')) continue
      if (count > this.alertThreshold) {
        await this.alerting.sendAlert({
          severity: 'high',
          title: `High DLQ rate for queue: ${queue}`,
          // Counters are bucketed per UTC calendar day, not a rolling 24h window.
          message: `${count} failures today (UTC)`,
          metadata: { queue, count },
        })
      }
    }

    // Check for old entries. Fetch all entries: the default limit returns only
    // the newest 100, which would hide exactly the old ones.
    const entries = await this.dlqManager.getDeadLetterEntries(undefined, Infinity)
    const cutoff = new Date(Date.now() - this.staleAfterMs)
    const staleEntries = entries.filter((entry) => new Date(entry.lastFailedAt) < cutoff)

    if (staleEntries.length > 0) {
      const days = Math.round(this.staleAfterMs / (24 * 60 * 60 * 1000))
      await this.alerting.sendAlert({
        severity: 'medium',
        title: 'Stale entries in DLQ',
        message: `${staleEntries.length} entries older than ${days} days`,
        metadata: { count: staleEntries.length },
      })
    }
  }
}

// 9. Monitoring & Observability
Webhook Monitoring
// src/monitoring/webhook-monitoring.ts
import { Histogram, Counter, Gauge, Registry } from 'prom-client'
/**
 * Prometheus metrics for webhook delivery, held in a dedicated registry that
 * getMetrics() serialises.
 */
export class WebhookMonitoring {
  private readonly registry: Registry
  // Metrics
  private readonly deliveryDuration: Histogram<string>
  private readonly deliveryCounter: Counter<string>
  private readonly failureCounter: Counter<string>
  private readonly activeWebhooks: Gauge<string>
  private readonly queueSize: Gauge<string>

  constructor() {
    this.registry = new Registry()
    // Initialize metrics
    this.deliveryDuration = new Histogram({
      name: 'webhook_delivery_duration_seconds',
      help: 'Duration of webhook delivery attempts',
      labelNames: ['webhook_url', 'event_type', 'status'],
      buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
      registers: [this.registry],
    })
    this.deliveryCounter = new Counter({
      name: 'webhook_deliveries_total',
      help: 'Total number of webhook deliveries',
      labelNames: ['webhook_url', 'event_type', 'status'],
      registers: [this.registry],
    })
    this.failureCounter = new Counter({
      name: 'webhook_failures_total',
      help: 'Total number of webhook failures',
      labelNames: ['webhook_url', 'event_type', 'error_type'],
      registers: [this.registry],
    })
    this.activeWebhooks = new Gauge({
      name: 'active_webhooks_total',
      help: 'Number of active webhooks',
      labelNames: ['event_type'],
      registers: [this.registry],
    })
    this.queueSize = new Gauge({
      name: 'webhook_queue_size',
      help: 'Current size of webhook delivery queue',
      labelNames: ['queue_name'],
      registers: [this.registry],
    })
  }

  /**
   * Records one delivery attempt that produced an HTTP status.
   * @param url webhook target; reduced to scheme + host (+ port) for the label
   * @param eventType value for the `event_type` label
   * @param status HTTP status code, used as the `status` label
   * @param duration attempt duration in milliseconds (observed in seconds)
   */
  recordDelivery(
    url: string,
    eventType: string,
    status: number,
    duration: number
  ) {
    const labels = {
      webhook_url: this.sanitizeUrl(url),
      event_type: eventType,
      status: status.toString(),
    }
    this.deliveryDuration.observe(labels, duration / 1000)
    this.deliveryCounter.inc(labels)
  }

  /** Counts one failed delivery, labelled by target origin, event type and error type. */
  recordFailure(
    url: string,
    eventType: string,
    errorType: string
  ) {
    this.failureCounter.inc({
      webhook_url: this.sanitizeUrl(url),
      event_type: eventType,
      error_type: errorType,
    })
  }

  /** Sets the active-webhook gauge for one event type. */
  updateActiveWebhooks(eventType: string, count: number) {
    this.activeWebhooks.set({ event_type: eventType }, count)
  }

  /** Sets the queue-size gauge for one named queue. */
  updateQueueSize(queueName: string, size: number) {
    this.queueSize.set({ queue_name: queueName }, size)
  }

  /** Serialises every metric in this instance's registry (Prometheus text format). */
  async getMetrics(): Promise<string> {
    return this.registry.metrics()
  }

  /**
   * Reduces a webhook URL to `scheme://host[:port]` for use as a label value.
   *
   * The following are all dropped:
   * - the path, which frequently carries the secret token in incoming-webhook
   *   URLs and would otherwise be exported in plain text;
   * - the query string and fragment;
   * - embedded credentials.
   *
   * Every distinct label value is also a new Prometheus time series, so a
   * per-path label grows without bound. The port is kept so endpoints on
   * different ports of one host stay distinguishable.
   */
  private sanitizeUrl(url: string): string {
    try {
      const parsed = new URL(url)
      return `${parsed.protocol}//${parsed.host}`
    } catch {
      return 'invalid_url'
    }
  }
}
// Distributed Tracing
/** A span tracked in memory between startSpan() and endSpan(). */
interface WebhookTraceSpan {
  requestId: string
  operation: string
  startTime: number
  events: Array<{ name: string; timestamp: number; attributes?: unknown }>
  endTime?: number
  duration?: number
  status?: 'ok' | 'error'
  error?: unknown
}

/**
 * Minimal in-process span recorder. Spans live in memory until endSpan()
 * hands them to exportSpan(), which currently just logs them.
 *
 * NOTE(review): spans that are never ended stay in the map indefinitely.
 */
export class WebhookTracing {
  private readonly spans = new Map<string, WebhookTraceSpan>()
  // Per-instance counter that keeps span ids unique within one millisecond
  private sequence = 0

  /**
   * Opens a span and returns its id.
   * The id ends in a sequence number because two spans for the same request and
   * operation started in the same millisecond would otherwise share an id. The
   * second would then overwrite the first in the map.
   */
  startSpan(requestId: string, operation: string): string {
    this.sequence += 1
    const spanId = `${requestId}-${operation}-${Date.now()}-${this.sequence}`
    this.spans.set(spanId, {
      requestId,
      operation,
      startTime: Date.now(),
      events: [],
    })
    return spanId
  }

  /** Appends a timestamped event to an open span; unknown ids are ignored. */
  addEvent(spanId: string, event: string, attributes?: unknown) {
    const span = this.spans.get(spanId)
    if (span) {
      span.events.push({
        name: event,
        timestamp: Date.now(),
        attributes,
      })
    }
  }

  /** Closes a span, records its duration and status, exports it and forgets it. Unknown ids are ignored. */
  endSpan(spanId: string, status: 'ok' | 'error', error?: unknown) {
    const span = this.spans.get(spanId)
    if (span) {
      span.endTime = Date.now()
      span.duration = span.endTime - span.startTime
      span.status = status
      span.error = error
      // Send to tracing backend
      this.exportSpan(span)
      this.spans.delete(spanId)
    }
  }

  private exportSpan(span: WebhookTraceSpan) {
    // Export to Jaeger, Zipkin, etc.
    console.log('Trace:', span)
  }
}
// Health Check Service
export class WebhookHealthCheck {
/**
 * @param monitoring metrics collector (not read by getHealth())
 * @param queueManager source of delivery-queue metrics
 * @param dlqManager source of dead-letter entries
 */
constructor(
  private readonly monitoring: WebhookMonitoring,
  private readonly queueManager: QueueManager,
  private readonly dlqManager: DeadLetterQueueManager
) {}
/**
 * Aggregates the delivery-queue and DLQ checks into one overall status.
 *
 * The overall status only ever escalates (healthy -> degraded -> unhealthy).
 * A milder finding from a later check therefore cannot mask a more severe
 * earlier one.
 *
 * @returns the overall status plus per-check details under `checks`
 */
async getHealth(): Promise<{
  status: 'healthy' | 'degraded' | 'unhealthy'
  checks: Record<string, any>
}> {
  type HealthStatus = 'healthy' | 'degraded' | 'unhealthy'
  const rank: Record<HealthStatus, number> = { healthy: 0, degraded: 1, unhealthy: 2 }
  const worse = (a: HealthStatus, b: HealthStatus): HealthStatus => (rank[b] > rank[a] ? b : a)
  const describeError = (error: unknown): string =>
    error instanceof Error ? error.message : String(error)

  const checks: Record<string, any> = {}
  let status: HealthStatus = 'healthy'

  // Check queue health
  try {
    const queueMetrics = await this.queueManager.getQueueMetrics('webhook-delivery')
    checks.queue = {
      status: 'ok',
      metrics: queueMetrics,
    }
    if (queueMetrics.failed > 100) {
      status = worse(status, 'degraded')
      checks.queue.status = 'high_failure_rate'
    }
    if (queueMetrics.waiting > 1000) {
      status = worse(status, 'unhealthy')
      checks.queue.status = 'backlogged'
    }
  } catch (error) {
    checks.queue = { status: 'error', error: describeError(error) }
    status = worse(status, 'unhealthy')
  }

  // Check DLQ. A failed lookup is reported in `checks` but does not change the
  // overall status.
  const dlqThreshold = 50
  try {
    // NOTE(review): assumes the second argument of getDeadLetterEntries() is a
    // maximum entry count. Any limit <= dlqThreshold would make the threshold
    // check below unreachable. threshold + 1 is the fewest entries that can
    // detect a breach, so `count` is capped at dlqThreshold + 1.
    const dlqEntries = await this.dlqManager.getDeadLetterEntries(undefined, dlqThreshold + 1)
    checks.dlq = {
      status: 'ok',
      count: dlqEntries.length,
    }
    if (dlqEntries.length > dlqThreshold) {
      status = worse(status, 'degraded')
      checks.dlq.status = 'high_dlq_count'
    }
  } catch (error) {
    checks.dlq = { status: 'error', error: describeError(error) }
  }

  return { status, checks }
}
}
10. Scaling Strategies
Horizontal Scaling
// src/scaling/webhook-scaler.ts
import { Cluster } from 'cluster'
import cluster from 'cluster'
import * as os from 'os'
/**
 * Runs the webhook service as a Node.js cluster.
 *
 * The primary forks WORKER_COUNT workers (default: one per CPU) and replaces
 * workers that die unexpectedly. Worker processes load ./worker.
 *
 * The runtime API is the default export of 'cluster'; `Cluster` is only the
 * TypeScript interface, so it cannot be used as a value.
 */
export class WebhookScaler {
  static setupCluster() {
    if (cluster.isPrimary) {
      const numWorkers = WebhookScaler.resolveWorkerCount(process.env.WORKER_COUNT)
      console.log(`Starting ${numWorkers} webhook workers...`)
      // Fork workers
      for (let i = 0; i < numWorkers; i++) {
        cluster.fork()
      }

      let shuttingDown = false

      // Handle worker deaths. Workers stopped on purpose are not replaced:
      // either shutdown is in progress, or the worker was disconnected/killed
      // by the primary. Without this check, SIGTERM's kills would immediately
      // respawn every worker.
      cluster.on('exit', worker => {
        console.log(`Worker ${worker.process.pid} died`)
        if (shuttingDown || worker.exitedAfterDisconnect) return
        console.log('Starting a new worker...')
        cluster.fork()
      })

      // Graceful shutdown
      process.on('SIGTERM', () => {
        console.log('SIGTERM received, shutting down cluster...')
        shuttingDown = true
        for (const worker of Object.values(cluster.workers ?? {})) {
          worker?.kill()
        }
      })
    } else {
      // Worker process
      require('./worker')
    }
  }

  /**
   * Parses WORKER_COUNT as a base-10 integer. Falls back to the CPU count (at
   * least 1) when it is unset or not a positive integer. A NaN or 0 count
   * would otherwise start a cluster with no workers.
   */
  private static resolveWorkerCount(raw: string | undefined): number {
    const parsed = raw === undefined ? NaN : Number.parseInt(raw, 10)
    if (Number.isInteger(parsed) && parsed > 0) return parsed
    return Math.max(1, os.cpus().length)
  }
}
// Load Balancer
/** Round-robin dispatcher over a growing pool of workers. */
export class WebhookLoadBalancer {
  private workers: Worker[] = []
  // Index of the worker that getNextWorker() will return next
  private currentWorker = 0

  /** Adds a worker to the end of the rotation. */
  addWorker(worker: Worker) {
    this.workers.push(worker)
  }

  /**
   * Returns the next worker in round-robin order.
   * @throws Error if no worker has been added yet. Without this guard, `% 0`
   *   turns the cursor into NaN, and every later call (even after addWorker)
   *   returns undefined.
   */
  getNextWorker(): Worker {
    if (this.workers.length === 0) {
      throw new Error('WebhookLoadBalancer has no workers registered')
    }
    const worker = this.workers[this.currentWorker]
    this.currentWorker = (this.currentWorker + 1) % this.workers.length
    return worker
  }

  /** Hands `job` to the next worker and awaits its processing. */
  async distributeJob(job: any) {
    const worker = this.getNextWorker()
    await worker.process(job)
  }
}
// Rate Limiter for Scaling
/**
 * Fixed-window rate limiter.
 *
 * - checkLimit() keeps counters in this process's memory.
 * - checkDistributedLimit() keeps one Redis counter per (key, window), so
 *   limits are shared across instances.
 *
 * In both, a request is allowed while its window's count is <= maxRequests.
 */
export class ScalableRateLimiter {
  private limits = new Map<string, { count: number; reset: number }>()
  // Time (ms since epoch) at which expired in-memory windows were last purged
  private lastSweep = 0

  constructor(
    private maxRequests: number,
    private windowMs: number
  ) {}

  /**
   * Counts one request for `key` against its in-memory window.
   * @returns true if the request is allowed, false once the window is full
   */
  async checkLimit(key: string): Promise<boolean> {
    const now = Date.now()
    this.sweepExpired(now)
    const limit = this.limits.get(key)
    if (!limit || now > limit.reset) {
      this.limits.set(key, {
        count: 1,
        reset: now + this.windowMs,
      })
      // Same rule as checkDistributedLimit (count <= maxRequests), so a limit of
      // 0 also rejects the first request of a window
      return 1 <= this.maxRequests
    }
    if (limit.count >= this.maxRequests) {
      return false
    }
    limit.count++
    return true
  }

  /**
   * Distributed rate limiting with Redis: INCR a per-window key and give it a TTL.
   *
   * Assumes an ioredis client. Its MULTI `exec()` resolves to
   * `[[error, reply], ...]`, or to null when the transaction is aborted.
   *
   * @throws the INCR error, or an Error if the transaction was aborted. A
   *   failed INCR yields a null count, and `null <= maxRequests` is true, so
   *   ignoring it would silently disable the limit.
   */
  async checkDistributedLimit(redis: any, key: string): Promise<boolean> {
    const multi = redis.multi()
    const now = Date.now()
    const window = Math.floor(now / this.windowMs)
    const redisKey = `rate_limit:${key}:${window}`
    multi.incr(redisKey)
    multi.expire(redisKey, Math.ceil(this.windowMs / 1000))
    const results = await multi.exec()
    if (!results) {
      throw new Error(`Rate limit transaction aborted for ${redisKey}`)
    }
    const [incrError, count] = results[0]
    if (incrError) {
      throw incrError
    }
    return Number(count) <= this.maxRequests
  }

  /**
   * Deletes expired in-memory windows, at most once per windowMs. Keys that are
   * never seen again would otherwise accumulate forever.
   */
  private sweepExpired(now: number): void {
    if (now - this.lastSweep < this.windowMs) return
    this.lastSweep = now
    for (const [key, entry] of this.limits) {
      if (now > entry.reset) {
        this.limits.delete(key)
      }
    }
  }
}
// Auto-scaling based on metrics
export class AutoScaler {
/**
 * @param monitoring metrics collector (NOTE(review): not read by getScalingMetrics() yet)
 * @param minWorkers lower bound applied by evaluateScaling()
 * @param maxWorkers upper bound applied by evaluateScaling()
 */
constructor(
  private readonly monitoring: WebhookMonitoring,
  private readonly minWorkers: number = 2,
  private readonly maxWorkers: number = 20
) {}
/**
 * Recommends a worker count from the current scaling metrics.
 *
 * - Scale up by 2 when queueDepth > 1000 or avgResponseTime > 5000.
 * - Scale down by 1 when queueDepth < 100 and avgResponseTime < 1000.
 *
 * The target is always clamped to [minWorkers, maxWorkers]. `action` is derived
 * from how the target compares with the current count. It therefore never says
 * 'scale_up' while already at maxWorkers, nor 'scale_down' while the target is
 * being raised to minWorkers.
 */
async evaluateScaling(): Promise<{
  action: 'scale_up' | 'scale_down' | 'maintain'
  currentWorkers: number
  targetWorkers: number
}> {
  const metrics = await this.getScalingMetrics()
  const currentWorkers = this.getCurrentWorkerCount()
  let desiredWorkers = currentWorkers
  if (metrics.queueDepth > 1000 || metrics.avgResponseTime > 5000) {
    desiredWorkers = currentWorkers + 2
  } else if (metrics.queueDepth < 100 && metrics.avgResponseTime < 1000) {
    desiredWorkers = currentWorkers - 1
  }
  const targetWorkers = Math.min(Math.max(desiredWorkers, this.minWorkers), this.maxWorkers)
  const action =
    targetWorkers > currentWorkers
      ? 'scale_up'
      : targetWorkers < currentWorkers
        ? 'scale_down'
        : 'maintain'
  return { action, currentWorkers, targetWorkers }
}
/**
 * Supplies the inputs for evaluateScaling().
 *
 * NOTE(review): placeholder. It returns fixed sample values and does not read
 * this.monitoring, so evaluateScaling() always sees the same numbers.
 */
private async getScalingMetrics() {
  const sample = {
    queueDepth: 500,
    avgResponseTime: 2000, // presumably milliseconds — confirm
    errorRate: 0.01,
  }
  return sample
}
/**
 * Number of workers the cluster primary currently tracks (0 in a worker
 * process, where `cluster.workers` is undefined).
 *
 * Uses the runtime default export of 'cluster'. `Cluster` is only the
 * TypeScript interface and is undefined at runtime.
 */
private getCurrentWorkerCount(): number {
  return Object.keys(cluster.workers ?? {}).length
}
}
✓ Production Deployment Checklist
- ☐ Implement webhook signature verification
- ☐ Set up retry logic with exponential backoff
- ☐ Configure dead letter queues
- ☐ Enable distributed tracing
- ☐ Set up monitoring and alerting
- ☐ Implement rate limiting
- ☐ Configure auto-scaling policies
- ☐ Test webhook endpoints under load
- ☐ Document webhook payload formats
- ☐ Set up webhook management UI
Complete Example: Production Webhook System
// src/index.ts - Complete webhook system
import express from 'express'
import { Redis } from 'ioredis'
import { WebhookRegistry } from './services/webhook-registry'
import { WebhookDispatcher } from './services/webhook-dispatcher'
import { AsyncLLMService } from './services/async-llm.service'
import { QueueManager } from './queues/queue-manager'
import { DeadLetterQueueManager } from './queues/dlq-manager'
import { WebhookMonitoring } from './monitoring/webhook-monitoring'
import { WebhookSecurity } from './security/webhook-security'
import { EventBus } from './events/event-bus'
/** Message of an unknown thrown value (catch variables are `unknown` under strict mode). */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error)
}

/**
 * Validates the body of POST /webhooks.
 * @returns a client-facing error message, or undefined when the body is acceptable
 */
function validateWebhookRegistration(
  url: unknown,
  events: unknown,
  secret: unknown
): string | undefined {
  if (typeof url !== 'string') return '`url` must be a string'
  let parsed: URL
  try {
    parsed = new URL(url)
  } catch {
    return '`url` must be an absolute URL'
  }
  if (parsed.protocol !== 'https:' && parsed.protocol !== 'http:') {
    return '`url` must use http or https'
  }
  if (
    !Array.isArray(events) ||
    events.length === 0 ||
    !events.every(event => typeof event?.type === 'string')
  ) {
    return '`events` must be a non-empty array of { type } objects'
  }
  if (secret !== undefined && typeof secret !== 'string') {
    return '`secret` must be a string when provided'
  }
  return undefined
}

/**
 * Starts the complete webhook system.
 *
 * It connects to Redis, constructs the services and mounts the HTTP API:
 * webhook registration, LLM submission, a test receiver, health and metrics.
 * It also installs a SIGTERM handler that stops the HTTP server, shuts down
 * the queue manager and closes Redis.
 *
 * NOTE(review): `req.user` must be populated by authentication middleware that
 * is not mounted here. Requests without it are rejected with 401.
 */
async function startWebhookSystem() {
  const app = express()
  app.use(express.json())

  // Initialize Redis
  const redis = new Redis({
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379', 10),
  })

  // Initialize services
  const registry = new WebhookRegistry(redis)
  const dispatcher = new WebhookDispatcher(redis)
  const llmService = new AsyncLLMService(redis)
  const queueManager = new QueueManager(redis)
  const dlqManager = new DeadLetterQueueManager(redis)
  const monitoring = new WebhookMonitoring()
  const eventBus = new EventBus(redis)

  // API Routes

  // Register webhook. When the caller supplies no secret, one is generated and
  // returned once in the response. The secret is presumably what deliveries
  // are signed with, so the receiver needs it to verify them.
  app.post('/webhooks', async (req, res) => {
    const userId = req.user?.id
    if (!userId) {
      res.status(401).json({ error: 'Authentication required' })
      return
    }
    const { url, events, secret } = req.body ?? {}
    const invalid = validateWebhookRegistration(url, events, secret)
    if (invalid) {
      res.status(400).json({ error: invalid })
      return
    }
    try {
      const signingSecret = secret || WebhookSecurity.generateSecret()
      const webhookId = await registry.register(userId, {
        url,
        secret: signingSecret,
        events,
        retryConfig: {
          maxAttempts: 3,
          initialDelay: 1000,
          maxDelay: 30000,
          backoffMultiplier: 2,
        },
      })
      res.json({
        webhookId,
        ...(secret ? {} : { secret: signingSecret }),
        message: 'Webhook registered successfully',
      })
    } catch (error) {
      res.status(400).json({ error: errorMessage(error) })
    }
  })

  // Submit LLM request with webhook callback
  app.post('/llm/submit', async (req, res) => {
    const userId = req.user?.id
    if (!userId) {
      res.status(401).json({ error: 'Authentication required' })
      return
    }
    const { prompt, model, webhookUrl, metadata } = req.body ?? {}
    if (typeof prompt !== 'string' || prompt.length === 0) {
      res.status(400).json({ error: '`prompt` must be a non-empty string' })
      return
    }
    try {
      const result = await llmService.submitRequest({
        userId,
        prompt,
        model: model || 'gpt-3.5-turbo',
        webhookUrl,
        metadata,
      })
      res.json(result)
    } catch (error) {
      res.status(500).json({ error: errorMessage(error) })
    }
  })

  // Webhook receiver endpoint (for testing). It is mounted only when
  // TEST_WEBHOOK_SECRET is set, rather than handing the auth middleware an
  // undefined secret.
  const testWebhookSecret = process.env.TEST_WEBHOOK_SECRET
  if (testWebhookSecret) {
    app.post(
      '/webhook-test',
      webhookAuthMiddleware(() => testWebhookSecret),
      async (req, res) => {
        console.log('Received webhook:', req.body)
        res.status(200).json({ received: true })
      }
    )
  }

  // Health check. Express 4 does not catch rejected async handlers, so a
  // failing health lookup would otherwise leave the probe hanging.
  app.get('/health', async (req, res) => {
    try {
      const health = await getSystemHealth()
      res.status(health.status === 'healthy' ? 200 : 503).json(health)
    } catch (error) {
      res.status(503).json({ status: 'unhealthy', error: errorMessage(error) })
    }
  })

  // Metrics endpoint (Prometheus text exposition format)
  app.get('/metrics', async (req, res) => {
    try {
      const metrics = await monitoring.getMetrics()
      res.set('Content-Type', 'text/plain; version=0.0.4; charset=utf-8')
      res.send(metrics)
    } catch (error) {
      res.status(500).send(errorMessage(error))
    }
  })

  // Start server
  const PORT = process.env.PORT || 3000
  const server = app.listen(PORT, () => {
    console.log(`Webhook system listening on port ${PORT}`)
  })

  // Graceful shutdown, in order:
  // 1. stop accepting HTTP requests and wait for in-flight ones;
  // 2. shut down the queue manager;
  // 3. close Redis.
  // Exits with code 1 if any step fails.
  // NOTE(review): on Node < 19, idle keep-alive sockets can delay server.close().
  process.on('SIGTERM', async () => {
    console.log('SIGTERM received, shutting down gracefully...')
    try {
      await new Promise<void>((resolve, reject) => {
        server.close(error => (error ? reject(error) : resolve()))
      })
      await queueManager.gracefulShutdown()
      await redis.quit()
      process.exit(0)
    } catch (error) {
      console.error('Graceful shutdown failed:', error)
      process.exit(1)
    }
  })
}
// Start the system when this file is run directly. A startup failure exits
// with code 1 so a supervisor sees the failure; open handles such as the Redis
// client could otherwise keep a half-started process alive.
if (require.main === module) {
  startWebhookSystem().catch(error => {
    console.error('Failed to start webhook system:', error)
    process.exit(1)
  })
}
export { startWebhookSystem }
References & Citations
Ready to Build Event-Driven LLM Apps?
Implement robust webhook patterns for your LLM applications with ParrotRouter's unified API gateway.
References
- [1] AWS. "Lambda Documentation" (2024)
- [2] Vercel. "Streaming Responses" (2024)
- [3] GitHub. "OpenAI Node.js Library" (2024)