Why Rate Limit Handling Matters
Understanding Rate Limit Errors
All major LLM providers use HTTP 429 (Too Many Requests) as their standard rate limit response[1]. However, each provider includes different headers and metadata to help you handle these limits intelligently:
OpenAI
Status: 429
Headers: x-ratelimit-limit-requests, x-ratelimit-remaining-requests, x-ratelimit-reset-requests
Body: {"error": {"type": "rate_limit_error", "message": "Rate limit exceeded"}}
Anthropic
Status: 429
Headers: anthropic-ratelimit-requests-limit, anthropic-ratelimit-requests-remaining
Body: {"type": "rate_limit_error", "message": "Rate limit exceeded"}
Google (Vertex AI)
Status: 429
Headers: Retry-After
Body: {"error": {"code": 429, "message": "Resource exhausted", "status": "RESOURCE_EXHAUSTED"}}
Cohere
Status: 429
Headers: X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset
Body: {"message": "Too many requests"}
Implementing Exponential Backoff with Jitter
Exponential backoff prevents "retry storms" by progressively increasing wait times between retries[2]. Adding jitter (randomness) prevents synchronized retries from multiple clients:
import time
import random
import requests
from typing import Dict, Any


class RateLimitHandler:
    def __init__(
        self,
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        jitter: float = 0.5
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter

    def parse_rate_limit_headers(self, headers: Dict[str, str]) -> Dict[str, Any]:
        """Parse rate limit headers from different providers."""
        result = {}

        # OpenAI headers
        if 'x-ratelimit-remaining-requests' in headers:
            result['remaining'] = int(headers.get('x-ratelimit-remaining-requests', 0))
            result['limit'] = int(headers.get('x-ratelimit-limit-requests', 0))
            # Reset is a duration string such as "1s" or "6m0s"; keep it raw
            result['reset'] = headers.get('x-ratelimit-reset-requests')

        # Anthropic headers
        elif 'anthropic-ratelimit-requests-remaining' in headers:
            result['remaining'] = int(headers.get('anthropic-ratelimit-requests-remaining', 0))
            result['limit'] = int(headers.get('anthropic-ratelimit-requests-limit', 0))
            # Reset is an RFC 3339 timestamp; keep it raw
            result['reset'] = headers.get('anthropic-ratelimit-requests-reset')

        # Cohere headers
        elif 'X-RateLimit-Remaining' in headers:
            result['remaining'] = int(headers.get('X-RateLimit-Remaining', 0))
            result['limit'] = int(headers.get('X-RateLimit-Limit', 0))
            result['reset'] = headers.get('X-RateLimit-Reset')

        # Retry-After header (common across providers, usually in seconds)
        if 'Retry-After' in headers:
            result['retry_after'] = int(headers['Retry-After'])

        return result

    def call_with_retry(self, func, *args, **kwargs) -> requests.Response:
        """Execute an API call with exponential backoff and retry logic."""
        last_exception = None

        for attempt in range(self.max_retries):
            try:
                response = func(*args, **kwargs)

                # Success - return response
                if response.status_code < 400:
                    return response

                # Rate limit hit
                if response.status_code == 429:
                    headers = self.parse_rate_limit_headers(response.headers)

                    # Prefer the server-provided delay, otherwise back off exponentially
                    if 'retry_after' in headers:
                        delay = headers['retry_after']
                    else:
                        delay = min(
                            self.base_delay * (2 ** attempt),
                            self.max_delay
                        )

                    # Add jitter to avoid synchronized retries
                    delay += random.uniform(0, self.jitter * delay)

                    print(f"Rate limit hit. Waiting {delay:.2f}s before retry {attempt + 1}/{self.max_retries}")
                    time.sleep(delay)
                    continue

                # Other HTTP errors - raise immediately, do not retry
                response.raise_for_status()

            except requests.exceptions.HTTPError:
                raise
            except requests.exceptions.RequestException as e:
                last_exception = e
                if attempt == self.max_retries - 1:
                    raise

                # Network errors - use exponential backoff
                delay = min(
                    self.base_delay * (2 ** attempt),
                    self.max_delay
                )
                delay += random.uniform(0, self.jitter * delay)
                print(f"Request failed: {e}. Retrying in {delay:.2f}s...")
                time.sleep(delay)

        # All retries exhausted
        if last_exception:
            raise last_exception
        raise Exception("Max retries exceeded")


# Usage example (api_key is assumed to be defined, e.g. loaded from an environment variable)
rate_limiter = RateLimitHandler()

def make_openai_request(prompt: str) -> requests.Response:
    return requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={
            "model": "gpt-4",
            "messages": [{"role": "user", "content": prompt}]
        }
    )

# Use with retry handling
response = rate_limiter.call_with_retry(
    make_openai_request,
    "Hello, world!"
)
Advanced Queue Management
For high-volume applications, implement a queue system that respects rate limits proactively rather than reactively[2]. This prevents hitting limits in the first place:
import asyncio
import time
from typing import Optional


class TokenBucket:
    """
    Token bucket implementation for rate limiting.
    Allows bursts while maintaining an average rate.
    """
    def __init__(
        self,
        rate: float,     # tokens added per second
        capacity: int    # maximum burst size
    ):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> float:
        """
        Acquire tokens from the bucket.
        Returns the time the caller should wait before proceeding
        (0.0 if the tokens were immediately available).
        """
        async with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_update

            # Refill the bucket based on elapsed time
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            self.last_update = now

            if self.tokens >= tokens:
                # Tokens available immediately
                self.tokens -= tokens
                return 0.0

            # Not enough tokens: reserve them anyway (the balance goes negative)
            # and tell the caller how long to wait before proceeding
            deficit = tokens - self.tokens
            wait_time = deficit / self.rate
            self.tokens -= tokens
            return wait_time


class RateLimitedQueue:
    """
    Queue that respects rate limits using a token bucket.
    """
    def __init__(
        self,
        requests_per_minute: int,
        burst_capacity: Optional[int] = None
    ):
        self.bucket = TokenBucket(
            rate=requests_per_minute / 60.0,
            capacity=burst_capacity or requests_per_minute
        )
        self.queue = asyncio.Queue()
        self.workers = []

    async def add_request(self, request_func, *args, **kwargs):
        """Add a request to the queue."""
        await self.queue.put((request_func, args, kwargs))

    async def _worker(self):
        """Worker that processes the queue with rate limiting."""
        while True:
            try:
                request_func, args, kwargs = await self.queue.get()

                # Wait for a token before sending the request
                wait_time = await self.bucket.acquire()
                if wait_time > 0:
                    await asyncio.sleep(wait_time)

                # Execute request
                try:
                    result = await request_func(*args, **kwargs)
                    # Handle result (store, callback, etc.)
                except Exception as e:
                    print(f"Request failed: {e}")
                finally:
                    self.queue.task_done()
            except asyncio.CancelledError:
                break

    async def start(self, num_workers: int = 1):
        """Start worker tasks."""
        self.workers = [
            asyncio.create_task(self._worker())
            for _ in range(num_workers)
        ]

    async def stop(self):
        """Stop all workers."""
        for worker in self.workers:
            worker.cancel()
        await asyncio.gather(*self.workers, return_exceptions=True)


# Usage example (inside an async context)
queue = RateLimitedQueue(
    requests_per_minute=60,  # e.g. an OpenAI tier limit
    burst_capacity=10
)

await queue.start(num_workers=3)

# Add requests to the queue
for prompt in prompts:
    await queue.add_request(
        call_openai_async,
        prompt=prompt
    )
Circuit Breaker Pattern
Circuit breakers prevent cascading failures by temporarily blocking requests when a service is struggling[4]. This protects both your system and the API provider:
- Closed: Normal operation, requests flow through
- Open: Failures exceeded the threshold, requests are blocked
- Half-Open: Testing whether the service has recovered
from enum import Enum
from datetime import datetime, timedelta
import threading


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        # Check state under the lock, then release it before calling func:
        # the lock is not reentrant, and _on_success/_on_failure acquire it again
        with self.lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        return (
            self.last_failure_time is not None and
            datetime.now() > self.last_failure_time +
            timedelta(seconds=self.recovery_timeout)
        )

    def _on_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN


# Usage with rate limit handling
breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=60,
    expected_exception=requests.HTTPError
)

def make_protected_request(prompt):
    return breaker.call(
        lambda: rate_limiter.call_with_retry(
            make_openai_request,
            prompt
        )
    )
Provider-Specific Rate Limits
Each provider has different rate limits based on your tier and usage. Here's a comprehensive overview[1][2]:
OpenAI
- Free Tier: 3 RPM, 200 RPD, 150K TPM
- Tier 1 ($5 paid): 60 RPM, 500 RPD, 1M TPM
- Tier 2 ($50 paid): 500 RPM, 10K RPD, 2M TPM
Anthropic
- Default: 5 RPM, 300K TPD
- Scale Tier: 50 RPM, 5M TPD
- Enterprise: Custom limits
Google (Vertex AI)
- Per Project: 60 RPM default, adjustable via quotas
- Gemini Pro: 60 RPM, 1M TPD
Cohere
- Trial: 10 RPM, 100 RPD
- Production: 100 RPM, 10K RPD
RPM = Requests Per Minute, RPD = Requests Per Day, TPM = Tokens Per Minute, TPD = Tokens Per Day
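As a rough sketch of putting these numbers to work, published per-tier limits can be kept in a small config table and used to size the TokenBucket from the queue section above. The TIER_LIMITS dict and bucket_for helper below are illustrative names, and the values are copied from the table; replace them with the limits shown for your own account and tier.

# Illustrative tier configuration; replace with your account's actual limits.
TIER_LIMITS = {
    "openai-tier1": {"rpm": 60, "tpm": 1_000_000},
    "anthropic-default": {"rpm": 5, "tpd": 300_000},
}

def bucket_for(tier: str) -> TokenBucket:
    """Size a TokenBucket (defined in the queue section above) from a tier's RPM."""
    rpm = TIER_LIMITS[tier]["rpm"]
    return TokenBucket(rate=rpm / 60.0, capacity=rpm)

openai_bucket = bucket_for("openai-tier1")  # roughly 1 request/second on average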
Best Practices for Avoiding Rate Limits
- Batch Requests: Combine multiple queries when possible
- Cache Responses: Store and reuse common responses (a minimal caching sketch follows this list)
- Optimize Prompts: Reduce token usage without sacrificing quality
- Monitor Usage: Track consumption against limits
- Request Increases: Apply for higher limits before hitting them
- Track Headers: Log remaining quota from response headers
- Set Alerts: Notify when usage exceeds 80% of limits
- Dashboard Metrics: Visualize usage patterns over time
- Error Tracking: Monitor 429 error rates
- Capacity Planning: Predict when limit increases will be needed
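To illustrate the caching point above, here is a minimal sketch of an in-memory response cache keyed by a hash of the model and prompt. The cached_llm_call helper, its TTL, and the module-level _CACHE dict are assumptions made for illustration, not part of any provider SDK.

import hashlib
import json
import time
from typing import Any, Callable, Dict, Tuple

# Hypothetical in-memory cache: maps a request fingerprint to (timestamp, response).
_CACHE: Dict[str, Tuple[float, Any]] = {}

def cached_llm_call(
    call: Callable[[str], Any],  # the real API call, e.g. make_openai_request
    model: str,
    prompt: str,
    ttl_seconds: float = 300.0
) -> Any:
    """Return a cached response for identical (model, prompt) pairs within the TTL."""
    key = hashlib.sha256(json.dumps({"model": model, "prompt": prompt}).encode()).hexdigest()
    now = time.time()

    hit = _CACHE.get(key)
    if hit and now - hit[0] < ttl_seconds:
        return hit[1]            # cache hit: no API request, no quota consumed

    response = call(prompt)      # cache miss: spend one request against the limit
    _CACHE[key] = (now, response)
    return response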
Production-Ready Implementation
Here's a complete example combining all the techniques for a production-ready rate limit handler:
import asyncio
import random
import aiohttp
from dataclasses import dataclass
from typing import Dict, Any, Optional
import logging


@dataclass
class LLMProvider:
    name: str
    base_url: str
    headers_mapping: Dict[str, str]
    rate_limits: Dict[str, int]


# Provider configurations
PROVIDERS = {
    "openai": LLMProvider(
        name="OpenAI",
        base_url="https://api.openai.com/v1",
        headers_mapping={
            "remaining": "x-ratelimit-remaining-requests",
            "limit": "x-ratelimit-limit-requests",
            "reset": "x-ratelimit-reset-requests"
        },
        rate_limits={"rpm": 60, "tpm": 1000000}
    ),
    "anthropic": LLMProvider(
        name="Anthropic",
        base_url="https://api.anthropic.com/v1",
        headers_mapping={
            "remaining": "anthropic-ratelimit-requests-remaining",
            "limit": "anthropic-ratelimit-requests-limit",
            "reset": "anthropic-ratelimit-requests-reset"
        },
        rate_limits={"rpm": 5, "tpm": 300000}
    )
}


class ProductionRateLimitHandler:
    def __init__(
        self,
        provider: str,
        api_key: str,
        logger: Optional[logging.Logger] = None
    ):
        self.provider = PROVIDERS[provider]
        self.api_key = api_key
        self.logger = logger or logging.getLogger(__name__)

        # Components defined in the previous sections
        self.rate_limiter = RateLimitHandler()
        self.circuit_breaker = CircuitBreaker()
        self.token_bucket = TokenBucket(
            rate=self.provider.rate_limits["rpm"] / 60,
            capacity=self.provider.rate_limits["rpm"]
        )

        # Metrics
        self.metrics = {
            "requests_sent": 0,
            "requests_failed": 0,
            "rate_limits_hit": 0,
            "circuit_opens": 0
        }

    async def make_request(
        self,
        endpoint: str,
        payload: Dict[str, Any],
        timeout: int = 30
    ) -> Dict[str, Any]:
        """Make a rate-limited request with full error handling."""
        # Wait for the token bucket
        wait_time = await self.token_bucket.acquire()
        if wait_time > 0:
            self.logger.info(f"Token bucket wait: {wait_time:.2f}s")
            await asyncio.sleep(wait_time)

        # Check circuit breaker
        if self.circuit_breaker.state == CircuitState.OPEN:
            self.metrics["circuit_opens"] += 1
            raise Exception("Circuit breaker open - service unavailable")

        # Make request with retries
        async with aiohttp.ClientSession() as session:
            url = f"{self.provider.base_url}/{endpoint}"
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }

            for attempt in range(self.rate_limiter.max_retries):
                try:
                    async with session.post(
                        url,
                        json=payload,
                        headers=headers,
                        timeout=aiohttp.ClientTimeout(total=timeout)
                    ) as response:
                        self.metrics["requests_sent"] += 1

                        # Parse headers for monitoring
                        self._log_rate_limit_status(response.headers)

                        # Success
                        if response.status == 200:
                            self.circuit_breaker._on_success()
                            return await response.json()

                        # Rate limit
                        if response.status == 429:
                            self.metrics["rate_limits_hit"] += 1
                            self.circuit_breaker._on_failure()

                            # Calculate backoff
                            delay = self._calculate_delay(
                                response.headers,
                                attempt
                            )
                            self.logger.warning(
                                f"Rate limit hit, waiting {delay:.2f}s"
                            )
                            await asyncio.sleep(delay)
                            continue

                        # Other errors
                        error_data = await response.text()
                        raise Exception(f"API error {response.status}: {error_data}")

                except asyncio.TimeoutError:
                    self.metrics["requests_failed"] += 1
                    self.logger.error(f"Request timeout on attempt {attempt + 1}")
                    if attempt == self.rate_limiter.max_retries - 1:
                        raise
                    await asyncio.sleep(2 ** attempt)

            # Retries exhausted without a successful response
            raise Exception("Max retries exceeded")

    def _calculate_delay(
        self,
        headers: Dict[str, str],
        attempt: int
    ) -> float:
        """Calculate retry delay from headers or exponential backoff."""
        if "Retry-After" in headers:
            return float(headers["Retry-After"])

        base_delay = self.rate_limiter.base_delay
        max_delay = self.rate_limiter.max_delay
        jitter = self.rate_limiter.jitter

        delay = min(base_delay * (2 ** attempt), max_delay)
        delay += random.uniform(0, jitter * delay)
        return delay

    def _log_rate_limit_status(self, headers: Dict[str, str]):
        """Log current rate limit status from headers."""
        mapping = self.provider.headers_mapping
        remaining = headers.get(mapping.get("remaining"))
        limit = headers.get(mapping.get("limit"))

        if remaining and limit:
            usage_pct = (1 - int(remaining) / int(limit)) * 100
            if usage_pct > 80:
                self.logger.warning(
                    f"High rate limit usage: {usage_pct:.1f}% "
                    f"({remaining}/{limit} remaining)"
                )

    def get_metrics(self) -> Dict[str, Any]:
        """Get current metrics for monitoring."""
        return {
            **self.metrics,
            "circuit_state": self.circuit_breaker.state.value,
            "bucket_tokens": self.token_bucket.tokens
        }


# Usage example (inside an async context)
handler = ProductionRateLimitHandler(
    provider="openai",
    api_key=api_key
)

# Make requests
response = await handler.make_request(
    endpoint="chat/completions",
    payload={
        "model": "gpt-4",
        "messages": [{"role": "user", "content": "Hello!"}]
    }
)

# Monitor metrics
print(handler.get_metrics())
Conclusion
Proper rate limit handling is essential for building reliable LLM applications. By implementing exponential backoff, circuit breakers, and proactive queue management, you can ensure your application gracefully handles rate limits while maintaining performance[4].
Key Takeaways
- Always implement exponential backoff with jitter
- Parse and respect provider-specific headers
- Use token buckets for proactive rate limiting
- Implement circuit breakers for system protection
- Monitor usage and set alerts at 80% capacity
- Consider using an API gateway like ParrotRouter for automatic handling
References
- [1] OpenAI. "Rate Limits Guide" (2024)
- [2] Orq.ai. "API Rate Limits Explained: Best Practices for 2025" (2025)
- [3] Zuplo. "10 Best Practices for API Rate Limiting in 2025" (2025)
- [4] Portkey AI. "Tackling Rate Limiting for LLM Apps" (2025)
- [5] Anthropic. "API Rate Limits Documentation" (2024)
- [6] GitHub Block. "Handling LLM Rate Limits with Goose" (2025)
- [7] Vellum. "How to Manage OpenAI Rate Limits as You Scale" (2024)
- [8] Requesty AI. "Rate Limits for LLM Providers: OpenAI, Anthropic, and DeepSeek" (2024)
- [9] Microsoft. "Azure OpenAI Service Quotas and Limits" (2024)
- [10] Google Cloud. "Vertex AI Quotas and Limits" (2024)
- [11] Cohere. "Rate Limits Documentation" (2024)