Troubleshooting
January 15, 2024 · 20 min read

Fix 'Rate Limit Exceeded' Errors (429)

Rate limit errors are among the most common issues when working with LLM APIs. This guide shows how to handle, prevent, and recover from rate limiting across the major providers.

Understanding Rate Limit Errors

According to OpenAI's official documentation, rate limiting helps ensure fair usage and prevents abuse. Similar policies are documented in Anthropic's API guide and Google's Gemini documentation.

Common Error Messages by Provider

OpenAI
Status: 429
Error: "RateLimitError"
Message: "Rate limit exceeded"

Anthropic (Claude)
Status: 429
Error: "rate_limit_error"
Headers: Retry-After

Google Vertex AI
Status: 429
Error: "Quota exceeded"
Message: "User Rate Limit Exceeded"

AWS Bedrock
Status: 429
Error: "ThrottlingException"
Headers: Retry-After

Solution 1

Exponential Backoff with Jitter

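Exponential backoff with jitter is the standard way to handle rate limits. Doubling the wait after each failure respects the API's limits, and the random jitter keeps many clients from retrying at the same instant (the thundering herd problem). The approach is recommended in AssemblyAI's best practices guide and implemented in OpenAI's Python SDK.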
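To make the schedule concrete, the sketch below prints the capped delays for the defaults used in this guide (1 second base, 60 second cap). The names `backoff_schedule` and `full_jitter` are illustrative, not part of any SDK. `full_jitter` shows a different variant from the 0-10% jitter used in the implementations in this section: it picks the whole wait at random between zero and the capped delay, which spreads clients out more widely.

```python
import random


def backoff_schedule(max_retries=5, base_delay=1.0, max_delay=60.0):
    """Return the capped exponential delay (seconds) for each attempt, without jitter."""
    return [min(base_delay * (2 ** attempt), max_delay) for attempt in range(max_retries)]


def full_jitter(delay, rng=random):
    """Pick a wait uniformly between 0 and the capped delay ("full jitter")."""
    return rng.uniform(0, delay)


schedule = backoff_schedule(max_retries=7)
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
print([round(full_jitter(d), 2) for d in schedule])  # differs on every run
```

With seven attempts the last delay would be 64 seconds, so the cap brings it down to 60.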

Python Implementation

import time
import random
import requests
from typing import Optional, Dict, Any

class RateLimitHandler:
    def __init__(self, max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
    
    def calculate_delay(self, attempt: int, retry_after: Optional[int] = None) -> float:
        """Calculate delay with exponential backoff and jitter"""
        if retry_after:
            return retry_after
        
        # Exponential backoff: base_delay * 2^attempt
        delay = min(self.base_delay * (2 ** attempt), self.max_delay)
        
        # Add jitter to prevent thundering herd
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter
    
    def make_request_with_retry(self, url: str, headers: Dict[str, str], 
                               method: str = "POST", **kwargs) -> requests.Response:
        """Make HTTP request with automatic retry on rate limit"""
        for attempt in range(self.max_retries):
            try:
                response = requests.request(method, url, headers=headers, **kwargs)
                
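                if response.status_code == 429:
                    # Retry-After may be absent or an HTTP date; fall back to backoff then
                    retry_after = response.headers.get("Retry-After", "")
                    delay = self.calculate_delay(attempt, int(retry_after) if retry_after.isdigit() else None)
                    
                    print(f"Rate limited. Waiting {delay:.2f} seconds...")
                    time.sleep(delay)
                    continue
                
                response.raise_for_status()
                return response
                
            except requests.exceptions.HTTPError as e:
                # Other 4xx errors (400, 401, ...) will not succeed on retry; 5xx might
                if e.response.status_code < 500 or attempt == self.max_retries - 1:
                    raise
                print(f"Server error: {e}. Retrying...")
                time.sleep(self.calculate_delay(attempt))
            except requests.exceptions.RequestException as e:
                if attempt == self.max_retries - 1:
                    raise
                print(f"Request failed: {e}. Retrying...")
                time.sleep(self.calculate_delay(attempt))
        
        raise Exception("Max retries exceeded")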

# Usage example
handler = RateLimitHandler()
response = handler.make_request_with_retry(
    "https://api.openai.com/v1/chat/completions",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    json={"model": "gpt-4", "messages": [{"role": "user", "content": "Hello"}]}
)

TypeScript/JavaScript Implementation

class RateLimitHandler {
  constructor(
    private maxRetries = 5,
    private baseDelay = 1000,
    private maxDelay = 60000
  ) {}

  private calculateDelay(attempt: number, retryAfter?: number): number {
    if (retryAfter) {
      return retryAfter * 1000; // Convert to milliseconds
    }
    
    // Exponential backoff with jitter
    const delay = Math.min(this.baseDelay * Math.pow(2, attempt), this.maxDelay);
    const jitter = Math.random() * delay * 0.1;
    return delay + jitter;
  }

  async makeRequestWithRetry(url: string, options: RequestInit = {}): Promise<Response> {
    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      try {
        const response = await fetch(url, options);
        
        if (response.status === 429) {
          const retryAfter = response.headers.get('Retry-After');
          const delay = this.calculateDelay(
            attempt, 
            retryAfter ? parseInt(retryAfter) : undefined
          );
          
          console.log(`Rate limited. Waiting ${delay / 1000} seconds...`);
          await new Promise(resolve => setTimeout(resolve, delay));
          continue;
        }
        
        if (!response.ok) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }
        
        return response;
      } catch (error) {
        if (attempt === this.maxRetries - 1) {
          throw error;
        }
        // `error` is `unknown` under strict TypeScript, so narrow it before reading .message
        const message = error instanceof Error ? error.message : String(error);
        console.error(`Request failed: ${message}. Retrying...`);
        await new Promise(resolve => 
          setTimeout(resolve, this.calculateDelay(attempt))
        );
      }
    }
    
    throw new Error('Max retries exceeded');
  }
}

// Usage example
const handler = new RateLimitHandler();
const response = await handler.makeRequestWithRetry(
  'https://api.anthropic.com/v1/messages',
  {
    method: 'POST',
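    headers: {
      'Content-Type': 'application/json',
      'x-api-key': 'YOUR_API_KEY',
      'anthropic-version': '2023-06-01', // required by the Messages API
    },
    body: JSON.stringify({
      model: 'claude-3-opus-20240229',
      max_tokens: 1024, // required by the Messages API
      messages: [{ role: 'user', content: 'Hello' }],
    }),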
  }
);

Go Implementation

package main

import (
    "fmt"
    "math"
    "math/rand"
    "net/http"
    "strconv"
    "time"
)

type RateLimitHandler struct {
    MaxRetries int
    BaseDelay  time.Duration
    MaxDelay   time.Duration
    Client     *http.Client
}

func NewRateLimitHandler() *RateLimitHandler {
    return &RateLimitHandler{
        MaxRetries: 5,
        BaseDelay:  time.Second,
        MaxDelay:   time.Minute,
        Client:     &http.Client{Timeout: 30 * time.Second},
    }
}

func (h *RateLimitHandler) calculateDelay(attempt int, retryAfter *int) time.Duration {
    if retryAfter != nil {
        return time.Duration(*retryAfter) * time.Second
    }
    
    // Exponential backoff
    delay := time.Duration(math.Min(
        float64(h.BaseDelay)*math.Pow(2, float64(attempt)),
        float64(h.MaxDelay),
    ))
    
    // Add jitter (0-10% of delay)
    jitter := time.Duration(rand.Float64() * float64(delay) * 0.1)
    
    return delay + jitter
}

func (h *RateLimitHandler) DoRequestWithRetry(req *http.Request) (*http.Response, error) {
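    for attempt := 0; attempt < h.MaxRetries; attempt++ {
        // Each attempt consumes the request body; rewind it before retrying
        if attempt > 0 && req.GetBody != nil {
            body, err := req.GetBody()
            if err != nil {
                return nil, err
            }
            req.Body = body
        }
        resp, err := h.Client.Do(req)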
        if err != nil {
            if attempt == h.MaxRetries-1 {
                return nil, err
            }
            delay := h.calculateDelay(attempt, nil)
            fmt.Printf("Request failed: %v. Waiting %v before retry\n", err, delay)
            time.Sleep(delay)
            continue
        }
        
        if resp.StatusCode == 429 {
            var retryAfter *int
            if retryAfterStr := resp.Header.Get("Retry-After"); retryAfterStr != "" {
                if val, err := strconv.Atoi(retryAfterStr); err == nil {
                    retryAfter = &val
                }
            }
            
            delay := h.calculateDelay(attempt, retryAfter)
            fmt.Printf("Rate limited (429). Waiting %v before retry\n", delay)
            resp.Body.Close()
            time.Sleep(delay)
            continue
        }
        
        return resp, nil
    }
    
    return nil, fmt.Errorf("max retries exceeded")
}
Solution 2

Client-Side Rate Limiting

Prevent hitting rate limits by implementing client-side throttling. This pattern is discussed in Weights & Biases' LLMOps guide and this Stack Overflow discussion.

Token Bucket Algorithm

import threading
import time

import requests  # used by the usage example below

class TokenBucketRateLimiter:
    """Token bucket algorithm for client-side rate limiting"""
    
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.monotonic()
        self.lock = threading.Lock()
    
    def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire tokens, returns True if successful"""
        with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            
            # Refill tokens
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_update = now
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False
    
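    def wait_and_acquire(self, tokens: int = 1):
        """Wait until tokens are available, then acquire"""
        if tokens > self.capacity:
            # The bucket can never hold this many tokens, so the wait would never end
            raise ValueError("tokens exceeds bucket capacity")
        while not self.acquire(tokens):
            time.sleep(0.1)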

# Usage: Limit to 10 requests per second
limiter = TokenBucketRateLimiter(rate=10, capacity=10)

def make_api_call():
    limiter.wait_and_acquire()
    # Make your API call here
    response = requests.post(...)
    return response
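As a rough check on what a token bucket allows, the arithmetic can be written out directly. This is a sketch under the assumption that the bucket starts full and every request costs one token; `min_seconds_for_requests` is a hypothetical helper name.

```python
def min_seconds_for_requests(n_requests: int, rate: float, capacity: int) -> float:
    """Earliest time at which a full token bucket can have served n_requests.

    The first `capacity` requests drain the initial burst; each one after that
    must wait for a token that refills at `rate` tokens per second.
    """
    if rate <= 0:
        raise ValueError("rate must be positive")
    return max(0, n_requests - capacity) / rate


print(min_seconds_for_requests(10, rate=10, capacity=10))   # 0.0
print(min_seconds_for_requests(25, rate=10, capacity=10))   # 1.5
print(min_seconds_for_requests(600, rate=10, capacity=10))  # 59.0
```

A limiter that polls every 0.1 seconds, as above, will finish slightly later than these lower bounds.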

Sliding Window Rate Limiter

class SlidingWindowRateLimiter {
  private requests: number[] = [];
  
  constructor(
    private maxRequests: number,
    private windowSeconds: number
  ) {}
  
  allowRequest(): boolean {
    const now = Date.now();
    const windowStart = now - (this.windowSeconds * 1000);
    
    // Remove old requests outside the window
    this.requests = this.requests.filter(time => time > windowStart);
    
    if (this.requests.length < this.maxRequests) {
      this.requests.push(now);
      return true;
    }
    
    return false;
  }
  
  async waitForSlot(): Promise<void> {
    while (!this.allowRequest()) {
      await new Promise(resolve => setTimeout(resolve, 100));
    }
  }
}

// Usage: Allow 60 requests per minute
const limiter = new SlidingWindowRateLimiter(60, 60);

async function makeApiCall() {
  await limiter.waitForSlot();
  // Make your API call
  const response = await fetch(...);
  return response;
}
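The same sliding window can be sketched in Python. Instead of polling every 100 ms, this version computes how long to wait until the oldest request leaves the window. The class name `SlidingWindowLimiter` and the injectable `clock` parameter are assumptions made for this example, and the class is not thread-safe as written.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    def __init__(self, max_requests: int, window_seconds: float, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        self.timestamps = deque()

    def seconds_until_slot(self) -> float:
        """Return 0.0 if a request may go now, else the wait until the oldest one expires."""
        now = self.clock()
        # Drop timestamps that have left the window
        while self.timestamps and self.timestamps[0] <= now - self.window_seconds:
            self.timestamps.popleft()
        if len(self.timestamps) < self.max_requests:
            return 0.0
        return self.timestamps[0] + self.window_seconds - now

    def wait_for_slot(self) -> None:
        """Sleep as long as needed, then record the request."""
        wait = self.seconds_until_slot()
        while wait > 0:
            time.sleep(wait)
            wait = self.seconds_until_slot()
        self.timestamps.append(self.clock())


# Usage: allow 60 requests per minute
limiter = SlidingWindowLimiter(max_requests=60, window_seconds=60)
limiter.wait_for_slot()  # returns immediately while the window has room
```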
Solution 3

Circuit Breaker Pattern

Prevent cascading failures when rate limits are consistently hit. This pattern is detailed in AnyScale's production deployment guide and LangChain's error handling utilities.

from enum import Enum
from datetime import datetime, timedelta

import requests  # needed for requests.HTTPError below

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker to prevent cascading failures"""
    
    def __init__(self, failure_threshold: int = 5, 
                 recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN - API is rate limited")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except requests.HTTPError as e:
            if e.response.status_code == 429:
                self._on_failure()
            raise e
    
    def _should_attempt_reset(self) -> bool:
        return (self.last_failure_time and 
                datetime.now() - self.last_failure_time > 
                timedelta(seconds=self.recovery_timeout))
    
    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker opened after {self.failure_count} failures")

# Usage
breaker = CircuitBreaker()

def make_api_request():
    response = requests.post(...)
    response.raise_for_status()
    return response

try:
    result = breaker.call(make_api_request)
except Exception as e:
    print(f"API call failed: {e}")
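The state transitions are easier to follow in isolation. The sketch below is a stripped-down variant, not the class above: `MiniBreaker` is a hypothetical name, and it assumes a monotonic clock rather than `datetime.now()` so that wall-clock adjustments cannot shorten or lengthen the recovery timeout. The clock is injectable so the transitions can be stepped through without waiting.

```python
import time


class MiniBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.recovery_timeout:
            return "half_open"  # trial requests are allowed again
        return "open"

    def allow_request(self) -> bool:
        return self.state != "open"

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()  # (re)start the recovery timer


now = [0.0]  # fake clock, advanced by hand
breaker = MiniBreaker(failure_threshold=2, recovery_timeout=30, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()
print(breaker.state)  # open
now[0] = 31
print(breaker.state)  # half_open
breaker.record_success()
print(breaker.state)  # closed
```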

Parsing Rate Limit Headers

Extract valuable information from response headers to optimize retry behavior. Header formats are documented in Mistral's API docs and Cohere's error handling guide.

def parse_rate_limit_headers(response):
    """Extract rate limit information from response headers"""
    headers = response.headers
    
    rate_limit_info = {
        'limit': headers.get('X-RateLimit-Limit'),
        'remaining': headers.get('X-RateLimit-Remaining'),
        'reset': headers.get('X-RateLimit-Reset'),
        'retry_after': headers.get('Retry-After'),
    }
    
    # OpenAI specific headers
    if 'x-ratelimit-limit-requests' in headers:
        rate_limit_info['limit_requests'] = headers.get('x-ratelimit-limit-requests')
        rate_limit_info['remaining_requests'] = headers.get('x-ratelimit-remaining-requests')
        rate_limit_info['limit_tokens'] = headers.get('x-ratelimit-limit-tokens')
        rate_limit_info['remaining_tokens'] = headers.get('x-ratelimit-remaining-tokens')
    
    # Convert to appropriate types
    for key, value in rate_limit_info.items():
        if value and key != 'retry_after':
            try:
                rate_limit_info[key] = int(value)
            except ValueError:
                pass
    
    return rate_limit_info

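# Usage
from datetime import datetime

response = requests.post(...)
if response.status_code == 429:
    limits = parse_rate_limit_headers(response)
    print(f"Rate limit: {limits['remaining']}/{limits['limit']} remaining")
    # 'reset' may be missing or non-numeric (for example a duration string), so check before converting
    if isinstance(limits['reset'], int):
        print(f"Reset at: {datetime.fromtimestamp(limits['reset'])}")
    print(f"Retry after: {limits['retry_after']} seconds")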
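The HTTP specification allows `Retry-After` to carry either a number of seconds or an HTTP date, so calling `int()` on it directly can fail. A minimal sketch of a tolerant parser follows; `parse_retry_after` is a hypothetical helper, and the `now` parameter exists only to make the date branch testable.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], now: Optional[datetime] = None) -> Optional[float]:
    """Return the wait in seconds, or None if the header is missing or unparseable."""
    if not value:
        return None
    value = value.strip()
    try:
        return max(0.0, float(value))  # delta-seconds form, e.g. "30"
    except ValueError:
        pass
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


print(parse_retry_after("30"))  # 30.0
print(parse_retry_after(None))  # None
```

Returning `None` for a missing or malformed header lets the caller fall back to exponential backoff.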

Handling Concurrent Requests

When making multiple API calls concurrently, centralize rate limit handling:

Async Rate Limiting with Python

import asyncio
import aiohttp
from asyncio import Semaphore

class AsyncRateLimiter:
    """Async rate limiter for concurrent requests"""
    
    def __init__(self, rate: int, per: float):
        self.rate = rate
        self.per = per
        self.allowance = rate
        self.last_check = asyncio.get_event_loop().time()
        self.lock = asyncio.Lock()
        self.semaphore = Semaphore(rate)
    
    async def acquire(self):
        """Acquire permission to make a request"""
        async with self.semaphore:
            async with self.lock:
                current = asyncio.get_event_loop().time()
                time_passed = current - self.last_check
                self.last_check = current
                
                self.allowance += time_passed * (self.rate / self.per)
                if self.allowance > self.rate:
                    self.allowance = self.rate
                
                if self.allowance < 1.0:
                    sleep_time = (1.0 - self.allowance) * (self.per / self.rate)
                    await asyncio.sleep(sleep_time)
                    self.allowance = 0.0
                else:
                    self.allowance -= 1.0

async def make_concurrent_requests(prompts, api_key):
    # 10 requests per second
    rate_limiter = AsyncRateLimiter(rate=10, per=1.0)
    
    async def fetch_completion(session, prompt, retries_left=5):
        await rate_limiter.acquire()
        
        async with session.post(
            'https://api.openai.com/v1/chat/completions',
            json={
                'model': 'gpt-3.5-turbo',
                'messages': [{'role': 'user', 'content': prompt}]
            },
            headers={'Authorization': f'Bearer {api_key}'}
        ) as response:
            # Bound the retries so a persistent 429 cannot recurse forever
            if response.status == 429 and retries_left > 0:
                retry_after = int(response.headers.get('Retry-After', 5))
                await asyncio.sleep(retry_after)
                return await fetch_completion(session, prompt, retries_left - 1)
            
            return await response.json()
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_completion(session, prompt) for prompt in prompts]
        return await asyncio.gather(*tasks)

# Usage
prompts = ["Tell me a joke", "What's 2+2?", "Hello!"]
# Top-level await only works in a notebook or async REPL; a script needs asyncio.run
results = asyncio.run(make_concurrent_requests(prompts, "YOUR_API_KEY"))
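Rate limiting controls how often requests start; it does not cap how many are in flight at once. A minimal sketch of bounding concurrency with a semaphore is shown below. `run_bounded` and `fake_call` are hypothetical names, and `fake_call` stands in for a real API request.

```python
import asyncio


async def run_bounded(items, worker, max_concurrency=5):
    """Run worker(item) for every item with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item):
        async with semaphore:
            return await worker(item)

    # return_exceptions=True keeps one failed call from discarding the other results
    return await asyncio.gather(*(guarded(item) for item in items), return_exceptions=True)


async def demo():
    in_flight = 0
    peak = 0

    async def fake_call(prompt):  # stand-in for a real API request
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return prompt.upper()

    results = await run_bounded(["a", "b", "c", "d"], fake_call, max_concurrency=2)
    return results, peak


print(asyncio.run(demo()))  # (['A', 'B', 'C', 'D'], 2)
```

Results come back in input order, and failed calls appear as exception objects in the list rather than aborting the batch.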

Provider-Specific Rate Limits

Provider       Limits                    Headers           Notes
OpenAI         RPM & TPM based on tier   X-RateLimit-*     Separate limits for requests and tokens
Anthropic      Model-specific limits     Retry-After       Different limits for Claude models
Google Vertex  Project quotas            Standard headers  Configurable in GCP console
AWS Bedrock    Account/region based      Retry-After       Can request limit increases
Groq           Model-specific            X-RateLimit-*     Very high limits for Mixtral

Best Practices

Monitoring Rate Limits

Track your API usage to prevent surprises:

import requests  # used by the usage example below
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimitMonitor:
    """Monitor and track rate limit usage"""
    
    def __init__(self):
        self.requests = defaultdict(list)
        self.rate_limit_hits = defaultdict(int)
    
    def log_request(self, endpoint: str, status_code: int, headers: dict):
        """Log API request for monitoring"""
        now = datetime.now()
        
        self.requests[endpoint].append({
            'timestamp': now,
            'status': status_code,
            'headers': headers
        })
        
        if status_code == 429:
            self.rate_limit_hits[endpoint] += 1
        
        # Clean old entries (keep last hour)
        cutoff = now - timedelta(hours=1)
        self.requests[endpoint] = [
            r for r in self.requests[endpoint] 
            if r['timestamp'] > cutoff
        ]
    
    def get_usage_stats(self, endpoint: str) -> dict:
        """Get usage statistics for an endpoint"""
        requests = self.requests[endpoint]
        if not requests:
            return {}
        
        total = len(requests)
        rate_limited = sum(1 for r in requests if r['status'] == 429)
        
        # Get current limits from latest response
        latest = requests[-1]
        # A plain dict is case-sensitive, but header names are not: normalize before lookup
        headers = {k.lower(): v for k, v in latest['headers'].items()}
        
        return {
            'total_requests': total,
            'rate_limited_requests': rate_limited,
            'rate_limit_percentage': (rate_limited / total * 100) if total > 0 else 0,
            'current_limit': headers.get('x-ratelimit-limit'),
            'remaining': headers.get('x-ratelimit-remaining'),
            'reset_time': headers.get('x-ratelimit-reset'),
        }

# Usage
monitor = RateLimitMonitor()

# Log each request
response = requests.post(...)
monitor.log_request(
    endpoint='chat/completions',
    status_code=response.status_code,
    headers=dict(response.headers)
)

# Check usage
stats = monitor.get_usage_stats('chat/completions')
print(f"Rate limited {stats['rate_limit_percentage']:.1f}% of requests")
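Monitoring is most useful when it triggers action before a 429 arrives. One possible approach, sketched here, is to slow down once the remaining quota falls below a threshold. The header names default to the generic `x-ratelimit-*` pair used elsewhere in this guide, and the 10% threshold is an arbitrary assumption; `remaining_fraction` and `should_slow_down` are hypothetical helpers.

```python
from typing import Optional


def remaining_fraction(headers: dict, limit_key: str = "x-ratelimit-limit",
                       remaining_key: str = "x-ratelimit-remaining") -> Optional[float]:
    """Return remaining/limit as a fraction, or None if the headers are missing or malformed."""
    lowered = {k.lower(): v for k, v in headers.items()}
    try:
        limit = int(lowered[limit_key])
        remaining = int(lowered[remaining_key])
    except (KeyError, TypeError, ValueError):
        return None
    if limit <= 0:
        return None
    return remaining / limit


def should_slow_down(headers: dict, threshold: float = 0.1) -> bool:
    """True when less than `threshold` of the quota is left."""
    fraction = remaining_fraction(headers)
    return fraction is not None and fraction < threshold


print(should_slow_down({"X-RateLimit-Limit": "100", "X-RateLimit-Remaining": "5"}))   # True
print(should_slow_down({"X-RateLimit-Limit": "100", "X-RateLimit-Remaining": "50"}))  # False
```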

Emergency Recovery

When you're consistently hitting rate limits:

  1. Implement request queuing with priority levels
  2. Use multiple API keys with round-robin distribution
  3. Enable caching for repeated requests
  4. Batch requests where supported (e.g., embeddings)
  5. Upgrade your plan or request limit increases
  6. Implement fallback providers for critical paths
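The first item, request queuing with priority levels, can be sketched with the standard library's `heapq`. `PriorityRequestQueue` is a hypothetical name, and the convention that lower numbers are more urgent is an assumption of this example.

```python
import heapq
import itertools


class PriorityRequestQueue:
    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker keeps FIFO order within a priority

    def put(self, request, priority: int = 10) -> None:
        """Queue a request; lower numbers are served first."""
        heapq.heappush(self._heap, (priority, next(self._counter), request))

    def get(self):
        """Pop the most urgent request; raises IndexError when empty."""
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


queue = PriorityRequestQueue()
queue.put("batch-1", priority=10)
queue.put("user-chat", priority=0)
queue.put("batch-2", priority=10)
print([queue.get() for _ in range(len(queue))])  # ['user-chat', 'batch-1', 'batch-2']
```

A worker would pull from this queue behind one of the rate limiters above, so interactive traffic is served before background jobs when quota is tight.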

Pro Tip: Multi-Provider Fallback

Don't let rate limits bring down your application. Implement automatic fallback to alternative providers:

providers = [
    {'name': 'openai', 'model': 'gpt-4', 'api_key': 'key1'},
    {'name': 'anthropic', 'model': 'claude-3-opus', 'api_key': 'key2'},
    {'name': 'google', 'model': 'gemini-pro', 'api_key': 'key3'},
]

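# make_api_call and RateLimitError are placeholders for your own client wrapper
for provider in providers:
    try:
        response = make_api_call(provider)
        break
    except RateLimitError:
        continue  # Try next provider
else:
    # The loop finished without a break, so every provider was rate limited
    raise RuntimeError("All providers are rate limited")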
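A self-contained version of the same idea might look like the sketch below. `RateLimitError` is defined locally as a stand-in for whatever exception your client raises on HTTP 429, and `call_with_fallback` and `fake_call` are hypothetical names; nothing here calls a real provider.

```python
class RateLimitError(Exception):
    """Hypothetical exception your client wrapper raises on HTTP 429."""


def call_with_fallback(providers, call):
    """Try each provider in order; return (provider_name, result) from the first that succeeds."""
    rate_limited = []
    for provider in providers:
        try:
            return provider["name"], call(provider)
        except RateLimitError:
            rate_limited.append(provider["name"])  # remember it and try the next one
    raise RuntimeError(f"All providers rate limited: {', '.join(rate_limited)}")


def fake_call(provider):  # stand-in for a real API request
    if provider["name"] == "openai":
        raise RateLimitError("429")
    return f"response from {provider['model']}"


providers = [
    {"name": "openai", "model": "gpt-4"},
    {"name": "anthropic", "model": "claude-3-opus"},
]
print(call_with_fallback(providers, fake_call))  # ('anthropic', 'response from claude-3-opus')
```

Returning the provider name alongside the result makes it possible to log how often the fallback path is taken.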


References

  1. OpenAI. "Error Codes Reference" (2024)
  2. Anthropic. "API Errors" (2024)
  3. Stack Overflow. "OpenAI API Questions" (2024)