Quick Fix
Common Streaming Errors
Streaming implementations are documented in OpenAI's streaming guide and Anthropic's streaming docs.
- SSE connection terminated unexpectedly: "ReadableStream closed"
- Stream ended without finish_reason: missing [DONE] marker
- Malformed SSE data chunks: "Invalid JSON in stream"
- No data received within timeout: "Stream timeout"
Robust Streaming Implementation
Build resilient streaming handlers that recover from errors gracefully. The implementation patterns below are adapted from OpenAI's Node SDK.
TypeScript: Complete Streaming Handler
// Note: a dedicated SSE parser (e.g. the eventsource-parser package) can replace the manual frame parsing below; this handler parses frames by hand for clarity.
interface StreamOptions {
onToken?: (token: string) => void;
onError?: (error: Error) => void;
onComplete?: (fullResponse: string) => void;
maxRetries?: number;
timeout?: number;
}
class RobustStreamHandler {
private abortController: AbortController | null = null;
private retryCount = 0;
private buffer = '';
private lastEventTime = Date.now();
async streamCompletion(
messages: Array<{ role: string; content: string }>,
options: StreamOptions = {}
): Promise<void> {
const {
onToken = () => {},
onError = () => {},
onComplete = () => {},
maxRetries = 3,
timeout = 30000
} = options;
try {
await this.attemptStream(messages, {
onToken,
onError,
onComplete,
maxRetries,
timeout
});
} catch (error) {
if (this.retryCount < maxRetries) {
this.retryCount++;
console.warn(`Stream failed, retrying (${this.retryCount}/${maxRetries})...`);
// Exponential backoff
await new Promise(resolve =>
setTimeout(resolve, Math.pow(2, this.retryCount) * 1000)
);
// Retry with accumulated buffer
return this.streamCompletion(messages, options);
}
onError(error as Error);
throw error;
}
}
private async attemptStream(
messages: Array<{ role: string; content: string }>,
options: StreamOptions
): Promise<void> {
const { onToken, onError, onComplete, timeout } = options;
this.abortController = new AbortController();
this.lastEventTime = Date.now(); // reset the activity clock for this attempt
// Set up timeout
const timeoutId = setTimeout(() => {
if (this.abortController) {
this.abortController.abort();
onError(new Error('Stream timeout'));
}
}, timeout);
// Set up activity monitor
const activityCheckInterval = setInterval(() => {
if (Date.now() - this.lastEventTime > 10000) { // 10s no activity
console.warn('No stream activity for 10s');
if (this.abortController) {
this.abortController.abort();
}
}
}, 5000);
try {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: 'gpt-4',
messages: messages,
stream: true,
}),
signal: this.abortController.signal,
});
if (!response.ok) {
const error = await response.json();
throw new Error(`API error: ${error.error?.message || response.statusText}`);
}
if (!response.body) {
throw new Error('No response body');
}
// Process stream
await this.processStream(response.body, {
onToken,
onError,
onComplete
});
} finally {
clearTimeout(timeoutId);
clearInterval(activityCheckInterval);
this.abortController = null;
}
}
private async processStream(
body: ReadableStream<Uint8Array>,
callbacks: {
onToken: (token: string) => void;
onError: (error: Error) => void;
onComplete: (fullResponse: string) => void;
}
): Promise<void> {
const { onToken, onError, onComplete } = callbacks;
const decoder = new TextDecoder();
const reader = body.getReader();
let fullResponse = this.buffer; // Start with any buffered content
let isComplete = false;
let lineBuffer = ''; // holds any incomplete SSE line between reads
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
if (!isComplete) {
throw new Error('Stream ended without completion marker');
}
break;
}
this.lastEventTime = Date.now();
const chunk = decoder.decode(value, { stream: true });
// Parse SSE chunks, buffering any partial line so a frame split across reads is not lost
lineBuffer += chunk;
const lines = lineBuffer.split('\n');
lineBuffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
isComplete = true;
onComplete(fullResponse);
return;
}
try {
const parsed = JSON.parse(data);
const token = parsed.choices?.[0]?.delta?.content || '';
if (token) {
fullResponse += token;
this.buffer = fullResponse; // Update buffer
onToken(token);
}
// Check for completion
if (parsed.choices?.[0]?.finish_reason) {
isComplete = true;
}
} catch (parseError) {
console.error('Failed to parse SSE data:', data);
// Continue processing other chunks
}
}
}
}
if (!isComplete) {
throw new Error('Stream completed without finish_reason');
}
onComplete(fullResponse);
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
throw new Error('Stream aborted');
}
throw error;
} finally {
reader.releaseLock();
}
}
abort(): void {
if (this.abortController) {
this.abortController.abort();
this.abortController = null;
}
}
}
// Usage with error handling
const streamHandler = new RobustStreamHandler();
async function streamWithUI() {
const messageContainer = document.getElementById('response')!; // assumes an element with id="response" exists
let fullResponse = '';
try {
await streamHandler.streamCompletion(
[{ role: 'user', content: 'Tell me a story' }],
{
onToken: (token) => {
fullResponse += token;
messageContainer.textContent = fullResponse;
},
onError: (error) => {
console.error('Stream error:', error);
messageContainer.innerHTML += `
<div class="error">
Error: ${error.message}
<button onclick="retryStream()">Retry</button>
</div>
`;
},
onComplete: (response) => {
console.log('Stream completed:', response.length, 'chars');
messageContainer.innerHTML += '<div class="complete">✓ Complete</div>';
},
maxRetries: 3,
timeout: 60000 // 1 minute
}
);
} catch (error) {
console.error('Failed after retries:', error);
}
}
Python: Async Streaming with Error Recovery
import asyncio
import aiohttp
import json
from typing import AsyncIterator, Optional, Callable
from datetime import datetime
import logging
class StreamingError(Exception):
"""Base exception for streaming errors"""
pass
class IncompleteStreamError(StreamingError):
"""Stream ended without proper completion"""
pass
class StreamTimeoutError(StreamingError):
"""Stream timed out waiting for data"""
pass
class ResilientStreamHandler:
"""Handle streaming responses with automatic recovery"""
def __init__(self, api_key: str, timeout: int = 60):
self.api_key = api_key
self.timeout = timeout
self.logger = logging.getLogger(__name__)
self.buffer = ""
self.retry_count = 0
self.max_retries = 3
async def stream_completion(
self,
messages: list,
model: str = "gpt-4",
on_token: Optional[Callable[[str], None]] = None,
on_error: Optional[Callable[[Exception], None]] = None,
on_complete: Optional[Callable[[str], None]] = None
) -> str:
"""Stream completion with error recovery"""
for attempt in range(self.max_retries):
try:
self.retry_count = attempt
result = await self._attempt_stream(
messages, model, on_token, on_error, on_complete
)
return result
except (StreamingError, aiohttp.ClientError) as e:
self.logger.warning(
f"Stream attempt {attempt + 1} failed: {e}"
)
if on_error:
on_error(e)
if attempt < self.max_retries - 1:
# Exponential backoff
await asyncio.sleep(2 ** attempt)
# Resume from buffer if possible
if self.buffer and on_token:
on_token(f"[Resuming...] {self.buffer}")
else:
raise
raise StreamingError("Max retries exceeded")
async def _attempt_stream(
self,
messages: list,
model: str,
on_token: Optional[Callable[[str], None]],
on_error: Optional[Callable[[Exception], None]],
on_complete: Optional[Callable[[str], None]]
) -> str:
"""Single streaming attempt"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"Accept": "text/event-stream"
}
data = {
"model": model,
"messages": messages,
"stream": True
}
timeout_config = aiohttp.ClientTimeout(
total=self.timeout,
connect=10,
sock_read=30
)
async with aiohttp.ClientSession(timeout=timeout_config) as session:
async with session.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=data
) as response:
if response.status != 200:
error_data = await response.json()
raise StreamingError(
f"API error: {error_data.get('error', {}).get('message', 'Unknown')}"
)
return await self._process_stream(
response, on_token, on_error, on_complete
)
async def _process_stream(
self,
response: aiohttp.ClientResponse,
on_token: Optional[Callable[[str], None]],
on_error: Optional[Callable[[Exception], None]],
on_complete: Optional[Callable[[str], None]]
) -> str:
"""Process SSE stream with error handling"""
full_response = self.buffer
last_activity = datetime.now()
is_complete = False
try:
async for line in response.content:
# Update activity timestamp
last_activity = datetime.now()
# Decode line
try:
line = line.decode('utf-8').strip()
except UnicodeDecodeError:
self.logger.warning("Failed to decode line, skipping")
continue
if not line:
continue
# Parse SSE format
if line.startswith('data: '):
data = line[6:]
if data == '[DONE]':
is_complete = True
break
try:
chunk = json.loads(data)
# Extract token
if 'choices' in chunk and len(chunk['choices']) > 0:
delta = chunk['choices'][0].get('delta', {})
content = delta.get('content', '')
if content:
full_response += content
self.buffer = full_response
if on_token:
on_token(content)
# Check completion
finish_reason = chunk['choices'][0].get('finish_reason')
if finish_reason:
is_complete = True
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse chunk: {data}")
if on_error:
on_error(e)
# Check for timeout
if (datetime.now() - last_activity).total_seconds() > 10:
raise StreamTimeoutError("No activity for 10 seconds")
if not is_complete:
raise IncompleteStreamError(
"Stream ended without completion marker"
)
if on_complete:
on_complete(full_response)
# Clear buffer on success
self.buffer = ""
return full_response
except asyncio.TimeoutError:
raise StreamTimeoutError(f"Stream timeout after {self.timeout}s")
except Exception as e:
self.logger.error(f"Stream processing error: {e}")
raise
# Advanced usage with monitoring
class StreamMonitor:
"""Monitor streaming performance and errors"""
def __init__(self):
self.metrics = {
"total_streams": 0,
"successful_streams": 0,
"failed_streams": 0,
"retries": 0,
"total_tokens": 0,
"errors_by_type": {}
}
self.active_streams = set()
async def monitored_stream(
self,
handler: ResilientStreamHandler,
messages: list,
stream_id: str
) -> str:
"""Stream with monitoring"""
self.metrics["total_streams"] += 1
self.active_streams.add(stream_id)
start_time = datetime.now()
tokens_received = 0
try:
result = await handler.stream_completion(
messages=messages,
on_token=lambda token: self._on_token(token, tokens_received),
on_error=lambda error: self._on_error(error, stream_id),
on_complete=lambda response: self._on_complete(
stream_id, start_time, len(response)
)
)
self.metrics["successful_streams"] += 1
return result
except Exception as e:
self.metrics["failed_streams"] += 1
error_type = type(e).__name__
self.metrics["errors_by_type"][error_type] = self.metrics["errors_by_type"].get(error_type, 0) + 1
raise
finally:
self.active_streams.discard(stream_id)
    def _on_token(self, token: str, token_count: int):
        # token_count is passed by value, so incrementing it here would have no effect;
        # track the running total in the shared metrics dict instead
        self.metrics["total_tokens"] += 1
def _on_error(self, error: Exception, stream_id: str):
self.metrics["retries"] += 1
print(f"Stream {stream_id} error: {error}")
def _on_complete(self, stream_id: str, start_time: datetime, response_length: int):
duration = (datetime.now() - start_time).total_seconds()
print(f"Stream {stream_id} completed: {response_length} chars in {duration:.1f}s")
def get_stats(self) -> dict:
"""Get streaming statistics"""
success_rate = (
self.metrics["successful_streams"] /
max(self.metrics["total_streams"], 1) * 100
)
return {
**self.metrics,
"success_rate": f"{success_rate:.1f}%",
"active_streams": len(self.active_streams),
"avg_retries": self.metrics["retries"] / max(self.metrics["total_streams"], 1)
}
# Usage example
async def main():
handler = ResilientStreamHandler(api_key="your-key")
monitor = StreamMonitor()
messages = [{"role": "user", "content": "Write a poem"}]
try:
response = await monitor.monitored_stream(
handler, messages, stream_id="poem-001"
)
print(f"Final response: {response}")
except StreamingError as e:
print(f"Streaming failed: {e}")
print("\nStreaming stats:", monitor.get_stats())
# Run
asyncio.run(main())
Debugging Streaming Issues
Stream Event Logger
class StreamDebugger {
private events: Array<{
timestamp: number;
type: string;
data: any;
}> = [];
logEvent(type: string, data: any): void {
this.events.push({
timestamp: Date.now(),
type,
data
});
}
async debugStream(url: string, options: RequestInit): Promise<void> {
const startTime = Date.now();
this.logEvent('stream_start', { url, options });
try {
const response = await fetch(url, options);
this.logEvent('response_received', {
status: response.status,
headers: Object.fromEntries(response.headers.entries())
});
if (!response.ok) {
const error = await response.json();
this.logEvent('api_error', error);
throw new Error(`API error: ${response.status}`);
}
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';
let chunkCount = 0;
while (true) {
const readStart = Date.now();
const { done, value } = await reader.read();
const readDuration = Date.now() - readStart;
if (done) {
this.logEvent('stream_end', {
duration: Date.now() - startTime,
chunks: chunkCount
});
break;
}
chunkCount++;
const chunk = decoder.decode(value, { stream: true });
buffer += chunk;
this.logEvent('chunk_received', {
chunkNumber: chunkCount,
size: value.length,
readDuration,
preview: chunk.substring(0, 100)
});
// Parse SSE lines
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // Keep incomplete line in buffer
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
this.logEvent('stream_complete', { chunkCount });
return;
}
try {
const parsed = JSON.parse(data);
this.logEvent('data_parsed', {
hasContent: !!parsed.choices?.[0]?.delta?.content,
finishReason: parsed.choices?.[0]?.finish_reason
});
} catch (e) {
this.logEvent('parse_error', {
data,
error: e instanceof Error ? e.message : String(e)
});
}
}
}
}
} catch (error) {
this.logEvent('stream_error', {
error: error instanceof Error ? error.message : String(error),
duration: Date.now() - startTime
});
throw error;
}
}
generateReport(): string {
const report = ['=== Stream Debug Report ===\n'];
// Summary
const duration = this.events[this.events.length - 1].timestamp - this.events[0].timestamp;
const chunks = this.events.filter(e => e.type === 'chunk_received').length;
const errors = this.events.filter(e => e.type.includes('error')).length;
report.push(`Duration: ${duration}ms`);
report.push(`Chunks received: ${chunks}`);
report.push(`Errors: ${errors}`);
report.push('\n=== Event Timeline ===\n');
// Timeline
const startTime = this.events[0].timestamp;
this.events.forEach(event => {
const relativeTime = event.timestamp - startTime;
report.push(`[${relativeTime}ms] ${event.type}: ${JSON.stringify(event.data, null, 2)}`);
});
// Chunk analysis
const chunkEvents = this.events.filter(e => e.type === 'chunk_received');
if (chunkEvents.length > 0) {
report.push('\n=== Chunk Analysis ===\n');
const sizes = chunkEvents.map(e => e.data.size);
const durations = chunkEvents.map(e => e.data.readDuration);
report.push(`Average chunk size: ${(sizes.reduce((a, b) => a + b) / sizes.length).toFixed(0)} bytes`);
report.push(`Average read time: ${(durations.reduce((a, b) => a + b) / durations.length).toFixed(1)}ms`);
report.push(`Min/Max chunk size: ${Math.min(...sizes)}/${Math.max(...sizes)} bytes`);
}
return report.join('\n');
}
}
// Usage ("debugger" is a reserved word in JavaScript, so the instance gets a different name)
const streamDebugger = new StreamDebugger();
try {
await streamDebugger.debugStream('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${apiKey}`
},
body: JSON.stringify({
model: 'gpt-4',
messages: [{ role: 'user', content: 'Hello' }],
stream: true
})
});
console.log(streamDebugger.generateReport());
} catch (error) {
console.error('Stream failed:', error);
console.log(streamDebugger.generateReport());
}
Common Streaming Issues & Solutions
1. Connection Drops Mid-Stream
Symptoms: Stream stops receiving data without error or completion marker.
Causes: Network issues, proxy timeouts, server-side errors.
Solutions:
- Implement activity monitoring with timeout detection (see the watchdog sketch after this list)
- Use keep-alive mechanisms
- Buffer partial responses for retry
- Implement reconnection logic
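The first two solutions can be combined into a small watchdog around the read loop. The sketch below is illustrative (the 15-second window and the function names are arbitrary choices, not part of any API); it aborts the request only when no chunk has arrived for the whole idle window, which is the same idea the handler above implements with its activity monitor.
// Idle-watchdog sketch: abort the in-flight request if no chunk arrives within IDLE_MS.
const IDLE_MS = 15_000;

async function readWithIdleWatchdog(
  body: ReadableStream<Uint8Array>,
  onChunk: (chunk: Uint8Array) => void,
  abortController: AbortController
): Promise<void> {
  const reader = body.getReader();
  let idleTimer = setTimeout(() => abortController.abort(), IDLE_MS);
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // Reset the watchdog on every chunk so only genuine stalls trigger an abort.
      clearTimeout(idleTimer);
      idleTimer = setTimeout(() => abortController.abort(), IDLE_MS);
      onChunk(value);
    }
  } finally {
    clearTimeout(idleTimer);
    reader.releaseLock();
  }
}
The caller passes the same AbortController it gave to fetch, catches the resulting AbortError, and decides whether to reconnect and retry.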
2. Malformed SSE Data
Symptoms: JSON parse errors, missing data fields.
Causes: Network corruption, incomplete chunks, encoding issues.
Solutions:
- Use proper SSE parser libraries (see the parser sketch after this list)
- Buffer incomplete lines
- Validate JSON before parsing
- Skip corrupted chunks gracefully
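If you would rather not hand-roll the framing, a dedicated parser handles line buffering and multi-line events for you. The sketch below assumes the eventsource-parser package, whose stream entry point exposes an EventSourceParserStream transform; check the package documentation for the exact API in the version you install.
// Parser-based sketch: bytes -> text -> parsed SSE events, no manual '\n' splitting.
import { EventSourceParserStream } from 'eventsource-parser/stream';

async function readEvents(
  body: ReadableStream<Uint8Array>,
  onDelta: (text: string) => void
): Promise<void> {
  const events = body
    .pipeThrough(new TextDecoderStream())        // decode bytes to text
    .pipeThrough(new EventSourceParserStream()); // frame text into SSE events
  const reader = events.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    if (value.data === '[DONE]') break;
    try {
      const parsed = JSON.parse(value.data);
      const token = parsed.choices?.[0]?.delta?.content;
      if (token) onDelta(token);
    } catch {
      // Skip a corrupted event instead of failing the whole stream.
    }
  }
}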
3. Memory Issues with Large Streams
Symptoms: Browser/process runs out of memory, performance degradation.
Causes: Accumulating entire response in memory, no cleanup.
Solutions:
- Process chunks incrementally
- Implement streaming to disk/database
- Use backpressure mechanisms
- Limit buffer sizes (a bounded-buffer sketch follows this list)
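For the memory issue, the key move is to stop treating the full response as a single growing string. A bounded collector like the sketch below (MAX_TAIL_CHARS and the flush signature are illustrative) keeps only a small tail in memory and hands older content to whatever sink you use, such as a file, a database row, or an incremental UI appender.
// Bounded-buffer sketch: flush completed segments to a sink instead of accumulating everything.
const MAX_TAIL_CHARS = 8_192;

function makeBoundedCollector(flush: (segment: string) => Promise<void>) {
  let tail = '';
  return {
    async push(token: string): Promise<void> {
      tail += token;
      if (tail.length >= MAX_TAIL_CHARS) {
        await flush(tail); // persist the oldest content rather than holding it in memory
        tail = '';
      }
    },
    async finish(): Promise<void> {
      if (tail) await flush(tail);
    }
  };
}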
Monitoring Stream Health
interface StreamMetrics {
streamId: string;
startTime: number;
endTime?: number;
bytesReceived: number;
chunksReceived: number;
tokensReceived: number;
errors: Array<{ time: number; error: string }>;
reconnects: number;
status: 'active' | 'completed' | 'failed';
}
class StreamHealthMonitor {
private streams: Map<string, StreamMetrics> = new Map();
private listeners: Array<(metrics: StreamMetrics) => void> = [];
startMonitoring(streamId: string): void {
this.streams.set(streamId, {
streamId,
startTime: Date.now(),
bytesReceived: 0,
chunksReceived: 0,
tokensReceived: 0,
errors: [],
reconnects: 0,
status: 'active'
});
}
updateMetrics(
streamId: string,
update: Partial<StreamMetrics>
): void {
const metrics = this.streams.get(streamId);
if (!metrics) return;
Object.assign(metrics, update);
// Notify listeners
this.listeners.forEach(listener => listener(metrics));
}
recordChunk(streamId: string, size: number, tokens: number): void {
const metrics = this.streams.get(streamId);
if (!metrics) return;
metrics.bytesReceived += size;
metrics.chunksReceived += 1;
metrics.tokensReceived += tokens;
}
recordError(streamId: string, error: string): void {
const metrics = this.streams.get(streamId);
if (!metrics) return;
metrics.errors.push({
time: Date.now(),
error
});
}
completeStream(streamId: string, success: boolean): void {
const metrics = this.streams.get(streamId);
if (!metrics) return;
metrics.endTime = Date.now();
metrics.status = success ? 'completed' : 'failed';
}
getHealthReport(): {
activeStreams: number;
totalStreams: number;
successRate: number;
avgDuration: number;
avgBytesPerStream: number;
commonErrors: Array<{ error: string; count: number }>;
} {
const allStreams = Array.from(this.streams.values());
const completedStreams = allStreams.filter(s => s.status !== 'active');
const successfulStreams = allStreams.filter(s => s.status === 'completed');
// Calculate averages
const durations = completedStreams
.filter(s => s.endTime)
.map(s => s.endTime! - s.startTime);
const avgDuration = durations.length > 0
? durations.reduce((a, b) => a + b) / durations.length
: 0;
const avgBytes = allStreams.length > 0
? allStreams.reduce((sum, s) => sum + s.bytesReceived, 0) / allStreams.length
: 0;
// Count errors
const errorCounts = new Map<string, number>();
allStreams.forEach(stream => {
stream.errors.forEach(({ error }) => {
errorCounts.set(error, (errorCounts.get(error) || 0) + 1);
});
});
const commonErrors = Array.from(errorCounts.entries())
.map(([error, count]) => ({ error, count }))
.sort((a, b) => b.count - a.count)
.slice(0, 5);
return {
activeStreams: allStreams.filter(s => s.status === 'active').length,
totalStreams: allStreams.length,
successRate: completedStreams.length > 0
? (successfulStreams.length / completedStreams.length) * 100
: 0,
avgDuration: Math.round(avgDuration),
avgBytesPerStream: Math.round(avgBytes),
commonErrors
};
}
// Real-time monitoring dashboard
createDashboard(): HTMLElement {
const dashboard = document.createElement('div');
dashboard.className = 'stream-monitor-dashboard';
const update = () => {
const report = this.getHealthReport();
dashboard.innerHTML = `
<h3>Stream Health Monitor</h3>
<div class="metrics-grid">
<div class="metric">
<span class="label">Active Streams</span>
<span class="value">${report.activeStreams}</span>
</div>
<div class="metric">
<span class="label">Success Rate</span>
<span class="value">${report.successRate.toFixed(1)}%</span>
</div>
<div class="metric">
<span class="label">Avg Duration</span>
<span class="value">${(report.avgDuration / 1000).toFixed(1)}s</span>
</div>
<div class="metric">
<span class="label">Avg Size</span>
<span class="value">${(report.avgBytesPerStream / 1024).toFixed(1)}KB</span>
</div>
</div>
<div class="errors">
<h4>Common Errors</h4>
<ul>
${report.commonErrors.map(({ error, count }) =>
`<li>${error}: ${count} times</li>`
).join('')}
</ul>
</div>
`;
};
// Update every second
setInterval(update, 1000);
update();
return dashboard;
}
}
// Global monitor instance
const streamMonitor = new StreamHealthMonitor();
// Integration with streaming handler
async function monitoredStream(messages: any[]) {
// streamWithRetry and countTokens are assumed to come from your own streaming layer
const streamId = `stream-${Date.now()}`;
let reconnects = 0;
streamMonitor.startMonitoring(streamId);
try {
const result = await streamWithRetry(messages, {
onChunk: (chunk, size) => {
streamMonitor.recordChunk(streamId, size, countTokens(chunk));
},
onError: (error) => {
streamMonitor.recordError(streamId, error.message);
},
onReconnect: () => {
// "streams" is private on the monitor, so track reconnects locally and report via updateMetrics
reconnects += 1;
streamMonitor.updateMetrics(streamId, { reconnects });
}
});
streamMonitor.completeStream(streamId, true);
return result;
} catch (error) {
streamMonitor.completeStream(streamId, false);
throw error;
}
}
Best Practices
Do's
- Implement reconnection logic
- Monitor stream activity
- Buffer partial responses
- Use proper SSE parsers
- Handle incomplete streams
- Set appropriate timeouts
- Clean up resources properly
Don'ts
- Don't ignore connection errors
- Don't accumulate unlimited data
- Don't retry infinitely
- Don't parse incomplete JSON
- Don't leak stream resources
- Don't ignore backpressure
- Don't trust all chunks
Testing Streaming Endpoints
Always test your streaming implementation with the following scenarios (a mock for the interrupted-stream case follows the list):
- Slow connections (throttle network speed)
- Interrupted connections (disconnect mid-stream)
- Large responses (test memory limits)
- Concurrent streams (test resource usage)
- Error responses (test error handling)
- Timeout scenarios (test recovery)
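A small fake body makes the interrupted-connection case easy to reproduce in unit tests. The sketch below (chunk payloads are illustrative) builds a ReadableStream that closes before sending finish_reason or [DONE], which should drive your handler into its incomplete-stream error path rather than reporting success.
// Test helper sketch: an SSE body that drops the connection before completion.
function makeInterruptedStream(): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  const chunks = [
    'data: {"choices":[{"delta":{"content":"Hello"},"finish_reason":null}]}\n\n',
    'data: {"choices":[{"delta":{"content":" wor"},"finish_reason":null}]}\n\n',
  ];
  return new ReadableStream({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(encoder.encode(chunk));
      controller.close(); // connection "drops" here: no finish_reason, no [DONE]
    }
  });
}
Feed the mock into your stream processor and assert that it surfaces an incomplete-stream error.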