Advanced
January 16, 2024 · 12 min read

Building a Multi-Provider LLM Fallback System

Implement a robust fallback system that automatically switches between LLM providers to ensure 99.9% uptime for your AI applications.

Architecture Overview

A production-ready multi-provider system combines intelligent routing, health monitoring, and seamless fallback to ensure your AI applications remain available even during provider outages[1].

High-Level Architecture

The system routes every request through four cooperating components:

  • API Gateway: a single endpoint for all LLM requests (the entry point)
  • Health Monitor: continuous provider health checks
  • Load Balancer: intelligent request routing
  • Fallback Engine: automatic provider switching

Downstream sit the providers themselves: OpenAI (GPT-4, GPT-3.5), Anthropic (Claude 3), Google (Gemini Pro), and Cohere (Command).
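
In miniature, a request flows through these components as follows (a simplified sketch; the names are illustrative, and the full implementation follows below):

async def handle_request(request, providers):
    # Health Monitor: drop providers currently marked unhealthy
    candidates = [p for p in providers if p.status != "unhealthy"]
    
    # Load Balancer: order the remaining providers (e.g., by priority)
    candidates.sort(key=lambda p: p.priority)
    
    # Fallback Engine: try each candidate until one succeeds
    last_error = None
    for provider in candidates:
        try:
            return await provider.complete(request)
        except Exception as e:
            last_error = e  # move on to the next provider
    raise RuntimeError(f"All providers failed: {last_error}")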

Core Implementation

Let's build a production-ready multi-provider system with health checks, intelligent routing, and automatic fallback[2]:

Python Implementation
import asyncio
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from enum import Enum
import aiohttp
import logging

class ProviderStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

@dataclass
class ProviderConfig:
    name: str
    api_key: str
    base_url: str
    model: str
    timeout: int = 30
    max_retries: int = 3
    cost_per_1k_tokens: float = 0.01
    priority: int = 1
    weight: float = 1.0

class ProviderAdapter(ABC):
    """Abstract base class for provider adapters"""
    
    def __init__(self, config: ProviderConfig):
        self.config = config
        self.status = ProviderStatus.HEALTHY
        self.last_health_check = 0
        self.consecutive_failures = 0
        self.total_requests = 0
        self.total_latency = 0
        self.logger = logging.getLogger(f"{__name__}.{config.name}")
    
    @abstractmethod
    async def complete(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        """Make completion request to provider"""
        pass
    
    @abstractmethod
    async def health_check(self) -> bool:
        """Check if provider is healthy"""
        pass
    
    def normalize_response(self, provider_response: Dict) -> Dict[str, Any]:
        """Normalize provider response to standard format"""
        return {
            "content": self.extract_content(provider_response),
            "usage": self.extract_usage(provider_response),
            "provider": self.config.name,
            "model": self.config.model,
            "latency": getattr(self, '_last_latency', 0)
        }
    
    @abstractmethod
    def extract_content(self, response: Dict) -> str:
        """Extract content from provider-specific response"""
        pass
    
    @abstractmethod
    def extract_usage(self, response: Dict) -> Dict[str, int]:
        """Extract token usage from provider response"""
        pass
    
    async def make_request(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        """Make request with timing and error handling"""
        start_time = time.time()
        
        try:
            response = await self.complete(messages, **kwargs)
            self._last_latency = time.time() - start_time
            self.total_latency += self._last_latency
            self.total_requests += 1
            self.consecutive_failures = 0
            
            return self.normalize_response(response)
            
        except Exception as e:
            self.consecutive_failures += 1
            self.logger.error(f"Request failed: {e}")
            
            # Update status based on failures
            if self.consecutive_failures >= 3:
                self.status = ProviderStatus.UNHEALTHY
            elif self.consecutive_failures >= 1:
                self.status = ProviderStatus.DEGRADED
            
            raise

class OpenAIAdapter(ProviderAdapter):
    """OpenAI-specific adapter"""
    
    async def complete(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        async with aiohttp.ClientSession() as session:
            headers = {
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json"
            }
            
            payload = {
                "model": self.config.model,
                "messages": messages,
                **kwargs
            }
            
            async with session.post(
                f"{self.config.base_url}/chat/completions",
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=self.config.timeout)
            ) as response:
                if response.status != 200:
                    error_data = await response.text()
                    raise Exception(f"OpenAI error {response.status}: {error_data}")
                
                return await response.json()
    
    async def health_check(self) -> bool:
        try:
            await self.complete([{"role": "user", "content": "Hi"}], max_tokens=5)
            self.status = ProviderStatus.HEALTHY
            return True
        except Exception:
            return False
    
    def extract_content(self, response: Dict) -> str:
        return response["choices"][0]["message"]["content"]
    
    def extract_usage(self, response: Dict) -> Dict[str, int]:
        return response.get("usage", {})

class AnthropicAdapter(ProviderAdapter):
    """Anthropic Claude adapter"""
    
    async def complete(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        async with aiohttp.ClientSession() as session:
            headers = {
                "x-api-key": self.config.api_key,
                "anthropic-version": "2023-06-01",
                "Content-Type": "application/json"
            }
            
            # Convert messages to Anthropic format
            system_msg = next((m["content"] for m in messages if m["role"] == "system"), None)
            user_msgs = [m for m in messages if m["role"] != "system"]
            
            payload = {
                "model": self.config.model,
                "messages": user_msgs,
                "max_tokens": kwargs.get("max_tokens", 1000),
            }
            
            if system_msg:
                payload["system"] = system_msg
            
            async with session.post(
                f"{self.config.base_url}/messages",
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=self.config.timeout)
            ) as response:
                if response.status != 200:
                    error_data = await response.text()
                    raise Exception(f"Anthropic error {response.status}: {error_data}")
                
                return await response.json()
    
    async def health_check(self) -> bool:
        try:
            await self.complete([{"role": "user", "content": "Hi"}], max_tokens=5)
            self.status = ProviderStatus.HEALTHY
            return True
        except Exception:
            return False
    
    def extract_content(self, response: Dict) -> str:
        return response["content"][0]["text"]
    
    def extract_usage(self, response: Dict) -> Dict[str, int]:
        return {
            "prompt_tokens": response.get("usage", {}).get("input_tokens", 0),
            "completion_tokens": response.get("usage", {}).get("output_tokens", 0),
            "total_tokens": response.get("usage", {}).get("input_tokens", 0) + 
                          response.get("usage", {}).get("output_tokens", 0)
        }

class LoadBalancer:
    """Intelligent load balancer with health-aware routing"""
    
    def __init__(self, providers: List[ProviderAdapter]):
        self.providers = providers
        self.logger = logging.getLogger(f"{__name__}.LoadBalancer")
    
    def get_available_providers(self) -> List[ProviderAdapter]:
        """Get list of healthy providers sorted by priority"""
        available = [p for p in self.providers if p.status != ProviderStatus.UNHEALTHY]
        
        # Sort by priority (lower is better), then by average latency
        return sorted(available, key=lambda p: (
            p.config.priority,
            p.total_latency / max(p.total_requests, 1)
        ))
    
    def select_provider(self, strategy: str = "priority") -> Optional[ProviderAdapter]:
        """Select provider based on strategy"""
        available = self.get_available_providers()
        
        if not available:
            return None
        
        if strategy == "priority":
            return available[0]
        
        elif strategy == "weighted":
            # Weight-based selection
            import random
            total_weight = sum(p.config.weight for p in available)
            r = random.uniform(0, total_weight)
            
            current = 0
            for provider in available:
                current += provider.config.weight
                if r <= current:
                    return provider
        
        elif strategy == "least_latency":
            # Select provider with lowest average latency
            return min(available, key=lambda p: p.total_latency / max(p.total_requests, 1))
        
        return available[0]

class MultiProviderLLM:
    """Main class orchestrating multi-provider fallback"""
    
    def __init__(
        self,
        providers: List[ProviderConfig],
        health_check_interval: int = 60,
        fallback_enabled: bool = True
    ):
        self.adapters = self._init_adapters(providers)
        self.load_balancer = LoadBalancer(self.adapters)
        self.health_check_interval = health_check_interval
        self.fallback_enabled = fallback_enabled
        self.logger = logging.getLogger(f"{__name__}.MultiProviderLLM")
        
        # Start the background health check loop (requires a running event
        # loop); keep a reference so the task is not garbage collected
        self._health_check_task = asyncio.create_task(self._health_check_loop())
    
    def _init_adapters(self, providers: List[ProviderConfig]) -> List[ProviderAdapter]:
        """Initialize provider adapters"""
        adapters = []
        
        for config in providers:
            if config.name.lower() == "openai":
                adapters.append(OpenAIAdapter(config))
            elif config.name.lower() == "anthropic":
                adapters.append(AnthropicAdapter(config))
            # Add more providers here
            
        return adapters
    
    async def _health_check_loop(self):
        """Continuous health monitoring"""
        while True:
            for adapter in self.adapters:
                try:
                    # Skip if recently checked
                    if time.time() - adapter.last_health_check < 30:
                        continue
                    
                    # Capture status first: health_check() marks the provider
                    # healthy on success, so comparing after the call would
                    # never detect a recovery
                    previous_status = adapter.status
                    healthy = await adapter.health_check()
                    adapter.last_health_check = time.time()
                    
                    if healthy and previous_status == ProviderStatus.UNHEALTHY:
                        self.logger.info(f"{adapter.config.name} recovered")
                    elif not healthy and previous_status == ProviderStatus.HEALTHY:
                        adapter.status = ProviderStatus.UNHEALTHY
                        self.logger.warning(f"{adapter.config.name} became unhealthy")
                        
                except Exception as e:
                    self.logger.error(f"Health check failed for {adapter.config.name}: {e}")
            
            await asyncio.sleep(self.health_check_interval)
    
    async def complete(
        self,
        messages: List[Dict],
        strategy: str = "priority",
        **kwargs
    ) -> Dict[str, Any]:
        """Make completion request with automatic fallback"""
        
        attempted_providers = []
        last_error = None
        
        # Get ordered list of providers to try
        providers_to_try = self.load_balancer.get_available_providers()
        
        if not providers_to_try:
            raise Exception("No healthy providers available")
        
        # Try each provider in order
        for adapter in providers_to_try:
            attempted_providers.append(adapter.config.name)
            
            try:
                self.logger.info(f"Attempting request with {adapter.config.name}")
                
                response = await adapter.make_request(messages, **kwargs)
                
                # Add metadata about fallback
                response["attempted_providers"] = attempted_providers
                response["fallback_count"] = len(attempted_providers) - 1
                
                return response
                
            except Exception as e:
                last_error = e
                self.logger.warning(
                    f"Provider {adapter.config.name} failed: {e}"
                )
                
                if not self.fallback_enabled:
                    raise
                
                # Continue to next provider
                continue
        
        # All providers failed
        raise Exception(
            f"All providers failed. Attempted: {attempted_providers}. "
            f"Last error: {last_error}"
        )
    
    def get_status(self) -> Dict[str, Any]:
        """Get current system status"""
        return {
            "providers": [
                {
                    "name": adapter.config.name,
                    "status": adapter.status.value,
                    "requests": adapter.total_requests,
                    "avg_latency": adapter.total_latency / max(adapter.total_requests, 1),
                    "consecutive_failures": adapter.consecutive_failures
                }
                for adapter in self.adapters
            ],
            "healthy_count": len([
                a for a in self.adapters 
                if a.status == ProviderStatus.HEALTHY
            ]),
            "total_providers": len(self.adapters)
        }

# Usage example
async def main():
    # Configure providers
    providers = [
        ProviderConfig(
            name="openai",
            api_key="sk-...",
            base_url="https://api.openai.com/v1",
            model="gpt-4",
            priority=1,
            weight=0.6,
            cost_per_1k_tokens=0.03
        ),
        ProviderConfig(
            name="anthropic",
            api_key="sk-ant-...",
            base_url="https://api.anthropic.com/v1",
            model="claude-3-sonnet-20240229",
            priority=2,
            weight=0.3,
            cost_per_1k_tokens=0.015
        ),
        # Add more providers...
    ]
    
    # Initialize multi-provider system
    llm = MultiProviderLLM(providers)
    
    # Make requests with automatic fallback
    response = await llm.complete(
        messages=[
            {"role": "user", "content": "Hello, how are you?"}
        ],
        strategy="priority",
        max_tokens=100
    )
    
    print(f"Response from {response['provider']}: {response['content']}")
    print(f"Fallback count: {response['fallback_count']}")
    
    # Check system status
    status = llm.get_status()
    print(f"System status: {status}")

if __name__ == "__main__":
    asyncio.run(main())

Advanced Routing Strategies

Beyond simple fallback, implement intelligent routing based on request characteristics, provider capabilities, and real-time performance[1]:

Content-Based Routing
class ContentRouter:
    """Route based on request content"""
    
    def __init__(self):
        self.rules = {
            "code": ["openai", "anthropic"],
            "creative": ["anthropic", "openai"],
            "factual": ["google", "openai"],
            "translation": ["google", "deepl"],
            "math": ["openai", "wolfram"]
        }
    
    def categorize_request(self, messages: List[Dict]) -> str:
        """Categorize request type from messages"""
        content = " ".join(m["content"] for m in messages)
        
        # Simple keyword matching (use ML in production)
        if any(kw in content.lower() for kw in ["code", "function", "debug"]):
            return "code"
        elif any(kw in content.lower() for kw in ["story", "poem", "creative"]):
            return "creative"
        elif any(kw in content.lower() for kw in ["translate", "translation"]):
            return "translation"
        elif any(kw in content.lower() for kw in ["calculate", "math", "equation"]):
            return "math"
        else:
            return "factual"
    
    def get_provider_order(self, messages: List[Dict]) -> List[str]:
        category = self.categorize_request(messages)
        return self.rules.get(category, ["openai", "anthropic", "google"])
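
The router's output can then seed the fallback order. A small piece of glue code (hypothetical; it assumes adapters are kept in a dict keyed by provider name, which the implementation above does not do by itself):

def order_adapters(adapters_by_name: Dict[str, ProviderAdapter],
                   messages: List[Dict]) -> List[ProviderAdapter]:
    router = ContentRouter()
    preferred = router.get_provider_order(messages)
    # Providers named by the router come first, in the router's order;
    # any remaining adapters are appended as a last-resort fallback
    ranked = [adapters_by_name[n] for n in preferred if n in adapters_by_name]
    rest = [a for n, a in adapters_by_name.items() if n not in preferred]
    return ranked + rest
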
Cost-Optimized Routing
class CostOptimizer:
    """Route to minimize costs while meeting SLAs"""
    
    def select_provider(
        self,
        providers: List[ProviderAdapter],
        estimated_tokens: int,
        max_latency_ms: Optional[int] = None,
        quality_threshold: float = 0.9
    ) -> ProviderAdapter:
        """Select cheapest provider meeting requirements"""
        
        eligible = []
        
        for provider in providers:
            # Check if healthy
            if provider.status != ProviderStatus.HEALTHY:
                continue
            
            # Check latency requirements (adapters track latency in seconds)
            if max_latency_ms:
                avg_latency_ms = 1000 * provider.total_latency / max(provider.total_requests, 1)
                if avg_latency_ms > max_latency_ms:
                    continue
            
            # Check quality threshold (assumes a benchmark-derived
            # quality_score has been attached to the adapter)
            if getattr(provider, "quality_score", 1.0) < quality_threshold:
                continue
            
            eligible.append(provider)
        
        if not eligible:
            raise Exception("No providers meet requirements")
        
        # Sort by cost and return cheapest
        return min(
            eligible,
            key=lambda p: p.config.cost_per_1k_tokens * estimated_tokens / 1000
        )

Monitoring and Observability

Comprehensive monitoring is essential for maintaining high availability[4]. Track key metrics and set up alerts for degraded performance:

Key Metrics to Monitor

Provider Health

  • Availability percentage
  • Response time (p50, p95, p99)
  • Error rates by type
  • Consecutive failure count

System Performance

  • Fallback frequency
  • Provider distribution
  • Cost per request
  • Queue depth
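
Many of these can be derived directly from the counters the adapters already track. As a starting point, a small helper that turns get_status() output into alert conditions (the thresholds here are illustrative):

def check_alerts(status: Dict[str, Any]) -> List[str]:
    """Derive simple alert conditions from MultiProviderLLM.get_status()"""
    alerts = []
    
    # System-level: alert when fewer than half the providers are healthy
    if status["healthy_count"] < status["total_providers"] / 2:
        alerts.append("Fewer than half of all providers are healthy")
    
    for p in status["providers"]:
        # Provider-level: repeated failures or slow average responses
        if p["consecutive_failures"] >= 3:
            alerts.append(f"{p['name']}: {p['consecutive_failures']} consecutive failures")
        if p["avg_latency"] > 5.0:  # seconds; tune to your latency SLA
            alerts.append(f"{p['name']}: average latency {p['avg_latency']:.1f}s")
    
    return alerts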

Prometheus Metrics Example

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
request_total = Counter(
    'llm_requests_total',
    'Total LLM requests',
    ['provider', 'status']
)

request_duration = Histogram(
    'llm_request_duration_seconds',
    'LLM request duration',
    ['provider']
)

provider_health = Gauge(
    'llm_provider_health',
    'Provider health status',
    ['provider']
)

fallback_total = Counter(
    'llm_fallback_total',
    'Total fallback occurrences'
)

# Use in your code
async def complete_with_metrics(self, messages, **kwargs):
    start_time = time.time()
    provider = None
    
    try:
        provider = self.load_balancer.select_provider()
        response = await provider.make_request(messages, **kwargs)
        
        # Record success
        request_total.labels(
            provider=provider.config.name,
            status='success'
        ).inc()
        
        return response
        
    except Exception as e:
        # Record failure
        if provider:
            request_total.labels(
                provider=provider.config.name,
                status='error'
            ).inc()
        
        raise
        
    finally:
        # Record duration
        if provider:
            request_duration.labels(
                provider=provider.config.name
            ).observe(time.time() - start_time)
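
The provider_health gauge defined above can be updated from the health check loop, and prometheus_client's built-in server exposes everything for scraping:

from prometheus_client import start_http_server

# Update the gauge whenever a health check completes (1 = healthy, 0 = not)
provider_health.labels(provider=adapter.config.name).set(
    1 if adapter.status == ProviderStatus.HEALTHY else 0
)

# Serve metrics at http://localhost:9090/metrics for Prometheus to scrape
start_http_server(9090)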

Testing Your Fallback System

Thorough testing ensures your fallback system works correctly under all conditions[1]:

Testing Strategies
import pytest
from unittest.mock import AsyncMock, Mock

@pytest.mark.asyncio
async def test_fallback_on_provider_failure():
    """Test fallback when primary provider fails"""
    
    # Mock providers: the primary is healthy but its requests fail,
    # so complete() should fall back to the secondary
    openai_mock = Mock(spec=OpenAIAdapter)
    openai_mock.make_request = AsyncMock(side_effect=Exception("OpenAI down"))
    openai_mock.status = ProviderStatus.HEALTHY
    openai_mock.config = ProviderConfig(
        name="openai", api_key="", base_url="", model="gpt-4", priority=1
    )
    openai_mock.total_requests = 0
    openai_mock.total_latency = 0.0
    
    anthropic_mock = Mock(spec=AnthropicAdapter)
    anthropic_mock.make_request = AsyncMock(return_value={
        "content": "Hello from Anthropic",
        "provider": "anthropic"
    })
    anthropic_mock.status = ProviderStatus.HEALTHY
    anthropic_mock.config = ProviderConfig(
        name="anthropic", api_key="", base_url="",
        model="claude-3-sonnet-20240229", priority=2
    )
    anthropic_mock.total_requests = 0
    anthropic_mock.total_latency = 0.0
    
    # Create system with mocks
    llm = MultiProviderLLM([])
    llm.adapters = [openai_mock, anthropic_mock]
    llm.load_balancer = LoadBalancer(llm.adapters)
    
    # Test fallback
    response = await llm.complete([{"role": "user", "content": "test"}])
    
    assert response["provider"] == "anthropic"
    assert response["fallback_count"] == 1
    assert openai_mock.make_request.called
    assert anthropic_mock.make_request.called

@pytest.mark.asyncio
async def test_all_providers_down():
    """Test behavior when all providers are down"""
    
    # An empty provider list behaves like every provider being down
    llm = MultiProviderLLM([])
    
    # Should raise exception
    with pytest.raises(Exception, match="No healthy providers"):
        await llm.complete([{"role": "user", "content": "test"}])
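
Beyond unit tests with mocks, it helps to exercise the full fallback path against intermittent failures. A simple fault-injecting adapter for test environments (hypothetical, not part of the implementation above):

import random

class FlakyAdapter(ProviderAdapter):
    """Test double that fails a configurable fraction of requests"""
    
    def __init__(self, config: ProviderConfig, failure_rate: float = 0.3):
        super().__init__(config)
        self.failure_rate = failure_rate
    
    async def complete(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        if random.random() < self.failure_rate:
            raise Exception(f"Injected failure from {self.config.name}")
        return {"choices": [{"message": {"content": "ok"}}], "usage": {}}
    
    async def health_check(self) -> bool:
        return True
    
    def extract_content(self, response: Dict) -> str:
        return response["choices"][0]["message"]["content"]
    
    def extract_usage(self, response: Dict) -> Dict[str, int]:
        return response.get("usage", {})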

Best Practices

Do's
  • ✓ Implement health checks with appropriate intervals
  • ✓ Use circuit breakers to prevent cascade failures (sketched after these lists)
  • ✓ Log all fallback events for debugging
  • ✓ Monitor provider-specific error patterns
  • ✓ Test fallback logic regularly
  • ✓ Cache successful responses when possible
  • ✓ Implement request deduplication
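
The last two items above can start from something as small as an in-memory TTL cache keyed on the request payload (a sketch; production systems would typically use Redis and include sampling parameters such as temperature in the key):

import hashlib
import json
import time

class ResponseCache:
    """In-memory TTL cache for completed responses"""
    
    def __init__(self, ttl_seconds: int = 300):
        self.ttl = ttl_seconds
        self._store: Dict[str, tuple] = {}
    
    def _key(self, messages: List[Dict], **kwargs) -> str:
        raw = json.dumps({"messages": messages, "kwargs": kwargs}, sort_keys=True)
        return hashlib.sha256(raw.encode()).hexdigest()
    
    def get(self, messages: List[Dict], **kwargs) -> Optional[Dict[str, Any]]:
        entry = self._store.get(self._key(messages, **kwargs))
        if entry and time.time() - entry[1] < self.ttl:
            return entry[0]
        return None
    
    def put(self, messages: List[Dict], response: Dict[str, Any], **kwargs):
        self._store[self._key(messages, **kwargs)] = (response, time.time())

For deduplication of identical in-flight requests, the same key can map to a shared asyncio.Future so that concurrent callers await a single upstream call.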
Don'ts
  • ✗ Don't retry immediately without backoff
  • ✗ Don't ignore provider-specific rate limits
  • ✗ Don't use the same timeout for all providers
  • ✗ Don't fallback infinitely
  • ✗ Don't mix incompatible model capabilities
  • ✗ Don't forget to normalize responses
  • ✗ Don't neglect cost tracking
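
The circuit breaker and backoff items deserve sketches of their own: a minimal per-provider breaker following Fowler's pattern [4], plus a jittered exponential backoff helper (thresholds and delays are illustrative):

import asyncio
import random
import time

class CircuitBreaker:
    """Open after repeated failures; allow a trial request after a cooldown"""
    
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = 0.0
    
    def allow_request(self) -> bool:
        if self.failures < self.failure_threshold:
            return True  # closed: traffic flows normally
        # Open: block until the cooldown elapses, then permit one
        # trial request (the half-open state)
        return time.time() - self.opened_at >= self.reset_timeout
    
    def record_success(self):
        self.failures = 0
    
    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.time()

async def retry_with_backoff(fn, max_attempts: int = 3, base_delay: float = 1.0):
    """Retry an async callable with jittered exponential backoff"""
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # 1s, 2s, 4s... plus up to 1s of jitter to avoid thundering herds
            await asyncio.sleep(base_delay * 2 ** attempt + random.uniform(0, 1))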

Conclusion

A well-designed multi-provider fallback system is essential for production AI applications. By implementing intelligent routing, health monitoring, and automatic fallback, you can achieve 99.9% uptime while optimizing for cost and performance[1].

References

  [1] LangChain. "Multi-Provider Fallback Patterns" (2024)
  [2] Portkey. "Provider Fallback Routing" (2024)
  [3] Helicone. "Multi-LLM Provider Resilience" (2024)
  [4] Martin Fowler. "Circuit Breaker Pattern" (2024)
  [5] AWS. "Disaster Recovery Strategies" (2024)
  [6] Microsoft. "Retry Pattern" (2024)