Understanding Rate Limit Errors
According to OpenAI's official documentation, rate limiting helps ensure fair usage and prevents abuse. Similar policies are documented in Anthropic's API guide and Google's Gemini documentation.
Common Error Messages by Provider
| Provider | Status | Error | Details |
|---|---|---|---|
| OpenAI | 429 | RateLimitError | Message: "Rate limit exceeded" |
| Anthropic | 429 | rate_limit_exceeded | Retry-After header included |
| Google | 429 | Quota exceeded | Message: "User Rate Limit Exceeded" |
| AWS Bedrock | 429 | ThrottlingException | Retry-After header included |
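Despite the different wording, each of these arrives as an HTTP 429 response, so a single provider-agnostic check covers them all. A minimal sketch, assuming raw HTTP calls via the requests library (SDK exception classes differ by provider):

import requests

def is_rate_limited(response: requests.Response) -> bool:
    """True when the response signals a rate limit, regardless of the provider's wording."""
    return response.status_code == 429

def retry_after_seconds(response: requests.Response, default: float = 1.0) -> float:
    """Read the Retry-After header when the provider sends one."""
    value = response.headers.get("Retry-After")
    try:
        return float(value) if value is not None else default
    except ValueError:
        return default  # Retry-After may also be an HTTP date; fall back to the default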
Exponential Backoff with Jitter
Exponential backoff with jitter is the gold standard for handling rate limits: it prevents thundering-herd problems and respects API limits. It is the approach recommended in AssemblyAI's best practices guide and demonstrated in OpenAI's Python SDK.
Python Implementation
import time
import random
import requests
from typing import Optional, Dict, Any

class RateLimitHandler:
    def __init__(self, max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    def calculate_delay(self, attempt: int, retry_after: Optional[int] = None) -> float:
        """Calculate delay with exponential backoff and jitter"""
        if retry_after:
            return retry_after
        # Exponential backoff: base_delay * 2^attempt
        delay = min(self.base_delay * (2 ** attempt), self.max_delay)
        # Add jitter to prevent thundering herd
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter

    def make_request_with_retry(self, url: str, headers: Dict[str, str],
                                method: str = "POST", **kwargs) -> requests.Response:
        """Make HTTP request with automatic retry on rate limit"""
        for attempt in range(self.max_retries):
            try:
                response = requests.request(method, url, headers=headers, **kwargs)
                if response.status_code == 429:
                    retry_after = response.headers.get("Retry-After")
                    delay = self.calculate_delay(attempt, int(retry_after) if retry_after else None)
                    print(f"Rate limited. Waiting {delay:.2f} seconds...")
                    time.sleep(delay)
                    continue
                response.raise_for_status()
                return response
            except requests.exceptions.RequestException as e:
                if attempt == self.max_retries - 1:
                    raise
                print(f"Request failed: {e}. Retrying...")
                time.sleep(self.calculate_delay(attempt))
        raise Exception("Max retries exceeded")

# Usage example
handler = RateLimitHandler()
response = handler.make_request_with_retry(
    "https://api.openai.com/v1/chat/completions",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    json={"model": "gpt-4", "messages": [{"role": "user", "content": "Hello"}]}
)
TypeScript/JavaScript Implementation
class RateLimitHandler {
  constructor(
    private maxRetries = 5,
    private baseDelay = 1000,
    private maxDelay = 60000
  ) {}

  private calculateDelay(attempt: number, retryAfter?: number): number {
    if (retryAfter) {
      return retryAfter * 1000; // Convert to milliseconds
    }
    // Exponential backoff with jitter
    const delay = Math.min(this.baseDelay * Math.pow(2, attempt), this.maxDelay);
    const jitter = Math.random() * delay * 0.1;
    return delay + jitter;
  }

  async makeRequestWithRetry(url: string, options: RequestInit = {}): Promise<Response> {
    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      try {
        const response = await fetch(url, options);
        if (response.status === 429) {
          const retryAfter = response.headers.get('Retry-After');
          const delay = this.calculateDelay(
            attempt,
            retryAfter ? parseInt(retryAfter) : undefined
          );
          console.log(`Rate limited. Waiting ${delay / 1000} seconds...`);
          await new Promise(resolve => setTimeout(resolve, delay));
          continue;
        }
        if (!response.ok) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }
        return response;
      } catch (error) {
        if (attempt === this.maxRetries - 1) {
          throw error;
        }
        console.error(`Request failed: ${error}. Retrying...`);
        await new Promise(resolve =>
          setTimeout(resolve, this.calculateDelay(attempt))
        );
      }
    }
    throw new Error('Max retries exceeded');
  }
}

// Usage example
const handler = new RateLimitHandler();
const response = await handler.makeRequestWithRetry(
  'https://api.anthropic.com/v1/messages',
  {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'x-api-key': 'YOUR_API_KEY',
    },
    body: JSON.stringify({
      model: 'claude-3-opus-20240229',
      messages: [{ role: 'user', content: 'Hello' }],
    }),
  }
);
Go Implementation
package main

import (
    "fmt"
    "math"
    "math/rand"
    "net/http"
    "strconv"
    "time"
)

type RateLimitHandler struct {
    MaxRetries int
    BaseDelay  time.Duration
    MaxDelay   time.Duration
    Client     *http.Client
}

func NewRateLimitHandler() *RateLimitHandler {
    return &RateLimitHandler{
        MaxRetries: 5,
        BaseDelay:  time.Second,
        MaxDelay:   time.Minute,
        Client:     &http.Client{Timeout: 30 * time.Second},
    }
}

func (h *RateLimitHandler) calculateDelay(attempt int, retryAfter *int) time.Duration {
    if retryAfter != nil {
        return time.Duration(*retryAfter) * time.Second
    }
    // Exponential backoff
    delay := time.Duration(math.Min(
        float64(h.BaseDelay)*math.Pow(2, float64(attempt)),
        float64(h.MaxDelay),
    ))
    // Add jitter (0-10% of delay)
    jitter := time.Duration(rand.Float64() * float64(delay) * 0.1)
    return delay + jitter
}

func (h *RateLimitHandler) DoRequestWithRetry(req *http.Request) (*http.Response, error) {
    for attempt := 0; attempt < h.MaxRetries; attempt++ {
        resp, err := h.Client.Do(req)
        if err != nil {
            if attempt == h.MaxRetries-1 {
                return nil, err
            }
            delay := h.calculateDelay(attempt, nil)
            fmt.Printf("Request failed: %v. Waiting %v before retry\n", err, delay)
            time.Sleep(delay)
            continue
        }
        if resp.StatusCode == 429 {
            var retryAfter *int
            if retryAfterStr := resp.Header.Get("Retry-After"); retryAfterStr != "" {
                if val, err := strconv.Atoi(retryAfterStr); err == nil {
                    retryAfter = &val
                }
            }
            delay := h.calculateDelay(attempt, retryAfter)
            fmt.Printf("Rate limited (429). Waiting %v before retry\n", delay)
            resp.Body.Close()
            time.Sleep(delay)
            continue
        }
        return resp, nil
    }
    return nil, fmt.Errorf("max retries exceeded")
}
Client-Side Rate Limiting
Prevent hitting rate limits by implementing client-side throttling. This pattern is discussed in Weights & Biases' LLMOps guide and this Stack Overflow discussion.
Token Bucket Algorithm
import threading
import time

class TokenBucketRateLimiter:
    """Token bucket algorithm for client-side rate limiting"""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire tokens, returns True if successful"""
        with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            # Refill tokens
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_update = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_and_acquire(self, tokens: int = 1):
        """Wait until tokens are available, then acquire"""
        while not self.acquire(tokens):
            time.sleep(0.1)

# Usage: Limit to 10 requests per second
limiter = TokenBucketRateLimiter(rate=10, capacity=10)

def make_api_call():
    limiter.wait_and_acquire()
    # Make your API call here
    response = requests.post(...)
    return response
Sliding Window Rate Limiter
class SlidingWindowRateLimiter {
  private requests: number[] = [];

  constructor(
    private maxRequests: number,
    private windowSeconds: number
  ) {}

  allowRequest(): boolean {
    const now = Date.now();
    const windowStart = now - (this.windowSeconds * 1000);
    // Remove old requests outside the window
    this.requests = this.requests.filter(time => time > windowStart);
    if (this.requests.length < this.maxRequests) {
      this.requests.push(now);
      return true;
    }
    return false;
  }

  async waitForSlot(): Promise<void> {
    while (!this.allowRequest()) {
      await new Promise(resolve => setTimeout(resolve, 100));
    }
  }
}

// Usage: Allow 60 requests per minute
const limiter = new SlidingWindowRateLimiter(60, 60);

async function makeApiCall() {
  await limiter.waitForSlot();
  // Make your API call
  const response = await fetch(...);
  return response;
}
Circuit Breaker Pattern
Prevent cascading failures when rate limits are consistently hit. This pattern is detailed in AnyScale's production deployment guide and LangChain's error handling utilities.
import requests
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker to prevent cascading failures"""

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN - API is rate limited")
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except requests.HTTPError as e:
            if e.response.status_code == 429:
                self._on_failure()
            raise e

    def _should_attempt_reset(self) -> bool:
        return (self.last_failure_time and
                datetime.now() - self.last_failure_time >
                timedelta(seconds=self.recovery_timeout))

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker opened after {self.failure_count} failures")

# Usage
breaker = CircuitBreaker()

def make_api_request():
    response = requests.post(...)
    response.raise_for_status()
    return response

try:
    result = breaker.call(make_api_request)
except Exception as e:
    print(f"API call failed: {e}")
Parsing Rate Limit Headers
Extract valuable information from response headers to optimize retry behavior. Header formats are documented in Mistral's API docs and Cohere's error handling guide.
import requests
from datetime import datetime

def parse_rate_limit_headers(response):
    """Extract rate limit information from response headers"""
    headers = response.headers
    rate_limit_info = {
        'limit': headers.get('X-RateLimit-Limit'),
        'remaining': headers.get('X-RateLimit-Remaining'),
        'reset': headers.get('X-RateLimit-Reset'),
        'retry_after': headers.get('Retry-After'),
    }
    # OpenAI-specific headers
    if 'x-ratelimit-limit-requests' in headers:
        rate_limit_info['limit_requests'] = headers.get('x-ratelimit-limit-requests')
        rate_limit_info['remaining_requests'] = headers.get('x-ratelimit-remaining-requests')
        rate_limit_info['limit_tokens'] = headers.get('x-ratelimit-limit-tokens')
        rate_limit_info['remaining_tokens'] = headers.get('x-ratelimit-remaining-tokens')
    # Convert to appropriate types
    for key, value in rate_limit_info.items():
        if value and key != 'retry_after':
            try:
                rate_limit_info[key] = int(value)
            except ValueError:
                pass
    return rate_limit_info

# Usage
response = requests.post(...)
if response.status_code == 429:
    limits = parse_rate_limit_headers(response)
    print(f"Rate limit: {limits['remaining']}/{limits['limit']} remaining")
    print(f"Reset at: {datetime.fromtimestamp(limits['reset'])}")
    print(f"Retry after: {limits['retry_after']} seconds")
Handling Concurrent Requests
When making multiple API calls concurrently, centralize rate limit handling:
Async Rate Limiting with Python
import asyncio
import aiohttp
from asyncio import Semaphore

class AsyncRateLimiter:
    """Async rate limiter for concurrent requests"""

    def __init__(self, rate: int, per: float):
        self.rate = rate
        self.per = per
        self.allowance = rate
        self.last_check = asyncio.get_event_loop().time()
        self.lock = asyncio.Lock()
        self.semaphore = Semaphore(rate)

    async def acquire(self):
        """Acquire permission to make a request"""
        async with self.semaphore:
            async with self.lock:
                current = asyncio.get_event_loop().time()
                time_passed = current - self.last_check
                self.last_check = current
                self.allowance += time_passed * (self.rate / self.per)
                if self.allowance > self.rate:
                    self.allowance = self.rate
                if self.allowance < 1.0:
                    sleep_time = (1.0 - self.allowance) * (self.per / self.rate)
                    await asyncio.sleep(sleep_time)
                    self.allowance = 0.0
                else:
                    self.allowance -= 1.0

async def make_concurrent_requests(prompts, api_key):
    # 10 requests per second
    rate_limiter = AsyncRateLimiter(rate=10, per=1.0)

    async def fetch_completion(session, prompt):
        await rate_limiter.acquire()
        async with session.post(
            'https://api.openai.com/v1/chat/completions',
            json={
                'model': 'gpt-3.5-turbo',
                'messages': [{'role': 'user', 'content': prompt}]
            },
            headers={'Authorization': f'Bearer {api_key}'}
        ) as response:
            if response.status == 429:
                retry_after = int(response.headers.get('Retry-After', 5))
                await asyncio.sleep(retry_after)
                return await fetch_completion(session, prompt)
            return await response.json()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_completion(session, prompt) for prompt in prompts]
        return await asyncio.gather(*tasks)

# Usage
prompts = ["Tell me a joke", "What's 2+2?", "Hello!"]
results = asyncio.run(make_concurrent_requests(prompts, "YOUR_API_KEY"))
Provider-Specific Rate Limits
| Provider | Limits | Headers | Notes |
|---|---|---|---|
| OpenAI | RPM & TPM based on tier | X-RateLimit-* | Separate limits for requests and tokens |
| Anthropic | Model-specific limits | Retry-After | Different limits for Claude models |
| Google Vertex | Project quotas | Standard headers | Configurable in GCP console |
| AWS Bedrock | Account/region based | Retry-After | Can request limit increases |
| Groq | Model-specific | X-RateLimit-* | Very high limits for Mixtral |
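One way to put this table to work is to keep per-provider retry settings in one place. A sketch with placeholder numbers (not published limits), reusing the Python RateLimitHandler defined earlier:

# Hypothetical per-provider retry settings; tune against your own account's limits
PROVIDER_RETRY_SETTINGS = {
    "openai":      {"max_retries": 5, "base_delay": 1.0},
    "anthropic":   {"max_retries": 5, "base_delay": 2.0},
    "aws_bedrock": {"max_retries": 4, "base_delay": 1.0},
    "groq":        {"max_retries": 3, "base_delay": 0.5},
}

def handler_for(provider: str) -> RateLimitHandler:
    """Build a RateLimitHandler (from the Python implementation above) for a provider."""
    cfg = PROVIDER_RETRY_SETTINGS[provider]
    return RateLimitHandler(max_retries=cfg["max_retries"], base_delay=cfg["base_delay"])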
Best Practices
Do's
- Always implement exponential backoff
- Respect Retry-After headers
- Use client-side rate limiting
- Log rate limit errors for monitoring
- Cache responses when possible (see the caching sketch after these lists)
- Implement circuit breakers for production
Don'ts
- Don't retry immediately
- Don't ignore rate limit headers
- Don't use fixed retry delays
- Don't make parallel retries
- Don't share API keys across services
- Don't bypass client-side limits
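The "cache responses" item is easy to act on. Below is a minimal in-memory sketch for identical request payloads; call_api is a placeholder for whatever client function you already use, and a production setup would typically add a TTL and a shared store such as Redis:

import hashlib
import json

_cache = {}

def cached_completion(payload: dict, call_api):
    """Return a stored response for an identical payload instead of re-calling the API."""
    key = hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
    if key not in _cache:
        _cache[key] = call_api(payload)  # call_api stands in for your existing client call
    return _cache[key]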
Monitoring Rate Limits
Track your API usage to prevent surprises:
import time
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimitMonitor:
    """Monitor and track rate limit usage"""

    def __init__(self):
        self.requests = defaultdict(list)
        self.rate_limit_hits = defaultdict(int)

    def log_request(self, endpoint: str, status_code: int, headers: dict):
        """Log API request for monitoring"""
        now = datetime.now()
        self.requests[endpoint].append({
            'timestamp': now,
            'status': status_code,
            'headers': headers
        })
        if status_code == 429:
            self.rate_limit_hits[endpoint] += 1
        # Clean old entries (keep last hour)
        cutoff = now - timedelta(hours=1)
        self.requests[endpoint] = [
            r for r in self.requests[endpoint]
            if r['timestamp'] > cutoff
        ]

    def get_usage_stats(self, endpoint: str) -> dict:
        """Get usage statistics for an endpoint"""
        requests = self.requests[endpoint]
        if not requests:
            return {}
        total = len(requests)
        rate_limited = sum(1 for r in requests if r['status'] == 429)
        # Get current limits from latest response
        latest = requests[-1]
        headers = latest['headers']
        return {
            'total_requests': total,
            'rate_limited_requests': rate_limited,
            'rate_limit_percentage': (rate_limited / total * 100) if total > 0 else 0,
            'current_limit': headers.get('X-RateLimit-Limit'),
            'remaining': headers.get('X-RateLimit-Remaining'),
            'reset_time': headers.get('X-RateLimit-Reset'),
        }

# Usage
monitor = RateLimitMonitor()

# Log each request
response = requests.post(...)
monitor.log_request(
    endpoint='chat/completions',
    status_code=response.status_code,
    headers=dict(response.headers)
)

# Check usage
stats = monitor.get_usage_stats('chat/completions')
print(f"Rate limited {stats['rate_limit_percentage']:.1f}% of requests")
Emergency Recovery
When you're consistently hitting rate limits:
- Implement request queuing with priority levels
- Use multiple API keys with round-robin distribution (a minimal sketch follows this list)
- Enable caching for repeated requests
- Batch requests where supported (e.g., embeddings)
- Upgrade your plan or request limit increases
- Implement fallback providers for critical paths
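For the multi-key option above, a minimal round-robin sketch using itertools.cycle (the key names are placeholders):

from itertools import cycle

# Rotate through a pool of keys so no single key absorbs all of the traffic
api_keys = cycle(["KEY_1", "KEY_2", "KEY_3"])

def next_auth_header() -> dict:
    """Return the Authorization header for the next key in round-robin order."""
    return {"Authorization": f"Bearer {next(api_keys)}"}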
Pro Tip: Multi-Provider Fallback
Don't let rate limits bring down your application. Implement automatic fallback to alternative providers:
providers = [
    {'name': 'openai', 'model': 'gpt-4', 'api_key': 'key1'},
    {'name': 'anthropic', 'model': 'claude-3-opus', 'api_key': 'key2'},
    {'name': 'google', 'model': 'gemini-pro', 'api_key': 'key3'},
]

for provider in providers:
    try:
        response = make_api_call(provider)
        break
    except RateLimitError:
        continue  # Try next provider
References
- [1] OpenAI. "Error Codes Reference" (2024)
- [2] Anthropic. "API Errors" (2024)
- [3] Stack Overflow. "OpenAI API Questions" (2024)