LangChain Integration Guide
Build sophisticated LLM applications with LangChain's powerful orchestration framework
Overview
What is LangChain and why use it for LLM applications
LangChain is an open-source framework designed to simplify building applications that use large language models. It provides a standard interface for chains, a broad set of integrations with other tools and providers, and ready-made end-to-end chains for common applications.
Key Benefits
- Modular architecture
- Built-in memory management
- Tool & agent orchestration
- Production-ready patterns
Use Cases
- Chatbots with memory
- Question-answering systems
- Document analysis
- Agent-based workflows
Core Components
Understanding LangChain's building blocks; a short sketch of how they fit together follows the list below.
Models
Wrappers around LLMs, Chat Models, and Embeddings
LLMs
Chat Models
Embeddings
Prompts
Templates, example selectors, and output parsers
PromptTemplate
FewShotPrompt
OutputParser
Chains
Sequences of calls to LLMs, tools, or other chains
LLMChain
SequentialChain
RouterChain
Agents
Dynamic decision-making with tool selection
ReAct
Tools
AgentExecutor
Memory
State persistence across chain/agent calls
ConversationBuffer
VectorStoreMemory
Summary
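To make the relationships concrete, here is a minimal sketch that wires a model, a prompt template, and a memory object into a single chain using the classic LLMChain API seen later in this guide. The model name and template text are illustrative, not requirements.

from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.memory import ConversationBufferMemory

# Model: any chat model wrapper can sit here
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)

# Prompt: a template whose variables include the memory key ("history")
prompt = PromptTemplate(
    input_variables=["history", "question"],
    template="Conversation so far:\n{history}\n\nQuestion: {question}\nAnswer:",
)

# Memory: feeds earlier turns back into the prompt on each call
memory = ConversationBufferMemory(memory_key="history")

# Chain: ties model, prompt, and memory together
chain = LLMChain(llm=llm, prompt=prompt, memory=memory)

print(chain.run(question="What is a chain in LangChain?"))
print(chain.run(question="And where does memory fit in?"))

Agents follow the same pattern, except that the sequence of calls is decided at runtime by the model choosing among tools rather than being fixed in advance.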
Setup & Configuration
Installing and configuring LangChain with various LLM providers
Installation
# Core installation
pip install langchain langchain-community

# Provider-specific packages
pip install langchain-openai        # For OpenAI
pip install langchain-anthropic     # For Anthropic
pip install langchain-google-genai  # For Google
pip install langchain-cohere        # For Cohere

# Additional dependencies
pip install chromadb        # Vector store
pip install faiss-cpu       # Vector store
pip install beautifulsoup4  # Web scraping
pip install pypdf           # PDF processing
Basic Setup with ParrotRouter
import os

from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage

# Configure ParrotRouter as an OpenAI-compatible endpoint
os.environ['OPENAI_API_KEY'] = 'your-parrotrouter-api-key'
os.environ['OPENAI_API_BASE'] = 'https://api.parrotrouter.com/v1'

# Initialize LLM
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    temperature=0.7,
    max_tokens=1000,
    streaming=True  # Enable streaming
)

# Basic usage
messages = [
    SystemMessage(content="You are a helpful assistant."),
    HumanMessage(content="What is LangChain?")
]

response = llm.invoke(messages)
print(response.content)
Multi-Provider Configuration
import os
from typing import Dict

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.chat_models.base import BaseChatModel


class LLMFactory:
    """Factory for creating LLM instances with different providers"""

    def __init__(self):
        self.providers: Dict[str, BaseChatModel] = {}
        self._initialize_providers()

    def _initialize_providers(self):
        """Initialize available LLM providers"""
        # ParrotRouter (OpenAI-compatible)
        if os.environ.get('PARROTROUTER_API_KEY'):
            self.providers['parrotrouter'] = ChatOpenAI(
                openai_api_key=os.environ['PARROTROUTER_API_KEY'],
                openai_api_base='https://api.parrotrouter.com/v1',
                model="gpt-3.5-turbo"
            )

        # Native OpenAI
        if os.environ.get('OPENAI_API_KEY'):
            self.providers['openai'] = ChatOpenAI(model="gpt-4")

        # Anthropic
        if os.environ.get('ANTHROPIC_API_KEY'):
            self.providers['anthropic'] = ChatAnthropic(model="claude-3-sonnet-20240229")

        # Google
        if os.environ.get('GOOGLE_API_KEY'):
            self.providers['google'] = ChatGoogleGenerativeAI(model="gemini-pro")

    def get_llm(self, provider: str = 'parrotrouter') -> BaseChatModel:
        """Get LLM instance by provider name"""
        if provider not in self.providers:
            raise ValueError(f"Provider {provider} not configured")
        return self.providers[provider]


# Usage
factory = LLMFactory()
llm = factory.get_llm('parrotrouter')  # or 'openai', 'anthropic', 'google'
Always use environment variables for API keys. Never hardcode sensitive credentials in your code.
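One common way to do this is a local .env file loaded with the python-dotenv package; this is a hedged sketch of that approach (the package choice and variable names are assumptions, and any secrets manager works equally well):

# .env (keep this file out of version control)
# PARROTROUTER_API_KEY=your-parrotrouter-api-key

import os

from dotenv import load_dotenv  # pip install python-dotenv
from langchain_openai import ChatOpenAI

load_dotenv()  # reads .env into the process environment

llm = ChatOpenAI(
    openai_api_key=os.environ["PARROTROUTER_API_KEY"],
    openai_api_base="https://api.parrotrouter.com/v1",
    model="gpt-3.5-turbo",
)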
Chains, Prompts, and Output Parsers
Building modular LLM pipelines with structured outputs
Prompt Templates
from langchain.prompts import PromptTemplate, ChatPromptTemplate, FewShotPromptTemplate from langchain.prompts.example_selector import SemanticSimilarityExampleSelector from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import FAISS # Basic prompt template basic_prompt = PromptTemplate( input_variables=["product", "language"], template=""" Write a short product description for {product} in {language}. Make it engaging and highlight key benefits. """ ) # Chat prompt template chat_prompt = ChatPromptTemplate.from_messages([ ("system", "You are a helpful product description writer."), ("human", "Write a description for {product} targeting {audience}"), ]) # Few-shot prompt with examples examples = [ { "input": "smartphone", "output": "A cutting-edge device that keeps you connected to what matters most." }, { "input": "laptop", "output": "Your portable powerhouse for productivity and creativity on the go." } ] example_prompt = PromptTemplate( input_variables=["input", "output"], template="Product: {input}\nDescription: {output}" ) # Semantic similarity selector for relevant examples example_selector = SemanticSimilarityExampleSelector.from_examples( examples, OpenAIEmbeddings(), FAISS, k=2 ) few_shot_prompt = FewShotPromptTemplate( example_selector=example_selector, example_prompt=example_prompt, prefix="Write concise product descriptions:", suffix="Product: {input}\nDescription:", input_variables=["input"] )
Chains
from langchain.chains import LLMChain, SequentialChain, TransformChain from langchain.callbacks import StreamingStdOutCallbackHandler # Simple LLM Chain simple_chain = LLMChain( llm=llm, prompt=basic_prompt, verbose=True ) result = simple_chain.run(product="wireless earbuds", language="English") # Sequential Chain - Multiple steps # Step 1: Generate description description_chain = LLMChain( llm=llm, prompt=PromptTemplate( input_variables=["product"], template="Write a product description for {product}" ), output_key="description" ) # Step 2: Extract key features features_chain = LLMChain( llm=llm, prompt=PromptTemplate( input_variables=["description"], template="Extract 3 key features from this description: {description}" ), output_key="features" ) # Step 3: Create marketing tagline tagline_chain = LLMChain( llm=llm, prompt=PromptTemplate( input_variables=["product", "features"], template="Create a catchy tagline for {product} based on these features: {features}" ), output_key="tagline" ) # Combine into sequential chain marketing_chain = SequentialChain( chains=[description_chain, features_chain, tagline_chain], input_variables=["product"], output_variables=["description", "features", "tagline"], verbose=True ) result = marketing_chain({"product": "smart home security camera"}) # Transform Chain for preprocessing def preprocess_text(inputs: dict) -> dict: """Clean and prepare text""" text = inputs["text"] # Remove extra whitespace cleaned = " ".join(text.split()) # Convert to lowercase for consistency cleaned = cleaned.lower() return {"cleaned_text": cleaned} transform_chain = TransformChain( input_variables=["text"], output_variables=["cleaned_text"], transform=preprocess_text ) # Chain with streaming streaming_chain = LLMChain( llm=llm, prompt=basic_prompt, callbacks=[StreamingStdOutCallbackHandler()] ) # Stream the response streaming_chain.run(product="electric bicycle", language="English")
Output Parsers
from langchain.output_parsers import ( PydanticOutputParser, StructuredOutputParser, ResponseSchema, OutputFixingParser, RetryOutputParser ) from pydantic import BaseModel, Field from typing import List # Pydantic parser for structured data class ProductInfo(BaseModel): name: str = Field(description="Product name") price: float = Field(description="Product price in USD") features: List[str] = Field(description="List of key features") in_stock: bool = Field(description="Whether product is in stock") pydantic_parser = PydanticOutputParser(pydantic_object=ProductInfo) # Create prompt with format instructions prompt_with_parser = PromptTemplate( template=""" Extract product information from the following text. {format_instructions} Text: {text} """, input_variables=["text"], partial_variables={"format_instructions": pydantic_parser.get_format_instructions()} ) # Use the chain parsing_chain = LLMChain(llm=llm, prompt=prompt_with_parser) raw_output = parsing_chain.run( text="The new iPhone 15 Pro costs $999 and features a titanium design, A17 Pro chip, and improved camera. Currently in stock." ) # Parse the output try: parsed_output = pydantic_parser.parse(raw_output) print(f"Product: {parsed_output.name}") print(f"Price: $\{parsed_output.price}") print(f"Features: {', '.join(parsed_output.features)}") except Exception as e: print(f"Parsing error: {e}") # Structured output parser with schemas response_schemas = [ ResponseSchema(name="sentiment", description="sentiment of the text (positive/negative/neutral)"), ResponseSchema(name="confidence", description="confidence score between 0 and 1"), ResponseSchema(name="key_points", description="list of key points mentioned") ] structured_parser = StructuredOutputParser.from_response_schemas(response_schemas) # Output fixing parser - attempts to fix malformed outputs fixing_parser = OutputFixingParser.from_llm(parser=pydantic_parser, llm=llm) # Retry parser - retries with better instructions if parsing fails retry_parser = RetryOutputParser.from_llm(parser=pydantic_parser, llm=llm) # Combined chain with error handling def safe_parse_chain(text: str) -> ProductInfo: """Parse with fallback strategies""" chain = LLMChain(llm=llm, prompt=prompt_with_parser) raw_output = chain.run(text=text) try: # Try standard parsing return pydantic_parser.parse(raw_output) except Exception as e: print(f"Initial parsing failed: {e}") try: # Try fixing parser return fixing_parser.parse(raw_output) except Exception as e: print(f"Fixing parser failed: {e}") # Final attempt with retry parser return retry_parser.parse_with_prompt(raw_output, prompt_with_parser.format(text=text))
Memory and Conversation Management
Implementing stateful conversations with context retention
Memory Types
from langchain.memory import ( ConversationBufferMemory, ConversationBufferWindowMemory, ConversationSummaryMemory, ConversationSummaryBufferMemory, VectorStoreRetrieverMemory, CombinedMemory ) from langchain.chains import ConversationChain # 1. Buffer Memory - Stores everything buffer_memory = ConversationBufferMemory() conversation = ConversationChain( llm=llm, memory=buffer_memory, verbose=True ) # Have a conversation conversation.predict(input="Hi, I'm Alice and I love hiking") conversation.predict(input="What's my name?") conversation.predict(input="What do I enjoy doing?") # 2. Window Memory - Keeps last K interactions window_memory = ConversationBufferWindowMemory(k=3) windowed_conversation = ConversationChain( llm=llm, memory=window_memory ) # 3. Summary Memory - Summarizes conversation summary_memory = ConversationSummaryMemory(llm=llm) summary_conversation = ConversationChain( llm=llm, memory=summary_memory ) # 4. Summary Buffer Memory - Hybrid approach summary_buffer_memory = ConversationSummaryBufferMemory( llm=llm, max_token_limit=200 # Keep recent messages, summarize older ones ) # 5. Vector Store Memory - Semantic search over conversation history from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import FAISS embeddings = OpenAIEmbeddings() vectorstore = FAISS.from_texts([], embeddings) retriever = vectorstore.as_retriever(search_kwargs=dict(k=3)) vector_memory = VectorStoreRetrieverMemory( retriever=retriever, memory_key="history", input_key="input" ) # Add memories vector_memory.save_context( {"input": "I'm working on a Python project"}, {"output": "That's great! What kind of project?"} ) vector_memory.save_context( {"input": "It's a web scraper"}, {"output": "Web scraping can be very useful. Are you using BeautifulSoup or Scrapy?"} ) # Retrieve relevant memories relevant_history = vector_memory.load_memory_variables({"input": "Python coding"}) print(relevant_history)
Custom Memory Implementation
from langchain.schema import BaseMemory from typing import List, Dict, Any import json from datetime import datetime class PersistentConversationMemory(BaseMemory): """Custom memory that persists to disk with metadata""" def __init__(self, file_path: str = "conversation_history.json"): self.file_path = file_path self.history: List[Dict[str, Any]] = [] self.load_history() @property def memory_variables(self) -> List[str]: """Define memory variables to inject into prompt""" return ["history", "user_info"] def load_history(self): """Load conversation history from disk""" try: with open(self.file_path, 'r') as f: data = json.load(f) self.history = data.get('history', []) except FileNotFoundError: self.history = [] def save_history(self): """Save conversation history to disk""" with open(self.file_path, 'w') as f: json.dump({ 'history': self.history, 'last_updated': datetime.now().isoformat() }, f, indent=2) def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None: """Save conversation turn with metadata""" self.history.append({ 'timestamp': datetime.now().isoformat(), 'input': inputs.get('input', ''), 'output': outputs.get('output', ''), 'metadata': { 'tokens_used': outputs.get('tokens_used', 0), 'model': outputs.get('model', 'unknown') } }) self.save_history() def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]: """Load memory variables for prompt""" # Format recent history recent_history = self.history[-5:] # Last 5 exchanges formatted_history = "\n".join([ f"Human: {turn['input']}\nAssistant: {turn['output']}" for turn in recent_history ]) # Extract user information from history user_info = self._extract_user_info() return { 'history': formatted_history, 'user_info': user_info } def _extract_user_info(self) -> str: """Extract key user information from conversation history""" # Simple implementation - could use NLP for better extraction info_pieces = [] for turn in self.history: input_lower = turn['input'].lower() if 'my name is' in input_lower or "i'm " in input_lower: info_pieces.append(turn['input']) return ' '.join(info_pieces[-3:]) if info_pieces else "No user info available" def clear(self) -> None: """Clear conversation history""" self.history = [] self.save_history() # Use custom memory custom_memory = PersistentConversationMemory() custom_chain = ConversationChain( llm=llm, memory=custom_memory, prompt=PromptTemplate( input_variables=["history", "user_info", "input"], template=""" Previous conversation: {history} Known user information: {user_info} Human: {input} Assistant: """ ) )
Memory with Multiple Chains
# Shared memory across multiple chains from langchain.memory import ConversationEntityMemory # Entity memory tracks entities mentioned in conversation entity_memory = ConversationEntityMemory(llm=llm) # Chain 1: Information gathering info_chain = ConversationChain( llm=llm, memory=entity_memory, prompt=PromptTemplate( input_variables=["entities", "history", "input"], template=""" You are gathering information about the user. Known entities: {entities} Conversation history: {history} Human: {input} Assistant:""" ) ) # Chain 2: Personalized recommendations recommendation_chain = ConversationChain( llm=llm, memory=entity_memory, # Same memory instance prompt=PromptTemplate( input_variables=["entities", "history", "input"], template=""" You provide personalized recommendations based on user information. Known about user: {entities} Previous conversations: {history} Human: {input} Assistant:""" ) ) # Use both chains with shared memory info_chain.predict(input="I'm John and I love Italian food and jazz music") recommendation = recommendation_chain.predict(input="Recommend a restaurant for me") print(recommendation)
Agents and Tools Implementation
Building intelligent agents that can use tools and make decisions
Creating Custom Tools
from langchain.agents import Tool, AgentExecutor, create_react_agent from langchain.tools import BaseTool, StructuredTool from langchain import hub from typing import Optional, Type from pydantic import BaseModel, Field import requests import json # Simple function tool def get_weather(location: str) -> str: """Get weather for a location (mock implementation)""" # In real implementation, call weather API weather_data = { "New York": "Sunny, 72°F", "London": "Cloudy, 59°F", "Tokyo": "Rainy, 65°F" } return weather_data.get(location, "Weather data not available") # Create tool from function weather_tool = Tool( name="get_weather", func=get_weather, description="Get current weather for a city. Input should be a city name." ) # Structured tool with Pydantic class CalculatorInput(BaseModel): expression: str = Field(description="Mathematical expression to evaluate") def calculate(expression: str) -> str: """Safely evaluate mathematical expressions""" try: # Only allow safe operations allowed_chars = "0123456789+-*/()., " if all(c in allowed_chars for c in expression): result = eval(expression) return f"The result is: {result}" else: return "Invalid expression. Only basic math operations allowed." except Exception as e: return f"Error calculating: {str(e)}" calculator_tool = StructuredTool.from_function( func=calculate, name="calculator", description="Perform mathematical calculations", args_schema=CalculatorInput ) # Custom tool class class WebSearchTool(BaseTool): """Custom tool for web search""" name = "web_search" description = "Search the web for current information" def _run(self, query: str) -> str: """Execute web search""" # Mock implementation - replace with actual search API results = { "LangChain": "LangChain is a framework for developing applications powered by language models", "Python": "Python is a high-level programming language", } for key in results: if key.lower() in query.lower(): return results[key] return f"No results found for: {query}" async def _arun(self, query: str) -> str: """Async version""" return self._run(query) # ParrotRouter API tool class ParrotRouterTool(BaseTool): """Tool for making LLM calls via ParrotRouter""" name = "ask_ai" description = "Ask an AI model for help with complex questions" def __init__(self, api_key: str): super().__init__() self.api_key = api_key def _run(self, question: str) -> str: """Make API call to ParrotRouter""" headers = { 'Authorization': f'Bearer {self.api_key}', 'Content-Type': 'application/json' } payload = { 'model': 'gpt-3.5-turbo', 'messages': [{'role': 'user', 'content': question}], 'max_tokens': 200 } try: response = requests.post( 'https://api.parrotrouter.com/v1/chat/completions', headers=headers, json=payload ) response.raise_for_status() return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Error calling AI: {str(e)}" async def _arun(self, question: str) -> str: """Async version""" # Implement async version with aiohttp return self._run(question)
Agent Implementation
# Initialize tools tools = [ weather_tool, calculator_tool, WebSearchTool(), ParrotRouterTool(api_key=os.environ['PARROTROUTER_API_KEY']) ] # Get ReAct prompt from hub prompt = hub.pull("hwchase17/react") # Create agent agent = create_react_agent( llm=llm, tools=tools, prompt=prompt ) # Create agent executor agent_executor = AgentExecutor( agent=agent, tools=tools, verbose=True, max_iterations=5, early_stopping_method="generate" ) # Use the agent result = agent_executor.invoke({ "input": "What's the weather in New York? Also, calculate 23 * 45 for me." }) print(result['output']) # Custom agent with specific behavior from langchain.agents import AgentOutputParser from langchain.schema import AgentAction, AgentFinish import re class CustomAgentOutputParser(AgentOutputParser): """Custom parser for agent outputs""" def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]: # Check if agent should finish if "Final Answer:" in llm_output: return AgentFinish( return_values={"output": llm_output.split("Final Answer:")[-1].strip()}, log=llm_output, ) # Parse action and input regex = r"Action: (.*?)[\n]*Action Input: (.*)" match = re.search(regex, llm_output, re.DOTALL) if not match: raise ValueError(f"Could not parse LLM output: '{llm_output}'") action = match.group(1).strip() action_input = match.group(2).strip() return AgentAction(tool=action, tool_input=action_input, log=llm_output) # Agent with custom tools and memory from langchain.memory import ConversationBufferMemory agent_memory = ConversationBufferMemory(memory_key="chat_history") conversational_agent = create_react_agent( llm=llm, tools=tools, prompt=PromptTemplate( input_variables=["input", "chat_history", "agent_scratchpad"], template=""" You are a helpful AI assistant with access to various tools. Previous conversation: {chat_history} Available tools: - get_weather: Get current weather for a city - calculator: Perform mathematical calculations - web_search: Search the web for information - ask_ai: Ask another AI for help with complex questions To use a tool, respond with: Thought: [your reasoning] Action: [tool name] Action Input: [tool input] When you have a final answer, respond with: Final Answer: [your answer] Human: {input} {agent_scratchpad} """ ) ) conversational_executor = AgentExecutor( agent=conversational_agent, tools=tools, memory=agent_memory, verbose=True )
Tool Selection Strategies
# Multi-agent system with specialized agents from langchain.agents import initialize_agent, AgentType # Research agent with search tools research_tools = [WebSearchTool(), ParrotRouterTool(api_key=os.environ['PARROTROUTER_API_KEY'])] research_agent = initialize_agent( research_tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True ) # Math agent with calculator math_tools = [calculator_tool] math_agent = initialize_agent( math_tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True ) # Router agent that delegates to specialized agents class RouterAgent: """Routes queries to appropriate specialized agents""" def __init__(self, llm, agents: Dict[str, Any]): self.llm = llm self.agents = agents self.router_prompt = PromptTemplate( input_variables=["query"], template=""" Analyze this query and determine which type of agent should handle it: - 'research' for web searches, current events, general knowledge - 'math' for calculations and mathematical problems - 'general' for everything else Query: {query} Respond with just the agent type (research/math/general): """ ) def route(self, query: str) -> str: """Determine which agent to use""" router_chain = LLMChain(llm=self.llm, prompt=self.router_prompt) agent_type = router_chain.run(query=query).strip().lower() if agent_type in self.agents: return self.agents[agent_type].run(query) else: return self.agents['general'].run(query) # Initialize router router = RouterAgent( llm=llm, agents={ 'research': research_agent, 'math': math_agent, 'general': agent_executor } ) # Use router result = router.route("What's the population of Tokyo?") # Goes to research result = router.route("Calculate the compound interest on $1000 at 5% for 10 years") # Goes to math
Document Loaders and Text Splitters
Processing documents for LLM consumption
Document Loaders
from langchain_community.document_loaders import ( TextLoader, PyPDFLoader, CSVLoader, UnstructuredHTMLLoader, WebBaseLoader, DirectoryLoader, JSONLoader, UnstructuredMarkdownLoader ) from langchain.schema import Document import glob # Text file loader text_loader = TextLoader("document.txt", encoding='utf-8') text_docs = text_loader.load() # PDF loader pdf_loader = PyPDFLoader("document.pdf") pdf_docs = pdf_loader.load() # Load and split pages pdf_pages = pdf_loader.load_and_split() # CSV loader with custom settings csv_loader = CSVLoader( file_path="data.csv", csv_args={ 'delimiter': ',', 'quotechar': '"', 'fieldnames': ['name', 'description', 'price'] } ) csv_docs = csv_loader.load() # Web loader web_loader = WebBaseLoader([ "https://example.com/page1", "https://example.com/page2" ]) web_docs = web_loader.load() # Directory loader for multiple files dir_loader = DirectoryLoader( 'documents/', glob="**/*.txt", loader_cls=TextLoader, loader_kwargs={'encoding': 'utf-8'} ) all_docs = dir_loader.load() # JSON loader with jq schema json_loader = JSONLoader( file_path='data.json', jq_schema='.records[]', # Extract each record text_content=False, # Load as structured data metadata_func=lambda record, metadata: {**metadata, "source": "json"} ) # Custom loader class CustomDataLoader: """Custom loader for proprietary format""" def __init__(self, file_path: str): self.file_path = file_path def load(self) -> List[Document]: """Load and parse custom format""" documents = [] with open(self.file_path, 'r') as f: content = f.read() # Custom parsing logic sections = content.split('---SECTION---') for i, section in enumerate(sections): if section.strip(): # Extract metadata lines = section.strip().split('\n') title = lines[0] if lines else f"Section {i}" content = '\n'.join(lines[1:]) if len(lines) > 1 else "" doc = Document( page_content=content, metadata={ 'source': self.file_path, 'section': i, 'title': title } ) documents.append(doc) return documents # Load from multiple sources def load_knowledge_base(sources: Dict[str, str]) -> List[Document]: """Load documents from multiple sources""" all_documents = [] for source_type, source_path in sources.items(): try: if source_type == 'pdf': loader = PyPDFLoader(source_path) elif source_type == 'csv': loader = CSVLoader(source_path) elif source_type == 'html': loader = UnstructuredHTMLLoader(source_path) elif source_type == 'markdown': loader = UnstructuredMarkdownLoader(source_path) else: loader = TextLoader(source_path) documents = loader.load() # Add source type to metadata for doc in documents: doc.metadata['source_type'] = source_type all_documents.extend(documents) except Exception as e: print(f"Error loading {source_path}: {e}") return all_documents
Text Splitters
from langchain.text_splitter import ( RecursiveCharacterTextSplitter, CharacterTextSplitter, TokenTextSplitter, MarkdownTextSplitter, PythonCodeTextSplitter, RecursiveJsonSplitter ) # Recursive character splitter (recommended for most use cases) text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, length_function=len, separators=["\n\n", "\n", " ", ""] # Split by paragraphs, then lines, then words ) documents = text_splitter.split_documents(text_docs) # Token-based splitter (for precise token control) from tiktoken import encoding_for_model encoding = encoding_for_model('gpt-3.5-turbo') token_splitter = TokenTextSplitter( chunk_size=500, # tokens chunk_overlap=50, encoding_name='cl100k_base' ) # Markdown splitter (preserves structure) markdown_splitter = MarkdownTextSplitter( chunk_size=1000, chunk_overlap=100 ) # Code splitter (language-aware) python_splitter = PythonCodeTextSplitter( chunk_size=1000, chunk_overlap=200 ) # Custom splitter with metadata preservation class MetadataPreservingSplitter: """Splitter that maintains document metadata""" def __init__(self, base_splitter): self.base_splitter = base_splitter def split_documents(self, documents: List[Document]) -> List[Document]: """Split documents while preserving and enriching metadata""" all_splits = [] for doc in documents: splits = self.base_splitter.split_text(doc.page_content) for i, split in enumerate(splits): # Create new document with enriched metadata split_doc = Document( page_content=split, metadata={ **doc.metadata, 'chunk_index': i, 'total_chunks': len(splits), 'chunk_size': len(split), 'original_size': len(doc.page_content) } ) all_splits.append(split_doc) return all_splits # Semantic splitter (splits based on meaning) from langchain_experimental.text_splitter import SemanticChunker from langchain_openai import OpenAIEmbeddings semantic_splitter = SemanticChunker( embeddings=OpenAIEmbeddings(), breakpoint_threshold_type="percentile", # or "standard_deviation", "interquartile" breakpoint_threshold_amount=0.5 ) # Split by document type def smart_split_documents(documents: List[Document]) -> List[Document]: """Apply appropriate splitter based on document type""" split_docs = [] for doc in documents: source_type = doc.metadata.get('source_type', 'text') if source_type == 'code': splitter = PythonCodeTextSplitter(chunk_size=1000, chunk_overlap=200) elif source_type == 'markdown': splitter = MarkdownTextSplitter(chunk_size=1000, chunk_overlap=100) elif source_type == 'structured': splitter = RecursiveJsonSplitter(max_chunk_size=1000) else: splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200 ) splits = splitter.split_text(doc.page_content) for i, split in enumerate(splits): split_doc = Document( page_content=split, metadata={ **doc.metadata, 'split_index': i, 'splitter_type': type(splitter).__name__ } ) split_docs.append(split_doc) return split_docs
Choose chunk size based on your use case: smaller chunks (500-1000 chars) for precise retrieval, larger chunks (2000-4000 chars) for maintaining context.
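To see the trade-off directly, the short sketch below splits the same text at two chunk sizes; the sizes and sample text are illustrative, and the right values depend on your documents and retriever.

from langchain.text_splitter import RecursiveCharacterTextSplitter

sample_text = "LangChain applications usually split documents before embedding them. " * 300

# Smaller chunks: more entries, each tightly focused, for precise retrieval
small_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=100)

# Larger chunks: fewer entries, each carrying more surrounding context
large_splitter = RecursiveCharacterTextSplitter(chunk_size=3000, chunk_overlap=300)

print(len(small_splitter.split_text(sample_text)))  # many small chunks
print(len(large_splitter.split_text(sample_text)))  # fewer large chunks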
Vector Stores and Retrieval Chains
Implementing Retrieval-Augmented Generation (RAG) systems
Vector Store Setup
from langchain_community.vectorstores import FAISS, Chroma, Pinecone from langchain_openai import OpenAIEmbeddings from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers.document_compressors import LLMChainExtractor import pinecone # Initialize embeddings embeddings = OpenAIEmbeddings() # FAISS - Local vector store faiss_store = FAISS.from_documents(documents, embeddings) # Save and load FAISS faiss_store.save_local("faiss_index") loaded_faiss = FAISS.load_local("faiss_index", embeddings) # Chroma - Persistent local store chroma_store = Chroma.from_documents( documents, embeddings, persist_directory="./chroma_db", collection_name="my_collection" ) # Pinecone - Cloud vector database pinecone.init(api_key=os.environ['PINECONE_API_KEY'], environment='us-west1-gcp') pinecone_store = Pinecone.from_documents( documents, embeddings, index_name="langchain-index", namespace="my-namespace" ) # Advanced retrieval with filters retriever = faiss_store.as_retriever( search_type="similarity", # or "mmr" for diversity search_kwargs={ "k": 5, # Return top 5 results "score_threshold": 0.7, # Minimum similarity score "filter": {"source": "technical_docs"} # Metadata filter } ) # Contextual compression retriever compressor = LLMChainExtractor.from_llm(llm) compression_retriever = ContextualCompressionRetriever( base_compressor=compressor, base_retriever=retriever ) # Multi-query retriever for better results from langchain.retrievers.multi_query import MultiQueryRetriever multi_query_retriever = MultiQueryRetriever.from_llm( retriever=faiss_store.as_retriever(), llm=llm ) # Ensemble retriever combining multiple strategies from langchain.retrievers import EnsembleRetriever from langchain_community.retrievers import BM25Retriever # BM25 for keyword search bm25_retriever = BM25Retriever.from_documents(documents) bm25_retriever.k = 5 # Combine with semantic search ensemble_retriever = EnsembleRetriever( retrievers=[bm25_retriever, faiss_store.as_retriever()], weights=[0.3, 0.7] # Weight keyword vs semantic search )
RAG Implementation
from langchain.chains import RetrievalQA, ConversationalRetrievalChain from langchain.prompts import PromptTemplate # Basic RAG chain qa_chain = RetrievalQA.from_chain_type( llm=llm, chain_type="stuff", # or "map_reduce", "refine", "map_rerank" retriever=retriever, return_source_documents=True, verbose=True ) # Query the chain response = qa_chain({"query": "What is LangChain?"}) print(f"Answer: {response['result']}") print(f"Sources: {[doc.metadata for doc in response['source_documents']]}") # Custom RAG prompt rag_prompt = PromptTemplate( input_variables=["context", "question"], template=""" You are an AI assistant helping users understand technical documentation. Use the following context to answer the question. If you don't know the answer, say so - don't make up information. Context: {context} Question: {question} Answer: """ ) custom_qa_chain = RetrievalQA.from_chain_type( llm=llm, retriever=retriever, chain_type_kwargs={"prompt": rag_prompt} ) # Conversational RAG with memory conversational_chain = ConversationalRetrievalChain.from_llm( llm=llm, retriever=compression_retriever, memory=ConversationBufferMemory( memory_key="chat_history", return_messages=True ), combine_docs_chain_kwargs={"prompt": rag_prompt} ) # Have a conversation result = conversational_chain({"question": "What is RAG?"}) result = conversational_chain({"question": "Can you give me an example?"}) # Advanced RAG with source citation class CitationRAGChain: """RAG chain that provides inline citations""" def __init__(self, llm, retriever): self.llm = llm self.retriever = retriever self.citation_prompt = PromptTemplate( input_variables=["context", "question"], template=""" Answer the question based on the given context. Cite your sources using [1], [2], etc. format. Context: {context} Question: {question} Answer with citations: """ ) def run(self, question: str) -> Dict[str, Any]: # Retrieve relevant documents docs = self.retriever.get_relevant_documents(question) # Format context with numbering context_parts = [] for i, doc in enumerate(docs, 1): source = doc.metadata.get('source', 'Unknown') context_parts.append(f"[{i}] {doc.page_content} (Source: {source})") context = "\n\n".join(context_parts) # Generate answer chain = LLMChain(llm=self.llm, prompt=self.citation_prompt) answer = chain.run(context=context, question=question) # Extract citations from answer import re citation_pattern = r'[(d+)]' cited_indices = set(int(match) - 1 for match in re.findall(citation_pattern, answer)) cited_docs = [docs[i] for i in cited_indices if i < len(docs)] return { 'answer': answer, 'source_documents': docs, 'cited_documents': cited_docs } # Use citation chain citation_chain = CitationRAGChain(llm, retriever) result = citation_chain.run("What are the benefits of RAG?") print(f"Answer: {result['answer']}") print(f"Cited sources: {[doc.metadata['source'] for doc in result['cited_documents']]}")
Hybrid Search Implementation
# Hybrid search combining multiple techniques from typing import List, Tuple import numpy as np class HybridRetriever: """Combines semantic, keyword, and metadata-based retrieval""" def __init__(self, semantic_store: FAISS, keyword_retriever: BM25Retriever, metadata_store: Dict[str, Document]): self.semantic_store = semantic_store self.keyword_retriever = keyword_retriever self.metadata_store = metadata_store def retrieve(self, query: str, k: int = 5, semantic_weight: float = 0.5, keyword_weight: float = 0.3, metadata_weight: float = 0.2, filters: Optional[Dict] = None) -> List[Document]: """Retrieve documents using hybrid approach""" # Semantic search semantic_docs = self.semantic_store.similarity_search_with_score(query, k=k*2) # Keyword search keyword_docs = self.keyword_retriever.get_relevant_documents(query)[:k*2] # Score normalization doc_scores = {} # Add semantic scores if semantic_docs: max_semantic_score = max(score for _, score in semantic_docs) for doc, score in semantic_docs: doc_id = doc.metadata.get('doc_id', doc.page_content[:50]) doc_scores[doc_id] = { 'doc': doc, 'semantic_score': score / max_semantic_score * semantic_weight } # Add keyword scores for i, doc in enumerate(keyword_docs): doc_id = doc.metadata.get('doc_id', doc.page_content[:50]) keyword_score = (len(keyword_docs) - i) / len(keyword_docs) * keyword_weight if doc_id in doc_scores: doc_scores[doc_id]['keyword_score'] = keyword_score else: doc_scores[doc_id] = { 'doc': doc, 'semantic_score': 0, 'keyword_score': keyword_score } # Add metadata boost if filters: for doc_id, data in doc_scores.items(): metadata_match = all( data['doc'].metadata.get(key) == value for key, value in filters.items() ) data['metadata_score'] = metadata_weight if metadata_match else 0 # Calculate final scores final_scores = [] for doc_id, data in doc_scores.items(): total_score = ( data.get('semantic_score', 0) + data.get('keyword_score', 0) + data.get('metadata_score', 0) ) final_scores.append((data['doc'], total_score)) # Sort by score and return top k final_scores.sort(key=lambda x: x[1], reverse=True) return [doc for doc, _ in final_scores[:k]] # Vector store with metadata filtering class MetadataVectorStore: """Vector store with advanced metadata filtering""" def __init__(self, embeddings): self.embeddings = embeddings self.vectors = [] self.documents = [] self.metadata_index = {} def add_documents(self, documents: List[Document]): """Add documents with metadata indexing""" for doc in documents: # Generate embedding embedding = self.embeddings.embed_query(doc.page_content) self.vectors.append(embedding) self.documents.append(doc) # Index metadata for key, value in doc.metadata.items(): if key not in self.metadata_index: self.metadata_index[key] = {} if value not in self.metadata_index[key]: self.metadata_index[key][value] = [] self.metadata_index[key][value].append(len(self.documents) - 1) def similarity_search_with_filter(self, query: str, k: int = 5, filters: Optional[Dict] = None) -> List[Document]: """Search with metadata filtering""" # Get candidate indices based on filters if filters: candidate_indices = None for key, value in filters.items(): if key in self.metadata_index and value in self.metadata_index[key]: indices = set(self.metadata_index[key][value]) if candidate_indices is None: candidate_indices = indices else: candidate_indices = candidate_indices.intersection(indices) if not candidate_indices: return [] else: candidate_indices = set(range(len(self.documents))) # Generate query embedding query_embedding = 
self.embeddings.embed_query(query) # Calculate similarities for candidates only similarities = [] for idx in candidate_indices: similarity = np.dot(query_embedding, self.vectors[idx]) similarities.append((idx, similarity)) # Sort and return top k similarities.sort(key=lambda x: x[1], reverse=True) return [self.documents[idx] for idx, _ in similarities[:k]]
LangChain Expression Language (LCEL)
Declarative way to compose chains
LCEL Basics
from langchain_core.runnables import RunnablePassthrough, RunnableParallel from operator import itemgetter # Simple LCEL chain prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}") chain = prompt | llm # Invoke the chain result = chain.invoke({"topic": "programming"}) print(result.content) # Chain with output parser from langchain.output_parsers import CommaSeparatedListOutputParser parser = CommaSeparatedListOutputParser() prompt = PromptTemplate( template="List 5 {category} items separated by commas:", input_variables=["category"] ) list_chain = prompt | llm | parser items = list_chain.invoke({"category": "programming languages"}) print(items) # ['Python', 'JavaScript', 'Java', 'C++', 'Go'] # Parallel execution parallel_chain = RunnableParallel( joke=prompt | llm, facts=PromptTemplate.from_template("List 3 facts about {topic}") | llm, definition=PromptTemplate.from_template("Define {topic} in one sentence") | llm ) results = parallel_chain.invoke({"topic": "artificial intelligence"}) print(f"Joke: {results['joke'].content}") print(f"Facts: {results['facts'].content}") print(f"Definition: {results['definition'].content}") # Complex chain with retrieval retrieval_chain = ( { "context": retriever | (lambda docs: "\n".join(doc.page_content for doc in docs)), "question": RunnablePassthrough() } | PromptTemplate.from_template(""" Context: {context} Question: {question} Answer based on the context: """) | llm | StrOutputParser() ) answer = retrieval_chain.invoke("What is LangChain?") # Branching with routing def route_question(info): """Route to different chains based on question type""" question = info["question"].lower() if "calculate" in question or "math" in question: return "math" elif "translate" in question: return "translation" else: return "general" math_chain = PromptTemplate.from_template("Calculate: {question}") | llm translation_chain = PromptTemplate.from_template("Translate: {question}") | llm general_chain = PromptTemplate.from_template("Answer: {question}") | llm branch_chain = ( RunnablePassthrough.assign( route=lambda x: route_question(x) ) | { "math": math_chain, "translation": translation_chain, "general": general_chain } | itemgetter(itemgetter("route")) )
Advanced LCEL Patterns
# Fallback chains from langchain_core.runnables import RunnableLambda primary_chain = prompt | ChatOpenAI(model="gpt-4") fallback_chain = prompt | ChatOpenAI(model="gpt-3.5-turbo") def with_fallback(primary, fallback): """Create chain with fallback""" def run_with_fallback(inputs): try: return primary.invoke(inputs) except Exception as e: print(f"Primary failed: {e}, using fallback") return fallback.invoke(inputs) return RunnableLambda(run_with_fallback) robust_chain = with_fallback(primary_chain, fallback_chain) # Streaming with LCEL async def stream_chain(): """Stream responses token by token""" chain = prompt | llm async for chunk in chain.astream({"topic": "space exploration"}): print(chunk.content, end="", flush=True) # Batch processing batch_chain = prompt | llm topics = [ {"topic": "quantum computing"}, {"topic": "climate change"}, {"topic": "space exploration"} ] # Process in parallel batch_results = await batch_chain.abatch(topics) # Custom runnable from langchain_core.runnables import Runnable class CustomProcessor(Runnable): """Custom processing step""" def invoke(self, input: Dict, config: Optional[Dict] = None) -> Dict: # Custom processing logic processed = input.copy() processed['word_count'] = len(input.get('text', '').split()) processed['char_count'] = len(input.get('text', '')) return processed async def ainvoke(self, input: Dict, config: Optional[Dict] = None) -> Dict: # Async version return self.invoke(input, config) # Use in chain analysis_chain = ( {"text": RunnablePassthrough()} | CustomProcessor() | RunnableLambda(lambda x: f"Text has {x['word_count']} words and {x['char_count']} characters") ) # Conditional chains def conditional_chain(): """Chain with conditional logic""" def check_length(text: str) -> str: return "long" if len(text) > 100 else "short" short_prompt = PromptTemplate.from_template("Expand this short text: {text}") long_prompt = PromptTemplate.from_template("Summarize this long text: {text}") return ( RunnablePassthrough.assign( length_type=lambda x: check_length(x["text"]) ) | RunnableLambda( lambda x: short_prompt if x["length_type"] == "short" else long_prompt ) | llm ) # Chain with retry and timeout from langchain_core.runnables import RunnableConfig config = RunnableConfig( max_retries=3, timeout=30.0, tags=["production", "critical"], metadata={"version": "1.0"} ) production_chain = (prompt | llm).with_config(config)
Streaming and Async Operations
Implementing responsive, real-time LLM applications
Streaming Responses
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler from langchain.callbacks.base import AsyncCallbackHandler from typing import Any, Dict, List, Optional import asyncio # Basic streaming streaming_llm = ChatOpenAI( streaming=True, callbacks=[StreamingStdOutCallbackHandler()] ) # Stream to console response = streaming_llm.invoke("Write a short story about AI") # Custom streaming handler class CustomStreamHandler(AsyncCallbackHandler): """Custom handler for streaming events""" def __init__(self, websocket=None): self.websocket = websocket self.tokens = [] async def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs): """Called when LLM starts""" print("Starting generation...") if self.websocket: await self.websocket.send_json({"type": "start"}) async def on_llm_new_token(self, token: str, **kwargs): """Called for each new token""" self.tokens.append(token) if self.websocket: await self.websocket.send_json({ "type": "token", "content": token }) else: print(token, end="", flush=True) async def on_llm_end(self, response, **kwargs): """Called when LLM finishes""" print("\nGeneration complete!") if self.websocket: await self.websocket.send_json({ "type": "complete", "full_response": "".join(self.tokens) }) async def on_llm_error(self, error: Exception, **kwargs): """Called on error""" print(f"Error: {error}") if self.websocket: await self.websocket.send_json({ "type": "error", "error": str(error) }) # Use custom handler custom_handler = CustomStreamHandler() streaming_llm = ChatOpenAI( streaming=True, callbacks=[custom_handler] ) # Stream with token counting class TokenCounterHandler(AsyncCallbackHandler): """Count tokens while streaming""" def __init__(self): self.token_count = 0 self.total_tokens = 0 async def on_llm_new_token(self, token: str, **kwargs): self.token_count += 1 # Rough estimation - would use tiktoken for accuracy self.total_tokens += len(token.split()) if self.token_count % 10 == 0: print(f"\n[Tokens: {self.token_count}]", end="") # Streaming with multiple handlers multi_handler_llm = ChatOpenAI( streaming=True, callbacks=[ StreamingStdOutCallbackHandler(), TokenCounterHandler(), custom_handler ] )
Async Operations
# Async chain execution async def async_rag_example(): """Async RAG implementation""" # Async retriever async def async_retrieve(query: str) -> List[Document]: # Simulate async retrieval await asyncio.sleep(0.1) return await retriever.aget_relevant_documents(query) # Async chain async_chain = ( { "context": lambda x: async_retrieve(x["question"]), "question": RunnablePassthrough() } | PromptTemplate.from_template(""" Context: {context} Question: {question} Answer: """) | llm ) # Single async call result = await async_chain.ainvoke({"question": "What is async programming?"}) print(result) # Batch async calls questions = [ {"question": "What is Python?"}, {"question": "What is JavaScript?"}, {"question": "What is Rust?"} ] results = await async_chain.abatch(questions) for i, result in enumerate(results): print(f"Q{i+1}: {result.content[:100]}...") # Async streaming async def async_stream_example(): """Stream responses asynchronously""" chain = prompt | llm async for chunk in chain.astream({"topic": "future of technology"}): print(chunk.content, end="", flush=True) # Simulate processing each chunk await asyncio.sleep(0.01) # Concurrent async operations async def concurrent_llm_calls(): """Make multiple LLM calls concurrently""" tasks = [] prompts = [ "Explain quantum computing in simple terms", "What are the benefits of renewable energy?", "How does machine learning work?" ] for prompt_text in prompts: task = llm.ainvoke(prompt_text) tasks.append(task) # Wait for all tasks to complete start_time = asyncio.get_event_loop().time() results = await asyncio.gather(*tasks) end_time = asyncio.get_event_loop().time() print(f"Completed {len(results)} calls in {end_time - start_time:.2f} seconds") return results # Async agent with streaming class AsyncStreamingAgent: """Agent that streams responses asynchronously""" def __init__(self, llm, tools, websocket=None): self.llm = llm self.tools = tools self.websocket = websocket async def arun(self, query: str): """Run agent with streaming""" # Stream thinking process if self.websocket: await self.websocket.send_json({ "type": "thinking", "content": "Processing your request..." }) # Determine tool to use tool_chain = ( PromptTemplate.from_template( "Which tool should I use for: {query}\nTools: {tools}\nTool:" ) | self.llm ) tool_name = await tool_chain.ainvoke({ "query": query, "tools": ", ".join([t.name for t in self.tools]) }) # Stream tool selection if self.websocket: await self.websocket.send_json({ "type": "tool_selection", "tool": tool_name.content }) # Execute tool and stream results # ... tool execution logic ... 
# WebSocket handler for real-time streaming async def websocket_handler(websocket, path): """Handle WebSocket connections for streaming""" handler = CustomStreamHandler(websocket) streaming_llm = ChatOpenAI( streaming=True, callbacks=[handler] ) async for message in websocket: data = json.loads(message) if data["type"] == "query": # Stream response back through WebSocket await streaming_llm.ainvoke(data["content"]) elif data["type"] == "stop": # Handle stop generation handler.should_stop = True # Rate-limited async operations from asyncio import Semaphore class RateLimitedLLM: """LLM with rate limiting for async operations""" def __init__(self, llm, max_concurrent: int = 5): self.llm = llm self.semaphore = Semaphore(max_concurrent) async def ainvoke(self, prompt: str) -> str: async with self.semaphore: return await self.llm.ainvoke(prompt) async def abatch(self, prompts: List[str]) -> List[str]: tasks = [self.ainvoke(prompt) for prompt in prompts] return await asyncio.gather(*tasks)
Production Deployment Best Practices
Deploying LangChain applications at scale
Error Handling and Fallbacks
from langchain.schema import OutputParserException from tenacity import retry, stop_after_attempt, wait_exponential import logging # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class RobustLangChainApp: """Production-ready LangChain application with error handling""" def __init__(self): self.primary_llm = ChatOpenAI(model="gpt-4", temperature=0.7) self.fallback_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7) self.cache = {} @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10) ) async def _call_llm_with_retry(self, prompt: str, llm) -> str: """Call LLM with exponential backoff retry""" try: response = await llm.ainvoke(prompt) return response.content except Exception as e: logger.error(f"LLM call failed: {e}") raise async def process_request(self, prompt: str) -> Dict[str, Any]: """Process request with fallback and caching""" # Check cache first cache_key = hashlib.md5(prompt.encode()).hexdigest() if cache_key in self.cache: logger.info("Returning cached response") return self.cache[cache_key] start_time = time.time() try: # Try primary LLM response = await self._call_llm_with_retry(prompt, self.primary_llm) model_used = "gpt-4" except Exception as e: logger.warning(f"Primary LLM failed, falling back: {e}") try: # Fallback to secondary LLM response = await self._call_llm_with_retry(prompt, self.fallback_llm) model_used = "gpt-3.5-turbo" except Exception as e: logger.error(f"All LLMs failed: {e}") return { "error": "Service temporarily unavailable", "status": "failed", "duration": time.time() - start_time } # Prepare result result = { "response": response, "model_used": model_used, "duration": time.time() - start_time, "cached": False, "timestamp": datetime.utcnow().isoformat() } # Cache successful responses self.cache[cache_key] = result return result # Environment-specific configuration class LangChainConfig: """Configuration management for different environments""" def __init__(self, environment: str = "development"): self.environment = environment self.config = self._load_config() def _load_config(self) -> Dict[str, Any]: """Load environment-specific configuration""" base_config = { "llm_timeout": 30, "max_retries": 3, "cache_ttl": 3600, "log_level": "INFO" } env_configs = { "development": { "llm_model": "gpt-3.5-turbo", "temperature": 0.9, "verbose": True, "cache_enabled": False }, "staging": { "llm_model": "gpt-3.5-turbo", "temperature": 0.7, "verbose": True, "cache_enabled": True }, "production": { "llm_model": "gpt-4", "temperature": 0.3, "verbose": False, "cache_enabled": True, "fallback_model": "gpt-3.5-turbo" } } config = {**base_config, **env_configs.get(self.environment, {})} # Override with environment variables for key in config: env_key = f"LANGCHAIN_{key.upper()}" if env_key in os.environ: config[key] = os.environ[env_key] return config def get_llm(self) -> ChatOpenAI: """Get configured LLM instance""" return ChatOpenAI( model=self.config["llm_model"], temperature=self.config["temperature"], timeout=self.config["llm_timeout"], max_retries=self.config["max_retries"] )
Monitoring and Observability
from langchain.callbacks import LangChainTracer from langsmith import Client import prometheus_client from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc import trace_exporter from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor # Prometheus metrics request_counter = prometheus_client.Counter( 'langchain_requests_total', 'Total LangChain requests', ['chain_type', 'model', 'status'] ) request_duration = prometheus_client.Histogram( 'langchain_request_duration_seconds', 'LangChain request duration', ['chain_type', 'model'] ) token_counter = prometheus_client.Counter( 'langchain_tokens_total', 'Total tokens used', ['model', 'token_type'] ) # OpenTelemetry setup trace.set_tracer_provider(TracerProvider()) tracer = trace.get_tracer(__name__) otlp_exporter = trace_exporter.OTLPSpanExporter( endpoint="localhost:4317", insecure=True ) span_processor = BatchSpanProcessor(otlp_exporter) trace.get_tracer_provider().add_span_processor(span_processor) # Custom monitoring callback class MonitoringCallback(AsyncCallbackHandler): """Callback for monitoring and metrics collection""" async def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs): """Track chain execution start""" self.start_time = time.time() self.chain_type = serialized.get("name", "unknown") # Start OpenTelemetry span self.span = tracer.start_span(f"langchain.{self.chain_type}") self.span.set_attribute("chain.type", self.chain_type) self.span.set_attribute("input.size", len(str(inputs))) async def on_llm_end(self, response, **kwargs): """Track LLM completion""" duration = time.time() - self.start_time # Update Prometheus metrics request_duration.labels( chain_type=self.chain_type, model=response.llm_output.get("model_name", "unknown") ).observe(duration) # Track token usage if "token_usage" in response.llm_output: usage = response.llm_output["token_usage"] token_counter.labels( model=response.llm_output.get("model_name", "unknown"), token_type="prompt" ).inc(usage.get("prompt_tokens", 0)) token_counter.labels( model=response.llm_output.get("model_name", "unknown"), token_type="completion" ).inc(usage.get("completion_tokens", 0)) # End span if hasattr(self, 'span'): self.span.set_attribute("tokens.prompt", usage.get("prompt_tokens", 0)) self.span.set_attribute("tokens.completion", usage.get("completion_tokens", 0)) self.span.set_attribute("duration.seconds", duration) self.span.end() async def on_chain_error(self, error: Exception, **kwargs): """Track errors""" request_counter.labels( chain_type=self.chain_type, model="unknown", status="error" ).inc() if hasattr(self, 'span'): self.span.record_exception(error) self.span.set_status(trace.Status(trace.StatusCode.ERROR, str(error))) self.span.end() # LangSmith integration langsmith_client = Client() class LangSmithMonitor: """Monitor LangChain apps with LangSmith""" def __init__(self, project_name: str): self.project_name = project_name self.tracer = LangChainTracer( project_name=project_name, client=langsmith_client ) def monitor_chain(self, chain): """Add monitoring to a chain""" # Add tracer callback if hasattr(chain, 'callbacks'): chain.callbacks.append(self.tracer) return chain async def evaluate_chain(self, chain, test_cases: List[Dict]): """Evaluate chain performance""" results = [] for test_case in test_cases: try: result = await chain.ainvoke(test_case["input"]) # Compare with expected output score = self._calculate_similarity( result, test_case["expected_output"] ) 
results.append({ "input": test_case["input"], "output": result, "expected": test_case["expected_output"], "score": score, "passed": score > 0.8 }) except Exception as e: results.append({ "input": test_case["input"], "error": str(e), "passed": False }) # Log to LangSmith langsmith_client.create_test_results( project_name=self.project_name, results=results ) return results
Deployment Architecture
# FastAPI application with LangChain from fastapi import FastAPI, HTTPException, BackgroundTasks from fastapi.responses import StreamingResponse from pydantic import BaseModel import uvicorn from contextlib import asynccontextmanager import redis from typing import Optional # Request/Response models class ChatRequest(BaseModel): message: str session_id: Optional[str] = None stream: bool = False class ChatResponse(BaseModel): response: str session_id: str model_used: str duration: float # Application lifespan management @asynccontextmanager async def lifespan(app: FastAPI): # Startup app.state.llm = ChatOpenAI( model="gpt-3.5-turbo", streaming=True, callbacks=[MonitoringCallback()] ) app.state.redis = redis.Redis( host=os.environ.get("REDIS_HOST", "localhost"), decode_responses=True ) app.state.memory_store = {} yield # Shutdown app.state.redis.close() # Create FastAPI app app = FastAPI(lifespan=lifespan) # Health check endpoint @app.get("/health") async def health_check(): return { "status": "healthy", "version": "1.0.0", "model": "gpt-3.5-turbo" } # Chat endpoint @app.post("/chat", response_model=ChatResponse) async def chat(request: ChatRequest, background_tasks: BackgroundTasks): """Process chat request""" try: # Get or create session session_id = request.session_id or str(uuid.uuid4()) # Get conversation memory if session_id in app.state.memory_store: memory = app.state.memory_store[session_id] else: memory = ConversationBufferMemory() app.state.memory_store[session_id] = memory # Create chain with memory chain = ConversationChain( llm=app.state.llm, memory=memory ) # Process request start_time = time.time() response = await chain.ainvoke({"input": request.message}) duration = time.time() - start_time # Background task to save conversation background_tasks.add_task( save_conversation, session_id, request.message, response["response"] ) return ChatResponse( response=response["response"], session_id=session_id, model_used="gpt-3.5-turbo", duration=duration ) except Exception as e: logger.error(f"Chat error: {e}") raise HTTPException(status_code=500, detail=str(e)) # Streaming endpoint @app.post("/chat/stream") async def chat_stream(request: ChatRequest): """Stream chat responses""" async def generate(): chain = ConversationChain(llm=app.state.llm) async for chunk in chain.astream({"input": request.message}): yield f"data: {json.dumps({'chunk': chunk})}\n\n" yield f"data: {json.dumps({'done': True})}\n\n" return StreamingResponse( generate(), media_type="text/event-stream" ) # Background task async def save_conversation(session_id: str, message: str, response: str): """Save conversation to Redis""" key = f"conversation:{session_id}" conversation = { "timestamp": datetime.utcnow().isoformat(), "message": message, "response": response } app.state.redis.lpush(key, json.dumps(conversation)) app.state.redis.expire(key, 3600) # 1 hour TTL # Docker deployment dockerfile_content = ''' FROM python:3.11-slim WORKDIR /app # Install dependencies COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # Copy application COPY . . 
# Environment variables ENV PYTHONUNBUFFERED=1 ENV LANGCHAIN_TRACING_V2=true ENV LANGCHAIN_PROJECT=production # Health check HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ CMD python -c "import requests; requests.get('http://localhost:8000/health')" # Run application CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"] ''' # Kubernetes deployment k8s_deployment = ''' apiVersion: apps/v1 kind: Deployment metadata: name: langchain-app spec: replicas: 3 selector: matchLabels: app: langchain-app template: metadata: labels: app: langchain-app spec: containers: - name: langchain image: langchain-app:latest ports: - containerPort: 8000 env: - name: OPENAI_API_KEY valueFrom: secretKeyRef: name: langchain-secrets key: openai-api-key - name: REDIS_HOST value: redis-service resources: requests: memory: "512Mi" cpu: "500m" limits: memory: "1Gi" cpu: "1000m" livenessProbe: httpGet: path: /health port: 8000 initialDelaySeconds: 30 periodSeconds: 10 --- apiVersion: v1 kind: Service metadata: name: langchain-service spec: selector: app: langchain-app ports: - port: 80 targetPort: 8000 type: LoadBalancer '''
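Once the container is running, the service can be smoke-tested with any HTTP client. The sketch below uses the requests library against a locally published port; the host, port, and payloads are assumptions for illustration. One caveat worth noting: app.state.memory_store is an in-process dict, so with --workers 4 each worker keeps its own copy and a session only sees its history when requests land on the same worker; sticky sessions or an external store (the Redis instance is already part of the stack) avoid that.

# Smoke-test sketch for the FastAPI service above; host and port are assumptions.
import json
import requests

BASE_URL = "http://localhost:8000"

# Health check
print(requests.get(f"{BASE_URL}/health", timeout=10).json())

# One-shot chat request; reuse the returned session_id to continue the conversation.
resp = requests.post(
    f"{BASE_URL}/chat",
    json={"message": "Give me a one-line summary of LangChain."},
    timeout=60,
)
body = resp.json()
print(body["response"])
print("session:", body["session_id"], "took", round(body["duration"], 2), "s")

# Streaming endpoint: read the server-sent events line by line until 'done'.
with requests.post(
    f"{BASE_URL}/chat/stream",
    json={"message": "Stream a haiku about vector stores."},
    stream=True,
    timeout=60,
) as stream:
    for line in stream.iter_lines():
        if line and line.startswith(b"data: "):
            event = json.loads(line[len(b"data: "):])
            if event.get("done"):
                break
            print(event.get("chunk"))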
Cost Optimization Strategies
Reducing costs while maintaining quality
Token Usage Optimization
from tiktoken import encoding_for_model import functools class TokenOptimizer: """Optimize token usage in LangChain applications""" def __init__(self, model: str = "gpt-3.5-turbo"): self.model = model self.encoding = encoding_for_model(model) self.token_costs = { "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002}, "gpt-4": {"input": 0.03, "output": 0.06}, "claude-3-sonnet": {"input": 0.003, "output": 0.015} } def count_tokens(self, text: str) -> int: """Count tokens in text""" return len(self.encoding.encode(text)) def estimate_cost(self, input_text: str, output_text: str) -> float: """Estimate cost for a request""" input_tokens = self.count_tokens(input_text) output_tokens = self.count_tokens(output_text) costs = self.token_costs.get(self.model, self.token_costs["gpt-3.5-turbo"]) input_cost = (input_tokens / 1000) * costs["input"] output_cost = (output_tokens / 1000) * costs["output"] return input_cost + output_cost def optimize_prompt(self, prompt: str, max_tokens: int = 2000) -> str: """Optimize prompt to fit within token limit""" tokens = self.count_tokens(prompt) if tokens <= max_tokens: return prompt # Truncate intelligently sentences = prompt.split('. ') optimized = [] current_tokens = 0 for sentence in sentences: sentence_tokens = self.count_tokens(sentence + '. ') if current_tokens + sentence_tokens <= max_tokens: optimized.append(sentence) current_tokens += sentence_tokens else: break return '. '.join(optimized) + '.' # Caching layer for cost reduction class CostEffectiveChain: """Chain with caching and model selection for cost optimization""" def __init__(self, redis_client): self.redis_client = redis_client self.token_optimizer = TokenOptimizer() # Model hierarchy from cheapest to most expensive self.model_hierarchy = [ ("gpt-3.5-turbo", 0.7), ("gpt-4", 0.5), ("claude-3-opus", 0.3) ] def get_cached_response(self, prompt: str) -> Optional[str]: """Check cache for response""" cache_key = f"llm_cache:{hashlib.md5(prompt.encode()).hexdigest()}" cached = self.redis_client.get(cache_key) if cached: # Increment cache hit counter self.redis_client.incr("cache_hits") return json.loads(cached)["response"] return None def cache_response(self, prompt: str, response: str, ttl: int = 3600): """Cache response with TTL""" cache_key = f"llm_cache:{hashlib.md5(prompt.encode()).hexdigest()}" self.redis_client.setex( cache_key, ttl, json.dumps({ "response": response, "timestamp": datetime.utcnow().isoformat(), "prompt_tokens": self.token_optimizer.count_tokens(prompt), "response_tokens": self.token_optimizer.count_tokens(response) }) ) async def run_with_cost_optimization(self, prompt: str) -> Dict[str, Any]: """Run chain with cost optimization strategies""" # 1. Check cache first cached = self.get_cached_response(prompt) if cached: return { "response": cached, "cached": True, "cost": 0, "model": "cache" } # 2. Optimize prompt optimized_prompt = self.token_optimizer.optimize_prompt(prompt) # 3. 
Try models in order of cost for model, temperature in self.model_hierarchy: try: llm = ChatOpenAI(model=model, temperature=temperature) # Estimate if this will fit in context if self.token_optimizer.count_tokens(optimized_prompt) > 3500: continue response = await llm.ainvoke(optimized_prompt) response_text = response.content # Calculate actual cost cost = self.token_optimizer.estimate_cost( optimized_prompt, response_text ) # Cache successful response self.cache_response(prompt, response_text) return { "response": response_text, "cached": False, "cost": cost, "model": model, "tokens": { "input": self.token_optimizer.count_tokens(optimized_prompt), "output": self.token_optimizer.count_tokens(response_text) } } except Exception as e: logger.warning(f"Model {model} failed: {e}") continue raise Exception("All models failed") # Batch processing for cost efficiency class BatchProcessor: """Process requests in batches for better pricing""" def __init__(self, llm, batch_size: int = 10, wait_time: float = 1.0): self.llm = llm self.batch_size = batch_size self.wait_time = wait_time self.pending_requests = [] self.results = {} async def add_request(self, request_id: str, prompt: str): """Add request to batch""" self.pending_requests.append({ "id": request_id, "prompt": prompt, "timestamp": time.time() }) # Process if batch is full if len(self.pending_requests) >= self.batch_size: await self._process_batch() else: # Schedule batch processing asyncio.create_task(self._delayed_process()) async def _delayed_process(self): """Process batch after wait time""" await asyncio.sleep(self.wait_time) if self.pending_requests: await self._process_batch() async def _process_batch(self): """Process all pending requests""" if not self.pending_requests: return batch = self.pending_requests.copy() self.pending_requests.clear() # Create batch prompt combined_prompt = self._create_batch_prompt(batch) try: # Single LLM call for all requests response = await self.llm.ainvoke(combined_prompt) # Parse and distribute results results = self._parse_batch_response(response.content) for i, request in enumerate(batch): if i < len(results): self.results[request["id"]] = { "response": results[i], "batched": True, "batch_size": len(batch) } except Exception as e: logger.error(f"Batch processing failed: {e}") # Fall back to individual processing for request in batch: try: response = await self.llm.ainvoke(request["prompt"]) self.results[request["id"]] = { "response": response.content, "batched": False } except Exception as e: self.results[request["id"]] = { "error": str(e) } def _create_batch_prompt(self, batch: List[Dict]) -> str: """Create combined prompt for batch""" prompt = "Process the following requests and provide numbered responses:\n\n" for i, request in enumerate(batch, 1): prompt += f"{i}. {request['prompt']}\n\n" prompt += "Provide clear, numbered responses for each request." return prompt def _parse_batch_response(self, response: str) -> List[str]: """Parse batch response into individual responses""" # Simple parsing - in production, use more robust parsing import re pattern = r'\d+\.\s*(.*?)(?=\d+\.|$)' matches = re.findall(pattern, response, re.DOTALL) return [match.strip() for match in matches]
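Neither helper class shows how it is driven, so a usage sketch under stated assumptions follows: the Redis connection details are placeholders, BatchProcessor results are read back from its results dict keyed by request id (which is where the class above stores them), and the model hierarchy only works as written if the non-OpenAI entries are reachable through an OpenAI-compatible gateway such as ParrotRouter, since every tier is instantiated with ChatOpenAI.

# Usage sketch for the cost-optimization helpers above; connection details are placeholders.
import asyncio
import redis
from langchain_openai import ChatOpenAI

async def main():
    redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

    # Cached, prompt-trimmed, cheapest-model-first execution.
    cost_chain = CostEffectiveChain(redis_client)
    result = await cost_chain.run_with_cost_optimization(
        "Explain the difference between chains and agents in two sentences."
    )
    print(result["model"], "cached:", result["cached"], "cost:", result["cost"])

    # Batch three prompts; the third add_request fills the batch and triggers
    # a single combined LLM call, after which results are keyed by request id.
    processor = BatchProcessor(ChatOpenAI(model="gpt-3.5-turbo"), batch_size=3, wait_time=0.5)
    prompts = {
        "q1": "What is a retriever?",
        "q2": "What is an output parser?",
        "q3": "What is a tool call?",
    }
    for request_id, prompt in prompts.items():
        await processor.add_request(request_id, prompt)

    # Let the stray delayed-flush tasks from the first two requests finish quietly.
    await asyncio.sleep(1.0)
    for request_id in prompts:
        print(request_id, "->", processor.results.get(request_id))

asyncio.run(main())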
Cost Optimization Tips
- • Use caching aggressively (a caching sketch follows this list)
- • Optimize prompts to reduce token usage
- • Route to cheaper models when possible
- • Batch similar requests into a single call
- • Use streaming so generation can be stopped early
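The first tip does not require the hand-rolled Redis layer shown earlier; LangChain also ships a global LLM cache that short-circuits repeated identical calls. A minimal sketch is below. Import paths have moved between releases, so the exact modules (langchain.globals, langchain_community.cache, or their langchain_core equivalents) may differ in your version, and a Redis- or SQLite-backed cache can be swapped in when persistence is needed.

# Built-in LLM caching sketch; import paths may differ by LangChain version.
from langchain.globals import set_llm_cache
from langchain_community.cache import InMemoryCache  # RedisCache / SQLiteCache for persistence
from langchain_openai import ChatOpenAI

set_llm_cache(InMemoryCache())

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

# The first call hits the API; the identical second call is answered from the cache.
print(llm.invoke("Define 'retrieval-augmented generation' in one sentence.").content)
print(llm.invoke("Define 'retrieval-augmented generation' in one sentence.").content)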
Monitoring Metrics
- • Token usage per request (a tracking sketch follows this list)
- • Cache hit rate
- • Cost per user session
- • Model distribution across requests
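For token usage per request, LangChain's OpenAI callback context manager is a convenient starting point before wiring up custom Prometheus counters like the ones earlier in this guide. A short sketch follows; it assumes an OpenAI-compatible chat model, and the import path has moved to langchain_community in newer releases. The handler's cost figures come from a built-in price table, so treat them as estimates rather than billing data.

# Per-request token accounting sketch; import path may vary by version.
from langchain_community.callbacks import get_openai_callback
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-3.5-turbo")

with get_openai_callback() as cb:
    llm.invoke("List three LangChain memory classes.")
    print("prompt tokens:    ", cb.prompt_tokens)
    print("completion tokens:", cb.completion_tokens)
    print("estimated cost ($):", cb.total_cost)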
References
Additional resources and documentation
Official Documentation
LangChain evolves quickly and import paths occasionally move between releases, so always check the official LangChain documentation for the latest APIs and best practices.