Streaming
January 15, 2024 · 25 min read

Debug Streaming Response Errors

Streaming responses over Server-Sent Events (SSE) enable real-time LLM output, but they also introduce failure modes of their own. This guide helps you debug connection issues, handle incomplete streams, and build robust streaming implementations.

Common Streaming Errors

Streaming implementations are documented in OpenAI's streaming guide and Anthropic's streaming docs.

Connection Dropped

  • Symptom: SSE connection terminated unexpectedly
  • Typical error: "ReadableStream closed"

Incomplete Response

  • Symptom: stream ended without a finish_reason
  • Typical sign: missing [DONE] marker

Parse Errors

  • Symptom: malformed SSE data chunks
  • Typical error: "Invalid JSON in stream"

Timeout Issues

  • Symptom: no data received within the timeout window
  • Typical error: "Stream timeout"

Robust Streaming Implementation

Build resilient streaming handlers that recover from errors gracefully. The patterns below are loosely adapted from OpenAI's Node SDK.

TypeScript: Complete Streaming Handler

// SSE parsing is done manually below; a library such as eventsource-parser
// could be dropped in for stricter spec compliance.

interface StreamOptions {
  onToken?: (token: string) => void;
  onError?: (error: Error) => void;
  onComplete?: (fullResponse: string) => void;
  maxRetries?: number;
  timeout?: number;
}

class RobustStreamHandler {
  private abortController: AbortController | null = null;
  private retryCount = 0;
  private buffer = '';
  private lastEventTime = Date.now();
  
  async streamCompletion(
    messages: Array<{ role: string; content: string }>,
    options: StreamOptions = {}
  ): Promise<void> {
    const {
      onToken = () => {},
      onError = () => {},
      onComplete = () => {},
      maxRetries = 3,
      timeout = 30000
    } = options;
    
    try {
      await this.attemptStream(messages, {
        onToken,
        onError,
        onComplete,
        maxRetries,
        timeout
      });
    } catch (error) {
      if (this.retryCount < maxRetries) {
        this.retryCount++;
        console.warn(`Stream failed, retrying (${this.retryCount}/${maxRetries})...`);
        
        // Exponential backoff
        await new Promise(resolve => 
          setTimeout(resolve, Math.pow(2, this.retryCount) * 1000)
        );
        
        // Retry with accumulated buffer
        return this.streamCompletion(messages, options);
      }
      
      onError(error as Error);
      throw error;
    }
  }
  
  private async attemptStream(
    messages: Array<{ role: string; content: string }>,
    options: StreamOptions
  ): Promise<void> {
    const { onToken, onError, onComplete, timeout } = options;
    
    this.abortController = new AbortController();
    
    // Set up timeout
    const timeoutId = setTimeout(() => {
      if (this.abortController) {
        this.abortController.abort();
        onError(new Error('Stream timeout'));
      }
    }, timeout);
    
    // Set up activity monitor
    const activityCheckInterval = setInterval(() => {
      if (Date.now() - this.lastEventTime > 10000) { // 10s no activity
        console.warn('No stream activity for 10s');
        if (this.abortController) {
          this.abortController.abort();
        }
      }
    }, 5000);
    
    try {
      const response = await fetch('https://api.openai.com/v1/chat/completions', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
        },
        body: JSON.stringify({
          model: 'gpt-4',
          messages: messages,
          stream: true,
        }),
        signal: this.abortController.signal,
      });
      
      if (!response.ok) {
        const error = await response.json();
        throw new Error(`API error: ${error.error?.message || response.statusText}`);
      }
      
      if (!response.body) {
        throw new Error('No response body');
      }
      
      // Process stream
      await this.processStream(response.body, {
        onToken,
        onError,
        onComplete
      });
      
    } finally {
      clearTimeout(timeoutId);
      clearInterval(activityCheckInterval);
      this.abortController = null;
    }
  }
  
  private async processStream(
    body: ReadableStream<Uint8Array>,
    callbacks: {
      onToken: (token: string) => void;
      onError: (error: Error) => void;
      onComplete: (fullResponse: string) => void;
    }
  ): Promise<void> {
    const { onToken, onError, onComplete } = callbacks;
    
    const decoder = new TextDecoder();
    const reader = body.getReader();
    let fullResponse = this.buffer; // Start with any buffered content
    let isComplete = false;
    let pending = ''; // Holds an incomplete SSE line split across chunks
    
    try {
      while (true) {
        const { done, value } = await reader.read();
        
        if (done) {
          if (!isComplete) {
            throw new Error('Stream ended without completion marker');
          }
          break;
        }
        
        this.lastEventTime = Date.now();
        pending += decoder.decode(value, { stream: true });
        
        // Parse SSE chunks, keeping any incomplete trailing line buffered
        const lines = pending.split('\n');
        pending = lines.pop() || '';
        
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = line.slice(6);
            
            if (data === '[DONE]') {
              isComplete = true;
              this.buffer = ''; // Clear resume buffer on success
              this.retryCount = 0;
              onComplete(fullResponse);
              return;
            }
            
            try {
              const parsed = JSON.parse(data);
              const token = parsed.choices?.[0]?.delta?.content || '';
              
              if (token) {
                fullResponse += token;
                this.buffer = fullResponse; // Update buffer
                onToken(token);
              }
              
              // Check for completion
              if (parsed.choices?.[0]?.finish_reason) {
                isComplete = true;
              }
              
            } catch (parseError) {
              console.error('Failed to parse SSE data:', data);
              // Continue processing other chunks
            }
          }
        }
      }
      
      if (!isComplete) {
        throw new Error('Stream completed without finish_reason');
      }
      
      this.buffer = ''; // Clear resume buffer on success
      this.retryCount = 0;
      onComplete(fullResponse);
      
    } catch (error) {
      if ((error as Error).name === 'AbortError') {
        throw new Error('Stream aborted');
      }
      throw error;
    } finally {
      reader.releaseLock();
    }
  }
  
  abort(): void {
    if (this.abortController) {
      this.abortController.abort();
      this.abortController = null;
    }
  }
}

// Usage with error handling
const streamHandler = new RobustStreamHandler();

async function streamWithUI() {
  const messageContainer = document.getElementById('response');
  let fullResponse = '';
  
  try {
    await streamHandler.streamCompletion(
      [{ role: 'user', content: 'Tell me a story' }],
      {
        onToken: (token) => {
          fullResponse += token;
          messageContainer.textContent = fullResponse;
        },
        onError: (error) => {
          console.error('Stream error:', error);
          messageContainer.innerHTML += `
            <div class="error">
              Error: ${error.message}
              <button onclick="retryStream()">Retry</button>
            </div>
          `;
        },
        onComplete: (response) => {
          console.log('Stream completed:', response.length, 'chars');
          messageContainer.innerHTML += '<div class="complete">✓ Complete</div>';
        },
        maxRetries: 3,
        timeout: 60000 // 1 minute
      }
    );
  } catch (error) {
    console.error('Failed after retries:', error);
  }
}
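
The handler's abort() method can be wired to a cancel control so users can stop generation mid-stream; the in-flight fetch is cancelled through the AbortController. A minimal sketch, assuming a cancel button exists in the page:

// Hypothetical cancel button; any UI trigger works the same way.
document.getElementById('cancel')?.addEventListener('click', () => {
  streamHandler.abort();
});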

Python: Async Streaming with Error Recovery

import asyncio
import aiohttp
import json
from typing import Optional, Callable
from datetime import datetime
import logging

class StreamingError(Exception):
    """Base exception for streaming errors"""
    pass

class IncompleteStreamError(StreamingError):
    """Stream ended without proper completion"""
    pass

class StreamTimeoutError(StreamingError):
    """Stream timed out waiting for data"""
    pass

class ResilientStreamHandler:
    """Handle streaming responses with automatic recovery"""
    
    def __init__(self, api_key: str, timeout: int = 60):
        self.api_key = api_key
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)
        self.buffer = ""
        self.retry_count = 0
        self.max_retries = 3
    
    async def stream_completion(
        self,
        messages: list,
        model: str = "gpt-4",
        on_token: Optional[Callable[[str], None]] = None,
        on_error: Optional[Callable[[Exception], None]] = None,
        on_complete: Optional[Callable[[str], None]] = None
    ) -> str:
        """Stream completion with error recovery"""
        
        for attempt in range(self.max_retries):
            try:
                self.retry_count = attempt
                result = await self._attempt_stream(
                    messages, model, on_token, on_error, on_complete
                )
                return result
                
            except (StreamingError, aiohttp.ClientError) as e:
                self.logger.warning(
                    f"Stream attempt {attempt + 1} failed: {e}"
                )
                
                if on_error:
                    on_error(e)
                
                if attempt < self.max_retries - 1:
                    # Exponential backoff
                    await asyncio.sleep(2 ** attempt)
                    
                    # Resume from buffer if possible
                    if self.buffer and on_token:
                        on_token(f"[Resuming...] {self.buffer}")
                else:
                    raise
        
        raise StreamingError("Max retries exceeded")
    
    async def _attempt_stream(
        self,
        messages: list,
        model: str,
        on_token: Optional[Callable[[str], None]],
        on_error: Optional[Callable[[Exception], None]],
        on_complete: Optional[Callable[[str], None]]
    ) -> str:
        """Single streaming attempt"""
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "Accept": "text/event-stream"
        }
        
        data = {
            "model": model,
            "messages": messages,
            "stream": True
        }
        
        timeout_config = aiohttp.ClientTimeout(
            total=self.timeout,
            connect=10,
            sock_read=30
        )
        
        async with aiohttp.ClientSession(timeout=timeout_config) as session:
            async with session.post(
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=data
            ) as response:
                if response.status != 200:
                    error_data = await response.json()
                    raise StreamingError(
                        f"API error: {error_data.get('error', {}).get('message', 'Unknown')}"
                    )
                
                return await self._process_stream(
                    response, on_token, on_error, on_complete
                )
    
    async def _process_stream(
        self,
        response: aiohttp.ClientResponse,
        on_token: Optional[Callable[[str], None]],
        on_error: Optional[Callable[[Exception], None]],
        on_complete: Optional[Callable[[str], None]]
    ) -> str:
        """Process SSE stream with error handling"""
        
        full_response = self.buffer
        is_complete = False
        
        try:
            async for line in response.content:
                
                # Decode line
                try:
                    line = line.decode('utf-8').strip()
                except UnicodeDecodeError:
                    self.logger.warning("Failed to decode line, skipping")
                    continue
                
                if not line:
                    continue
                
                # Parse SSE format
                if line.startswith('data: '):
                    data = line[6:]
                    
                    if data == '[DONE]':
                        is_complete = True
                        break
                    
                    try:
                        chunk = json.loads(data)
                        
                        # Extract token
                        if 'choices' in chunk and len(chunk['choices']) > 0:
                            delta = chunk['choices'][0].get('delta', {})
                            content = delta.get('content', '')
                            
                            if content:
                                full_response += content
                                self.buffer = full_response
                                
                                if on_token:
                                    on_token(content)
                            
                            # Check completion
                            finish_reason = chunk['choices'][0].get('finish_reason')
                            if finish_reason:
                                is_complete = True
                                
                    except json.JSONDecodeError as e:
                        self.logger.error(f"Failed to parse chunk: {data}")
                        if on_error:
                            on_error(e)
                
                # Read inactivity is enforced by sock_read=30 in the
                # ClientTimeout above; an in-loop check here would never
                # fire, since `async for` blocks while awaiting data.
            
            if not is_complete:
                raise IncompleteStreamError(
                    "Stream ended without completion marker"
                )
            
            if on_complete:
                on_complete(full_response)
            
            # Clear buffer on success
            self.buffer = ""
            
            return full_response
            
        except asyncio.TimeoutError:
            raise StreamTimeoutError(f"Stream timeout after {self.timeout}s")
        except Exception as e:
            self.logger.error(f"Stream processing error: {e}")
            raise


# Advanced usage with monitoring
class StreamMonitor:
    """Monitor streaming performance and errors"""
    
    def __init__(self):
        self.metrics = {
            "total_streams": 0,
            "successful_streams": 0,
            "failed_streams": 0,
            "retries": 0,
            "total_tokens": 0,
            "errors_by_type": {}
        }
        self.active_streams = set()
    
    async def monitored_stream(
        self,
        handler: ResilientStreamHandler,
        messages: list,
        stream_id: str
    ) -> str:
        """Stream with monitoring"""
        
        self.metrics["total_streams"] += 1
        self.active_streams.add(stream_id)
        
        start_time = datetime.now()
        
        try:
            result = await handler.stream_completion(
                messages=messages,
                on_token=lambda token: self._on_token(token),
                on_error=lambda error: self._on_error(error, stream_id),
                on_complete=lambda response: self._on_complete(
                    stream_id, start_time, len(response)
                )
            )
            
            self.metrics["successful_streams"] += 1
            return result
            
        except Exception as e:
            self.metrics["failed_streams"] += 1
            error_type = type(e).__name__
            self.metrics["errors_by_type"][error_type] =                 self.metrics["errors_by_type"].get(error_type, 0) + 1
            raise
            
        finally:
            self.active_streams.discard(stream_id)
    
    def _on_token(self, token: str):
        self.metrics["total_tokens"] += 1
    
    def _on_error(self, error: Exception, stream_id: str):
        self.metrics["retries"] += 1
        print(f"Stream {stream_id} error: {error}")
    
    def _on_complete(self, stream_id: str, start_time: datetime, response_length: int):
        duration = (datetime.now() - start_time).total_seconds()
        print(f"Stream {stream_id} completed: {response_length} chars in {duration:.1f}s")
    
    def get_stats(self) -> dict:
        """Get streaming statistics"""
        success_rate = (
            self.metrics["successful_streams"] / 
            max(self.metrics["total_streams"], 1) * 100
        )
        
        return {
            **self.metrics,
            "success_rate": f"{success_rate:.1f}%",
            "active_streams": len(self.active_streams),
            "avg_retries": self.metrics["retries"] / max(self.metrics["total_streams"], 1)
        }


# Usage example
async def main():
    handler = ResilientStreamHandler(api_key="your-key")
    monitor = StreamMonitor()
    
    messages = [{"role": "user", "content": "Write a poem"}]
    
    try:
        response = await monitor.monitored_stream(
            handler, messages, stream_id="poem-001"
        )
        print(f"Final response: {response}")
        
    except StreamingError as e:
        print(f"Streaming failed: {e}")
    
    print("\nStreaming stats:", monitor.get_stats())

# Run
asyncio.run(main())

Debugging Streaming Issues

Stream Event Logger

class StreamDebugger {
  private events: Array<{
    timestamp: number;
    type: string;
    data: any;
  }> = [];
  
  logEvent(type: string, data: any): void {
    this.events.push({
      timestamp: Date.now(),
      type,
      data
    });
  }
  
  async debugStream(url: string, options: RequestInit): Promise<void> {
    const startTime = Date.now();
    this.logEvent('stream_start', { url, options });
    
    try {
      const response = await fetch(url, options);
      this.logEvent('response_received', {
        status: response.status,
        headers: Object.fromEntries(response.headers.entries())
      });
      
      if (!response.ok) {
        const error = await response.json();
        this.logEvent('api_error', error);
        throw new Error(`API error: ${response.status}`);
      }
      
      const reader = response.body!.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      let chunkCount = 0;
      
      while (true) {
        const readStart = Date.now();
        const { done, value } = await reader.read();
        const readDuration = Date.now() - readStart;
        
        if (done) {
          this.logEvent('stream_end', {
            duration: Date.now() - startTime,
            chunks: chunkCount
          });
          break;
        }
        
        chunkCount++;
        const chunk = decoder.decode(value, { stream: true });
        buffer += chunk;
        
        this.logEvent('chunk_received', {
          chunkNumber: chunkCount,
          size: value.length,
          readDuration,
          preview: chunk.substring(0, 100)
        });
        
        // Parse SSE lines
        const lines = buffer.split('\n');
        buffer = lines.pop() || ''; // Keep incomplete line in buffer
        
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = line.slice(6);
            
            if (data === '[DONE]') {
              this.logEvent('stream_complete', { chunkCount });
              return;
            }
            
            try {
              const parsed = JSON.parse(data);
              this.logEvent('data_parsed', {
                hasContent: !!parsed.choices?.[0]?.delta?.content,
                finishReason: parsed.choices?.[0]?.finish_reason
              });
            } catch (e) {
              this.logEvent('parse_error', {
                data,
                error: (e as Error).message
              });
            }
          }
        }
      }
      
    } catch (error) {
      this.logEvent('stream_error', {
        error: (error as Error).message,
        duration: Date.now() - startTime
      });
      throw error;
    }
  }
  
  generateReport(): string {
    const report = ['=== Stream Debug Report ===\n'];
    
    // Summary
    const duration = this.events[this.events.length - 1].timestamp - this.events[0].timestamp;
    const chunks = this.events.filter(e => e.type === 'chunk_received').length;
    const errors = this.events.filter(e => e.type.includes('error')).length;
    
    report.push(`Duration: ${duration}ms`);
    report.push(`Chunks received: ${chunks}`);
    report.push(`Errors: ${errors}`);
    report.push('\n=== Event Timeline ===\n');
    
    // Timeline
    const startTime = this.events[0].timestamp;
    this.events.forEach(event => {
      const relativeTime = event.timestamp - startTime;
      report.push(`[${relativeTime}ms] ${event.type}: ${JSON.stringify(event.data, null, 2)}`);
    });
    
    // Chunk analysis
    const chunkEvents = this.events.filter(e => e.type === 'chunk_received');
    if (chunkEvents.length > 0) {
      report.push('\n=== Chunk Analysis ===\n');
      
      const sizes = chunkEvents.map(e => e.data.size);
      const durations = chunkEvents.map(e => e.data.readDuration);
      
      report.push(`Average chunk size: ${(sizes.reduce((a, b) => a + b) / sizes.length).toFixed(0)} bytes`);
      report.push(`Average read time: ${(durations.reduce((a, b) => a + b) / durations.length).toFixed(1)}ms`);
      report.push(`Min/Max chunk size: ${Math.min(...sizes)}/${Math.max(...sizes)} bytes`);
    }
    
    return report.join('\n');
  }
}

// Usage ("debugger" is a reserved word, so name the instance something else)
const streamDebugger = new StreamDebugger();

try {
  await streamDebugger.debugStream('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${apiKey}`
    },
    body: JSON.stringify({
      model: 'gpt-4',
      messages: [{ role: 'user', content: 'Hello' }],
      stream: true
    })
  });
  
  console.log(streamDebugger.generateReport());
  
} catch (error) {
  console.error('Stream failed:', error);
  console.log(streamDebugger.generateReport());
}

Common Streaming Issues & Solutions

1. Connection Drops Mid-Stream

Symptoms: Stream stops receiving data without error or completion marker.

Causes: Network issues, proxy timeouts, server-side errors.

Solutions:

  • Implement activity monitoring with timeout detection
  • Use keep-alive mechanisms
  • Buffer partial responses for retry
  • Implement reconnection logic (see the resume sketch below)
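
One way to implement that reconnection logic, sketched under the assumption of an OpenAI-style chat endpoint: after a drop, send the buffered partial output back as an assistant message and ask the model to continue. Continuation quality varies by model, and a visible seam may remain where the stream broke.

// Sketch: resume a dropped stream from a buffered partial response.
async function resumeStream(
  messages: Array<{ role: string; content: string }>,
  partial: string
): Promise<Response> {
  const resumeMessages = [
    ...messages,
    { role: 'assistant', content: partial },
    { role: 'user', content: 'Continue exactly where you left off.' },
  ];
  return fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
    },
    body: JSON.stringify({ model: 'gpt-4', messages: resumeMessages, stream: true }),
  });
}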

2. Malformed SSE Data

Symptoms: JSON parse errors, missing data fields.

Causes: Network corruption, incomplete chunks, encoding issues.

Solutions:

  • Use proper SSE parser libraries
  • Buffer incomplete lines (see the sketch below)
  • Validate JSON before parsing
  • Skip corrupted chunks gracefully
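
The line-buffering fix is small enough to sketch on its own: hold the incomplete tail of each network chunk until the rest of the line arrives, and only emit complete lines to the parser.

// Minimal sketch: survive SSE lines that are split across network chunks.
class SSELineBuffer {
  private pending = '';
  
  // Feed raw decoded text; returns only complete, non-empty lines.
  feed(chunk: string): string[] {
    this.pending += chunk;
    const lines = this.pending.split('\n');
    this.pending = lines.pop() ?? ''; // hold the incomplete tail
    return lines.map(l => l.trimEnd()).filter(l => l.length > 0);
  }
}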

3. Memory Issues with Large Streams

Symptoms: Browser/process runs out of memory, performance degradation.

Causes: Accumulating entire response in memory, no cleanup.

Solutions:

  • Process chunks incrementally
  • Implement streaming to disk/database
  • Use backpressure mechanisms
  • Limit buffer sizes (see the bounded-buffer sketch below)
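
A sketch of the last two points, with an illustrative flush callback and tail size (both are assumptions, not prescriptions): keep only a short tail of the response in memory for display and persist the rest to a sink such as a file, IndexedDB, or a server.

// Sketch: cap in-memory response size by flushing overflow to a sink.
class BoundedStreamBuffer {
  private tail = '';
  
  constructor(
    private flush: (text: string) => Promise<void>, // persists overflow
    private maxTail = 8192 // characters kept in memory for display
  ) {}
  
  async append(token: string): Promise<void> {
    this.tail += token;
    if (this.tail.length > this.maxTail) {
      const overflow = this.tail.slice(0, this.tail.length - this.maxTail);
      this.tail = this.tail.slice(-this.maxTail);
      await this.flush(overflow);
    }
  }
}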

Monitoring Stream Health

interface StreamMetrics {
  streamId: string;
  startTime: number;
  endTime?: number;
  bytesReceived: number;
  chunksReceived: number;
  tokensReceived: number;
  errors: Array<{ time: number; error: string }>;
  reconnects: number;
  status: 'active' | 'completed' | 'failed';
}

class StreamHealthMonitor {
  private streams: Map<string, StreamMetrics> = new Map();
  private listeners: Array<(metrics: StreamMetrics) => void> = [];
  
  startMonitoring(streamId: string): void {
    this.streams.set(streamId, {
      streamId,
      startTime: Date.now(),
      bytesReceived: 0,
      chunksReceived: 0,
      tokensReceived: 0,
      errors: [],
      reconnects: 0,
      status: 'active'
    });
  }
  
  updateMetrics(
    streamId: string,
    update: Partial<StreamMetrics>
  ): void {
    const metrics = this.streams.get(streamId);
    if (!metrics) return;
    
    Object.assign(metrics, update);
    
    // Notify listeners
    this.listeners.forEach(listener => listener(metrics));
  }
  
  recordChunk(streamId: string, size: number, tokens: number): void {
    const metrics = this.streams.get(streamId);
    if (!metrics) return;
    
    metrics.bytesReceived += size;
    metrics.chunksReceived += 1;
    metrics.tokensReceived += tokens;
  }
  
  recordError(streamId: string, error: string): void {
    const metrics = this.streams.get(streamId);
    if (!metrics) return;
    
    metrics.errors.push({
      time: Date.now(),
      error
    });
  }
  
  recordReconnect(streamId: string): void {
    const metrics = this.streams.get(streamId);
    if (!metrics) return;
    
    metrics.reconnects += 1;
  }
  
  completeStream(streamId: string, success: boolean): void {
    const metrics = this.streams.get(streamId);
    if (!metrics) return;
    
    metrics.endTime = Date.now();
    metrics.status = success ? 'completed' : 'failed';
  }
  
  getHealthReport(): {
    activeStreams: number;
    totalStreams: number;
    successRate: number;
    avgDuration: number;
    avgBytesPerStream: number;
    commonErrors: Array<{ error: string; count: number }>;
  } {
    const allStreams = Array.from(this.streams.values());
    const completedStreams = allStreams.filter(s => s.status !== 'active');
    const successfulStreams = allStreams.filter(s => s.status === 'completed');
    
    // Calculate averages
    const durations = completedStreams
      .filter(s => s.endTime)
      .map(s => s.endTime! - s.startTime);
    
    const avgDuration = durations.length > 0
      ? durations.reduce((a, b) => a + b) / durations.length
      : 0;
    
    const avgBytes = allStreams.length > 0
      ? allStreams.reduce((sum, s) => sum + s.bytesReceived, 0) / allStreams.length
      : 0;
    
    // Count errors
    const errorCounts = new Map<string, number>();
    allStreams.forEach(stream => {
      stream.errors.forEach(({ error }) => {
        errorCounts.set(error, (errorCounts.get(error) || 0) + 1);
      });
    });
    
    const commonErrors = Array.from(errorCounts.entries())
      .map(([error, count]) => ({ error, count }))
      .sort((a, b) => b.count - a.count)
      .slice(0, 5);
    
    return {
      activeStreams: allStreams.filter(s => s.status === 'active').length,
      totalStreams: allStreams.length,
      successRate: completedStreams.length > 0
        ? (successfulStreams.length / completedStreams.length) * 100
        : 0,
      avgDuration: Math.round(avgDuration),
      avgBytesPerStream: Math.round(avgBytes),
      commonErrors
    };
  }
  
  // Real-time monitoring dashboard
  createDashboard(): HTMLElement {
    const dashboard = document.createElement('div');
    dashboard.className = 'stream-monitor-dashboard';
    
    const update = () => {
      const report = this.getHealthReport();
      dashboard.innerHTML = `
        <h3>Stream Health Monitor</h3>
        <div class="metrics-grid">
          <div class="metric">
            <span class="label">Active Streams</span>
            <span class="value">${report.activeStreams}</span>
          </div>
          <div class="metric">
            <span class="label">Success Rate</span>
            <span class="value">${report.successRate.toFixed(1)}%</span>
          </div>
          <div class="metric">
            <span class="label">Avg Duration</span>
            <span class="value">${(report.avgDuration / 1000).toFixed(1)}s</span>
          </div>
          <div class="metric">
            <span class="label">Avg Size</span>
            <span class="value">${(report.avgBytesPerStream / 1024).toFixed(1)}KB</span>
          </div>
        </div>
        <div class="errors">
          <h4>Common Errors</h4>
          <ul>
            ${report.commonErrors.map(({ error, count }) => 
              `<li>${error}: ${count} times</li>`
            ).join('')}
          </ul>
        </div>
      `;
    };
    
    // Update every second
    setInterval(update, 1000);
    update();
    
    return dashboard;
  }
}

// Global monitor instance
const streamMonitor = new StreamHealthMonitor();

// Integration with a streaming handler; streamWithRetry and countTokens
// are assumed helpers (e.g. wrappers around RobustStreamHandler above).
async function monitoredStream(messages: any[]) {
  const streamId = `stream-${Date.now()}`;
  streamMonitor.startMonitoring(streamId);
  
  try {
    const result = await streamWithRetry(messages, {
      onChunk: (chunk, size) => {
        streamMonitor.recordChunk(streamId, size, countTokens(chunk));
      },
      onError: (error) => {
        streamMonitor.recordError(streamId, error.message);
      },
      onReconnect: () => {
        streamMonitor.recordReconnect(streamId);
      }
    });
    
    streamMonitor.completeStream(streamId, true);
    return result;
    
  } catch (error) {
    streamMonitor.completeStream(streamId, false);
    throw error;
  }
}

Best Practices

Testing Streaming Endpoints

Always test your streaming implementation with:

  • Slow connections (throttle network speed)
  • Interrupted connections (disconnect mid-stream)
  • Large responses (test memory limits)
  • Concurrent streams (test resource usage)
  • Error responses (test error handling)
  • Timeout scenarios (test recovery)
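
A hedged sketch of the interrupted-connection case: fabricate a ReadableStream that emits one SSE chunk and then closes without a finish_reason or [DONE] marker, then feed it to your stream processor. The payload shape mirrors the examples above; the function name is illustrative.

// Sketch: a fake SSE body that dies mid-stream, for testing error paths.
function makeInterruptedSSEStream(): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream({
    start(controller) {
      controller.enqueue(encoder.encode(
        'data: {"choices":[{"delta":{"content":"Hel"},"finish_reason":null}]}\n\n'
      ));
      // Close without a completion marker: a robust handler should surface
      // this as an incomplete-stream error rather than a silent success.
      controller.close();
    },
  });
}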

