Quick Start
Set up a new Django project with LLM support:
django-admin startproject llm_project
cd llm_project
pip install django djangorestframework openai anthropic
pip install celery redis channels channels-redis httpx
1. Project Setup & Configuration
Settings Configuration
# settings.py
import os
from pathlib import Path
# Build paths
BASE_DIR = Path(__file__).resolve().parent.parent
# Security
SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY')
DEBUG = os.environ.get('DEBUG', 'False') == 'True'
ALLOWED_HOSTS = os.environ.get('ALLOWED_HOSTS', '').split(',')
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'rest_framework.authtoken',
'channels',
'llm_app', # Your app
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
# LLM API Configuration
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
ANTHROPIC_API_KEY = os.environ.get('ANTHROPIC_API_KEY')
GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')
# Django REST Framework
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.TokenAuthentication',
'rest_framework.authentication.SessionAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticated',
],
'DEFAULT_THROTTLE_CLASSES': [
'rest_framework.throttling.AnonRateThrottle',
'rest_framework.throttling.UserRateThrottle'
],
'DEFAULT_THROTTLE_RATES': {
'anon': '10/hour',
'user': '100/hour'
}
}
# Celery Configuration
CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
# Channels Configuration
ASGI_APPLICATION = 'llm_project.asgi.application'
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
'hosts': [(os.environ.get('REDIS_URL', 'redis://localhost:6379'))],
},
},
}
# Cache Configuration
CACHES = {
'default': {
# Django's built-in Redis cache backend (no django-redis OPTIONS needed)
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': os.environ.get('REDIS_URL', 'redis://localhost:6379'),
'KEY_PREFIX': 'llm_cache',
'TIMEOUT': 3600, # 1 hour default
}
}
Project Structure
llm_project/
├── llm_project/
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   ├── asgi.py
│   ├── wsgi.py
│   └── celery.py
├── llm_app/
│   ├── models.py          # Prompt templates, conversations
│   ├── serializers.py     # DRF serializers
│   ├── views.py           # API views
│   ├── tasks.py           # Celery tasks
│   ├── consumers.py       # WebSocket consumers
│   ├── services.py        # LLM service layer
│   ├── admin.py           # Admin customization
│   └── tests.py           # Test suite
├── templates/
├── static/
├── requirements.txt
└── manage.py
Celery Configuration
# llm_project/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'llm_project.settings')
app = Celery('llm_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
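For the `CELERY_*` settings namespace and `app.autodiscover_tasks()` to take effect whenever Django starts, the standard Celery-with-Django pattern also imports the app from the project package's `__init__.py`, which the snippet above leaves out:

# llm_project/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)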
2. Django REST Framework LLM Endpoints
Models
# llm_app/models.py
from django.db import models
from django.contrib.auth.models import User
import uuid
class PromptTemplate(models.Model):
name = models.CharField(max_length=100, unique=True)
description = models.TextField(blank=True)
template = models.TextField(help_text="Use {variables} for placeholders")
system_prompt = models.TextField(blank=True)
created_by = models.ForeignKey(User, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
is_active = models.BooleanField(default=True)
class Meta:
ordering = ['-created_at']
def __str__(self):
return self.name
class Conversation(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
user = models.ForeignKey(User, on_delete=models.CASCADE)
title = models.CharField(max_length=200, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ['-updated_at']
class Message(models.Model):
ROLE_CHOICES = [
('user', 'User'),
('assistant', 'Assistant'),
('system', 'System'),
]
conversation = models.ForeignKey(Conversation, on_delete=models.CASCADE, related_name='messages')
role = models.CharField(max_length=10, choices=ROLE_CHOICES)
content = models.TextField()
tokens_used = models.IntegerField(default=0)
provider = models.CharField(max_length=50, default='openai')
model = models.CharField(max_length=100, default='gpt-3.5-turbo')
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
ordering = ['created_at']
class APIUsage(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE)
endpoint = models.CharField(max_length=100)
tokens_used = models.IntegerField(default=0)
cost = models.DecimalField(max_digits=10, decimal_places=4, default=0)
provider = models.CharField(max_length=50)
model = models.CharField(max_length=100)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
ordering = ['-created_at']
indexes = [
models.Index(fields=['user', 'created_at']),
]
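Nothing above shows a `PromptTemplate` actually being rendered. As a rough sketch, a hypothetical helper (not referenced elsewhere in this guide; the module and function names are assumptions) could fill the `{variable}` placeholders with `str.format` and return chat-style messages:

# llm_app/prompts.py (illustrative helper; names are assumptions)
from .models import PromptTemplate

def render_prompt(template_name: str, **variables) -> list:
    """Fill a PromptTemplate's {placeholders} and return chat-style messages."""
    tpl = PromptTemplate.objects.get(name=template_name, is_active=True)
    messages = []
    if tpl.system_prompt:
        messages.append({'role': 'system', 'content': tpl.system_prompt})
    messages.append({'role': 'user', 'content': tpl.template.format(**variables)})
    return messages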
Serializers
# llm_app/serializers.py
from rest_framework import serializers
from .models import PromptTemplate, Conversation, Message
class PromptTemplateSerializer(serializers.ModelSerializer):
class Meta:
model = PromptTemplate
fields = ['id', 'name', 'description', 'template', 'system_prompt',
'created_at', 'updated_at', 'is_active']
read_only_fields = ['created_at', 'updated_at']
class MessageSerializer(serializers.ModelSerializer):
class Meta:
model = Message
fields = ['id', 'role', 'content', 'tokens_used', 'provider',
'model', 'created_at']
read_only_fields = ['tokens_used', 'created_at']
class ConversationSerializer(serializers.ModelSerializer):
messages = MessageSerializer(many=True, read_only=True)
message_count = serializers.IntegerField(source='messages.count', read_only=True)
class Meta:
model = Conversation
fields = ['id', 'title', 'created_at', 'updated_at',
'messages', 'message_count']
read_only_fields = ['created_at', 'updated_at']
class ChatRequestSerializer(serializers.Serializer):
message = serializers.CharField(max_length=4000)
conversation_id = serializers.UUIDField(required=False)
provider = serializers.ChoiceField(
choices=['openai', 'anthropic', 'google'],
default='openai'
)
model = serializers.CharField(required=False)
temperature = serializers.FloatField(min_value=0, max_value=2, default=0.7)
max_tokens = serializers.IntegerField(min_value=1, max_value=4000, default=1000)
stream = serializers.BooleanField(default=False)
LLM Service Layer
# llm_app/services.py
import openai
import anthropic
from django.conf import settings
from typing import AsyncGenerator, Dict, Any
import httpx
import json
class LLMService:
def __init__(self):
self.openai_client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)
self.anthropic_client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)
def get_provider_client(self, provider: str):
if provider == 'openai':
return self.openai_client
elif provider == 'anthropic':
return self.anthropic_client
else:
raise ValueError(f"Unsupported provider: {provider}")
def generate_completion(self, messages: list, provider: str = 'openai', **kwargs) -> Dict[str, Any]:
if provider == 'openai':
response = self.openai_client.chat.completions.create(
model=kwargs.get('model', 'gpt-3.5-turbo'),
messages=messages,
temperature=kwargs.get('temperature', 0.7),
max_tokens=kwargs.get('max_tokens', 1000),
)
return {
'content': response.choices[0].message.content,
'tokens_used': response.usage.total_tokens,
'model': response.model,
}
elif provider == 'anthropic':
response = self.anthropic_client.messages.create(
model=kwargs.get('model', 'claude-3-sonnet-20240229'),
messages=messages,
max_tokens=kwargs.get('max_tokens', 1000),
)
return {
'content': response.content[0].text,
'tokens_used': response.usage.input_tokens + response.usage.output_tokens,
'model': response.model,
}
async def generate_stream(self, messages: list, provider: str = 'openai', **kwargs) -> AsyncGenerator[str, None]:
if provider == 'openai':
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"Bearer {settings.OPENAI_API_KEY}",
"Content-Type": "application/json",
}
data = {
"model": kwargs.get('model', 'gpt-3.5-turbo'),
"messages": messages,
"temperature": kwargs.get('temperature', 0.7),
"max_tokens": kwargs.get('max_tokens', 1000),
"stream": True,
}
async with client.stream(
"POST",
"https://api.openai.com/v1/chat/completions",
json=data,
headers=headers,
timeout=60.0,
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
try:
chunk = json.loads(data)
content = chunk["choices"][0]["delta"].get("content", "")
if content:
yield content
except json.JSONDecodeError:
continue
API Views
# llm_app/views.py
from rest_framework import status, generics
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from django.http import StreamingHttpResponse
from django.views.decorators.csrf import csrf_exempt
from asgiref.sync import sync_to_async
import asyncio
from .models import Conversation, Message, APIUsage
from .serializers import (
ChatRequestSerializer, ConversationSerializer,
MessageSerializer, PromptTemplateSerializer
)
from .services import LLMService
from .tasks import generate_completion_task
class ChatCompletionView(APIView):
permission_classes = [IsAuthenticated]
def post(self, request):
serializer = ChatRequestSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
# Get or create conversation
conversation_id = data.get('conversation_id')
if conversation_id:
try:
conversation = Conversation.objects.get(
id=conversation_id,
user=request.user
)
except Conversation.DoesNotExist:
return Response(
{'error': 'Conversation not found'},
status=status.HTTP_404_NOT_FOUND
)
else:
conversation = Conversation.objects.create(user=request.user)
# Add user message
user_message = Message.objects.create(
conversation=conversation,
role='user',
content=data['message']
)
# Get conversation context
messages = list(conversation.messages.values('role', 'content'))
# Generate response
if data.get('stream'):
# For streaming, return task ID
task = generate_completion_task.delay(
messages=messages,
provider=data['provider'],
model=data.get('model'),
temperature=data['temperature'],
max_tokens=data['max_tokens'],
conversation_id=str(conversation.id),
user_id=request.user.id
)
return Response({
'conversation_id': conversation.id,
'task_id': task.id,
'stream_url': f'/api/stream/{task.id}/'
})
else:
# Synchronous generation
service = LLMService()
result = service.generate_completion(
messages=messages,
provider=data['provider'],
model=data.get('model'),
temperature=data['temperature'],
max_tokens=data['max_tokens']
)
# Save assistant message
assistant_message = Message.objects.create(
conversation=conversation,
role='assistant',
content=result['content'],
tokens_used=result['tokens_used'],
provider=data['provider'],
model=result['model']
)
# Track usage
APIUsage.objects.create(
user=request.user,
endpoint='chat_completion',
tokens_used=result['tokens_used'],
provider=data['provider'],
model=result['model']
)
return Response({
'conversation_id': conversation.id,
'message': MessageSerializer(assistant_message).data
})
class ConversationListView(generics.ListAPIView):
serializer_class = ConversationSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
return Conversation.objects.filter(user=self.request.user)
class ConversationDetailView(generics.RetrieveDestroyAPIView):
serializer_class = ConversationSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
return Conversation.objects.filter(user=self.request.user)
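The guide never shows URL routes for these views (only the streaming route appears later), so here is a hedged sketch of the wiring, using the `/api/chat/` path that the test suite at the end of this guide posts to; route names and prefixes are assumptions:

# llm_app/urls.py (illustrative; include it from the project urls.py)
from django.urls import path
from .views import ChatCompletionView, ConversationListView, ConversationDetailView

urlpatterns = [
    path('api/chat/', ChatCompletionView.as_view(), name='chat-completion'),
    path('api/conversations/', ConversationListView.as_view(), name='conversation-list'),
    path('api/conversations/<uuid:pk>/', ConversationDetailView.as_view(), name='conversation-detail'),
]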
3. Async Views for Streaming
# llm_app/views.py (async views)
from django.http import StreamingHttpResponse
import asyncio
import json
async def stream_chat_view(request, task_id):
"""Async view for streaming LLM responses"""
async def event_stream():
service = LLMService()
# Get task result or generate directly
messages = json.loads(request.GET.get('messages', '[]'))
provider = request.GET.get('provider', 'openai')
yield 'data: {"event": "connected"}\n\n'
try:
async for token in service.generate_stream(
messages=messages,
provider=provider
):
data = json.dumps({"token": token})
yield f"data: {data}\n\n"
await asyncio.sleep(0.01) # Small delay to prevent overwhelming
yield 'data: {"event": "completed"}\n\n'
except Exception as e:
error_data = json.dumps({"error": str(e)})
yield f"data: {error_data}\n\n"
response = StreamingHttpResponse(
event_stream(),
content_type='text/event-stream'
)
response['Cache-Control'] = 'no-cache'
response['X-Accel-Buffering'] = 'no'
return response
# URL configuration
from django.urls import path
urlpatterns = [
path('api/stream/<str:task_id>/', stream_chat_view, name='stream-chat'),
]
Performance Note
For production streaming, use ASGI servers like Daphne or Uvicorn instead of WSGI servers to properly handle async views and streaming responses.
4. Celery Background Tasks
Celery Tasks
# llm_app/tasks.py
from celery import shared_task
from celery.result import AsyncResult
from django.core.cache import cache
from .models import Conversation, Message, APIUsage
from .services import LLMService
import logging
logger = logging.getLogger(__name__)
@shared_task(bind=True, max_retries=3)
def generate_completion_task(self, messages, provider, model, temperature,
max_tokens, conversation_id, user_id):
"""Background task for LLM completion generation"""
try:
# Update task progress
self.update_state(state='PROGRESS', meta={'status': 'Generating response...'})
service = LLMService()
result = service.generate_completion(
messages=messages,
provider=provider,
model=model,
temperature=temperature,
max_tokens=max_tokens
)
# Save to database
conversation = Conversation.objects.get(id=conversation_id)
assistant_message = Message.objects.create(
conversation=conversation,
role='assistant',
content=result['content'],
tokens_used=result['tokens_used'],
provider=provider,
model=result['model']
)
# Track usage
APIUsage.objects.create(
user_id=user_id,
endpoint='chat_completion_async',
tokens_used=result['tokens_used'],
provider=provider,
model=result['model']
)
# Cache result for quick retrieval
cache_key = f'task_result:{self.request.id}'
cache.set(cache_key, result, timeout=3600)
return {
'status': 'completed',
'message_id': assistant_message.id,
'content': result['content'],
'tokens_used': result['tokens_used']
}
except Exception as exc:
logger.error(f"Task {self.request.id} failed: {exc}")
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@shared_task
def batch_generate_summaries(conversation_ids):
"""Batch process multiple conversations for summarization"""
service = LLMService()
results = []
for conv_id in conversation_ids:
try:
conversation = Conversation.objects.get(id=conv_id)
messages = list(conversation.messages.values('role', 'content'))
# Add summarization prompt
messages.append({
'role': 'user',
'content': 'Please provide a brief summary of this conversation.'
})
result = service.generate_completion(
messages=messages,
provider='openai',
model='gpt-3.5-turbo',
max_tokens=150
)
# Update conversation title if empty
if not conversation.title:
conversation.title = result['content'][:100]
conversation.save()
results.append({
'conversation_id': conv_id,
'summary': result['content']
})
except Exception as e:
logger.error(f"Failed to summarize conversation {conv_id}: {e}")
results.append({
'conversation_id': conv_id,
'error': str(e)
})
return results
@shared_task
def cleanup_old_conversations():
"""Periodic task to clean up old conversations"""
from datetime import timedelta
from django.utils import timezone
from django.db.models import Count
cutoff_date = timezone.now() - timedelta(days=30)
# Filtering on a reverse-relation count requires an annotation
deleted_count = Conversation.objects.annotate(
num_messages=Count('messages')
).filter(
updated_at__lt=cutoff_date,
num_messages=0
).delete()[0]
logger.info(f"Cleaned up {deleted_count} old empty conversations")
return deleted_count
Celery Beat Schedule
# settings.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'cleanup-old-conversations': {
'task': 'llm_app.tasks.cleanup_old_conversations',
'schedule': crontab(hour=2, minute=0), # Run daily at 2 AM
},
'generate-usage-reports': {
'task': 'llm_app.tasks.generate_usage_reports',
'schedule': crontab(hour=0, minute=0, day_of_week=1), # Weekly on Monday
},
}
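The schedule above refers to a `generate_usage_reports` task that is not defined in `tasks.py`. A minimal sketch, assuming a weekly aggregation over the `APIUsage` model, might look like this:

# llm_app/tasks.py (hypothetical sketch of the task named in the beat schedule)
from datetime import timedelta
from celery import shared_task
from django.db.models import Sum
from django.utils import timezone
from .models import APIUsage

@shared_task
def generate_usage_reports():
    """Aggregate the past week's token usage and cost per provider."""
    since = timezone.now() - timedelta(days=7)
    report = list(
        APIUsage.objects.filter(created_at__gte=since)
        .values('provider')
        .annotate(total_tokens=Sum('tokens_used'), total_cost=Sum('cost'))
    )
    return report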
5. Admin Interface for Prompts
# llm_app/admin.py
from django.contrib import admin
from django.db.models import Count, Sum
from .models import PromptTemplate, Conversation, Message, APIUsage
@admin.register(PromptTemplate)
class PromptTemplateAdmin(admin.ModelAdmin):
list_display = ['name', 'created_by', 'is_active', 'created_at']
list_filter = ['is_active', 'created_at', 'created_by']
search_fields = ['name', 'description', 'template']
readonly_fields = ['created_at', 'updated_at']
fieldsets = (
('Basic Information', {
'fields': ('name', 'description', 'is_active')
}),
('Prompt Configuration', {
'fields': ('template', 'system_prompt'),
'classes': ('wide',)
}),
('Metadata', {
'fields': ('created_by', 'created_at', 'updated_at'),
'classes': ('collapse',)
}),
)
def save_model(self, request, obj, form, change):
if not change:
obj.created_by = request.user
super().save_model(request, obj, form, change)
@admin.register(Conversation)
class ConversationAdmin(admin.ModelAdmin):
list_display = ['id', 'user', 'title', 'message_count', 'updated_at']
list_filter = ['created_at', 'updated_at']
search_fields = ['title', 'user__username', 'user__email']
readonly_fields = ['id', 'created_at', 'updated_at']
def message_count(self, obj):
# Use the annotation added in get_queryset instead of a per-row count query
return obj.message_count
message_count.short_description = 'Messages'
message_count.admin_order_field = 'message_count'
def get_queryset(self, request):
return super().get_queryset(request).annotate(
message_count=Count('messages')
)
class MessageInline(admin.TabularInline):
model = Message
extra = 0
readonly_fields = ['created_at', 'tokens_used']
fields = ['role', 'content', 'provider', 'model', 'tokens_used', 'created_at']
@admin.register(APIUsage)
class APIUsageAdmin(admin.ModelAdmin):
list_display = ['user', 'endpoint', 'provider', 'model', 'tokens_used', 'cost', 'created_at']
list_filter = ['provider', 'model', 'endpoint', 'created_at']
date_hierarchy = 'created_at'
def get_queryset(self, request):
qs = super().get_queryset(request)
return qs.select_related('user')
def changelist_view(self, request, extra_context=None):
# Add usage statistics to admin
qs = self.get_queryset(request)
total_tokens = qs.aggregate(Sum('tokens_used'))['tokens_used__sum'] or 0
total_cost = qs.aggregate(Sum('cost'))['cost__sum'] or 0
extra_context = extra_context or {}
extra_context.update({
'total_tokens': total_tokens,
'total_cost': total_cost,
})
return super().changelist_view(request, extra_context=extra_context)
6. Authentication & API Keys
Custom Authentication
# llm_app/authentication.py
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from django.contrib.auth.models import User
import hashlib
class APIKeyAuthentication(BaseAuthentication):
def authenticate(self, request):
api_key = request.META.get('HTTP_X_API_KEY')
if not api_key:
return None
try:
# Hash the API key for secure comparison
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
user = User.objects.get(profile__api_key_hash=key_hash)
return (user, api_key)
except User.DoesNotExist:
raise AuthenticationFailed('Invalid API key')
# Models for API key storage
from django.db import models
from django.utils import timezone
import secrets
class UserProfile(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
api_key_hash = models.CharField(max_length=64, unique=True, blank=True)
api_key_created_at = models.DateTimeField(null=True, blank=True)
def generate_api_key(self):
"""Generate a new API key for the user"""
api_key = secrets.token_urlsafe(32)
self.api_key_hash = hashlib.sha256(api_key.encode()).hexdigest()
self.api_key_created_at = timezone.now()
self.save()
return api_key # Return once for user to save
# Views for API key management
from rest_framework.views import APIView
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
class GenerateAPIKeyView(APIView):
permission_classes = [IsAuthenticated]
def post(self, request):
profile, created = UserProfile.objects.get_or_create(user=request.user)
api_key = profile.generate_api_key()
return Response({
'api_key': api_key,
'message': 'Save this key securely. It cannot be retrieved again.'
})
# Rate limiting per API key
# SimpleRateThrottle (not BaseThrottle) provides cache_format, scope handling,
# and rate enforcement used by get_cache_key
from rest_framework.throttling import SimpleRateThrottle
class APIKeyRateThrottle(SimpleRateThrottle):
scope = 'api_key'
def get_cache_key(self, request, view):
if hasattr(request, 'auth') and request.auth:
# Use API key for rate limiting
return self.cache_format % {
'scope': self.scope,
'ident': hashlib.md5(request.auth.encode()).hexdigest()
}
return None
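For the custom authentication and throttle classes to take effect they need to be registered in the DRF settings. A hedged sketch follows: the class paths assume the `llm_app/authentication.py` layout above, and the `api_key` rate is an arbitrary example:

# settings.py (illustrative additions; rates and paths are assumptions)
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'llm_app.authentication.APIKeyAuthentication',
        'rest_framework.authentication.TokenAuthentication',
        'rest_framework.authentication.SessionAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
    'DEFAULT_THROTTLE_CLASSES': [
        'rest_framework.throttling.AnonRateThrottle',
        'rest_framework.throttling.UserRateThrottle',
        'llm_app.authentication.APIKeyRateThrottle',
    ],
    'DEFAULT_THROTTLE_RATES': {
        'anon': '10/hour',
        'user': '100/hour',
        'api_key': '1000/hour',  # assumed rate for the custom scope
    },
}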
7. Response Caching
# llm_app/decorators.py
from django.core.cache import cache
from functools import wraps
import hashlib
import json
def cache_llm_response(timeout=3600):
"""Decorator to cache LLM responses"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Build the cache key from the function arguments; the hash is computed
# first so the f-string stays on a single line
key_material = json.dumps(
{'args': str(args), 'kwargs': str(kwargs)}, sort_keys=True
).encode()
cache_key = f"llm:{func.__name__}:{hashlib.md5(key_material).hexdigest()}"
# Try to get from cache
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result
# Generate result
result = func(*args, **kwargs)
# Cache the result
cache.set(cache_key, result, timeout=timeout)
return result
return wrapper
return decorator
# Usage in service
class CachedLLMService(LLMService):
@cache_llm_response(timeout=3600)
def _generate_completion_cached(self, messages, provider='openai', **kwargs):
return self.generate_completion(messages, provider, **kwargs)
def generate_completion_cached(self, messages, provider='openai', **kwargs):
# Only cache non-personalized prompts; bypass the cache entirely otherwise
if self._is_cacheable(messages):
return self._generate_completion_cached(messages, provider, **kwargs)
return self.generate_completion(messages, provider, **kwargs)
def _is_cacheable(self, messages):
# Don't cache if messages contain user-specific data
for msg in messages:
if any(keyword in msg.get('content', '').lower()
for keyword in ['my', 'i', 'me', 'personal']):
return False
return True
# Django per-view caching (cache_page targets function-based views,
# so wrap the class-based view method with method_decorator)
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
class CachedPromptTemplateView(APIView):
@method_decorator(cache_page(60 * 15))  # Cache for 15 minutes
def get(self, request, template_id):
template = PromptTemplate.objects.get(id=template_id)
return Response(PromptTemplateSerializer(template).data)
8. File Handling for Multimodal
# llm_app/views.py
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
from django.conf import settings
from rest_framework.parsers import MultiPartParser, FormParser
import base64
import mimetypes
import openai
class MultimodalChatView(APIView):
permission_classes = [IsAuthenticated]
parser_classes = [MultiPartParser, FormParser]
def post(self, request):
serializer = MultimodalChatSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
message = serializer.validated_data['message']
files = request.FILES.getlist('files')
provider = serializer.validated_data.get('provider', 'openai')
# Process files
file_contents = []
for file in files[:3]: # Limit to 3 files
if file.size > 10 * 1024 * 1024: # 10MB limit
return Response(
{'error': f'File {file.name} exceeds 10MB limit'},
status=status.HTTP_400_BAD_REQUEST
)
# Save file temporarily
file_path = default_storage.save(
f'temp/{request.user.id}/{file.name}',
ContentFile(file.read())
)
# Convert to base64 for API
with default_storage.open(file_path, 'rb') as f:
file_data = base64.b64encode(f.read()).decode('utf-8')
mime_type = mimetypes.guess_type(file.name)[0] or 'application/octet-stream'
file_contents.append({
'type': 'image' if mime_type.startswith('image/') else 'file',
'data': file_data,
'mime_type': mime_type,
'name': file.name
})
# Clean up temp file
default_storage.delete(file_path)
# Prepare messages for multimodal API
messages = [{
'role': 'user',
'content': [
{'type': 'text', 'text': message},
*[{
'type': 'image_url',
'image_url': {
'url': f"data:{f['mime_type']};base64,{f['data']}"
}
} for f in file_contents if f['type'] == 'image']
]
}]
# Call appropriate provider
if provider == 'openai':
response = self._call_openai_vision(messages)
elif provider == 'anthropic':
response = self._call_anthropic_vision(messages, file_contents)
else:
return Response(
{'error': 'Provider does not support multimodal'},
status=status.HTTP_400_BAD_REQUEST
)
return Response(response)
def _call_openai_vision(self, messages):
client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)
response = client.chat.completions.create(
model="gpt-4-vision-preview",
messages=messages,
max_tokens=500
)
return {
'content': response.choices[0].message.content,
'model': 'gpt-4-vision-preview'
}
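The multimodal view references `MultimodalChatSerializer` and `_call_anthropic_vision`, neither of which appears elsewhere in this guide. The sketches below are assumptions: the serializer fields are guesses that match how the view reads them, and the Anthropic branch uses that API's base64 image content blocks with an assumed Claude 3 model:

# llm_app/serializers.py (assumed serializer for the multimodal endpoint)
from rest_framework import serializers

class MultimodalChatSerializer(serializers.Serializer):
    message = serializers.CharField(max_length=4000)
    provider = serializers.ChoiceField(choices=['openai', 'anthropic'], default='openai')


# llm_app/views.py (sketch of the missing method, shown as a class continuation)
import anthropic

class MultimodalChatView(APIView):
    def _call_anthropic_vision(self, messages, file_contents):
        client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)
        # Anthropic expects images as base64 "source" content blocks
        content = [{'type': 'text', 'text': messages[0]['content'][0]['text']}]
        for f in file_contents:
            if f['type'] == 'image':
                content.append({
                    'type': 'image',
                    'source': {
                        'type': 'base64',
                        'media_type': f['mime_type'],
                        'data': f['data'],
                    },
                })
        response = client.messages.create(
            model='claude-3-sonnet-20240229',  # assumed vision-capable model
            max_tokens=500,
            messages=[{'role': 'user', 'content': content}],
        )
        return {'content': response.content[0].text, 'model': response.model}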
9. Django Channels WebSocket
ASGI Configuration
# llm_project/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import AllowedHostsOriginValidator
import llm_app.routing
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'llm_project.settings')
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AllowedHostsOriginValidator(
AuthMiddlewareStack(
URLRouter(
llm_app.routing.websocket_urlpatterns
)
)
),
})
WebSocket Consumers
# llm_app/consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
import json
from channels.db import database_sync_to_async
from .models import Conversation, Message
from .services import LLMService
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.user = self.scope["user"]
if not self.user.is_authenticated:
await self.close()
return
self.conversation_id = self.scope["url_route"]["kwargs"]["conversation_id"]
self.conversation_group = f"chat_{self.conversation_id}"
# Join conversation group
await self.channel_layer.group_add(
self.conversation_group,
self.channel_name
)
await self.accept()
await self.send(text_data=json.dumps({
"type": "connection_established",
"conversation_id": self.conversation_id
}))
async def disconnect(self, close_code):
# Leave conversation group
await self.channel_layer.group_discard(
self.conversation_group,
self.channel_name
)
async def receive(self, text_data):
data = json.loads(text_data)
message_type = data.get("type")
if message_type == "chat_message":
await self.handle_chat_message(data)
elif message_type == "typing_indicator":
await self.handle_typing_indicator(data)
async def handle_chat_message(self, data):
content = data["content"]
provider = data.get("provider", "openai")
# Save user message
user_message = await self.save_message("user", content)
# Send user message to group
await self.channel_layer.group_send(
self.conversation_group,
{
"type": "chat_message",
"message": {
"id": str(user_message.id),
"role": "user",
"content": content,
"timestamp": user_message.created_at.isoformat()
}
}
)
# Generate AI response
service = LLMService()
messages = await self.get_conversation_messages()
# Send typing indicator
await self.channel_layer.group_send(
self.conversation_group,
{"type": "typing_indicator", "is_typing": True}
)
# Stream response
full_response = ""
async for token in service.generate_stream(messages, provider):
full_response += token
await self.send(text_data=json.dumps({
"type": "stream_token",
"token": token
}))
# Save assistant message
assistant_message = await self.save_message("assistant", full_response, provider)
# Send complete message
await self.channel_layer.group_send(
self.conversation_group,
{
"type": "chat_message",
"message": {
"id": str(assistant_message.id),
"role": "assistant",
"content": full_response,
"timestamp": assistant_message.created_at.isoformat()
}
}
)
@database_sync_to_async
def save_message(self, role, content, provider="openai"):
return Message.objects.create(
conversation_id=self.conversation_id,
role=role,
content=content,
provider=provider
)
@database_sync_to_async
def get_conversation_messages(self):
messages = Message.objects.filter(
conversation_id=self.conversation_id
).order_by('created_at').values('role', 'content')
return list(messages)
# Handler for group messages
async def chat_message(self, event):
await self.send(text_data=json.dumps({
"type": "chat_message",
"message": event["message"]
}))
async def typing_indicator(self, event):
await self.send(text_data=json.dumps({
"type": "typing_indicator",
"is_typing": event["is_typing"]
}))
# Routing
# llm_app/routing.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<conversation_id>[^/]+)/$', consumers.ChatConsumer.as_asgi()),
]
10. Production Deployment
Gunicorn Configuration
# gunicorn.conf.py
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "sync"  # Use 'uvicorn.workers.UvicornWorker' for async
worker_connections = 1000
keepalive = 5
threads = 2

# Logging
accesslog = "/var/log/gunicorn/access.log"
errorlog = "/var/log/gunicorn/error.log"
loglevel = "info"

# Process naming
proc_name = 'llm_django_app'

# Server mechanics
daemon = False
pidfile = '/var/run/gunicorn.pid'
user = 'www-data'
group = 'www-data'
tmp_upload_dir = None

# SSL (if not using nginx)
# keyfile = '/path/to/keyfile'
# certfile = '/path/to/certfile'

# Worker timeout (important for long LLM requests)
timeout = 120
graceful_timeout = 30

# Restart workers after this many requests
max_requests = 1000
max_requests_jitter = 50
Daphne for WebSockets
# Install Daphne
pip install daphne

# Run Daphne for WebSocket support
daphne -b 0.0.0.0 -p 8001 llm_project.asgi:application
Supervisor Configuration
; /etc/supervisor/conf.d/llm_django.conf
[program:llm_django_gunicorn]
command=/path/to/venv/bin/gunicorn llm_project.wsgi:application -c /path/to/gunicorn.conf.py
directory=/path/to/project
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/llm_django_gunicorn.log
environment=PATH="/path/to/venv/bin",DJANGO_SETTINGS_MODULE="llm_project.settings"

[program:llm_django_daphne]
command=/path/to/venv/bin/daphne -b 0.0.0.0 -p 8001 llm_project.asgi:application
directory=/path/to/project
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/llm_django_daphne.log

[program:llm_django_celery]
command=/path/to/venv/bin/celery -A llm_project worker -l info
directory=/path/to/project
user=www-data
numprocs=1
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600
stdout_logfile=/var/log/supervisor/llm_django_celery.log

[program:llm_django_celery_beat]
command=/path/to/venv/bin/celery -A llm_project beat -l info
directory=/path/to/project
user=www-data
numprocs=1
autostart=true
autorestart=true
stdout_logfile=/var/log/supervisor/llm_django_celery_beat.log
Nginx Configuration
# /etc/nginx/sites-available/llm_django
upstream django_app {
server localhost:8000;
}
upstream websocket_app {
server localhost:8001;
}
server {
listen 80;
server_name api.example.com;
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name api.example.com;
ssl_certificate /etc/letsencrypt/live/api.example.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/api.example.com/privkey.pem;
client_max_body_size 10M;
location /static/ {
alias /path/to/project/staticfiles/;
expires 30d;
}
location /media/ {
alias /path/to/project/media/;
expires 30d;
}
location /ws/ {
proxy_pass http://websocket_app;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location / {
proxy_pass http://django_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Timeouts for long LLM requests
proxy_connect_timeout 300s;
proxy_send_timeout 300s;
proxy_read_timeout 300s;
}
}
Docker Deployment
# Dockerfile
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y gcc postgresql-client && rm -rf /var/lib/apt/lists/*

# Set work directory
WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy project
COPY . .

# Collect static files
RUN python manage.py collectstatic --noinput

# Note: run migrations at container start (for example via an entrypoint or
# `docker compose run web python manage.py migrate`) rather than at build
# time, when the database service is not yet reachable.

# Create user
RUN useradd -m -u 1000 django && chown -R django:django /app
USER django

# Expose port
EXPOSE 8000

# Run gunicorn
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "llm_project.wsgi:application"]
Docker Compose
# docker-compose.yml
version: '3.8'
services:
db:
image: postgres:15
environment:
POSTGRES_DB: llm_db
POSTGRES_USER: llm_user
POSTGRES_PASSWORD: secure_password
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
command: redis-server --appendonly yes
volumes:
- redis_data:/data
web:
build: .
command: gunicorn llm_project.wsgi:application --bind 0.0.0.0:8000
volumes:
- .:/app
- static_volume:/app/staticfiles
- media_volume:/app/media
ports:
- "8000:8000"
environment:
- DJANGO_SETTINGS_MODULE=llm_project.settings
- DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
daphne:
build: .
command: daphne -b 0.0.0.0 -p 8001 llm_project.asgi:application
volumes:
- .:/app
ports:
- "8001:8001"
environment:
- DJANGO_SETTINGS_MODULE=llm_project.settings
- DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
celery:
build: .
command: celery -A llm_project worker -l info
volumes:
- .:/app
environment:
- DJANGO_SETTINGS_MODULE=llm_project.settings
- DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
celery-beat:
build: .
command: celery -A llm_project beat -l info
volumes:
- .:/app
environment:
- DJANGO_SETTINGS_MODULE=llm_project.settings
- DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
volumes:
postgres_data:
redis_data:
static_volume:
media_volume:
✓ Production Checklist
- ☐ Set DEBUG=False in production
- ☐ Configure ALLOWED_HOSTS properly
- ☐ Use environment variables for secrets
- ☐ Set up SSL certificates
- ☐ Configure database connection pooling
- ☐ Set up monitoring (Sentry, New Relic)
- ☐ Configure log aggregation
- ☐ Set up backup strategy
- ☐ Implement health check endpoints (see the sketch after this list)
- ☐ Configure auto-scaling policies
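Health checks are listed above but not implemented anywhere in this guide; a minimal sketch, assuming a simple database-and-cache probe and an arbitrary URL path, might look like this:

# llm_app/views.py (illustrative health check; the path name is an assumption)
from django.core.cache import cache
from django.db import connection
from django.http import JsonResponse

def health_check(request):
    """Report whether the database and cache are reachable."""
    checks = {'database': False, 'cache': False}
    try:
        with connection.cursor() as cursor:
            cursor.execute('SELECT 1')
        checks['database'] = True
    except Exception:
        pass
    try:
        cache.set('health_check', 'ok', timeout=5)
        checks['cache'] = cache.get('health_check') == 'ok'
    except Exception:
        pass
    healthy = all(checks.values())
    return JsonResponse(
        {'status': 'ok' if healthy else 'degraded', **checks},
        status=200 if healthy else 503,
    )

# urls.py: path('health/', health_check, name='health-check')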
Testing Strategies
# llm_app/tests.py
from django.test import TestCase, TransactionTestCase
from django.contrib.auth.models import User
from rest_framework.test import APITestCase
from unittest.mock import patch, MagicMock
from channels.testing import WebsocketCommunicator
from asgiref.sync import sync_to_async
from .models import Conversation, Message
from .consumers import ChatConsumer
class LLMAPITestCase(APITestCase):
def setUp(self):
self.user = User.objects.create_user(
username='testuser',
password='testpass123'
)
self.client.force_authenticate(user=self.user)
@patch('llm_app.services.LLMService.generate_completion')
def test_chat_completion(self, mock_generate):
mock_generate.return_value = {
'content': 'Test response',
'tokens_used': 50,
'model': 'gpt-3.5-turbo'
}
response = self.client.post('/api/chat/', {
'message': 'Hello, AI!',
'provider': 'openai'
})
self.assertEqual(response.status_code, 200)
self.assertIn('conversation_id', response.data)
self.assertEqual(
response.data['message']['content'],
'Test response'
)
def test_rate_limiting(self):
# Exceed the 100/hour user throttle configured in settings
for i in range(101):
response = self.client.post('/api/chat/', {
'message': f'Test {i}'
})
if response.status_code == 429:
break
# Verify rate limit is enforced
self.assertEqual(response.status_code, 429)
class WebSocketTestCase(TransactionTestCase):
async def test_chat_websocket(self):
# Create test user and conversation
user = await sync_to_async(User.objects.create_user)(
username='wstest',
password='testpass'
)
conversation = await sync_to_async(Conversation.objects.create)(
user=user
)
# Create WebSocket communicator
communicator = WebsocketCommunicator(
ChatConsumer.as_asgi(),
f"/ws/chat/{conversation.id}/"
)
communicator.scope['user'] = user
# Connect
connected, _ = await communicator.connect()
self.assertTrue(connected)
# Send message
await communicator.send_json_to({
'type': 'chat_message',
'content': 'Test message',
'provider': 'openai'
})
# Receive response
response = await communicator.receive_json_from()
self.assertEqual(response['type'], 'chat_message')
# Disconnect
await communicator.disconnect()