Intermediate
January 20, 2024 · 14 min read

LLM Streaming Implementation Guide

Implement real-time streaming responses from LLM APIs with proper error handling and client-side rendering.

Understanding Streaming Technologies

LLM streaming can be implemented using Server-Sent Events (SSE) or WebSockets. Each has distinct advantages[2]:

Server-Sent Events (SSE)

Advantages:

  • Simple HTTP-based protocol
  • Automatic reconnection
  • Works through proxies/firewalls
  • Built-in browser support

Limitations:

  • Unidirectional (server → client)
  • Text-only data
  • Limited to roughly six concurrent connections per domain over HTTP/1.1

Best for: LLM streaming
WebSockets

Advantages:

  • Bidirectional communication
  • Binary data support
  • Lower latency
  • Not subject to the browser's per-domain HTTP connection cap

Limitations:

  • More complex implementation
  • Manual reconnection logic
  • Proxy/firewall issues

Best for: Interactive apps
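
For orientation before diving into code, the SSE wire format used throughout this guide is nothing more than newline-delimited text frames. A minimal sketch (the JSON payload shape matches the endpoints implemented below):

// An SSE frame is "data: <payload>" followed by a blank line.
const sseFrame = (payload: object) => `data: ${JSON.stringify(payload)}\n\n`;

// The browser's built-in EventSource parses these frames automatically:
//   const es = new EventSource('/api/stream');
//   es.onmessage = (e) => console.log(JSON.parse(e.data));
console.log(sseFrame({ content: 'Hello', finished: false }));
// -> data: {"content":"Hello","finished":false}  (followed by a blank line)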

Backend Implementation

Let's implement streaming endpoints for major LLM providers with proper error handling and performance optimization[2]:

Node.js Streaming Implementation
import express from 'express';
import { OpenAI } from 'openai';
import { Anthropic } from '@anthropic-ai/sdk';
import { PassThrough } from 'stream';

const app = express();
const openai = new OpenAI();
const anthropic = new Anthropic();

// SSE headers configuration
const SSE_HEADERS = {
  'Content-Type': 'text/event-stream',
  'Cache-Control': 'no-cache',
  'Connection': 'keep-alive',
  'X-Accel-Buffering': 'no', // Disable Nginx buffering
};

// Stream manager for connection tracking
class StreamManager {
  private activeStreams = new Map<string, PassThrough>();

  register(id: string, stream: PassThrough) {
    this.activeStreams.set(id, stream);
  }

  unregister(id: string) {
    const stream = this.activeStreams.get(id);
    if (stream) {
      stream.end();
      this.activeStreams.delete(id);
    }
  }

  getActiveCount(): number {
    return this.activeStreams.size;
  }
}

const streamManager = new StreamManager();

// OpenAI streaming endpoint
app.post('/api/stream/openai', async (req, res) => {
  const streamId = `stream-${Date.now()}`;
  const stream = new PassThrough();
  
  // Set SSE headers
  res.writeHead(200, SSE_HEADERS);
  
  // Register stream and pipe it to the response once
  streamManager.register(streamId, stream);
  stream.pipe(res);
  
  // Handle client disconnect
  req.on('close', () => {
    streamManager.unregister(streamId);
  });

  try {
    const { messages, model = 'gpt-3.5-turbo' } = req.body;
    
    // Create streaming completion
    const completion = await openai.chat.completions.create({
      model,
      messages,
      stream: true,
      temperature: 0.7,
    });

    // Process stream
    for await (const chunk of completion) {
      // Stop generating once the client has disconnected
      if (res.writableEnded || res.destroyed) {
        break;
      }

      const content = chunk.choices[0]?.delta?.content;

      if (content) {
        // Send SSE formatted data through the registered PassThrough (piped above)
        const data = JSON.stringify({
          content,
          finished: false,
          timestamp: Date.now()
        });

        stream.write(`data: ${data}\n\n`);
      }
    }

    // Send completion event
    stream.write(`data: ${JSON.stringify({ 
      finished: true, 
      timestamp: Date.now() 
    })}\n\n`);
    
    stream.end();
    
  } catch (error) {
    console.error('Streaming error:', error);
    
    // Send error event
    stream.write(`data: ${JSON.stringify({ 
      error: error.message,
      finished: true 
    })}\n\n`);
    
    stream.end();
  } finally {
    streamManager.unregister(streamId);
  }
});

// Anthropic (Claude) streaming endpoint
app.post('/api/stream/anthropic', async (req, res) => {
  const streamId = `stream-${Date.now()}`;
  
  res.writeHead(200, SSE_HEADERS);

  try {
    const { messages, model = 'claude-3-sonnet-20240229' } = req.body;
    
    const stream = await anthropic.messages.create({
      model,
      messages,
      max_tokens: 1000,
      stream: true,
    });

    for await (const event of stream) {
      // Only text deltas carry streamed content
      if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
        const data = JSON.stringify({
          content: event.delta.text,
          finished: false,
          timestamp: Date.now(),
        });
        
        res.write(`data: ${data}\n\n`);
      }
    }
    }

    res.write(`data: ${JSON.stringify({ 
      finished: true, 
      timestamp: Date.now() 
    })}\n\n`);
    
    res.end();
    
  } catch (error) {
    res.write(`data: ${JSON.stringify({ 
      error: error.message,
      finished: true 
    })}\n\n`);
    
    res.end();
  }
});

// Advanced streaming with buffering and rate control
class StreamBuffer {
  private buffer: string[] = [];
  private bufferSize: number;
  private flushInterval: number;
  private lastFlush: number = Date.now();
  
  constructor(bufferSize = 5, flushInterval = 100) {
    this.bufferSize = bufferSize;
    this.flushInterval = flushInterval;
  }

  add(content: string): string | null {
    this.buffer.push(content);
    
    const shouldFlush = 
      this.buffer.length >= this.bufferSize ||
      Date.now() - this.lastFlush >= this.flushInterval;
    
    if (shouldFlush) {
      return this.flush();
    }
    
    return null;
  }

  flush(): string | null {
    if (this.buffer.length === 0) return null;
    
    const content = this.buffer.join('');
    this.buffer = [];
    this.lastFlush = Date.now();
    
    return content;
  }
}

// Buffered streaming endpoint
app.post('/api/stream/buffered', async (req, res) => {
  res.writeHead(200, SSE_HEADERS);
  
  const buffer = new StreamBuffer(5, 100); // 5 tokens or 100ms
  
  try {
    const { messages } = req.body;
    
    const completion = await openai.chat.completions.create({
      model: 'gpt-3.5-turbo',
      messages,
      stream: true,
    });

    for await (const chunk of completion) {
      const content = chunk.choices[0]?.delta?.content;
      
      if (content) {
        const bufferedContent = buffer.add(content);
        
        if (bufferedContent) {
          res.write(`data: ${JSON.stringify({ 
            content: bufferedContent,
            buffered: true,
            timestamp: Date.now()
          })}\n\n`);
        }
      }
    }

    // Flush remaining buffer
    const remaining = buffer.flush();
    if (remaining) {
      res.write(`data: ${JSON.stringify({ 
        content: remaining,
        buffered: true,
        timestamp: Date.now()
      })}\n\n`);
    }

    res.write(`data: ${JSON.stringify({ finished: true })}\n\n`);
    res.end();
    
  } catch (error) {
    res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
    res.end();
  }
});

// WebSocket implementation
import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws) => {
  console.log('WebSocket client connected');
  
  ws.on('message', async (data) => {
    try {
      const { messages, provider = 'openai' } = JSON.parse(data.toString());
      
      if (provider === 'openai') {
        const stream = await openai.chat.completions.create({
          model: 'gpt-3.5-turbo',
          messages,
          stream: true,
        });

        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content;
          
          if (content && ws.readyState === ws.OPEN) {
            ws.send(JSON.stringify({ 
              type: 'content',
              content,
              timestamp: Date.now()
            }));
          }
        }

        if (ws.readyState === ws.OPEN) {
          ws.send(JSON.stringify({ type: 'done' }));
        }
      }
      
    } catch (error) {
      if (ws.readyState === ws.OPEN) {
        ws.send(JSON.stringify({ 
          type: 'error', 
          error: error.message 
        }));
      }
    }
  });

  ws.on('close', () => {
    console.log('WebSocket client disconnected');
  });
});

// Monitoring endpoint
app.get('/api/stream/status', (req, res) => {
  res.json({
    activeStreams: streamManager.getActiveCount(),
    timestamp: Date.now(),
  });
});

app.listen(3000, () => {
  console.log('Streaming server running on port 3000');
});
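
To sanity-check the endpoints above without a browser, a small Node script can POST a prompt and print SSE frames as they arrive. This is a minimal sketch assuming Node 18+ (for the global fetch and web streams) and the server above running on port 3000; the prompt is illustrative:

// smoke-test.ts -- minimal client for the /api/stream/openai endpoint defined above
async function main() {
  const res = await fetch('http://localhost:3000/api/stream/openai', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages: [{ role: 'user', content: 'Say hello' }] }),
  });

  if (!res.ok || !res.body) {
    throw new Error(`Request failed: ${res.status}`);
  }

  const reader = res.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // Each SSE frame arrives as "data: {...}\n\n"; print the raw frames for inspection
    process.stdout.write(decoder.decode(value, { stream: true }));
  }
}

main().catch(console.error);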

Frontend Implementation

Implement robust client-side streaming with proper error handling and reconnection logic[3]:

React Streaming Hook
import { useState, useCallback, useRef, useEffect } from 'react';
import { Loader2 } from 'lucide-react'; // spinner icon used in the demo component below

interface StreamOptions {
  onChunk?: (chunk: string) => void;
  onComplete?: (fullText: string) => void;
  onError?: (error: Error) => void;
  bufferSize?: number;
  reconnectAttempts?: number;
  reconnectDelay?: number;
}

interface StreamState {
  isStreaming: boolean;
  text: string;
  error: Error | null;
  chunks: string[];
}

export function useStreamingLLM(options: StreamOptions = {}) {
  const [state, setState] = useState<StreamState>({
    isStreaming: false,
    text: '',
    error: null,
    chunks: []
  });

  const abortControllerRef = useRef<AbortController | null>(null);
  const eventSourceRef = useRef<EventSource | null>(null);
  const reconnectAttemptsRef = useRef(0);
  const textBufferRef = useRef('');

  const {
    onChunk,
    onComplete,
    onError,
    reconnectAttempts = 3,
    reconnectDelay = 1000
  } = options;

  // Cleanup function
  const cleanup = useCallback(() => {
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
      abortControllerRef.current = null;
    }
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      eventSourceRef.current = null;
    }
  }, []);

  // SSE streaming implementation
  const streamWithSSE = useCallback(async (
    url: string,
    messages: any[],
    provider: string = 'openai'
  ) => {
    cleanup();
    
    setState(prev => ({
      ...prev,
      isStreaming: true,
      error: null,
      text: '',
      chunks: []
    }));

    textBufferRef.current = '';

    try {
      // Wire up an AbortController so abort()/cleanup() can cancel the in-flight request
      const controller = new AbortController();
      abortControllerRef.current = controller;

      const response = await fetch(url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({ messages, provider }),
        signal: controller.signal
      });

      if (!response.ok) {
        throw new Error(`HTTP error! status: ${response.status}`);
      }

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      if (!reader) {
        throw new Error('No reader available');
      }

      const processStream = async () => {
        let sseBuffer = ''; // carries a partial line across network chunks

        while (true) {
          const { done, value } = await reader.read();
          
          if (done) break;

          // SSE frames can be split across reads, so buffer until a full line arrives
          sseBuffer += decoder.decode(value, { stream: true });
          const lines = sseBuffer.split('\n');
          sseBuffer = lines.pop() ?? '';

          for (const line of lines) {
            if (line.startsWith('data: ')) {
              try {
                const data = JSON.parse(line.slice(6));
                
                if (data.error) {
                  throw new Error(data.error);
                }

                if (data.finished) {
                  setState(prev => ({
                    ...prev,
                    isStreaming: false
                  }));
                  onComplete?.(textBufferRef.current);
                  return;
                }

                if (data.content) {
                  textBufferRef.current += data.content;
                  
                  setState(prev => ({
                    ...prev,
                    text: textBufferRef.current,
                    chunks: [...prev.chunks, data.content]
                  }));

                  onChunk?.(data.content);
                }
              } catch (e) {
                if (e instanceof SyntaxError) {
                  // Skip invalid JSON
                  continue;
                }
                throw e;
              }
            }
          }
        }
      };

      await processStream();

    } catch (error) {
      const err = error as Error;

      // An intentional abort (via abort()/cleanup()) is not a failure; don't report or retry it
      if (err.name === 'AbortError') {
        setState(prev => ({ ...prev, isStreaming: false }));
        return;
      }

      setState(prev => ({
        ...prev,
        isStreaming: false,
        error: err
      }));
      onError?.(err);

      // Retry with linear backoff, up to reconnectAttempts times
      if (reconnectAttemptsRef.current < reconnectAttempts) {
        reconnectAttemptsRef.current++;
        setTimeout(() => {
          streamWithSSE(url, messages, provider);
        }, reconnectDelay * reconnectAttemptsRef.current);
      }
    }
  }, [cleanup, onChunk, onComplete, onError, reconnectAttempts, reconnectDelay]);

  // EventSource implementation (alternative)
  // Note: the browser EventSource API only issues GET requests, so this variant assumes a
  // backend that accepts the prompt via the URL (query string or a pre-created session id);
  // the `messages` argument is kept only for API symmetry.
  const streamWithEventSource = useCallback((
    url: string,
    messages: any[]
  ) => {
    cleanup();
    
    setState(prev => ({
      ...prev,
      isStreaming: true,
      error: null,
      text: '',
      chunks: []
    }));

    textBufferRef.current = '';

    const eventSource = new EventSource(url);
    eventSourceRef.current = eventSource;

    eventSource.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        
        if (data.finished) {
          eventSource.close();
          setState(prev => ({
            ...prev,
            isStreaming: false
          }));
          onComplete?.(textBufferRef.current);
          return;
        }

        if (data.content) {
          textBufferRef.current += data.content;
          
          setState(prev => ({
            ...prev,
            text: textBufferRef.current,
            chunks: [...prev.chunks, data.content]
          }));

          onChunk?.(data.content);
        }
      } catch (error) {
        console.error('Parse error:', error);
      }
    };

    eventSource.onerror = (error) => {
      eventSource.close();
      const err = new Error('EventSource error');
      
      setState(prev => ({
        ...prev,
        isStreaming: false,
        error: err
      }));
      
      onError?.(err);
    };
  }, [cleanup, onChunk, onComplete, onError]);

  // WebSocket streaming
  const streamWithWebSocket = useCallback((
    wsUrl: string,
    messages: any[]
  ) => {
    cleanup();
    
    setState(prev => ({
      ...prev,
      isStreaming: true,
      error: null,
      text: '',
      chunks: []
    }));

    textBufferRef.current = '';

    const ws = new WebSocket(wsUrl);

    ws.onopen = () => {
      ws.send(JSON.stringify({ messages }));
    };

    ws.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        
        if (data.type === 'done') {
          ws.close();
          setState(prev => ({
            ...prev,
            isStreaming: false
          }));
          onComplete?.(textBufferRef.current);
          return;
        }

        if (data.type === 'content' && data.content) {
          textBufferRef.current += data.content;
          
          setState(prev => ({
            ...prev,
            text: textBufferRef.current,
            chunks: [...prev.chunks, data.content]
          }));

          onChunk?.(data.content);
        }

        if (data.type === 'error') {
          throw new Error(data.error);
        }
      } catch (error) {
        const err = error as Error;
        setState(prev => ({
          ...prev,
          isStreaming: false,
          error: err
        }));
        onError?.(err);
        ws.close();
      }
    };

    ws.onerror = (error) => {
      const err = new Error('WebSocket error');
      setState(prev => ({
        ...prev,
        isStreaming: false,
        error: err
      }));
      onError?.(err);
    };

    ws.onclose = () => {
      setState(prev => ({
        ...prev,
        isStreaming: false
      }));
    };
  }, [cleanup, onChunk, onComplete, onError]);

  // Abort streaming
  const abort = useCallback(() => {
    cleanup();
    setState(prev => ({
      ...prev,
      isStreaming: false
    }));
  }, [cleanup]);

  // Cleanup on unmount
  useEffect(() => {
    return cleanup;
  }, [cleanup]);

  return {
    ...state,
    streamWithSSE,
    streamWithEventSource,
    streamWithWebSocket,
    abort,
    reset: () => setState({
      isStreaming: false,
      text: '',
      error: null,
      chunks: []
    })
  };
}

// Usage example
export function StreamingChat() {
  const {
    text,
    isStreaming,
    error,
    streamWithSSE,
    abort
  } = useStreamingLLM({
    onChunk: (chunk) => {
      console.log('Received chunk:', chunk);
    },
    onComplete: (fullText) => {
      console.log('Streaming complete:', fullText);
    },
    onError: (error) => {
      console.error('Streaming error:', error);
    }
  });

  const handleStream = () => {
    const messages = [
      { role: 'user', content: 'Tell me a story about AI' }
    ];
    
    streamWithSSE('/api/stream/openai', messages);
  };

  return (
    <div className="p-4">
      <div className="mb-4">
        <button
          onClick={isStreaming ? abort : handleStream}
          className="px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600"
        >
          {isStreaming ? 'Stop' : 'Start Streaming'}
        </button>
      </div>

      {error && (
        <div className="mb-4 p-3 bg-red-100 text-red-700 rounded">
          Error: {error.message}
        </div>
      )}

      <div className="p-4 border rounded-lg bg-gray-50">
        {isStreaming && !text && (
          <div className="flex items-center">
            <Loader2 className="w-4 h-4 animate-spin mr-2" />
            <span>AI is thinking...</span>
          </div>
        )}
        
        {text && (
          <div className="whitespace-pre-wrap">
            {text}
            {isStreaming && (
              <span className="inline-block w-2 h-4 bg-gray-600 animate-pulse ml-1" />
            )}
          </div>
        )}
      </div>
    </div>
  );
}
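
The same hook can drive the WebSocket server from the backend section. A sketch of an alternative component (the ws://localhost:8080 URL matches the WebSocketServer port used above):

// WebSocket variant of the usage example above
export function StreamingChatWS() {
  const { text, isStreaming, streamWithWebSocket } = useStreamingLLM();

  return (
    <div className="p-4">
      <button
        onClick={() =>
          streamWithWebSocket('ws://localhost:8080', [
            { role: 'user', content: 'Tell me a story about AI' },
          ])
        }
        disabled={isStreaming}
        className="px-4 py-2 bg-blue-500 text-white rounded"
      >
        {isStreaming ? 'Streaming…' : 'Start WebSocket Streaming'}
      </button>
      <div className="mt-4 whitespace-pre-wrap">{text}</div>
    </div>
  );
}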

Performance Optimization

Optimize streaming performance for better user experience and reduced resource usage[4]:

Token Buffering Strategy

Buffer tokens to reduce UI updates and improve performance:

// Optimal buffering configuration
const bufferConfig = {
  // Time-based: Flush every 100ms
  timeInterval: 100,
  
  // Size-based: Flush every 5 tokens
  tokenCount: 5,
  
  // Adaptive: Increase buffer for fast streams
  adaptive: true,
  
  // Max buffer size to prevent memory issues
  maxBufferSize: 50
};

// Implementation
class AdaptiveBuffer {
  constructor(config) {
    this.config = config;
    this.buffer = [];
    this.lastFlush = Date.now(); // shouldFlush() compares against this timestamp
    this.metrics = {
      tokensPerSecond: 0,
      avgTokenSize: 0
    };
  }

  shouldFlush() {
    return (
      this.buffer.length >= this.config.tokenCount ||
      Date.now() - this.lastFlush > this.config.timeInterval
    );
  }

  adaptBufferSize() {
    if (this.metrics.tokensPerSecond > 50) {
      this.config.tokenCount = Math.min(
        this.config.tokenCount + 1,
        this.config.maxBufferSize
      );
    }
  }
}
UI Optimization

Optimize rendering for smooth streaming (a sketch of the first point follows this list):

  • Use requestAnimationFrame for updates
  • Implement virtual scrolling for long outputs
  • Throttle DOM updates to at most 60 fps
  • Use CSS transforms for cursor animation
  • Lazy render markdown/code blocks
  • Implement progressive rendering
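
As a sketch of the first point (a hypothetical helper, not part of the hook above): accumulate incoming chunks in a ref and commit them to state at most once per animation frame, so a fast stream does not force a re-render per token.

import { useCallback, useEffect, useRef, useState } from 'react';

// Hypothetical helper: batches streamed chunks and flushes them once per animation frame
export function useFrameBatchedText() {
  const [text, setText] = useState('');
  const pendingRef = useRef('');
  const frameRef = useRef<number | null>(null);

  const push = useCallback((chunk: string) => {
    pendingRef.current += chunk;

    // Schedule at most one flush per frame
    if (frameRef.current === null) {
      frameRef.current = requestAnimationFrame(() => {
        frameRef.current = null;
        const batch = pendingRef.current;
        pendingRef.current = '';
        setText(prev => prev + batch);
      });
    }
  }, []);

  // Cancel any pending flush on unmount
  useEffect(() => {
    return () => {
      if (frameRef.current !== null) cancelAnimationFrame(frameRef.current);
    };
  }, []);

  return { text, push };
}

// Usage: pass `push` as the onChunk callback of useStreamingLLM and render `text`.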

Testing Streaming Endpoints

Comprehensive testing ensures reliable streaming in production[2]:

Testing Suite
import { describe, test, expect, beforeEach, afterEach } from 'vitest';
import { EventSource } from 'eventsource';

describe('Streaming Endpoints', () => {
  let server: any;
  let baseUrl: string;

  beforeEach(async () => {
    // createTestServer() is assumed to be a test helper that boots the Express app on a random port
    server = await createTestServer();
    baseUrl = `http://localhost:${server.port}`;
  });

  afterEach(() => {
    server.close();
  });

  test('SSE streaming delivers chunks incrementally', async () => {
    const chunks: string[] = [];
    const startTime = Date.now();

    await new Promise<void>((resolve, reject) => {
      const es = new EventSource(`${baseUrl}/stream`);
      
      es.onmessage = (event) => {
        const data = JSON.parse(event.data);
        
        if (data.content) {
          chunks.push(data.content);
          
          // Verify incremental delivery
          const elapsed = Date.now() - startTime;
          expect(elapsed).toBeGreaterThan(chunks.length * 50);
        }
        
        if (data.finished) {
          es.close();
          resolve();
        }
      };
      
      es.onerror = () => {
        es.close();
        reject(new Error('Stream error'));
      };
    });

    expect(chunks.length).toBeGreaterThan(0);
    expect(chunks.join('')).toContain('expected content');
  });

  test('Stream handles disconnection gracefully', async () => {
    const es = new EventSource(`${baseUrl}/stream`);
    
    // Simulate disconnection after 100ms
    setTimeout(() => es.close(), 100);
    
    const disconnected = await new Promise<boolean>((resolve) => {
      es.onerror = () => resolve(true);
      setTimeout(() => resolve(false), 200);
    });
    
    expect(disconnected).toBe(true);
  });

  test('Buffering reduces chunk frequency', async () => {
    const timestamps: number[] = [];
    
    const response = await fetch(`${baseUrl}/stream/buffered`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ messages: [], bufferSize: 10 })
    });
    
    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      
      const chunk = decoder.decode(value);
      if (chunk.includes('data:')) {
        timestamps.push(Date.now());
      }
    }
    
    // Verify buffering effect
    const intervals = timestamps.slice(1).map(
      (t, i) => t - timestamps[i]
    );
    
    const avgInterval = intervals.reduce((a, b) => a + b) / intervals.length;
    expect(avgInterval).toBeGreaterThan(80); // Buffered chunks
  });

  test('Performance: handles high-throughput streaming', async () => {
    const startTime = Date.now();
    const tokenCount = 1000;
    
    const response = await fetch(`${baseUrl}/stream/performance`, {
      method: 'POST',
      body: JSON.stringify({ tokenCount })
    });
    
    let receivedTokens = 0;
    const reader = response.body!.getReader();
    
    while (true) {
      const { done } = await reader.read();
      if (done) break;
      receivedTokens++;
    }
    
    const duration = Date.now() - startTime;
    const tokensPerSecond = (receivedTokens / duration) * 1000;
    
    expect(tokensPerSecond).toBeGreaterThan(100); // Min performance
  });
});

// Load testing with k6 (note: core k6 has no built-in SSE client; the EventSource
// import below assumes an experimental module or extension such as xk6-sse is available)
const k6Script = `
import { check } from 'k6';
import { Trend } from 'k6/metrics';
import { EventSource } from 'k6/experimental/events';

const streamDuration = new Trend('stream_duration');
const tokensReceived = new Trend('tokens_received');

export const options = {
  stages: [
    { duration: '30s', target: 10 },
    { duration: '1m', target: 50 },
    { duration: '30s', target: 0 },
  ],
};

export default function () {
  const url = 'http://localhost:3000/stream';
  const es = new EventSource(url);
  
  let tokens = 0;
  const startTime = Date.now();
  
  es.addEventListener('message', (event) => {
    const data = JSON.parse(event.data);
    if (data.content) tokens++;
    
    if (data.finished) {
      const duration = Date.now() - startTime;
      streamDuration.add(duration);
      tokensReceived.add(tokens);
      es.close();
    }
  });
  
  check(tokens, {
    'received tokens': (t) => t > 0,
  });
}
`;

Best Practices

Do's ✅

  • Implement proper error handling and retries
  • Use token buffering for performance
  • Add connection status indicators
  • Implement graceful degradation
  • Monitor streaming metrics
  • Test with poor network conditions
  • Use compression when available
Don'ts ❌

  • Don't update UI on every token
  • Don't ignore connection limits
  • Don't forget to clean up resources
  • Don't block the main thread
  • Don't retry infinitely
  • Don't ignore backpressure (see the sketch below)
  • Don't trust client-side validation
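
To make the backpressure point concrete: res.write() returns false when the outgoing buffer is full, and the token loop should pause until the socket drains. A minimal sketch (the writeSSE helper is hypothetical, not part of the endpoints above):

import type { Response } from 'express';

// Respecting backpressure when writing SSE frames from a token loop
async function writeSSE(res: Response, payload: object): Promise<void> {
  const ok = res.write(`data: ${JSON.stringify(payload)}\n\n`);
  if (!ok) {
    // The outgoing buffer is full; wait for 'drain' before forwarding more tokens
    await new Promise<void>((resolve) => res.once('drain', resolve));
  }
}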

Conclusion

Implementing robust streaming for LLM responses requires careful attention to error handling, performance optimization, and user experience. By following the patterns in this guide, you can build streaming that feels instant and works reliably across all conditions[5].

References

  [1] OpenAI. "Streaming API Reference" (2024)
  [2] LangChain. "Streaming Concepts" (2025)
  [3] Mastering Nuxt. "Implement Streaming Responses for Real-Time LLM Outputs" (2025)
  [4] Tamás Piros. "Streaming LLM Responses: A Deep Dive" (2025)
  [5] Dev.to. "Mastering Real-Time AI: Building Streaming LLMs with FastAPI and Transformers" (2025)
  [6] Anthropic. "Streaming Responses" (2024)
  [7] Google Cloud. "Send Streaming Request" (2024)
  [8] Vercel AI SDK. "Streaming Documentation" (2024)
  [9] n8n Community. "Stream AI Responses: LLM Chains and AI Agents" (2025)
  [10] MDN. "Server-sent Events" (2024)