Quick Start
Create a Lambda function for LLM API calls:
# Install AWS CLI and configure credentials
aws configure

# Create deployment package
mkdir lambda-llm && cd lambda-llm
pip install requests -t .
# Add your handler.py file
zip -r function.zip .

# Create Lambda function
aws lambda create-function \
  --function-name llm-api-handler \
  --runtime python3.11 \
  --role arn:aws:iam::YOUR_ACCOUNT:role/lambda-execution-role \
  --handler handler.lambda_handler \
  --zip-file fileb://function.zip \
  --timeout 30 \
  --memory-size 512
Basic Lambda Handler Setup
Lambda Handler with ParrotRouter
Basic implementation for LLM API calls
import json
import os
import requests
from typing import Dict, Any

# Initialize outside handler for connection reuse
PARROTROUTER_API_KEY = os.environ['PARROTROUTER_API_KEY']
PARROTROUTER_BASE_URL = "https://api.parrotrouter.com/v1"

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Lambda handler for LLM API calls via ParrotRouter
    """
    try:
        # Parse request body
        body = json.loads(event.get('body', '{}'))
        prompt = body.get('prompt', 'Hello from Lambda!')
        model = body.get('model', 'gpt-3.5-turbo')

        # Prepare LLM request
        headers = {
            'Authorization': f'Bearer {PARROTROUTER_API_KEY}',
            'Content-Type': 'application/json'
        }

        payload = {
            'model': model,
            'messages': [
                {'role': 'user', 'content': prompt}
            ],
            'temperature': 0.7,
            'max_tokens': 500
        }

        # Make API call with timeout
        response = requests.post(
            f'{PARROTROUTER_BASE_URL}/chat/completions',
            headers=headers,
            json=payload,
            timeout=25  # Lambda timeout minus buffer
        )
        response.raise_for_status()
        result = response.json()

        return {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps({
                'success': True,
                'response': result['choices'][0]['message']['content'],
                'model': model,
                'usage': result.get('usage', {})
            })
        }

    except requests.exceptions.Timeout:
        return {
            'statusCode': 504,
            'body': json.dumps({'error': 'Request timeout'})
        }
    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
Always set Lambda timeout to be longer than your HTTP request timeout to avoid orphaned requests.
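One way to enforce this automatically is to derive the HTTP timeout from the time the invocation has left. A minimal sketch, assuming the handler above; the 5-second buffer is an illustrative value, not a requirement:

def remaining_timeout(context, buffer_seconds: float = 5.0) -> float:
    """Derive an HTTP timeout from the invocation's remaining time.

    context.get_remaining_time_in_millis() is provided by the Lambda runtime;
    the buffer leaves room to build and return an error response.
    """
    remaining = context.get_remaining_time_in_millis() / 1000.0
    return max(remaining - buffer_seconds, 1.0)

# Inside lambda_handler:
# response = requests.post(url, headers=headers, json=payload,
#                          timeout=remaining_timeout(context))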
Cold Start Optimization
Provisioned Concurrency
Eliminate cold starts for critical functions
# Configure provisioned concurrency
# (must target a published version or alias -- $LATEST is not supported;
#  'prod' below is an example alias)
aws lambda put-provisioned-concurrency-config \
  --function-name llm-api-handler \
  --provisioned-concurrent-executions 5 \
  --qualifier prod

# Set up auto-scaling
aws application-autoscaling register-scalable-target \
  --service-namespace lambda \
  --resource-id function:llm-api-handler:prod \
  --scalable-dimension lambda:function:ProvisionedConcurrency \
  --min-capacity 2 \
  --max-capacity 10
Optimized Dependencies
Minimize package size and import time
# handler.py - Optimized imports
import json
import os

# Lazy import expensive libraries
_requests = None

def get_requests():
    global _requests
    if _requests is None:
        import requests
        _requests = requests
    return _requests

def lambda_handler(event, context):
    requests = get_requests()  # Import only when needed
    # ... rest of handler code
API Gateway Integration
REST API Configuration
Set up API Gateway with Lambda proxy integration
# serverless.yml
service: llm-api-service

provider:
  name: aws
  runtime: python3.11
  region: us-east-1
  apiGateway:
    shouldStartNameWithService: true
    apiKeys:
      - name: llm-api-key
        description: API key for LLM endpoints
    usagePlan:
      quota:
        limit: 10000
        period: MONTH
      throttle:
        burstLimit: 100
        rateLimit: 50

functions:
  llmHandler:
    handler: handler.lambda_handler
    memorySize: 512
    timeout: 30
    environment:
      PARROTROUTER_API_KEY: ${ssm:/parrotrouter/api-key}
    events:
      - http:
          path: /generate
          method: post
          cors: true
          private: true  # Requires API key
Async Processing with SQS
Queue-Based Architecture
Handle long-running LLM tasks asynchronously
import json
import os
import uuid
from datetime import datetime

import boto3

sqs = boto3.client('sqs')
s3 = boto3.client('s3')

def api_handler(event, context):
    """API endpoint - enqueues request"""
    body = json.loads(event['body'])
    request_id = str(uuid.uuid4())

    # Send to SQS for async processing
    sqs.send_message(
        QueueUrl=os.environ['QUEUE_URL'],
        MessageBody=json.dumps({
            'request_id': request_id,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': body
        })
    )

    return {
        'statusCode': 202,
        'body': json.dumps({
            'request_id': request_id,
            'status': 'processing',
            'result_url': f"/status/{request_id}"
        })
    }

def processor_handler(event, context):
    """SQS processor - handles LLM calls"""
    for record in event['Records']:
        message = json.loads(record['body'])
        request_id = message['request_id']

        try:
            # Process LLM request
            result = call_llm_api(message['payload'])

            # Store result in S3
            s3.put_object(
                Bucket=os.environ['RESULTS_BUCKET'],
                Key=f"results/{request_id}.json",
                Body=json.dumps({
                    'request_id': request_id,
                    'status': 'completed',
                    'result': result
                })
            )
        except Exception as e:
            # Store error
            s3.put_object(
                Bucket=os.environ['RESULTS_BUCKET'],
                Key=f"results/{request_id}.json",
                Body=json.dumps({
                    'request_id': request_id,
                    'status': 'failed',
                    'error': str(e)
                })
            )
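The 202 response points clients at /status/{request_id}. A minimal status handler to back that endpoint might look like the sketch below; it reuses the s3 client and RESULTS_BUCKET variable from the code above, and the path-parameter name is an assumption about how the route is wired:

def status_handler(event, context):
    """Return the stored result for a request_id, or 'processing' if absent."""
    request_id = event['pathParameters']['request_id']
    try:
        obj = s3.get_object(
            Bucket=os.environ['RESULTS_BUCKET'],
            Key=f"results/{request_id}.json"
        )
        return {'statusCode': 200, 'body': obj['Body'].read().decode('utf-8')}
    except s3.exceptions.NoSuchKey:
        # The processor has not written a result yet
        return {
            'statusCode': 200,
            'body': json.dumps({'request_id': request_id, 'status': 'processing'})
        }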
Lambda Layers for Dependencies
Creating and Using Layers
Share common dependencies across functions
# Create layer directory structure
mkdir -p layer/python/lib/python3.11/site-packages

# Install dependencies
pip install requests openai anthropic -t layer/python/lib/python3.11/site-packages/

# Create layer zip
cd layer && zip -r ../llm-dependencies.zip . && cd ..

# Publish layer
aws lambda publish-layer-version \
  --layer-name llm-dependencies \
  --description "Common LLM API dependencies" \
  --zip-file fileb://llm-dependencies.zip \
  --compatible-runtimes python3.11

# Attach to function
aws lambda update-function-configuration \
  --function-name llm-api-handler \
  --layers arn:aws:lambda:region:account:layer:llm-dependencies:1
Secrets Management
AWS Secrets Manager Integration
Securely manage API keys and credentials
import json
import boto3
from functools import lru_cache

secrets_client = boto3.client('secretsmanager')

@lru_cache(maxsize=1)
def get_secret(secret_name: str) -> dict:
    """Cache secrets to avoid repeated API calls"""
    try:
        response = secrets_client.get_secret_value(
            SecretId=secret_name
        )
        return json.loads(response['SecretString'])
    except Exception as e:
        print(f"Error retrieving secret: {e}")
        raise

def lambda_handler(event, context):
    # Get cached secrets
    secrets = get_secret('parrotrouter/api-keys')
    api_key = secrets['PARROTROUTER_API_KEY']

    # Use API key for requests
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }
    # ... rest of handler
Monitoring & Logging
CloudWatch Metrics
Track performance and errors
import time
import boto3

cloudwatch = boto3.client('cloudwatch')

def lambda_handler(event, context):
    start_time = time.time()

    try:
        # Process request
        result = process_llm_request(event)

        # Log success metric
        cloudwatch.put_metric_data(
            Namespace='LLM/API',
            MetricData=[
                {
                    'MetricName': 'RequestSuccess',
                    'Value': 1,
                    'Unit': 'Count'
                },
                {
                    'MetricName': 'RequestLatency',
                    'Value': (time.time() - start_time) * 1000,
                    'Unit': 'Milliseconds'
                }
            ]
        )
        return result

    except Exception as e:
        # Log error metric
        cloudwatch.put_metric_data(
            Namespace='LLM/API',
            MetricData=[
                {
                    'MetricName': 'RequestError',
                    'Value': 1,
                    'Unit': 'Count',
                    'Dimensions': [
                        {
                            'Name': 'ErrorType',
                            'Value': type(e).__name__
                        }
                    ]
                }
            ]
        )
        raise
X-Ray Tracing
Distributed tracing for debugging
import requests
from aws_xray_sdk.core import xray_recorder, patch_all

# Patch supported libraries (including requests) for tracing
patch_all()

@xray_recorder.capture('llm_api_call')
def call_llm_api(prompt: str, model: str):
    """Traced LLM API call"""
    subsegment = xray_recorder.current_subsegment()
    subsegment.put_metadata('model', model)
    subsegment.put_metadata('prompt_length', len(prompt))

    response = requests.post(
        'https://api.parrotrouter.com/v1/chat/completions',
        json={'model': model, 'messages': [{'role': 'user', 'content': prompt}]}
    )

    subsegment.put_metadata(
        'response_tokens',
        response.json().get('usage', {}).get('total_tokens')
    )
    return response.json()
Cost Optimization
Cost-Saving Strategies
Optimize Lambda and LLM API costs
Memory Optimization: Right-size Lambda memory. CPU scales with the memory setting, so a larger allocation can finish faster and, in some cases, lower the total cost per request.
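A quick back-of-the-envelope comparison of two memory settings; the durations are hypothetical, and the $0.0000166667 per GB-second rate is the standard x86 Lambda duration price:

# Duration cost = memory (GB) x duration (s) x price per GB-second
PRICE_PER_GB_SECOND = 0.0000166667

cost_512mb  = 0.5 * 4.0 * PRICE_PER_GB_SECOND   # hypothetical 4.0 s run -> ~$0.0000333
cost_1024mb = 1.0 * 1.8 * PRICE_PER_GB_SECOND   # hypothetical 1.8 s run -> ~$0.0000300

# Doubling memory lowers per-invocation cost here because the run finishes more
# than twice as fast. The benefit is smaller for I/O-bound LLM calls that spend
# most of their time waiting on the network, so measure before resizing.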
# Cost optimization patterns

import hashlib
import json
from datetime import datetime, timedelta

import boto3

# 1. Response caching with DynamoDB
dynamodb = boto3.resource('dynamodb')
cache_table = dynamodb.Table('llm-cache')

def get_cached_response(prompt: str, model: str):
    cache_key = hashlib.md5(f"{model}:{prompt}".encode()).hexdigest()
    response = cache_table.get_item(Key={'cache_key': cache_key})

    if 'Item' in response:
        item = response['Item']
        if datetime.fromisoformat(item['expires_at']) > datetime.utcnow():
            return item['response']
    return None

def cache_response(prompt: str, model: str, response: str, ttl_hours: int = 24):
    cache_key = hashlib.md5(f"{model}:{prompt}".encode()).hexdigest()
    expires_at = datetime.utcnow() + timedelta(hours=ttl_hours)

    cache_table.put_item(
        Item={
            'cache_key': cache_key,
            'prompt': prompt,
            'model': model,
            'response': response,
            'expires_at': expires_at.isoformat()
        }
    )

# 2. Batch processing for multiple requests
def batch_handler(event, context):
    """Process multiple LLM requests in one Lambda invocation"""
    batch_requests = json.loads(event['body'])['requests']
    results = []

    for request in batch_requests:
        # Check cache first
        cached = get_cached_response(request['prompt'], request['model'])
        if cached:
            results.append(cached)
        else:
            # Process and cache
            response = call_llm_api(request)
            cache_response(request['prompt'], request['model'], response)
            results.append(response)

    return {'statusCode': 200, 'body': json.dumps(results)}
Lambda Cost Factors
- Invocations: $0.20 per 1M requests
- Duration: $0.0000166667 per GB-second
- Provisioned Concurrency: $0.0000041667 per GB-second (see the worked estimate below)
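Putting those rates together, a rough monthly estimate for the example function; the traffic, duration, and instance counts are hypothetical, and the sketch ignores the free tier:

# Rough monthly cost estimate -- illustrative numbers only
requests_per_month = 1_000_000
memory_gb = 0.5            # 512 MB
avg_duration_s = 2.0
provisioned_instances = 5
seconds_per_month = 30 * 24 * 3600

invocation_cost = (requests_per_month / 1_000_000) * 0.20
duration_cost = requests_per_month * memory_gb * avg_duration_s * 0.0000166667
provisioned_cost = provisioned_instances * memory_gb * seconds_per_month * 0.0000041667

# Note: requests served by provisioned instances are billed at a lower duration
# rate; the standard rate is used here for simplicity.
print(f"Invocations:             ${invocation_cost:,.2f}")   # $0.20
print(f"Duration:                ${duration_cost:,.2f}")     # ~$16.67
print(f"Provisioned concurrency: ${provisioned_cost:,.2f}")  # ~$27.00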
Optimization Tips
- Use ARM architecture (Graviton2)
- Enable HTTP keep-alive (see the session sketch after this list)
- Minimize cold starts
- Cache frequent responses
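Keep-alive mostly comes down to reusing one HTTP session across invocations instead of opening a new TCP/TLS connection per request. A minimal sketch, assuming the endpoint and environment variable names used in the handler above:

import os
import requests

# Created once per execution environment, so warm invocations reuse the
# underlying TCP/TLS connection (requests.Session enables keep-alive by default).
session = requests.Session()
session.headers.update({
    'Authorization': f"Bearer {os.environ['PARROTROUTER_API_KEY']}",
    'Content-Type': 'application/json'
})

def call_llm_api(payload: dict, timeout: float = 25) -> dict:
    response = session.post(
        'https://api.parrotrouter.com/v1/chat/completions',
        json=payload,
        timeout=timeout
    )
    response.raise_for_status()
    return response.json()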
Production Checklist
Performance
- ✓ Provisioned concurrency for critical functions
- ✓ Connection pooling and keep-alive
- ✓ Response caching strategy
- ✓ Optimized memory allocation
Reliability
- ✓ Dead letter queues configured
- ✓ Retry logic with exponential backoff (see the sketch after this checklist)
- ✓ CloudWatch alarms for errors
- ✓ X-Ray tracing enabled
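For the retry item, a small backoff wrapper around the API call is usually enough. A hedged sketch; the attempt count and delays are illustrative, and only transient errors are retried:

import random
import time

import requests

def call_with_backoff(fn, max_attempts: int = 4, base_delay: float = 1.0):
    """Retry a callable on transient HTTP errors with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError,
                requests.exceptions.HTTPError) as e:
            status = getattr(getattr(e, 'response', None), 'status_code', None)
            retryable = status is None or status == 429 or status >= 500
            if not retryable or attempt == max_attempts - 1:
                raise
            # Exponential backoff with jitter: 1s, 2s, 4s, ... plus up to 1s of random delay
            time.sleep(base_delay * (2 ** attempt) + random.random())

# Usage: call_with_backoff(lambda: call_llm_api(payload))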