Intermediate · January 15, 2024 · 15 min read

How to Handle LLM API Rate Limits: Complete Guide with Code Examples

Master rate limit handling with exponential backoff, queuing strategies, and production-ready code examples for all major LLM APIs.

Understanding Rate Limit Errors

All major LLM providers use HTTP 429 (Too Many Requests) as their standard rate limit response[1]. However, each provider includes different headers and metadata to help you handle these limits intelligently:

Provider-Specific Rate Limit Responses

OpenAI

Status: 429
Headers: x-ratelimit-limit-requests, x-ratelimit-remaining-requests, x-ratelimit-reset-requests
Body: {"error": {"type": "rate_limit_error", "message": "Rate limit exceeded"}}

Anthropic

Status: 429
Headers: anthropic-ratelimit-requests-limit, anthropic-ratelimit-requests-remaining
Body: {"type": "rate_limit_error", "message": "Rate limit exceeded"}

Google (Vertex AI)

Status: 429
Headers: Retry-After
Body: {"error": {"code": 429, "message": "Resource exhausted", "status": "RESOURCE_EXHAUSTED"}}

Cohere

Status: 429
Headers: X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset
Body: {"message": "Too many requests"}

Implementing Exponential Backoff with Jitter

Exponential backoff prevents "retry storms" by progressively increasing wait times between retries[2]. With a 1-second base delay, for example, successive retries wait roughly 1 s, 2 s, 4 s, 8 s, and so on, capped at a maximum. Adding jitter (randomness) on top prevents synchronized retries from multiple clients:

Python Implementation
import time
import random
import requests
from typing import Optional, Dict, Any

class RateLimitHandler:
    def __init__(
        self,
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        jitter: float = 0.5
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter
    
    def parse_rate_limit_headers(self, headers: Dict[str, str]) -> Dict[str, Any]:
        """Parse rate limit headers from different providers"""
        result = {}
        
        # OpenAI headers
        if 'x-ratelimit-remaining-requests' in headers:
            result['remaining'] = int(headers.get('x-ratelimit-remaining-requests', 0))
            result['limit'] = int(headers.get('x-ratelimit-limit-requests', 0))
            # OpenAI reset values are duration strings (e.g. "1s", "6m0s"), so keep them raw
            result['reset'] = headers.get('x-ratelimit-reset-requests')
        
        # Anthropic headers
        elif 'anthropic-ratelimit-requests-remaining' in headers:
            result['remaining'] = int(headers.get('anthropic-ratelimit-requests-remaining', 0))
            result['limit'] = int(headers.get('anthropic-ratelimit-requests-limit', 0))
            # Anthropic reset values are RFC 3339 timestamps, so keep them raw
            result['reset'] = headers.get('anthropic-ratelimit-requests-reset')
        
        # Cohere headers
        elif 'X-RateLimit-Remaining' in headers:
            result['remaining'] = int(headers.get('X-RateLimit-Remaining', 0))
            result['limit'] = int(headers.get('X-RateLimit-Limit', 0))
            result['reset'] = headers.get('X-RateLimit-Reset')
        
        # Retry-After header (common): seconds to wait before retrying
        if 'Retry-After' in headers:
            result['retry_after'] = int(headers['Retry-After'])
        
        return result
    
    def call_with_retry(
        self,
        func,
        *args,
        **kwargs
    ) -> requests.Response:
        """Execute API call with exponential backoff and retry logic"""
        last_exception = None
        
        for attempt in range(self.max_retries):
            try:
                response = func(*args, **kwargs)
                
                # Success - return response
                if response.status_code < 400:
                    return response
                
                # Rate limit hit
                if response.status_code == 429:
                    headers = self.parse_rate_limit_headers(response.headers)
                    
                    # Calculate delay
                    if 'retry_after' in headers:
                        delay = headers['retry_after']
                    else:
                        delay = min(
                            self.base_delay * (2 ** attempt),
                            self.max_delay
                        )
                    
                    # Add jitter
                    delay += random.uniform(0, self.jitter * delay)
                    
                    print(f"Rate limit hit. Waiting {delay:.2f}s before retry {attempt + 1}/{self.max_retries}")
                    time.sleep(delay)
                    continue
                
                # Other HTTP errors (4xx/5xx other than 429) - raise immediately
                response.raise_for_status()
                
            except requests.exceptions.HTTPError:
                # Non-retryable API errors propagate to the caller
                raise
            except requests.exceptions.RequestException as e:
                last_exception = e
                if attempt == self.max_retries - 1:
                    raise
                
                # Network errors - use exponential backoff
                delay = min(
                    self.base_delay * (2 ** attempt),
                    self.max_delay
                )
                delay += random.uniform(0, self.jitter * delay)
                
                print(f"Request failed: {e}. Retrying in {delay:.2f}s...")
                time.sleep(delay)
        
        # All retries exhausted
        if last_exception:
            raise last_exception
        else:
            raise Exception("Max retries exceeded")

# Usage example
import os

api_key = os.environ["OPENAI_API_KEY"]  # or load from your secrets manager
rate_limiter = RateLimitHandler()

def make_openai_request(prompt: str) -> requests.Response:
    return requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={
            "model": "gpt-4",
            "messages": [{"role": "user", "content": prompt}]
        }
    )

# Use with retry handling
response = rate_limiter.call_with_retry(
    make_openai_request,
    "Hello, world!"
)

Advanced Queue Management

For high-volume applications, implement a queue system that respects rate limits proactively rather than reactively[2]. This prevents hitting limits in the first place:

Token Bucket Algorithm
Smooth out request patterns and prevent bursts
import asyncio
import time
from typing import Optional
from collections import deque

class TokenBucket:
    """
    Token bucket implementation for rate limiting.
    Allows bursts while maintaining average rate.
    """
    def __init__(
        self,
        rate: float,  # tokens per second
        capacity: int  # max burst size
    ):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.monotonic()
        self.lock = asyncio.Lock()
    
    async def acquire(self, tokens: int = 1) -> float:
        """
        Acquire tokens from the bucket.
        Returns wait time if tokens not immediately available.
        """
        async with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            
            # Add new tokens based on elapsed time
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            self.last_update = now
            
            if self.tokens >= tokens:
                # Tokens available
                self.tokens -= tokens
                return 0.0
            else:
                # Not enough tokens: reserve them anyway (balance goes negative)
                # so concurrent callers queue up, and return the required wait time
                deficit = tokens - self.tokens
                wait_time = deficit / self.rate
                self.tokens -= tokens
                return wait_time

class RateLimitedQueue:
    """
    Queue that respects rate limits using token bucket
    """
    def __init__(
        self,
        requests_per_minute: int,
        burst_capacity: Optional[int] = None
    ):
        self.bucket = TokenBucket(
            rate=requests_per_minute / 60.0,
            capacity=burst_capacity or requests_per_minute
        )
        self.queue = asyncio.Queue()
        self.workers = []
    
    async def add_request(self, request_func, *args, **kwargs):
        """Add a request to the queue"""
        await self.queue.put((request_func, args, kwargs))
    
    async def _worker(self):
        """Worker that processes queue with rate limiting"""
        while True:
            try:
                request_func, args, kwargs = await self.queue.get()
                
                # Wait for token
                wait_time = await self.bucket.acquire()
                if wait_time > 0:
                    await asyncio.sleep(wait_time)
                
                # Execute request
                try:
                    result = await request_func(*args, **kwargs)
                    # Handle result (store, callback, etc.)
                except Exception as e:
                    print(f"Request failed: {e}")
                
            except asyncio.CancelledError:
                break
    
    async def start(self, num_workers: int = 1):
        """Start worker tasks"""
        self.workers = [
            asyncio.create_task(self._worker())
            for _ in range(num_workers)
        ]
    
    async def stop(self):
        """Stop all workers"""
        for worker in self.workers:
            worker.cancel()
        await asyncio.gather(*self.workers, return_exceptions=True)

# Usage example (run inside an async function; call_openai_async and prompts are defined elsewhere)
queue = RateLimitedQueue(
    requests_per_minute=60,  # OpenAI tier limit
    burst_capacity=10
)

await queue.start(num_workers=3)

# Add requests to queue
for prompt in prompts:
    await queue.add_request(
        call_openai_async,
        prompt=prompt
    )

Circuit Breaker Pattern

Circuit breakers prevent cascading failures by temporarily blocking requests when a service is struggling[4]. This protects both your system and the API provider:

Circuit Breaker States
  • Closed: Normal operation, requests flow through
  • Open: Failures exceeded threshold, requests blocked
  • Half-Open: Testing if the service has recovered

from enum import Enum
from datetime import datetime, timedelta
import threading

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self.lock = threading.Lock()
    
    def call(self, func, *args, **kwargs):
        with self.lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            raise
    
    def _should_attempt_reset(self) -> bool:
        return (
            self.last_failure_time and
            datetime.now() > self.last_failure_time + 
            timedelta(seconds=self.recovery_timeout)
        )
    
    def _on_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

# Usage with rate limit handling
breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=60,
    expected_exception=requests.HTTPError
)

def make_protected_request(prompt):
    return breaker.call(
        lambda: rate_limiter.call_with_retry(
            make_openai_request,
            prompt
        )
    )

Provider-Specific Rate Limits

Each provider has different rate limits based on your tier and usage. Here's a comprehensive overview[1][2]:

OpenAI Rate Limits
  • Free Tier: 3 RPM, 200 RPD, 150K TPM
  • Tier 1 ($5 paid): 60 RPM, 500 RPD, 1M TPM
  • Tier 2 ($50 paid): 500 RPM, 10K RPD, 2M TPM
  • Headers: x-ratelimit-*

Anthropic Rate Limits
  • Default: 5 RPM, 300K TPD
  • Scale Tier: 50 RPM, 5M TPD
  • Enterprise: Custom limits
  • Headers: anthropic-ratelimit-*

Google Vertex AI
  • Per Project: 60 RPM default, adjustable via quotas
  • Gemini Pro: 60 RPM, 1M TPD
  • Headers: Retry-After

Cohere
  • Trial: 10 RPM, 100 RPD
  • Production: 100 RPM, 10K RPD
  • Headers: X-RateLimit-*

RPM = Requests Per Minute, RPD = Requests Per Day, TPM = Tokens Per Minute, TPD = Tokens Per Day
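
These published numbers can be wired straight into the TokenBucket shown earlier: convert the requests-per-minute figure to tokens per second and leave some headroom, since dashboard limits change over time and token limits (TPM/TPD) apply in parallel. A minimal sketch, assuming the Tier 1 OpenAI figure above; the 0.8 headroom factor is an illustrative choice, not a provider recommendation:

OPENAI_TIER1_RPM = 60
HEADROOM = 0.8  # stay below the hard limit to absorb retries and clock skew

openai_bucket = TokenBucket(
    rate=(OPENAI_TIER1_RPM * HEADROOM) / 60.0,   # requests per second
    capacity=int(OPENAI_TIER1_RPM * HEADROOM)    # allow bursts up to ~1 minute of quota
)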

Best Practices for Avoiding Rate Limits

Proactive Strategies
  • Batch Requests: Combine multiple queries when possible
  • Cache Responses: Store and reuse common responses (see the sketch after this list)
  • Optimize Prompts: Reduce token usage without sacrificing quality
  • Monitor Usage: Track consumption against limits
  • Request Increases: Apply for higher limits before hitting them
Monitoring & Alerting
  • Track Headers: Log remaining quota from response headers
  • Set Alerts: Notify when usage exceeds 80% of limits
  • Dashboard Metrics: Visualize usage patterns over time
  • Error Tracking: Monitor 429 error rates
  • Capacity Planning: Predict when limit increases needed
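
Of these, response caching is often the cheapest win and needs very little code. Here is a minimal in-memory sketch; the cached_completion wrapper and its hashing scheme are illustrative rather than part of any provider SDK, production systems would typically use Redis or similar with a TTL, and caching only makes sense for deterministic settings such as temperature=0.

import hashlib
import json

_response_cache = {}

def cached_completion(payload, call_api):
    """Return a cached response if this exact payload has been seen before."""
    key = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode()
    ).hexdigest()

    if key in _response_cache:
        return _response_cache[key]

    response = call_api(payload)  # e.g. a function wrapping call_with_retry
    _response_cache[key] = response
    return response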

Production-Ready Implementation

Here's a complete example combining all the techniques for a production-ready rate limit handler:

Complete Rate Limit Solution
Combines exponential backoff, circuit breaker, and queue management
import asyncio
import logging
import random
from dataclasses import dataclass
from typing import Dict, Any, Optional

import aiohttp

# Reuses RateLimitHandler, CircuitBreaker, CircuitState, and TokenBucket defined above

@dataclass
class LLMProvider:
    name: str
    base_url: str
    headers_mapping: Dict[str, str]
    rate_limits: Dict[str, int]

# Provider configurations
PROVIDERS = {
    "openai": LLMProvider(
        name="OpenAI",
        base_url="https://api.openai.com/v1",
        headers_mapping={
            "remaining": "x-ratelimit-remaining-requests",
            "limit": "x-ratelimit-limit-requests",
            "reset": "x-ratelimit-reset-requests"
        },
        rate_limits={"rpm": 60, "tpm": 1000000}
    ),
    "anthropic": LLMProvider(
        name="Anthropic",
        base_url="https://api.anthropic.com/v1",
        headers_mapping={
            "remaining": "anthropic-ratelimit-requests-remaining",
            "limit": "anthropic-ratelimit-requests-limit",
            "reset": "anthropic-ratelimit-requests-reset"
        },
        rate_limits={"rpm": 5, "tpm": 300000}
    )
}

class ProductionRateLimitHandler:
    def __init__(
        self,
        provider: str,
        api_key: str,
        logger: Optional[logging.Logger] = None
    ):
        self.provider = PROVIDERS[provider]
        self.api_key = api_key
        self.logger = logger or logging.getLogger(__name__)
        
        # Components
        self.rate_limiter = RateLimitHandler()
        self.circuit_breaker = CircuitBreaker()
        self.token_bucket = TokenBucket(
            rate=self.provider.rate_limits["rpm"] / 60,
            capacity=self.provider.rate_limits["rpm"]
        )
        
        # Metrics
        self.metrics = {
            "requests_sent": 0,
            "requests_failed": 0,
            "rate_limits_hit": 0,
            "circuit_opens": 0
        }
    
    async def make_request(
        self,
        endpoint: str,
        payload: Dict[str, Any],
        timeout: int = 30
    ) -> Dict[str, Any]:
        """Make rate-limited request with full error handling"""
        
        # Wait for token bucket
        wait_time = await self.token_bucket.acquire()
        if wait_time > 0:
            self.logger.info(f"Token bucket wait: {wait_time:.2f}s")
            await asyncio.sleep(wait_time)
        
        # Check circuit breaker
        if self.circuit_breaker.state == CircuitState.OPEN:
            self.metrics["circuit_opens"] += 1
            raise Exception("Circuit breaker open - service unavailable")
        
        # Make request with retries
        async with aiohttp.ClientSession() as session:
            url = f"{self.provider.base_url}/{endpoint}"
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
            
            for attempt in range(self.rate_limiter.max_retries):
                try:
                    async with session.post(
                        url,
                        json=payload,
                        headers=headers,
                        timeout=aiohttp.ClientTimeout(total=timeout)
                    ) as response:
                        self.metrics["requests_sent"] += 1
                        
                        # Parse headers for monitoring
                        self._log_rate_limit_status(response.headers)
                        
                        # Success
                        if response.status == 200:
                            self.circuit_breaker._on_success()
                            return await response.json()
                        
                        # Rate limit
                        if response.status == 429:
                            self.metrics["rate_limits_hit"] += 1
                            self.circuit_breaker._on_failure()
                            
                            # Calculate backoff
                            delay = self._calculate_delay(
                                response.headers,
                                attempt
                            )
                            
                            self.logger.warning(
                                f"Rate limit hit, waiting {delay:.2f}s"
                            )
                            await asyncio.sleep(delay)
                            continue
                        
                        # Other errors
                        error_data = await response.text()
                        raise Exception(f"API error {response.status}: {error_data}")
                        
                except asyncio.TimeoutError:
                    self.metrics["requests_failed"] += 1
                    self.logger.error(f"Request timeout on attempt {attempt + 1}")
                    if attempt == self.rate_limiter.max_retries - 1:
                        raise
                    
                    await asyncio.sleep(2 ** attempt)

            # All retries exhausted without a successful response
            raise Exception("Max retries exceeded")
    
    def _calculate_delay(
        self,
        headers: Dict[str, str],
        attempt: int
    ) -> float:
        """Calculate retry delay from headers or exponential backoff"""
        if "Retry-After" in headers:
            return float(headers["Retry-After"])
        
        base_delay = self.rate_limiter.base_delay
        max_delay = self.rate_limiter.max_delay
        jitter = self.rate_limiter.jitter
        
        delay = min(base_delay * (2 ** attempt), max_delay)
        delay += random.uniform(0, jitter * delay)
        
        return delay
    
    def _log_rate_limit_status(self, headers: Dict[str, str]):
        """Log current rate limit status from headers"""
        mapping = self.provider.headers_mapping
        
        remaining = headers.get(mapping.get("remaining"))
        limit = headers.get(mapping.get("limit"))
        
        if remaining and limit:
            usage_pct = (1 - int(remaining) / int(limit)) * 100
            
            if usage_pct > 80:
                self.logger.warning(
                    f"High rate limit usage: {usage_pct:.1f}% "
                    f"({remaining}/{limit} remaining)"
                )
    
    def get_metrics(self) -> Dict[str, Any]:
        """Get current metrics for monitoring"""
        return {
            **self.metrics,
            "circuit_state": self.circuit_breaker.state.value,
            "bucket_tokens": self.token_bucket.tokens
        }

# Usage example (run inside an async function; api_key defined as earlier)
handler = ProductionRateLimitHandler(
    provider="openai",
    api_key=api_key
)

# Make requests
response = await handler.make_request(
    endpoint="chat/completions",
    payload={
        "model": "gpt-4",
        "messages": [{"role": "user", "content": "Hello!"}]
    }
)

# Monitor metrics
print(handler.get_metrics())

Conclusion

Proper rate limit handling is essential for building reliable LLM applications. By implementing exponential backoff, circuit breakers, and proactive queue management, you can ensure your application gracefully handles rate limits while maintaining performance[4].

References

  [1] OpenAI. "Rate Limits Guide" (2024)
  [2] Orq.ai. "API Rate Limits Explained: Best Practices for 2025" (2025)
  [3] Zuplo. "10 Best Practices for API Rate Limiting in 2025" (2025)
  [4] Portkey AI. "Tackling Rate Limiting for LLM Apps" (2025)
  [5] Anthropic. "API Rate Limits Documentation" (2024)
  [6] GitHub Block. "Handling LLM Rate Limits with Goose" (2025)
  [7] Vellum. "How to Manage OpenAI Rate Limits as You Scale" (2024)
  [8] Requesty AI. "Rate Limits for LLM Providers: OpenAI, Anthropic, and DeepSeek" (2024)
  [9] Microsoft. "Azure OpenAI Service Quotas and Limits" (2024)
  [10] Google Cloud. "Vertex AI Quotas and Limits" (2024)
  [11] Cohere. "Rate Limits Documentation" (2024)