Django + LLM APIs Complete Integration Guide

Master the integration of LLM APIs with Django applications. This comprehensive guide covers Django REST Framework setup, async streaming, Celery background tasks, and production deployment strategies.

Quick Start

Set up a new Django project with LLM support:

pip install django djangorestframework openai anthropic
pip install celery redis channels channels-redis httpx
django-admin startproject llm_project
cd llm_project

1. Project Setup & Configuration

Settings Configuration

# settings.py
import os
from pathlib import Path

# Build paths
BASE_DIR = Path(__file__).resolve().parent.parent

# Security
SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY')
DEBUG = os.environ.get('DEBUG', 'False') == 'True'
ALLOWED_HOSTS = os.environ.get('ALLOWED_HOSTS', '').split(',')

# Application definition
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'rest_framework.authtoken',
    'channels',
    'llm_app',  # Your app
]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

# LLM API Configuration
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
ANTHROPIC_API_KEY = os.environ.get('ANTHROPIC_API_KEY')
GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')

# Django REST Framework
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.TokenAuthentication',
        'rest_framework.authentication.SessionAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
    'DEFAULT_THROTTLE_CLASSES': [
        'rest_framework.throttling.AnonRateThrottle',
        'rest_framework.throttling.UserRateThrottle'
    ],
    'DEFAULT_THROTTLE_RATES': {
        'anon': '10/hour',
        'user': '100/hour'
    }
}

# Celery Configuration
CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

# Channels Configuration
ASGI_APPLICATION = 'llm_project.asgi.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [(os.environ.get('REDIS_URL', 'redis://localhost:6379'))],
        },
    },
}

# Cache Configuration
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': os.environ.get('REDIS_URL', 'redis://localhost:6379'),
        'KEY_PREFIX': 'llm_cache',
        'TIMEOUT': 3600,  # 1 hour default
    }
}

Project Structure

llm_project/
├── llm_project/
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   ├── asgi.py
│   ├── wsgi.py
│   └── celery.py
├── llm_app/
│   ├── models.py          # Prompt templates, conversations
│   ├── serializers.py     # DRF serializers
│   ├── views.py           # API views
│   ├── tasks.py           # Celery tasks
│   ├── consumers.py       # WebSocket consumers
│   ├── services.py        # LLM service layer
│   ├── admin.py           # Admin customization
│   └── tests.py           # Test suite
├── templates/
├── static/
├── requirements.txt
└── manage.py

Celery Configuration

# llm_project/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'llm_project.settings')

app = Celery('llm_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
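
For the autodiscovery above to take effect when Django starts, the standard Celery-with-Django pattern is to import the app in the project package's __init__.py:

# llm_project/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)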

2. Django REST Framework LLM Endpoints

Models

# llm_app/models.py
from django.db import models
from django.contrib.auth.models import User
import uuid

class PromptTemplate(models.Model):
    name = models.CharField(max_length=100, unique=True)
    description = models.TextField(blank=True)
    template = models.TextField(help_text="Use {variables} for placeholders")
    system_prompt = models.TextField(blank=True)
    created_by = models.ForeignKey(User, on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    is_active = models.BooleanField(default=True)
    
    class Meta:
        ordering = ['-created_at']
    
    def __str__(self):
        return self.name

class Conversation(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    title = models.CharField(max_length=200, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    
    class Meta:
        ordering = ['-updated_at']

class Message(models.Model):
    ROLE_CHOICES = [
        ('user', 'User'),
        ('assistant', 'Assistant'),
        ('system', 'System'),
    ]
    
    conversation = models.ForeignKey(Conversation, on_delete=models.CASCADE, related_name='messages')
    role = models.CharField(max_length=10, choices=ROLE_CHOICES)
    content = models.TextField()
    tokens_used = models.IntegerField(default=0)
    provider = models.CharField(max_length=50, default='openai')
    model = models.CharField(max_length=100, default='gpt-3.5-turbo')
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        ordering = ['created_at']

class APIUsage(models.Model):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    endpoint = models.CharField(max_length=100)
    tokens_used = models.IntegerField(default=0)
    cost = models.DecimalField(max_digits=10, decimal_places=4, default=0)
    provider = models.CharField(max_length=50)
    model = models.CharField(max_length=100)
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['user', 'created_at']),
        ]
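
Since the template field stores plain Python {variable} placeholders, rendering it into a messages list is a one-liner with str.format. A minimal helper (hypothetical, not part of the models above) might look like this:

# llm_app/prompts.py (hypothetical helper)
def render_prompt(template, **variables):
    """Render a PromptTemplate into a messages list for the LLM service."""
    messages = []
    if template.system_prompt:
        messages.append({'role': 'system', 'content': template.system_prompt})
    # str.format raises KeyError if a placeholder is missing from variables
    messages.append({'role': 'user', 'content': template.template.format(**variables)})
    return messages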

Serializers

# llm_app/serializers.py
from rest_framework import serializers
from .models import PromptTemplate, Conversation, Message

class PromptTemplateSerializer(serializers.ModelSerializer):
    class Meta:
        model = PromptTemplate
        fields = ['id', 'name', 'description', 'template', 'system_prompt', 
                  'created_at', 'updated_at', 'is_active']
        read_only_fields = ['created_at', 'updated_at']

class MessageSerializer(serializers.ModelSerializer):
    class Meta:
        model = Message
        fields = ['id', 'role', 'content', 'tokens_used', 'provider', 
                  'model', 'created_at']
        read_only_fields = ['tokens_used', 'created_at']

class ConversationSerializer(serializers.ModelSerializer):
    messages = MessageSerializer(many=True, read_only=True)
    message_count = serializers.IntegerField(source='messages.count', read_only=True)
    
    class Meta:
        model = Conversation
        fields = ['id', 'title', 'created_at', 'updated_at', 
                  'messages', 'message_count']
        read_only_fields = ['created_at', 'updated_at']

class ChatRequestSerializer(serializers.Serializer):
    message = serializers.CharField(max_length=4000)
    conversation_id = serializers.UUIDField(required=False)
    provider = serializers.ChoiceField(
        choices=['openai', 'anthropic', 'google'],
        default='openai'
    )
    model = serializers.CharField(required=False)
    temperature = serializers.FloatField(min_value=0, max_value=2, default=0.7)
    max_tokens = serializers.IntegerField(min_value=1, max_value=4000, default=1000)
    stream = serializers.BooleanField(default=False)
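
As a quick sanity check, the request serializer can be exercised on its own in a Django shell or test; note how the field defaults are applied to validated_data (values below are illustrative):

# Example: validating a chat payload outside a view
from llm_app.serializers import ChatRequestSerializer

serializer = ChatRequestSerializer(data={
    'message': 'Summarize this article in two sentences.',
    'provider': 'anthropic',
    'temperature': 0.3,
})
serializer.is_valid(raise_exception=True)
print(serializer.validated_data['max_tokens'])  # 1000 (default applied)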

LLM Service Layer

# llm_app/services.py
import openai
import anthropic
from django.conf import settings
from typing import AsyncGenerator, Dict, Any
import httpx
import json

class LLMService:
    def __init__(self):
        self.openai_client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)
        self.anthropic_client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)
    
    def get_provider_client(self, provider: str):
        if provider == 'openai':
            return self.openai_client
        elif provider == 'anthropic':
            return self.anthropic_client
        else:
            raise ValueError(f"Unsupported provider: {provider}")
    
    def generate_completion(self, messages: list, provider: str = 'openai', **kwargs) -> Dict[str, Any]:
        if provider == 'openai':
            response = self.openai_client.chat.completions.create(
                model=kwargs.get('model') or 'gpt-3.5-turbo',
                messages=messages,
                temperature=kwargs.get('temperature', 0.7),
                max_tokens=kwargs.get('max_tokens', 1000),
            )
            return {
                'content': response.choices[0].message.content,
                'tokens_used': response.usage.total_tokens,
                'model': response.model,
            }
        
        elif provider == 'anthropic':
            response = self.anthropic_client.messages.create(
                model=kwargs.get('model') or 'claude-3-sonnet-20240229',
                messages=messages,
                max_tokens=kwargs.get('max_tokens', 1000),
            )
            return {
                'content': response.content[0].text,
                'tokens_used': response.usage.input_tokens + response.usage.output_tokens,
                'model': response.model,
            }
        
        raise ValueError(f"Unsupported provider: {provider}")
    
    async def generate_stream(self, messages: list, provider: str = 'openai', **kwargs) -> AsyncGenerator[str, None]:
        if provider == 'openai':
            async with httpx.AsyncClient() as client:
                headers = {
                    "Authorization": f"Bearer {settings.OPENAI_API_KEY}",
                    "Content-Type": "application/json",
                }
                data = {
                    "model": kwargs.get('model', 'gpt-3.5-turbo'),
                    "messages": messages,
                    "temperature": kwargs.get('temperature', 0.7),
                    "max_tokens": kwargs.get('max_tokens', 1000),
                    "stream": True,
                }
                
                async with client.stream(
                    "POST",
                    "https://api.openai.com/v1/chat/completions",
                    json=data,
                    headers=headers,
                    timeout=60.0,
                ) as response:
                    async for line in response.aiter_lines():
                        if line.startswith("data: "):
                            data = line[6:]
                            if data == "[DONE]":
                                break
                            try:
                                chunk = json.loads(data)
                                content = chunk["choices"][0]["delta"].get("content", "")
                                if content:
                                    yield content
                            except json.JSONDecodeError:
                                continue
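
With the service layer in place, a non-streaming call from anywhere in the codebase (a view, a Celery task, a management command) is a single method call. The example below assumes valid API keys are configured in settings:

# Example: calling the service layer directly
from llm_app.services import LLMService

service = LLMService()
result = service.generate_completion(
    messages=[{'role': 'user', 'content': 'Explain Django signals in one paragraph.'}],
    provider='openai',
    temperature=0.2,
    max_tokens=200,
)
print(result['content'])
print(f"Tokens used: {result['tokens_used']}")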

API Views

# llm_app/views.py
from rest_framework import status, generics
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from django.http import StreamingHttpResponse
from django.views.decorators.csrf import csrf_exempt
from asgiref.sync import sync_to_async
import asyncio
from .models import Conversation, Message, APIUsage
from .serializers import (
    ChatRequestSerializer, ConversationSerializer, 
    MessageSerializer, PromptTemplateSerializer
)
from .services import LLMService
from .tasks import generate_completion_task

class ChatCompletionView(APIView):
    permission_classes = [IsAuthenticated]
    
    def post(self, request):
        serializer = ChatRequestSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data
        
        # Get or create conversation
        conversation_id = data.get('conversation_id')
        if conversation_id:
            try:
                conversation = Conversation.objects.get(
                    id=conversation_id, 
                    user=request.user
                )
            except Conversation.DoesNotExist:
                return Response(
                    {'error': 'Conversation not found'}, 
                    status=status.HTTP_404_NOT_FOUND
                )
        else:
            conversation = Conversation.objects.create(user=request.user)
        
        # Add user message
        user_message = Message.objects.create(
            conversation=conversation,
            role='user',
            content=data['message']
        )
        
        # Get conversation context
        messages = list(conversation.messages.values('role', 'content'))
        
        # Generate response
        if data.get('stream'):
            # For streaming, return task ID
            task = generate_completion_task.delay(
                messages=messages,
                provider=data['provider'],
                model=data.get('model'),
                temperature=data['temperature'],
                max_tokens=data['max_tokens'],
                conversation_id=str(conversation.id),
                user_id=request.user.id
            )
            return Response({
                'conversation_id': conversation.id,
                'task_id': task.id,
                'stream_url': f'/api/stream/{task.id}/'
            })
        else:
            # Synchronous generation
            service = LLMService()
            result = service.generate_completion(
                messages=messages,
                provider=data['provider'],
                model=data.get('model'),
                temperature=data['temperature'],
                max_tokens=data['max_tokens']
            )
            
            # Save assistant message
            assistant_message = Message.objects.create(
                conversation=conversation,
                role='assistant',
                content=result['content'],
                tokens_used=result['tokens_used'],
                provider=data['provider'],
                model=result['model']
            )
            
            # Track usage
            APIUsage.objects.create(
                user=request.user,
                endpoint='chat_completion',
                tokens_used=result['tokens_used'],
                provider=data['provider'],
                model=result['model']
            )
            
            return Response({
                'conversation_id': conversation.id,
                'message': MessageSerializer(assistant_message).data
            })

class ConversationListView(generics.ListAPIView):
    serializer_class = ConversationSerializer
    permission_classes = [IsAuthenticated]
    
    def get_queryset(self):
        return Conversation.objects.filter(user=self.request.user)

class ConversationDetailView(generics.RetrieveDestroyAPIView):
    serializer_class = ConversationSerializer
    permission_classes = [IsAuthenticated]
    
    def get_queryset(self):
        return Conversation.objects.filter(user=self.request.user)
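
These views still need routes. A minimal URL wiring (paths are assumptions chosen to match the examples elsewhere in this guide, e.g. /api/chat/ in the tests):

# llm_app/urls.py (assumed layout; include it from the project urls.py)
from django.urls import path
from .views import ChatCompletionView, ConversationListView, ConversationDetailView

urlpatterns = [
    path('api/chat/', ChatCompletionView.as_view(), name='chat-completion'),
    path('api/conversations/', ConversationListView.as_view(), name='conversation-list'),
    path('api/conversations/<uuid:pk>/', ConversationDetailView.as_view(), name='conversation-detail'),
]

In the project urls.py, include these with path('', include('llm_app.urls')).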

3. Async Views for Streaming

# llm_app/views.py (async views)
from django.http import StreamingHttpResponse
import asyncio
import json

async def stream_chat_view(request, task_id):
    """Async view for streaming LLM responses"""
    
    async def event_stream():
        service = LLMService()
        
        # Get task result or generate directly
        messages = json.loads(request.GET.get('messages', '[]'))
        provider = request.GET.get('provider', 'openai')
        
        yield "data: {"event": "connected"}

"
        
        try:
            async for token in service.generate_stream(
                messages=messages,
                provider=provider
            ):
                data = json.dumps({"token": token})
                yield f"data: {data}

"
                await asyncio.sleep(0.01)  # Small delay to prevent overwhelming
            
            yield "data: {"event": "completed"}

"
        except Exception as e:
            error_data = json.dumps({"error": str(e)})
            yield f"data: {error_data}

"
    
    response = StreamingHttpResponse(
        event_stream(),
        content_type='text/event-stream'
    )
    response['Cache-Control'] = 'no-cache'
    response['X-Accel-Buffering'] = 'no'
    return response

# URL configuration
from django.urls import path

urlpatterns = [
    path('api/stream/<str:task_id>/', stream_chat_view, name='stream-chat'),
]
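
To exercise the stream endpoint from Python, an httpx client can read the SSE lines directly. This is a sketch: the task id, query parameters, and auth header are assumptions based on the view above.

# Example: consuming the SSE stream with httpx (illustrative URL and token)
import json
import httpx

url = "http://localhost:8000/api/stream/some-task-id/"
params = {
    "messages": json.dumps([{"role": "user", "content": "Hello"}]),
    "provider": "openai",
}
headers = {"Authorization": "Token YOUR_DRF_TOKEN"}  # assumption: TokenAuthentication

with httpx.stream("GET", url, params=params, headers=headers, timeout=None) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            print(json.loads(line[6:]))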

Performance Note

For production streaming, use ASGI servers like Daphne or Uvicorn instead of WSGI servers to properly handle async views and streaming responses.

4. Celery Background Tasks

Celery Tasks

# llm_app/tasks.py
from celery import shared_task
from celery.result import AsyncResult
from django.core.cache import cache
from .models import Conversation, Message, APIUsage
from .services import LLMService
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3)
def generate_completion_task(self, messages, provider, model, temperature, 
                           max_tokens, conversation_id, user_id):
    """Background task for LLM completion generation"""
    try:
        # Update task progress
        self.update_state(state='PROGRESS', meta={'status': 'Generating response...'})
        
        service = LLMService()
        result = service.generate_completion(
            messages=messages,
            provider=provider,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens
        )
        
        # Save to database
        conversation = Conversation.objects.get(id=conversation_id)
        assistant_message = Message.objects.create(
            conversation=conversation,
            role='assistant',
            content=result['content'],
            tokens_used=result['tokens_used'],
            provider=provider,
            model=result['model']
        )
        
        # Track usage
        APIUsage.objects.create(
            user_id=user_id,
            endpoint='chat_completion_async',
            tokens_used=result['tokens_used'],
            provider=provider,
            model=result['model']
        )
        
        # Cache result for quick retrieval
        cache_key = f'task_result:{self.request.id}'
        cache.set(cache_key, result, timeout=3600)
        
        return {
            'status': 'completed',
            'message_id': assistant_message.id,
            'content': result['content'],
            'tokens_used': result['tokens_used']
        }
        
    except Exception as exc:
        logger.error(f"Task {self.request.id} failed: {exc}")
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

@shared_task
def batch_generate_summaries(conversation_ids):
    """Batch process multiple conversations for summarization"""
    service = LLMService()
    results = []
    
    for conv_id in conversation_ids:
        try:
            conversation = Conversation.objects.get(id=conv_id)
            messages = list(conversation.messages.values('role', 'content'))
            
            # Add summarization prompt
            messages.append({
                'role': 'user',
                'content': 'Please provide a brief summary of this conversation.'
            })
            
            result = service.generate_completion(
                messages=messages,
                provider='openai',
                model='gpt-3.5-turbo',
                max_tokens=150
            )
            
            # Update conversation title if empty
            if not conversation.title:
                conversation.title = result['content'][:100]
                conversation.save()
            
            results.append({
                'conversation_id': conv_id,
                'summary': result['content']
            })
            
        except Exception as e:
            logger.error(f"Failed to summarize conversation {conv_id}: {e}")
            results.append({
                'conversation_id': conv_id,
                'error': str(e)
            })
    
    return results

@shared_task
def cleanup_old_conversations():
    """Periodic task to clean up old, empty conversations"""
    from datetime import timedelta
    from django.db.models import Count
    from django.utils import timezone
    
    cutoff_date = timezone.now() - timedelta(days=30)
    deleted_count = Conversation.objects.annotate(
        num_messages=Count('messages')
    ).filter(
        updated_at__lt=cutoff_date,
        num_messages=0
    ).delete()[0]
    
    logger.info(f"Cleaned up {deleted_count} old empty conversations")
    return deleted_count
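
Because the streaming branch of ChatCompletionView returns a task_id, clients (or another endpoint) can poll the task state with AsyncResult. A small status view sketch; the URL path and view name are assumptions:

# llm_app/views.py (sketch of a task status endpoint)
from celery.result import AsyncResult
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated

class TaskStatusView(APIView):
    permission_classes = [IsAuthenticated]
    
    def get(self, request, task_id):
        result = AsyncResult(task_id)
        payload = {'task_id': task_id, 'state': result.state}
        if result.successful():
            payload['result'] = result.result  # dict returned by generate_completion_task
        elif result.state == 'PROGRESS':
            payload['meta'] = result.info  # e.g. {'status': 'Generating response...'}
        return Response(payload)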

Celery Beat Schedule

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-old-conversations': {
        'task': 'llm_app.tasks.cleanup_old_conversations',
        'schedule': crontab(hour=2, minute=0),  # Run daily at 2 AM
    },
    'generate-usage-reports': {
        'task': 'llm_app.tasks.generate_usage_reports',
        'schedule': crontab(hour=0, minute=0, day_of_week=1),  # Weekly on Monday
    },
}
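
The schedule above references a generate_usage_reports task that is not defined earlier; a minimal sketch of what it might look like, aggregating over the APIUsage model:

# llm_app/tasks.py (sketch of the weekly report task referenced in the schedule)
from datetime import timedelta

from celery import shared_task
from django.db.models import Sum
from django.utils import timezone

from .models import APIUsage

@shared_task
def generate_usage_reports():
    """Aggregate the last week's token usage and cost per provider."""
    since = timezone.now() - timedelta(days=7)
    report = list(
        APIUsage.objects.filter(created_at__gte=since)
        .values('provider')
        .annotate(total_tokens=Sum('tokens_used'), total_cost=Sum('cost'))
    )
    # In a real deployment this would be emailed or pushed to a dashboard
    return report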

5. Admin Interface for Prompts

# llm_app/admin.py
from django.contrib import admin
from django.db.models import Count, Sum
from .models import PromptTemplate, Conversation, Message, APIUsage

@admin.register(PromptTemplate)
class PromptTemplateAdmin(admin.ModelAdmin):
    list_display = ['name', 'created_by', 'is_active', 'created_at']
    list_filter = ['is_active', 'created_at', 'created_by']
    search_fields = ['name', 'description', 'template']
    readonly_fields = ['created_at', 'updated_at']
    
    fieldsets = (
        ('Basic Information', {
            'fields': ('name', 'description', 'is_active')
        }),
        ('Prompt Configuration', {
            'fields': ('template', 'system_prompt'),
            'classes': ('wide',)
        }),
        ('Metadata', {
            'fields': ('created_by', 'created_at', 'updated_at'),
            'classes': ('collapse',)
        }),
    )
    
    def save_model(self, request, obj, form, change):
        if not change:
            obj.created_by = request.user
        super().save_model(request, obj, form, change)

class MessageInline(admin.TabularInline):
    model = Message
    extra = 0
    readonly_fields = ['created_at', 'tokens_used']
    fields = ['role', 'content', 'provider', 'model', 'tokens_used', 'created_at']

@admin.register(Conversation)
class ConversationAdmin(admin.ModelAdmin):
    list_display = ['id', 'user', 'title', 'message_count', 'updated_at']
    list_filter = ['created_at', 'updated_at']
    search_fields = ['title', 'user__username', 'user__email']
    readonly_fields = ['id', 'created_at', 'updated_at']
    inlines = [MessageInline]
    
    def get_queryset(self, request):
        # Annotate once instead of issuing a COUNT query per row
        return super().get_queryset(request).annotate(
            message_count=Count('messages')
        )
    
    @admin.display(description='Messages', ordering='message_count')
    def message_count(self, obj):
        return obj.message_count

@admin.register(APIUsage)
class APIUsageAdmin(admin.ModelAdmin):
    list_display = ['user', 'endpoint', 'provider', 'model', 'tokens_used', 'cost', 'created_at']
    list_filter = ['provider', 'model', 'endpoint', 'created_at']
    date_hierarchy = 'created_at'
    
    def get_queryset(self, request):
        qs = super().get_queryset(request)
        return qs.select_related('user')
    
    def changelist_view(self, request, extra_context=None):
        # Add usage statistics to admin
        qs = self.get_queryset(request)
        total_tokens = qs.aggregate(Sum('tokens_used'))['tokens_used__sum'] or 0
        total_cost = qs.aggregate(Sum('cost'))['cost__sum'] or 0
        
        extra_context = extra_context or {}
        extra_context.update({
            'total_tokens': total_tokens,
            'total_cost': total_cost,
        })
        
        return super().changelist_view(request, extra_context=extra_context)

6. Authentication & API Keys

Custom Authentication

# llm_app/authentication.py
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from django.contrib.auth.models import User
import hashlib

class APIKeyAuthentication(BaseAuthentication):
    def authenticate(self, request):
        api_key = request.META.get('HTTP_X_API_KEY')
        if not api_key:
            return None
        
        try:
            # Hash the API key for secure comparison
            key_hash = hashlib.sha256(api_key.encode()).hexdigest()
            user = User.objects.get(profile__api_key_hash=key_hash)
            return (user, api_key)
        except User.DoesNotExist:
            raise AuthenticationFailed('Invalid API key')

# Models for API key storage
from django.db import models
from django.utils import timezone
import secrets

class UserProfile(models.Model):
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    api_key_hash = models.CharField(max_length=64, unique=True, blank=True)
    api_key_created_at = models.DateTimeField(null=True, blank=True)
    
    def generate_api_key(self):
        """Generate a new API key for the user"""
        api_key = secrets.token_urlsafe(32)
        self.api_key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        self.api_key_created_at = timezone.now()
        self.save()
        return api_key  # Return once for user to save

# Views for API key management
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated

class GenerateAPIKeyView(APIView):
    permission_classes = [IsAuthenticated]
    
    def post(self, request):
        profile, created = UserProfile.objects.get_or_create(user=request.user)
        api_key = profile.generate_api_key()
        
        return Response({
            'api_key': api_key,
            'message': 'Save this key securely. It cannot be retrieved again.'
        })

# Rate limiting per API key
from rest_framework.throttling import SimpleRateThrottle

class APIKeyRateThrottle(SimpleRateThrottle):
    scope = 'api_key'  # requires an 'api_key' rate in DEFAULT_THROTTLE_RATES
    
    def get_cache_key(self, request, view):
        if getattr(request, 'auth', None):
            # Use a hash of the API key as the rate-limit identity
            return self.cache_format % {
                'scope': self.scope,
                'ident': hashlib.sha256(request.auth.encode()).hexdigest()
            }
        return None
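
To activate the API key scheme, attach the classes per view (or globally) and give the api_key throttle scope a rate in settings. A sketch, with an assumed rate:

# Example: wiring API key auth and throttling into a view
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated

from .authentication import APIKeyAuthentication, APIKeyRateThrottle

class APIKeyChatView(APIView):
    authentication_classes = [APIKeyAuthentication]
    permission_classes = [IsAuthenticated]
    throttle_classes = [APIKeyRateThrottle]
    
    def get(self, request):
        return Response({'user': request.user.username})

# settings.py addition (rate is an assumption)
# REST_FRAMEWORK['DEFAULT_THROTTLE_RATES']['api_key'] = '1000/day'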

7. Response Caching

# llm_app/decorators.py
from django.core.cache import cache
from functools import wraps
import hashlib
import json

def cache_llm_response(timeout=3600):
    """Decorator to cache LLM responses"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key from the function arguments
            key_material = json.dumps(
                {'args': str(args), 'kwargs': str(kwargs)}, sort_keys=True
            ).encode()
            cache_key = f"llm:{func.__name__}:{hashlib.md5(key_material).hexdigest()}"
            
            # Try to get from cache
            cached_result = cache.get(cache_key)
            if cached_result is not None:
                return cached_result
            
            # Generate result
            result = func(*args, **kwargs)
            
            # Cache the result
            cache.set(cache_key, result, timeout=timeout)
            
            return result
        return wrapper
    return decorator

# Usage in service
class CachedLLMService(LLMService):
    def generate_completion_cached(self, messages, provider='openai', **kwargs):
        # Only cache non-personalized prompts; everything else bypasses the cache
        if self._is_cacheable(messages):
            return self._cached_completion(messages, provider, **kwargs)
        return self.generate_completion(messages, provider, **kwargs)
    
    @cache_llm_response(timeout=3600)
    def _cached_completion(self, messages, provider='openai', **kwargs):
        return self.generate_completion(messages, provider, **kwargs)
    
    def _is_cacheable(self, messages):
        # Don't cache messages that appear to contain user-specific data
        personal_keywords = {'my', 'i', 'me', 'personal'}
        for msg in messages:
            words = set(msg.get('content', '').lower().split())
            if words & personal_keywords:
                return False
        return True

# Per-view caching with cache_page (wrapped for class-based views)
from django.utils.decorators import method_decorator
from django.views.decorators.cache import cache_page
from django.shortcuts import get_object_or_404

class CachedPromptTemplateView(APIView):
    @method_decorator(cache_page(60 * 15))  # Cache for 15 minutes
    def get(self, request, template_id):
        template = get_object_or_404(PromptTemplate, id=template_id)
        return Response(PromptTemplateSerializer(template).data)

8. File Handling for Multimodal

# llm_app/views.py
import base64
import mimetypes

import openai
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from rest_framework.parsers import MultiPartParser, FormParser

class MultimodalChatView(APIView):
    permission_classes = [IsAuthenticated]
    parser_classes = [MultiPartParser, FormParser]
    
    def post(self, request):
        serializer = MultimodalChatSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        
        message = serializer.validated_data['message']
        files = request.FILES.getlist('files')
        provider = serializer.validated_data.get('provider', 'openai')
        
        # Process files
        file_contents = []
        for file in files[:3]:  # Limit to 3 files
            if file.size > 10 * 1024 * 1024:  # 10MB limit
                return Response(
                    {'error': f'File {file.name} exceeds 10MB limit'},
                    status=status.HTTP_400_BAD_REQUEST
                )
            
            # Save file temporarily
            file_path = default_storage.save(
                f'temp/{request.user.id}/{file.name}',
                ContentFile(file.read())
            )
            
            # Convert to base64 for API
            with default_storage.open(file_path, 'rb') as f:
                file_data = base64.b64encode(f.read()).decode('utf-8')
                mime_type = mimetypes.guess_type(file.name)[0] or 'application/octet-stream'
                
                file_contents.append({
                    'type': 'image' if mime_type.startswith('image/') else 'file',
                    'data': file_data,
                    'mime_type': mime_type,
                    'name': file.name
                })
            
            # Clean up temp file
            default_storage.delete(file_path)
        
        # Prepare messages for multimodal API
        messages = [{
            'role': 'user',
            'content': [
                {'type': 'text', 'text': message},
                *[{
                    'type': 'image_url',
                    'image_url': {
                        'url': f"data:{f['mime_type']};base64,{f['data']}"
                    }
                } for f in file_contents if f['type'] == 'image']
            ]
        }]
        
        # Call appropriate provider
        if provider == 'openai':
            response = self._call_openai_vision(messages)
        elif provider == 'anthropic':
            response = self._call_anthropic_vision(messages, file_contents)
        else:
            return Response(
                {'error': 'Provider does not support multimodal'},
                status=status.HTTP_400_BAD_REQUEST
            )
        
        return Response(response)
    
    def _call_openai_vision(self, messages):
        client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)
        response = client.chat.completions.create(
            model="gpt-4-vision-preview",
            messages=messages,
            max_tokens=500
        )
        return {
            'content': response.choices[0].message.content,
            'model': 'gpt-4-vision-preview'
        }
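
The view above validates against a MultimodalChatSerializer that is not shown; a minimal sketch consistent with the fields the view reads:

# llm_app/serializers.py (sketch; fields match what MultimodalChatView reads)
from rest_framework import serializers

class MultimodalChatSerializer(serializers.Serializer):
    message = serializers.CharField(max_length=4000)
    provider = serializers.ChoiceField(
        choices=['openai', 'anthropic'],
        default='openai'
    )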

9. Django Channels WebSocket

ASGI Configuration

# llm_project/asgi.py
import os
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'llm_project.settings')

# Initialize the Django ASGI application first so the app registry is
# populated before importing consumers/routing that touch ORM models.
django_asgi_app = get_asgi_application()

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import AllowedHostsOriginValidator
import llm_app.routing

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                llm_app.routing.websocket_urlpatterns
            )
        )
    ),
})

WebSocket Consumers

# llm_app/consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
import json
from channels.db import database_sync_to_async
from .models import Conversation, Message
from .services import LLMService

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.user = self.scope["user"]
        if not self.user.is_authenticated:
            await self.close()
            return
        
        self.conversation_id = self.scope["url_route"]["kwargs"]["conversation_id"]
        self.conversation_group = f"chat_{self.conversation_id}"
        
        # Join conversation group
        await self.channel_layer.group_add(
            self.conversation_group,
            self.channel_name
        )
        
        await self.accept()
        await self.send(text_data=json.dumps({
            "type": "connection_established",
            "conversation_id": self.conversation_id
        }))
    
    async def disconnect(self, close_code):
        # Leave conversation group
        await self.channel_layer.group_discard(
            self.conversation_group,
            self.channel_name
        )
    
    async def receive(self, text_data):
        data = json.loads(text_data)
        message_type = data.get("type")
        
        if message_type == "chat_message":
            await self.handle_chat_message(data)
        elif message_type == "typing_indicator":
            await self.handle_typing_indicator(data)
    
    async def handle_chat_message(self, data):
        content = data["content"]
        provider = data.get("provider", "openai")
        
        # Save user message
        user_message = await self.save_message("user", content)
        
        # Send user message to group
        await self.channel_layer.group_send(
            self.conversation_group,
            {
                "type": "chat_message",
                "message": {
                    "id": str(user_message.id),
                    "role": "user",
                    "content": content,
                    "timestamp": user_message.created_at.isoformat()
                }
            }
        )
        
        # Generate AI response
        service = LLMService()
        messages = await self.get_conversation_messages()
        
        # Send typing indicator
        await self.channel_layer.group_send(
            self.conversation_group,
            {"type": "typing_indicator", "is_typing": True}
        )
        
        # Stream response
        full_response = ""
        async for token in service.generate_stream(messages, provider):
            full_response += token
            await self.send(text_data=json.dumps({
                "type": "stream_token",
                "token": token
            }))
        
        # Save assistant message
        assistant_message = await self.save_message("assistant", full_response, provider)
        
        # Send complete message
        await self.channel_layer.group_send(
            self.conversation_group,
            {
                "type": "chat_message",
                "message": {
                    "id": str(assistant_message.id),
                    "role": "assistant",
                    "content": full_response,
                    "timestamp": assistant_message.created_at.isoformat()
                }
            }
        )
    
    @database_sync_to_async
    def save_message(self, role, content, provider="openai"):
        return Message.objects.create(
            conversation_id=self.conversation_id,
            role=role,
            content=content,
            provider=provider
        )
    
    @database_sync_to_async
    def get_conversation_messages(self):
        messages = Message.objects.filter(
            conversation_id=self.conversation_id
        ).order_by('created_at').values('role', 'content')
        return list(messages)
    
    # Handler for group messages
    async def chat_message(self, event):
        await self.send(text_data=json.dumps({
            "type": "chat_message",
            "message": event["message"]
        }))
    
    async def typing_indicator(self, event):
        await self.send(text_data=json.dumps({
            "type": "typing_indicator",
            "is_typing": event["is_typing"]
        }))

# Routing
# llm_app/routing.py
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<conversation_id>[^/]+)/$', consumers.ChatConsumer.as_asgi()),
]

10. Production Deployment

Gunicorn Configuration

# gunicorn.conf.py
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "sync"  # Use 'uvicorn.workers.UvicornWorker' for async
worker_connections = 1000
keepalive = 5
threads = 2

# Logging
accesslog = "/var/log/gunicorn/access.log"
errorlog = "/var/log/gunicorn/error.log"
loglevel = "info"

# Process naming
proc_name = 'llm_django_app'

# Server mechanics
daemon = False
pidfile = '/var/run/gunicorn.pid'
user = 'www-data'
group = 'www-data'
tmp_upload_dir = None

# SSL (if not using nginx)
# keyfile = '/path/to/keyfile'
# certfile = '/path/to/certfile'

# Worker timeout (important for long LLM requests)
timeout = 120
graceful_timeout = 30

# Restart workers after this many requests
max_requests = 1000
max_requests_jitter = 50

Daphne for WebSockets

# Install Daphne
pip install daphne

# Run Daphne for WebSocket support
daphne -b 0.0.0.0 -p 8001 llm_project.asgi:application

Supervisor Configuration

; /etc/supervisor/conf.d/llm_django.conf

[program:llm_django_gunicorn]
command=/path/to/venv/bin/gunicorn llm_project.wsgi:application -c /path/to/gunicorn.conf.py
directory=/path/to/project
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/llm_django_gunicorn.log
environment=PATH="/path/to/venv/bin",DJANGO_SETTINGS_MODULE="llm_project.settings"

[program:llm_django_daphne]
command=/path/to/venv/bin/daphne -b 0.0.0.0 -p 8001 llm_project.asgi:application
directory=/path/to/project
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/llm_django_daphne.log

[program:llm_django_celery]
command=/path/to/venv/bin/celery -A llm_project worker -l info
directory=/path/to/project
user=www-data
numprocs=1
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600
stdout_logfile=/var/log/supervisor/llm_django_celery.log

[program:llm_django_celery_beat]
command=/path/to/venv/bin/celery -A llm_project beat -l info
directory=/path/to/project
user=www-data
numprocs=1
autostart=true
autorestart=true
stdout_logfile=/var/log/supervisor/llm_django_celery_beat.log

Nginx Configuration

# /etc/nginx/sites-available/llm_django

upstream django_app {
    server localhost:8000;
}

upstream websocket_app {
    server localhost:8001;
}

server {
    listen 80;
    server_name api.example.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name api.example.com;
    
    ssl_certificate /etc/letsencrypt/live/api.example.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/api.example.com/privkey.pem;
    
    client_max_body_size 10M;
    
    location /static/ {
        alias /path/to/project/staticfiles/;
        expires 30d;
    }
    
    location /media/ {
        alias /path/to/project/media/;
        expires 30d;
    }
    
    location /ws/ {
        proxy_pass http://websocket_app;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
    
    location / {
        proxy_pass http://django_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # Timeouts for long LLM requests
        proxy_connect_timeout 300s;
        proxy_send_timeout 300s;
        proxy_read_timeout 300s;
    }
}

Docker Deployment

# Dockerfile
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Set work directory
WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy project
COPY . .

# Collect static files
RUN python manage.py collectstatic --noinput

# Run migrations at container start (entrypoint or compose command),
# not at build time, since the database is not reachable during the build.

# Create user
RUN useradd -m -u 1000 django && chown -R django:django /app
USER django

# Expose port
EXPOSE 8000

# Run gunicorn
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "llm_project.wsgi:application"]

Docker Compose

# docker-compose.yml
version: '3.8'

services:
  db:
    image: postgres:15
    environment:
      POSTGRES_DB: llm_db
      POSTGRES_USER: llm_user
      POSTGRES_PASSWORD: secure_password
    volumes:
      - postgres_data:/var/lib/postgresql/data
  
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
  
  web:
    build: .
    command: gunicorn llm_project.wsgi:application --bind 0.0.0.0:8000
    volumes:
      - .:/app
      - static_volume:/app/staticfiles
      - media_volume:/app/media
    ports:
      - "8000:8000"
    environment:
      - DJANGO_SETTINGS_MODULE=llm_project.settings
      - DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
  
  daphne:
    build: .
    command: daphne -b 0.0.0.0 -p 8001 llm_project.asgi:application
    volumes:
      - .:/app
    ports:
      - "8001:8001"
    environment:
      - DJANGO_SETTINGS_MODULE=llm_project.settings
      - DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
  
  celery:
    build: .
    command: celery -A llm_project worker -l info
    volumes:
      - .:/app
    environment:
      - DJANGO_SETTINGS_MODULE=llm_project.settings
      - DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
  
  celery-beat:
    build: .
    command: celery -A llm_project beat -l info
    volumes:
      - .:/app
    environment:
      - DJANGO_SETTINGS_MODULE=llm_project.settings
      - DATABASE_URL=postgresql://llm_user:secure_password@db:5432/llm_db
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis

volumes:
  postgres_data:
  redis_data:
  static_volume:
  media_volume:

✓ Production Checklist

  • ☐ Set DEBUG=False in production
  • ☐ Configure ALLOWED_HOSTS properly
  • ☐ Use environment variables for secrets
  • ☐ Set up SSL certificates
  • ☐ Configure database connection pooling
  • ☐ Set up monitoring (Sentry, New Relic)
  • ☐ Configure log aggregation
  • ☐ Set up backup strategy
  • ☐ Implement health check endpoints (see the sketch below)
  • ☐ Configure auto-scaling policies
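
A health check endpoint can be as small as a view that touches the database and cache; a sketch (the URL path is an assumption):

# llm_app/views.py (sketch of a health check endpoint)
from django.core.cache import cache
from django.db import connection
from django.http import JsonResponse

def health_check(request):
    """Return 200 if the database and cache are reachable."""
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
        cache.set('health_check', 'ok', timeout=5)
        return JsonResponse({'status': 'ok'})
    except Exception as exc:
        return JsonResponse({'status': 'error', 'detail': str(exc)}, status=503)

# urls.py: path('healthz/', health_check)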

Testing Strategies

# llm_app/tests.py
from unittest.mock import patch, MagicMock

from asgiref.sync import sync_to_async
from channels.routing import URLRouter
from channels.testing import WebsocketCommunicator
from django.contrib.auth.models import User
from django.test import TestCase, TransactionTestCase, override_settings
from rest_framework.test import APITestCase

from .models import Conversation, Message
from .routing import websocket_urlpatterns

class LLMAPITestCase(APITestCase):
    def setUp(self):
        self.user = User.objects.create_user(
            username='testuser',
            password='testpass123'
        )
        self.client.force_authenticate(user=self.user)
    
    @patch('llm_app.services.LLMService.generate_completion')
    def test_chat_completion(self, mock_generate):
        mock_generate.return_value = {
            'content': 'Test response',
            'tokens_used': 50,
            'model': 'gpt-3.5-turbo'
        }
        
        response = self.client.post('/api/chat/', {
            'message': 'Hello, AI!',
            'provider': 'openai'
        })
        
        self.assertEqual(response.status_code, 200)
        self.assertIn('conversation_id', response.data)
        self.assertEqual(
            response.data['message']['content'], 
            'Test response'
        )
    
    @patch('llm_app.services.LLMService.generate_completion')
    def test_rate_limiting(self, mock_generate):
        mock_generate.return_value = {
            'content': 'Test response',
            'tokens_used': 1,
            'model': 'gpt-3.5-turbo'
        }
        
        # Exceed the 100/hour user throttle configured in settings
        response = None
        for i in range(101):
            response = self.client.post('/api/chat/', {
                'message': f'Test {i}'
            })
            if response.status_code == 429:
                break
        
        # Verify rate limit is enforced
        self.assertEqual(response.status_code, 429)

@override_settings(CHANNEL_LAYERS={'default': {'BACKEND': 'channels.layers.InMemoryChannelLayer'}})
class WebSocketTestCase(TransactionTestCase):
    @patch('llm_app.consumers.LLMService')
    async def test_chat_websocket(self, mock_service):
        # Stub the LLM call so no real API request is made
        async def fake_stream(*args, **kwargs):
            yield "Hi!"
        mock_service.return_value.generate_stream = fake_stream
        
        # Create test user and conversation
        user = await sync_to_async(User.objects.create_user)(
            username='wstest',
            password='testpass'
        )
        conversation = await sync_to_async(Conversation.objects.create)(
            user=user
        )
        
        # Route through the app's URL patterns so url_route kwargs are populated
        communicator = WebsocketCommunicator(
            URLRouter(websocket_urlpatterns),
            f"/ws/chat/{conversation.id}/"
        )
        communicator.scope['user'] = user
        
        # Connect
        connected, _ = await communicator.connect()
        self.assertTrue(connected)
        
        # First frame is the connection acknowledgement
        response = await communicator.receive_json_from()
        self.assertEqual(response['type'], 'connection_established')
        
        # Send message
        await communicator.send_json_to({
            'type': 'chat_message',
            'content': 'Test message',
            'provider': 'openai'
        })
        
        # Skip streaming/typing frames until the persisted chat message arrives
        response = await communicator.receive_json_from()
        while response['type'] != 'chat_message':
            response = await communicator.receive_json_from()
        self.assertEqual(response['message']['role'], 'user')
        
        # Disconnect
        await communicator.disconnect()

Start Building with Django

Create powerful Django applications with integrated LLM capabilities using our unified API gateway.
