Database + Vector Storage Integration for LLMs

Build powerful retrieval-augmented generation (RAG) systems with vector databases. This guide covers embedding generation, semantic search, hybrid retrieval, and production deployment strategies.

Quick Start

Set up a vector database for your LLM application:

# Python environment
pip install pinecone-client weaviate-client qdrant-client pgvector
pip install openai sentence-transformers numpy

# Node.js environment
npm install @pinecone-database/pinecone weaviate-ts-client
npm install @qdrant/js-client-rest openai
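
The snippets in this guide read connection details from environment variables. A minimal config module keeps them in one place; this is a sketch, and the field names simply mirror the examples that follow (adjust them to your setup):

// src/config.ts (illustrative sketch)
export const config = {
  redisUrl: process.env.REDIS_URL || 'redis://localhost:6379',
  databaseUrl: process.env.DATABASE_URL,              // PostgreSQL + pgvector connection string
  qdrantUrl: process.env.QDRANT_URL || 'http://localhost:6333',
  weaviateHost: process.env.WEAVIATE_HOST || 'localhost:8080',
  pineconeIndex: process.env.PINECONE_INDEX || 'documents', // assumed index name
  openaiApiKey: process.env.OPENAI_API_KEY!,
}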

1. Vector Database Overview

Popular Vector Databases Comparison

| Database | Type                 | Best For                | Key Features                                                  |
|----------|----------------------|-------------------------|---------------------------------------------------------------|
| Pinecone | Managed SaaS         | Production at scale     | Fully managed, auto-scaling, metadata filtering               |
| Weaviate | Open Source          | Hybrid search           | GraphQL API, modules system, built-in vectorization           |
| Qdrant   | Open Source          | High performance        | Rust-based, advanced filtering, distributed                   |
| pgvector | PostgreSQL Extension | Existing Postgres users | SQL integration, ACID compliance, joins with relational data  |

Vector Database Client Setup

// src/vector-db/clients.ts
import { PineconeClient } from '@pinecone-database/pinecone'
import { QdrantClient } from '@qdrant/js-client-rest'
import weaviate from 'weaviate-ts-client'
import { Pool } from 'pg'

// Pinecone Client
// Note: this uses the legacy PineconeClient API; newer releases of
// @pinecone-database/pinecone (v1+) are initialized with `new Pinecone({ apiKey })`.
export async function createPineconeClient() {
  const pinecone = new PineconeClient()
  await pinecone.init({
    apiKey: process.env.PINECONE_API_KEY!,
    environment: process.env.PINECONE_ENVIRONMENT!,
  })
  return pinecone
}

// Qdrant Client
export function createQdrantClient() {
  return new QdrantClient({
    url: process.env.QDRANT_URL || 'http://localhost:6333',
    apiKey: process.env.QDRANT_API_KEY,
  })
}

// Weaviate Client
export function createWeaviateClient() {
  return weaviate.client({
    scheme: 'http',
    host: process.env.WEAVIATE_HOST || 'localhost:8080',
    apiKey: process.env.WEAVIATE_API_KEY
      ? new weaviate.ApiKey(process.env.WEAVIATE_API_KEY)
      : undefined,
  })
}

// PostgreSQL with pgvector
export function createPgVectorClient() {
  return new Pool({
    connectionString: process.env.DATABASE_URL,
  })
}
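
As a quick sanity check after wiring up the clients, ping each backend on startup. A short sketch assuming the factory functions above and locally running Qdrant and Postgres instances:

// src/vector-db/health-check.ts (illustrative sketch)
import { createQdrantClient, createPgVectorClient } from './clients'

export async function checkVectorBackends(): Promise<void> {
  const qdrant = createQdrantClient()
  const pg = createPgVectorClient()

  // Qdrant: listing collections throws if the HTTP endpoint is unreachable
  await qdrant.getCollections()
  console.log('Qdrant reachable')

  // Postgres: a trivial query confirms connectivity and credentials
  await pg.query('SELECT 1')
  console.log('Postgres (pgvector) reachable')

  await pg.end()
}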

2. Embedding Generation & Storage

Embedding Service

// src/services/embedding.service.ts
import { OpenAI } from 'openai'
import { encode } from 'gpt-3-encoder'

export interface EmbeddingOptions {
  model?: string
  batchSize?: number
  dimensions?: number
}

export class EmbeddingService {
  private openai: OpenAI
  private cache = new Map<string, number[]>()
  
  constructor() {
    this.openai = new OpenAI({
      apiKey: process.env.OPENAI_API_KEY,
    })
  }
  
  async generateEmbedding(
    text: string,
    options: EmbeddingOptions = {}
  ): Promise<number[]> {
    // Check cache first
    const cacheKey = `${text}-${options.model || 'text-embedding-3-small'}`
    if (this.cache.has(cacheKey)) {
      return this.cache.get(cacheKey)!
    }
    
    const response = await this.openai.embeddings.create({
      model: options.model || 'text-embedding-3-small',
      input: text,
      dimensions: options.dimensions, // Optional dimension reduction
    })
    
    const embedding = response.data[0].embedding
    this.cache.set(cacheKey, embedding)
    
    return embedding
  }
  
  async generateBatchEmbeddings(
    texts: string[],
    options: EmbeddingOptions = {}
  ): Promise<number[][]> {
    const batchSize = options.batchSize || 100
    const embeddings: number[][] = []
    
    for (let i = 0; i < texts.length; i += batchSize) {
      const batch = texts.slice(i, i + batchSize)
      
      const response = await this.openai.embeddings.create({
        model: options.model || 'text-embedding-3-small',
        input: batch,
        dimensions: options.dimensions,
      })
      
      embeddings.push(...response.data.map(d => d.embedding))
    }
    
    return embeddings
  }
  
  // Token-aware text chunking
  chunkText(text: string, maxTokens: number = 512): string[] {
    const chunks: string[] = []
    const sentences = text.match(/[^.!?]+[.!?]+/g) || [text]
    
    let currentChunk = ''
    let currentTokens = 0
    
    for (const sentence of sentences) {
      const sentenceTokens = encode(sentence).length
      
      if (currentTokens + sentenceTokens > maxTokens) {
        if (currentChunk) {
          chunks.push(currentChunk.trim())
        }
        currentChunk = sentence
        currentTokens = sentenceTokens
      } else {
        currentChunk += ' ' + sentence
        currentTokens += sentenceTokens
      }
    }
    
    if (currentChunk) {
      chunks.push(currentChunk.trim())
    }
    
    return chunks
  }
  
  // Smart chunking with overlap
  chunkTextWithOverlap(
    text: string,
    chunkSize: number = 1000,
    overlap: number = 200
  ): string[] {
    const chunks: string[] = []
    const stride = chunkSize - overlap
    
    for (let i = 0; i < text.length; i += stride) {
      chunks.push(text.slice(i, i + chunkSize))
    }
    
    return chunks
  }
}
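
A brief usage sketch of the service above, run inside an async function: chunk a long document, embed the chunks in one batched call, and note that repeated single texts hit the in-memory cache. `longDocumentText` is a placeholder for your own content:

// Example usage (illustrative; assumes OPENAI_API_KEY is set)
const embedder = new EmbeddingService()

const chunks = embedder.chunkTextWithOverlap(longDocumentText, 1000, 200)
const vectors = await embedder.generateBatchEmbeddings(chunks, {
  model: 'text-embedding-3-small',
  batchSize: 100,
})
console.log(`Embedded ${chunks.length} chunks, dimension ${vectors[0].length}`)

// Repeated single texts are served from the in-memory cache
const queryVector = await embedder.generateEmbedding('What is a vector database?')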

Document Processing Pipeline

// src/pipelines/document-processor.ts
export interface Document {
  id: string
  title: string
  content: string
  metadata: Record<string, any>
}

export interface ProcessedChunk {
  id: string
  documentId: string
  content: string
  embedding: number[]
  metadata: {
    chunkIndex: number
    totalChunks: number
    startChar: number
    endChar: number
    [key: string]: any
  }
}

export class DocumentProcessor {
  constructor(
    private embeddingService: EmbeddingService,
    private vectorStore: VectorStore
  ) {}
  
  async processDocument(document: Document): Promise<ProcessedChunk[]> {
    // 1. Chunk the document
    const chunks = this.embeddingService.chunkTextWithOverlap(
      document.content,
      1000,
      200
    )
    
    // 2. Generate embeddings for each chunk
    const embeddings = await this.embeddingService.generateBatchEmbeddings(chunks)
    
    // 3. Create processed chunks with metadata
    const processedChunks: ProcessedChunk[] = chunks.map((chunk, index) => ({
      id: `${document.id}_chunk_${index}`,
      documentId: document.id,
      content: chunk,
      embedding: embeddings[index],
      metadata: {
        chunkIndex: index,
        totalChunks: chunks.length,
        startChar: index * 800, // Approximate due to overlap
        endChar: Math.min((index + 1) * 800 + 200, document.content.length),
        title: document.title,
        ...document.metadata,
      },
    }))
    
    // 4. Store in vector database
    await this.vectorStore.upsertBatch(processedChunks)
    
    return processedChunks
  }
  
  async processDocumentBatch(documents: Document[]): Promise<void> {
    // Process documents in parallel with concurrency limit
    const concurrency = 5
    const results: ProcessedChunk[][] = []
    
    for (let i = 0; i < documents.length; i += concurrency) {
      const batch = documents.slice(i, i + concurrency)
      const batchResults = await Promise.all(
        batch.map(doc => this.processDocument(doc))
      )
      results.push(...batchResults)
    }
    
    console.log(`Processed ${documents.length} documents, ${results.flat().length} chunks`)
  }
}

4. RAG Architecture

RAG Service Implementation

// src/services/rag.service.ts
export interface RAGOptions {
  searchOptions?: SearchOptions
  contextWindow?: number
  systemPrompt?: string
  temperature?: number
  model?: string
}

export class RAGService {
  constructor(
    // protected (not private) so ConversationalRAGService below can reuse these
    protected searchService: SemanticSearchService,
    protected llmService: LLMService,
    protected cache: CacheService
  ) {}
  
  async generate(
    query: string,
    options: RAGOptions = {}
  ): Promise<{
    answer: string
    sources: SearchResult[]
    cached: boolean
  }> {
    const {
      searchOptions = {},
      contextWindow = 3000,
      systemPrompt,
      temperature = 0.7,
      model = 'gpt-4-turbo-preview',
    } = options
    
    // Check cache first
    const cacheKey = this.generateCacheKey(query, options)
    const cached = await this.cache.get(cacheKey)
    if (cached) {
      return { ...cached, cached: true }
    }
    
    // 1. Retrieve relevant documents
    const searchResults = await this.searchService.search(query, searchOptions)
    
    if (searchResults.length === 0) {
      return {
        answer: "I couldn't find relevant information to answer your question.",
        sources: [],
        cached: false,
      }
    }
    
    // 2. Build context from search results
    const context = this.buildContext(searchResults, contextWindow)
    
    // 3. Create prompt
    const prompt = this.buildRAGPrompt(query, context, systemPrompt)
    
    // 4. Generate answer using LLM
    const answer = await this.llmService.generate({
      messages: [
        { role: 'system', content: prompt.system },
        { role: 'user', content: prompt.user },
      ],
      temperature,
      model,
    })
    
    // 5. Extract citations from answer
    const citedSources = this.extractCitations(answer, searchResults)
    
    // 6. Cache the result
    const result = {
      answer,
      sources: citedSources,
      cached: false,
    }
    
    await this.cache.set(cacheKey, result, 3600) // 1 hour TTL
    
    return result
  }
  
  async generateStreaming(
    query: string,
    options: RAGOptions = {},
    onToken: (token: string) => void
  ): Promise<{
    sources: SearchResult[]
    fullAnswer: string
  }> {
    // Retrieve context (same as non-streaming)
    const searchResults = await this.searchService.search(
      query,
      options.searchOptions
    )
    
    const context = this.buildContext(
      searchResults,
      options.contextWindow || 3000
    )
    
    const prompt = this.buildRAGPrompt(
      query,
      context,
      options.systemPrompt
    )
    
    // Stream the response
    let fullAnswer = ''
    await this.llmService.generateStream({
      messages: [
        { role: 'system', content: prompt.system },
        { role: 'user', content: prompt.user },
      ],
      temperature: options.temperature || 0.7,
      model: options.model || 'gpt-4-turbo-preview',
      onToken: (token) => {
        fullAnswer += token
        onToken(token)
      },
    })
    
    return {
      sources: searchResults,
      fullAnswer,
    }
  }
  
  protected buildContext(
    results: SearchResult[],
    maxTokens: number
  ): string {
    let context = ''
    let currentTokens = 0
    
    for (const [index, result] of results.entries()) {
      const chunk = `[Source ${index + 1}] ${result.content}\n\n`
      const chunkTokens = this.estimateTokens(chunk)
      
      if (currentTokens + chunkTokens > maxTokens) {
        break
      }
      
      context += chunk
      currentTokens += chunkTokens
    }
    
    return context.trim()
  }
  
  private buildRAGPrompt(
    query: string,
    context: string,
    customSystemPrompt?: string
  ): { system: string; user: string } {
    const system = customSystemPrompt || `You are a helpful AI assistant powered by ParrotRouter.
Your task is to answer questions based on the provided context. 
Always cite your sources using [Source N] notation.
If the context doesn't contain relevant information, say so clearly.
Be concise and accurate in your responses.`
    
    const user = `Context:
${context}

Question: ${query}

Please provide a comprehensive answer based on the context above. Include source citations.`
    
    return { system, user }
  }
  
  private extractCitations(
    answer: string,
    sources: SearchResult[]
  ): SearchResult[] {
    const citationPattern = /\[Source (\d+)\]/g
    const citations = new Set<number>()
    
    let match
    while ((match = citationPattern.exec(answer)) !== null) {
      const sourceIndex = parseInt(match[1], 10) - 1
      if (sourceIndex >= 0 && sourceIndex < sources.length) {
        citations.add(sourceIndex)
      }
    }
    
    return Array.from(citations).map(index => sources[index])
  }
  
  private generateCacheKey(query: string, options: RAGOptions): string {
    const params = {
      query,
      model: options.model,
      temperature: options.temperature,
      searchOptions: options.searchOptions,
    }
    
    return `rag:${JSON.stringify(params)}`
  }
  
  private estimateTokens(text: string): number {
    // Rough estimation: 1 token ≈ 4 characters
    return Math.ceil(text.length / 4)
  }
}

// Advanced RAG with conversation memory
export class ConversationalRAGService extends RAGService {
  async generateWithMemory(
    query: string,
    conversationId: string,
    options: RAGOptions = {}
  ): Promise<{
    answer: string
    sources: SearchResult[]
    conversationId: string
  }> {
    // Retrieve conversation history
    const history = await this.getConversationHistory(conversationId)
    
    // Search with context
    const searchResults = await this.searchService.searchWithContext(
      query,
      history,
      options.searchOptions
    )
    
    // Build prompt with history
    const context = this.buildContext(searchResults, options.contextWindow || 3000)
    const prompt = this.buildConversationalPrompt(query, context, history)
    
    // Generate response
    const answer = await this.llmService.generate({
      messages: [
        { role: 'system', content: prompt.system },
        ...history.map(msg => ({
          role: msg.role as 'user' | 'assistant',
          content: msg.content,
        })),
        { role: 'user', content: query },
      ],
      temperature: options.temperature || 0.7,
      model: options.model || 'gpt-4-turbo-preview',
    })
    
    // Store in conversation history
    await this.addToConversation(conversationId, query, answer)
    
    return {
      answer,
      sources: searchResults,
      conversationId,
    }
  }
  
  private buildConversationalPrompt(
    query: string,
    context: string,
    history: Message[]
  ): { system: string } {
    return {
      system: `You are a helpful AI assistant powered by ParrotRouter.
You have access to a knowledge base and can answer questions based on the provided context.
Maintain conversation continuity and reference previous messages when relevant.
Always cite sources using [Source N] notation.

Current context from knowledge base:
${context}`,
    }
  }
  
  private async getConversationHistory(
    conversationId: string
  ): Promise<Message[]> {
    // Implementation would fetch from database
    return []
  }
  
  private async addToConversation(
    conversationId: string,
    query: string,
    answer: string
  ): Promise<void> {
    // Implementation would store in database
  }
}
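
The streaming method pairs naturally with server-sent events. A hedged Express sketch; the route, request shape, and module paths are illustrative, and express.json() is assumed to be mounted:

// src/routes/ask.ts (illustrative sketch)
import express from 'express'
import { RAGService } from '../services/rag.service' // assumed path

export function createAskRouter(ragService: RAGService) {
  const router = express.Router()

  router.post('/ask/stream', async (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream')
    res.setHeader('Cache-Control', 'no-cache')
    res.setHeader('Connection', 'keep-alive')

    // Forward each token to the client as it arrives
    const { sources, fullAnswer } = await ragService.generateStreaming(
      req.body.question,
      {},
      (token) => res.write(`data: ${JSON.stringify({ token })}\n\n`)
    )

    // Send sources once the answer is complete, then close the stream
    res.write(`data: ${JSON.stringify({ done: true, sources, fullAnswer })}\n\n`)
    res.end()
  })

  return router
}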

RAG Best Practices

  • Chunk documents with appropriate overlap (10-20%)
  • Use metadata filtering to improve relevance
  • Implement query expansion for better recall (see the sketch below)
  • Cache frequent queries to reduce costs
  • Monitor retrieval quality and adjust thresholds
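
Of the practices above, query expansion is the only one not covered by code elsewhere in this guide. A minimal sketch, reusing the LLMService call shape from the RAG service; the prompt wording and the three-rewrite count are illustrative choices, not requirements:

// Illustrative query-expansion helper
export async function expandQuery(
  llmService: LLMService,
  query: string
): Promise<string[]> {
  const response = await llmService.generate({
    messages: [
      {
        role: 'system',
        content: 'Rewrite the user query as 3 alternative search queries, one per line.',
      },
      { role: 'user', content: query },
    ],
    temperature: 0.3,
    model: 'gpt-4-turbo-preview',
  })

  // Search with the original query plus each rewrite, then merge and deduplicate results
  return [query, ...response.split('\n').map(q => q.trim()).filter(Boolean)]
}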

6. Database Schema Design

PostgreSQL with pgvector Schema

-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Documents table
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title VARCHAR(255) NOT NULL,
    source VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    metadata JSONB DEFAULT '{}'::jsonb
);

-- Document chunks with embeddings
CREATE TABLE document_chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content TEXT NOT NULL,
    embedding vector(1536), -- OpenAI embedding dimension
    char_start INTEGER,
    char_end INTEGER,
    metadata JSONB DEFAULT '{}'::jsonb,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(document_id, chunk_index)
);

-- Create indexes for vector similarity search
CREATE INDEX document_chunks_embedding_idx ON document_chunks 
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- Conversations table
CREATE TABLE conversations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL,
    title VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    metadata JSONB DEFAULT '{}'::jsonb
);

-- Messages table
CREATE TABLE messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
    role VARCHAR(20) NOT NULL CHECK (role IN ('user', 'assistant', 'system')),
    content TEXT NOT NULL,
    embedding vector(1536),
    tokens_used INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    metadata JSONB DEFAULT '{}'::jsonb
);

-- Search history for analytics
CREATE TABLE search_queries (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID,
    query TEXT NOT NULL,
    query_embedding vector(1536),
    results_count INTEGER,
    clicked_results JSONB DEFAULT '[]'::jsonb,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Cached embeddings
CREATE TABLE embedding_cache (
    text_hash VARCHAR(64) PRIMARY KEY,
    text TEXT NOT NULL,
    embedding vector(1536) NOT NULL,
    model VARCHAR(50) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Function for semantic search
CREATE OR REPLACE FUNCTION search_documents(
    query_embedding vector(1536),
    match_threshold float DEFAULT 0.7,
    match_count int DEFAULT 10
)
RETURNS TABLE (
    chunk_id UUID,
    document_id UUID,
    content TEXT,
    similarity float,
    metadata JSONB
)
LANGUAGE plpgsql
AS $$
BEGIN
    RETURN QUERY
    SELECT
        dc.id,
        dc.document_id,
        dc.content,
        1 - (dc.embedding <=> query_embedding) AS similarity,
        dc.metadata
    FROM document_chunks dc
    WHERE 1 - (dc.embedding <=> query_embedding) > match_threshold
    ORDER BY dc.embedding <=> query_embedding
    LIMIT match_count;
END;
$$;

-- Hybrid search function (vector + full-text)
CREATE OR REPLACE FUNCTION hybrid_search(
    query_text TEXT,
    query_embedding vector(1536),
    alpha float DEFAULT 0.5,
    match_count int DEFAULT 10
)
RETURNS TABLE (
    chunk_id UUID,
    document_id UUID,
    content TEXT,
    vector_similarity float,
    text_rank float,
    hybrid_score float,
    metadata JSONB
)
LANGUAGE plpgsql
AS $$
BEGIN
    RETURN QUERY
    WITH vector_search AS (
        SELECT
            dc.id,
            dc.document_id,
            dc.content,
            1 - (dc.embedding <=> query_embedding) AS similarity,
            dc.metadata
        FROM document_chunks dc
        ORDER BY dc.embedding <=> query_embedding
        LIMIT match_count * 2
    ),
    text_search AS (
        SELECT
            dc.id,
            ts_rank(to_tsvector('english', dc.content), 
                   plainto_tsquery('english', query_text)) AS rank
        FROM document_chunks dc
        WHERE to_tsvector('english', dc.content) @@ 
              plainto_tsquery('english', query_text)
        ORDER BY rank DESC
        LIMIT match_count * 2
    )
    SELECT DISTINCT
        vs.id,
        vs.document_id,
        vs.content,
        vs.similarity,
        COALESCE(ts.rank, 0) AS text_rank,
        (alpha * vs.similarity + (1 - alpha) * COALESCE(ts.rank, 0)) AS hybrid_score,
        vs.metadata
    FROM vector_search vs
    LEFT JOIN text_search ts ON vs.id = ts.id
    ORDER BY hybrid_score DESC
    LIMIT match_count;
END;
$$;

TypeScript Data Access Layer

// src/dal/pgvector-dal.ts
import { Pool } from 'pg'
import { createHash } from 'crypto'

export class PgVectorDAL {
  constructor(private pool: Pool) {}
  
  async initializeSchema(): Promise<void> {
    // Run schema creation SQL
    const schemaSQL = `...` // Schema from above
    await this.pool.query(schemaSQL)
  }
  
  async insertDocument(
    title: string,
    source: string,
    metadata: Record<string, any> = {}
  ): Promise<string> {
    const result = await this.pool.query(
      `INSERT INTO documents (title, source, metadata) 
       VALUES ($1, $2, $3) 
       RETURNING id`,
      [title, source, JSON.stringify(metadata)]
    )
    
    return result.rows[0].id
  }
  
  async insertChunks(chunks: ProcessedChunk[]): Promise<void> {
    // Insert all chunks inside a single transaction. (For very large batches a
    // streaming COPY via the pg-copy-streams package is faster, but a plain
    // pool.query() call does not expose a writable COPY stream.)
    const client = await this.pool.connect()
    try {
      await client.query('BEGIN')
      for (const chunk of chunks) {
        await client.query(
          `INSERT INTO document_chunks (
             document_id, chunk_index, content, embedding,
             char_start, char_end, metadata
           ) VALUES ($1, $2, $3, $4, $5, $6, $7)
           ON CONFLICT (document_id, chunk_index) DO NOTHING`,
          [
            chunk.documentId,
            chunk.metadata.chunkIndex,
            chunk.content,
            JSON.stringify(chunk.embedding), // pgvector accepts '[0.1, 0.2, ...]' literals
            chunk.metadata.startChar,
            chunk.metadata.endChar,
            JSON.stringify(chunk.metadata),
          ]
        )
      }
      await client.query('COMMIT')
    } catch (error) {
      await client.query('ROLLBACK')
      throw error
    } finally {
      client.release()
    }
  }
  
  async semanticSearch(
    embedding: number[],
    threshold: number = 0.7,
    limit: number = 10
  ): Promise<SearchResult[]> {
    const result = await this.pool.query(
      'SELECT * FROM search_documents($1, $2, $3)',
      [JSON.stringify(embedding), threshold, limit]
    )
    
    return result.rows.map(row => ({
      id: row.chunk_id,
      score: row.similarity,
      content: row.content,
      metadata: row.metadata,
    }))
  }
  
  async hybridSearch(
    query: string,
    embedding: number[],
    alpha: number = 0.5,
    limit: number = 10
  ): Promise<SearchResult[]> {
    const result = await this.pool.query(
      'SELECT * FROM hybrid_search($1, $2, $3, $4)',
      [query, JSON.stringify(embedding), alpha, limit]
    )
    
    return result.rows.map(row => ({
      id: row.chunk_id,
      score: row.hybrid_score,
      content: row.content,
      metadata: {
        ...row.metadata,
        vectorSimilarity: row.vector_similarity,
        textRank: row.text_rank,
      },
    }))
  }
  
  // Embedding cache operations
  async getCachedEmbedding(
    text: string,
    model: string
  ): Promise<number[] | null> {
    const hash = this.hashText(text)
    const result = await this.pool.query(
      `UPDATE embedding_cache 
       SET last_accessed = CURRENT_TIMESTAMP 
       WHERE text_hash = $1 AND model = $2 
       RETURNING embedding`,
      [hash, model]
    )
    
    if (result.rows.length > 0) {
      return JSON.parse(result.rows[0].embedding)
    }
    
    return null
  }
  
  async setCachedEmbedding(
    text: string,
    embedding: number[],
    model: string
  ): Promise<void> {
    const hash = this.hashText(text)
    await this.pool.query(
      `INSERT INTO embedding_cache (text_hash, text, embedding, model) 
       VALUES ($1, $2, $3, $4) 
       ON CONFLICT (text_hash) 
       DO UPDATE SET last_accessed = CURRENT_TIMESTAMP`,
      [hash, text, JSON.stringify(embedding), model]
    )
  }
  
  private hashText(text: string): string {
    // SHA-256 hex digest is exactly 64 chars, matching the text_hash VARCHAR(64) column,
    // and avoids the prefix collisions a truncated base64 encoding would produce
    return createHash('sha256').update(text).digest('hex')
  }
}
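
Putting the DAL and the embedding service together, an ingestion-plus-search flow might look like the sketch below. `documentText` stands in for your own content, and the calls run inside an async function:

// Illustrative ingestion + search flow using the DAL above
const dal = new PgVectorDAL(createPgVectorClient())
const embedder = new EmbeddingService()

// Ingest: create the document row, then chunk, embed, and store the chunks
const documentId = await dal.insertDocument('Intro to pgvector', 'docs/pgvector.md', {
  category: 'tutorial',
})
const chunks = embedder.chunkTextWithOverlap(documentText, 1000, 200)
const embeddings = await embedder.generateBatchEmbeddings(chunks)

await dal.insertChunks(
  chunks.map((content, i) => ({
    id: `${documentId}_chunk_${i}`,
    documentId,
    content,
    embedding: embeddings[i],
    metadata: {
      chunkIndex: i,
      totalChunks: chunks.length,
      startChar: i * 800,           // stride = chunkSize - overlap
      endChar: i * 800 + 1000,
    },
  }))
)

// Query: embed the question and call the search_documents() SQL function
const queryEmbedding = await embedder.generateEmbedding('How do I index vectors in Postgres?')
const results = await dal.semanticSearch(queryEmbedding, 0.7, 5)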

7. Caching Strategies

Multi-Level Caching

// src/services/cache.service.ts
import Redis from 'ioredis'
import LRU from 'lru-cache'
import { createHash } from 'crypto'

export class MultiLevelCache {
  private memoryCache: LRU<string, any>
  private redisCache: Redis
  
  constructor(redis: Redis, options?: {
    maxMemoryItems?: number
    memoryTTL?: number
  }) {
    this.redisCache = redis
    this.memoryCache = new LRU({
      max: options?.maxMemoryItems || 1000,
      ttl: options?.memoryTTL || 1000 * 60 * 5, // 5 minutes
    })
  }
  
  async get<T>(key: string): Promise<T | null> {
    // Check memory cache first
    const memoryResult = this.memoryCache.get(key)
    if (memoryResult !== undefined) {
      return memoryResult
    }
    
    // Check Redis cache
    const redisResult = await this.redisCache.get(key)
    if (redisResult) {
      const parsed = JSON.parse(redisResult)
      // Populate memory cache
      this.memoryCache.set(key, parsed)
      return parsed
    }
    
    return null
  }
  
  async set<T>(
    key: string,
    value: T,
    ttlSeconds?: number
  ): Promise<void> {
    // Set in both caches
    this.memoryCache.set(key, value)
    
    const serialized = JSON.stringify(value)
    if (ttlSeconds) {
      await this.redisCache.setex(key, ttlSeconds, serialized)
    } else {
      await this.redisCache.set(key, serialized)
    }
  }
  
  async invalidate(pattern: string): Promise<void> {
    // Clear from memory cache
    for (const key of this.memoryCache.keys()) {
      if (key.includes(pattern)) {
        this.memoryCache.delete(key)
      }
    }
    
    // Clear from Redis
    const keys = await this.redisCache.keys(pattern)
    if (keys.length > 0) {
      await this.redisCache.del(...keys)
    }
  }
}

// Embedding cache service
export class EmbeddingCacheService {
  constructor(
    private cache: MultiLevelCache,
    private dal: PgVectorDAL
  ) {}
  
  async getEmbedding(
    text: string,
    model: string
  ): Promise<number[] | null> {
    const cacheKey = `embedding:${model}:${this.hashText(text)}`
    
    // Check multi-level cache
    const cached = await this.cache.get<number[]>(cacheKey)
    if (cached) {
      return cached
    }
    
    // Check persistent storage
    const stored = await this.dal.getCachedEmbedding(text, model)
    if (stored) {
      // Populate cache
      await this.cache.set(cacheKey, stored, 3600)
      return stored
    }
    
    return null
  }
  
  async setEmbedding(
    text: string,
    embedding: number[],
    model: string
  ): Promise<void> {
    const cacheKey = `embedding:${model}:${this.hashText(text)}`
    
    // Store in all levels
    await Promise.all([
      this.cache.set(cacheKey, embedding, 3600),
      this.dal.setCachedEmbedding(text, embedding, model),
    ])
  }
  
  private hashText(text: string): string {
    // Hash the full text so distinct inputs never share a cache key
    return createHash('sha256').update(text).digest('hex')
  }
}

// Query result caching
export class QueryCacheService {
  constructor(
    private cache: MultiLevelCache,
    private ttlSeconds: number = 3600
  ) {}
  
  async getCachedResults(
    query: string,
    filters?: Record<string, any>
  ): Promise<SearchResult[] | null> {
    const cacheKey = this.generateQueryCacheKey(query, filters)
    return this.cache.get<SearchResult[]>(cacheKey)
  }
  
  async setCachedResults(
    query: string,
    results: SearchResult[],
    filters?: Record<string, any>
  ): Promise<void> {
    const cacheKey = this.generateQueryCacheKey(query, filters)
    await this.cache.set(cacheKey, results, this.ttlSeconds)
  }
  
  private generateQueryCacheKey(
    query: string,
    filters?: Record<string, any>
  ): string {
    const params = {
      q: query,
      f: filters || {},
    }
    
    return `query:${JSON.stringify(params)}`
  }
}
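
These pieces compose into a simple get-or-compute helper. A sketch assuming the EmbeddingService from earlier and the two cache services above:

// Get-or-compute helper built on the caches above (illustrative)
export async function embedWithCache(
  text: string,
  embedder: EmbeddingService,
  embeddingCache: EmbeddingCacheService,
  model = 'text-embedding-3-small'
): Promise<number[]> {
  // 1. Memory -> Redis -> Postgres lookup
  const cached = await embeddingCache.getEmbedding(text, model)
  if (cached) return cached

  // 2. Miss: call the embedding API and write back through all levels
  const embedding = await embedder.generateEmbedding(text, { model })
  await embeddingCache.setEmbedding(text, embedding, model)
  return embedding
}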

8. Performance Optimization

Batch Processing Optimization

// src/optimization/batch-processor.ts
export class BatchProcessor {
  private queue: Map<string, any[]> = new Map()
  private timers: Map<string, NodeJS.Timeout> = new Map()
  
  constructor(
    private batchSize: number = 100,
    private flushInterval: number = 1000
  ) {}
  
  async addToBatch<T>(
    batchKey: string,
    item: T,
    processor: (items: T[]) => Promise<void>
  ): Promise<void> {
    // Get or create batch
    if (!this.queue.has(batchKey)) {
      this.queue.set(batchKey, [])
    }
    
    const batch = this.queue.get(batchKey)!
    batch.push(item)
    
    // Process if batch is full
    if (batch.length >= this.batchSize) {
      await this.flush(batchKey, processor)
    } else {
      // Set timer for time-based flush
      this.setFlushTimer(batchKey, processor)
    }
  }
  
  private setFlushTimer<T>(
    batchKey: string,
    processor: (items: T[]) => Promise<void>
  ): void {
    // Clear existing timer
    const existingTimer = this.timers.get(batchKey)
    if (existingTimer) {
      clearTimeout(existingTimer)
    }
    
    // Set new timer
    const timer = setTimeout(() => {
      this.flush(batchKey, processor)
    }, this.flushInterval)
    
    this.timers.set(batchKey, timer)
  }
  
  private async flush<T>(
    batchKey: string,
    processor: (items: T[]) => Promise<void>
  ): Promise<void> {
    const batch = this.queue.get(batchKey)
    if (!batch || batch.length === 0) {
      return
    }
    
    // Clear batch and timer
    this.queue.delete(batchKey)
    const timer = this.timers.get(batchKey)
    if (timer) {
      clearTimeout(timer)
      this.timers.delete(batchKey)
    }
    
    // Process batch
    await processor(batch)
  }
}

// Parallel processing for large datasets
export class ParallelProcessor {
  constructor(
    private concurrency: number = 5
  ) {}
  
  async processInParallel<T, R>(
    items: T[],
    processor: (item: T) => Promise<R>,
    onProgress?: (completed: number, total: number) => void
  ): Promise<R[]> {
    const results: R[] = new Array(items.length)
    let completed = 0
    
    // Process in chunks
    for (let i = 0; i < items.length; i += this.concurrency) {
      const chunk = items.slice(i, i + this.concurrency)
      const chunkResults = await Promise.all(
        chunk.map((item, index) => processor(item))
      )
      
      // Store results in correct positions
      for (let j = 0; j < chunkResults.length; j++) {
        results[i + j] = chunkResults[j]
      }
      
      completed += chunk.length
      onProgress?.(completed, items.length)
    }
    
    return results
  }
}

// Index optimization service
export class IndexOptimizationService {
  constructor(
    private vectorStore: VectorStore,
    private monitoring: MonitoringService
  ) {}
  
  async optimizeIndex(collectionName: string): Promise<void> {
    const metrics = await this.analyzeIndexPerformance(collectionName)
    
    if (metrics.avgQueryTime > 100) {
      // Re-index with better parameters
      await this.reindexCollection(collectionName, {
        nlist: Math.ceil(Math.sqrt(metrics.totalVectors)),
        nprobe: 10,
      })
    }
    
    if (metrics.fragmentationRatio > 0.3) {
      // Compact index
      await this.compactIndex(collectionName)
    }
  }
  
  private async analyzeIndexPerformance(
    collectionName: string
  ): Promise<any> {
    // Analyze query performance
    const queryMetrics = await this.monitoring.getQueryMetrics(collectionName)
    
    return {
      avgQueryTime: queryMetrics.avgDuration,
      totalVectors: queryMetrics.totalVectors,
      fragmentationRatio: queryMetrics.fragmentation,
    }
  }
  
  private async reindexCollection(
    collectionName: string,
    params: any
  ): Promise<void> {
    // Implementation depends on vector database
    console.log(`Reindexing ${collectionName} with params:`, params)
  }
  
  private async compactIndex(collectionName: string): Promise<void> {
    // Implementation depends on vector database
    console.log(`Compacting index for ${collectionName}`)
  }
}
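
As an example of the BatchProcessor in use, single-chunk upserts can be coalesced so the vector store receives bulk writes; the upsertBatch call matches the VectorStore interface used by DocumentProcessor earlier. A sketch:

// Coalescing single-chunk upserts into batches (illustrative)
const batcher = new BatchProcessor(100, 1000) // flush at 100 items or after 1 second

export async function queueChunkForUpsert(
  chunk: ProcessedChunk,
  vectorStore: VectorStore
): Promise<void> {
  await batcher.addToBatch<ProcessedChunk>(
    'chunk-upserts',
    chunk,
    (items) => vectorStore.upsertBatch(items) // one bulk write per flush
  )
}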

Performance Tips

  • Use appropriate index types (IVF, HNSW) based on dataset size
  • Implement query result caching for repeated searches
  • Batch embedding generation to reduce API calls
  • Reduce embedding dimensions where the model supports it (e.g., the dimensions option on text-embedding-3 models)
  • Use async/parallel processing for large datasets

9. Cost Optimization

Cost Management Service

// src/services/cost-management.service.ts
export interface CostMetrics {
  embeddingCosts: number
  storageCosts: number
  queryCosts: number
  totalCosts: number
}

export class CostManagementService {
  private costPerEmbedding = 0.0001 // $0.0001 per embedding
  private costPerQuery = 0.00002 // $0.00002 per query
  private costPerGBMonth = 0.25 // $0.25 per GB/month for storage
  
  constructor(
    private monitoring: MonitoringService,
    private cache: CacheService
  ) {}
  
  async calculateMonthlyCosts(): Promise<CostMetrics> {
    const usage = await this.monitoring.getMonthlyUsage()
    
    const embeddingCosts = usage.embeddingsGenerated * this.costPerEmbedding
    const queryCosts = usage.queriesProcessed * this.costPerQuery
    const storageCosts = (usage.storageGB * this.costPerGBMonth)
    
    return {
      embeddingCosts,
      storageCosts,
      queryCosts,
      totalCosts: embeddingCosts + storageCosts + queryCosts,
    }
  }
  
  async optimizeCosts(): Promise<{
    recommendations: string[]
    potentialSavings: number
  }> {
    const recommendations: string[] = []
    let potentialSavings = 0
    
    // Check cache hit rate
    const cacheStats = await this.cache.getStats()
    if (cacheStats.hitRate < 0.5) {
      recommendations.push(
        'Improve cache hit rate by increasing cache size or TTL'
      )
      potentialSavings += this.estimateCacheSavings(cacheStats)
    }
    
    // Check for duplicate embeddings
    const duplicates = await this.findDuplicateEmbeddings()
    if (duplicates > 100) {
      recommendations.push(
        `Remove ${duplicates} duplicate embeddings to save storage`
      )
      potentialSavings += duplicates * 0.001 // Rough estimate
    }
    
    // Check embedding model usage
    const modelUsage = await this.monitoring.getModelUsage()
    if (modelUsage['text-embedding-3-large'] > modelUsage['text-embedding-3-small'] * 0.5) {
      recommendations.push(
        'Consider using smaller embedding models for non-critical content'
      )
      potentialSavings += this.estimateModelSavings(modelUsage)
    }
    
    return { recommendations, potentialSavings }
  }
  
  private estimateCacheSavings(stats: any): number {
    const missedQueries = stats.misses
    const potentialCacheHits = missedQueries * 0.3 // Assume 30% could be cached
    return potentialCacheHits * this.costPerEmbedding
  }
  
  private estimateModelSavings(usage: any): number {
    const largeModelCost = 0.00013
    const smallModelCost = 0.00002
    const potentialSwitches = usage['text-embedding-3-large'] * 0.7
    
    return potentialSwitches * (largeModelCost - smallModelCost)
  }
  
  async findDuplicateEmbeddings(): Promise<number> {
    // Implementation would query database for duplicates
    return 0
  }
}

// Usage tracking
export class UsageTracker {
  constructor(private redis: Redis) {}
  
  async trackEmbedding(
    model: string,
    inputTokens: number
  ): Promise<void> {
    const date = new Date().toISOString().split('T')[0]
    const key = `usage:embeddings:${date}`
    
    await this.redis.hincrby(key, model, 1)
    await this.redis.hincrby(key, `${model}:tokens`, inputTokens)
    await this.redis.expire(key, 86400 * 90) // 90 days retention
  }
  
  async trackQuery(
    queryType: string,
    resultCount: number
  ): Promise<void> {
    const date = new Date().toISOString().split('T')[0]
    const key = `usage:queries:${date}`
    
    await this.redis.hincrby(key, queryType, 1)
    await this.redis.hincrby(key, `${queryType}:results`, resultCount)
    await this.redis.expire(key, 86400 * 90)
  }
  
  async getUsageReport(
    startDate: string,
    endDate: string
  ): Promise<any> {
    const report = {
      embeddings: {},
      queries: {},
      totalCost: 0,
    }
    
    // Aggregate usage data
    const dates = this.getDateRange(startDate, endDate)
    
    for (const date of dates) {
      // Get embedding usage
      const embeddingKey = `usage:embeddings:${date}`
      const embeddings = await this.redis.hgetall(embeddingKey)
      this.aggregateUsage(report.embeddings, embeddings)
      
      // Get query usage
      const queryKey = `usage:queries:${date}`
      const queries = await this.redis.hgetall(queryKey)
      this.aggregateUsage(report.queries, queries)
    }
    
    // Calculate costs
    report.totalCost = this.calculateTotalCost(report)
    
    return report
  }
  
  private getDateRange(start: string, end: string): string[] {
    const dates: string[] = []
    const current = new Date(start)
    const endDate = new Date(end)
    
    while (current <= endDate) {
      dates.push(current.toISOString().split('T')[0])
      current.setDate(current.getDate() + 1)
    }
    
    return dates
  }
  
  private aggregateUsage(target: any, source: any): void {
    for (const [key, value] of Object.entries(source)) {
      target[key] = (target[key] || 0) + parseInt(value as string, 10)
    }
  }
  
  private calculateTotalCost(usage: any): number {
    // Calculate based on model pricing
    const embeddingCosts = {
      'text-embedding-3-small': 0.00002,
      'text-embedding-3-large': 0.00013,
      'text-embedding-ada-002': 0.0001,
    }
    
    let total = 0
    
    for (const [model, count] of Object.entries(usage.embeddings)) {
      if (embeddingCosts[model]) {
        total += embeddingCosts[model] * (count as number)
      }
    }
    
    return total
  }
}
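
A short sketch of the tracker in use: record usage wherever embeddings and queries are generated, then pull a date-ranged report. Dates and counts are illustrative, and the calls run inside an async function:

// Illustrative usage of UsageTracker
import Redis from 'ioredis'

const tracker = new UsageTracker(new Redis(process.env.REDIS_URL || 'redis://localhost:6379'))

// Call these wherever embeddings are generated and searches are run
await tracker.trackEmbedding('text-embedding-3-small', 742)
await tracker.trackQuery('semantic', 10)

// Monthly report for cost review
const report = await tracker.getUsageReport('2024-06-01', '2024-06-30')
console.log('Estimated embedding spend:', report.totalCost)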

10. Production Deployment

Deployment Configuration

# docker-compose.yml
version: '3.8'

services:
  # PostgreSQL with pgvector
  postgres:
    image: ankane/pgvector:latest
    environment:
      POSTGRES_DB: vectordb
      POSTGRES_USER: vectoruser
      POSTGRES_PASSWORD: vectorpass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U vectoruser"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Qdrant vector database
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant_data:/qdrant/storage
    environment:
      QDRANT__SERVICE__GRPC_PORT: 6334

  # Weaviate vector database
  weaviate:
    image: semitechnologies/weaviate:latest
    ports:
      - "8080:8080"
    environment:
      QUERY_DEFAULTS_LIMIT: 25
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
      DEFAULT_VECTORIZER_MODULE: 'text2vec-openai'
      ENABLE_MODULES: 'text2vec-openai'
      OPENAI_APIKEY: ${OPENAI_API_KEY}
    volumes:
      - weaviate_data:/var/lib/weaviate

  # Redis for caching
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  # Application
  app:
    build: .
    environment:
      DATABASE_URL: postgresql://vectoruser:vectorpass@postgres:5432/vectordb
      QDRANT_URL: http://qdrant:6333
      WEAVIATE_URL: http://weaviate:8080
      REDIS_URL: redis://redis:6379
      OPENAI_API_KEY: ${OPENAI_API_KEY}
    depends_on:
      - postgres
      - qdrant
      - weaviate
      - redis
    ports:
      - "3000:3000"

volumes:
  postgres_data:
  qdrant_data:
  weaviate_data:
  redis_data:
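
Before the app container starts serving traffic, it can wait for its dependencies. A hedged readiness sketch that matches the service URLs in the compose file above; the /readyz path is Qdrant's readiness endpoint, also used by the Kubernetes probe below:

// src/startup/readiness.ts (illustrative sketch; requires Node 18+ for global fetch)
import { Pool } from 'pg'
import Redis from 'ioredis'

export async function waitForDependencies(retries = 10, delayMs = 3000): Promise<void> {
  const pg = new Pool({ connectionString: process.env.DATABASE_URL })
  const redis = new Redis(process.env.REDIS_URL || 'redis://redis:6379')

  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      await pg.query('SELECT 1')                                        // Postgres + pgvector
      await redis.ping()                                                // Redis cache
      const qdrant = await fetch(`${process.env.QDRANT_URL}/readyz`)    // Qdrant readiness endpoint
      if (!qdrant.ok) throw new Error('Qdrant not ready')

      await pg.end()
      await redis.quit()
      return
    } catch (err) {
      console.warn(`Dependencies not ready (attempt ${attempt}/${retries})`, err)
      await new Promise(resolve => setTimeout(resolve, delayMs))
    }
  }
  throw new Error('Dependencies did not become ready in time')
}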

Kubernetes Deployment

# k8s/vector-db-deployment.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: qdrant
spec:
  serviceName: qdrant
  replicas: 3
  selector:
    matchLabels:
      app: qdrant
  template:
    metadata:
      labels:
        app: qdrant
    spec:
      containers:
      - name: qdrant
        image: qdrant/qdrant:latest
        ports:
        - containerPort: 6333
          name: http
        - containerPort: 6334
          name: grpc
        volumeMounts:
        - name: storage
          mountPath: /qdrant/storage
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2"
        livenessProbe:
          httpGet:
            path: /
            port: 6333
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /readyz
            port: 6333
          initialDelaySeconds: 5
          periodSeconds: 5
  volumeClaimTemplates:
  - metadata:
      name: storage
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 100Gi
---
apiVersion: v1
kind: Service
metadata:
  name: qdrant
spec:
  selector:
    app: qdrant
  clusterIP: None
  ports:
  - port: 6333
    name: http
  - port: 6334
    name: grpc

Monitoring Setup

// src/monitoring/vector-db-monitoring.ts
import { Registry, Histogram, Counter, Gauge } from 'prom-client'

export class VectorDBMonitoring {
  private registry: Registry
  
  // Metrics
  private embeddingLatency: Histogram<string>
  private searchLatency: Histogram<string>
  private searchResultCount: Histogram<string>
  private indexSize: Gauge<string>
  private cacheHitRate: Gauge<string>
  
  constructor() {
    this.registry = new Registry()
    
    this.embeddingLatency = new Histogram({
      name: 'embedding_generation_duration_seconds',
      help: 'Time to generate embeddings',
      labelNames: ['model'],
      buckets: [0.1, 0.5, 1, 2, 5],
      registers: [this.registry],
    })
    
    this.searchLatency = new Histogram({
      name: 'vector_search_duration_seconds',
      help: 'Time to perform vector search',
      labelNames: ['index', 'type'],
      buckets: [0.01, 0.05, 0.1, 0.5, 1],
      registers: [this.registry],
    })
    
    this.searchResultCount = new Histogram({
      name: 'search_result_count',
      help: 'Number of results returned',
      labelNames: ['index'],
      buckets: [0, 1, 5, 10, 20, 50, 100],
      registers: [this.registry],
    })
    
    this.indexSize = new Gauge({
      name: 'vector_index_size',
      help: 'Number of vectors in index',
      labelNames: ['index'],
      registers: [this.registry],
    })
    
    this.cacheHitRate = new Gauge({
      name: 'embedding_cache_hit_rate',
      help: 'Cache hit rate for embeddings',
      registers: [this.registry],
    })
  }
  
  recordEmbeddingGeneration(model: string, duration: number) {
    this.embeddingLatency.observe({ model }, duration / 1000)
  }
  
  recordSearch(
    index: string,
    type: 'semantic' | 'hybrid',
    duration: number,
    resultCount: number
  ) {
    this.searchLatency.observe({ index, type }, duration / 1000)
    this.searchResultCount.observe({ index }, resultCount)
  }
  
  updateIndexSize(index: string, size: number) {
    this.indexSize.set({ index }, size)
  }
  
  updateCacheHitRate(rate: number) {
    this.cacheHitRate.set(rate)
  }
  
  async getMetrics(): Promise<string> {
    return this.registry.metrics()
  }
}
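
To expose these metrics to Prometheus, mount a standard /metrics endpoint. A small Express sketch; the port and route are illustrative:

// src/monitoring/metrics-endpoint.ts (illustrative sketch)
import express from 'express'
import { VectorDBMonitoring } from './vector-db-monitoring'

export function startMetricsServer(monitoring: VectorDBMonitoring, port = 9090) {
  const app = express()

  // Prometheus scrapes this endpoint on its configured interval
  app.get('/metrics', async (_req, res) => {
    res.set('Content-Type', 'text/plain')
    res.send(await monitoring.getMetrics())
  })

  app.listen(port, () => console.log(`Metrics available at :${port}/metrics`))
}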

✓ Production Checklist

  ☐ Configure index parameters for your dataset size
  ☐ Implement backup and recovery procedures
  ☐ Set up monitoring and alerting
  ☐ Configure auto-scaling for vector databases
  ☐ Implement access control and encryption
  ☐ Set up cost monitoring and alerts
  ☐ Create runbooks for common operations
  ☐ Test disaster recovery procedures
  ☐ Document embedding model versions
  ☐ Plan for index maintenance windows

Complete RAG System Example

// Complete RAG system with all components
import { config } from './config'
import { createPineconeClient } from './vector-db/clients'
import { PineconeVectorStore } from './vector-db/vector-store' // assumed module exporting the Pinecone-backed VectorStore
import { EmbeddingService } from './services/embedding.service'
import { SemanticSearchService } from './services/semantic-search.service'
import { LLMService } from './services/llm.service' // assumed module wrapping the chat completion API used by RAGService
import { RAGService } from './services/rag.service'
import { MultiLevelCache } from './services/cache.service'
import { DocumentProcessor } from './pipelines/document-processor'
import { VectorDBMonitoring } from './monitoring/vector-db-monitoring'
import Redis from 'ioredis'

async function initializeRAGSystem() {
  // Initialize clients
  const redis = new Redis(config.redisUrl)
  const pinecone = await createPineconeClient()
  const vectorStore = new PineconeVectorStore(pinecone, config.pineconeIndex)
  
  // Initialize services
  const cache = new MultiLevelCache(redis)
  const embeddingService = new EmbeddingService()
  const llmService = new LLMService() // constructor shape assumed; wire up your LLM provider here
  const searchService = new SemanticSearchService(
    embeddingService,
    vectorStore
  )
  const ragService = new RAGService(
    searchService,
    llmService,
    cache
  )
  
  // Initialize monitoring
  const monitoring = new VectorDBMonitoring()
  
  // Document processing pipeline
  const processor = new DocumentProcessor(
    embeddingService,
    vectorStore
  )
  
  return {
    ragService,
    processor,
    monitoring,
    
    // Process and index documents
    async indexDocuments(documents: Document[]) {
      console.log(`Indexing ${documents.length} documents...`)
      
      const startTime = Date.now()
      await processor.processDocumentBatch(documents)
      const duration = Date.now() - startTime
      
      monitoring.recordEmbeddingGeneration(
        'text-embedding-3-small',
        duration
      )
      
      console.log(`Indexed in ${duration}ms`)
    },
    
    // Perform RAG query
    async query(question: string, options?: RAGOptions) {
      const startTime = Date.now()
      
      const result = await ragService.generate(question, options)
      
      const duration = Date.now() - startTime
      monitoring.recordSearch(
        config.pineconeIndex,
        'semantic',
        duration,
        result.sources.length
      )
      
      return result
    },
    
    // Get system metrics
    async getMetrics() {
      return monitoring.getMetrics()
    },
  }
}

// Usage example
async function main() {
  const rag = await initializeRAGSystem()
  
  // Index some documents
  await rag.indexDocuments([
    {
      id: '1',
      title: 'Introduction to Vector Databases',
      content: 'Vector databases are specialized systems...',
      metadata: { category: 'tutorial' },
    },
  ])
  
  // Query the system
  const result = await rag.query(
    'What are vector databases and how do they work?'
  )
  
  console.log('Answer:', result.answer)
  console.log('Sources:', result.sources)
}

main().catch(console.error)

Ready to Build RAG Systems?

Implement powerful retrieval-augmented generation with vector databases using ParrotRouter's unified API.

References
  1. [1] AWS. "Lambda Documentation" (2024)
  2. [2] Vercel. "Streaming Responses" (2024)
  3. [3] GitHub. "OpenAI Node.js Library" (2024)