Reliability
January 15, 2024 · 20 min read

Handle Service Unavailable Errors (503)

Service unavailable errors occur during outages, high load, or maintenance. This guide shows how to build resilient systems with fallbacks, monitoring, and graceful degradation.

Understanding 503 Errors

Service unavailable errors indicate temporary issues. Check status pages: OpenAI Status, Anthropic Status, and Google Cloud Status.

Common Causes
  • Service outages
  • Planned maintenance
  • Capacity overload
  • Regional failures
Error Indicators
  • HTTP 503 status code
  • "Service Unavailable" message
  • Timeout after retries
  • Gateway errors (502/504)

Multi-Provider Fallback System

Build resilience with multiple providers. Implementation inspired by AnyScale's resilience guide.

import asyncio
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import aiohttp
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import logging

class MultiProviderLLM:
    """Resilient LLM client with automatic failover.

    Keeps a registry of providers, each guarded by a CircuitBreaker and a
    cached status-page health check. ``complete()`` walks providers in
    priority order and returns the first successful response.
    """
    
    def __init__(self):
        # Per-provider registry: API client, public status page URL,
        # supported models, and the last cached health-check verdict.
        self.providers = {
            "openai": {
                "client": AsyncOpenAI(),
                "status_url": "https://status.openai.com/api/v2/status.json",
                "models": ["gpt-4", "gpt-3.5-turbo"],
                "healthy": True,
                "last_check": None
            },
            "anthropic": {
                "client": AsyncAnthropic(),
                "status_url": "https://status.anthropic.com/api/v2/status.json",
                "models": ["claude-3-opus-20240229", "claude-3-sonnet-20240229"],
                "healthy": True,
                "last_check": None
            },
            "google": {
                "client": None,  # Initialize with credentials
                "status_url": "https://status.cloud.google.com/incidents.json",
                "models": ["gemini-pro"],
                "healthy": True,
                "last_check": None
            }
        }
        
        self.logger = logging.getLogger(__name__)
        self.circuit_breakers = {}
        self.initialize_circuit_breakers()
    
    def initialize_circuit_breakers(self):
        """Set up one circuit breaker per provider."""
        for provider in self.providers:
            self.circuit_breakers[provider] = CircuitBreaker(
                failure_threshold=3,
                recovery_timeout=300,  # 5 minutes
                expected_exception=ServiceUnavailableError
            )
    
    async def check_provider_status(self, provider: str) -> bool:
        """Check whether a provider is operational via its status page.

        Results are cached for 5 minutes. On any fetch/parse error the
        last known status is returned instead of flipping to unhealthy.
        """
        
        provider_info = self.providers[provider]
        
        # Serve the cached verdict while it is less than 5 minutes old.
        if provider_info["last_check"]:
            time_since_check = datetime.now() - provider_info["last_check"]
            if time_since_check < timedelta(minutes=5):
                return provider_info["healthy"]
        
        # Refresh from the provider's public status page.
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    provider_info["status_url"],
                    timeout=aiohttp.ClientTimeout(total=5)
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        
                        # Formats differ: Statuspage-style JSON for OpenAI
                        # and Anthropic, an incident list for Google.
                        if provider in ("openai", "anthropic"):
                            status = data.get("status", {}).get("indicator", "none")
                            healthy = status in ["none", "minor"]
                        else:  # Google
                            incidents = data.get("incidents", [])
                            healthy = len(incidents) == 0
                        
                        provider_info["healthy"] = healthy
                        provider_info["last_check"] = datetime.now()
                        
                        if not healthy:
                            self.logger.warning(f"{provider} is experiencing issues")
                        
                        return healthy
                    
        except Exception as e:
            self.logger.error(f"Failed to check {provider} status: {e}")
            
        return provider_info["healthy"]  # Return last known status
    
    async def complete(
        self,
        messages: List[Dict],
        preferred_model: Optional[str] = None,
        max_retries: int = 3
    ) -> Dict[str, Any]:
        """Make a completion with automatic failover.

        Returns a dict with the winning provider, its raw response, and a
        ``fallback`` flag set when the first-choice provider was skipped.
        Raises AllProvidersUnavailableError when every provider fails.
        """
        
        provider_order = await self.get_provider_priority(preferred_model)
        
        last_error = None
        
        for provider in provider_order:
            # Skip providers whose breaker is currently tripped.
            if self.circuit_breakers[provider].is_open():
                self.logger.info(f"Skipping {provider} - circuit breaker open")
                continue
            
            # Skip providers whose status page reports an outage.
            if not await self.check_provider_status(provider):
                self.logger.info(f"Skipping {provider} - unhealthy status")
                continue
            
            try:
                # Route the request through the breaker so repeated 503s
                # trip it open for this provider.
                result = await self.circuit_breakers[provider].call(
                    self._make_provider_request,
                    provider,
                    messages,
                    preferred_model
                )
                
                # Success - schedule a gradual reset of the other breakers.
                asyncio.create_task(self.reset_other_breakers(provider))
                
                return {
                    "provider": provider,
                    "response": result,
                    "fallback": provider != provider_order[0]
                }
                
            except ServiceUnavailableError as e:
                last_error = e
                self.logger.warning(f"{provider} unavailable: {e}")
                continue
                
            except Exception as e:
                last_error = e
                self.logger.error(f"{provider} error: {e}")
                continue
        
        # All providers failed
        raise AllProvidersUnavailableError(
            f"All providers failed. Last error: {last_error}"
        )
    
    async def _make_provider_request(
        self,
        provider: str,
        messages: List[Dict],
        preferred_model: Optional[str]
    ):
        """Issue one request to a specific provider.

        Timeouts and 503-like errors are normalized into
        ServiceUnavailableError so the breaker and failover logic engage.
        """
        
        provider_info = self.providers[provider]
        client = provider_info["client"]
        
        # Use the preferred model when this provider offers it.
        if preferred_model and preferred_model in provider_info["models"]:
            model = preferred_model
        else:
            model = provider_info["models"][0]  # Default model
        
        try:
            if provider == "openai":
                response = await client.chat.completions.create(
                    model=model,
                    messages=messages,
                    timeout=30
                )
                return response
                
            elif provider == "anthropic":
                # Anthropic uses a slightly different message schema.
                claude_messages = self.convert_to_claude_format(messages)
                response = await client.messages.create(
                    model=model,
                    messages=claude_messages,
                    max_tokens=1000
                )
                return response
                
            elif provider == "google":
                # BUG FIX: this branch used to fall through and return
                # None, which complete() reported as a success. Until a
                # Gemini client is wired up, fail over to the next provider.
                raise ServiceUnavailableError(f"{provider} client not configured")
                
        except asyncio.TimeoutError:
            raise ServiceUnavailableError(f"{provider} request timed out")
        except ServiceUnavailableError:
            raise
        except Exception as e:
            if "503" in str(e) or "service_unavailable" in str(e).lower():
                raise ServiceUnavailableError(f"{provider} service unavailable")
            raise
    
    def convert_to_claude_format(self, messages: List[Dict]) -> List[Dict]:
        """Convert OpenAI-style chat messages to Anthropic's format.

        BUG FIX: this method was called by _make_provider_request but was
        never defined, so the anthropic path raised AttributeError. The
        Anthropic Messages API accepts only user/assistant turns (system
        prompts are passed separately), so other roles are dropped here.
        """
        return [
            {"role": m["role"], "content": m["content"]}
            for m in messages
            if m.get("role") in ("user", "assistant")
        ]
    
    async def get_provider_priority(
        self,
        preferred_model: Optional[str] = None
    ) -> List[str]:
        """Determine provider priority based on health and preferences.

        Order: the provider hosting the preferred model first, then the
        remaining healthy providers, then unhealthy ones as a last resort.
        """
        
        providers = []
        
        # First, add the provider that hosts the preferred model.
        if preferred_model:
            for provider, info in self.providers.items():
                if preferred_model in info["models"]:
                    providers.append(provider)
                    break
        
        # Then add healthy providers.
        for provider, info in self.providers.items():
            if provider not in providers and info["healthy"]:
                providers.append(provider)
        
        # Finally add unhealthy providers as a last resort.
        for provider in self.providers:
            if provider not in providers:
                providers.append(provider)
        
        return providers
    
    async def reset_other_breakers(self, working_provider: str):
        """Gradually reset circuit breakers for the other providers.

        Waits a minute, then moves any open breakers to half-open so they
        get one probing request instead of staying dark until timeout.
        """
        await asyncio.sleep(60)  # Wait 1 minute
        
        for provider, breaker in self.circuit_breakers.items():
            if provider != working_provider and breaker.is_open():
                breaker.half_open()
                self.logger.info(f"Set {provider} circuit breaker to half-open")


class CircuitBreaker:
    """Classic circuit breaker guarding calls to a flaky service.

    States: "closed" (normal operation), "open" (calls rejected), and
    "half_open" (probing whether the service has recovered).
    """
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 300,
        expected_exception: type = Exception
    ):
        # Configuration supplied by the caller.
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        # Mutable breaker state.
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half_open
    
    def is_open(self) -> bool:
        """Return True while calls should be rejected.

        An open breaker automatically flips to half-open (and reports
        itself as not open) once the recovery timeout has elapsed.
        """
        if self.state != "open":
            return False
        if self.last_failure_time is not None:
            elapsed = (datetime.now() - self.last_failure_time).total_seconds()
            if elapsed > self.recovery_timeout:
                self.state = "half_open"
                return False
        return True
    
    def half_open(self):
        """Force the breaker into the probing state."""
        self.state = "half_open"
    
    async def call(self, func, *args, **kwargs):
        """Await func(*args, **kwargs) under breaker protection.

        Only the configured expected_exception counts toward tripping the
        breaker; other exceptions propagate without affecting state.
        """
        if self.is_open():
            raise ServiceUnavailableError("Circuit breaker is open")
        
        try:
            result = await func(*args, **kwargs)
        except self.expected_exception:
            self.on_failure()
            raise
        self.on_success()
        return result
    
    def on_success(self):
        """A good call closes the breaker and clears the failure tally."""
        self.failure_count = 0
        self.state = "closed"
    
    def on_failure(self):
        """Tally a failure; trip the breaker open at the threshold."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "open"


class ServiceUnavailableError(Exception):
    """Raised when a provider returns HTTP 503 or times out (treated as unavailable)."""
    pass


class AllProvidersUnavailableError(Exception):
    """Raised when every configured provider has failed and no fallback remains."""
    pass


# Usage example
async def main():
    """Demonstrate a completion with automatic provider failover."""
    client = MultiProviderLLM()
    
    try:
        response = await client.complete(
            messages=[{"role": "user", "content": "Hello!"}],
            preferred_model="gpt-4"
        )
        
        print(f"Response from {response['provider']}")
        if response.get("fallback"):
            print("Note: Using fallback provider")
            
    except AllProvidersUnavailableError:
        print("All LLM providers are currently unavailable")
        # Implement degraded functionality (cached/static responses, etc.)
        

# BUG FIX: guard the entry point so importing this module does not
# immediately fire off network requests.
if __name__ == "__main__":
    asyncio.run(main())

Provider Health Monitoring

Proactive monitoring prevents surprises. Learn from LangChain's monitoring guide.

import asyncio
import aiohttp
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json

class LLMHealthMonitor:
    """Monitor health of multiple LLM providers.

    Combines two signals per provider: the public status page and a live
    probe of the API endpoint. Keeps 24 hours of history for uptime and
    latency statistics, and rate-limits outage alerts.
    """
    
    def __init__(self):
        # Per-provider endpoints used by the health checks below.
        self.providers = {
            "openai": {
                "status_url": "https://status.openai.com/api/v2/status.json",
                "api_endpoint": "https://api.openai.com/v1/models",
                "test_model": "gpt-3.5-turbo"
            },
            "anthropic": {
                "status_url": "https://status.anthropic.com/api/v2/status.json",
                "api_endpoint": "https://api.anthropic.com/v1/messages",
                "test_model": "claude-3-haiku-20240307"
            },
            "google": {
                "status_url": "https://status.cloud.google.com/incidents.json",
                "api_endpoint": "https://generativelanguage.googleapis.com/v1/models",
                "test_model": "gemini-pro"
            }
        }
        
        # provider -> list of {"timestamp", "healthy", "latency", "status"}
        self.health_history = {}
        # provider -> datetime of the last alert, for rate limiting
        self.alerts_sent = {}
    
    async def check_all_providers(self) -> Dict[str, Dict]:
        """Check all providers concurrently and return a health report."""
        
        tasks = []
        for provider in self.providers:
            tasks.append(self.check_provider_health(provider))
        
        # return_exceptions=True so one failing check cannot sink the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        health_report = {}
        for provider, result in zip(self.providers.keys(), results):
            if isinstance(result, Exception):
                health_report[provider] = {
                    "status": "error",
                    "error": str(result),
                    "timestamp": datetime.now().isoformat()
                }
            else:
                health_report[provider] = result
        
        # Store in history for uptime/latency statistics.
        self.update_health_history(health_report)
        
        # Check whether any provider warrants an alert.
        await self.check_alerts(health_report)
        
        return health_report
    
    async def check_provider_health(self, provider: str) -> Dict:
        """Check health of a specific provider (status page + API probe)."""
        
        provider_info = self.providers[provider]
        health = {
            "provider": provider,
            "timestamp": datetime.now().isoformat(),
            "status_page": "unknown",
            "api_responsive": False,
            "latency": None,
            "errors": []
        }
        
        # Signal 1: the provider's public status page.
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    provider_info["status_url"],
                    timeout=aiohttp.ClientTimeout(total=5)
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        
                        if provider in ["openai", "anthropic"]:
                            # Statuspage-style payload with an indicator.
                            indicator = data.get("status", {}).get("indicator", "unknown")
                            health["status_page"] = indicator
                            health["status_description"] = data.get("status", {}).get("description", "")
                        else:  # Google
                            incidents = data.get("incidents", [])
                            health["status_page"] = "operational" if len(incidents) == 0 else "incidents"
                            health["incidents"] = len(incidents)
                            
        except Exception as e:
            health["errors"].append(f"Status page error: {str(e)}")
        
        # Signal 2: probe the API endpoint directly and time it.
        try:
            start_time = asyncio.get_event_loop().time()
            
            async with aiohttp.ClientSession() as session:
                headers = self.get_auth_headers(provider)
                
                async with session.get(
                    provider_info["api_endpoint"],
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    
                    end_time = asyncio.get_event_loop().time()
                    health["latency"] = round((end_time - start_time) * 1000, 2)  # ms
                    
                    # 401/403 still prove the API is up — only auth failed.
                    if response.status in [200, 401, 403]:
                        health["api_responsive"] = True
                    else:
                        health["api_responsive"] = False
                        health["api_status_code"] = response.status
                        
        except asyncio.TimeoutError:
            health["errors"].append("API timeout")
            health["api_responsive"] = False
        except Exception as e:
            health["errors"].append(f"API error: {str(e)}")
            health["api_responsive"] = False
        
        # Overall assessment: benign status page, responsive API, no errors.
        health["healthy"] = (
            health["status_page"] in ["none", "minor", "operational"] and
            health["api_responsive"] and
            len(health["errors"]) == 0
        )
        
        return health
    
    def get_auth_headers(self, provider: str) -> Dict[str, str]:
        """Get authentication headers for a provider.

        NOTE: the values below are placeholders — in production, load real
        keys from secure storage (env vars, a secrets manager), never
        hard-code them.
        """
        
        if provider == "openai":
            return {"Authorization": "Bearer YOUR_OPENAI_KEY"}
        elif provider == "anthropic":
            return {"x-api-key": "YOUR_ANTHROPIC_KEY"}
        elif provider == "google":
            return {"X-API-Key": "YOUR_GOOGLE_KEY"}
        
        return {}
    
    def update_health_history(self, health_report: Dict):
        """Append the latest report to history, pruning entries over 24h old."""
        
        timestamp = datetime.now()
        
        for provider, health in health_report.items():
            if provider not in self.health_history:
                self.health_history[provider] = []
            
            self.health_history[provider].append({
                "timestamp": timestamp,
                "healthy": health.get("healthy", False),
                "latency": health.get("latency"),
                "status": health.get("status_page")
            })
            
            # Keep only the last 24 hours of samples.
            cutoff = timestamp - timedelta(hours=24)
            self.health_history[provider] = [
                h for h in self.health_history[provider]
                if h["timestamp"] > cutoff
            ]
    
    async def check_alerts(self, health_report: Dict):
        """Send alerts for unhealthy providers, at most one per 30 minutes."""
        
        for provider, health in health_report.items():
            if not health.get("healthy", False):
                # Rate-limit: skip if we alerted for this provider recently.
                last_alert = self.alerts_sent.get(provider)
                if last_alert:
                    time_since_alert = datetime.now() - last_alert
                    if time_since_alert < timedelta(minutes=30):
                        continue  # Don't spam alerts
                
                await self.send_alert(provider, health)
                self.alerts_sent[provider] = datetime.now()
    
    async def send_alert(self, provider: str, health: Dict):
        """Send alert for an unhealthy provider (stdout placeholder)."""
        
        alert_message = f"""
        🚨 LLM Provider Alert: {provider}
        
        Status Page: {health.get('status_page', 'unknown')}
        API Responsive: {health.get('api_responsive', False)}
        Latency: {health.get('latency', 'N/A')}ms
        Errors: {', '.join(health.get('errors', []))}
        
        Time: {health.get('timestamp', 'unknown')}
        """
        
        # Implement your alerting mechanism here.
        print(alert_message)
        
        # Could send to Slack, email, PagerDuty, etc.
    
    def get_uptime(self, provider: str, hours: int = 24) -> float:
        """Percentage of checks within the last `hours` that were healthy.

        BUG FIX: the `hours` parameter was previously accepted but ignored;
        it now actually restricts the window. Returns 0.0 when there is no
        data in the window.
        """
        
        history = self.health_history.get(provider, [])
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = [h for h in history if h["timestamp"] > cutoff]
        
        if not recent:
            return 0.0
        
        healthy_checks = sum(1 for h in recent if h["healthy"])
        return (healthy_checks / len(recent)) * 100
    
    def get_average_latency(self, provider: str) -> Optional[float]:
        """Average latency (ms) across retained history, or None without data."""
        
        if provider not in self.health_history:
            return None
        
        latencies = [
            h["latency"] for h in self.health_history[provider]
            if h["latency"] is not None
        ]
        
        if not latencies:
            return None
        
        return sum(latencies) / len(latencies)


# Usage example
async def monitor_providers():
    """Continuously poll provider health and print a status summary."""
    monitor = LLMHealthMonitor()
    
    # Run continuous monitoring
    while True:
        print("\n=== Health Check ===")
        health_report = await monitor.check_all_providers()
        
        for provider, health in health_report.items():
            status = "✅" if health.get("healthy", False) else "❌"
            print(f"{status} {provider}: {health.get('status_page', 'unknown')} "
                  f"(API: {health.get('api_responsive', False)}, "
                  f"Latency: {health.get('latency', 'N/A')}ms)")
        
        # Show uptime stats
        print("\n=== 24h Uptime ===")
        for provider in monitor.providers:
            uptime = monitor.get_uptime(provider)
            avg_latency = monitor.get_average_latency(provider)
            # BUG FIX: the conditional previously applied to the entire
            # implicitly-concatenated f-string, so an unknown latency
            # printed a bare "N/A" with no provider name or uptime.
            latency_text = (
                f"{avg_latency:.0f}ms avg latency" if avg_latency is not None else "N/A"
            )
            print(f"{provider}: {uptime:.1f}% uptime, {latency_text}")
        
        # Wait before next check
        await asyncio.sleep(300)  # Check every 5 minutes

# Run monitoring
# asyncio.run(monitor_providers())

Graceful Degradation Strategies

import hashlib
import json
import os
import re
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

class DegradedModeHandler:
    """Handle degraded functionality when LLMs are unavailable.

    Strategy: serve a cached response when a valid one exists, otherwise
    return a canned reply roughly matching the user's detected intent.
    """
    
    def __init__(self, cache_dir: str = "./llm_cache"):
        self.cache_dir = cache_dir
        # Canned replies keyed by coarse intent.
        self.fallback_responses = {
            "greeting": [
                "Hello! How can I help you today?",
                "Hi there! What can I assist you with?",
                "Welcome! How may I help you?"
            ],
            "error": [
                "I'm having trouble processing your request right now.",
                "Our service is temporarily limited. Please try again later.",
                "I apologize for the inconvenience. Some features are currently unavailable."
            ],
            "acknowledgment": [
                "I understand your request.",
                "Got it, let me help with that.",
                "Thank you for your message."
            ]
        }
    
    async def handle_degraded_request(
        self,
        messages: List[Dict],
        intent: Optional[str] = None
    ) -> Dict[str, Any]:
        """Handle a request in degraded mode.

        Returns a dict with the response text, the mode used ("cached" or
        "degraded"), and a user-facing warning.
        """
        
        # Prefer a previously cached answer for the exact same messages.
        cached_response = self.check_cache(messages)
        if cached_response:
            return {
                "response": cached_response,
                "mode": "cached",
                "warning": "Using cached response due to service unavailability"
            }
        
        # Detect intent if the caller did not supply one.
        if not intent:
            intent = self.detect_simple_intent(messages)
        
        # Pick a canned reply; unknown intents fall back to "error".
        if intent in self.fallback_responses:
            response = self.get_fallback_response(intent)
        else:
            response = self.get_fallback_response("error")
        
        return {
            "response": response,
            "mode": "degraded",
            "warning": "Limited functionality due to service unavailability",
            "intent": intent
        }
    
    def check_cache(self, messages: List[Dict]) -> Optional[str]:
        """Return a cached response younger than 24 hours, or None."""
        
        cache_key = self.create_cache_key(messages)
        cache_file = f"{self.cache_dir}/{cache_key}.json"
        
        try:
            with open(cache_file, 'r') as f:
                cached = json.load(f)
            
            # Check if cache is still valid (24 hours)
            cache_time = datetime.fromisoformat(cached["timestamp"])
            if datetime.now() - cache_time < timedelta(hours=24):
                return cached["response"]
                
        # BUG FIX: only FileNotFoundError was caught before, so a corrupt
        # or malformed cache entry crashed degraded mode. Treat any broken
        # entry as a cache miss. (JSONDecodeError is a ValueError subclass;
        # ValueError also covers a bad timestamp string.)
        except (FileNotFoundError, KeyError, ValueError):
            pass
        
        return None
    
    def create_cache_key(self, messages: List[Dict]) -> str:
        """Create a deterministic cache key (SHA-256 of the messages)."""
        
        # Serialize with sorted keys so equivalent dicts hash identically.
        messages_str = json.dumps(messages, sort_keys=True)
        
        return hashlib.sha256(messages_str.encode()).hexdigest()
    
    def detect_simple_intent(self, messages: List[Dict]) -> str:
        """Rough keyword-based intent detection for fallback replies."""
        
        if not messages:
            return "error"
        
        last_message = messages[-1].get("content", "").lower()
        
        def contains_any(phrases) -> bool:
            # BUG FIX: whole-word matching. Plain substring tests misfired
            # badly — e.g. "hi" matched inside "this" and "think".
            return any(
                re.search(rf"\b{re.escape(p)}\b", last_message) for p in phrases
            )
        
        # Greeting patterns
        greeting_words = ["hello", "hi", "hey", "greetings", "good morning", "good afternoon"]
        if contains_any(greeting_words):
            return "greeting"
        
        # Question patterns ("?" is punctuation, so test it separately)
        question_words = ["what", "how", "why", "when", "where", "who"]
        if "?" in last_message or contains_any(question_words):
            return "question"
        
        # Command patterns
        command_words = ["please", "can you", "could you", "would you", "help"]
        if contains_any(command_words):
            return "request"
        
        return "general"
    
    def get_fallback_response(self, intent: str) -> str:
        """Pick a random canned reply for the intent (default: "error")."""
        
        import random
        
        if intent in self.fallback_responses:
            responses = self.fallback_responses[intent]
            return random.choice(responses)
        
        return self.fallback_responses["error"][0]


# Integration with main application
class ResilientLLMService:
    """Main service: multi-provider completion with degraded-mode fallback."""
    
    def __init__(self):
        self.multi_provider = MultiProviderLLM()
        self.degraded_handler = DegradedModeHandler()
        # True while we are serving canned/cached responses.
        self.degraded_mode = False
    
    async def complete(
        self,
        messages: List[Dict],
        allow_degraded: bool = True
    ) -> Dict[str, Any]:
        """Complete with automatic degraded-mode fallback.

        Raises AllProvidersUnavailableError only when every provider fails
        and `allow_degraded` is False.
        """
        
        try:
            # Try normal completion
            response = await self.multi_provider.complete(messages)
            
            # BUG FIX: degraded_mode previously stayed True forever once
            # set; clear it on any successful completion so the flag
            # actually reflects current service health.
            self.degraded_mode = False
            
            # Cache successful response for later degraded-mode reuse
            if response.get("response"):
                await self.cache_response(messages, response["response"])
            
            return response
            
        except AllProvidersUnavailableError:
            if not allow_degraded:
                raise
            
            # Switch to degraded mode
            self.degraded_mode = True
            
            return await self.degraded_handler.handle_degraded_request(messages)
    
    async def cache_response(self, messages: List[Dict], response: Any):
        """Best-effort write of a successful response to the degraded-mode cache."""
        
        cache_key = self.degraded_handler.create_cache_key(messages)
        cache_file = f"{self.degraded_handler.cache_dir}/{cache_key}.json"
        
        cache_data = {
            "timestamp": datetime.now().isoformat(),
            "messages": messages,
            "response": str(response)
        }
        
        try:
            os.makedirs(self.degraded_handler.cache_dir, exist_ok=True)
            with open(cache_file, 'w') as f:
                json.dump(cache_data, f)
        except Exception:
            # Caching is opportunistic; never fail the request over it.
            pass

Service Status Dashboard

// React component for status dashboard
import React, { useState, useEffect } from 'react';
import { Activity, AlertCircle, CheckCircle, Clock } from 'lucide-react';

// Health summary for one LLM provider — shape expected from the
// /api/llm-health endpoint fetched by ServiceStatusDashboard.
interface ProviderStatus {
  name: string;            // provider display name, used as the React key
  healthy: boolean;        // overall health flag from the backend
  latency: number | null;  // last measured latency in ms (rendered as "{n}ms"); null when unknown
  uptime: number;          // 24h uptime percentage (0-100), drives the progress bar width
  lastChecked: string;     // presumably an ISO timestamp — not rendered in this component
  incidents: string[];     // active incident descriptions; empty array when none
}

/**
 * Dashboard grid showing health, latency, and 24h uptime for each LLM
 * provider, polled from /api/llm-health once a minute.
 *
 * NOTE(review): Card, Alert, and AlertDescription are referenced below but
 * not imported in this snippet — confirm the UI-library import path.
 */
export function ServiceStatusDashboard() {
  // Empty until the first successful fetch populates it.
  const [providers, setProviders] = useState<ProviderStatus[]>([]);
  const [loading, setLoading] = useState(true);
  
  useEffect(() => {
    // Fetch the latest health report; on failure we log and keep
    // whatever data is already displayed.
    const checkStatus = async () => {
      try {
        const response = await fetch('/api/llm-health');
        const data = await response.json();
        setProviders(data.providers);
      } catch (error) {
        console.error('Failed to fetch status:', error);
      } finally {
        setLoading(false);
      }
    };
    
    // Check immediately
    checkStatus();
    
    // Then check every minute
    const interval = setInterval(checkStatus, 60000);
    return () => clearInterval(interval);
  }, []);
  
  // Green check vs red alert icon for a provider's health flag.
  const getStatusIcon = (healthy: boolean) => {
    return healthy ? (
      <CheckCircle className="h-5 w-5 text-green-500" />
    ) : (
      <AlertCircle className="h-5 w-5 text-red-500" />
    );
  };
  
  // Tailwind background class for the uptime progress bar.
  const getStatusColor = (healthy: boolean) => {
    return healthy ? 'bg-green-500' : 'bg-red-500';
  };
  
  if (loading) {
    return <div>Loading status...</div>;
  }
  
  return (
    <div className="space-y-6">
      <div className="flex items-center justify-between">
        <h2 className="text-2xl font-bold">LLM Provider Status</h2>
        <div className="text-sm text-muted-foreground">
          Last updated: {new Date().toLocaleTimeString()}
        </div>
      </div>
      
      {/* One card per provider with status, latency, uptime, incidents. */}
      <div className="grid gap-4 md:grid-cols-3">
        {providers.map((provider) => (
          <Card key={provider.name} className="p-6">
            <div className="flex items-center justify-between mb-4">
              <h3 className="text-lg font-semibold">{provider.name}</h3>
              {getStatusIcon(provider.healthy)}
            </div>
            
            <div className="space-y-2">
              <div className="flex justify-between">
                <span className="text-sm text-muted-foreground">Status</span>
                <span className={`text-sm font-medium ${
                  provider.healthy ? 'text-green-500' : 'text-red-500'
                }`}>
                  {provider.healthy ? 'Operational' : 'Degraded'}
                </span>
              </div>
              
              <div className="flex justify-between">
                <span className="text-sm text-muted-foreground">Latency</span>
                <span className="text-sm font-medium">
                  {provider.latency ? `${provider.latency}ms` : 'N/A'}
                </span>
              </div>
              
              <div className="flex justify-between">
                <span className="text-sm text-muted-foreground">24h Uptime</span>
                <span className="text-sm font-medium">
                  {provider.uptime.toFixed(1)}%
                </span>
              </div>
              
              {provider.incidents.length > 0 && (
                <div className="mt-2 p-2 bg-red-50 rounded">
                  <p className="text-xs text-red-600">
                    Active incidents: {provider.incidents.join(', ')}
                  </p>
                </div>
              )}
            </div>
            
            {/* Uptime percentage rendered as a progress bar. */}
            <div className="mt-4">
              <div className="w-full bg-gray-200 rounded-full h-2">
                <div
                  className={`h-2 rounded-full ${getStatusColor(provider.healthy)}`}
                  style={{ width: `${provider.uptime}%` }}
                />
              </div>
            </div>
          </Card>
        ))}
      </div>
      
      {/* Global banner shown only when every provider is unhealthy. */}
      {providers.every(p => !p.healthy) && (
        <Alert className="bg-red-50 border-red-200">
          <AlertCircle className="h-4 w-4 text-red-600" />
          <AlertDescription className="text-red-600">
            All LLM providers are currently experiencing issues. 
            Service is running in degraded mode with limited functionality.
          </AlertDescription>
        </Alert>
      )}
    </div>
  );
}

Best Practices

Status Pages

Monitor official provider status pages.

Circuit Breakers

Implement circuit breaker patterns.

Learn more →

Fallback Guide

Multi-provider fallback strategies.

View guide →

References

  1. OpenAI. "Error Codes Reference" (2024)
  2. Anthropic. "API Errors" (2024)
  3. Stack Overflow. "OpenAI API Questions" (2024)