Quick Fix
Implement retry logic with exponential backoff, use multiple provider fallbacks, and monitor service status pages for real-time updates.
Understanding 503 Errors
Service unavailable errors indicate temporary issues. Check status pages: OpenAI Status, Anthropic Status, and Google Cloud Status.
Common Causes
- Service outages
- Planned maintenance
- Capacity overload
- Regional failures
Error Indicators
- HTTP 503 status code
- "Service Unavailable" message
- Timeout after retries
- Gateway errors (502/504)
Multi-Provider Fallback System
Build resilience with multiple providers. Implementation inspired by AnyScale's resilience guide.
import asyncio
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import aiohttp
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import logging
class MultiProviderLLM:
    """Resilient LLM client with automatic failover.

    Wraps several provider SDK clients behind one ``complete()`` call. Each
    provider is guarded by a CircuitBreaker and a cached status-page health
    check; when a provider fails, the request falls through to the next one
    in priority order.
    """

    def __init__(self):
        # Per-provider state: SDK client, public status page, hosted models,
        # and the cached health verdict with its timestamp.
        self.providers = {
            "openai": {
                "client": AsyncOpenAI(),
                "status_url": "https://status.openai.com/api/v2/status.json",
                "models": ["gpt-4", "gpt-3.5-turbo"],
                "healthy": True,
                "last_check": None
            },
            "anthropic": {
                "client": AsyncAnthropic(),
                "status_url": "https://status.anthropic.com/api/v2/status.json",
                "models": ["claude-3-opus-20240229", "claude-3-sonnet-20240229"],
                "healthy": True,
                "last_check": None
            },
            "google": {
                "client": None,  # Initialize with credentials
                "status_url": "https://status.cloud.google.com/incidents.json",
                "models": ["gemini-pro"],
                "healthy": True,
                "last_check": None
            }
        }
        self.logger = logging.getLogger(__name__)
        self.circuit_breakers = {}
        self.initialize_circuit_breakers()

    def initialize_circuit_breakers(self):
        """Set up one circuit breaker per provider."""
        for provider in self.providers:
            self.circuit_breakers[provider] = CircuitBreaker(
                failure_threshold=3,
                recovery_timeout=300,  # 5 minutes
                expected_exception=ServiceUnavailableError
            )

    async def check_provider_status(self, provider: str) -> bool:
        """Return True if *provider* looks operational.

        The provider's public status page is polled at most once every five
        minutes; in between, the cached verdict is returned. Any fetch or
        parse failure falls back to the last known health so a broken status
        page cannot take a working provider offline.
        """
        provider_info = self.providers[provider]

        # Serve the cached verdict while it is still fresh.
        if provider_info["last_check"]:
            time_since_check = datetime.now() - provider_info["last_check"]
            if time_since_check < timedelta(minutes=5):
                return provider_info["healthy"]

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    provider_info["status_url"],
                    timeout=aiohttp.ClientTimeout(total=5)
                ) as response:
                    if response.status != 200:
                        # Fix: this path previously fell through and returned
                        # None implicitly; keep the last known health instead.
                        return provider_info["healthy"]
                    data = await response.json()

            if provider in ("openai", "anthropic"):
                # Statuspage.io indicator: none/minor/major/critical.
                status = data.get("status", {}).get("indicator", "none")
                healthy = status in ("none", "minor")
            else:  # Google
                # Fix: incidents.json also lists resolved incidents, so
                # `len(incidents) == 0` was effectively never true. Treat only
                # entries without an "end" timestamp as ongoing.
                # NOTE(review): field name assumed from Google's public feed —
                # confirm against the current schema.
                incidents = data.get("incidents", [])
                healthy = not any(not incident.get("end") for incident in incidents)

            provider_info["healthy"] = healthy
            provider_info["last_check"] = datetime.now()
            if not healthy:
                self.logger.warning(f"{provider} is experiencing issues")
            return healthy
        except Exception as e:
            self.logger.error(f"Failed to check {provider} status: {e}")
            return provider_info["healthy"]  # Return last known status

    async def complete(
        self,
        messages: List[Dict],
        preferred_model: Optional[str] = None,
        max_retries: int = 3
    ) -> Dict[str, Any]:
        """Make a completion with automatic failover.

        Args:
            messages: Chat messages in OpenAI format.
            preferred_model: If given, the provider hosting this model is
                tried first and the model is used where available.
            max_retries: Accepted for backward compatibility.
                NOTE(review): currently unused — "retries" happen by trying
                each provider once; wire this in if per-provider retries are
                wanted.

        Returns:
            Dict with ``provider``, ``response`` and ``fallback`` (True when
            a non-first-choice provider served the request).

        Raises:
            AllProvidersUnavailableError: when every provider fails.
        """
        provider_order = await self.get_provider_priority(preferred_model)
        last_error = None

        for provider in provider_order:
            # Skip providers whose breaker is tripped or whose status page
            # reports trouble.
            if self.circuit_breakers[provider].is_open():
                self.logger.info(f"Skipping {provider} - circuit breaker open")
                continue
            if not await self.check_provider_status(provider):
                self.logger.info(f"Skipping {provider} - unhealthy status")
                continue

            try:
                result = await self.circuit_breakers[provider].call(
                    self._make_provider_request,
                    provider,
                    messages,
                    preferred_model
                )
                # Success - give the other providers a chance to recover.
                asyncio.create_task(self.reset_other_breakers(provider))
                return {
                    "provider": provider,
                    "response": result,
                    "fallback": provider != provider_order[0]
                }
            except ServiceUnavailableError as e:
                last_error = e
                self.logger.warning(f"{provider} unavailable: {e}")
                continue
            except Exception as e:
                last_error = e
                self.logger.error(f"{provider} error: {e}")
                continue

        # All providers failed
        raise AllProvidersUnavailableError(
            f"All providers failed. Last error: {last_error}"
        )

    async def _make_provider_request(
        self,
        provider: str,
        messages: List[Dict],
        preferred_model: Optional[str]
    ):
        """Issue one request to a single provider.

        Timeouts and 503-shaped errors are translated into
        ServiceUnavailableError so the circuit breaker counts them.
        """
        provider_info = self.providers[provider]
        client = provider_info["client"]

        # Use the preferred model when this provider hosts it, else default.
        if preferred_model and preferred_model in provider_info["models"]:
            model = preferred_model
        else:
            model = provider_info["models"][0]

        try:
            if provider == "openai":
                response = await client.chat.completions.create(
                    model=model,
                    messages=messages,
                    timeout=30
                )
                return response
            elif provider == "anthropic":
                # NOTE(review): convert_to_claude_format is not defined in
                # this file — confirm it exists alongside this class.
                claude_messages = self.convert_to_claude_format(messages)
                response = await client.messages.create(
                    model=model,
                    messages=claude_messages,
                    max_tokens=1000
                )
                return response
            elif provider == "google":
                # Fix: this branch used to silently return None. Fail loudly
                # so the failover loop moves on to the next provider.
                raise NotImplementedError("Google Gemini support not implemented")
        except asyncio.TimeoutError:
            raise ServiceUnavailableError(f"{provider} request timed out")
        except Exception as e:
            if "503" in str(e) or "service_unavailable" in str(e).lower():
                raise ServiceUnavailableError(f"{provider} service unavailable")
            raise

    async def get_provider_priority(
        self,
        preferred_model: Optional[str] = None
    ) -> List[str]:
        """Return all providers ordered by preference.

        Order: the provider hosting *preferred_model* (if any), then healthy
        providers, then unhealthy ones as a last resort.
        """
        providers = []

        # First, the provider that hosts the preferred model.
        if preferred_model:
            for provider, info in self.providers.items():
                if preferred_model in info["models"]:
                    providers.append(provider)
                    break

        # Then any remaining healthy providers.
        for provider, info in self.providers.items():
            if provider not in providers and info["healthy"]:
                providers.append(provider)

        # Finally, unhealthy providers as a last resort.
        for provider in self.providers:
            if provider not in providers:
                providers.append(provider)
        return providers

    async def reset_other_breakers(self, working_provider: str):
        """After a success, nudge other tripped breakers to half-open.

        Waits a minute first so a momentary success does not immediately
        re-enable a genuinely broken provider.
        """
        await asyncio.sleep(60)  # Wait 1 minute
        for provider, breaker in self.circuit_breakers.items():
            if provider != working_provider and breaker.is_open():
                breaker.half_open()
                self.logger.info(f"Set {provider} circuit breaker to half-open")
class CircuitBreaker:
    """Circuit breaker guarding calls to a flaky service.

    States: "closed" (normal operation), "open" (failing fast), and
    "half_open" (probing — one successful call re-closes the circuit).
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 300,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half_open

    def is_open(self) -> bool:
        """Report whether calls should currently be rejected.

        Side effect: once the recovery timeout has elapsed since the last
        failure, the breaker moves to "half_open" and lets a probe through.
        """
        if self.state != "open":
            return False
        if self.last_failure_time is not None:
            elapsed = (datetime.now() - self.last_failure_time).total_seconds()
            if elapsed > self.recovery_timeout:
                self.state = "half_open"
                return False
        return True

    def half_open(self):
        """Force the breaker into the probing state."""
        self.state = "half_open"

    async def call(self, func, *args, **kwargs):
        """Run *func* under breaker protection, recording the outcome."""
        if self.is_open():
            raise ServiceUnavailableError("Circuit breaker is open")
        try:
            outcome = await func(*args, **kwargs)
        except self.expected_exception:
            self.on_failure()
            raise
        self.on_success()
        return outcome

    def on_success(self):
        """A good call: clear the failure count and close the circuit."""
        self.failure_count = 0
        self.state = "closed"

    def on_failure(self):
        """A bad call: bump the count; trip the breaker at the threshold."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
class ServiceUnavailableError(Exception):
    """Raised when a provider returns a 503 / is temporarily unavailable."""
class AllProvidersUnavailableError(Exception):
    """Raised when every configured provider has failed."""
# Usage example
async def main():
    """Demonstrate a completion with automatic provider failover."""
    client = MultiProviderLLM()
    try:
        result = await client.complete(
            messages=[{"role": "user", "content": "Hello!"}],
            preferred_model="gpt-4"
        )
    except AllProvidersUnavailableError:
        print("All LLM providers are currently unavailable")
        # Implement degraded functionality
    else:
        print(f"Response from {result['provider']}")
        if result.get("fallback"):
            print("Note: Using fallback provider")

# Run the example
asyncio.run(main())
Provider Health Monitoring
Proactive monitoring prevents surprises. Learn from LangChain's monitoring guide.
import asyncio
import aiohttp
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json
class LLMHealthMonitor:
    """Monitor health of multiple LLM providers.

    Polls each provider's public status page and API endpoint, keeps a
    rolling 24-hour history of samples, and emits rate-limited alerts for
    unhealthy providers.
    """

    def __init__(self):
        # Per-provider endpoints used for the two health probes.
        self.providers = {
            "openai": {
                "status_url": "https://status.openai.com/api/v2/status.json",
                "api_endpoint": "https://api.openai.com/v1/models",
                "test_model": "gpt-3.5-turbo"
            },
            "anthropic": {
                "status_url": "https://status.anthropic.com/api/v2/status.json",
                "api_endpoint": "https://api.anthropic.com/v1/messages",
                "test_model": "claude-3-haiku-20240307"
            },
            "google": {
                "status_url": "https://status.cloud.google.com/incidents.json",
                "api_endpoint": "https://generativelanguage.googleapis.com/v1/models",
                "test_model": "gemini-pro"
            }
        }
        self.health_history = {}  # provider -> list of periodic samples
        self.alerts_sent = {}     # provider -> datetime of last alert

    async def check_all_providers(self) -> Dict[str, Dict]:
        """Check every provider concurrently and return a health report."""
        tasks = [self.check_provider_health(provider) for provider in self.providers]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        health_report = {}
        for provider, result in zip(self.providers.keys(), results):
            if isinstance(result, Exception):
                # A probe crashing is itself a data point.
                health_report[provider] = {
                    "status": "error",
                    "error": str(result),
                    "timestamp": datetime.now().isoformat()
                }
            else:
                health_report[provider] = result

        self.update_health_history(health_report)
        await self.check_alerts(health_report)
        return health_report

    async def check_provider_health(self, provider: str) -> Dict:
        """Probe one provider's status page and API endpoint.

        Returns a dict with the status-page indicator, whether the API
        responded, round-trip latency in ms, and any probe errors.
        """
        provider_info = self.providers[provider]
        health = {
            "provider": provider,
            "timestamp": datetime.now().isoformat(),
            "status_page": "unknown",
            "api_responsive": False,
            "latency": None,
            "errors": []
        }

        # Probe 1: the public status page.
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    provider_info["status_url"],
                    timeout=aiohttp.ClientTimeout(total=5)
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        if provider in ["openai", "anthropic"]:
                            # Statuspage.io indicator: none/minor/major/critical.
                            indicator = data.get("status", {}).get("indicator", "unknown")
                            health["status_page"] = indicator
                            health["status_description"] = data.get("status", {}).get("description", "")
                        else:  # Google
                            # NOTE(review): incidents.json includes resolved
                            # incidents, so this is pessimistic — confirm
                            # whether filtering by end-time is wanted.
                            incidents = data.get("incidents", [])
                            health["status_page"] = "operational" if len(incidents) == 0 else "incidents"
                            health["incidents"] = len(incidents)
        except Exception as e:
            health["errors"].append(f"Status page error: {str(e)}")

        # Probe 2: the API endpoint itself, timing the round trip.
        try:
            start_time = asyncio.get_event_loop().time()
            async with aiohttp.ClientSession() as session:
                headers = self.get_auth_headers(provider)
                async with session.get(
                    provider_info["api_endpoint"],
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    end_time = asyncio.get_event_loop().time()
                    health["latency"] = round((end_time - start_time) * 1000, 2)  # ms
                    # 401/403 still prove the service is up — only the
                    # credentials are wrong.
                    if response.status in [200, 401, 403]:
                        health["api_responsive"] = True
                    else:
                        health["api_responsive"] = False
                        health["api_status_code"] = response.status
        except asyncio.TimeoutError:
            health["errors"].append("API timeout")
            health["api_responsive"] = False
        except Exception as e:
            health["errors"].append(f"API error: {str(e)}")
            health["api_responsive"] = False

        # Overall verdict: good status page, responsive API, no probe errors.
        health["healthy"] = (
            health["status_page"] in ["none", "minor", "operational"] and
            health["api_responsive"] and
            len(health["errors"]) == 0
        )
        return health

    def get_auth_headers(self, provider: str) -> Dict[str, str]:
        """Return auth headers for *provider*.

        In production, load these from secure storage — the values here are
        placeholders.
        """
        if provider == "openai":
            return {"Authorization": "Bearer YOUR_OPENAI_KEY"}
        elif provider == "anthropic":
            return {"x-api-key": "YOUR_ANTHROPIC_KEY"}
        elif provider == "google":
            return {"X-API-Key": "YOUR_GOOGLE_KEY"}
        return {}

    def update_health_history(self, health_report: Dict):
        """Append this report to the per-provider history (24h retention)."""
        timestamp = datetime.now()
        for provider, health in health_report.items():
            if provider not in self.health_history:
                self.health_history[provider] = []
            self.health_history[provider].append({
                "timestamp": timestamp,
                "healthy": health.get("healthy", False),
                "latency": health.get("latency"),
                "status": health.get("status_page")
            })
            # Keep only the last 24 hours of samples.
            cutoff = timestamp - timedelta(hours=24)
            self.health_history[provider] = [
                h for h in self.health_history[provider]
                if h["timestamp"] > cutoff
            ]

    async def check_alerts(self, health_report: Dict):
        """Alert on unhealthy providers, at most once per 30 minutes each."""
        for provider, health in health_report.items():
            if not health.get("healthy", False):
                last_alert = self.alerts_sent.get(provider)
                if last_alert:
                    time_since_alert = datetime.now() - last_alert
                    if time_since_alert < timedelta(minutes=30):
                        continue  # Don't spam alerts
                await self.send_alert(provider, health)
                self.alerts_sent[provider] = datetime.now()

    async def send_alert(self, provider: str, health: Dict):
        """Send alert for an unhealthy provider (currently: print)."""
        alert_message = f"""
🚨 LLM Provider Alert: {provider}
Status Page: {health.get('status_page', 'unknown')}
API Responsive: {health.get('api_responsive', False)}
Latency: {health.get('latency', 'N/A')}ms
Errors: {', '.join(health.get('errors', []))}
Time: {health.get('timestamp', 'unknown')}
"""
        # Implement your alerting mechanism
        print(alert_message)
        # Could send to Slack, email, PagerDuty, etc.

    def get_uptime(self, provider: str, hours: int = 24) -> float:
        """Return the percentage of healthy checks over the last *hours*.

        Fix: *hours* was previously accepted but ignored (the whole retained
        history was always used); the window is now actually applied. Note
        history itself is only retained for 24 hours.
        """
        history = self.health_history.get(provider, [])
        cutoff = datetime.now() - timedelta(hours=hours)
        window = [h for h in history if h["timestamp"] > cutoff]
        if not window:
            return 0.0
        healthy_checks = sum(1 for h in window if h["healthy"])
        return (healthy_checks / len(window)) * 100

    def get_average_latency(self, provider: str) -> Optional[float]:
        """Return mean latency over retained history, or None if no samples."""
        if provider not in self.health_history:
            return None
        latencies = [
            h["latency"] for h in self.health_history[provider]
            if h["latency"] is not None
        ]
        if not latencies:
            return None
        return sum(latencies) / len(latencies)
# Usage example
async def monitor_providers():
    """Continuously poll all providers and print health/uptime summaries."""
    monitor = LLMHealthMonitor()

    # Run continuous monitoring
    while True:
        print("\n=== Health Check ===")
        health_report = await monitor.check_all_providers()
        for provider, health in health_report.items():
            status = "✅" if health.get("healthy", False) else "❌"
            print(f"{status} {provider}: {health.get('status_page', 'unknown')} "
                  f"(API: {health.get('api_responsive', False)}, "
                  f"Latency: {health.get('latency', 'N/A')}ms)")

        # Show uptime stats
        print("\n=== 24h Uptime ===")
        for provider in monitor.providers:
            uptime = monitor.get_uptime(provider)
            avg_latency = monitor.get_average_latency(provider)
            # Fix: the ternary previously wrapped the entire print argument,
            # so a missing latency printed a bare "N/A" and silently dropped
            # the provider name and uptime.
            latency_text = (f"{avg_latency:.0f}ms avg latency"
                            if avg_latency is not None else "latency N/A")
            print(f"{provider}: {uptime:.1f}% uptime, {latency_text}")

        # Wait before next check
        await asyncio.sleep(300)  # Check every 5 minutes

# Run monitoring
# asyncio.run(monitor_providers())
Graceful Degradation Strategies
import hashlib
import json
import os
import re
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
class DegradedModeHandler:
    """Serve cached or canned responses when every LLM provider is down."""

    def __init__(self, cache_dir: str = "./llm_cache"):
        self.cache_dir = cache_dir
        # Canned responses keyed by coarse intent.
        self.fallback_responses = {
            "greeting": [
                "Hello! How can I help you today?",
                "Hi there! What can I assist you with?",
                "Welcome! How may I help you?"
            ],
            "error": [
                "I'm having trouble processing your request right now.",
                "Our service is temporarily limited. Please try again later.",
                "I apologize for the inconvenience. Some features are currently unavailable."
            ],
            "acknowledgment": [
                "I understand your request.",
                "Got it, let me help with that.",
                "Thank you for your message."
            ]
        }

    async def handle_degraded_request(
        self,
        messages: List[Dict],
        intent: Optional[str] = None
    ) -> Dict[str, Any]:
        """Answer a request without any LLM: cache first, then canned text.

        Returns a dict with ``response``, ``mode`` ("cached" or "degraded"),
        a user-facing ``warning``, and (in degraded mode) the detected intent.
        """
        # Prefer a previously cached real response.
        cached_response = self.check_cache(messages)
        if cached_response:
            return {
                "response": cached_response,
                "mode": "cached",
                "warning": "Using cached response due to service unavailability"
            }

        if not intent:
            intent = self.detect_simple_intent(messages)

        # Fall back to a canned line for the intent, or a generic apology.
        if intent in self.fallback_responses:
            response = self.get_fallback_response(intent)
        else:
            response = self.get_fallback_response("error")

        return {
            "response": response,
            "mode": "degraded",
            "warning": "Limited functionality due to service unavailability",
            "intent": intent
        }

    def check_cache(self, messages: List[Dict]) -> Optional[str]:
        """Return a cached response for *messages* if present and < 24h old."""
        cache_key = self.create_cache_key(messages)
        cache_file = f"{self.cache_dir}/{cache_key}.json"
        try:
            with open(cache_file, 'r') as f:
                cached = json.load(f)
            cache_time = datetime.fromisoformat(cached["timestamp"])
            if datetime.now() - cache_time < timedelta(hours=24):
                return cached["response"]
        except (OSError, ValueError, KeyError):
            # Fix: only FileNotFoundError was caught before, so a corrupt or
            # truncated cache entry crashed the degraded path. OSError covers
            # missing/unreadable files, ValueError covers bad JSON and bad
            # timestamps, KeyError covers missing fields.
            pass
        return None

    def create_cache_key(self, messages: List[Dict]) -> str:
        """Create a deterministic cache key (SHA-256 hex) from messages."""
        messages_str = json.dumps(messages, sort_keys=True)
        return hashlib.sha256(messages_str.encode()).hexdigest()

    def detect_simple_intent(self, messages: List[Dict]) -> str:
        """Classify the last message into a coarse intent.

        Fix: matching used plain substring tests, so e.g. "what is this?"
        matched greeting because "hi" occurs inside "this". Keyword phrases
        are now matched on word boundaries; "?" is checked literally.
        """
        if not messages:
            return "error"
        last_message = messages[-1].get("content", "").lower()

        def has_any(phrases: List[str]) -> bool:
            # Whole-word/phrase match, not bare substring.
            return any(
                re.search(r"\b" + re.escape(p) + r"\b", last_message)
                for p in phrases
            )

        if has_any(["hello", "hi", "hey", "greetings", "good morning", "good afternoon"]):
            return "greeting"
        if "?" in last_message or has_any(["what", "how", "why", "when", "where", "who"]):
            return "question"
        if has_any(["please", "can you", "could you", "would you", "help"]):
            return "request"
        return "general"

    def get_fallback_response(self, intent: str) -> str:
        """Pick a random canned response for *intent* (generic on unknown)."""
        import random
        if intent in self.fallback_responses:
            responses = self.fallback_responses[intent]
            return random.choice(responses)
        return self.fallback_responses["error"][0]
# Integration with main application
class ResilientLLMService:
    """Front-door service: multi-provider completion with degraded fallback."""

    def __init__(self):
        self.multi_provider = MultiProviderLLM()
        self.degraded_handler = DegradedModeHandler()
        self.degraded_mode = False

    async def complete(
        self,
        messages: List[Dict],
        allow_degraded: bool = True
    ) -> Dict[str, Any]:
        """Complete via the providers; fall back to degraded mode on total outage."""
        try:
            result = await self.multi_provider.complete(messages)
        except AllProvidersUnavailableError:
            if not allow_degraded:
                raise
            # Every provider is down: serve cached/canned content instead.
            self.degraded_mode = True
            return await self.degraded_handler.handle_degraded_request(messages)
        # Remember good responses so degraded mode has material to serve.
        if result.get("response"):
            await self.cache_response(messages, result["response"])
        return result

    async def cache_response(self, messages: List[Dict], response: Any):
        """Persist a successful response for later degraded-mode reuse."""
        key = self.degraded_handler.create_cache_key(messages)
        path = f"{self.degraded_handler.cache_dir}/{key}.json"
        payload = {
            "timestamp": datetime.now().isoformat(),
            "messages": messages,
            "response": str(response)
        }
        try:
            os.makedirs(self.degraded_handler.cache_dir, exist_ok=True)
            with open(path, 'w') as f:
                json.dump(payload, f)
        except Exception:
            # Best effort: a cache write failure must never fail the request.
            pass
Service Status Dashboard
// React component for status dashboard
import React, { useState, useEffect } from 'react';
import { AlertCircle, CheckCircle } from 'lucide-react';
// Fix: removed unused `Activity` and `Clock` icon imports.
// NOTE(review): `Card`, `Alert`, and `AlertDescription` are rendered below
// but never imported — add the project's UI-kit imports, e.g.:
//   import { Card } from '@/components/ui/card';
//   import { Alert, AlertDescription } from '@/components/ui/alert';

// Shape of one provider entry returned by /api/llm-health.
interface ProviderStatus {
  name: string;
  healthy: boolean;
  latency: number | null; // ms; null when no latency sample is available
  uptime: number;         // 0-100 percentage
  lastChecked: string;
  incidents: string[];
}

export function ServiceStatusDashboard() {
  const [providers, setProviders] = useState<ProviderStatus[]>([]);
  const [loading, setLoading] = useState(true);

  useEffect(() => {
    const checkStatus = async () => {
      try {
        const response = await fetch('/api/llm-health');
        const data = await response.json();
        setProviders(data.providers);
      } catch (error) {
        console.error('Failed to fetch status:', error);
      } finally {
        setLoading(false);
      }
    };

    // Check immediately, then poll every minute; clean up on unmount.
    checkStatus();
    const interval = setInterval(checkStatus, 60000);
    return () => clearInterval(interval);
  }, []);

  const getStatusIcon = (healthy: boolean) => {
    return healthy ? (
      <CheckCircle className="h-5 w-5 text-green-500" />
    ) : (
      <AlertCircle className="h-5 w-5 text-red-500" />
    );
  };

  const getStatusColor = (healthy: boolean) => {
    return healthy ? 'bg-green-500' : 'bg-red-500';
  };

  if (loading) {
    return <div>Loading status...</div>;
  }

  return (
    <div className="space-y-6">
      <div className="flex items-center justify-between">
        <h2 className="text-2xl font-bold">LLM Provider Status</h2>
        <div className="text-sm text-muted-foreground">
          Last updated: {new Date().toLocaleTimeString()}
        </div>
      </div>
      <div className="grid gap-4 md:grid-cols-3">
        {providers.map((provider) => (
          <Card key={provider.name} className="p-6">
            <div className="flex items-center justify-between mb-4">
              <h3 className="text-lg font-semibold">{provider.name}</h3>
              {getStatusIcon(provider.healthy)}
            </div>
            <div className="space-y-2">
              <div className="flex justify-between">
                <span className="text-sm text-muted-foreground">Status</span>
                <span className={`text-sm font-medium ${
                  provider.healthy ? 'text-green-500' : 'text-red-500'
                }`}>
                  {provider.healthy ? 'Operational' : 'Degraded'}
                </span>
              </div>
              <div className="flex justify-between">
                <span className="text-sm text-muted-foreground">Latency</span>
                <span className="text-sm font-medium">
                  {provider.latency ? `${provider.latency}ms` : 'N/A'}
                </span>
              </div>
              <div className="flex justify-between">
                <span className="text-sm text-muted-foreground">24h Uptime</span>
                <span className="text-sm font-medium">
                  {provider.uptime.toFixed(1)}%
                </span>
              </div>
              {provider.incidents.length > 0 && (
                <div className="mt-2 p-2 bg-red-50 rounded">
                  <p className="text-xs text-red-600">
                    Active incidents: {provider.incidents.join(', ')}
                  </p>
                </div>
              )}
            </div>
            <div className="mt-4">
              {/* Uptime bar: width tracks the 24h uptime percentage. */}
              <div className="w-full bg-gray-200 rounded-full h-2">
                <div
                  className={`h-2 rounded-full ${getStatusColor(provider.healthy)}`}
                  style={{ width: `${provider.uptime}%` }}
                />
              </div>
            </div>
          </Card>
        ))}
      </div>
      {providers.every(p => !p.healthy) && (
        <Alert className="bg-red-50 border-red-200">
          <AlertCircle className="h-4 w-4 text-red-600" />
          <AlertDescription className="text-red-600">
            All LLM providers are currently experiencing issues.
            Service is running in degraded mode with limited functionality.
          </AlertDescription>
        </Alert>
      )}
    </div>
  );
}
Best Practices
Do's
- Implement multi-provider fallback
- Use circuit breakers
- Monitor provider status pages
- Cache successful responses
- Implement graceful degradation
- Set up alerting systems
- Test failover regularly
Don'ts
- Don't rely on a single provider
- Don't ignore status updates
- Don't retry indefinitely
- Don't hide outages from users
- Don't skip error handling
- Don't forget timeout settings
- Don't cache sensitive data
References
- [1] OpenAI. "Error Codes Reference" (2024)
- [2] Anthropic. "API Errors" (2024)
- [3] Stack Overflow. "OpenAI API Questions" (2024)