Understanding Streaming Technologies
LLM streaming can be implemented over Server-Sent Events (SSE) or WebSockets. Each has distinct advantages[2]; a minimal client-side sketch of both APIs follows the comparison below:
Server-Sent Events (SSE)

Advantages:
- Simple HTTP-based protocol
- Automatic reconnection built into the browser's EventSource API
- Works through most proxies and firewalls
- Built-in browser support

Limitations:
- Unidirectional (server → client) only
- Text-only data
- Browsers cap concurrent HTTP/1.1 connections at about 6 per domain

WebSockets

Advantages:
- Bidirectional communication
- Binary data support
- Lower per-message overhead and latency
- No practical connection limits

Limitations:
- More complex implementation
- Manual reconnection logic required
- Can be blocked by proxies and firewalls
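To make the trade-off concrete, here is a minimal browser-side sketch of the two APIs. The endpoint URLs are placeholders, not the routes implemented later in this guide:

// SSE: EventSource gives a one-way stream with reconnection built in,
// but it can only issue GET requests and carries text data.
const es = new EventSource('/api/stream?session=abc123'); // hypothetical GET endpoint
es.onmessage = (event) => {
  console.log('SSE chunk:', event.data);
};

// WebSocket: bidirectional, so the prompt can be sent over the same connection,
// but reconnection and keep-alives are the application's responsibility.
const ws = new WebSocket('wss://example.com/stream'); // hypothetical endpoint
ws.onopen = () => {
  ws.send(JSON.stringify({ messages: [{ role: 'user', content: 'Hello' }] }));
};
ws.onmessage = (event) => {
  console.log('WebSocket message:', event.data);
};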
Backend Implementation
Let's implement streaming endpoints for major LLM providers with proper error handling and performance optimization[2]:
import express from 'express';
import { OpenAI } from 'openai';
import { Anthropic } from '@anthropic-ai/sdk';
import { PassThrough } from 'stream';
const app = express();
app.use(express.json()); // parse JSON bodies so req.body is available in the handlers
const openai = new OpenAI();
const anthropic = new Anthropic();
// SSE headers configuration
const SSE_HEADERS = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no', // Disable Nginx buffering
};
// Stream manager for connection tracking
class StreamManager {
private activeStreams = new Map<string, PassThrough>();
register(id: string, stream: PassThrough) {
this.activeStreams.set(id, stream);
}
unregister(id: string) {
const stream = this.activeStreams.get(id);
if (stream) {
stream.end();
this.activeStreams.delete(id);
}
}
getActiveCount(): number {
return this.activeStreams.size;
}
}
const streamManager = new StreamManager();
// OpenAI streaming endpoint
app.post('/api/stream/openai', async (req, res) => {
const streamId = `stream-${Date.now()}`;
const stream = new PassThrough();
// Set SSE headers
res.writeHead(200, SSE_HEADERS);
// Register the stream and pipe it to the response once
streamManager.register(streamId, stream);
stream.pipe(res);
// Handle client disconnect
req.on('close', () => {
streamManager.unregister(streamId);
});
try {
const { messages, model = 'gpt-3.5-turbo' } = req.body;
// Create streaming completion
const completion = await openai.chat.completions.create({
model,
messages,
stream: true,
temperature: 0.7,
});
// Process stream
for await (const chunk of completion) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
// Send SSE formatted data
const data = JSON.stringify({
content,
finished: false,
timestamp: Date.now()
});
stream.write(`data: ${data}\n\n`);
// Stop early if the client disconnected (unregister ended the stream)
if (stream.writableEnded) {
break;
}
}
}
// Send completion event (unless the client already disconnected)
if (!stream.writableEnded) {
stream.write(`data: ${JSON.stringify({
finished: true,
timestamp: Date.now()
})}\n\n`);
stream.end();
}
} catch (error) {
console.error('Streaming error:', error);
// Send error event
stream.write(`data: ${JSON.stringify({
error: error.message,
finished: true
})}\n\n`);
stream.end();
} finally {
streamManager.unregister(streamId);
}
});
// Anthropic (Claude) streaming endpoint
app.post('/api/stream/anthropic', async (req, res) => {
const streamId = `stream-${Date.now()}`;
res.writeHead(200, SSE_HEADERS);
try {
const { messages, model = 'claude-3-sonnet-20240229' } = req.body;
const stream = await anthropic.messages.create({
model,
messages,
max_tokens: 1000,
stream: true,
});
for await (const event of stream) {
if (event.type === 'content_block_delta') {
const data = JSON.stringify({
content: event.delta.text,
finished: false,
timestamp: Date.now(),
});
res.write(`data: ${data}\n\n`);
}
}
res.write(`data: ${JSON.stringify({
finished: true,
timestamp: Date.now()
})}\n\n`);
res.end();
} catch (error) {
res.write(`data: ${JSON.stringify({
error: error.message,
finished: true
})}\n\n`);
res.end();
}
});
// Advanced streaming with buffering and rate control
class StreamBuffer {
private buffer: string[] = [];
private bufferSize: number;
private flushInterval: number;
private lastFlush: number = Date.now();
constructor(bufferSize = 5, flushInterval = 100) {
this.bufferSize = bufferSize;
this.flushInterval = flushInterval;
}
add(content: string): string | null {
this.buffer.push(content);
const shouldFlush =
this.buffer.length >= this.bufferSize ||
Date.now() - this.lastFlush >= this.flushInterval;
if (shouldFlush) {
return this.flush();
}
return null;
}
flush(): string | null {
if (this.buffer.length === 0) return null;
const content = this.buffer.join('');
this.buffer = [];
this.lastFlush = Date.now();
return content;
}
}
// Buffered streaming endpoint
app.post('/api/stream/buffered', async (req, res) => {
res.writeHead(200, SSE_HEADERS);
const buffer = new StreamBuffer(5, 100); // 5 tokens or 100ms
try {
const { messages } = req.body;
const completion = await openai.chat.completions.create({
model: 'gpt-3.5-turbo',
messages,
stream: true,
});
for await (const chunk of completion) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
const bufferedContent = buffer.add(content);
if (bufferedContent) {
res.write(`data: ${JSON.stringify({
content: bufferedContent,
buffered: true,
timestamp: Date.now()
})}\n\n`);
}
}
}
// Flush remaining buffer
const remaining = buffer.flush();
if (remaining) {
res.write(`data: ${JSON.stringify({
content: remaining,
buffered: true,
timestamp: Date.now()
})}\n\n`);
}
res.write(`data: ${JSON.stringify({ finished: true })}\n\n`);
res.end();
} catch (error) {
res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
res.end();
}
});
// WebSocket implementation
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws) => {
console.log('WebSocket client connected');
ws.on('message', async (data) => {
try {
const { messages, provider = 'openai' } = JSON.parse(data.toString());
if (provider === 'openai') {
const stream = await openai.chat.completions.create({
model: 'gpt-3.5-turbo',
messages,
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content && ws.readyState === ws.OPEN) {
ws.send(JSON.stringify({
type: 'content',
content,
timestamp: Date.now()
}));
}
}
if (ws.readyState === ws.OPEN) {
ws.send(JSON.stringify({ type: 'done' }));
}
}
} catch (error) {
if (ws.readyState === ws.OPEN) {
ws.send(JSON.stringify({
type: 'error',
error: error.message
}));
}
}
});
ws.on('close', () => {
console.log('WebSocket client disconnected');
});
});
// Monitoring endpoint
app.get('/api/stream/status', (req, res) => {
res.json({
activeStreams: streamManager.getActiveCount(),
timestamp: Date.now(),
});
});
app.listen(3000, () => {
console.log('Streaming server running on port 3000');
});
Frontend Implementation
Implement robust client-side streaming with proper error handling and reconnection logic[3]:
import { useState, useCallback, useRef, useEffect } from 'react';
import { Loader2 } from 'lucide-react'; // spinner icon used below (assumed to come from lucide-react)
interface StreamOptions {
onChunk?: (chunk: string) => void;
onComplete?: (fullText: string) => void;
onError?: (error: Error) => void;
bufferSize?: number;
reconnectAttempts?: number;
reconnectDelay?: number;
}
interface StreamState {
isStreaming: boolean;
text: string;
error: Error | null;
chunks: string[];
}
export function useStreamingLLM(options: StreamOptions = {}) {
const [state, setState] = useState<StreamState>({
isStreaming: false,
text: '',
error: null,
chunks: []
});
const abortControllerRef = useRef<AbortController | null>(null);
const eventSourceRef = useRef<EventSource | null>(null);
const reconnectAttemptsRef = useRef(0);
const textBufferRef = useRef('');
const {
onChunk,
onComplete,
onError,
reconnectAttempts = 3,
reconnectDelay = 1000
} = options;
// Cleanup function
const cleanup = useCallback(() => {
if (abortControllerRef.current) {
abortControllerRef.current.abort();
abortControllerRef.current = null;
}
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
}, []);
// SSE streaming implementation
const streamWithSSE = useCallback(async (
url: string,
messages: any[],
provider: string = 'openai'
) => {
cleanup();
setState(prev => ({
...prev,
isStreaming: true,
error: null,
text: '',
chunks: []
}));
textBufferRef.current = '';
try {
// Wire up an AbortController so abort() can cancel the in-flight request
const abortController = new AbortController();
abortControllerRef.current = abortController;
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ messages, provider }),
signal: abortController.signal
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const reader = response.body?.getReader();
const decoder = new TextDecoder();
if (!reader) {
throw new Error('No reader available');
}
const processStream = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6));
if (data.error) {
throw new Error(data.error);
}
if (data.finished) {
setState(prev => ({
...prev,
isStreaming: false
}));
onComplete?.(textBufferRef.current);
return;
}
if (data.content) {
textBufferRef.current += data.content;
setState(prev => ({
...prev,
text: textBufferRef.current,
chunks: [...prev.chunks, data.content]
}));
onChunk?.(data.content);
}
} catch (e) {
if (e instanceof SyntaxError) {
// Skip invalid JSON
continue;
}
throw e;
}
}
}
}
};
await processStream();
} catch (error) {
const err = error as Error;
setState(prev => ({
...prev,
isStreaming: false,
error: err
}));
onError?.(err);
// Retry logic (skip retries when the user aborted the stream)
if (err.name !== 'AbortError' &&
reconnectAttemptsRef.current < reconnectAttempts) {
reconnectAttemptsRef.current++;
setTimeout(() => {
streamWithSSE(url, messages, provider);
}, reconnectDelay * reconnectAttemptsRef.current);
}
}
}, [cleanup, onChunk, onComplete, onError, reconnectAttempts, reconnectDelay]);
// EventSource implementation (alternative)
// NOTE: EventSource can only issue GET requests, so the endpoint must accept the
// conversation via query parameters or a server-side session; the messages
// argument is kept here for parity with the other helpers.
const streamWithEventSource = useCallback((
url: string,
messages: any[]
) => {
cleanup();
setState(prev => ({
...prev,
isStreaming: true,
error: null,
text: '',
chunks: []
}));
textBufferRef.current = '';
const eventSource = new EventSource(url);
eventSourceRef.current = eventSource;
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.finished) {
eventSource.close();
setState(prev => ({
...prev,
isStreaming: false
}));
onComplete?.(textBufferRef.current);
return;
}
if (data.content) {
textBufferRef.current += data.content;
setState(prev => ({
...prev,
text: textBufferRef.current,
chunks: [...prev.chunks, data.content]
}));
onChunk?.(data.content);
}
} catch (error) {
console.error('Parse error:', error);
}
};
eventSource.onerror = (error) => {
eventSource.close();
const err = new Error('EventSource error');
setState(prev => ({
...prev,
isStreaming: false,
error: err
}));
onError?.(err);
};
}, [cleanup, onChunk, onComplete, onError]);
// WebSocket streaming
const streamWithWebSocket = useCallback((
wsUrl: string,
messages: any[]
) => {
cleanup();
setState(prev => ({
...prev,
isStreaming: true,
error: null,
text: '',
chunks: []
}));
textBufferRef.current = '';
const ws = new WebSocket(wsUrl);
ws.onopen = () => {
ws.send(JSON.stringify({ messages }));
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.type === 'done') {
ws.close();
setState(prev => ({
...prev,
isStreaming: false
}));
onComplete?.(textBufferRef.current);
return;
}
if (data.type === 'content' && data.content) {
textBufferRef.current += data.content;
setState(prev => ({
...prev,
text: textBufferRef.current,
chunks: [...prev.chunks, data.content]
}));
onChunk?.(data.content);
}
if (data.type === 'error') {
throw new Error(data.error);
}
} catch (error) {
const err = error as Error;
setState(prev => ({
...prev,
isStreaming: false,
error: err
}));
onError?.(err);
ws.close();
}
};
ws.onerror = (error) => {
const err = new Error('WebSocket error');
setState(prev => ({
...prev,
isStreaming: false,
error: err
}));
onError?.(err);
};
ws.onclose = () => {
setState(prev => ({
...prev,
isStreaming: false
}));
};
}, [cleanup, onChunk, onComplete, onError]);
// Abort streaming
const abort = useCallback(() => {
cleanup();
setState(prev => ({
...prev,
isStreaming: false
}));
}, [cleanup]);
// Cleanup on unmount
useEffect(() => {
return cleanup;
}, [cleanup]);
return {
...state,
streamWithSSE,
streamWithEventSource,
streamWithWebSocket,
abort,
reset: () => setState({
isStreaming: false,
text: '',
error: null,
chunks: []
})
};
}
// Usage example
export function StreamingChat() {
const {
text,
isStreaming,
error,
streamWithSSE,
abort
} = useStreamingLLM({
onChunk: (chunk) => {
console.log('Received chunk:', chunk);
},
onComplete: (fullText) => {
console.log('Streaming complete:', fullText);
},
onError: (error) => {
console.error('Streaming error:', error);
}
});
const handleStream = () => {
const messages = [
{ role: 'user', content: 'Tell me a story about AI' }
];
streamWithSSE('/api/stream/openai', messages);
};
return (
<div className="p-4">
<div className="mb-4">
<button
onClick={isStreaming ? abort : handleStream}
className="px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600"
>
{isStreaming ? 'Stop' : 'Start Streaming'}
</button>
</div>
{error && (
<div className="mb-4 p-3 bg-red-100 text-red-700 rounded">
Error: {error.message}
</div>
)}
<div className="p-4 border rounded-lg bg-gray-50">
{isStreaming && !text && (
<div className="flex items-center">
<Loader2 className="w-4 h-4 animate-spin mr-2" />
<span>AI is thinking...</span>
</div>
)}
{text && (
<div className="whitespace-pre-wrap">
{text}
{isStreaming && (
<span className="inline-block w-2 h-4 bg-gray-600 animate-pulse ml-1" />
)}
</div>
)}
</div>
</div>
);
}
Performance Optimization
Optimize streaming performance for better user experience and reduced resource usage[4]:
Buffer tokens to reduce UI updates and improve performance:
// Optimal buffering configuration
const bufferConfig = {
// Time-based: Flush every 100ms
timeInterval: 100,
// Size-based: Flush every 5 tokens
tokenCount: 5,
// Adaptive: Increase buffer for fast streams
adaptive: true,
// Max buffer size to prevent memory issues
maxBufferSize: 50
};
// Implementation
class AdaptiveBuffer {
constructor(config) {
this.config = config;
this.buffer = [];
this.lastFlush = Date.now(); // shouldFlush() compares against this timestamp
this.metrics = {
tokensPerSecond: 0,
avgTokenSize: 0
};
}
shouldFlush() {
return (
this.buffer.length >= this.config.tokenCount ||
Date.now() - this.lastFlush > this.config.timeInterval
);
}
adaptBufferSize() {
if (this.metrics.tokensPerSecond > 50) {
this.config.tokenCount = Math.min(
this.config.tokenCount + 1,
this.config.maxBufferSize
);
}
}
}
Optimize rendering for smooth streaming (a small React sketch follows this list):
- Use requestAnimationFrame for UI updates
- Implement virtual scrolling for long outputs
- Throttle DOM updates to at most 60fps
- Use CSS transforms for the cursor animation
- Lazy-render markdown/code blocks
- Implement progressive rendering
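The first two points can be combined into a small hook that accumulates incoming tokens in a ref and commits them to React state at most once per animation frame. This is a rough sketch; the hook name is illustrative and not part of the earlier examples:

import { useCallback, useEffect, useRef, useState } from 'react';

// Batches token updates so React re-renders at most once per animation frame.
export function useFrameBatchedText() {
  const [text, setText] = useState('');
  const pendingRef = useRef('');               // tokens received since the last flush
  const frameRef = useRef<number | null>(null); // id of the scheduled animation frame

  const push = useCallback((token: string) => {
    pendingRef.current += token;
    if (frameRef.current === null) {
      frameRef.current = requestAnimationFrame(() => {
        frameRef.current = null;
        setText(prev => prev + pendingRef.current);
        pendingRef.current = '';
      });
    }
  }, []);

  // Cancel any scheduled frame on unmount.
  useEffect(() => () => {
    if (frameRef.current !== null) cancelAnimationFrame(frameRef.current);
  }, []);

  return { text, push };
}

In a component like StreamingChat, the onChunk callback would call push(chunk) and the UI would render this hook's text, instead of updating state on every token.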
Testing Streaming Endpoints
Comprehensive testing ensures reliable streaming in production[2]:
import { describe, test, expect, beforeEach, afterEach } from 'vitest';
import { createServer } from 'http';
import { EventSource } from 'eventsource';
describe('Streaming Endpoints', () => {
let server: any;
let baseUrl: string;
beforeEach(async () => {
server = await createTestServer(); // helper (not shown) that starts the app on a random port
baseUrl = `http://localhost:${server.port}`;
});
afterEach(() => {
server.close();
});
test('SSE streaming delivers chunks incrementally', async () => {
const chunks: string[] = [];
const startTime = Date.now();
await new Promise<void>((resolve, reject) => {
const es = new EventSource(`${baseUrl}/stream`);
es.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.content) {
chunks.push(data.content);
// Verify incremental delivery
const elapsed = Date.now() - startTime;
expect(elapsed).toBeGreaterThan(chunks.length * 50);
}
if (data.finished) {
es.close();
resolve();
}
};
es.onerror = () => {
es.close();
reject(new Error('Stream error'));
};
});
expect(chunks.length).toBeGreaterThan(0);
expect(chunks.join('')).toContain('expected content');
});
test('Stream handles disconnection gracefully', async () => {
const es = new EventSource(`${baseUrl}/stream`);
// Simulate disconnection after 100ms
setTimeout(() => es.close(), 100);
const disconnected = await new Promise<boolean>((resolve) => {
es.onerror = () => resolve(true);
setTimeout(() => resolve(false), 200);
});
expect(disconnected).toBe(true);
});
test('Buffering reduces chunk frequency', async () => {
const timestamps: number[] = [];
const response = await fetch(`${baseUrl}/stream/buffered`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages: [], bufferSize: 10 })
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
if (chunk.includes('data:')) {
timestamps.push(Date.now());
}
}
// Verify buffering effect
const intervals = timestamps.slice(1).map(
(t, i) => t - timestamps[i]
);
const avgInterval = intervals.reduce((a, b) => a + b) / intervals.length;
expect(avgInterval).toBeGreaterThan(80); // Buffered chunks
});
test('Performance: handles high-throughput streaming', async () => {
const startTime = Date.now();
const tokenCount = 1000;
const response = await fetch(`${baseUrl}/stream/performance`, {
method: 'POST',
body: JSON.stringify({ tokenCount })
});
let receivedTokens = 0; // counts stream reads as a proxy for tokens
const reader = response.body!.getReader();
while (true) {
const { done } = await reader.read();
if (done) break;
receivedTokens++;
}
const duration = Date.now() - startTime;
const tokensPerSecond = (receivedTokens / duration) * 1000;
expect(tokensPerSecond).toBeGreaterThan(100); // Min performance
});
});
// Load testing with k6
const k6Script = `
import { check } from 'k6';
import { Trend } from 'k6/metrics';
import { EventSource } from 'k6/experimental/events';
const streamDuration = new Trend('stream_duration');
const tokensReceived = new Trend('tokens_received');
export const options = {
stages: [
{ duration: '30s', target: 10 },
{ duration: '1m', target: 50 },
{ duration: '30s', target: 0 },
],
};
export default function () {
const url = 'http://localhost:3000/stream';
const es = new EventSource(url);
let tokens = 0;
const startTime = Date.now();
es.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
if (data.content) tokens++;
if (data.finished) {
const duration = Date.now() - startTime;
streamDuration.add(duration);
tokensReceived.add(tokens);
es.close();
}
});
check(tokens, {
'received tokens': (t) => t > 0,
});
}
`;
Best Practices
- Implement proper error handling and retries
- Use token buffering for performance
- Add connection status indicators
- Implement graceful degradation
- Monitor streaming metrics
- Test with poor network conditions
- Use compression when available
- Don't update the UI on every token
- Don't ignore browser connection limits
- Don't forget to clean up resources
- Don't block the main thread
- Don't retry infinitely (cap attempts and back off; a sketch follows this list)
- Don't ignore backpressure
- Don't trust client-side validation alone
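For the retry and backpressure points in particular, a server-side sketch might look like the following. The helper names and limits are illustrative, not part of the endpoints above:

import type { Response } from 'express';

// Bounded retries with exponential backoff: never retry forever.
async function withRetries<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 500
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Exponential backoff: 500ms, 1s, 2s, ...
        await new Promise(r => setTimeout(r, baseDelayMs * 2 ** (attempt - 1)));
      }
    }
  }
  throw lastError;
}

// Respect backpressure: if res.write() returns false, wait for 'drain'
// before sending more SSE events instead of letting memory grow.
async function writeSSE(res: Response, payload: unknown): Promise<void> {
  const ok = res.write(`data: ${JSON.stringify(payload)}\n\n`);
  if (!ok) {
    await new Promise<void>(resolve => res.once('drain', () => resolve()));
  }
}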
Conclusion
Implementing robust streaming for LLM responses requires careful attention to error handling, performance optimization, and user experience. By following the patterns in this guide, you can build streaming UIs that feel instant and hold up under real-world network conditions[5].
References
- [1] OpenAI. "Streaming API Reference" (2024)
- [2] LangChain. "Streaming Concepts" (2025)
- [3] Mastering Nuxt. "Implement Streaming Responses for Real-Time LLM Outputs" (2025)
- [4] Tamás Piros. "Streaming LLM Responses: A Deep Dive" (2025)
- [5] Dev.to. "Mastering Real-Time AI: Building Streaming LLMs with FastAPI and Transformers" (2025)
- [6] Anthropic. "Streaming Responses" (2024)
- [7] Google Cloud. "Send Streaming Request" (2024)
- [8] Vercel AI SDK. "Streaming Documentation" (2024)
- [9] n8n Community. "Stream AI Responses: LLM Chains and AI Agents" (2025)
- [10] MDN. "Server-sent Events" (2024)