Why Multi-Provider Architecture?
Relying on a single LLM provider makes every outage, rate limit, or model regression your outage. Routing requests across several providers removes that single point of failure.
Architecture Overview
A production-ready multi-provider system combines intelligent routing, health monitoring, and seamless fallback to ensure your AI applications remain available even during provider outages[1].
- API Gateway: single endpoint for all LLM requests
- Health Monitor: continuous provider health checks
- Load Balancer: intelligent request routing
- Fallback Engine: automatic provider switching
Core Implementation
Let's build this system end to end: provider adapters, a health-aware load balancer, and automatic fallback[2]:
import asyncio
import logging
import random
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

import aiohttp


class ProviderStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass
class ProviderConfig:
    name: str
    api_key: str
    base_url: str
    model: str
    timeout: int = 30
    max_retries: int = 3
    cost_per_1k_tokens: float = 0.01
    priority: int = 1
    weight: float = 1.0


class ProviderAdapter(ABC):
    """Abstract base class for provider adapters"""

    def __init__(self, config: ProviderConfig):
        self.config = config
        self.status = ProviderStatus.HEALTHY
        self.last_health_check = 0.0
        self.consecutive_failures = 0
        self.total_requests = 0
        self.total_latency = 0.0
        self.logger = logging.getLogger(f"{__name__}.{config.name}")

    @abstractmethod
    async def complete(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        """Make completion request to provider"""

    @abstractmethod
    async def health_check(self) -> bool:
        """Check if provider is healthy"""

    def normalize_response(self, provider_response: Dict) -> Dict[str, Any]:
        """Normalize provider response to a standard format"""
        return {
            "content": self.extract_content(provider_response),
            "usage": self.extract_usage(provider_response),
            "provider": self.config.name,
            "model": self.config.model,
            "latency": getattr(self, "_last_latency", 0),
        }

    @abstractmethod
    def extract_content(self, response: Dict) -> str:
        """Extract content from provider-specific response"""

    @abstractmethod
    def extract_usage(self, response: Dict) -> Dict[str, int]:
        """Extract token usage from provider response"""

    async def make_request(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        """Make request with timing and error handling"""
        start_time = time.time()
        try:
            response = await self.complete(messages, **kwargs)
            self._last_latency = time.time() - start_time
            self.total_latency += self._last_latency
            self.total_requests += 1
            self.consecutive_failures = 0
            return self.normalize_response(response)
        except Exception as e:
            self.consecutive_failures += 1
            self.logger.error(f"Request failed: {e}")
            # Degrade after the first failure, mark unhealthy after three in a row
            if self.consecutive_failures >= 3:
                self.status = ProviderStatus.UNHEALTHY
            else:
                self.status = ProviderStatus.DEGRADED
            raise


class OpenAIAdapter(ProviderAdapter):
    """OpenAI-specific adapter"""

    async def complete(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        headers = {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.config.model,
            "messages": messages,
            **kwargs,
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.config.base_url}/chat/completions",
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=self.config.timeout),
            ) as response:
                if response.status != 200:
                    error_data = await response.text()
                    raise Exception(f"OpenAI error {response.status}: {error_data}")
                return await response.json()

    async def health_check(self) -> bool:
        try:
            await self.complete([{"role": "user", "content": "Hi"}], max_tokens=5)
            self.status = ProviderStatus.HEALTHY
            return True
        except Exception:
            return False

    def extract_content(self, response: Dict) -> str:
        return response["choices"][0]["message"]["content"]

    def extract_usage(self, response: Dict) -> Dict[str, int]:
        return response.get("usage", {})


class AnthropicAdapter(ProviderAdapter):
    """Anthropic Claude adapter"""

    async def complete(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        headers = {
            "x-api-key": self.config.api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json",
        }
        # Convert messages to Anthropic format: system prompt is a top-level
        # field, not a message
        system_msg = next((m["content"] for m in messages if m["role"] == "system"), None)
        user_msgs = [m for m in messages if m["role"] != "system"]
        payload = {
            "model": self.config.model,
            "messages": user_msgs,
            "max_tokens": kwargs.get("max_tokens", 1000),
        }
        if system_msg:
            payload["system"] = system_msg
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.config.base_url}/messages",
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=self.config.timeout),
            ) as response:
                if response.status != 200:
                    error_data = await response.text()
                    raise Exception(f"Anthropic error {response.status}: {error_data}")
                return await response.json()

    async def health_check(self) -> bool:
        try:
            await self.complete([{"role": "user", "content": "Hi"}], max_tokens=5)
            self.status = ProviderStatus.HEALTHY
            return True
        except Exception:
            return False

    def extract_content(self, response: Dict) -> str:
        return response["content"][0]["text"]

    def extract_usage(self, response: Dict) -> Dict[str, int]:
        usage = response.get("usage", {})
        return {
            "prompt_tokens": usage.get("input_tokens", 0),
            "completion_tokens": usage.get("output_tokens", 0),
            "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0),
        }


class LoadBalancer:
    """Intelligent load balancer with health-aware routing"""

    def __init__(self, providers: List[ProviderAdapter]):
        self.providers = providers
        self.logger = logging.getLogger(f"{__name__}.LoadBalancer")

    def get_available_providers(self) -> List[ProviderAdapter]:
        """Get healthy providers sorted by priority, then average latency"""
        available = [p for p in self.providers if p.status != ProviderStatus.UNHEALTHY]
        # Sort by priority (lower is better), then by average latency
        return sorted(available, key=lambda p: (
            p.config.priority,
            p.total_latency / max(p.total_requests, 1),
        ))

    def select_provider(self, strategy: str = "priority") -> Optional[ProviderAdapter]:
        """Select a provider based on the given strategy"""
        available = self.get_available_providers()
        if not available:
            return None
        if strategy == "weighted":
            # Weighted random selection
            total_weight = sum(p.config.weight for p in available)
            r = random.uniform(0, total_weight)
            current = 0.0
            for provider in available:
                current += provider.config.weight
                if r <= current:
                    return provider
        elif strategy == "least_latency":
            # Select the provider with the lowest average latency
            return min(available, key=lambda p: p.total_latency / max(p.total_requests, 1))
        # "priority" and any unmatched strategy fall back to the top-ranked provider
        return available[0]


class MultiProviderLLM:
    """Main class orchestrating multi-provider fallback"""

    def __init__(
        self,
        providers: List[ProviderConfig],
        health_check_interval: int = 60,
        fallback_enabled: bool = True,
    ):
        self.adapters = self._init_adapters(providers)
        self.load_balancer = LoadBalancer(self.adapters)
        self.health_check_interval = health_check_interval
        self.fallback_enabled = fallback_enabled
        self.logger = logging.getLogger(f"{__name__}.MultiProviderLLM")
        # Start the health check loop; keep a reference so the task is not
        # garbage collected. This requires a running event loop, so construct
        # the class from async code.
        self._health_task = asyncio.create_task(self._health_check_loop())

    def _init_adapters(self, providers: List[ProviderConfig]) -> List[ProviderAdapter]:
        """Initialize provider adapters"""
        adapters = []
        for config in providers:
            if config.name.lower() == "openai":
                adapters.append(OpenAIAdapter(config))
            elif config.name.lower() == "anthropic":
                adapters.append(AnthropicAdapter(config))
            # Add more providers here
        return adapters

    async def _health_check_loop(self):
        """Continuous health monitoring"""
        while True:
            for adapter in self.adapters:
                try:
                    # Skip if recently checked
                    if time.time() - adapter.last_health_check < 30:
                        continue
                    # Capture prior status first: health_check() sets HEALTHY
                    # on success, so comparing afterwards would never detect
                    # a recovery
                    was_healthy = adapter.status == ProviderStatus.HEALTHY
                    healthy = await adapter.health_check()
                    adapter.last_health_check = time.time()
                    if healthy and not was_healthy:
                        self.logger.info(f"{adapter.config.name} recovered")
                    elif not healthy and was_healthy:
                        adapter.status = ProviderStatus.UNHEALTHY
                        self.logger.warning(f"{adapter.config.name} became unhealthy")
                except Exception as e:
                    self.logger.error(f"Health check failed for {adapter.config.name}: {e}")
            await asyncio.sleep(self.health_check_interval)

    async def complete(
        self,
        messages: List[Dict],
        strategy: str = "priority",
        **kwargs,
    ) -> Dict[str, Any]:
        """Make completion request with automatic fallback"""
        attempted_providers = []
        last_error = None

        # Get the ordered list of providers to try
        providers_to_try = self.load_balancer.get_available_providers()
        if not providers_to_try:
            raise Exception("No healthy providers available")

        # For non-priority strategies, move the selected provider to the front;
        # the rest of the list remains the fallback order
        if strategy != "priority":
            first = self.load_balancer.select_provider(strategy)
            if first in providers_to_try:
                providers_to_try.remove(first)
                providers_to_try.insert(0, first)

        # Try each provider in order
        for adapter in providers_to_try:
            attempted_providers.append(adapter.config.name)
            try:
                self.logger.info(f"Attempting request with {adapter.config.name}")
                response = await adapter.make_request(messages, **kwargs)
                # Add metadata about fallback
                response["attempted_providers"] = attempted_providers
                response["fallback_count"] = len(attempted_providers) - 1
                return response
            except Exception as e:
                last_error = e
                self.logger.warning(f"Provider {adapter.config.name} failed: {e}")
                if not self.fallback_enabled:
                    raise
                # Continue to the next provider

        # All providers failed
        raise Exception(
            f"All providers failed. Attempted: {attempted_providers}. "
            f"Last error: {last_error}"
        )

    def get_status(self) -> Dict[str, Any]:
        """Get current system status"""
        return {
            "providers": [
                {
                    "name": adapter.config.name,
                    "status": adapter.status.value,
                    "requests": adapter.total_requests,
                    "avg_latency": adapter.total_latency / max(adapter.total_requests, 1),
                    "consecutive_failures": adapter.consecutive_failures,
                }
                for adapter in self.adapters
            ],
            "healthy_count": len([
                a for a in self.adapters
                if a.status == ProviderStatus.HEALTHY
            ]),
            "total_providers": len(self.adapters),
        }


# Usage example
async def main():
    # Configure providers
    providers = [
        ProviderConfig(
            name="openai",
            api_key="sk-...",
            base_url="https://api.openai.com/v1",
            model="gpt-4",
            priority=1,
            weight=0.6,
            cost_per_1k_tokens=0.03,
        ),
        ProviderConfig(
            name="anthropic",
            api_key="sk-ant-...",
            base_url="https://api.anthropic.com/v1",
            model="claude-3-sonnet-20240229",
            priority=2,
            weight=0.3,
            cost_per_1k_tokens=0.015,
        ),
        # Add more providers...
    ]

    # Initialize multi-provider system
    llm = MultiProviderLLM(providers)

    # Make requests with automatic fallback
    response = await llm.complete(
        messages=[
            {"role": "user", "content": "Hello, how are you?"}
        ],
        strategy="priority",
        max_tokens=100,
    )

    print(f"Response from {response['provider']}: {response['content']}")
    print(f"Fallback count: {response['fallback_count']}")

    # Check system status
    status = llm.get_status()
    print(f"System status: {status}")


if __name__ == "__main__":
    asyncio.run(main())
Advanced Routing Strategies
Beyond simple fallback, implement intelligent routing based on request characteristics, provider capabilities, and real-time performance[1]:
class ContentRouter:
    """Route based on request content"""

    def __init__(self):
        self.rules = {
            "code": ["openai", "anthropic"],
            "creative": ["anthropic", "openai"],
            "factual": ["google", "openai"],
            "translation": ["google", "deepl"],
            "math": ["openai", "wolfram"],
        }

    def categorize_request(self, messages: List[Dict]) -> str:
        """Categorize request type from messages"""
        content = " ".join(m["content"] for m in messages).lower()
        # Simple keyword matching (use an ML classifier in production)
        if any(kw in content for kw in ["code", "function", "debug"]):
            return "code"
        elif any(kw in content for kw in ["story", "poem", "creative"]):
            return "creative"
        elif any(kw in content for kw in ["translate", "translation"]):
            return "translation"
        elif any(kw in content for kw in ["calculate", "math", "equation"]):
            return "math"
        return "factual"

    def get_provider_order(self, messages: List[Dict]) -> List[str]:
        category = self.categorize_request(messages)
        return self.rules.get(category, ["openai", "anthropic", "google"])


class CostOptimizer:
    """Route to minimize costs while meeting SLAs"""

    def select_provider(
        self,
        providers: List[ProviderAdapter],
        estimated_tokens: int,
        max_latency_ms: Optional[int] = None,
        quality_threshold: float = 0.9,
    ) -> ProviderAdapter:
        """Select the cheapest provider meeting requirements"""
        eligible = []
        for provider in providers:
            # Only consider healthy providers
            if provider.status != ProviderStatus.HEALTHY:
                continue
            # Check latency requirements (average latency, converted to ms)
            if max_latency_ms:
                avg_latency_ms = 1000 * provider.total_latency / max(provider.total_requests, 1)
                if avg_latency_ms > max_latency_ms:
                    continue
            # Check quality threshold: a benchmark-derived score you attach to
            # each adapter; defaults to 1.0 if none is set
            if getattr(provider, "quality_score", 1.0) < quality_threshold:
                continue
            eligible.append(provider)

        if not eligible:
            raise Exception("No providers meet requirements")

        # Return the provider with the lowest estimated cost
        return min(
            eligible,
            key=lambda p: p.config.cost_per_1k_tokens * estimated_tokens / 1000,
        )
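To see how content routing plugs into the fallback system, here is a minimal glue sketch. The route_and_complete helper is hypothetical (not part of the classes above); it simply re-ranks adapter priorities by content category before delegating to MultiProviderLLM.complete:

# Illustrative glue code, assuming the ContentRouter and MultiProviderLLM
# classes defined above. route_and_complete is a hypothetical helper.
async def route_and_complete(llm: MultiProviderLLM, messages: List[Dict], **kwargs):
    router = ContentRouter()
    preferred = router.get_provider_order(messages)  # e.g. ["anthropic", "openai"]
    # Re-rank each adapter's priority so preferred providers are tried first;
    # the load balancer sorts by priority, so fallback order follows suit
    for adapter in llm.adapters:
        if adapter.config.name in preferred:
            adapter.config.priority = preferred.index(adapter.config.name)
        else:
            adapter.config.priority = len(preferred)
    return await llm.complete(messages, **kwargs)

Mutating config.priority keeps the sketch short; in a real system you would pass the preferred order into the load balancer per request rather than rewriting shared state.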
Monitoring and Observability
Comprehensive monitoring is essential for maintaining high availability[4]. Track key metrics and set up alerts for degraded performance:
Provider Health
- Availability percentage
- Response time (p50, p95, p99)
- Error rates by type
- Consecutive failure count
System Performance
- Fallback frequency
- Provider distribution
- Cost per request
- Queue depth
Prometheus Metrics Example
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
request_total = Counter(
    'llm_requests_total',
    'Total LLM requests',
    ['provider', 'status']
)
request_duration = Histogram(
    'llm_request_duration_seconds',
    'LLM request duration',
    ['provider']
)
provider_health = Gauge(
    'llm_provider_health',
    'Provider health status',
    ['provider']
)
fallback_total = Counter(
    'llm_fallback_total',
    'Total fallback occurrences'
)

# Use in your code, e.g. as a method on MultiProviderLLM
async def complete_with_metrics(self, messages, **kwargs):
    start_time = time.time()
    provider = None
    try:
        provider = self.load_balancer.select_provider()
        response = await provider.make_request(messages, **kwargs)
        # Record success
        request_total.labels(
            provider=provider.config.name,
            status='success'
        ).inc()
        return response
    except Exception:
        # Record failure
        if provider:
            request_total.labels(
                provider=provider.config.name,
                status='error'
            ).inc()
        raise
    finally:
        # Record duration
        if provider:
            request_duration.labels(
                provider=provider.config.name
            ).observe(time.time() - start_time)
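The provider_health gauge and fallback_total counter above are defined but not yet updated anywhere. One way to wire them in, sketched under the assumption that you call the helper from the health-check loop (the helper name and port are illustrative):

from prometheus_client import start_http_server

def record_provider_health(adapters: List[ProviderAdapter]) -> None:
    """Export each adapter's health as 1 (healthy) or 0 (degraded/unhealthy)."""
    for adapter in adapters:
        provider_health.labels(provider=adapter.config.name).set(
            1 if adapter.status == ProviderStatus.HEALTHY else 0
        )

# Call record_provider_health(self.adapters) at the end of each pass through
# _health_check_loop, increment fallback_total in complete() whenever a
# provider fails and the loop moves on, and expose the metrics endpoint once
# at startup:
start_http_server(9100)  # illustrative port; Prometheus scrapes /metrics here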
Testing Your Fallback System
Thorough testing verifies that your fallback logic actually fires when providers fail, rather than discovering gaps during a real outage[1]:
import pytest
from unittest.mock import AsyncMock, Mock

@pytest.mark.asyncio
async def test_fallback_on_provider_failure():
    """Test fallback when the primary provider fails"""
    # Primary provider: healthy (so it gets attempted) but failing at
    # request time; make_request is async, hence AsyncMock
    openai_mock = Mock(spec=OpenAIAdapter)
    openai_mock.make_request = AsyncMock(side_effect=Exception("OpenAI down"))
    openai_mock.status = ProviderStatus.HEALTHY
    openai_mock.config = ProviderConfig(
        name="openai", api_key="", base_url="", model="gpt-4", priority=1
    )
    openai_mock.total_requests = 0
    openai_mock.total_latency = 0.0

    anthropic_mock = Mock(spec=AnthropicAdapter)
    anthropic_mock.make_request = AsyncMock(return_value={
        "content": "Hello from Anthropic",
        "provider": "anthropic"
    })
    anthropic_mock.status = ProviderStatus.HEALTHY
    anthropic_mock.config = ProviderConfig(
        name="anthropic", api_key="", base_url="",
        model="claude-3-sonnet-20240229", priority=2
    )
    anthropic_mock.total_requests = 0
    anthropic_mock.total_latency = 0.0

    # Create system with mocks
    llm = MultiProviderLLM([])
    llm.adapters = [openai_mock, anthropic_mock]
    llm.load_balancer = LoadBalancer(llm.adapters)

    # The primary fails, so the request should fall back to Anthropic
    response = await llm.complete([{"role": "user", "content": "test"}])
    assert response["provider"] == "anthropic"
    assert response["fallback_count"] == 1
    assert openai_mock.make_request.called
    assert anthropic_mock.make_request.called

@pytest.mark.asyncio
async def test_all_providers_down():
    """Test behavior when all providers are down"""
    # Create system with all unhealthy providers
    llm = MultiProviderLLM([])
    for adapter in llm.adapters:
        adapter.status = ProviderStatus.UNHEALTHY

    # Should raise because no provider is available
    with pytest.raises(Exception, match="No healthy providers"):
        await llm.complete([{"role": "user", "content": "test"}])
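It is also worth covering the non-fallback path. A sketch of one more test, assuming the fallback_enabled flag from the core implementation:

@pytest.mark.asyncio
async def test_no_fallback_when_disabled():
    """With fallback_enabled=False, the first failure should propagate"""
    failing = Mock(spec=OpenAIAdapter)
    failing.make_request = AsyncMock(side_effect=Exception("boom"))
    failing.status = ProviderStatus.HEALTHY
    failing.config = ProviderConfig(name="openai", api_key="", base_url="", model="gpt-4")
    failing.total_requests = 0
    failing.total_latency = 0.0

    llm = MultiProviderLLM([], fallback_enabled=False)
    llm.adapters = [failing]
    llm.load_balancer = LoadBalancer(llm.adapters)

    # The original error surfaces directly instead of triggering fallback
    with pytest.raises(Exception, match="boom"):
        await llm.complete([{"role": "user", "content": "test"}])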
Best Practices
- ✓ Implement health checks with appropriate intervals
- ✓ Use circuit breakers to prevent cascade failures (see the sketch after this list)
- ✓ Log all fallback events for debugging
- ✓ Monitor provider-specific error patterns
- ✓ Test fallback logic regularly
- ✓ Cache successful responses when possible
- ✓ Implement request deduplication
- ✗ Don't retry immediately without backoff
- ✗ Don't ignore provider-specific rate limits
- ✗ Don't use the same timeout for all providers
- ✗ Don't fall back infinitely
- ✗ Don't mix incompatible model capabilities
- ✗ Don't forget to normalize responses
- ✗ Don't neglect cost tracking
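The circuit-breaker and backoff items above deserve code. Below is a minimal sketch; the CircuitBreaker class and backoff_delay helper are illustrative, and the thresholds and timings are assumptions, not tuned values:

import time
from typing import Optional

class CircuitBreaker:
    """Minimal circuit breaker: open after N failures, probe after a cooldown."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout  # seconds before a half-open probe
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True  # closed: traffic flows normally
        if time.time() - self.opened_at >= self.reset_timeout:
            return True  # half-open: allow one probe request
        return False  # open: fail fast instead of hitting the provider

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.time()

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 10.0) -> float:
    """Exponential backoff with a ceiling: 0.5s, 1s, 2s, ... capped at 10s."""
    return min(cap, base * (2 ** attempt))

Each adapter would own its own breaker: make_request checks allow_request() before calling the provider, records the outcome, and sleeps backoff_delay(attempt) between retries so a struggling provider is never hammered.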
Conclusion
A well-designed multi-provider fallback system is essential for production AI applications. By implementing intelligent routing, health monitoring, and automatic fallback, you can achieve 99.9% uptime while optimizing for cost and performance[1].
References
- [1] LangChain. "Multi-Provider Fallback Patterns" (2024)
- [2] Portkey. "Provider Fallback Routing" (2024)
- [3] Helicone. "Multi-LLM Provider Resilience" (2024)
- [4] Martin Fowler. "Circuit Breaker Pattern" (2024)
- [5] AWS. "Disaster Recovery Strategies" (2024)
- [6] Microsoft. "Retry Pattern" (2024)