Advanced
January 25, 2024 · 20 min read

Semantic Caching for LLM Applications: Complete Implementation Guide

Master semantic caching with vector embeddings to reduce LLM costs by 30-60% and improve response times by 10-30x using production-ready implementations.

Cost Reduction: 30-60%
Response Time: <100ms
Cache Hit Rate: 40-70%
Performance Gain: 10-30x

Semantic vs Traditional Caching

Traditional caching relies on exact key matching, which fails for natural language because the same question can be phrased in many ways[1]. Semantic caching converts queries into vector embeddings and retrieves similar cached responses using a similarity metric such as cosine similarity or Euclidean distance.

Comparison: Traditional vs Semantic Caching

Traditional Caching

Query: "What's the weather in NYC?" → Miss
Query: "What is the weather in New York?" → Miss
Query: "Tell me NYC weather" → Miss

Semantic Caching

Query: "What's the weather in NYC?" → Cached
Query: "What is the weather in New York?" → Hit (0.92 similarity)
Query: "Tell me NYC weather" → Hit (0.89 similarity)
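
To make the contrast concrete, here is a minimal, hypothetical sketch of an exact-key cache; the cached phrasing and the response text are made up for illustration. Every paraphrase produces a different key, so all three lookups miss.

# Hypothetical exact-key cache: every paraphrase is a different key
exact_cache = {
    "Tell me about New York weather": "Sunny, 72°F in New York City."
}

def exact_lookup(query: str):
    # Only a character-for-character identical query hits the cache
    return exact_cache.get(query)

print(exact_lookup("What's the weather in NYC?"))        # None -> miss
print(exact_lookup("What is the weather in New York?"))  # None -> miss
print(exact_lookup("Tell me NYC weather"))               # None -> miss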

Vector Embeddings and Similarity Search

The foundation of semantic caching is converting text into high-dimensional vectors that capture semantic meaning[2]. Here's how to generate embeddings and calculate similarity:

OpenAI Embeddings
Using text-embedding-ada-002 model
import openai
import numpy as np

# Initialize OpenAI client
client = openai.OpenAI(api_key="your-api-key")

def get_embedding(text):
    response = client.embeddings.create(
        input=text,
        model="text-embedding-ada-002"
    )
    # Return a float32 array so it can be serialized with .tobytes() for the Redis index below
    return np.array(response.data[0].embedding, dtype=np.float32)

def cosine_similarity(vec1, vec2):
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

# Example usage
query = "What's the weather like in New York?"
embedding = get_embedding(query)

# Check similarity with cached queries
cached_query = "Tell me about New York weather"
cached_embedding = get_embedding(cached_query)
similarity = cosine_similarity(embedding, cached_embedding)
print(f"Similarity: {similarity:.2f}")  # Output: 0.89

Implementation with Vector Databases

Production semantic caching requires a vector database for efficient similarity search[3]. Here are implementations for the most popular options:

Redis with RediSearch

Redis Vector Similarity Search
High-performance in-memory vector database
import hashlib
import redis
import numpy as np
from redis.commands.search.field import VectorField, TextField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query

class RedisSemanticCache:
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.index_name = "semantic_cache"
        self.similarity_threshold = 0.85
        self.create_index()
    
    def create_index(self):
        try:
            self.r.ft(self.index_name).create_index(
                [
                    TextField("prompt"),
                    TextField("response"),
                    VectorField("embedding", 
                        "FLAT", {
                            "TYPE": "FLOAT32",
                            "DIM": 1536,  # OpenAI ada-002 dimension
                            "DISTANCE_METRIC": "COSINE"
                        }
                    )
                ],
                # Only index hashes written under the cache: prefix
                definition=IndexDefinition(prefix=["cache:"], index_type=IndexType.HASH)
            )
        except redis.ResponseError:
            pass  # Index already exists
    
    def add_to_cache(self, prompt, response, embedding):
        key = f"cache:{hashlib.md5(prompt.encode()).hexdigest()}"  # stable key across processes
        self.r.hset(key, mapping={
            "prompt": prompt,
            "response": response,
            "embedding": embedding.tobytes()
        })
    
    def search_similar(self, embedding, k=5):
        # The KNN score is cosine *distance* (1 - cosine similarity), so sort ascending
        query = (
            Query(f"*=>[KNN {k} @embedding $vec AS distance]")
            .sort_by("distance", asc=True)
            .return_fields("prompt", "response", "distance")
            .dialect(2)
        )
        
        results = self.r.ft(self.index_name).search(
            query, 
            query_params={"vec": embedding.tobytes()}
        )
        
        if results.docs:
            similarity = 1 - float(results.docs[0].distance)
            if similarity > self.similarity_threshold:
                return {
                    "prompt": results.docs[0].prompt,
                    "response": results.docs[0].response,
                    "similarity": similarity
                }
        return None

# Usage example
cache = RedisSemanticCache()
embedding = get_embedding("What's the weather in NYC?")
result = cache.search_similar(embedding)
if result:
    print(f"Cache hit! Similarity: {result['similarity']:.2f}")
    print(result['response'])
Pinecone Implementation

Pinecone Vector Database
Managed vector database with automatic scaling
import hashlib
import time
from typing import Optional

import pinecone  # pinecone-client v2 API (pinecone.init); newer clients expose pinecone.Pinecone

class PineconeSemanticCache:
    def __init__(self, api_key: str, environment: str, index_name: str):
        pinecone.init(api_key=api_key, environment=environment)
        self.index = pinecone.Index(index_name)
        self.similarity_threshold = 0.85
        
    def add_to_cache(self, prompt: str, response: str, embedding: list):
        cache_id = hashlib.md5(prompt.encode()).hexdigest()
        self.index.upsert([(
            cache_id,
            embedding,
            {
                "prompt": prompt,
                "response": response,
                "timestamp": time.time()
            }
        )])
    
    def search_similar(self, embedding: list) -> Optional[dict]:
        results = self.index.query(
            vector=embedding,
            top_k=1,
            include_metadata=True
        )
        
        if results.matches and results.matches[0].score > self.similarity_threshold:
            match = results.matches[0]
            return {
                "prompt": match.metadata["prompt"],
                "response": match.metadata["response"],
                "similarity": match.score
            }
        return None
    
    def invalidate_old_entries(self, max_age_hours: int = 24):
        """Remove cache entries older than max_age_hours"""
        cutoff_time = time.time() - (max_age_hours * 3600)
        # Metadata-filtered deletes are not supported on all Pinecone index types;
        # in practice, track cache IDs and timestamps externally and delete by ID
        # once entries fall behind cutoff_time.
        pass

# Usage
cache = PineconeSemanticCache(
    api_key="your-api-key",
    environment="us-west1-gcp",
    index_name="semantic-cache"
)

# Check the cache before calling the LLM
# (call_llm is a placeholder for your model-call function)
def handle_query(user_query: str) -> str:
    embedding = get_embedding(user_query).tolist()
    cached = cache.search_similar(embedding)
    if cached:
        return cached['response']
    # Cache miss: call the LLM and store the result
    llm_response = call_llm(user_query)
    cache.add_to_cache(user_query, llm_response, embedding)
    return llm_response

PostgreSQL with pgvector

PostgreSQL with pgvector Extension
Open-source relational database with vector support
import asyncpg
import numpy as np
from pgvector.asyncpg import register_vector  # codec so numpy arrays bind to vector columns
from typing import Optional

class PgVectorCache:
    def __init__(self, connection_string: str):
        self.conn_string = connection_string
        self.similarity_threshold = 0.85
        
    async def init_db(self):
        conn = await asyncpg.connect(self.conn_string)
        await conn.execute('CREATE EXTENSION IF NOT EXISTS vector')
        await conn.execute('''
            CREATE TABLE IF NOT EXISTS semantic_cache (
                id SERIAL PRIMARY KEY,
                prompt TEXT,
                response TEXT,
                embedding vector(1536),
                created_at TIMESTAMP DEFAULT NOW()
            )
        ''')
        await conn.execute('''
            CREATE INDEX IF NOT EXISTS embedding_idx ON semantic_cache 
            USING ivfflat (embedding vector_cosine_ops)
            WITH (lists = 100)
        ''')
        await conn.close()
    
    async def add_to_cache(self, prompt: str, response: str, embedding: np.ndarray):
        conn = await asyncpg.connect(self.conn_string)
        await register_vector(conn)  # register the vector codec on this connection
        await conn.execute('''
            INSERT INTO semantic_cache (prompt, response, embedding)
            VALUES ($1, $2, $3)
        ''', prompt, response, embedding)
        await conn.close()
    
    async def search_similar(self, embedding: np.ndarray) -> Optional[dict]:
        conn = await asyncpg.connect(self.conn_string)
        await register_vector(conn)
        result = await conn.fetchrow('''
            SELECT prompt, response, 
                   1 - (embedding <=> $1::vector) as similarity
            FROM semantic_cache
            WHERE 1 - (embedding <=> $1::vector) > $2
            ORDER BY embedding <=> $1::vector
            LIMIT 1
        ''', embedding, self.similarity_threshold)
        await conn.close()
        
        if result:
            return {
                "prompt": result['prompt'],
                "response": result['response'],
                "similarity": result['similarity']
            }
        return None

# Usage with async
import asyncio

async def main():
    cache = PgVectorCache("postgresql://user:pass@localhost/dbname")
    await cache.init_db()
    
    embedding = get_embedding("What's the weather?")
    result = await cache.search_similar(embedding)
    if result:
        print(f"Cache hit! {result['similarity']:.2f}")
        return result['response']

asyncio.run(main())
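
For brevity, the sketch above opens a fresh connection on every call. In production you would typically hold an asyncpg connection pool instead; a minimal, hypothetical variant is shown below (the pool sizes are placeholder values).

import asyncpg
from pgvector.asyncpg import register_vector

async def create_cache_pool(conn_string: str) -> asyncpg.Pool:
    # Reuse connections across cache calls; register the pgvector codec
    # on every new connection the pool creates.
    return await asyncpg.create_pool(
        conn_string,
        min_size=1,
        max_size=10,
        init=register_vector
    )

# PgVectorCache methods would then acquire from the pool instead of reconnecting:
#     async with self.pool.acquire() as conn:
#         ...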

Cache Invalidation Strategies

Effective cache invalidation is crucial for maintaining accuracy[4]. Here are proven strategies:

Invalidation Strategies

Time-Based (TTL)

Set expiration times based on data volatility

cache.set(key, value, ttl=3600)  # 1 hour TTL

Confidence-Based

Invalidate when similarity score is below threshold

if similarity < 0.85:
    return fetch_fresh_response()

Event-Driven

Invalidate on data updates or specific triggers

on_data_update:
    cache.invalidate_pattern("weather:*")

Sliding Window

Keep only recent N entries or last X hours

cache.trim_to_size(max_entries=10000)
cache.remove_older_than(hours=24)
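
As a concrete illustration, here is a minimal sketch that combines TTL-based and event-driven invalidation on top of Redis. The cache:weather:* key naming and the helper names are illustrative choices, not part of the implementations above.

import time
import redis

r = redis.Redis(host="localhost", port=6379)

def cache_with_ttl(key: str, response: str, ttl_seconds: int = 3600):
    # Time-based invalidation: the entry expires automatically after the TTL
    r.hset(key, mapping={"response": response, "timestamp": time.time()})
    r.expire(key, ttl_seconds)

def invalidate_pattern(pattern: str):
    # Event-driven invalidation: drop every key under a topic prefix
    # (SCAN-based iteration avoids blocking Redis the way KEYS can)
    for key in r.scan_iter(match=pattern):
        r.delete(key)

# Example: on a weather-data update, clear weather entries but keep the rest
cache_with_ttl("cache:weather:nyc", "Sunny, 72°F", ttl_seconds=1800)
invalidate_pattern("cache:weather:*")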

Production Deployment

Here's a production-ready implementation with monitoring, error handling, and performance optimization:

Production Python Implementation
import redis
import openai
import hashlib
import json
import time
import numpy as np
from typing import Optional, Dict
from dataclasses import dataclass
import logging
from prometheus_client import Counter, Histogram, Gauge

# Metrics
cache_hits = Counter('semantic_cache_hits', 'Number of cache hits')
cache_misses = Counter('semantic_cache_misses', 'Number of cache misses')
cache_latency = Histogram('semantic_cache_latency', 'Cache operation latency')
cache_size = Gauge('semantic_cache_size', 'Current cache size')

@dataclass
class CacheConfig:
    redis_url: str
    similarity_threshold: float = 0.85
    ttl_seconds: int = 3600
    max_cache_size: int = 10000
    embedding_dim: int = 1536

class ProductionSemanticCache:
    def __init__(self, config: CacheConfig):
        self.config = config
        self.redis_client = redis.from_url(config.redis_url)
        self.openai_client = openai.OpenAI()
        self.logger = logging.getLogger(__name__)
        
    def get_embedding(self, text: str) -> np.ndarray:
        """Generate embedding with retry logic"""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = self.openai_client.embeddings.create(
                    input=text,
                    model="text-embedding-ada-002"
                )
                return np.array(response.data[0].embedding)
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)
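    
    # Helper methods referenced below. These are minimal sketches: the linear scan
    # keeps the example self-contained; at scale, query a RediSearch vector index
    # instead (see RedisSemanticCache above).
    def get_cache_size(self) -> int:
        # Number of cached entries (use SCAN instead of KEYS for very large caches)
        return len(self.redis_client.keys("cache:*"))
    
    def update_access_time(self, key):
        # Refresh the LRU timestamp for an entry
        self.redis_client.hset(key, 'access_time', time.time())
    
    def search_similar(self, embedding: np.ndarray) -> Optional[Dict]:
        # Brute-force cosine similarity over all cached embeddings
        best = None
        for key in self.redis_client.keys("cache:*"):
            raw = self.redis_client.hget(key, 'embedding')
            response = self.redis_client.hget(key, 'response')
            if not raw or not response:
                continue
            # dtype must match how get_embedding serialized the vector (np.array default: float64)
            cached_vec = np.frombuffer(raw, dtype=np.float64)
            similarity = float(np.dot(embedding, cached_vec) /
                               (np.linalg.norm(embedding) * np.linalg.norm(cached_vec)))
            if best is None or similarity > best['similarity']:
                best = {'key': key, 'response': response.decode(), 'similarity': similarity}
        return best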
    
    @cache_latency.time()
    def get_cached_response(self, query: str) -> Optional[str]:
        """Get cached response with monitoring"""
        try:
            embedding = self.get_embedding(query)
            
            # Search for similar queries
            results = self.search_similar(embedding)
            
            if results and results['similarity'] > self.config.similarity_threshold:
                cache_hits.inc()
                self.logger.info(f"Cache hit: {results['similarity']:.2f}")
                
                # Update access time for LRU
                self.update_access_time(results['key'])
                
                return results['response']
            else:
                cache_misses.inc()
                return None
                
        except Exception as e:
            self.logger.error(f"Cache error: {e}")
            cache_misses.inc()
            return None
    
    def add_to_cache(self, query: str, response: str):
        """Add to cache with size management"""
        try:
            # Check cache size
            if self.get_cache_size() >= self.config.max_cache_size:
                self.evict_lru()
            
            embedding = self.get_embedding(query)
            key = f"cache:{hashlib.md5(query.encode()).hexdigest()}"
            
            # Store with TTL
            self.redis_client.hset(key, mapping={
                'query': query,
                'response': response,
                'embedding': embedding.tobytes(),
                'timestamp': time.time(),
                'access_time': time.time()
            })
            self.redis_client.expire(key, self.config.ttl_seconds)
            
            cache_size.inc()
            
        except Exception as e:
            self.logger.error(f"Failed to add to cache: {e}")
    
    def evict_lru(self):
        """Evict least recently used entries"""
        # Get all cache keys
        keys = self.redis_client.keys("cache:*")
        
        # Sort by access time
        entries = []
        for key in keys:
            access_time = self.redis_client.hget(key, 'access_time')
            if access_time:
                entries.append((key, float(access_time)))
        
        # Remove oldest 10%
        entries.sort(key=lambda x: x[1])
        to_remove = len(entries) // 10
        
        for key, _ in entries[:to_remove]:
            self.redis_client.delete(key)
            cache_size.dec()

# Usage
config = CacheConfig(
    redis_url="redis://localhost:6379",
    similarity_threshold=0.85,
    ttl_seconds=3600
)

cache = ProductionSemanticCache(config)

# In your API endpoint
def handle_llm_request(query: str) -> str:
    # Try cache first
    cached = cache.get_cached_response(query)
    if cached:
        return cached
    
    # Call LLM
    response = call_llm(query)
    
    # Add to cache
    cache.add_to_cache(query, response)
    
    return response

Performance Benchmarks

Based on real-world implementations across different scales[5]:

Performance Metrics by Implementation

Database | Queries/sec | Latency (p99) | Storage Cost | Best For
Redis | 50,000+ | <5ms | $$$ | High-traffic, low-latency
Pinecone | 10,000 | 20-50ms | $$ | Managed, scalable
pgvector | 5,000 | 10-30ms | $ | Existing PostgreSQL
Weaviate | 8,000 | 15-40ms | $$ | GraphQL, multi-modal

Best Practices and Tips

1. Choose the Right Similarity Threshold
  • Start with 0.85 for general queries
  • Use 0.90+ for factual/technical content
  • Use 0.80-0.85 for conversational queries
  • Monitor and adjust based on false positive rate
2. Implement Proper Monitoring
  • Track cache hit/miss rates
  • Monitor similarity score distribution
  • Alert on low hit rates (<30%)
  • Log false positives for threshold tuning
3. Handle Edge Cases
  • Exclude time-sensitive queries (current time, stock prices)
  • Skip caching for personalized responses
  • Implement user-specific cache namespaces
  • Handle multilingual queries appropriately
4. Optimize for Scale
  • Use connection pooling for database connections
  • Implement batch embedding generation (see the sketch after this list)
  • Consider hybrid caching (memory + persistent)
  • Use approximate nearest neighbor algorithms for large scales
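
For point 4, here is a minimal sketch of batch embedding generation, assuming the same OpenAI client as earlier; the embeddings endpoint accepts a list of inputs, so warming or refreshing many cache entries takes one request instead of one call per query.

import numpy as np
import openai

client = openai.OpenAI(api_key="your-api-key")

def get_embeddings_batch(texts):
    # One request for the whole batch; results come back in input order
    response = client.embeddings.create(
        input=texts,
        model="text-embedding-ada-002"
    )
    return [np.array(item.embedding, dtype=np.float32) for item in response.data]

# Warm the cache with known high-frequency queries in a single call
frequent_queries = [
    "What's the weather in NYC?",
    "What is the weather in New York?",
    "Tell me NYC weather"
]
embeddings = get_embeddings_batch(frequent_queries)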

References

  [1] Microsoft Azure. "Optimize Azure OpenAI Applications with Semantic Caching" (2024)
  [2] OpenAI. "Embeddings Guide" (2024)
  [3] Redis. "Vector Similarity Search" (2024)
  [4] Pinecone. "Semantic Cache Documentation" (2024)
  [5] Weaviate. "Semantic Caching for LLMs" (2024)