Quick Start
Set up a vector database for your LLM application:
```bash
# Python environment
pip install pinecone-client weaviate-client qdrant-client pgvector
pip install openai sentence-transformers numpy

# Node.js environment
npm install @pinecone-database/pinecone weaviate-ts-client
npm install @qdrant/js-client-rest openai
```
1. Vector Database Overview
Popular Vector Databases Comparison
Database | Type | Best For | Key Features |
---|---|---|---|
Pinecone | Managed SaaS | Production at scale | Fully managed, auto-scaling, metadata filtering |
Weaviate | Open Source | Hybrid search | GraphQL API, modules system, built-in vectorization |
Qdrant | Open Source | High performance | Rust-based, advanced filtering, distributed |
pgvector | PostgreSQL Extension | Existing Postgres users | SQL integration, ACID compliance, joins with relational data |
Vector Database Client Setup
```typescript
// src/vector-db/clients.ts
import { PineconeClient } from '@pinecone-database/pinecone'
import { QdrantClient } from '@qdrant/js-client-rest'
import weaviate from 'weaviate-ts-client'
import { Pool } from 'pg'

// Pinecone Client
export async function createPineconeClient() {
  const pinecone = new PineconeClient()
  await pinecone.init({
    apiKey: process.env.PINECONE_API_KEY!,
    environment: process.env.PINECONE_ENVIRONMENT!,
  })
  return pinecone
}

// Qdrant Client
export function createQdrantClient() {
  return new QdrantClient({
    url: process.env.QDRANT_URL || 'http://localhost:6333',
    apiKey: process.env.QDRANT_API_KEY,
  })
}

// Weaviate Client
export function createWeaviateClient() {
  return weaviate.client({
    scheme: 'http',
    host: process.env.WEAVIATE_HOST || 'localhost:8080',
    apiKey: process.env.WEAVIATE_API_KEY
      ? new weaviate.ApiKey(process.env.WEAVIATE_API_KEY)
      : undefined,
  })
}

// PostgreSQL with pgvector
export function createPgVectorClient() {
  return new Pool({
    connectionString: process.env.DATABASE_URL,
  })
}
```
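The upsert and search examples later in this guide assume the target index or collection already exists. Below is a minimal sketch for creating a Qdrant collection sized for OpenAI's 1,536-dimension embeddings; the collection name and distance metric are illustrative choices, not part of the setup above.

```typescript
// scripts/create-collection.ts (hypothetical helper, not part of the original setup)
import { createQdrantClient } from '../src/vector-db/clients'

async function ensureCollection(name = 'documents') {
  const client = createQdrantClient()

  // List existing collections and create ours only if it is missing
  const { collections } = await client.getCollections()
  if (!collections.some((c) => c.name === name)) {
    await client.createCollection(name, {
      vectors: {
        size: 1536,         // must match the embedding model's output dimension
        distance: 'Cosine', // matches the cosine-similarity search used throughout
      },
    })
  }
}

ensureCollection().catch(console.error)
```

Pinecone and Weaviate have equivalent index/class creation steps; use their dashboards or client libraries before running the upsert code below.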
2. Embedding Generation & Storage
Embedding Service
// src/services/embedding.service.ts import { OpenAI } from 'openai' import { encode } from 'gpt-3-encoder' export interface EmbeddingOptions { model?: string batchSize?: number dimensions?: number } export class EmbeddingService { private openai: OpenAI private cache = new Map<string, number[]>() constructor() { this.openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY, }) } async generateEmbedding( text: string, options: EmbeddingOptions = {} ): Promise<number[]> { // Check cache first const cacheKey = `${text}-${options.model || 'text-embedding-3-small'}` if (this.cache.has(cacheKey)) { return this.cache.get(cacheKey)! } const response = await this.openai.embeddings.create({ model: options.model || 'text-embedding-3-small', input: text, dimensions: options.dimensions, // Optional dimension reduction }) const embedding = response.data[0].embedding this.cache.set(cacheKey, embedding) return embedding } async generateBatchEmbeddings( texts: string[], options: EmbeddingOptions = {} ): Promise<number[][]> { const batchSize = options.batchSize || 100 const embeddings: number[][] = [] for (let i = 0; i < texts.length; i += batchSize) { const batch = texts.slice(i, i + batchSize) const response = await this.openai.embeddings.create({ model: options.model || 'text-embedding-3-small', input: batch, dimensions: options.dimensions, }) embeddings.push(...response.data.map(d => d.embedding)) } return embeddings } // Token-aware text chunking chunkText(text: string, maxTokens: number = 512): string[] { const chunks: string[] = [] const sentences = text.match(/[^.!?]+[.!?]+/g) || [text] let currentChunk = '' let currentTokens = 0 for (const sentence of sentences) { const sentenceTokens = encode(sentence).length if (currentTokens + sentenceTokens > maxTokens) { if (currentChunk) { chunks.push(currentChunk.trim()) } currentChunk = sentence currentTokens = sentenceTokens } else { currentChunk += ' ' + sentence currentTokens += sentenceTokens } } if (currentChunk) { chunks.push(currentChunk.trim()) } return chunks } // Smart chunking with overlap chunkTextWithOverlap( text: string, chunkSize: number = 1000, overlap: number = 200 ): string[] { const chunks: string[] = [] const stride = chunkSize - overlap for (let i = 0; i < text.length; i += stride) { chunks.push(text.slice(i, i + chunkSize)) } return chunks } }
Document Processing Pipeline
// src/pipelines/document-processor.ts export interface Document { id: string title: string content: string metadata: Record<string, any> } export interface ProcessedChunk { id: string documentId: string content: string embedding: number[] metadata: { chunkIndex: number totalChunks: number startChar: number endChar: number [key: string]: any } } export class DocumentProcessor { constructor( private embeddingService: EmbeddingService, private vectorStore: VectorStore ) {} async processDocument(document: Document): Promise<ProcessedChunk[]> { // 1. Chunk the document const chunks = this.embeddingService.chunkTextWithOverlap( document.content, 1000, 200 ) // 2. Generate embeddings for each chunk const embeddings = await this.embeddingService.generateBatchEmbeddings(chunks) // 3. Create processed chunks with metadata const processedChunks: ProcessedChunk[] = chunks.map((chunk, index) => ({ id: `${document.id}_chunk_${index}`, documentId: document.id, content: chunk, embedding: embeddings[index], metadata: { chunkIndex: index, totalChunks: chunks.length, startChar: index * 800, // Approximate due to overlap endChar: Math.min((index + 1) * 800 + 200, document.content.length), title: document.title, ...document.metadata, }, })) // 4. Store in vector database await this.vectorStore.upsertBatch(processedChunks) return processedChunks } async processDocumentBatch(documents: Document[]): Promise<void> { // Process documents in parallel with concurrency limit const concurrency = 5 const results: ProcessedChunk[][] = [] for (let i = 0; i < documents.length; i += concurrency) { const batch = documents.slice(i, i + concurrency) const batchResults = await Promise.all( batch.map(doc => this.processDocument(doc)) ) results.push(...batchResults) } console.log(`Processed ${documents.length} documents, ${results.flat().length} chunks`) } }
3. Semantic Search Implementation
Vector Store Abstraction
// src/vector-db/vector-store.ts export interface SearchResult { id: string score: number content: string metadata: Record<string, any> } export interface VectorStore { upsert(id: string, embedding: number[], metadata: any): Promise<void> upsertBatch(items: ProcessedChunk[]): Promise<void> search( embedding: number[], topK: number, filter?: Record<string, any> ): Promise<SearchResult[]> delete(ids: string[]): Promise<void> } // Pinecone Implementation export class PineconeVectorStore implements VectorStore { constructor( private client: PineconeClient, private indexName: string ) {} async upsert(id: string, embedding: number[], metadata: any): Promise<void> { const index = this.client.Index(this.indexName) await index.upsert({ upsertRequest: { vectors: [{ id, values: embedding, metadata, }], }, }) } async upsertBatch(items: ProcessedChunk[]): Promise<void> { const index = this.client.Index(this.indexName) const vectors = items.map(item => ({ id: item.id, values: item.embedding, metadata: { content: item.content, ...item.metadata, }, })) // Batch upsert with size limit const batchSize = 100 for (let i = 0; i < vectors.length; i += batchSize) { await index.upsert({ upsertRequest: { vectors: vectors.slice(i, i + batchSize), }, }) } } async search( embedding: number[], topK: number, filter?: Record<string, any> ): Promise<SearchResult[]> { const index = this.client.Index(this.indexName) const results = await index.query({ queryRequest: { vector: embedding, topK, includeMetadata: true, filter, }, }) return results.matches?.map(match => ({ id: match.id, score: match.score || 0, content: match.metadata?.content || '', metadata: match.metadata || {}, })) || [] } async delete(ids: string[]): Promise<void> { const index = this.client.Index(this.indexName) await index.delete1({ ids }) } } // Qdrant Implementation export class QdrantVectorStore implements VectorStore { constructor( private client: QdrantClient, private collectionName: string ) {} async upsert(id: string, embedding: number[], metadata: any): Promise<void> { await this.client.upsert(this.collectionName, { wait: true, points: [{ id, vector: embedding, payload: metadata, }], }) } async upsertBatch(items: ProcessedChunk[]): Promise<void> { const points = items.map(item => ({ id: item.id, vector: item.embedding, payload: { content: item.content, ...item.metadata, }, })) await this.client.upsert(this.collectionName, { wait: true, points, }) } async search( embedding: number[], topK: number, filter?: Record<string, any> ): Promise<SearchResult[]> { const results = await this.client.search(this.collectionName, { vector: embedding, limit: topK, filter: filter ? this.buildQdrantFilter(filter) : undefined, with_payload: true, }) return results.map(result => ({ id: result.id.toString(), score: result.score, content: result.payload?.content || '', metadata: result.payload || {}, })) } private buildQdrantFilter(filter: Record<string, any>): any { // Convert simple filter to Qdrant filter format const must: any[] = [] for (const [key, value] of Object.entries(filter)) { must.push({ key, match: { value }, }) } return { must } } async delete(ids: string[]): Promise<void> { await this.client.delete(this.collectionName, { wait: true, points: ids, }) } }
Semantic Search Service
// src/services/semantic-search.service.ts export interface SearchOptions { topK?: number threshold?: number filter?: Record<string, any> rerank?: boolean hybridAlpha?: number // For hybrid search } export class SemanticSearchService { constructor( private embeddingService: EmbeddingService, private vectorStore: VectorStore, private rerankingModel?: RerankingModel ) {} async search( query: string, options: SearchOptions = {} ): Promise<SearchResult[]> { const { topK = 10, threshold = 0.7, filter, rerank = false, } = options // 1. Generate query embedding const queryEmbedding = await this.embeddingService.generateEmbedding(query) // 2. Search vector store const results = await this.vectorStore.search( queryEmbedding, rerank ? topK * 2 : topK, // Get more results if reranking filter ) // 3. Filter by threshold const filteredResults = results.filter(r => r.score >= threshold) // 4. Optional reranking if (rerank && this.rerankingModel && filteredResults.length > 0) { const rerankedResults = await this.rerankingModel.rerank( query, filteredResults.map(r => r.content) ) // Map back to original results with new scores return rerankedResults .map((reranked, index) => ({ ...filteredResults[index], score: reranked.score, })) .slice(0, topK) } return filteredResults.slice(0, topK) } async searchWithContext( query: string, conversationHistory: Message[], options: SearchOptions = {} ): Promise<SearchResult[]> { // Enhance query with conversation context const contextualQuery = this.buildContextualQuery(query, conversationHistory) return this.search(contextualQuery, options) } private buildContextualQuery( query: string, history: Message[] ): string { // Take last few messages for context const recentHistory = history.slice(-3) const context = recentHistory .map(m => `${m.role}: ${m.content}`) .join('\n') return `Context:\n${context}\n\nCurrent query: ${query}` } // Multi-query search for better recall async multiQuerySearch( query: string, options: SearchOptions = {} ): Promise<SearchResult[]> { // Generate query variations const queries = await this.generateQueryVariations(query) // Search with each query const allResults = await Promise.all( queries.map(q => this.search(q, { ...options, topK: 5 })) ) // Deduplicate and combine results const resultMap = new Map<string, SearchResult>() for (const results of allResults) { for (const result of results) { const existing = resultMap.get(result.id) if (!existing || result.score > existing.score) { resultMap.set(result.id, result) } } } // Sort by score and return top K return Array.from(resultMap.values()) .sort((a, b) => b.score - a.score) .slice(0, options.topK || 10) } private async generateQueryVariations(query: string): Promise<string[]> { // Use LLM to generate query variations const prompt = `Generate 3 alternative phrasings of this query: "${query}" Alternatives: 1.` // Implementation would call LLM here // For now, return simple variations return [ query, `What is ${query}`, `Explain ${query}`, `Information about ${query}`, ] } }
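The `generateQueryVariations` method above is stubbed out with hard-coded variations. One way it could be implemented with the OpenAI chat completions API is sketched below; the model name and prompt wording are assumptions, so adjust them to your setup.

```typescript
// Hypothetical implementation of generateQueryVariations (a sketch, not the original code)
import { OpenAI } from 'openai'

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

async function generateQueryVariations(query: string): Promise<string[]> {
  const response = await openai.chat.completions.create({
    model: 'gpt-4o-mini', // any inexpensive chat model should work here
    temperature: 0.7,
    messages: [
      {
        role: 'system',
        content:
          'Rewrite the user query as 3 alternative search queries, one per line. Output only the queries.',
      },
      { role: 'user', content: query },
    ],
  })

  const variations = (response.choices[0].message.content ?? '')
    .split('\n')
    .map((line) => line.replace(/^\d+[.)]\s*/, '').trim())
    .filter(Boolean)

  // Keep the original query so recall never drops below the single-query baseline
  return [query, ...variations]
}
```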
4. RAG Architecture
RAG Service Implementation
// src/services/rag.service.ts export interface RAGOptions { searchOptions?: SearchOptions contextWindow?: number systemPrompt?: string temperature?: number model?: string } export class RAGService { constructor( private searchService: SemanticSearchService, private llmService: LLMService, private cache: CacheService ) {} async generate( query: string, options: RAGOptions = {} ): Promise<{ answer: string sources: SearchResult[] cached: boolean }> { const { searchOptions = {}, contextWindow = 3000, systemPrompt, temperature = 0.7, model = 'gpt-4-turbo-preview', } = options // Check cache first const cacheKey = this.generateCacheKey(query, options) const cached = await this.cache.get(cacheKey) if (cached) { return { ...cached, cached: true } } // 1. Retrieve relevant documents const searchResults = await this.searchService.search(query, searchOptions) if (searchResults.length === 0) { return { answer: "I couldn't find relevant information to answer your question.", sources: [], cached: false, } } // 2. Build context from search results const context = this.buildContext(searchResults, contextWindow) // 3. Create prompt const prompt = this.buildRAGPrompt(query, context, systemPrompt) // 4. Generate answer using LLM const answer = await this.llmService.generate({ messages: [ { role: 'system', content: prompt.system }, { role: 'user', content: prompt.user }, ], temperature, model, }) // 5. Extract citations from answer const citedSources = this.extractCitations(answer, searchResults) // 6. Cache the result const result = { answer, sources: citedSources, cached: false, } await this.cache.set(cacheKey, result, 3600) // 1 hour TTL return result } async generateStreaming( query: string, options: RAGOptions = {}, onToken: (token: string) => void ): Promise<{ sources: SearchResult[] fullAnswer: string }> { // Retrieve context (same as non-streaming) const searchResults = await this.searchService.search( query, options.searchOptions ) const context = this.buildContext( searchResults, options.contextWindow || 3000 ) const prompt = this.buildRAGPrompt( query, context, options.systemPrompt ) // Stream the response let fullAnswer = '' await this.llmService.generateStream({ messages: [ { role: 'system', content: prompt.system }, { role: 'user', content: prompt.user }, ], temperature: options.temperature || 0.7, model: options.model || 'gpt-4-turbo-preview', onToken: (token) => { fullAnswer += token onToken(token) }, }) return { sources: searchResults, fullAnswer, } } private buildContext( results: SearchResult[], maxTokens: number ): string { let context = '' let currentTokens = 0 for (const [index, result] of results.entries()) { const chunk = `[Source ${index + 1}] ${result.content}\n\n` const chunkTokens = this.estimateTokens(chunk) if (currentTokens + chunkTokens > maxTokens) { break } context += chunk currentTokens += chunkTokens } return context.trim() } private buildRAGPrompt( query: string, context: string, customSystemPrompt?: string ): { system: string; user: string } { const system = customSystemPrompt || `You are a helpful AI assistant powered by ParrotRouter. Your task is to answer questions based on the provided context. Always cite your sources using [Source N] notation. If the context doesn't contain relevant information, say so clearly. Be concise and accurate in your responses.` const user = `Context: ${context} Question: ${query} Please provide a comprehensive answer based on the context above. 
Include source citations.` return { system, user } } private extractCitations( answer: string, sources: SearchResult[] ): SearchResult[] { const citationPattern = /\[Source (\d+)\]/g const citations = new Set<number>() let match while ((match = citationPattern.exec(answer)) !== null) { const sourceIndex = parseInt(match[1], 10) - 1 if (sourceIndex >= 0 && sourceIndex < sources.length) { citations.add(sourceIndex) } } return Array.from(citations).map(index => sources[index]) } private generateCacheKey(query: string, options: RAGOptions): string { const params = { query, model: options.model, temperature: options.temperature, searchOptions: options.searchOptions, } return `rag:${JSON.stringify(params)}` } private estimateTokens(text: string): number { // Rough estimation: 1 token ≈ 4 characters return Math.ceil(text.length / 4) } } // Advanced RAG with conversation memory export class ConversationalRAGService extends RAGService { async generateWithMemory( query: string, conversationId: string, options: RAGOptions = {} ): Promise<{ answer: string sources: SearchResult[] conversationId: string }> { // Retrieve conversation history const history = await this.getConversationHistory(conversationId) // Search with context const searchResults = await this.searchService.searchWithContext( query, history, options.searchOptions ) // Build prompt with history const context = this.buildContext(searchResults, options.contextWindow || 3000) const prompt = this.buildConversationalPrompt(query, context, history) // Generate response const answer = await this.llmService.generate({ messages: [ { role: 'system', content: prompt.system }, ...history.map(msg => ({ role: msg.role as 'user' | 'assistant', content: msg.content, })), { role: 'user', content: query }, ], temperature: options.temperature || 0.7, model: options.model || 'gpt-4-turbo-preview', }) // Store in conversation history await this.addToConversation(conversationId, query, answer) return { answer, sources: searchResults, conversationId, } } private buildConversationalPrompt( query: string, context: string, history: Message[] ): { system: string } { return { system: `You are a helpful AI assistant powered by ParrotRouter. You have access to a knowledge base and can answer questions based on the provided context. Maintain conversation continuity and reference previous messages when relevant. Always cite sources using [Source N] notation. Current context from knowledge base: ${context}`, } } private async getConversationHistory( conversationId: string ): Promise<Message[]> { // Implementation would fetch from database return [] } private async addToConversation( conversationId: string, query: string, answer: string ): Promise<void> { // Implementation would store in database } }
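`ConversationalRAGService` leaves `getConversationHistory` and `addToConversation` as stubs. A minimal sketch backed by the `conversations`/`messages` tables defined in section 6 might look like this; the pool wiring and the 20-message window are assumptions, not part of the original service.

```typescript
// Hypothetical persistence for ConversationalRAGService, backed by the section 6 schema
import { Pool } from 'pg'

interface Message {
  role: string
  content: string
}

const pool = new Pool({ connectionString: process.env.DATABASE_URL })

async function getConversationHistory(conversationId: string): Promise<Message[]> {
  // Fetch the most recent messages, then return them in chronological order
  const result = await pool.query(
    `SELECT role, content
       FROM messages
      WHERE conversation_id = $1
      ORDER BY created_at DESC
      LIMIT 20`,
    [conversationId]
  )
  return result.rows.reverse()
}

async function addToConversation(
  conversationId: string,
  query: string,
  answer: string
): Promise<void> {
  await pool.query(
    `INSERT INTO messages (conversation_id, role, content)
     VALUES ($1, 'user', $2), ($1, 'assistant', $3)`,
    [conversationId, query, answer]
  )
}
```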
RAG Best Practices
- Chunk documents with appropriate overlap (10-20%); see the sketch after this list
- Use metadata filtering to improve relevance
- Implement query expansion for better recall
- Cache frequent queries to reduce costs
- Monitor retrieval quality and adjust thresholds
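Putting the first two practices together: a 10-20% overlap with the `chunkTextWithOverlap` helper from section 2, plus a metadata filter at query time. The query text, threshold, and `category` key below are illustrative and assume that key was written into chunk metadata during indexing.

```typescript
// Illustrative wiring of chunk overlap + metadata filtering (import paths assume a file under src/)
import { EmbeddingService } from './services/embedding.service'
import { SemanticSearchService } from './services/semantic-search.service'

async function retrieveWithOverlapAndFilter(
  embeddingService: EmbeddingService,
  searchService: SemanticSearchService,
  content: string
) {
  const chunkSize = 1000
  const overlap = Math.round(chunkSize * 0.15) // 15%, inside the 10-20% guideline

  const chunks = embeddingService.chunkTextWithOverlap(content, chunkSize, overlap)

  // Metadata filtering narrows the candidate set before similarity ranking
  const results = await searchService.search('How do I rotate API keys?', {
    topK: 5,
    threshold: 0.75,                       // tune against observed retrieval quality
    filter: { category: 'security-docs' }, // assumes this key exists in chunk metadata
  })

  return { chunks, results }
}
```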
5. Hybrid Search Strategies
Hybrid Search Implementation
// src/services/hybrid-search.service.ts export interface HybridSearchOptions extends SearchOptions { alpha?: number // Balance between keyword (0) and semantic (1) search keywordBoost?: number bm25Config?: { k1?: number b?: number } } export class HybridSearchService { constructor( private semanticSearch: SemanticSearchService, private keywordSearch: KeywordSearchService, private vectorStore: VectorStore ) {} async hybridSearch( query: string, options: HybridSearchOptions = {} ): Promise<SearchResult[]> { const { alpha = 0.5, topK = 10, filter, keywordBoost = 1.0, } = options // Perform both searches in parallel const [semanticResults, keywordResults] = await Promise.all([ this.semanticSearch.search(query, { topK: topK * 2, filter }), this.keywordSearch.search(query, { topK: topK * 2, filter }), ]) // Combine and rerank results const combinedResults = this.fuseResults( semanticResults, keywordResults, alpha, keywordBoost ) return combinedResults.slice(0, topK) } private fuseResults( semanticResults: SearchResult[], keywordResults: SearchResult[], alpha: number, keywordBoost: number ): SearchResult[] { const scoreMap = new Map<string, { result: SearchResult semanticScore: number keywordScore: number }>() // Process semantic results for (const result of semanticResults) { scoreMap.set(result.id, { result, semanticScore: result.score, keywordScore: 0, }) } // Process keyword results for (const result of keywordResults) { const existing = scoreMap.get(result.id) if (existing) { existing.keywordScore = result.score * keywordBoost } else { scoreMap.set(result.id, { result, semanticScore: 0, keywordScore: result.score * keywordBoost, }) } } // Calculate hybrid scores const hybridResults: SearchResult[] = [] for (const { result, semanticScore, keywordScore } of scoreMap.values()) { const hybridScore = alpha * semanticScore + (1 - alpha) * keywordScore hybridResults.push({ ...result, score: hybridScore, metadata: { ...result.metadata, semanticScore, keywordScore, hybridScore, }, }) } // Sort by hybrid score return hybridResults.sort((a, b) => b.score - a.score) } } // Weaviate Hybrid Search (Native Support) export class WeaviateHybridSearch { constructor(private client: any) {} async search( query: string, className: string, options: HybridSearchOptions = {} ): Promise<SearchResult[]> { const { alpha = 0.5, topK = 10, filter, } = options const result = await this.client.graphql .get() .withClassName(className) .withHybrid({ query, alpha, // 0 = keyword only, 1 = vector only }) .withLimit(topK) .withWhere(filter ? this.buildWhereFilter(filter) : undefined) .withFields('_additional { id score } content metadata') .do() return this.transformResults(result.data.Get[className]) } private buildWhereFilter(filter: Record<string, any>): any { // Transform to Weaviate where filter format const operator = 'And' const operands = Object.entries(filter).map(([path, value]) => ({ path: [path], operator: 'Equal', value, })) return { operator, operands } } private transformResults(weaviateResults: any[]): SearchResult[] { return weaviateResults.map(result => ({ id: result._additional.id, score: result._additional.score, content: result.content, metadata: result.metadata || {}, })) } }
6. Database Schema Design
PostgreSQL with pgvector Schema
-- Enable pgvector extension CREATE EXTENSION IF NOT EXISTS vector; -- Documents table CREATE TABLE documents ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), title VARCHAR(255) NOT NULL, source VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, metadata JSONB DEFAULT '{}'::jsonb ); -- Document chunks with embeddings CREATE TABLE document_chunks ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), document_id UUID REFERENCES documents(id) ON DELETE CASCADE, chunk_index INTEGER NOT NULL, content TEXT NOT NULL, embedding vector(1536), -- OpenAI embedding dimension char_start INTEGER, char_end INTEGER, metadata JSONB DEFAULT '{}'::jsonb, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(document_id, chunk_index) ); -- Create indexes for vector similarity search CREATE INDEX document_chunks_embedding_idx ON document_chunks USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100); -- Conversations table CREATE TABLE conversations ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), user_id UUID NOT NULL, title VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, metadata JSONB DEFAULT '{}'::jsonb ); -- Messages table CREATE TABLE messages ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE, role VARCHAR(20) NOT NULL CHECK (role IN ('user', 'assistant', 'system')), content TEXT NOT NULL, embedding vector(1536), tokens_used INTEGER, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, metadata JSONB DEFAULT '{}'::jsonb ); -- Search history for analytics CREATE TABLE search_queries ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), user_id UUID, query TEXT NOT NULL, query_embedding vector(1536), results_count INTEGER, clicked_results JSONB DEFAULT '[]'::jsonb, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Cached embeddings CREATE TABLE embedding_cache ( text_hash VARCHAR(64) PRIMARY KEY, text TEXT NOT NULL, embedding vector(1536) NOT NULL, model VARCHAR(50) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Function for semantic search CREATE OR REPLACE FUNCTION search_documents( query_embedding vector(1536), match_threshold float DEFAULT 0.7, match_count int DEFAULT 10 ) RETURNS TABLE ( chunk_id UUID, document_id UUID, content TEXT, similarity float, metadata JSONB ) LANGUAGE plpgsql AS $$ BEGIN RETURN QUERY SELECT dc.id, dc.document_id, dc.content, 1 - (dc.embedding <=> query_embedding) AS similarity, dc.metadata FROM document_chunks dc WHERE 1 - (dc.embedding <=> query_embedding) > match_threshold ORDER BY dc.embedding <=> query_embedding LIMIT match_count; END; $$; -- Hybrid search function (vector + full-text) CREATE OR REPLACE FUNCTION hybrid_search( query_text TEXT, query_embedding vector(1536), alpha float DEFAULT 0.5, match_count int DEFAULT 10 ) RETURNS TABLE ( chunk_id UUID, document_id UUID, content TEXT, vector_similarity float, text_rank float, hybrid_score float, metadata JSONB ) LANGUAGE plpgsql AS $$ BEGIN RETURN QUERY WITH vector_search AS ( SELECT dc.id, dc.document_id, dc.content, 1 - (dc.embedding <=> query_embedding) AS similarity, dc.metadata FROM document_chunks dc ORDER BY dc.embedding <=> query_embedding LIMIT match_count * 2 ), text_search AS ( SELECT dc.id, ts_rank(to_tsvector('english', dc.content), plainto_tsquery('english', query_text)) AS rank FROM document_chunks dc WHERE to_tsvector('english', dc.content) @@ 
plainto_tsquery('english', query_text) ORDER BY rank DESC LIMIT match_count * 2 ) SELECT DISTINCT vs.id, vs.document_id, vs.content, vs.similarity, COALESCE(ts.rank, 0) AS text_rank, (alpha * vs.similarity + (1 - alpha) * COALESCE(ts.rank, 0)) AS hybrid_score, vs.metadata FROM vector_search vs LEFT JOIN text_search ts ON vs.id = ts.id ORDER BY hybrid_score DESC LIMIT match_count; END; $$;
TypeScript Data Access Layer
// src/dal/pgvector-dal.ts import { Pool } from 'pg' import { v4 as uuidv4 } from 'uuid' export class PgVectorDAL { constructor(private pool: Pool) {} async initializeSchema(): Promise<void> { // Run schema creation SQL const schemaSQL = `...` // Schema from above await this.pool.query(schemaSQL) } async insertDocument( title: string, source: string, metadata: Record<string, any> = {} ): Promise<string> { const result = await this.pool.query( `INSERT INTO documents (title, source, metadata) VALUES ($1, $2, $3) RETURNING id`, [title, source, JSON.stringify(metadata)] ) return result.rows[0].id } async insertChunks(chunks: ProcessedChunk[]): Promise<void> { const values = chunks.map(chunk => [ chunk.documentId, chunk.metadata.chunkIndex, chunk.content, JSON.stringify(chunk.embedding), chunk.metadata.startChar, chunk.metadata.endChar, JSON.stringify(chunk.metadata), ]) // Bulk insert using COPY for performance const copyStream = this.pool.query( `COPY document_chunks ( document_id, chunk_index, content, embedding, char_start, char_end, metadata ) FROM STDIN WITH (FORMAT CSV)` ) for (const row of values) { copyStream.write(row.join(',') + '\n') } copyStream.end() } async semanticSearch( embedding: number[], threshold: number = 0.7, limit: number = 10 ): Promise<SearchResult[]> { const result = await this.pool.query( 'SELECT * FROM search_documents($1, $2, $3)', [JSON.stringify(embedding), threshold, limit] ) return result.rows.map(row => ({ id: row.chunk_id, score: row.similarity, content: row.content, metadata: row.metadata, })) } async hybridSearch( query: string, embedding: number[], alpha: number = 0.5, limit: number = 10 ): Promise<SearchResult[]> { const result = await this.pool.query( 'SELECT * FROM hybrid_search($1, $2, $3, $4)', [query, JSON.stringify(embedding), alpha, limit] ) return result.rows.map(row => ({ id: row.chunk_id, score: row.hybrid_score, content: row.content, metadata: { ...row.metadata, vectorSimilarity: row.vector_similarity, textRank: row.text_rank, }, })) } // Embedding cache operations async getCachedEmbedding( text: string, model: string ): Promise<number[] | null> { const hash = this.hashText(text) const result = await this.pool.query( `UPDATE embedding_cache SET last_accessed = CURRENT_TIMESTAMP WHERE text_hash = $1 AND model = $2 RETURNING embedding`, [hash, model] ) if (result.rows.length > 0) { return JSON.parse(result.rows[0].embedding) } return null } async setCachedEmbedding( text: string, embedding: number[], model: string ): Promise<void> { const hash = this.hashText(text) await this.pool.query( `INSERT INTO embedding_cache (text_hash, text, embedding, model) VALUES ($1, $2, $3, $4) ON CONFLICT (text_hash) DO UPDATE SET last_accessed = CURRENT_TIMESTAMP`, [hash, text, JSON.stringify(embedding), model] ) } private hashText(text: string): string { // Simple hash for demo - use crypto.createHash in production return Buffer.from(text).toString('base64').substring(0, 64) } }
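The `hashText` helper above truncates a Base64 string and notes that production code should use `crypto.createHash`. A drop-in sketch with SHA-256, whose 64-character hex digest also fits the `VARCHAR(64)` `text_hash` column:

```typescript
// Drop-in replacement for hashText using Node's crypto module
import { createHash } from 'crypto'

function hashText(text: string): string {
  // SHA-256 hex digest is exactly 64 characters, matching embedding_cache.text_hash
  return createHash('sha256').update(text).digest('hex')
}
```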
7. Caching Strategies
Multi-Level Caching
// src/services/cache.service.ts import Redis from 'ioredis' import LRU from 'lru-cache' export class MultiLevelCache { private memoryCache: LRU<string, any> private redisCache: Redis constructor(redis: Redis, options?: { maxMemoryItems?: number memoryTTL?: number }) { this.redisCache = redis this.memoryCache = new LRU({ max: options?.maxMemoryItems || 1000, ttl: options?.memoryTTL || 1000 * 60 * 5, // 5 minutes }) } async get<T>(key: string): Promise<T | null> { // Check memory cache first const memoryResult = this.memoryCache.get(key) if (memoryResult !== undefined) { return memoryResult } // Check Redis cache const redisResult = await this.redisCache.get(key) if (redisResult) { const parsed = JSON.parse(redisResult) // Populate memory cache this.memoryCache.set(key, parsed) return parsed } return null } async set<T>( key: string, value: T, ttlSeconds?: number ): Promise<void> { // Set in both caches this.memoryCache.set(key, value) const serialized = JSON.stringify(value) if (ttlSeconds) { await this.redisCache.setex(key, ttlSeconds, serialized) } else { await this.redisCache.set(key, serialized) } } async invalidate(pattern: string): Promise<void> { // Clear from memory cache for (const key of this.memoryCache.keys()) { if (key.includes(pattern)) { this.memoryCache.delete(key) } } // Clear from Redis const keys = await this.redisCache.keys(pattern) if (keys.length > 0) { await this.redisCache.del(...keys) } } } // Embedding cache service export class EmbeddingCacheService { constructor( private cache: MultiLevelCache, private dal: PgVectorDAL ) {} async getEmbedding( text: string, model: string ): Promise<number[] | null> { const cacheKey = `embedding:${model}:${this.hashText(text)}` // Check multi-level cache const cached = await this.cache.get<number[]>(cacheKey) if (cached) { return cached } // Check persistent storage const stored = await this.dal.getCachedEmbedding(text, model) if (stored) { // Populate cache await this.cache.set(cacheKey, stored, 3600) return stored } return null } async setEmbedding( text: string, embedding: number[], model: string ): Promise<void> { const cacheKey = `embedding:${model}:${this.hashText(text)}` // Store in all levels await Promise.all([ this.cache.set(cacheKey, embedding, 3600), this.dal.setCachedEmbedding(text, embedding, model), ]) } private hashText(text: string): string { return Buffer.from(text).toString('base64').substring(0, 32) } } // Query result caching export class QueryCacheService { constructor( private cache: MultiLevelCache, private ttlSeconds: number = 3600 ) {} async getCachedResults( query: string, filters?: Record<string, any> ): Promise<SearchResult[] | null> { const cacheKey = this.generateQueryCacheKey(query, filters) return this.cache.get<SearchResult[]>(cacheKey) } async setCachedResults( query: string, results: SearchResult[], filters?: Record<string, any> ): Promise<void> { const cacheKey = this.generateQueryCacheKey(query, filters) await this.cache.set(cacheKey, results, this.ttlSeconds) } private generateQueryCacheKey( query: string, filters?: Record<string, any> ): string { const params = { q: query, f: filters || {}, } return `query:${JSON.stringify(params)}` } }
8. Performance Optimization
Batch Processing Optimization
// src/optimization/batch-processor.ts export class BatchProcessor { private queue: Map<string, any[]> = new Map() private timers: Map<string, NodeJS.Timeout> = new Map() constructor( private batchSize: number = 100, private flushInterval: number = 1000 ) {} async addToBatch<T>( batchKey: string, item: T, processor: (items: T[]) => Promise<void> ): Promise<void> { // Get or create batch if (!this.queue.has(batchKey)) { this.queue.set(batchKey, []) } const batch = this.queue.get(batchKey)! batch.push(item) // Process if batch is full if (batch.length >= this.batchSize) { await this.flush(batchKey, processor) } else { // Set timer for time-based flush this.setFlushTimer(batchKey, processor) } } private setFlushTimer<T>( batchKey: string, processor: (items: T[]) => Promise<void> ): void { // Clear existing timer const existingTimer = this.timers.get(batchKey) if (existingTimer) { clearTimeout(existingTimer) } // Set new timer const timer = setTimeout(() => { this.flush(batchKey, processor) }, this.flushInterval) this.timers.set(batchKey, timer) } private async flush<T>( batchKey: string, processor: (items: T[]) => Promise<void> ): Promise<void> { const batch = this.queue.get(batchKey) if (!batch || batch.length === 0) { return } // Clear batch and timer this.queue.delete(batchKey) const timer = this.timers.get(batchKey) if (timer) { clearTimeout(timer) this.timers.delete(batchKey) } // Process batch await processor(batch) } } // Parallel processing for large datasets export class ParallelProcessor { constructor( private concurrency: number = 5 ) {} async processInParallel<T, R>( items: T[], processor: (item: T) => Promise<R>, onProgress?: (completed: number, total: number) => void ): Promise<R[]> { const results: R[] = new Array(items.length) let completed = 0 // Process in chunks for (let i = 0; i < items.length; i += this.concurrency) { const chunk = items.slice(i, i + this.concurrency) const chunkResults = await Promise.all( chunk.map((item, index) => processor(item)) ) // Store results in correct positions for (let j = 0; j < chunkResults.length; j++) { results[i + j] = chunkResults[j] } completed += chunk.length onProgress?.(completed, items.length) } return results } } // Index optimization service export class IndexOptimizationService { constructor( private vectorStore: VectorStore, private monitoring: MonitoringService ) {} async optimizeIndex(collectionName: string): Promise<void> { const metrics = await this.analyzeIndexPerformance(collectionName) if (metrics.avgQueryTime > 100) { // Re-index with better parameters await this.reindexCollection(collectionName, { nlist: Math.ceil(Math.sqrt(metrics.totalVectors)), nprobe: 10, }) } if (metrics.fragmentationRatio > 0.3) { // Compact index await this.compactIndex(collectionName) } } private async analyzeIndexPerformance( collectionName: string ): Promise<any> { // Analyze query performance const queryMetrics = await this.monitoring.getQueryMetrics(collectionName) return { avgQueryTime: queryMetrics.avgDuration, totalVectors: queryMetrics.totalVectors, fragmentationRatio: queryMetrics.fragmentation, } } private async reindexCollection( collectionName: string, params: any ): Promise<void> { // Implementation depends on vector database console.log(`Reindexing ${collectionName} with params:`, params) } private async compactIndex(collectionName: string): Promise<void> { // Implementation depends on vector database console.log(`Compacting index for ${collectionName}`) } }
Performance Tips
- Use appropriate index types (IVF, HNSW) based on dataset size (see the index sketch after this list)
- Implement query result caching for repeated searches
- Batch embedding generation to reduce API calls
- Reduce vector dimensions where the embedding model supports it (e.g., the `dimensions` parameter)
- Use async/parallel processing for large datasets
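As an example of the first tip, pgvector (0.5.0+) supports HNSW indexes, which usually give better recall and latency than IVFFlat as the table grows. The sketch below swaps the index from the section 6 schema; `m` and `ef_construction` are starting points to tune, not recommended values.

```typescript
// Illustrative index tuning through the pg Pool from createPgVectorClient()
import { createPgVectorClient } from './vector-db/clients'

async function switchToHnswIndex() {
  const pool = createPgVectorClient()

  // Replace the IVFFlat index defined in the schema with an HNSW index
  await pool.query('DROP INDEX IF EXISTS document_chunks_embedding_idx')
  await pool.query(`
    CREATE INDEX document_chunks_embedding_idx
    ON document_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64)
  `)

  await pool.end()
}

switchToHnswIndex().catch(console.error)
```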
9. Cost Optimization
Cost Management Service
// src/services/cost-management.service.ts export interface CostMetrics { embeddingCosts: number storageCosts: number queryCosts: number totalCosts: number } export class CostManagementService { private costPerEmbedding = 0.0001 // $0.0001 per embedding private costPerQuery = 0.00002 // $0.00002 per query private costPerGBMonth = 0.25 // $0.25 per GB/month for storage constructor( private monitoring: MonitoringService, private cache: CacheService ) {} async calculateMonthlyCosts(): Promise<CostMetrics> { const usage = await this.monitoring.getMonthlyUsage() const embeddingCosts = usage.embeddingsGenerated * this.costPerEmbedding const queryCosts = usage.queriesProcessed * this.costPerQuery const storageCosts = (usage.storageGB * this.costPerGBMonth) return { embeddingCosts, storageCosts, queryCosts, totalCosts: embeddingCosts + storageCosts + queryCosts, } } async optimizeCosts(): Promise<{ recommendations: string[] potentialSavings: number }> { const recommendations: string[] = [] let potentialSavings = 0 // Check cache hit rate const cacheStats = await this.cache.getStats() if (cacheStats.hitRate < 0.5) { recommendations.push( 'Improve cache hit rate by increasing cache size or TTL' ) potentialSavings += this.estimateCacheSavings(cacheStats) } // Check for duplicate embeddings const duplicates = await this.findDuplicateEmbeddings() if (duplicates > 100) { recommendations.push( `Remove ${duplicates} duplicate embeddings to save storage` ) potentialSavings += duplicates * 0.001 // Rough estimate } // Check embedding model usage const modelUsage = await this.monitoring.getModelUsage() if (modelUsage['text-embedding-3-large'] > modelUsage['text-embedding-3-small'] * 0.5) { recommendations.push( 'Consider using smaller embedding models for non-critical content' ) potentialSavings += this.estimateModelSavings(modelUsage) } return { recommendations, potentialSavings } } private estimateCacheSavings(stats: any): number { const missedQueries = stats.misses const potentialCacheHits = missedQueries * 0.3 // Assume 30% could be cached return potentialCacheHits * this.costPerEmbedding } private estimateModelSavings(usage: any): number { const largeModelCost = 0.00013 const smallModelCost = 0.00002 const potentialSwitches = usage['text-embedding-3-large'] * 0.7 return potentialSwitches * (largeModelCost - smallModelCost) } async findDuplicateEmbeddings(): Promise<number> { // Implementation would query database for duplicates return 0 } } // Usage tracking export class UsageTracker { constructor(private redis: Redis) {} async trackEmbedding( model: string, inputTokens: number ): Promise<void> { const date = new Date().toISOString().split('T')[0] const key = `usage:embeddings:${date}` await this.redis.hincrby(key, model, 1) await this.redis.hincrby(key, `${model}:tokens`, inputTokens) await this.redis.expire(key, 86400 * 90) // 90 days retention } async trackQuery( queryType: string, resultCount: number ): Promise<void> { const date = new Date().toISOString().split('T')[0] const key = `usage:queries:${date}` await this.redis.hincrby(key, queryType, 1) await this.redis.hincrby(key, `${queryType}:results`, resultCount) await this.redis.expire(key, 86400 * 90) } async getUsageReport( startDate: string, endDate: string ): Promise<any> { const report = { embeddings: {}, queries: {}, totalCost: 0, } // Aggregate usage data const dates = this.getDateRange(startDate, endDate) for (const date of dates) { // Get embedding usage const embeddingKey = `usage:embeddings:${date}` const embeddings = await 
this.redis.hgetall(embeddingKey) this.aggregateUsage(report.embeddings, embeddings) // Get query usage const queryKey = `usage:queries:${date}` const queries = await this.redis.hgetall(queryKey) this.aggregateUsage(report.queries, queries) } // Calculate costs report.totalCost = this.calculateTotalCost(report) return report } private getDateRange(start: string, end: string): string[] { const dates: string[] = [] const current = new Date(start) const endDate = new Date(end) while (current <= endDate) { dates.push(current.toISOString().split('T')[0]) current.setDate(current.getDate() + 1) } return dates } private aggregateUsage(target: any, source: any): void { for (const [key, value] of Object.entries(source)) { target[key] = (target[key] || 0) + parseInt(value as string, 10) } } private calculateTotalCost(usage: any): number { // Calculate based on model pricing const embeddingCosts = { 'text-embedding-3-small': 0.00002, 'text-embedding-3-large': 0.00013, 'text-embedding-ada-002': 0.0001, } let total = 0 for (const [model, count] of Object.entries(usage.embeddings)) { if (embeddingCosts[model]) { total += embeddingCosts[model] * (count as number) } } return total } }
10. Production Deployment
Deployment Configuration
```yaml
# docker-compose.yml
version: '3.8'

services:
  # PostgreSQL with pgvector
  postgres:
    image: ankane/pgvector:latest
    environment:
      POSTGRES_DB: vectordb
      POSTGRES_USER: vectoruser
      POSTGRES_PASSWORD: vectorpass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U vectoruser"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Qdrant vector database
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant_data:/qdrant/storage
    environment:
      QDRANT__SERVICE__GRPC_PORT: 6334

  # Weaviate vector database
  weaviate:
    image: semitechnologies/weaviate:latest
    ports:
      - "8080:8080"
    environment:
      QUERY_DEFAULTS_LIMIT: 25
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
      DEFAULT_VECTORIZER_MODULE: 'text2vec-openai'
      ENABLE_MODULES: 'text2vec-openai'
      OPENAI_APIKEY: ${OPENAI_API_KEY}
    volumes:
      - weaviate_data:/var/lib/weaviate

  # Redis for caching
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  # Application
  app:
    build: .
    environment:
      DATABASE_URL: postgresql://vectoruser:vectorpass@postgres:5432/vectordb
      QDRANT_URL: http://qdrant:6333
      WEAVIATE_URL: http://weaviate:8080
      REDIS_URL: redis://redis:6379
      OPENAI_API_KEY: ${OPENAI_API_KEY}
    depends_on:
      - postgres
      - qdrant
      - weaviate
      - redis
    ports:
      - "3000:3000"

volumes:
  postgres_data:
  qdrant_data:
  weaviate_data:
  redis_data:
```
Kubernetes Deployment
```yaml
# k8s/vector-db-deployment.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: qdrant
spec:
  serviceName: qdrant
  replicas: 3
  selector:
    matchLabels:
      app: qdrant
  template:
    metadata:
      labels:
        app: qdrant
    spec:
      containers:
        - name: qdrant
          image: qdrant/qdrant:latest
          ports:
            - containerPort: 6333
              name: http
            - containerPort: 6334
              name: grpc
          volumeMounts:
            - name: storage
              mountPath: /qdrant/storage
          resources:
            requests:
              memory: "2Gi"
              cpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2"
          livenessProbe:
            httpGet:
              path: /
              port: 6333
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /readyz
              port: 6333
            initialDelaySeconds: 5
            periodSeconds: 5
  volumeClaimTemplates:
    - metadata:
        name: storage
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 100Gi
---
apiVersion: v1
kind: Service
metadata:
  name: qdrant
spec:
  selector:
    app: qdrant
  clusterIP: None
  ports:
    - port: 6333
      name: http
    - port: 6334
      name: grpc
```
Monitoring Setup
// src/monitoring/vector-db-monitoring.ts import { Registry, Histogram, Counter, Gauge } from 'prom-client' export class VectorDBMonitoring { private registry: Registry // Metrics private embeddingLatency: Histogram<string> private searchLatency: Histogram<string> private searchResultCount: Histogram<string> private indexSize: Gauge<string> private cacheHitRate: Gauge<string> constructor() { this.registry = new Registry() this.embeddingLatency = new Histogram({ name: 'embedding_generation_duration_seconds', help: 'Time to generate embeddings', labelNames: ['model'], buckets: [0.1, 0.5, 1, 2, 5], registers: [this.registry], }) this.searchLatency = new Histogram({ name: 'vector_search_duration_seconds', help: 'Time to perform vector search', labelNames: ['index', 'type'], buckets: [0.01, 0.05, 0.1, 0.5, 1], registers: [this.registry], }) this.searchResultCount = new Histogram({ name: 'search_result_count', help: 'Number of results returned', labelNames: ['index'], buckets: [0, 1, 5, 10, 20, 50, 100], registers: [this.registry], }) this.indexSize = new Gauge({ name: 'vector_index_size', help: 'Number of vectors in index', labelNames: ['index'], registers: [this.registry], }) this.cacheHitRate = new Gauge({ name: 'embedding_cache_hit_rate', help: 'Cache hit rate for embeddings', registers: [this.registry], }) } recordEmbeddingGeneration(model: string, duration: number) { this.embeddingLatency.observe({ model }, duration / 1000) } recordSearch( index: string, type: 'semantic' | 'hybrid', duration: number, resultCount: number ) { this.searchLatency.observe({ index, type }, duration / 1000) this.searchResultCount.observe({ index }, resultCount) } updateIndexSize(index: string, size: number) { this.indexSize.set({ index }, size) } updateCacheHitRate(rate: number) { this.cacheHitRate.set(rate) } async getMetrics(): Promise<string> { return this.registry.metrics() } }
✓ Production Checklist
- [ ] Configure index parameters for your dataset size
- [ ] Implement backup and recovery procedures (a minimal backup sketch follows this checklist)
- [ ] Set up monitoring and alerting
- [ ] Configure auto-scaling for vector databases
- [ ] Implement access control and encryption
- [ ] Set up cost monitoring and alerts
- [ ] Create runbooks for common operations
- [ ] Test disaster recovery procedures
- [ ] Document embedding model versions
- [ ] Plan for index maintenance windows
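For the backup item, here is a minimal sketch that dumps the pgvector database with `pg_dump`; the output path and schedule are placeholders, and managed services such as Pinecone handle backups on their side.

```typescript
// Hypothetical backup helper for the pgvector deployment (a sketch, not a full DR solution)
import { execFile } from 'child_process'
import { promisify } from 'util'

const run = promisify(execFile)

async function backupVectorDb(outputDir = '/backups') {
  const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
  const outFile = `${outputDir}/vectordb-${timestamp}.dump`

  // Custom format (--format=custom) is compressed and restorable with pg_restore
  await run('pg_dump', ['--format=custom', `--file=${outFile}`, process.env.DATABASE_URL!])

  console.log(`Backup written to ${outFile}`)
}

backupVectorDb().catch(console.error)
```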
Complete RAG System Example
// Complete RAG system with all components import { config } from './config' import { createPineconeClient, createQdrantClient } from './vector-db/clients' import { EmbeddingService } from './services/embedding.service' import { SemanticSearchService } from './services/semantic-search.service' import { RAGService } from './services/rag.service' import { MultiLevelCache } from './services/cache.service' import { DocumentProcessor } from './pipelines/document-processor' import { VectorDBMonitoring } from './monitoring/vector-db-monitoring' import Redis from 'ioredis' async function initializeRAGSystem() { // Initialize clients const redis = new Redis(config.redisUrl) const pinecone = await createPineconeClient() const vectorStore = new PineconeVectorStore(pinecone, config.pineconeIndex) // Initialize services const cache = new MultiLevelCache(redis) const embeddingService = new EmbeddingService() const searchService = new SemanticSearchService( embeddingService, vectorStore ) const ragService = new RAGService( searchService, llmService, cache ) // Initialize monitoring const monitoring = new VectorDBMonitoring() // Document processing pipeline const processor = new DocumentProcessor( embeddingService, vectorStore ) return { ragService, processor, monitoring, // Process and index documents async indexDocuments(documents: Document[]) { console.log(`Indexing ${documents.length} documents...`) const startTime = Date.now() await processor.processDocumentBatch(documents) const duration = Date.now() - startTime monitoring.recordEmbeddingGeneration( 'text-embedding-3-small', duration ) console.log(`Indexed in ${duration}ms`) }, // Perform RAG query async query(question: string, options?: RAGOptions) { const startTime = Date.now() const result = await ragService.generate(question, options) const duration = Date.now() - startTime monitoring.recordSearch( config.pineconeIndex, 'semantic', duration, result.sources.length ) return result }, // Get system metrics async getMetrics() { return monitoring.getMetrics() }, } } // Usage example async function main() { const rag = await initializeRAGSystem() // Index some documents await rag.indexDocuments([ { id: '1', title: 'Introduction to Vector Databases', content: 'Vector databases are specialized systems...', metadata: { category: 'tutorial' }, }, ]) // Query the system const result = await rag.query( 'What are vector databases and how do they work?' ) console.log('Answer:', result.answer) console.log('Sources:', result.sources) } main().catch(console.error)
Ready to Build RAG Systems?
Implement powerful retrieval-augmented generation with vector databases using ParrotRouter's unified API.