Quick Start
Set up a new Django project with LLM support:
django-admin startproject llm_project
cd llm_project
pip install django djangorestframework openai anthropic
pip install celery redis channels channels-redis django-redis httpx
1. Project Setup & Configuration
Settings Configuration
# settings.py
import os
from pathlib import Path

# Build paths
BASE_DIR = Path(__file__).resolve().parent.parent

# Security
SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY')
DEBUG = os.environ.get('DEBUG', 'False') == 'True'
ALLOWED_HOSTS = os.environ.get('ALLOWED_HOSTS', '').split(',')

# Application definition
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'rest_framework.authtoken',
    'channels',
    'llm_app',  # Your app
]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

# LLM API Configuration
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
ANTHROPIC_API_KEY = os.environ.get('ANTHROPIC_API_KEY')
GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')

# Django REST Framework
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.TokenAuthentication',
        'rest_framework.authentication.SessionAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
    'DEFAULT_THROTTLE_CLASSES': [
        'rest_framework.throttling.AnonRateThrottle',
        'rest_framework.throttling.UserRateThrottle',
    ],
    'DEFAULT_THROTTLE_RATES': {
        'anon': '10/hour',
        'user': '100/hour',
    },
}

# Celery Configuration
CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

# Channels Configuration
ASGI_APPLICATION = 'llm_project.asgi.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [os.environ.get('REDIS_URL', 'redis://localhost:6379')],
        },
    },
}

# Cache Configuration (django-redis backend: the CLIENT_CLASS option
# belongs to django-redis, not Django's built-in RedisCache)
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': os.environ.get('REDIS_URL', 'redis://localhost:6379'),
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        },
        'KEY_PREFIX': 'llm_cache',
        'TIMEOUT': 3600,  # 1 hour default
    }
}
Project Structure
llm_project/
├── llm_project/
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   ├── asgi.py
│   ├── wsgi.py
│   └── celery.py
├── llm_app/
│   ├── models.py       # Prompt templates, conversations
│   ├── serializers.py  # DRF serializers
│   ├── views.py        # API views
│   ├── tasks.py        # Celery tasks
│   ├── consumers.py    # WebSocket consumers
│   ├── services.py     # LLM service layer
│   ├── admin.py        # Admin customization
│   └── tests.py        # Test suite
├── templates/
├── static/
├── requirements.txt
└── manage.py
Celery Configuration
# llm_project/celery.py
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'llm_project.settings')

app = Celery('llm_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
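Per the standard Celery/Django integration pattern, the app should also be imported in the project package's __init__.py so that @shared_task decorators bind to it when Django starts:

# llm_project/__init__.py
# Ensure the Celery app is loaded when Django starts so that
# @shared_task decorators use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)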
2. Django REST Framework LLM Endpoints
Models
# llm_app/models.py
import uuid

from django.contrib.auth.models import User
from django.db import models

class PromptTemplate(models.Model):
    name = models.CharField(max_length=100, unique=True)
    description = models.TextField(blank=True)
    template = models.TextField(help_text="Use {variables} for placeholders")
    system_prompt = models.TextField(blank=True)
    created_by = models.ForeignKey(User, on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    is_active = models.BooleanField(default=True)

    class Meta:
        ordering = ['-created_at']

    def __str__(self):
        return self.name

class Conversation(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    title = models.CharField(max_length=200, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-updated_at']

class Message(models.Model):
    ROLE_CHOICES = [
        ('user', 'User'),
        ('assistant', 'Assistant'),
        ('system', 'System'),
    ]
    conversation = models.ForeignKey(Conversation, on_delete=models.CASCADE, related_name='messages')
    role = models.CharField(max_length=10, choices=ROLE_CHOICES)
    content = models.TextField()
    tokens_used = models.IntegerField(default=0)
    provider = models.CharField(max_length=50, default='openai')
    model = models.CharField(max_length=100, default='gpt-3.5-turbo')
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ['created_at']

class APIUsage(models.Model):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    endpoint = models.CharField(max_length=100)
    tokens_used = models.IntegerField(default=0)
    cost = models.DecimalField(max_digits=10, decimal_places=4, default=0)
    provider = models.CharField(max_length=50)
    model = models.CharField(max_length=100)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['user', 'created_at']),
        ]
Serializers
# llm_app/serializers.py
from rest_framework import serializers

from .models import PromptTemplate, Conversation, Message

class PromptTemplateSerializer(serializers.ModelSerializer):
    class Meta:
        model = PromptTemplate
        fields = ['id', 'name', 'description', 'template', 'system_prompt',
                  'created_at', 'updated_at', 'is_active']
        read_only_fields = ['created_at', 'updated_at']

class MessageSerializer(serializers.ModelSerializer):
    class Meta:
        model = Message
        fields = ['id', 'role', 'content', 'tokens_used', 'provider', 'model', 'created_at']
        read_only_fields = ['tokens_used', 'created_at']

class ConversationSerializer(serializers.ModelSerializer):
    messages = MessageSerializer(many=True, read_only=True)
    message_count = serializers.IntegerField(source='messages.count', read_only=True)

    class Meta:
        model = Conversation
        fields = ['id', 'title', 'created_at', 'updated_at', 'messages', 'message_count']
        read_only_fields = ['created_at', 'updated_at']

class ChatRequestSerializer(serializers.Serializer):
    message = serializers.CharField(max_length=4000)
    conversation_id = serializers.UUIDField(required=False)
    provider = serializers.ChoiceField(
        choices=['openai', 'anthropic', 'google'],
        default='openai'
    )
    model = serializers.CharField(required=False)
    temperature = serializers.FloatField(min_value=0, max_value=2, default=0.7)
    max_tokens = serializers.IntegerField(min_value=1, max_value=4000, default=1000)
    stream = serializers.BooleanField(default=False)
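A quick illustration of how the request serializer validates input and fills defaults (hypothetical shell usage):

# Hypothetical usage in a Django shell or test:
serializer = ChatRequestSerializer(data={'message': 'Hello'})
assert serializer.is_valid()
# Omitted fields are filled with their defaults:
# {'message': 'Hello', 'provider': 'openai', 'temperature': 0.7,
#  'max_tokens': 1000, 'stream': False}
print(serializer.validated_data)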
LLM Service Layer
# llm_app/services.py
import json
from typing import AsyncGenerator, Dict, Any

import anthropic
import httpx
import openai
from django.conf import settings

class LLMService:
    def __init__(self):
        self.openai_client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)
        self.anthropic_client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)

    def get_provider_client(self, provider: str):
        if provider == 'openai':
            return self.openai_client
        elif provider == 'anthropic':
            return self.anthropic_client
        else:
            raise ValueError(f"Unsupported provider: {provider}")

    def generate_completion(self, messages: list, provider: str = 'openai', **kwargs) -> Dict[str, Any]:
        if provider == 'openai':
            response = self.openai_client.chat.completions.create(
                model=kwargs.get('model') or 'gpt-3.5-turbo',
                messages=messages,
                temperature=kwargs.get('temperature', 0.7),
                max_tokens=kwargs.get('max_tokens', 1000),
            )
            return {
                'content': response.choices[0].message.content,
                'tokens_used': response.usage.total_tokens,
                'model': response.model,
            }
        elif provider == 'anthropic':
            # Anthropic's Messages API takes the system prompt as a separate
            # parameter, not as a message with role 'system'.
            system = ' '.join(m['content'] for m in messages if m['role'] == 'system')
            chat_messages = [m for m in messages if m['role'] != 'system']
            extra = {'system': system} if system else {}
            response = self.anthropic_client.messages.create(
                model=kwargs.get('model') or 'claude-3-sonnet-20240229',
                messages=chat_messages,
                max_tokens=kwargs.get('max_tokens', 1000),
                **extra,
            )
            return {
                'content': response.content[0].text,
                'tokens_used': response.usage.input_tokens + response.usage.output_tokens,
                'model': response.model,
            }

    async def generate_stream(self, messages: list, provider: str = 'openai', **kwargs) -> AsyncGenerator[str, None]:
        if provider == 'openai':
            async with httpx.AsyncClient() as client:
                headers = {
                    "Authorization": f"Bearer {settings.OPENAI_API_KEY}",
                    "Content-Type": "application/json",
                }
                payload = {
                    "model": kwargs.get('model') or 'gpt-3.5-turbo',
                    "messages": messages,
                    "temperature": kwargs.get('temperature', 0.7),
                    "max_tokens": kwargs.get('max_tokens', 1000),
                    "stream": True,
                }
                async with client.stream(
                    "POST",
                    "https://api.openai.com/v1/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=60.0,
                ) as response:
                    async for line in response.aiter_lines():
                        if line.startswith("data: "):
                            data = line[6:]
                            if data == "[DONE]":
                                break
                            try:
                                chunk = json.loads(data)
                                content = chunk["choices"][0]["delta"].get("content", "")
                                if content:
                                    yield content
                            except json.JSONDecodeError:
                                continue
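A minimal usage sketch of the synchronous path (hypothetical prompt, run from a shell or management command):

# Hypothetical usage of the service layer:
service = LLMService()
result = service.generate_completion(
    messages=[{'role': 'user', 'content': 'Say hello in one word.'}],
    provider='openai',
    max_tokens=10,
)
print(result['content'], result['tokens_used'])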
API Views
# llm_app/views.py
from rest_framework import status, generics
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView

from .models import Conversation, Message, APIUsage
from .serializers import (
    ChatRequestSerializer, ConversationSerializer,
    MessageSerializer, PromptTemplateSerializer
)
from .services import LLMService
from .tasks import generate_completion_task

class ChatCompletionView(APIView):
    permission_classes = [IsAuthenticated]

    def post(self, request):
        serializer = ChatRequestSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data

        # Get or create conversation
        conversation_id = data.get('conversation_id')
        if conversation_id:
            try:
                conversation = Conversation.objects.get(
                    id=conversation_id, user=request.user
                )
            except Conversation.DoesNotExist:
                return Response(
                    {'error': 'Conversation not found'},
                    status=status.HTTP_404_NOT_FOUND
                )
        else:
            conversation = Conversation.objects.create(user=request.user)

        # Add user message
        Message.objects.create(
            conversation=conversation,
            role='user',
            content=data['message']
        )

        # Get conversation context
        messages = list(conversation.messages.values('role', 'content'))

        if data.get('stream'):
            # For streaming, hand off to Celery and return a task ID
            task = generate_completion_task.delay(
                messages=messages,
                provider=data['provider'],
                model=data.get('model'),
                temperature=data['temperature'],
                max_tokens=data['max_tokens'],
                conversation_id=str(conversation.id),
                user_id=request.user.id
            )
            return Response({
                'conversation_id': conversation.id,
                'task_id': task.id,
                'stream_url': f'/api/stream/{task.id}/'
            })

        # Synchronous generation
        service = LLMService()
        result = service.generate_completion(
            messages=messages,
            provider=data['provider'],
            model=data.get('model'),
            temperature=data['temperature'],
            max_tokens=data['max_tokens']
        )

        # Save assistant message
        assistant_message = Message.objects.create(
            conversation=conversation,
            role='assistant',
            content=result['content'],
            tokens_used=result['tokens_used'],
            provider=data['provider'],
            model=result['model']
        )

        # Track usage
        APIUsage.objects.create(
            user=request.user,
            endpoint='chat_completion',
            tokens_used=result['tokens_used'],
            provider=data['provider'],
            model=result['model']
        )

        return Response({
            'conversation_id': conversation.id,
            'message': MessageSerializer(assistant_message).data
        })

class ConversationListView(generics.ListAPIView):
    serializer_class = ConversationSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        return Conversation.objects.filter(user=self.request.user)

class ConversationDetailView(generics.RetrieveDestroyAPIView):
    serializer_class = ConversationSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        return Conversation.objects.filter(user=self.request.user)
3. Async Views for Streaming
# llm_app/views.py (async views)
import asyncio
import json

from django.http import StreamingHttpResponse

from .services import LLMService

async def stream_chat_view(request, task_id):
    """Async view for streaming LLM responses via server-sent events"""

    async def event_stream():
        service = LLMService()
        # Get task result or generate directly
        messages = json.loads(request.GET.get('messages', '[]'))
        provider = request.GET.get('provider', 'openai')

        # SSE frames are 'data: <payload>' terminated by a blank line
        yield 'data: {"event": "connected"}\n\n'
        try:
            async for token in service.generate_stream(
                messages=messages, provider=provider
            ):
                data = json.dumps({"token": token})
                yield f"data: {data}\n\n"
                await asyncio.sleep(0.01)  # Small delay to avoid overwhelming clients
            yield 'data: {"event": "completed"}\n\n'
        except Exception as e:
            error_data = json.dumps({"error": str(e)})
            yield f"data: {error_data}\n\n"

    response = StreamingHttpResponse(
        event_stream(),
        content_type='text/event-stream'
    )
    response['Cache-Control'] = 'no-cache'
    response['X-Accel-Buffering'] = 'no'
    return response

# URL configuration
from django.urls import path

urlpatterns = [
    path('api/stream/<str:task_id>/', stream_chat_view, name='stream-chat'),
]
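A sketch of how a Python client might consume this stream; the endpoint path and Token auth header are assumptions mirroring the view and the DRF setup above:

# Hypothetical client-side consumption of the SSE endpoint
import json
import httpx

def consume_stream(base_url: str, task_id: str, token: str) -> str:
    """Read 'data: ...' frames until the server signals completion."""
    full_text = []
    with httpx.stream(
        "GET",
        f"{base_url}/api/stream/{task_id}/",
        headers={"Authorization": f"Token {token}"},
        timeout=None,
    ) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue
            payload = json.loads(line[6:])
            if payload.get("event") == "completed":
                break
            if "token" in payload:
                full_text.append(payload["token"])
    return "".join(full_text)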
Performance Note
For production streaming, use ASGI servers like Daphne or Uvicorn instead of WSGI servers to properly handle async views and streaming responses.
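For instance, Uvicorn can serve the project's ASGI application directly; a minimal launcher sketch (host, port, and worker count are illustrative):

# run_asgi.py -- illustrative launcher, equivalent to
# `uvicorn llm_project.asgi:application --host 0.0.0.0 --port 8000`
import uvicorn

if __name__ == "__main__":
    uvicorn.run(
        "llm_project.asgi:application",
        host="0.0.0.0",
        port=8000,
        workers=2,  # assumption: tune to your CPU count
    )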
4. Celery Background Tasks
Celery Tasks
# llm_app/tasks.py
import logging

from celery import shared_task
from django.core.cache import cache

from .models import Conversation, Message, APIUsage
from .services import LLMService

logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3)
def generate_completion_task(self, messages, provider, model, temperature,
                             max_tokens, conversation_id, user_id):
    """Background task for LLM completion generation"""
    try:
        # Update task progress
        self.update_state(state='PROGRESS', meta={'status': 'Generating response...'})

        service = LLMService()
        result = service.generate_completion(
            messages=messages,
            provider=provider,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens
        )

        # Save to database
        conversation = Conversation.objects.get(id=conversation_id)
        assistant_message = Message.objects.create(
            conversation=conversation,
            role='assistant',
            content=result['content'],
            tokens_used=result['tokens_used'],
            provider=provider,
            model=result['model']
        )

        # Track usage
        APIUsage.objects.create(
            user_id=user_id,
            endpoint='chat_completion_async',
            tokens_used=result['tokens_used'],
            provider=provider,
            model=result['model']
        )

        # Cache result for quick retrieval
        cache_key = f'task_result:{self.request.id}'
        cache.set(cache_key, result, timeout=3600)

        return {
            'status': 'completed',
            'message_id': assistant_message.id,
            'content': result['content'],
            'tokens_used': result['tokens_used']
        }
    except Exception as exc:
        logger.error(f"Task {self.request.id} failed: {exc}")
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

@shared_task
def batch_generate_summaries(conversation_ids):
    """Batch process multiple conversations for summarization"""
    service = LLMService()
    results = []

    for conv_id in conversation_ids:
        try:
            conversation = Conversation.objects.get(id=conv_id)
            messages = list(conversation.messages.values('role', 'content'))

            # Add summarization prompt
            messages.append({
                'role': 'user',
                'content': 'Please provide a brief summary of this conversation.'
            })

            result = service.generate_completion(
                messages=messages,
                provider='openai',
                model='gpt-3.5-turbo',
                max_tokens=150
            )

            # Update conversation title if empty
            if not conversation.title:
                conversation.title = result['content'][:100]
                conversation.save()

            results.append({
                'conversation_id': conv_id,
                'summary': result['content']
            })
        except Exception as e:
            logger.error(f"Failed to summarize conversation {conv_id}: {e}")
            results.append({
                'conversation_id': conv_id,
                'error': str(e)
            })

    return results

@shared_task
def cleanup_old_conversations():
    """Periodic task to clean up old, empty conversations"""
    from datetime import timedelta
    from django.utils import timezone

    cutoff_date = timezone.now() - timedelta(days=30)
    # messages__isnull=True matches conversations with no related messages
    deleted_count = Conversation.objects.filter(
        updated_at__lt=cutoff_date,
        messages__isnull=True
    ).delete()[0]

    logger.info(f"Cleaned up {deleted_count} old empty conversations")
    return deleted_count
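A sketch of dispatching this task and polling its state from a Django shell; the argument values are illustrative:

# Hypothetical usage: fire the task and poll its result
from celery.result import AsyncResult
from llm_app.tasks import generate_completion_task

task = generate_completion_task.delay(
    messages=[{'role': 'user', 'content': 'Hello'}],
    provider='openai',
    model=None,
    temperature=0.7,
    max_tokens=100,
    conversation_id='<existing-conversation-uuid>',
    user_id=1,
)
result = AsyncResult(task.id)
print(result.state)  # PENDING / PROGRESS / SUCCESS / FAILURE
if result.ready():
    print(result.get()['content'])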
Celery Beat Schedule
# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-old-conversations': {
        'task': 'llm_app.tasks.cleanup_old_conversations',
        'schedule': crontab(hour=2, minute=0),  # Run daily at 2 AM
    },
    'generate-usage-reports': {
        'task': 'llm_app.tasks.generate_usage_reports',
        'schedule': crontab(hour=0, minute=0, day_of_week=1),  # Weekly on Monday
    },
}
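The schedule above references a generate_usage_reports task that is not defined elsewhere in this guide; here is a minimal sketch of what it might look like, aggregating the APIUsage model from section 2 (the report shape is an assumption):

# llm_app/tasks.py -- hypothetical sketch; adapt the report format to your needs
from datetime import timedelta

from celery import shared_task
from django.db.models import Sum
from django.utils import timezone

from .models import APIUsage

@shared_task
def generate_usage_reports():
    """Aggregate the last week's token usage and cost per provider."""
    since = timezone.now() - timedelta(days=7)
    report = list(
        APIUsage.objects.filter(created_at__gte=since)
        .values('provider')
        .annotate(tokens=Sum('tokens_used'), cost=Sum('cost'))
    )
    # In practice you might email this or push it to a dashboard store.
    return report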
5. Admin Interface for Prompts
# llm_app/admin.py
from django.contrib import admin
from django.db.models import Count, Sum

from .models import PromptTemplate, Conversation, Message, APIUsage

@admin.register(PromptTemplate)
class PromptTemplateAdmin(admin.ModelAdmin):
    list_display = ['name', 'created_by', 'is_active', 'created_at']
    list_filter = ['is_active', 'created_at', 'created_by']
    search_fields = ['name', 'description', 'template']
    readonly_fields = ['created_at', 'updated_at']

    fieldsets = (
        ('Basic Information', {
            'fields': ('name', 'description', 'is_active')
        }),
        ('Prompt Configuration', {
            'fields': ('template', 'system_prompt'),
            'classes': ('wide',)
        }),
        ('Metadata', {
            'fields': ('created_by', 'created_at', 'updated_at'),
            'classes': ('collapse',)
        }),
    )

    def save_model(self, request, obj, form, change):
        if not change:
            obj.created_by = request.user
        super().save_model(request, obj, form, change)

class MessageInline(admin.TabularInline):
    model = Message
    extra = 0
    readonly_fields = ['created_at', 'tokens_used']
    fields = ['role', 'content', 'provider', 'model', 'tokens_used', 'created_at']

@admin.register(Conversation)
class ConversationAdmin(admin.ModelAdmin):
    list_display = ['id', 'user', 'title', 'message_count', 'updated_at']
    list_filter = ['created_at', 'updated_at']
    search_fields = ['title', 'user__username', 'user__email']
    readonly_fields = ['id', 'created_at', 'updated_at']
    inlines = [MessageInline]

    def get_queryset(self, request):
        return super().get_queryset(request).annotate(
            message_count=Count('messages')
        )

    def message_count(self, obj):
        # Uses the annotation above instead of issuing one COUNT per row
        return obj.message_count
    message_count.short_description = 'Messages'

@admin.register(APIUsage)
class APIUsageAdmin(admin.ModelAdmin):
    list_display = ['user', 'endpoint', 'provider', 'model', 'tokens_used', 'cost', 'created_at']
    list_filter = ['provider', 'model', 'endpoint', 'created_at']
    date_hierarchy = 'created_at'

    def get_queryset(self, request):
        qs = super().get_queryset(request)
        return qs.select_related('user')

    def changelist_view(self, request, extra_context=None):
        # Add usage statistics to the changelist context
        qs = self.get_queryset(request)
        total_tokens = qs.aggregate(Sum('tokens_used'))['tokens_used__sum'] or 0
        total_cost = qs.aggregate(Sum('cost'))['cost__sum'] or 0

        extra_context = extra_context or {}
        extra_context.update({
            'total_tokens': total_tokens,
            'total_cost': total_cost,
        })
        return super().changelist_view(request, extra_context=extra_context)
6. Authentication & API Keys
Custom Authentication
# llm_app/authentication.py
import hashlib

from django.contrib.auth.models import User
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed

class APIKeyAuthentication(BaseAuthentication):
    def authenticate(self, request):
        api_key = request.META.get('HTTP_X_API_KEY')
        if not api_key:
            return None

        try:
            # Hash the API key for secure comparison
            key_hash = hashlib.sha256(api_key.encode()).hexdigest()
            user = User.objects.get(profile__api_key_hash=key_hash)
            return (user, api_key)
        except User.DoesNotExist:
            raise AuthenticationFailed('Invalid API key')

# Models for API key storage
import secrets

from django.db import models
from django.utils import timezone

class UserProfile(models.Model):
    # related_name='profile' makes the profile__api_key_hash lookup above work
    user = models.OneToOneField(User, on_delete=models.CASCADE, related_name='profile')
    api_key_hash = models.CharField(max_length=64, unique=True, blank=True)
    api_key_created_at = models.DateTimeField(null=True, blank=True)

    def generate_api_key(self):
        """Generate a new API key for the user"""
        api_key = secrets.token_urlsafe(32)
        self.api_key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        self.api_key_created_at = timezone.now()
        self.save()
        return api_key  # Return once for the user to save

# Views for API key management
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView

class GenerateAPIKeyView(APIView):
    permission_classes = [IsAuthenticated]

    def post(self, request):
        profile, created = UserProfile.objects.get_or_create(user=request.user)
        api_key = profile.generate_api_key()
        return Response({
            'api_key': api_key,
            'message': 'Save this key securely. It cannot be retrieved again.'
        })

# Rate limiting per API key. SimpleRateThrottle (not BaseThrottle)
# supplies allow_request() and cache_format; configure a rate for
# the 'api_key' scope in DEFAULT_THROTTLE_RATES.
from rest_framework.throttling import SimpleRateThrottle

class APIKeyRateThrottle(SimpleRateThrottle):
    scope = 'api_key'

    def get_cache_key(self, request, view):
        if getattr(request, 'auth', None):
            # Use the API key itself as the rate-limit identity
            return self.cache_format % {
                'scope': self.scope,
                'ident': hashlib.md5(request.auth.encode()).hexdigest()
            }
        return None
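To activate this scheme, the classes would be wired into the DRF settings; an illustrative snippet extending the configuration from section 1 (the api_key rate value is an assumption):

# settings.py -- illustrative wiring for the custom scheme above
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'llm_app.authentication.APIKeyAuthentication',
        'rest_framework.authentication.TokenAuthentication',
        'rest_framework.authentication.SessionAuthentication',
    ],
    # ... permission and throttle classes as before ...
    'DEFAULT_THROTTLE_RATES': {
        'anon': '10/hour',
        'user': '100/hour',
        'api_key': '1000/hour',  # assumption: pick a rate for API-key clients
    },
}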
7. Response Caching
# llm_app/decorators.py
import hashlib
import json
from functools import wraps

from django.core.cache import cache

def cache_llm_response(timeout=3600):
    """Decorator to cache LLM responses keyed on the call arguments."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build a stable cache key from the function arguments.
            # Note: arguments need stable string representations; objects
            # with default reprs (memory addresses) will never hit the cache.
            key_material = json.dumps({
                'args': [str(a) for a in args],
                'kwargs': {k: str(v) for k, v in kwargs.items()},
            }, sort_keys=True)
            cache_key = f"llm:{func.__name__}:{hashlib.md5(key_material.encode()).hexdigest()}"

            # Try to get from cache
            cached_result = cache.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Generate and cache the result
            result = func(*args, **kwargs)
            cache.set(cache_key, result, timeout=timeout)
            return result
        return wrapper
    return decorator

# Usage in the service layer: only cacheable (non-personalized)
# prompts are routed through the cached path
from .services import LLMService

class CachedLLMService(LLMService):
    @cache_llm_response(timeout=3600)
    def _cached_completion(self, messages, provider='openai', **kwargs):
        return self.generate_completion(messages, provider, **kwargs)

    def generate_completion_cached(self, messages, provider='openai', **kwargs):
        if self._is_cacheable(messages):
            return self._cached_completion(messages, provider, **kwargs)
        return self.generate_completion(messages, provider, **kwargs)

    def _is_cacheable(self, messages):
        # Don't cache if messages appear to contain user-specific data
        for msg in messages:
            if any(keyword in msg.get('content', '').lower()
                   for keyword in ['my', 'i', 'me', 'personal']):
                return False
        return True

# Django per-view caching. cache_page wraps function views, so it
# needs method_decorator on a class-based view.
from django.utils.decorators import method_decorator
from django.views.decorators.cache import cache_page
from rest_framework.response import Response
from rest_framework.views import APIView

from .models import PromptTemplate
from .serializers import PromptTemplateSerializer

class CachedPromptTemplateView(APIView):
    @method_decorator(cache_page(60 * 15))  # Cache for 15 minutes
    def get(self, request, template_id):
        template = PromptTemplate.objects.get(id=template_id)
        return Response(PromptTemplateSerializer(template).data)
8. File Handling for Multimodal
# llm_app/views.py
import base64
import mimetypes

import openai
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from rest_framework import status
from rest_framework.parsers import FormParser, MultiPartParser
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView

class MultimodalChatView(APIView):
    permission_classes = [IsAuthenticated]
    parser_classes = [MultiPartParser, FormParser]

    def post(self, request):
        serializer = MultimodalChatSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        message = serializer.validated_data['message']
        files = request.FILES.getlist('files')
        provider = serializer.validated_data.get('provider', 'openai')

        # Process files
        file_contents = []
        for file in files[:3]:  # Limit to 3 files
            if file.size > 10 * 1024 * 1024:  # 10MB limit
                return Response(
                    {'error': f'File {file.name} exceeds 10MB limit'},
                    status=status.HTTP_400_BAD_REQUEST
                )

            # Save file temporarily
            file_path = default_storage.save(
                f'temp/{request.user.id}/{file.name}',
                ContentFile(file.read())
            )

            # Convert to base64 for the API
            with default_storage.open(file_path, 'rb') as f:
                file_data = base64.b64encode(f.read()).decode('utf-8')

            mime_type = mimetypes.guess_type(file.name)[0] or 'application/octet-stream'
            file_contents.append({
                'type': 'image' if mime_type.startswith('image/') else 'file',
                'data': file_data,
                'mime_type': mime_type,
                'name': file.name
            })

            # Clean up temp file
            default_storage.delete(file_path)

        # Prepare messages for the multimodal API
        messages = [{
            'role': 'user',
            'content': [
                {'type': 'text', 'text': message},
                *[{
                    'type': 'image_url',
                    'image_url': {
                        'url': f"data:{f['mime_type']};base64,{f['data']}"
                    }
                } for f in file_contents if f['type'] == 'image']
            ]
        }]

        # Call the appropriate provider
        if provider == 'openai':
            response = self._call_openai_vision(messages)
        elif provider == 'anthropic':
            # _call_anthropic_vision follows the same pattern (not shown)
            response = self._call_anthropic_vision(messages, file_contents)
        else:
            return Response(
                {'error': 'Provider does not support multimodal'},
                status=status.HTTP_400_BAD_REQUEST
            )

        return Response(response)

    def _call_openai_vision(self, messages):
        client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)
        response = client.chat.completions.create(
            model="gpt-4-vision-preview",
            messages=messages,
            max_tokens=500
        )
        return {
            'content': response.choices[0].message.content,
            'model': 'gpt-4-vision-preview'
        }
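The view references a MultimodalChatSerializer that is not shown; a minimal sketch matching the fields the view reads (the field set is an assumption):

# llm_app/serializers.py -- hypothetical sketch; fields mirror what
# MultimodalChatView reads from validated_data
from rest_framework import serializers

class MultimodalChatSerializer(serializers.Serializer):
    message = serializers.CharField(max_length=4000)
    provider = serializers.ChoiceField(
        choices=['openai', 'anthropic'], default='openai'
    )
    # Uploaded files arrive separately via request.FILES.getlist('files')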
9. Django Channels WebSocket
ASGI Configuration
# llm_project/asgi.py
import os

from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'llm_project.settings')

# Initialize Django before importing consumer routes so the app
# registry is ready when llm_app models are imported.
django_asgi_app = get_asgi_application()

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator

import llm_app.routing

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                llm_app.routing.websocket_urlpatterns
            )
        )
    ),
})
WebSocket Consumers
# llm_app/consumers.py
import json

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer

from .models import Message
from .services import LLMService

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.user = self.scope["user"]
        if not self.user.is_authenticated:
            await self.close()
            return

        self.conversation_id = self.scope["url_route"]["kwargs"]["conversation_id"]
        self.conversation_group = f"chat_{self.conversation_id}"

        # Join conversation group
        await self.channel_layer.group_add(
            self.conversation_group, self.channel_name
        )
        await self.accept()
        await self.send(text_data=json.dumps({
            "type": "connection_established",
            "conversation_id": self.conversation_id
        }))

    async def disconnect(self, close_code):
        # Leave conversation group
        await self.channel_layer.group_discard(
            self.conversation_group, self.channel_name
        )

    async def receive(self, text_data):
        data = json.loads(text_data)
        message_type = data.get("type")

        if message_type == "chat_message":
            await self.handle_chat_message(data)
        elif message_type == "typing_indicator":
            await self.handle_typing_indicator(data)

    async def handle_chat_message(self, data):
        content = data["content"]
        provider = data.get("provider", "openai")

        # Save user message
        user_message = await self.save_message("user", content)

        # Send user message to group
        await self.channel_layer.group_send(
            self.conversation_group,
            {
                "type": "chat_message",
                "message": {
                    "id": str(user_message.id),
                    "role": "user",
                    "content": content,
                    "timestamp": user_message.created_at.isoformat()
                }
            }
        )

        # Generate AI response
        service = LLMService()
        messages = await self.get_conversation_messages()

        # Send typing indicator
        await self.channel_layer.group_send(
            self.conversation_group,
            {"type": "typing_indicator", "is_typing": True}
        )

        # Stream response token by token
        full_response = ""
        async for token in service.generate_stream(messages, provider):
            full_response += token
            await self.send(text_data=json.dumps({
                "type": "stream_token",
                "token": token
            }))

        # Clear typing indicator once streaming is done
        await self.channel_layer.group_send(
            self.conversation_group,
            {"type": "typing_indicator", "is_typing": False}
        )

        # Save assistant message
        assistant_message = await self.save_message("assistant", full_response, provider)

        # Send complete message
        await self.channel_layer.group_send(
            self.conversation_group,
            {
                "type": "chat_message",
                "message": {
                    "id": str(assistant_message.id),
                    "role": "assistant",
                    "content": full_response,
                    "timestamp": assistant_message.created_at.isoformat()
                }
            }
        )

    async def handle_typing_indicator(self, data):
        # Relay the client's typing state to the rest of the group
        await self.channel_layer.group_send(
            self.conversation_group,
            {"type": "typing_indicator", "is_typing": data.get("is_typing", False)}
        )

    @database_sync_to_async
    def save_message(self, role, content, provider="openai"):
        return Message.objects.create(
            conversation_id=self.conversation_id,
            role=role,
            content=content,
            provider=provider
        )

    @database_sync_to_async
    def get_conversation_messages(self):
        messages = Message.objects.filter(
            conversation_id=self.conversation_id
        ).order_by('created_at').values('role', 'content')
        return list(messages)

    # Handlers for group messages
    async def chat_message(self, event):
        await self.send(text_data=json.dumps({
            "type": "chat_message",
            "message": event["message"]
        }))

    async def typing_indicator(self, event):
        await self.send(text_data=json.dumps({
            "type": "typing_indicator",
            "is_typing": event["is_typing"]
        }))

# Routing
# llm_app/routing.py
from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<conversation_id>[^/]+)/$', consumers.ChatConsumer.as_asgi()),
]
10. Production Deployment
Gunicorn Configuration
# gunicorn.conf.py
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "sync"  # Use 'uvicorn.workers.UvicornWorker' for async
worker_connections = 1000
keepalive = 5
threads = 2

# Logging
accesslog = "/var/log/gunicorn/access.log"
errorlog = "/var/log/gunicorn/error.log"
loglevel = "info"

# Process naming
proc_name = 'llm_django_app'

# Server mechanics
daemon = False
pidfile = '/var/run/gunicorn.pid'
user = 'www-data'
group = 'www-data'
tmp_upload_dir = None

# SSL (if not using nginx)
# keyfile = '/path/to/keyfile'
# certfile = '/path/to/certfile'

# Worker timeout (important for long LLM requests)
timeout = 120
graceful_timeout = 30

# Restart workers after this many requests
max_requests = 1000
max_requests_jitter = 50
Daphne for WebSockets
# Install Daphne
pip install daphne

# Run Daphne for WebSocket support
daphne -b 0.0.0.0 -p 8001 llm_project.asgi:application
Supervisor Configuration
; /etc/supervisor/conf.d/llm_django.conf
[program:llm_django_gunicorn]
command=/path/to/venv/bin/gunicorn llm_project.wsgi:application -c /path/to/gunicorn.conf.py
directory=/path/to/project
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/llm_django_gunicorn.log
environment=PATH="/path/to/venv/bin",DJANGO_SETTINGS_MODULE="llm_project.settings"

[program:llm_django_daphne]
command=/path/to/venv/bin/daphne -b 0.0.0.0 -p 8001 llm_project.asgi:application
directory=/path/to/project
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/llm_django_daphne.log

[program:llm_django_celery]
command=/path/to/venv/bin/celery -A llm_project worker -l info
directory=/path/to/project
user=www-data
numprocs=1
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600
stdout_logfile=/var/log/supervisor/llm_django_celery.log

[program:llm_django_celery_beat]
command=/path/to/venv/bin/celery -A llm_project beat -l info
directory=/path/to/project
user=www-data
numprocs=1
autostart=true
autorestart=true
stdout_logfile=/var/log/supervisor/llm_django_celery_beat.log
Nginx Configuration
# /etc/nginx/sites-available/llm_django
upstream django_app {
    server localhost:8000;
}

upstream websocket_app {
    server localhost:8001;
}

server {
    listen 80;
    server_name api.example.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name api.example.com;

    ssl_certificate /etc/letsencrypt/live/api.example.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/api.example.com/privkey.pem;

    client_max_body_size 10M;

    location /static/ {
        alias /path/to/project/staticfiles/;
        expires 30d;
    }

    location /media/ {
        alias /path/to/project/media/;
        expires 30d;
    }

    location /ws/ {
        proxy_pass http://websocket_app;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location / {
        proxy_pass http://django_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts for long LLM requests
        proxy_connect_timeout 300s;
        proxy_send_timeout 300s;
        proxy_read_timeout 300s;
    }
}
Docker Deployment
# Dockerfile
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y gcc postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Set work directory
WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy project
COPY . .

# Collect static files
RUN python manage.py collectstatic --noinput

# Note: run migrations at container startup or as a deploy step,
# not at build time -- the database is not reachable during the build.

# Create user
RUN useradd -m -u 1000 django && chown -R django:django /app
USER django

# Expose port
EXPOSE 8000

# Run gunicorn
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "llm_project.wsgi:application"]
Docker Compose
# docker-compose.yml
version: '3.8'

services:
  db:
    image: postgres:15
    environment:
      POSTGRES_DB: llm_db
      POSTGRES_USER: llm_user
      POSTGRES_PASSWORD: secure_password
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data

  web:
    build: .
    command: gunicorn llm_project.wsgi:application --bind 0.0.0.0:8000
    volumes:
      - .:/app
      - static_volume:/app/staticfiles
      - media_volume:/app/media
    ports:
      - "8000:8000"
    environment:
      - DJANGO_SETTINGS_MODULE=llm_project.settings
      - DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis

  daphne:
    build: .
    command: daphne -b 0.0.0.0 -p 8001 llm_project.asgi:application
    volumes:
      - .:/app
    ports:
      - "8001:8001"
    environment:
      - DJANGO_SETTINGS_MODULE=llm_project.settings
      - DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis

  celery:
    build: .
    command: celery -A llm_project worker -l info
    volumes:
      - .:/app
    environment:
      - DJANGO_SETTINGS_MODULE=llm_project.settings
      - DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis

  celery-beat:
    build: .
    command: celery -A llm_project beat -l info
    volumes:
      - .:/app
    environment:
      - DJANGO_SETTINGS_MODULE=llm_project.settings
      - DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis

volumes:
  postgres_data:
  redis_data:
  static_volume:
  media_volume:
✓ Production Checklist
- ☐ Set DEBUG=False in production
- ☐ Configure ALLOWED_HOSTS properly
- ☐ Use environment variables for secrets
- ☐ Set up SSL certificates
- ☐ Configure database connection pooling
- ☐ Set up monitoring (Sentry, New Relic)
- ☐ Configure log aggregation
- ☐ Set up backup strategy
- ☐ Implement health check endpoints (see the sketch after this list)
- ☐ Configure auto-scaling policies
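A minimal health-check sketch for the checklist item above; the URL path and the set of checks are assumptions, and you might extend them to cover Celery and the LLM providers:

# llm_app/views.py -- hypothetical health-check endpoint
from django.core.cache import cache
from django.db import connection
from django.http import JsonResponse

def health_check(request):
    """Report whether the database and cache are reachable."""
    checks = {'database': False, 'cache': False}
    try:
        with connection.cursor() as cursor:
            cursor.execute('SELECT 1')
        checks['database'] = True
    except Exception:
        pass
    try:
        cache.set('health_check', 'ok', timeout=5)
        checks['cache'] = cache.get('health_check') == 'ok'
    except Exception:
        pass
    healthy = all(checks.values())
    return JsonResponse({'healthy': healthy, **checks},
                        status=200 if healthy else 503)

# urls.py: path('healthz/', health_check)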
Testing Strategies
# llm_app/tests.py
from unittest.mock import patch

from asgiref.sync import sync_to_async
from channels.testing import WebsocketCommunicator
from django.contrib.auth.models import User
from django.test import TransactionTestCase
from rest_framework.test import APITestCase

from .consumers import ChatConsumer
from .models import Conversation
from .services import LLMService

class LLMAPITestCase(APITestCase):
    def setUp(self):
        self.user = User.objects.create_user(
            username='testuser', password='testpass123'
        )
        self.client.force_authenticate(user=self.user)

    @patch('llm_app.services.LLMService.generate_completion')
    def test_chat_completion(self, mock_generate):
        mock_generate.return_value = {
            'content': 'Test response',
            'tokens_used': 50,
            'model': 'gpt-3.5-turbo'
        }

        response = self.client.post('/api/chat/', {
            'message': 'Hello, AI!',
            'provider': 'openai'
        })

        self.assertEqual(response.status_code, 200)
        self.assertIn('conversation_id', response.data)
        self.assertEqual(
            response.data['message']['content'],
            'Test response'
        )

    def test_rate_limiting(self):
        # Make requests until the throttle kicks in
        for i in range(100):
            response = self.client.post('/api/chat/', {
                'message': f'Test {i}'
            })
            if response.status_code == 429:
                break

        # Verify rate limit is enforced
        self.assertEqual(response.status_code, 429)

class WebSocketTestCase(TransactionTestCase):
    async def test_chat_websocket(self):
        # Create test user and conversation
        user = await sync_to_async(User.objects.create_user)(
            username='wstest', password='testpass'
        )
        conversation = await sync_to_async(Conversation.objects.create)(
            user=user
        )

        # Create WebSocket communicator; populate the scope fields the
        # consumer expects (user and URL kwargs)
        communicator = WebsocketCommunicator(
            ChatConsumer.as_asgi(),
            f"/ws/chat/{conversation.id}/"
        )
        communicator.scope['user'] = user
        communicator.scope['url_route'] = {
            'kwargs': {'conversation_id': str(conversation.id)}
        }

        # Connect
        connected, _ = await communicator.connect()
        self.assertTrue(connected)

        # First frame is the connection acknowledgement
        ack = await communicator.receive_json_from()
        self.assertEqual(ack['type'], 'connection_established')

        # Mock the LLM stream so the test makes no network calls
        async def fake_tokens():
            yield 'Mock reply'

        with patch.object(LLMService, 'generate_stream', return_value=fake_tokens()):
            # Send message
            await communicator.send_json_to({
                'type': 'chat_message',
                'content': 'Test message',
                'provider': 'openai'
            })

            # The user's message is echoed back to the group first
            response = await communicator.receive_json_from()
            self.assertEqual(response['type'], 'chat_message')

        # Disconnect
        await communicator.disconnect()