Quick Start
Create a Lambda function for LLM API calls:
# Install AWS CLI and configure
aws configure

# Create deployment package
mkdir lambda-llm && cd lambda-llm
pip install requests -t .
# Add your handler.py file
zip -r function.zip .

# Create Lambda function
aws lambda create-function \
  --function-name llm-api-handler \
  --runtime python3.11 \
  --role arn:aws:iam::YOUR_ACCOUNT:role/lambda-execution-role \
  --handler handler.lambda_handler \
  --zip-file fileb://function.zip \
  --timeout 30 \
  --memory-size 512
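The create-function call above assumes an execution role named lambda-execution-role already exists. If it does not, a minimal role with basic CloudWatch Logs permissions can be created first; this is a sketch using boto3, and the role name simply mirrors the ARN in the command above:

import json
import boto3

iam = boto3.client('iam')

# Trust policy that allows the Lambda service to assume the role
trust_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'Service': 'lambda.amazonaws.com'},
        'Action': 'sts:AssumeRole'
    }]
}

iam.create_role(
    RoleName='lambda-execution-role',
    AssumeRolePolicyDocument=json.dumps(trust_policy)
)

# Basic permissions: write logs to CloudWatch
iam.attach_role_policy(
    RoleName='lambda-execution-role',
    PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
)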
Basic Lambda Handler Setup
Lambda Handler with ParrotRouter
Basic implementation for LLM API calls
import json
import os
import requests
from typing import Dict, Any
# Initialize outside handler for connection reuse
PARROTROUTER_API_KEY = os.environ['PARROTROUTER_API_KEY']
PARROTROUTER_BASE_URL = "https://api.parrotrouter.com/v1"
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Lambda handler for LLM API calls via ParrotRouter
    """
    try:
        # Parse request body
        body = json.loads(event.get('body', '{}'))
        prompt = body.get('prompt', 'Hello from Lambda!')
        model = body.get('model', 'gpt-3.5-turbo')

        # Prepare LLM request
        headers = {
            'Authorization': f'Bearer {PARROTROUTER_API_KEY}',
            'Content-Type': 'application/json'
        }
        payload = {
            'model': model,
            'messages': [
                {'role': 'user', 'content': prompt}
            ],
            'temperature': 0.7,
            'max_tokens': 500
        }

        # Make API call with timeout
        response = requests.post(
            f'{PARROTROUTER_BASE_URL}/chat/completions',
            headers=headers,
            json=payload,
            timeout=25  # Lambda timeout minus buffer
        )
        response.raise_for_status()
        result = response.json()

        return {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps({
                'success': True,
                'response': result['choices'][0]['message']['content'],
                'model': model,
                'usage': result.get('usage', {})
            })
        }
    except requests.exceptions.Timeout:
        return {
            'statusCode': 504,
            'body': json.dumps({'error': 'Request timeout'})
        }
    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

Always set the Lambda timeout to be longer than your HTTP request timeout to avoid orphaned requests.
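Once deployed, you can sanity-check the function without API Gateway by invoking it directly. A minimal sketch with boto3; the event shape mimics an API Gateway proxy request, and the function name matches the create-function call above:

import json
import boto3

lambda_client = boto3.client('lambda')

# Simulate an API Gateway proxy event
event = {'body': json.dumps({'prompt': 'Say hello', 'model': 'gpt-3.5-turbo'})}

response = lambda_client.invoke(
    FunctionName='llm-api-handler',
    Payload=json.dumps(event)
)
result = json.loads(response['Payload'].read())
print(result['statusCode'], result['body'])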
Cold Start Optimization
Provisioned Concurrency
Eliminate cold starts for critical functions
# Publish a version and create an alias (provisioned concurrency cannot target $LATEST)
aws lambda publish-version --function-name llm-api-handler
aws lambda create-alias \
  --function-name llm-api-handler \
  --name live \
  --function-version 1   # use the version number returned by publish-version

# Configure provisioned concurrency on the alias
aws lambda put-provisioned-concurrency-config \
  --function-name llm-api-handler \
  --provisioned-concurrent-executions 5 \
  --qualifier live

# Set up auto-scaling
aws application-autoscaling register-scalable-target \
  --service-namespace lambda \
  --resource-id function:llm-api-handler:live \
  --scalable-dimension lambda:function:ProvisionedConcurrency \
  --min-capacity 2 \
  --max-capacity 10
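To complete the auto-scaling setup, attach a target-tracking policy to the registered target. A sketch using boto3; the live alias and the 70% utilization target are assumptions, so adjust them to your own qualifier and traffic profile:

import boto3

autoscaling = boto3.client('application-autoscaling')

# Keep provisioned-concurrency utilization around 70%
autoscaling.put_scaling_policy(
    PolicyName='llm-api-handler-pc-utilization',
    ServiceNamespace='lambda',
    ResourceId='function:llm-api-handler:live',  # assumed alias
    ScalableDimension='lambda:function:ProvisionedConcurrency',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 0.7,
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'LambdaProvisionedConcurrencyUtilization'
        }
    }
)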
Optimized Dependencies
Minimize package size and import time
# handler.py - Optimized imports
import json
import os
# Lazy import expensive libraries
_requests = None
def get_requests():
    global _requests
    if _requests is None:
        import requests
        _requests = requests
    return _requests

def lambda_handler(event, context):
    requests = get_requests()  # Import only when needed
    # ... rest of handler code

API Gateway Integration
REST API Configuration
Set up API Gateway with Lambda proxy integration
# serverless.yml
service: llm-api-service

provider:
  name: aws
  runtime: python3.11
  region: us-east-1
  apiGateway:
    shouldStartNameWithService: true
    apiKeys:
      - name: llm-api-key
        description: API key for LLM endpoints
    usagePlan:
      quota:
        limit: 10000
        period: MONTH
      throttle:
        burstLimit: 100
        rateLimit: 50

functions:
  llmHandler:
    handler: handler.lambda_handler
    memorySize: 512
    timeout: 30
    environment:
      PARROTROUTER_API_KEY: ${ssm:/parrotrouter/api-key}
    events:
      - http:
          path: /generate
          method: post
          cors: true
          private: true  # Requires API key
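Because the endpoint is marked private, clients must send the generated key in the x-api-key header. A client-side sketch; the invoke URL below is a placeholder for the one API Gateway prints after deployment:

import requests

API_URL = 'https://<api-id>.execute-api.us-east-1.amazonaws.com/dev/generate'  # placeholder

response = requests.post(
    API_URL,
    headers={
        'x-api-key': '<api-gateway-key>',  # value of the llm-api-key created above
        'Content-Type': 'application/json'
    },
    json={'prompt': 'Summarize this document', 'model': 'gpt-3.5-turbo'},
    timeout=30
)
response.raise_for_status()
print(response.json()['response'])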
Async Processing with SQS
Queue-Based Architecture
Handle long-running LLM tasks asynchronously
import json
import os
import boto3
import uuid
from datetime import datetime

sqs = boto3.client('sqs')
s3 = boto3.client('s3')

def api_handler(event, context):
    """API endpoint - enqueues request"""
    body = json.loads(event['body'])
    request_id = str(uuid.uuid4())

    # Send to SQS for async processing
    sqs.send_message(
        QueueUrl=os.environ['QUEUE_URL'],
        MessageBody=json.dumps({
            'request_id': request_id,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': body
        })
    )

    return {
        'statusCode': 202,
        'body': json.dumps({
            'request_id': request_id,
            'status': 'processing',
            'result_url': f"/status/{request_id}"
        })
    }

def processor_handler(event, context):
    """SQS processor - handles LLM calls"""
    for record in event['Records']:
        message = json.loads(record['body'])
        request_id = message['request_id']
        try:
            # Process LLM request
            result = call_llm_api(message['payload'])

            # Store result in S3
            s3.put_object(
                Bucket=os.environ['RESULTS_BUCKET'],
                Key=f"results/{request_id}.json",
                Body=json.dumps({
                    'request_id': request_id,
                    'status': 'completed',
                    'result': result
                })
            )
        except Exception as e:
            # Store error
            s3.put_object(
                Bucket=os.environ['RESULTS_BUCKET'],
                Key=f"results/{request_id}.json",
                Body=json.dumps({
                    'request_id': request_id,
                    'status': 'failed',
                    'error': str(e)
                })
            )
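The result_url returned by api_handler implies a companion status endpoint. One way to implement it is a small handler that looks the result up in the same S3 bucket; this is a sketch, and the request_id path-parameter name is an assumption about how the /status route is configured:

import json
import os
import boto3

s3 = boto3.client('s3')

def status_handler(event, context):
    """Return the stored result for a request, or 202 if still processing"""
    request_id = event['pathParameters']['request_id']  # assumed route: /status/{request_id}
    try:
        obj = s3.get_object(
            Bucket=os.environ['RESULTS_BUCKET'],
            Key=f"results/{request_id}.json"
        )
        return {'statusCode': 200, 'body': obj['Body'].read().decode('utf-8')}
    except s3.exceptions.NoSuchKey:
        # No result object yet, so the SQS processor has not finished
        return {
            'statusCode': 202,
            'body': json.dumps({'request_id': request_id, 'status': 'processing'})
        }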
Lambda Layers for Dependencies
Creating and Using Layers
Share common dependencies across functions
# Create layer directory structure
mkdir -p layer/python/lib/python3.11/site-packages

# Install dependencies
pip install requests openai anthropic -t layer/python/lib/python3.11/site-packages/

# Create layer zip
cd layer && zip -r ../llm-dependencies.zip . && cd ..

# Publish layer
aws lambda publish-layer-version \
  --layer-name llm-dependencies \
  --description "Common LLM API dependencies" \
  --zip-file fileb://llm-dependencies.zip \
  --compatible-runtimes python3.11

# Attach to function
aws lambda update-function-configuration \
  --function-name llm-api-handler \
  --layers arn:aws:lambda:region:account:layer:llm-dependencies:1
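At runtime the layer is extracted under /opt and its python/ directory is added to the import path, so functions using the layer import these packages as usual. A throwaway diagnostic handler to confirm the layer is attached (not production code):

def lambda_handler(event, context):
    # requests should resolve from the layer, not the deployment package
    import requests
    return {
        'statusCode': 200,
        'body': f"requests {requests.__version__} loaded from {requests.__file__}"
    }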
Secrets Management
AWS Secrets Manager Integration
Securely manage API keys and credentials
import json
import boto3
from functools import lru_cache
secrets_client = boto3.client('secretsmanager')
@lru_cache(maxsize=1)
def get_secret(secret_name: str) -> dict:
    """Cache secrets to avoid repeated API calls"""
    try:
        response = secrets_client.get_secret_value(
            SecretId=secret_name
        )
        return json.loads(response['SecretString'])
    except Exception as e:
        print(f"Error retrieving secret: {e}")
        raise

def lambda_handler(event, context):
    # Get cached secrets
    secrets = get_secret('parrotrouter/api-keys')
    api_key = secrets['PARROTROUTER_API_KEY']

    # Use API key for requests
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }
    # ... rest of handler
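The serverless.yml above resolves the key from SSM Parameter Store at deploy time; fetching it at runtime instead keeps the value out of environment variables entirely. A sketch of that alternative with the same caching idea; the parameter name matches the serverless.yml reference:

import boto3
from functools import lru_cache

ssm_client = boto3.client('ssm')

@lru_cache(maxsize=8)
def get_parameter(name: str) -> str:
    """Fetch and cache a SecureString parameter"""
    response = ssm_client.get_parameter(Name=name, WithDecryption=True)
    return response['Parameter']['Value']

def lambda_handler(event, context):
    api_key = get_parameter('/parrotrouter/api-key')
    headers = {'Authorization': f'Bearer {api_key}'}
    # ... rest of handler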
Monitoring & Logging
CloudWatch Metrics
Track performance and errors
import json
import time
import boto3
cloudwatch = boto3.client('cloudwatch')
def lambda_handler(event, context):
    start_time = time.time()

    try:
        # Process request
        result = process_llm_request(event)

        # Log success metric
        cloudwatch.put_metric_data(
            Namespace='LLM/API',
            MetricData=[
                {
                    'MetricName': 'RequestSuccess',
                    'Value': 1,
                    'Unit': 'Count'
                },
                {
                    'MetricName': 'RequestLatency',
                    'Value': (time.time() - start_time) * 1000,
                    'Unit': 'Milliseconds'
                }
            ]
        )
        return result
    except Exception as e:
        # Log error metric
        cloudwatch.put_metric_data(
            Namespace='LLM/API',
            MetricData=[
                {
                    'MetricName': 'RequestError',
                    'Value': 1,
                    'Unit': 'Count',
                    'Dimensions': [
                        {
                            'Name': 'ErrorType',
                            'Value': type(e).__name__
                        }
                    ]
                }
            ]
        )
        raise
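One caveat on the custom RequestError metric: alarms match on exact dimensions, so an alarm must specify the same ErrorType dimension or the metric must also be published without dimensions. A simpler option is to alarm on Lambda's built-in Errors metric; a one-off setup sketch in which the SNS topic ARN and threshold are placeholders:

import boto3

cloudwatch = boto3.client('cloudwatch')

# Alarm when the function records more than 5 errors in a 5-minute window
cloudwatch.put_metric_alarm(
    AlarmName='llm-api-handler-errors',
    Namespace='AWS/Lambda',
    MetricName='Errors',
    Dimensions=[{'Name': 'FunctionName', 'Value': 'llm-api-handler'}],
    Statistic='Sum',
    Period=300,
    EvaluationPeriods=1,
    Threshold=5,
    ComparisonOperator='GreaterThanThreshold',
    TreatMissingData='notBreaching',
    AlarmActions=['arn:aws:sns:us-east-1:123456789012:llm-alerts']  # placeholder topic
)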
X-Ray Tracing
Distributed tracing for debugging
import requests
from aws_xray_sdk.core import xray_recorder

# Patch supported libraries (including requests) for tracing
from aws_xray_sdk.core import patch_all
patch_all()

@xray_recorder.capture('llm_api_call')
def call_llm_api(prompt: str, model: str):
    """Traced LLM API call"""
    subsegment = xray_recorder.current_subsegment()
    subsegment.put_metadata('model', model)
    subsegment.put_metadata('prompt_length', len(prompt))

    response = requests.post(
        'https://api.parrotrouter.com/v1/chat/completions',
        json={'model': model, 'messages': [{'role': 'user', 'content': prompt}]}
    )

    subsegment.put_metadata('response_tokens', response.json().get('usage', {}).get('total_tokens'))
    return response.json()
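patch_all() instruments every supported library and adds a little to cold-start time. If only the outbound HTTP call needs tracing, patching selectively is a lighter option; a sketch, so verify against the aws_xray_sdk version you deploy:

from aws_xray_sdk.core import patch

# Instrument only the requests library instead of everything patch_all() covers
patch(('requests',))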
Cost Optimization
Cost-Saving Strategies
Optimize Lambda and LLM API costs
Memory Optimization: right-size Lambda memory. CPU is allocated in proportion to memory, so a larger setting can finish requests faster and sometimes lower total cost.
# Cost optimization patterns

# 1. Response caching with DynamoDB
import json
import hashlib
import boto3
from datetime import datetime, timedelta

dynamodb = boto3.resource('dynamodb')
cache_table = dynamodb.Table('llm-cache')

def get_cached_response(prompt: str, model: str):
    cache_key = hashlib.md5(f"{model}:{prompt}".encode()).hexdigest()
    response = cache_table.get_item(Key={'cache_key': cache_key})
    if 'Item' in response:
        item = response['Item']
        if datetime.fromisoformat(item['expires_at']) > datetime.utcnow():
            return item['response']
    return None

def cache_response(prompt: str, model: str, response: str, ttl_hours: int = 24):
    cache_key = hashlib.md5(f"{model}:{prompt}".encode()).hexdigest()
    expires_at = datetime.utcnow() + timedelta(hours=ttl_hours)
    cache_table.put_item(
        Item={
            'cache_key': cache_key,
            'prompt': prompt,
            'model': model,
            'response': response,
            'expires_at': expires_at.isoformat()
        }
    )

# 2. Batch processing for multiple requests
def batch_handler(event, context):
    """Process multiple LLM requests in one Lambda invocation"""
    batch_requests = json.loads(event['body'])['requests']
    results = []

    for request in batch_requests:
        # Check cache first
        cached = get_cached_response(request['prompt'], request['model'])
        if cached:
            results.append(cached)
        else:
            # Process and cache
            response = call_llm_api(request)
            cache_response(request['prompt'], request['model'], response)
            results.append(response)

    return {'statusCode': 200, 'body': json.dumps(results)}

Lambda Cost Factors
- Invocations: $0.20 per 1M requests
- Duration: $0.0000166667 per GB-second
- Provisioned Concurrency: $0.0000041667 per GB-second
Optimization Tips
- Use ARM architecture (Graviton2)
- Enable HTTP keep-alive (see the session sketch below)
- Minimize cold starts
- Cache frequent responses
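For the keep-alive item above: creating one requests.Session at module scope lets warm invocations reuse TCP/TLS connections instead of re-handshaking on every call. A minimal sketch:

import os
import requests

# Created once per execution environment and reused across warm invocations
session = requests.Session()
session.headers.update({
    'Authorization': f"Bearer {os.environ['PARROTROUTER_API_KEY']}",
    'Content-Type': 'application/json'
})

def lambda_handler(event, context):
    response = session.post(
        'https://api.parrotrouter.com/v1/chat/completions',
        json={'model': 'gpt-3.5-turbo', 'messages': [{'role': 'user', 'content': 'ping'}]},
        timeout=25
    )
    return {'statusCode': 200, 'body': response.text}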
Production Checklist
Performance
- ✓ Provisioned concurrency for critical functions
- ✓ Connection pooling and keep-alive
- ✓ Response caching strategy
- ✓ Optimized memory allocation
Reliability
- ✓ Dead letter queues configured
- ✓ Retry logic with exponential backoff
- ✓ CloudWatch alarms for errors
- ✓ X-Ray tracing enabled