Quick Start
Set up a vector database for your LLM application:
# Python environment
pip install pinecone-client weaviate-client qdrant-client pgvector
pip install openai sentence-transformers numpy

# Node.js environment
npm install @pinecone-database/pinecone weaviate-ts-client
npm install @qdrant/js-client-rest openai
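The clients in this guide read their credentials from environment variables. The quick sanity check below is a minimal sketch (not part of the stack itself); the variable names are taken from the client code in the following sections, and WEAVIATE_HOST and REDIS_URL fall back to local defaults:
// scripts/check-env.ts - minimal sketch; warns about missing configuration
const requiredEnv = [
  'OPENAI_API_KEY',
  'PINECONE_API_KEY',
  'PINECONE_ENVIRONMENT',
  'QDRANT_URL',
  'DATABASE_URL',
]

const missing = requiredEnv.filter(name => !process.env[name])
if (missing.length > 0) {
  console.warn(`Missing environment variables: ${missing.join(', ')}`)
} else {
  console.log('All required environment variables are set')
}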
1. Vector Database Overview
Popular Vector Databases Comparison
| Database | Type | Best For | Key Features |
|---|---|---|---|
| Pinecone | Managed SaaS | Production at scale | Fully managed, auto-scaling, metadata filtering |
| Weaviate | Open Source | Hybrid search | GraphQL API, modules system, built-in vectorization |
| Qdrant | Open Source | High performance | Rust-based, advanced filtering, distributed |
| pgvector | PostgreSQL Extension | Existing Postgres users | SQL integration, ACID compliance, joins with relational data |
Vector Database Client Setup
// src/vector-db/clients.ts
import { PineconeClient } from '@pinecone-database/pinecone'
import { QdrantClient } from '@qdrant/js-client-rest'
import weaviate from 'weaviate-ts-client'
import { Pool } from 'pg'
// Pinecone Client
export async function createPineconeClient() {
const pinecone = new PineconeClient()
await pinecone.init({
apiKey: process.env.PINECONE_API_KEY!,
environment: process.env.PINECONE_ENVIRONMENT!,
})
return pinecone
}
// Qdrant Client
export function createQdrantClient() {
return new QdrantClient({
url: process.env.QDRANT_URL || 'http://localhost:6333',
apiKey: process.env.QDRANT_API_KEY,
})
}
// Weaviate Client
export function createWeaviateClient() {
return weaviate.client({
scheme: 'http',
host: process.env.WEAVIATE_HOST || 'localhost:8080',
apiKey: process.env.WEAVIATE_API_KEY
? new weaviate.ApiKey(process.env.WEAVIATE_API_KEY)
: undefined,
})
}
// PostgreSQL with pgvector
export function createPgVectorClient() {
return new Pool({
connectionString: process.env.DATABASE_URL,
})
}
2. Embedding Generation & Storage
Embedding Service
// src/services/embedding.service.ts
import { OpenAI } from 'openai'
import { encode } from 'gpt-3-encoder'
export interface EmbeddingOptions {
model?: string
batchSize?: number
dimensions?: number
}
export class EmbeddingService {
private openai: OpenAI
private cache = new Map<string, number[]>()
constructor() {
this.openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
})
}
async generateEmbedding(
text: string,
options: EmbeddingOptions = {}
): Promise<number[]> {
// Check cache first
const cacheKey = `${text}-${options.model || 'text-embedding-3-small'}`
if (this.cache.has(cacheKey)) {
return this.cache.get(cacheKey)!
}
const response = await this.openai.embeddings.create({
model: options.model || 'text-embedding-3-small',
input: text,
dimensions: options.dimensions, // Optional dimension reduction
})
const embedding = response.data[0].embedding
this.cache.set(cacheKey, embedding)
return embedding
}
async generateBatchEmbeddings(
texts: string[],
options: EmbeddingOptions = {}
): Promise<number[][]> {
const batchSize = options.batchSize || 100
const embeddings: number[][] = []
for (let i = 0; i < texts.length; i += batchSize) {
const batch = texts.slice(i, i + batchSize)
const response = await this.openai.embeddings.create({
model: options.model || 'text-embedding-3-small',
input: batch,
dimensions: options.dimensions,
})
embeddings.push(...response.data.map(d => d.embedding))
}
return embeddings
}
// Token-aware text chunking
chunkText(text: string, maxTokens: number = 512): string[] {
const chunks: string[] = []
const sentences = text.match(/[^.!?]+[.!?]+/g) || [text]
let currentChunk = ''
let currentTokens = 0
for (const sentence of sentences) {
const sentenceTokens = encode(sentence).length
if (currentTokens + sentenceTokens > maxTokens) {
if (currentChunk) {
chunks.push(currentChunk.trim())
}
currentChunk = sentence
currentTokens = sentenceTokens
} else {
currentChunk += ' ' + sentence
currentTokens += sentenceTokens
}
}
if (currentChunk) {
chunks.push(currentChunk.trim())
}
return chunks
}
// Smart chunking with overlap
chunkTextWithOverlap(
text: string,
chunkSize: number = 1000,
overlap: number = 200
): string[] {
const chunks: string[] = []
const stride = chunkSize - overlap
for (let i = 0; i < text.length; i += stride) {
chunks.push(text.slice(i, i + chunkSize))
}
return chunks
}
}
Document Processing Pipeline
// src/pipelines/document-processor.ts
export interface Document {
id: string
title: string
content: string
metadata: Record<string, any>
}
export interface ProcessedChunk {
id: string
documentId: string
content: string
embedding: number[]
metadata: {
chunkIndex: number
totalChunks: number
startChar: number
endChar: number
[key: string]: any
}
}
export class DocumentProcessor {
constructor(
private embeddingService: EmbeddingService,
private vectorStore: VectorStore
) {}
async processDocument(document: Document): Promise<ProcessedChunk[]> {
// 1. Chunk the document
const chunks = this.embeddingService.chunkTextWithOverlap(
document.content,
1000,
200
)
// 2. Generate embeddings for each chunk
const embeddings = await this.embeddingService.generateBatchEmbeddings(chunks)
// 3. Create processed chunks with metadata
const processedChunks: ProcessedChunk[] = chunks.map((chunk, index) => ({
id: `${document.id}_chunk_${index}`,
documentId: document.id,
content: chunk,
embedding: embeddings[index],
metadata: {
chunkIndex: index,
totalChunks: chunks.length,
startChar: index * 800, // Approximate due to overlap
endChar: Math.min((index + 1) * 800 + 200, document.content.length),
title: document.title,
...document.metadata,
},
}))
// 4. Store in vector database
await this.vectorStore.upsertBatch(processedChunks)
return processedChunks
}
async processDocumentBatch(documents: Document[]): Promise<void> {
// Process documents in parallel with concurrency limit
const concurrency = 5
const results: ProcessedChunk[][] = []
for (let i = 0; i < documents.length; i += concurrency) {
const batch = documents.slice(i, i + concurrency)
const batchResults = await Promise.all(
batch.map(doc => this.processDocument(doc))
)
results.push(...batchResults)
}
console.log(`Processed ${documents.length} documents, ${results.flat().length} chunks`)
}
}
3. Semantic Search Implementation
Vector Store Abstraction
// src/vector-db/vector-store.ts
export interface SearchResult {
id: string
score: number
content: string
metadata: Record<string, any>
}
export interface VectorStore {
upsert(id: string, embedding: number[], metadata: any): Promise<void>
upsertBatch(items: ProcessedChunk[]): Promise<void>
search(
embedding: number[],
topK: number,
filter?: Record<string, any>
): Promise<SearchResult[]>
delete(ids: string[]): Promise<void>
}
// Pinecone Implementation
export class PineconeVectorStore implements VectorStore {
constructor(
private client: PineconeClient,
private indexName: string
) {}
async upsert(id: string, embedding: number[], metadata: any): Promise<void> {
const index = this.client.Index(this.indexName)
await index.upsert({
upsertRequest: {
vectors: [{
id,
values: embedding,
metadata,
}],
},
})
}
async upsertBatch(items: ProcessedChunk[]): Promise<void> {
const index = this.client.Index(this.indexName)
const vectors = items.map(item => ({
id: item.id,
values: item.embedding,
metadata: {
content: item.content,
...item.metadata,
},
}))
// Batch upsert with size limit
const batchSize = 100
for (let i = 0; i < vectors.length; i += batchSize) {
await index.upsert({
upsertRequest: {
vectors: vectors.slice(i, i + batchSize),
},
})
}
}
async search(
embedding: number[],
topK: number,
filter?: Record<string, any>
): Promise<SearchResult[]> {
const index = this.client.Index(this.indexName)
const results = await index.query({
queryRequest: {
vector: embedding,
topK,
includeMetadata: true,
filter,
},
})
return results.matches?.map(match => ({
id: match.id,
score: match.score || 0,
content: match.metadata?.content || '',
metadata: match.metadata || {},
})) || []
}
async delete(ids: string[]): Promise<void> {
const index = this.client.Index(this.indexName)
await index.delete1({ ids })
}
}
// Qdrant Implementation
export class QdrantVectorStore implements VectorStore {
constructor(
private client: QdrantClient,
private collectionName: string
) {}
async upsert(id: string, embedding: number[], metadata: any): Promise<void> {
await this.client.upsert(this.collectionName, {
wait: true,
points: [{
id,
vector: embedding,
payload: metadata,
}],
})
}
async upsertBatch(items: ProcessedChunk[]): Promise<void> {
const points = items.map(item => ({
id: item.id,
vector: item.embedding,
payload: {
content: item.content,
...item.metadata,
},
}))
await this.client.upsert(this.collectionName, {
wait: true,
points,
})
}
async search(
embedding: number[],
topK: number,
filter?: Record<string, any>
): Promise<SearchResult[]> {
const results = await this.client.search(this.collectionName, {
vector: embedding,
limit: topK,
filter: filter ? this.buildQdrantFilter(filter) : undefined,
with_payload: true,
})
return results.map(result => ({
id: result.id.toString(),
score: result.score,
content: result.payload?.content || '',
metadata: result.payload || {},
}))
}
private buildQdrantFilter(filter: Record<string, any>): any {
// Convert simple filter to Qdrant filter format
const must: any[] = []
for (const [key, value] of Object.entries(filter)) {
must.push({
key,
match: { value },
})
}
return { must }
}
async delete(ids: string[]): Promise<void> {
await this.client.delete(this.collectionName, {
wait: true,
points: ids,
})
}
}
Semantic Search Service
// src/services/semantic-search.service.ts
export interface SearchOptions {
topK?: number
threshold?: number
filter?: Record<string, any>
rerank?: boolean
hybridAlpha?: number // For hybrid search
}
export class SemanticSearchService {
constructor(
private embeddingService: EmbeddingService,
private vectorStore: VectorStore,
private rerankingModel?: RerankingModel
) {}
async search(
query: string,
options: SearchOptions = {}
): Promise<SearchResult[]> {
const {
topK = 10,
threshold = 0.7,
filter,
rerank = false,
} = options
// 1. Generate query embedding
const queryEmbedding = await this.embeddingService.generateEmbedding(query)
// 2. Search vector store
const results = await this.vectorStore.search(
queryEmbedding,
rerank ? topK * 2 : topK, // Get more results if reranking
filter
)
// 3. Filter by threshold
const filteredResults = results.filter(r => r.score >= threshold)
// 4. Optional reranking
if (rerank && this.rerankingModel && filteredResults.length > 0) {
const rerankedResults = await this.rerankingModel.rerank(
query,
filteredResults.map(r => r.content)
)
// Map scores back onto the original results
// (assumes the reranker returns one score per input passage, in input order)
return rerankedResults
.map((reranked, index) => ({
...filteredResults[index],
score: reranked.score,
}))
.sort((a, b) => b.score - a.score)
.slice(0, topK)
}
return filteredResults.slice(0, topK)
}
async searchWithContext(
query: string,
conversationHistory: Message[],
options: SearchOptions = {}
): Promise<SearchResult[]> {
// Enhance query with conversation context
const contextualQuery = this.buildContextualQuery(query, conversationHistory)
return this.search(contextualQuery, options)
}
private buildContextualQuery(
query: string,
history: Message[]
): string {
// Take last few messages for context
const recentHistory = history.slice(-3)
const context = recentHistory
.map(m => `${m.role}: ${m.content}`)
.join('\n')
return `Context:\n${context}\n\nCurrent query: ${query}`
}
// Multi-query search for better recall
async multiQuerySearch(
query: string,
options: SearchOptions = {}
): Promise<SearchResult[]> {
// Generate query variations
const queries = await this.generateQueryVariations(query)
// Search with each query
const allResults = await Promise.all(
queries.map(q => this.search(q, { ...options, topK: 5 }))
)
// Deduplicate and combine results
const resultMap = new Map<string, SearchResult>()
for (const results of allResults) {
for (const result of results) {
const existing = resultMap.get(result.id)
if (!existing || result.score > existing.score) {
resultMap.set(result.id, result)
}
}
}
// Sort by score and return top K
return Array.from(resultMap.values())
.sort((a, b) => b.score - a.score)
.slice(0, options.topK || 10)
}
private async generateQueryVariations(query: string): Promise<string[]> {
// Use LLM to generate query variations
const prompt = `Generate 3 alternative phrasings of this query: "${query}"
Alternatives:
1.`
// Implementation would call LLM here
// For now, return simple variations
return [
query,
`What is ${query}`,
`Explain ${query}`,
`Information about ${query}`,
]
}
}
4. RAG Architecture
RAG Service Implementation
// src/services/rag.service.ts
export interface RAGOptions {
searchOptions?: SearchOptions
contextWindow?: number
systemPrompt?: string
temperature?: number
model?: string
}
export class RAGService {
constructor(
private searchService: SemanticSearchService,
private llmService: LLMService,
private cache: CacheService
) {}
async generate(
query: string,
options: RAGOptions = {}
): Promise<{
answer: string
sources: SearchResult[]
cached: boolean
}> {
const {
searchOptions = {},
contextWindow = 3000,
systemPrompt,
temperature = 0.7,
model = 'gpt-4-turbo-preview',
} = options
// Check cache first
const cacheKey = this.generateCacheKey(query, options)
const cached = await this.cache.get(cacheKey)
if (cached) {
return { ...cached, cached: true }
}
// 1. Retrieve relevant documents
const searchResults = await this.searchService.search(query, searchOptions)
if (searchResults.length === 0) {
return {
answer: "I couldn't find relevant information to answer your question.",
sources: [],
cached: false,
}
}
// 2. Build context from search results
const context = this.buildContext(searchResults, contextWindow)
// 3. Create prompt
const prompt = this.buildRAGPrompt(query, context, systemPrompt)
// 4. Generate answer using LLM
const answer = await this.llmService.generate({
messages: [
{ role: 'system', content: prompt.system },
{ role: 'user', content: prompt.user },
],
temperature,
model,
})
// 5. Extract citations from answer
const citedSources = this.extractCitations(answer, searchResults)
// 6. Cache the result
const result = {
answer,
sources: citedSources,
cached: false,
}
await this.cache.set(cacheKey, result, 3600) // 1 hour TTL
return result
}
async generateStreaming(
query: string,
options: RAGOptions = {},
onToken: (token: string) => void
): Promise<{
sources: SearchResult[]
fullAnswer: string
}> {
// Retrieve context (same as non-streaming)
const searchResults = await this.searchService.search(
query,
options.searchOptions
)
const context = this.buildContext(
searchResults,
options.contextWindow || 3000
)
const prompt = this.buildRAGPrompt(
query,
context,
options.systemPrompt
)
// Stream the response
let fullAnswer = ''
await this.llmService.generateStream({
messages: [
{ role: 'system', content: prompt.system },
{ role: 'user', content: prompt.user },
],
temperature: options.temperature || 0.7,
model: options.model || 'gpt-4-turbo-preview',
onToken: (token) => {
fullAnswer += token
onToken(token)
},
})
return {
sources: searchResults,
fullAnswer,
}
}
private buildContext(
results: SearchResult[],
maxTokens: number
): string {
let context = ''
let currentTokens = 0
for (const [index, result] of results.entries()) {
const chunk = `[Source ${index + 1}] ${result.content}\n\n`
const chunkTokens = this.estimateTokens(chunk)
if (currentTokens + chunkTokens > maxTokens) {
break
}
context += chunk
currentTokens += chunkTokens
}
return context.trim()
}
private buildRAGPrompt(
query: string,
context: string,
customSystemPrompt?: string
): { system: string; user: string } {
const system = customSystemPrompt || `You are a helpful AI assistant powered by ParrotRouter.
Your task is to answer questions based on the provided context.
Always cite your sources using [Source N] notation.
If the context doesn't contain relevant information, say so clearly.
Be concise and accurate in your responses.`
const user = `Context:
${context}
Question: ${query}
Please provide a comprehensive answer based on the context above. Include source citations.`
return { system, user }
}
private extractCitations(
answer: string,
sources: SearchResult[]
): SearchResult[] {
const citationPattern = /\[Source (\d+)\]/g
const citations = new Set<number>()
let match
while ((match = citationPattern.exec(answer)) !== null) {
const sourceIndex = parseInt(match[1], 10) - 1
if (sourceIndex >= 0 && sourceIndex < sources.length) {
citations.add(sourceIndex)
}
}
return Array.from(citations).map(index => sources[index])
}
private generateCacheKey(query: string, options: RAGOptions): string {
const params = {
query,
model: options.model,
temperature: options.temperature,
searchOptions: options.searchOptions,
}
return `rag:${JSON.stringify(params)}`
}
private estimateTokens(text: string): number {
// Rough estimation: 1 token ≈ 4 characters
return Math.ceil(text.length / 4)
}
}
// Advanced RAG with conversation memory
export class ConversationalRAGService extends RAGService {
async generateWithMemory(
query: string,
conversationId: string,
options: RAGOptions = {}
): Promise<{
answer: string
sources: SearchResult[]
conversationId: string
}> {
// Retrieve conversation history
const history = await this.getConversationHistory(conversationId)
// Search with context
const searchResults = await this.searchService.searchWithContext(
query,
history,
options.searchOptions
)
// Build prompt with history
const context = this.buildContext(searchResults, options.contextWindow || 3000)
const prompt = this.buildConversationalPrompt(query, context, history)
// Generate response
const answer = await this.llmService.generate({
messages: [
{ role: 'system', content: prompt.system },
...history.map(msg => ({
role: msg.role as 'user' | 'assistant',
content: msg.content,
})),
{ role: 'user', content: query },
],
temperature: options.temperature || 0.7,
model: options.model || 'gpt-4-turbo-preview',
})
// Store in conversation history
await this.addToConversation(conversationId, query, answer)
return {
answer,
sources: searchResults,
conversationId,
}
}
private buildConversationalPrompt(
query: string,
context: string,
history: Message[]
): { system: string } {
return {
system: `You are a helpful AI assistant powered by ParrotRouter.
You have access to a knowledge base and can answer questions based on the provided context.
Maintain conversation continuity and reference previous messages when relevant.
Always cite sources using [Source N] notation.
Current context from knowledge base:
${context}`,
}
}
private async getConversationHistory(
conversationId: string
): Promise<Message[]> {
// Implementation would fetch from database
return []
}
private async addToConversation(
conversationId: string,
query: string,
answer: string
): Promise<void> {
// Implementation would store in database
}
}
RAG Best Practices
- • Chunk documents with appropriate overlap (10-20%)
- • Use metadata filtering to improve relevance
- • Implement query expansion for better recall
- • Cache frequent queries to reduce costs
- • Monitor retrieval quality and adjust thresholds (a filtered-search sketch follows this list)
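As a minimal sketch of the filtering and threshold tips, the SemanticSearchService defined earlier already accepts both options. The category field here is a hypothetical metadata key attached at indexing time, and embeddingService and vectorStore are assumed to be constructed as in the previous sections:
// Assumes embeddingService and vectorStore were built as shown earlier
const searchService = new SemanticSearchService(embeddingService, vectorStore)

async function searchSecurityDocs(question: string): Promise<SearchResult[]> {
  return searchService.search(question, {
    topK: 5,
    threshold: 0.75, // raise or lower based on observed retrieval quality
    filter: { category: 'security-docs' }, // hypothetical metadata field set during indexing
  })
}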
5. Hybrid Search Strategies
Hybrid Search Implementation
// src/services/hybrid-search.service.ts
export interface HybridSearchOptions extends SearchOptions {
alpha?: number // Balance between keyword (0) and semantic (1) search
keywordBoost?: number
bm25Config?: {
k1?: number
b?: number
}
}
export class HybridSearchService {
constructor(
private semanticSearch: SemanticSearchService,
private keywordSearch: KeywordSearchService,
private vectorStore: VectorStore
) {}
async hybridSearch(
query: string,
options: HybridSearchOptions = {}
): Promise<SearchResult[]> {
const {
alpha = 0.5,
topK = 10,
filter,
keywordBoost = 1.0,
} = options
// Perform both searches in parallel
const [semanticResults, keywordResults] = await Promise.all([
this.semanticSearch.search(query, { topK: topK * 2, filter }),
this.keywordSearch.search(query, { topK: topK * 2, filter }),
])
// Combine and rerank results
const combinedResults = this.fuseResults(
semanticResults,
keywordResults,
alpha,
keywordBoost
)
return combinedResults.slice(0, topK)
}
private fuseResults(
semanticResults: SearchResult[],
keywordResults: SearchResult[],
alpha: number,
keywordBoost: number
): SearchResult[] {
const scoreMap = new Map<string, {
result: SearchResult
semanticScore: number
keywordScore: number
}>()
// Process semantic results
for (const result of semanticResults) {
scoreMap.set(result.id, {
result,
semanticScore: result.score,
keywordScore: 0,
})
}
// Process keyword results
for (const result of keywordResults) {
const existing = scoreMap.get(result.id)
if (existing) {
existing.keywordScore = result.score * keywordBoost
} else {
scoreMap.set(result.id, {
result,
semanticScore: 0,
keywordScore: result.score * keywordBoost,
})
}
}
// Calculate hybrid scores
const hybridResults: SearchResult[] = []
for (const { result, semanticScore, keywordScore } of scoreMap.values()) {
const hybridScore = alpha * semanticScore + (1 - alpha) * keywordScore
hybridResults.push({
...result,
score: hybridScore,
metadata: {
...result.metadata,
semanticScore,
keywordScore,
hybridScore,
},
})
}
// Sort by hybrid score
return hybridResults.sort((a, b) => b.score - a.score)
}
}
// Weaviate Hybrid Search (Native Support)
export class WeaviateHybridSearch {
constructor(private client: any) {}
async search(
query: string,
className: string,
options: HybridSearchOptions = {}
): Promise<SearchResult[]> {
const {
alpha = 0.5,
topK = 10,
filter,
} = options
const result = await this.client.graphql
.get()
.withClassName(className)
.withHybrid({
query,
alpha, // 0 = keyword only, 1 = vector only
})
.withLimit(topK)
.withWhere(filter ? this.buildWhereFilter(filter) : undefined)
.withFields('_additional { id score } content metadata')
.do()
return this.transformResults(result.data.Get[className])
}
private buildWhereFilter(filter: Record<string, any>): any {
// Transform to Weaviate where filter format
const operator = 'And'
const operands = Object.entries(filter).map(([path, value]) => ({
path: [path],
operator: 'Equal',
value,
}))
return { operator, operands }
}
private transformResults(weaviateResults: any[]): SearchResult[] {
return weaviateResults.map(result => ({
id: result._additional.id,
score: result._additional.score,
content: result.content,
metadata: result.metadata || {},
}))
}
}
6. Database Schema Design
PostgreSQL with pgvector Schema
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- Documents table
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
title VARCHAR(255) NOT NULL,
source VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata JSONB DEFAULT '{}'::jsonb
);
-- Document chunks with embeddings
CREATE TABLE document_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
embedding vector(1536), -- OpenAI embedding dimension
char_start INTEGER,
char_end INTEGER,
metadata JSONB DEFAULT '{}'::jsonb,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(document_id, chunk_index)
);
-- Create indexes for vector similarity search
CREATE INDEX document_chunks_embedding_idx ON document_chunks
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
-- Conversations table
CREATE TABLE conversations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL,
title VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata JSONB DEFAULT '{}'::jsonb
);
-- Messages table
CREATE TABLE messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
role VARCHAR(20) NOT NULL CHECK (role IN ('user', 'assistant', 'system')),
content TEXT NOT NULL,
embedding vector(1536),
tokens_used INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata JSONB DEFAULT '{}'::jsonb
);
-- Search history for analytics
CREATE TABLE search_queries (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID,
query TEXT NOT NULL,
query_embedding vector(1536),
results_count INTEGER,
clicked_results JSONB DEFAULT '[]'::jsonb,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Cached embeddings
CREATE TABLE embedding_cache (
text_hash VARCHAR(64) PRIMARY KEY,
text TEXT NOT NULL,
embedding vector(1536) NOT NULL,
model VARCHAR(50) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Function for semantic search
CREATE OR REPLACE FUNCTION search_documents(
query_embedding vector(1536),
match_threshold float DEFAULT 0.7,
match_count int DEFAULT 10
)
RETURNS TABLE (
chunk_id UUID,
document_id UUID,
content TEXT,
similarity float,
metadata JSONB
)
LANGUAGE plpgsql
AS $$
BEGIN
RETURN QUERY
SELECT
dc.id,
dc.document_id,
dc.content,
1 - (dc.embedding <=> query_embedding) AS similarity,
dc.metadata
FROM document_chunks dc
WHERE 1 - (dc.embedding <=> query_embedding) > match_threshold
ORDER BY dc.embedding <=> query_embedding
LIMIT match_count;
END;
$$;
-- Hybrid search function (vector + full-text)
CREATE OR REPLACE FUNCTION hybrid_search(
query_text TEXT,
query_embedding vector(1536),
alpha float DEFAULT 0.5,
match_count int DEFAULT 10
)
RETURNS TABLE (
chunk_id UUID,
document_id UUID,
content TEXT,
vector_similarity float,
text_rank float,
hybrid_score float,
metadata JSONB
)
LANGUAGE plpgsql
AS $$
BEGIN
RETURN QUERY
WITH vector_search AS (
SELECT
dc.id,
dc.document_id,
dc.content,
1 - (dc.embedding <=> query_embedding) AS similarity,
dc.metadata
FROM document_chunks dc
ORDER BY dc.embedding <=> query_embedding
LIMIT match_count * 2
),
text_search AS (
SELECT
dc.id,
ts_rank(to_tsvector('english', dc.content),
plainto_tsquery('english', query_text)) AS rank
FROM document_chunks dc
WHERE to_tsvector('english', dc.content) @@
plainto_tsquery('english', query_text)
ORDER BY rank DESC
LIMIT match_count * 2
)
SELECT DISTINCT
vs.id,
vs.document_id,
vs.content,
vs.similarity,
COALESCE(ts.rank, 0) AS text_rank,
(alpha * vs.similarity + (1 - alpha) * COALESCE(ts.rank, 0)) AS hybrid_score,
vs.metadata
FROM vector_search vs
LEFT JOIN text_search ts ON vs.id = ts.id
ORDER BY hybrid_score DESC
LIMIT match_count;
END;
$$;
TypeScript Data Access Layer
// src/dal/pgvector-dal.ts
import { Pool } from 'pg'
import { createHash } from 'crypto'
export class PgVectorDAL {
constructor(private pool: Pool) {}
async initializeSchema(): Promise<void> {
// Run schema creation SQL
const schemaSQL = `...` // Schema from above
await this.pool.query(schemaSQL)
}
async insertDocument(
title: string,
source: string,
metadata: Record<string, any> = {}
): Promise<string> {
const result = await this.pool.query(
`INSERT INTO documents (title, source, metadata)
VALUES ($1, $2, $3)
RETURNING id`,
[title, source, JSON.stringify(metadata)]
)
return result.rows[0].id
}
async insertChunks(chunks: ProcessedChunk[]): Promise<void> {
// Bulk insert inside a single transaction; for very large batches,
// consider streaming COPY via the pg-copy-streams package instead
const client = await this.pool.connect()
try {
await client.query('BEGIN')
for (const chunk of chunks) {
await client.query(
`INSERT INTO document_chunks (
document_id, chunk_index, content, embedding,
char_start, char_end, metadata
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
chunk.documentId,
chunk.metadata.chunkIndex,
chunk.content,
JSON.stringify(chunk.embedding),
chunk.metadata.startChar,
chunk.metadata.endChar,
JSON.stringify(chunk.metadata),
]
)
}
await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
async semanticSearch(
embedding: number[],
threshold: number = 0.7,
limit: number = 10
): Promise<SearchResult[]> {
const result = await this.pool.query(
'SELECT * FROM search_documents($1, $2, $3)',
[JSON.stringify(embedding), threshold, limit]
)
return result.rows.map(row => ({
id: row.chunk_id,
score: row.similarity,
content: row.content,
metadata: row.metadata,
}))
}
async hybridSearch(
query: string,
embedding: number[],
alpha: number = 0.5,
limit: number = 10
): Promise<SearchResult[]> {
const result = await this.pool.query(
'SELECT * FROM hybrid_search($1, $2, $3, $4)',
[query, JSON.stringify(embedding), alpha, limit]
)
return result.rows.map(row => ({
id: row.chunk_id,
score: row.hybrid_score,
content: row.content,
metadata: {
...row.metadata,
vectorSimilarity: row.vector_similarity,
textRank: row.text_rank,
},
}))
}
// Embedding cache operations
async getCachedEmbedding(
text: string,
model: string
): Promise<number[] | null> {
const hash = this.hashText(text)
const result = await this.pool.query(
`UPDATE embedding_cache
SET last_accessed = CURRENT_TIMESTAMP
WHERE text_hash = $1 AND model = $2
RETURNING embedding`,
[hash, model]
)
if (result.rows.length > 0) {
return JSON.parse(result.rows[0].embedding)
}
return null
}
async setCachedEmbedding(
text: string,
embedding: number[],
model: string
): Promise<void> {
const hash = this.hashText(text)
await this.pool.query(
`INSERT INTO embedding_cache (text_hash, text, embedding, model)
VALUES ($1, $2, $3, $4)
ON CONFLICT (text_hash)
DO UPDATE SET last_accessed = CURRENT_TIMESTAMP`,
[hash, text, JSON.stringify(embedding), model]
)
}
private hashText(text: string): string {
// SHA-256 hex digest (64 chars) matches the VARCHAR(64) text_hash column
return createHash('sha256').update(text).digest('hex')
}
}
7. Caching Strategies
Multi-Level Caching
// src/services/cache.service.ts
import Redis from 'ioredis'
import LRU from 'lru-cache'
import { createHash } from 'crypto'
export class MultiLevelCache {
private memoryCache: LRU<string, any>
private redisCache: Redis
constructor(redis: Redis, options?: {
maxMemoryItems?: number
memoryTTL?: number
}) {
this.redisCache = redis
this.memoryCache = new LRU({
max: options?.maxMemoryItems || 1000,
ttl: options?.memoryTTL || 1000 * 60 * 5, // 5 minutes
})
}
async get<T>(key: string): Promise<T | null> {
// Check memory cache first
const memoryResult = this.memoryCache.get(key)
if (memoryResult !== undefined) {
return memoryResult
}
// Check Redis cache
const redisResult = await this.redisCache.get(key)
if (redisResult) {
const parsed = JSON.parse(redisResult)
// Populate memory cache
this.memoryCache.set(key, parsed)
return parsed
}
return null
}
async set<T>(
key: string,
value: T,
ttlSeconds?: number
): Promise<void> {
// Set in both caches
this.memoryCache.set(key, value)
const serialized = JSON.stringify(value)
if (ttlSeconds) {
await this.redisCache.setex(key, ttlSeconds, serialized)
} else {
await this.redisCache.set(key, serialized)
}
}
async invalidate(pattern: string): Promise<void> {
// Clear from memory cache
for (const key of this.memoryCache.keys()) {
if (key.includes(pattern)) {
this.memoryCache.delete(key)
}
}
// Clear from Redis
const keys = await this.redisCache.keys(pattern)
if (keys.length > 0) {
await this.redisCache.del(...keys)
}
}
}
// Embedding cache service
export class EmbeddingCacheService {
constructor(
private cache: MultiLevelCache,
private dal: PgVectorDAL
) {}
async getEmbedding(
text: string,
model: string
): Promise<number[] | null> {
const cacheKey = `embedding:${model}:${this.hashText(text)}`
// Check multi-level cache
const cached = await this.cache.get<number[]>(cacheKey)
if (cached) {
return cached
}
// Check persistent storage
const stored = await this.dal.getCachedEmbedding(text, model)
if (stored) {
// Populate cache
await this.cache.set(cacheKey, stored, 3600)
return stored
}
return null
}
async setEmbedding(
text: string,
embedding: number[],
model: string
): Promise<void> {
const cacheKey = `embedding:${model}:${this.hashText(text)}`
// Store in all levels
await Promise.all([
this.cache.set(cacheKey, embedding, 3600),
this.dal.setCachedEmbedding(text, embedding, model),
])
}
private hashText(text: string): string {
// SHA-256 avoids the collision risk of a truncated base64 prefix
return createHash('sha256').update(text).digest('hex')
}
}
// Query result caching
export class QueryCacheService {
constructor(
private cache: MultiLevelCache,
private ttlSeconds: number = 3600
) {}
async getCachedResults(
query: string,
filters?: Record<string, any>
): Promise<SearchResult[] | null> {
const cacheKey = this.generateQueryCacheKey(query, filters)
return this.cache.get<SearchResult[]>(cacheKey)
}
async setCachedResults(
query: string,
results: SearchResult[],
filters?: Record<string, any>
): Promise<void> {
const cacheKey = this.generateQueryCacheKey(query, filters)
await this.cache.set(cacheKey, results, this.ttlSeconds)
}
private generateQueryCacheKey(
query: string,
filters?: Record<string, any>
): string {
const params = {
q: query,
f: filters || {},
}
return `query:${JSON.stringify(params)}`
}
}
8. Performance Optimization
Batch Processing Optimization
// src/optimization/batch-processor.ts
export class BatchProcessor {
private queue: Map<string, any[]> = new Map()
private timers: Map<string, NodeJS.Timeout> = new Map()
constructor(
private batchSize: number = 100,
private flushInterval: number = 1000
) {}
async addToBatch<T>(
batchKey: string,
item: T,
processor: (items: T[]) => Promise<void>
): Promise<void> {
// Get or create batch
if (!this.queue.has(batchKey)) {
this.queue.set(batchKey, [])
}
const batch = this.queue.get(batchKey)!
batch.push(item)
// Process if batch is full
if (batch.length >= this.batchSize) {
await this.flush(batchKey, processor)
} else {
// Set timer for time-based flush
this.setFlushTimer(batchKey, processor)
}
}
private setFlushTimer<T>(
batchKey: string,
processor: (items: T[]) => Promise<void>
): void {
// Clear existing timer
const existingTimer = this.timers.get(batchKey)
if (existingTimer) {
clearTimeout(existingTimer)
}
// Set new timer
const timer = setTimeout(() => {
this.flush(batchKey, processor)
}, this.flushInterval)
this.timers.set(batchKey, timer)
}
private async flush<T>(
batchKey: string,
processor: (items: T[]) => Promise<void>
): Promise<void> {
const batch = this.queue.get(batchKey)
if (!batch || batch.length === 0) {
return
}
// Clear batch and timer
this.queue.delete(batchKey)
const timer = this.timers.get(batchKey)
if (timer) {
clearTimeout(timer)
this.timers.delete(batchKey)
}
// Process batch
await processor(batch)
}
}
// Parallel processing for large datasets
export class ParallelProcessor {
constructor(
private concurrency: number = 5
) {}
async processInParallel<T, R>(
items: T[],
processor: (item: T) => Promise<R>,
onProgress?: (completed: number, total: number) => void
): Promise<R[]> {
const results: R[] = new Array(items.length)
let completed = 0
// Process in chunks
for (let i = 0; i < items.length; i += this.concurrency) {
const chunk = items.slice(i, i + this.concurrency)
const chunkResults = await Promise.all(
chunk.map((item, index) => processor(item))
)
// Store results in correct positions
for (let j = 0; j < chunkResults.length; j++) {
results[i + j] = chunkResults[j]
}
completed += chunk.length
onProgress?.(completed, items.length)
}
return results
}
}
// Index optimization service
export class IndexOptimizationService {
constructor(
private vectorStore: VectorStore,
private monitoring: MonitoringService
) {}
async optimizeIndex(collectionName: string): Promise<void> {
const metrics = await this.analyzeIndexPerformance(collectionName)
if (metrics.avgQueryTime > 100) {
// Re-index with better parameters
await this.reindexCollection(collectionName, {
nlist: Math.ceil(Math.sqrt(metrics.totalVectors)),
nprobe: 10,
})
}
if (metrics.fragmentationRatio > 0.3) {
// Compact index
await this.compactIndex(collectionName)
}
}
private async analyzeIndexPerformance(
collectionName: string
): Promise<any> {
// Analyze query performance
const queryMetrics = await this.monitoring.getQueryMetrics(collectionName)
return {
avgQueryTime: queryMetrics.avgDuration,
totalVectors: queryMetrics.totalVectors,
fragmentationRatio: queryMetrics.fragmentation,
}
}
private async reindexCollection(
collectionName: string,
params: any
): Promise<void> {
// Implementation depends on vector database
console.log(`Reindexing ${collectionName} with params:`, params)
}
private async compactIndex(collectionName: string): Promise<void> {
// Implementation depends on vector database
console.log(`Compacting index for ${collectionName}`)
}
}
Performance Tips
- • Use appropriate index types (IVF, HNSW) based on dataset size (see the index sketch after this list)
- • Implement query result caching for repeated searches
- • Batch embedding generation to reduce API calls
- • Reduce embedding dimensions where quality allows (e.g., via the dimensions option shown above)
- • Use async/parallel processing for large datasets
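For the first tip, here is a sketch of switching the pgvector index from IVFFlat to HNSW. It assumes the schema from section 6 and pgvector 0.5.0 or later; HNSW typically trades a slower build and more memory for better recall and query latency on larger collections:
import { Pool } from 'pg'

// Minimal sketch: replace the IVFFlat index from the schema with an HNSW index.
// m and ef_construction are common starting points, not tuned values.
export async function switchToHnswIndex(pool: Pool): Promise<void> {
  await pool.query(`
    DROP INDEX IF EXISTS document_chunks_embedding_idx;
    CREATE INDEX document_chunks_embedding_idx ON document_chunks
      USING hnsw (embedding vector_cosine_ops)
      WITH (m = 16, ef_construction = 64);
  `)
}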
9. Cost Optimization
Cost Management Service
// src/services/cost-management.service.ts
export interface CostMetrics {
embeddingCosts: number
storageCosts: number
queryCosts: number
totalCosts: number
}
export class CostManagementService {
private costPerEmbedding = 0.0001 // $0.0001 per embedding
private costPerQuery = 0.00002 // $0.00002 per query
private costPerGBMonth = 0.25 // $0.25 per GB/month for storage
constructor(
private monitoring: MonitoringService,
private cache: CacheService
) {}
async calculateMonthlyCosts(): Promise<CostMetrics> {
const usage = await this.monitoring.getMonthlyUsage()
const embeddingCosts = usage.embeddingsGenerated * this.costPerEmbedding
const queryCosts = usage.queriesProcessed * this.costPerQuery
const storageCosts = (usage.storageGB * this.costPerGBMonth)
return {
embeddingCosts,
storageCosts,
queryCosts,
totalCosts: embeddingCosts + storageCosts + queryCosts,
}
}
async optimizeCosts(): Promise<{
recommendations: string[]
potentialSavings: number
}> {
const recommendations: string[] = []
let potentialSavings = 0
// Check cache hit rate
const cacheStats = await this.cache.getStats()
if (cacheStats.hitRate < 0.5) {
recommendations.push(
'Improve cache hit rate by increasing cache size or TTL'
)
potentialSavings += this.estimateCacheSavings(cacheStats)
}
// Check for duplicate embeddings
const duplicates = await this.findDuplicateEmbeddings()
if (duplicates > 100) {
recommendations.push(
`Remove ${duplicates} duplicate embeddings to save storage`
)
potentialSavings += duplicates * 0.001 // Rough estimate
}
// Check embedding model usage
const modelUsage = await this.monitoring.getModelUsage()
if (modelUsage['text-embedding-3-large'] > modelUsage['text-embedding-3-small'] * 0.5) {
recommendations.push(
'Consider using smaller embedding models for non-critical content'
)
potentialSavings += this.estimateModelSavings(modelUsage)
}
return { recommendations, potentialSavings }
}
private estimateCacheSavings(stats: any): number {
const missedQueries = stats.misses
const potentialCacheHits = missedQueries * 0.3 // Assume 30% could be cached
return potentialCacheHits * this.costPerEmbedding
}
private estimateModelSavings(usage: any): number {
const largeModelCost = 0.00013
const smallModelCost = 0.00002
const potentialSwitches = usage['text-embedding-3-large'] * 0.7
return potentialSwitches * (largeModelCost - smallModelCost)
}
async findDuplicateEmbeddings(): Promise<number> {
// Implementation would query database for duplicates
return 0
}
}
// Usage tracking
export class UsageTracker {
constructor(private redis: Redis) {}
async trackEmbedding(
model: string,
inputTokens: number
): Promise<void> {
const date = new Date().toISOString().split('T')[0]
const key = `usage:embeddings:${date}`
await this.redis.hincrby(key, model, 1)
await this.redis.hincrby(key, `${model}:tokens`, inputTokens)
await this.redis.expire(key, 86400 * 90) // 90 days retention
}
async trackQuery(
queryType: string,
resultCount: number
): Promise<void> {
const date = new Date().toISOString().split('T')[0]
const key = `usage:queries:${date}`
await this.redis.hincrby(key, queryType, 1)
await this.redis.hincrby(key, `${queryType}:results`, resultCount)
await this.redis.expire(key, 86400 * 90)
}
async getUsageReport(
startDate: string,
endDate: string
): Promise<any> {
const report = {
embeddings: {},
queries: {},
totalCost: 0,
}
// Aggregate usage data
const dates = this.getDateRange(startDate, endDate)
for (const date of dates) {
// Get embedding usage
const embeddingKey = `usage:embeddings:${date}`
const embeddings = await this.redis.hgetall(embeddingKey)
this.aggregateUsage(report.embeddings, embeddings)
// Get query usage
const queryKey = `usage:queries:${date}`
const queries = await this.redis.hgetall(queryKey)
this.aggregateUsage(report.queries, queries)
}
// Calculate costs
report.totalCost = this.calculateTotalCost(report)
return report
}
private getDateRange(start: string, end: string): string[] {
const dates: string[] = []
const current = new Date(start)
const endDate = new Date(end)
while (current <= endDate) {
dates.push(current.toISOString().split('T')[0])
current.setDate(current.getDate() + 1)
}
return dates
}
private aggregateUsage(target: any, source: any): void {
for (const [key, value] of Object.entries(source)) {
target[key] = (target[key] || 0) + parseInt(value as string, 10)
}
}
private calculateTotalCost(usage: any): number {
// Calculate based on model pricing
const embeddingCosts = {
'text-embedding-3-small': 0.00002,
'text-embedding-3-large': 0.00013,
'text-embedding-ada-002': 0.0001,
}
let total = 0
for (const [model, count] of Object.entries(usage.embeddings)) {
if (embeddingCosts[model]) {
total += embeddingCosts[model] * (count as number)
}
}
return total
}
}
10. Production Deployment
Deployment Configuration
# docker-compose.yml
version: '3.8'
services:
# PostgreSQL with pgvector
postgres:
image: ankane/pgvector:latest
environment:
POSTGRES_DB: vectordb
POSTGRES_USER: vectoruser
POSTGRES_PASSWORD: vectorpass
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U vectoruser"]
interval: 10s
timeout: 5s
retries: 5
# Qdrant vector database
qdrant:
image: qdrant/qdrant:latest
ports:
- "6333:6333"
- "6334:6334"
volumes:
- qdrant_data:/qdrant/storage
environment:
QDRANT__SERVICE__GRPC_PORT: 6334
# Weaviate vector database
weaviate:
image: semitechnologies/weaviate:latest
ports:
- "8080:8080"
environment:
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
DEFAULT_VECTORIZER_MODULE: 'text2vec-openai'
ENABLE_MODULES: 'text2vec-openai'
OPENAI_APIKEY: ${OPENAI_API_KEY}
volumes:
- weaviate_data:/var/lib/weaviate
# Redis for caching
redis:
image: redis:7-alpine
command: redis-server --appendonly yes
ports:
- "6379:6379"
volumes:
- redis_data:/data
# Application
app:
build: .
environment:
DATABASE_URL: postgresql://vectoruser:vectorpass@postgres:5432/vectordb
QDRANT_URL: http://qdrant:6333
WEAVIATE_URL: http://weaviate:8080
REDIS_URL: redis://redis:6379
OPENAI_API_KEY: ${OPENAI_API_KEY}
depends_on:
- postgres
- qdrant
- weaviate
- redis
ports:
- "3000:3000"
volumes:
postgres_data:
qdrant_data:
weaviate_data:
redis_data:
Kubernetes Deployment
# k8s/vector-db-deployment.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: qdrant
spec:
serviceName: qdrant
replicas: 3
selector:
matchLabels:
app: qdrant
template:
metadata:
labels:
app: qdrant
spec:
containers:
- name: qdrant
image: qdrant/qdrant:latest
ports:
- containerPort: 6333
name: http
- containerPort: 6334
name: grpc
volumeMounts:
- name: storage
mountPath: /qdrant/storage
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"
livenessProbe:
httpGet:
path: /
port: 6333
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /readyz
port: 6333
initialDelaySeconds: 5
periodSeconds: 5
volumeClaimTemplates:
- metadata:
name: storage
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 100Gi
---
apiVersion: v1
kind: Service
metadata:
name: qdrant
spec:
selector:
app: qdrant
clusterIP: None
ports:
- port: 6333
name: http
- port: 6334
name: grpc
Monitoring Setup
// src/monitoring/vector-db-monitoring.ts
import { Registry, Histogram, Counter, Gauge } from 'prom-client'
export class VectorDBMonitoring {
private registry: Registry
// Metrics
private embeddingLatency: Histogram<string>
private searchLatency: Histogram<string>
private searchResultCount: Histogram<string>
private indexSize: Gauge<string>
private cacheHitRate: Gauge<string>
constructor() {
this.registry = new Registry()
this.embeddingLatency = new Histogram({
name: 'embedding_generation_duration_seconds',
help: 'Time to generate embeddings',
labelNames: ['model'],
buckets: [0.1, 0.5, 1, 2, 5],
registers: [this.registry],
})
this.searchLatency = new Histogram({
name: 'vector_search_duration_seconds',
help: 'Time to perform vector search',
labelNames: ['index', 'type'],
buckets: [0.01, 0.05, 0.1, 0.5, 1],
registers: [this.registry],
})
this.searchResultCount = new Histogram({
name: 'search_result_count',
help: 'Number of results returned',
labelNames: ['index'],
buckets: [0, 1, 5, 10, 20, 50, 100],
registers: [this.registry],
})
this.indexSize = new Gauge({
name: 'vector_index_size',
help: 'Number of vectors in index',
labelNames: ['index'],
registers: [this.registry],
})
this.cacheHitRate = new Gauge({
name: 'embedding_cache_hit_rate',
help: 'Cache hit rate for embeddings',
registers: [this.registry],
})
}
recordEmbeddingGeneration(model: string, duration: number) {
this.embeddingLatency.observe({ model }, duration / 1000)
}
recordSearch(
index: string,
type: 'semantic' | 'hybrid',
duration: number,
resultCount: number
) {
this.searchLatency.observe({ index, type }, duration / 1000)
this.searchResultCount.observe({ index }, resultCount)
}
updateIndexSize(index: string, size: number) {
this.indexSize.set({ index }, size)
}
updateCacheHitRate(rate: number) {
this.cacheHitRate.set(rate)
}
async getMetrics(): Promise<string> {
return this.registry.metrics()
}
}
✓ Production Checklist
- ☐ Configure index parameters for your dataset size
- ☐ Implement backup and recovery procedures
- ☐ Set up monitoring and alerting (see the metrics endpoint sketch below)
- ☐ Configure auto-scaling for vector databases
- ☐ Implement access control and encryption
- ☐ Set up cost monitoring and alerts
- ☐ Create runbooks for common operations
- ☐ Test disaster recovery procedures
- ☐ Document embedding model versions
- ☐ Plan for index maintenance windows
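For the monitoring item, a minimal sketch that exposes the Prometheus registry from the VectorDBMonitoring class above so it can be scraped and alerted on; Express and the port number are assumptions, not part of the stack described earlier:
import express from 'express'
import { VectorDBMonitoring } from './monitoring/vector-db-monitoring'

const app = express()
const monitoring = new VectorDBMonitoring()

// Prometheus scrapes this endpoint; alert rules live on the Prometheus side
app.get('/metrics', async (_req, res) => {
  res.set('Content-Type', 'text/plain')
  res.send(await monitoring.getMetrics())
})

app.listen(9464, () => console.log('Metrics available at :9464/metrics'))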
Complete RAG System Example
// Complete RAG system with all components
import { config } from './config'
import { createPineconeClient, createQdrantClient } from './vector-db/clients'
import { PineconeVectorStore } from './vector-db/vector-store'
import { EmbeddingService } from './services/embedding.service'
import { SemanticSearchService } from './services/semantic-search.service'
import { RAGService } from './services/rag.service'
import { MultiLevelCache } from './services/cache.service'
import { DocumentProcessor } from './pipelines/document-processor'
import { VectorDBMonitoring } from './monitoring/vector-db-monitoring'
import Redis from 'ioredis'
async function initializeRAGSystem() {
// Initialize clients
const redis = new Redis(config.redisUrl)
const pinecone = await createPineconeClient()
const vectorStore = new PineconeVectorStore(pinecone, config.pineconeIndex)
// Initialize services
const cache = new MultiLevelCache(redis)
const embeddingService = new EmbeddingService()
const searchService = new SemanticSearchService(
embeddingService,
vectorStore
)
const ragService = new RAGService(
searchService,
llmService, // assumes an LLMService instance has been constructed elsewhere in the app
cache
)
// Initialize monitoring
const monitoring = new VectorDBMonitoring()
// Document processing pipeline
const processor = new DocumentProcessor(
embeddingService,
vectorStore
)
return {
ragService,
processor,
monitoring,
// Process and index documents
async indexDocuments(documents: Document[]) {
console.log(`Indexing ${documents.length} documents...`)
const startTime = Date.now()
await processor.processDocumentBatch(documents)
const duration = Date.now() - startTime
monitoring.recordEmbeddingGeneration(
'text-embedding-3-small',
duration
)
console.log(`Indexed in ${duration}ms`)
},
// Perform RAG query
async query(question: string, options?: RAGOptions) {
const startTime = Date.now()
const result = await ragService.generate(question, options)
const duration = Date.now() - startTime
monitoring.recordSearch(
config.pineconeIndex,
'semantic',
duration,
result.sources.length
)
return result
},
// Get system metrics
async getMetrics() {
return monitoring.getMetrics()
},
}
}
// Usage example
async function main() {
const rag = await initializeRAGSystem()
// Index some documents
await rag.indexDocuments([
{
id: '1',
title: 'Introduction to Vector Databases',
content: 'Vector databases are specialized systems...',
metadata: { category: 'tutorial' },
},
])
// Query the system
const result = await rag.query(
'What are vector databases and how do they work?'
)
console.log('Answer:', result.answer)
console.log('Sources:', result.sources)
}
main().catch(console.error)
Ready to Build RAG Systems?
Implement powerful retrieval-augmented generation with vector databases using ParrotRouter's unified API.