LangChain Integration Guide

Build sophisticated LLM applications with LangChain's powerful orchestration framework

Overview
What is LangChain and why use it for LLM applications

LangChain is an open-source framework that simplifies building applications on top of large language models. It provides a standard interface for chains, a broad catalog of integrations with external tools and data sources, and ready-made end-to-end chains for common applications.

Key Benefits
  • Modular architecture
  • Built-in memory management
  • Tool & agent orchestration
  • Production-ready patterns
Use Cases
  • Chatbots with memory
  • Question-answering systems
  • Document analysis
  • Agent-based workflows
Core Components
Understanding LangChain's building blocks

Models: wrappers around LLMs, chat models, and embeddings
  • LLMs
  • Chat Models
  • Embeddings

Prompts: templates, example selectors, and output parsers
  • PromptTemplate
  • FewShotPromptTemplate
  • OutputParser

Chains: sequences of calls to LLMs, tools, or other chains
  • LLMChain
  • SequentialChain
  • RouterChain

Agents: dynamic decision-making with tool selection
  • ReAct
  • Tools
  • AgentExecutor

Memory: state persistence across chain and agent calls
  • ConversationBufferMemory
  • VectorStoreRetrieverMemory
  • ConversationSummaryMemory
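
A minimal sketch of how these components compose (provider configuration is covered in the next section; the model name and prompt are illustrative):

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.memory import ConversationBufferMemory
from langchain.chains import LLMChain

# Model + prompt + memory combined into a single chain
llm = ChatOpenAI(model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a concise assistant."),
    ("human", "{history}\n\n{input}"),
])
memory = ConversationBufferMemory(memory_key="history")

chain = LLMChain(llm=llm, prompt=prompt, memory=memory)
print(chain.run(input="Explain what LangChain is in one sentence."))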
Setup & Configuration
Installing and configuring LangChain with various LLM providers

Installation

# Core installation
pip install langchain langchain-community

# Provider-specific packages
pip install langchain-openai      # For OpenAI
pip install langchain-anthropic   # For Anthropic
pip install langchain-google-genai # For Google
pip install langchain-cohere      # For Cohere

# Additional dependencies
pip install chromadb  # Vector store
pip install faiss-cpu # Vector store
pip install beautifulsoup4 # Web scraping
pip install pypdf     # PDF processing

Basic Setup with ParrotRouter

import os
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage

# Configure ParrotRouter as OpenAI-compatible endpoint
os.environ['OPENAI_API_KEY'] = 'your-parrotrouter-api-key'
os.environ['OPENAI_API_BASE'] = 'https://api.parrotrouter.com/v1'

# Initialize LLM
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    temperature=0.7,
    max_tokens=1000,
    streaming=True  # Enable streaming
)

# Basic usage
messages = [
    SystemMessage(content="You are a helpful assistant."),
    HumanMessage(content="What is LangChain?")
]

response = llm.invoke(messages)
print(response.content)
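
Because streaming is enabled above, the same model can also emit tokens incrementally via the standard stream interface (a minimal sketch):

# Print the reply token by token instead of waiting for the full response
for chunk in llm.stream(messages):
    print(chunk.content, end="", flush=True)
print()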

Multi-Provider Configuration

import os
from typing import Dict

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.language_models import BaseChatModel

class LLMFactory:
    """Factory for creating LLM instances with different providers"""
    
    def __init__(self):
        self.providers: Dict[str, BaseChatModel] = {}
        self._initialize_providers()
    
    def _initialize_providers(self):
        """Initialize available LLM providers"""
        # ParrotRouter (OpenAI-compatible)
        if os.environ.get('PARROTROUTER_API_KEY'):
            self.providers['parrotrouter'] = ChatOpenAI(
                openai_api_key=os.environ['PARROTROUTER_API_KEY'],
                openai_api_base='https://api.parrotrouter.com/v1',
                model="gpt-3.5-turbo"
            )
        
        # Native OpenAI
        if os.environ.get('OPENAI_API_KEY'):
            self.providers['openai'] = ChatOpenAI(
                model="gpt-4"
            )
        
        # Anthropic
        if os.environ.get('ANTHROPIC_API_KEY'):
            self.providers['anthropic'] = ChatAnthropic(
                model="claude-3-sonnet-20240229"
            )
        
        # Google
        if os.environ.get('GOOGLE_API_KEY'):
            self.providers['google'] = ChatGoogleGenerativeAI(
                model="gemini-pro"
            )
    
    def get_llm(self, provider: str = 'parrotrouter') -> BaseChatModel:
        """Get LLM instance by provider name"""
        if provider not in self.providers:
            raise ValueError(f"Provider {provider} not configured")
        return self.providers[provider]

# Usage
factory = LLMFactory()
llm = factory.get_llm('parrotrouter')  # or 'openai', 'anthropic', 'google'
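
For automatic failover between configured providers, the factory output can be wrapped with LangChain's fallback mechanism. A sketch, assuming both providers were initialized above:

# Try ParrotRouter first; fall back to Anthropic if the call raises an error
primary = factory.get_llm('parrotrouter')
backup = factory.get_llm('anthropic')
resilient_llm = primary.with_fallbacks([backup])

response = resilient_llm.invoke("Summarize LangChain in one sentence.")
print(response.content)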
Chains, Prompts, and Output Parsers
Building modular LLM pipelines with structured outputs

Prompt Templates

from langchain.prompts import PromptTemplate, ChatPromptTemplate, FewShotPromptTemplate
from langchain.prompts.example_selector import SemanticSimilarityExampleSelector
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

# Basic prompt template
basic_prompt = PromptTemplate(
    input_variables=["product", "language"],
    template="""
    Write a short product description for {product} in {language}.
    Make it engaging and highlight key benefits.
    """
)

# Chat prompt template
chat_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful product description writer."),
    ("human", "Write a description for {product} targeting {audience}"),
])

# Few-shot prompt with examples
examples = [
    {
        "input": "smartphone",
        "output": "A cutting-edge device that keeps you connected to what matters most."
    },
    {
        "input": "laptop",
        "output": "Your portable powerhouse for productivity and creativity on the go."
    }
]

example_prompt = PromptTemplate(
    input_variables=["input", "output"],
    template="Product: {input}\nDescription: {output}"
)

# Semantic similarity selector for relevant examples
example_selector = SemanticSimilarityExampleSelector.from_examples(
    examples,
    OpenAIEmbeddings(),
    FAISS,
    k=2
)

few_shot_prompt = FewShotPromptTemplate(
    example_selector=example_selector,
    example_prompt=example_prompt,
    prefix="Write concise product descriptions:",
    suffix="Product: {input}\nDescription:",
    input_variables=["input"]
)

Chains

from langchain.chains import LLMChain, SequentialChain, TransformChain
from langchain.callbacks import StreamingStdOutCallbackHandler

# Simple LLM Chain
simple_chain = LLMChain(
    llm=llm,
    prompt=basic_prompt,
    verbose=True
)

result = simple_chain.run(product="wireless earbuds", language="English")

# Sequential Chain - Multiple steps
# Step 1: Generate description
description_chain = LLMChain(
    llm=llm,
    prompt=PromptTemplate(
        input_variables=["product"],
        template="Write a product description for {product}"
    ),
    output_key="description"
)

# Step 2: Extract key features
features_chain = LLMChain(
    llm=llm,
    prompt=PromptTemplate(
        input_variables=["description"],
        template="Extract 3 key features from this description: {description}"
    ),
    output_key="features"
)

# Step 3: Create marketing tagline
tagline_chain = LLMChain(
    llm=llm,
    prompt=PromptTemplate(
        input_variables=["product", "features"],
        template="Create a catchy tagline for {product} based on these features: {features}"
    ),
    output_key="tagline"
)

# Combine into sequential chain
marketing_chain = SequentialChain(
    chains=[description_chain, features_chain, tagline_chain],
    input_variables=["product"],
    output_variables=["description", "features", "tagline"],
    verbose=True
)

result = marketing_chain({"product": "smart home security camera"})

# Transform Chain for preprocessing
def preprocess_text(inputs: dict) -> dict:
    """Clean and prepare text"""
    text = inputs["text"]
    # Remove extra whitespace
    cleaned = " ".join(text.split())
    # Convert to lowercase for consistency
    cleaned = cleaned.lower()
    return {"cleaned_text": cleaned}

transform_chain = TransformChain(
    input_variables=["text"],
    output_variables=["cleaned_text"],
    transform=preprocess_text
)

# Chain with streaming
streaming_chain = LLMChain(
    llm=llm,
    prompt=basic_prompt,
    callbacks=[StreamingStdOutCallbackHandler()]
)

# Stream the response
streaming_chain.run(product="electric bicycle", language="English")

Output Parsers

from langchain.output_parsers import (
    PydanticOutputParser, 
    StructuredOutputParser,
    ResponseSchema,
    OutputFixingParser,
    RetryOutputParser
)
from pydantic import BaseModel, Field
from typing import List

# Pydantic parser for structured data
class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Product price in USD")
    features: List[str] = Field(description="List of key features")
    in_stock: bool = Field(description="Whether product is in stock")

pydantic_parser = PydanticOutputParser(pydantic_object=ProductInfo)

# Create prompt with format instructions
prompt_with_parser = PromptTemplate(
    template="""
    Extract product information from the following text.
    {format_instructions}
    
    Text: {text}
    """,
    input_variables=["text"],
    partial_variables={"format_instructions": pydantic_parser.get_format_instructions()}
)

# Use the chain
parsing_chain = LLMChain(llm=llm, prompt=prompt_with_parser)
raw_output = parsing_chain.run(
    text="The new iPhone 15 Pro costs $999 and features a titanium design, A17 Pro chip, and improved camera. Currently in stock."
)

# Parse the output
try:
    parsed_output = pydantic_parser.parse(raw_output)
    print(f"Product: {parsed_output.name}")
    print(f"Price: $\{parsed_output.price}")
    print(f"Features: {', '.join(parsed_output.features)}")
except Exception as e:
    print(f"Parsing error: {e}")

# Structured output parser with schemas
response_schemas = [
    ResponseSchema(name="sentiment", description="sentiment of the text (positive/negative/neutral)"),
    ResponseSchema(name="confidence", description="confidence score between 0 and 1"),
    ResponseSchema(name="key_points", description="list of key points mentioned")
]

structured_parser = StructuredOutputParser.from_response_schemas(response_schemas)
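
# Sketch: wire the structured parser into a prompt via its format instructions
sentiment_prompt = PromptTemplate(
    template="Analyze this review:\n{review}\n\n{format_instructions}",
    input_variables=["review"],
    partial_variables={"format_instructions": structured_parser.get_format_instructions()}
)
sentiment_chain = LLMChain(llm=llm, prompt=sentiment_prompt)
sentiment_raw = sentiment_chain.run(review="Great battery life, but the screen scratches easily.")
sentiment_result = structured_parser.parse(sentiment_raw)  # dict with sentiment, confidence, key_points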

# Output fixing parser - attempts to fix malformed outputs
fixing_parser = OutputFixingParser.from_llm(parser=pydantic_parser, llm=llm)

# Retry parser - retries with better instructions if parsing fails
retry_parser = RetryOutputParser.from_llm(parser=pydantic_parser, llm=llm)

# Combined chain with error handling
def safe_parse_chain(text: str) -> ProductInfo:
    """Parse with fallback strategies"""
    chain = LLMChain(llm=llm, prompt=prompt_with_parser)
    raw_output = chain.run(text=text)
    
    try:
        # Try standard parsing
        return pydantic_parser.parse(raw_output)
    except Exception as e:
        print(f"Initial parsing failed: {e}")
        try:
            # Try fixing parser
            return fixing_parser.parse(raw_output)
        except Exception as e:
            print(f"Fixing parser failed: {e}")
            # Final attempt with retry parser
            return retry_parser.parse_with_prompt(raw_output, prompt_with_parser.format(text=text))
Memory and Conversation Management
Implementing stateful conversations with context retention

Memory Types

from langchain.memory import (
    ConversationBufferMemory,
    ConversationBufferWindowMemory,
    ConversationSummaryMemory,
    ConversationSummaryBufferMemory,
    VectorStoreRetrieverMemory,
    CombinedMemory
)
from langchain.chains import ConversationChain

# 1. Buffer Memory - Stores everything
buffer_memory = ConversationBufferMemory()
conversation = ConversationChain(
    llm=llm,
    memory=buffer_memory,
    verbose=True
)

# Have a conversation
conversation.predict(input="Hi, I'm Alice and I love hiking")
conversation.predict(input="What's my name?")
conversation.predict(input="What do I enjoy doing?")

# 2. Window Memory - Keeps last K interactions
window_memory = ConversationBufferWindowMemory(k=3)
windowed_conversation = ConversationChain(
    llm=llm,
    memory=window_memory
)

# 3. Summary Memory - Summarizes conversation
summary_memory = ConversationSummaryMemory(llm=llm)
summary_conversation = ConversationChain(
    llm=llm,
    memory=summary_memory
)

# 4. Summary Buffer Memory - Hybrid approach
summary_buffer_memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=200  # Keep recent messages, summarize older ones
)

# 5. Vector Store Memory - Semantic search over conversation history
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

embeddings = OpenAIEmbeddings()
# FAISS cannot be built from an empty text list, so seed it with a placeholder
vectorstore = FAISS.from_texts(["conversation start"], embeddings)
retriever = vectorstore.as_retriever(search_kwargs=dict(k=3))

vector_memory = VectorStoreRetrieverMemory(
    retriever=retriever,
    memory_key="history",
    input_key="input"
)

# Add memories
vector_memory.save_context(
    {"input": "I'm working on a Python project"}, 
    {"output": "That's great! What kind of project?"}
)
vector_memory.save_context(
    {"input": "It's a web scraper"}, 
    {"output": "Web scraping can be very useful. Are you using BeautifulSoup or Scrapy?"}
)

# Retrieve relevant memories
relevant_history = vector_memory.load_memory_variables({"input": "Python coding"})
print(relevant_history)

Custom Memory Implementation

from langchain.schema import BaseMemory
from typing import List, Dict, Any
import json
from datetime import datetime

class PersistentConversationMemory(BaseMemory):
    """Custom memory that persists to disk with metadata"""
    
    # BaseMemory is a Pydantic model, so state must be declared as fields
    file_path: str = "conversation_history.json"
    history: List[Dict[str, Any]] = []
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.load_history()
    
    @property
    def memory_variables(self) -> List[str]:
        """Define memory variables to inject into prompt"""
        return ["history", "user_info"]
    
    def load_history(self):
        """Load conversation history from disk"""
        try:
            with open(self.file_path, 'r') as f:
                data = json.load(f)
                self.history = data.get('history', [])
        except FileNotFoundError:
            self.history = []
    
    def save_history(self):
        """Save conversation history to disk"""
        with open(self.file_path, 'w') as f:
            json.dump({
                'history': self.history,
                'last_updated': datetime.now().isoformat()
            }, f, indent=2)
    
    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
        """Save conversation turn with metadata"""
        self.history.append({
            'timestamp': datetime.now().isoformat(),
            'input': inputs.get('input', ''),
            'output': outputs.get('output', ''),
            'metadata': {
                'tokens_used': outputs.get('tokens_used', 0),
                'model': outputs.get('model', 'unknown')
            }
        })
        self.save_history()
    
    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Load memory variables for prompt"""
        # Format recent history
        recent_history = self.history[-5:]  # Last 5 exchanges
        formatted_history = "\n".join([
            f"Human: {turn['input']}\nAssistant: {turn['output']}"
            for turn in recent_history
        ])
        
        # Extract user information from history
        user_info = self._extract_user_info()
        
        return {
            'history': formatted_history,
            'user_info': user_info
        }
    
    def _extract_user_info(self) -> str:
        """Extract key user information from conversation history"""
        # Simple implementation - could use NLP for better extraction
        info_pieces = []
        for turn in self.history:
            input_lower = turn['input'].lower()
            if 'my name is' in input_lower or "i'm " in input_lower:
                info_pieces.append(turn['input'])
        
        return ' '.join(info_pieces[-3:]) if info_pieces else "No user info available"
    
    def clear(self) -> None:
        """Clear conversation history"""
        self.history = []
        self.save_history()

# Use custom memory
custom_memory = PersistentConversationMemory()
custom_chain = ConversationChain(
    llm=llm,
    memory=custom_memory,
    prompt=PromptTemplate(
        input_variables=["history", "user_info", "input"],
        template="""
        Previous conversation:
        {history}
        
        Known user information: {user_info}
        
        Human: {input}
        Assistant: """
    )
)

Memory with Multiple Chains

# Shared memory across multiple chains
from langchain.memory import ConversationEntityMemory

# Entity memory tracks entities mentioned in conversation
entity_memory = ConversationEntityMemory(llm=llm)

# Chain 1: Information gathering
info_chain = ConversationChain(
    llm=llm,
    memory=entity_memory,
    prompt=PromptTemplate(
        input_variables=["entities", "history", "input"],
        template="""
        You are gathering information about the user.
        
        Known entities: {entities}
        Conversation history: {history}
        
        Human: {input}
        Assistant:"""
    )
)

# Chain 2: Personalized recommendations
recommendation_chain = ConversationChain(
    llm=llm,
    memory=entity_memory,  # Same memory instance
    prompt=PromptTemplate(
        input_variables=["entities", "history", "input"],
        template="""
        You provide personalized recommendations based on user information.
        
        Known about user: {entities}
        Previous conversations: {history}
        
        Human: {input}
        Assistant:"""
    )
)

# Use both chains with shared memory
info_chain.predict(input="I'm John and I love Italian food and jazz music")
recommendation = recommendation_chain.predict(input="Recommend a restaurant for me")
print(recommendation)
Agents and Tools Implementation
Building intelligent agents that can use tools and make decisions

Creating Custom Tools

from langchain.agents import Tool, AgentExecutor, create_react_agent
from langchain.tools import BaseTool, StructuredTool
from langchain import hub
from typing import Optional, Type
from pydantic import BaseModel, Field
import requests
import json

# Simple function tool
def get_weather(location: str) -> str:
    """Get weather for a location (mock implementation)"""
    # In real implementation, call weather API
    weather_data = {
        "New York": "Sunny, 72°F",
        "London": "Cloudy, 59°F", 
        "Tokyo": "Rainy, 65°F"
    }
    return weather_data.get(location, "Weather data not available")

# Create tool from function
weather_tool = Tool(
    name="get_weather",
    func=get_weather,
    description="Get current weather for a city. Input should be a city name."
)

# Structured tool with Pydantic
class CalculatorInput(BaseModel):
    expression: str = Field(description="Mathematical expression to evaluate")

def calculate(expression: str) -> str:
    """Safely evaluate mathematical expressions"""
    try:
        # Only allow safe operations
        allowed_chars = "0123456789+-*/()., "
        if all(c in allowed_chars for c in expression):
            result = eval(expression)
            return f"The result is: {result}"
        else:
            return "Invalid expression. Only basic math operations allowed."
    except Exception as e:
        return f"Error calculating: {str(e)}"

calculator_tool = StructuredTool.from_function(
    func=calculate,
    name="calculator",
    description="Perform mathematical calculations",
    args_schema=CalculatorInput
)

# Custom tool class
class WebSearchTool(BaseTool):
    """Custom tool for web search"""
    name = "web_search"
    description = "Search the web for current information"
    
    def _run(self, query: str) -> str:
        """Execute web search"""
        # Mock implementation - replace with actual search API
        results = {
            "LangChain": "LangChain is a framework for developing applications powered by language models",
            "Python": "Python is a high-level programming language",
        }
        
        for key in results:
            if key.lower() in query.lower():
                return results[key]
        
        return f"No results found for: {query}"
    
    async def _arun(self, query: str) -> str:
        """Async version"""
        return self._run(query)

# ParrotRouter API tool
class ParrotRouterTool(BaseTool):
    """Tool for making LLM calls via ParrotRouter"""
    name = "ask_ai"
    description = "Ask an AI model for help with complex questions"
    
    def __init__(self, api_key: str):
        super().__init__()
        self.api_key = api_key
    
    def _run(self, question: str) -> str:
        """Make API call to ParrotRouter"""
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }
        
        payload = {
            'model': 'gpt-3.5-turbo',
            'messages': [{'role': 'user', 'content': question}],
            'max_tokens': 200
        }
        
        try:
            response = requests.post(
                'https://api.parrotrouter.com/v1/chat/completions',
                headers=headers,
                json=payload
            )
            response.raise_for_status()
            return response.json()['choices'][0]['message']['content']
        except Exception as e:
            return f"Error calling AI: {str(e)}"
    
    async def _arun(self, question: str) -> str:
        """Async version"""
        # Implement async version with aiohttp
        return self._run(question)

Agent Implementation

# Initialize tools
tools = [
    weather_tool,
    calculator_tool,
    WebSearchTool(),
    ParrotRouterTool(api_key=os.environ['PARROTROUTER_API_KEY'])
]

# Get ReAct prompt from hub
prompt = hub.pull("hwchase17/react")

# Create agent
agent = create_react_agent(
    llm=llm,
    tools=tools,
    prompt=prompt
)

# Create agent executor
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5,
    early_stopping_method="generate"
)

# Use the agent
result = agent_executor.invoke({
    "input": "What's the weather in New York? Also, calculate 23 * 45 for me."
})
print(result['output'])

# Custom agent with specific behavior
from langchain.agents import AgentOutputParser
from langchain.schema import AgentAction, AgentFinish
from typing import Union
import re

class CustomAgentOutputParser(AgentOutputParser):
    """Custom parser for agent outputs"""
    
    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        # Check if agent should finish
        if "Final Answer:" in llm_output:
            return AgentFinish(
                return_values={"output": llm_output.split("Final Answer:")[-1].strip()},
                log=llm_output,
            )
        
        # Parse action and input
        regex = r"Action: (.*?)[\n]*Action Input: (.*)"
        match = re.search(regex, llm_output, re.DOTALL)
        
        if not match:
            raise ValueError(f"Could not parse LLM output: '{llm_output}'")
        
        action = match.group(1).strip()
        action_input = match.group(2).strip()
        
        return AgentAction(tool=action, tool_input=action_input, log=llm_output)

# Agent with custom tools and memory
from langchain.memory import ConversationBufferMemory

agent_memory = ConversationBufferMemory(memory_key="chat_history")

conversational_agent = create_react_agent(
    llm=llm,
    tools=tools,
    prompt=PromptTemplate(
        # create_react_agent requires {tools}, {tool_names}, and {agent_scratchpad}
        input_variables=["input", "chat_history", "agent_scratchpad", "tools", "tool_names"],
        template="""
        You are a helpful AI assistant with access to various tools.
        
        Previous conversation:
        {chat_history}
        
        Available tools:
        {tools}
        
        To use a tool, respond with:
        Thought: [your reasoning]
        Action: [one of {tool_names}]
        Action Input: [tool input]
        
        When you have a final answer, respond with:
        Final Answer: [your answer]
        
        Human: {input}
        {agent_scratchpad}
        """
    )
)

conversational_executor = AgentExecutor(
    agent=conversational_agent,
    tools=tools,
    memory=agent_memory,
    verbose=True
)

Tool Selection Strategies

# Multi-agent system with specialized agents
from langchain.agents import initialize_agent, AgentType

# Research agent with search tools
research_tools = [WebSearchTool(), ParrotRouterTool(api_key=os.environ['PARROTROUTER_API_KEY'])]
research_agent = initialize_agent(
    research_tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# Math agent with calculator
math_tools = [calculator_tool]
math_agent = initialize_agent(
    math_tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# Router agent that delegates to specialized agents
class RouterAgent:
    """Routes queries to appropriate specialized agents"""
    
    def __init__(self, llm, agents: Dict[str, Any]):
        self.llm = llm
        self.agents = agents
        self.router_prompt = PromptTemplate(
            input_variables=["query"],
            template="""
            Analyze this query and determine which type of agent should handle it:
            - 'research' for web searches, current events, general knowledge
            - 'math' for calculations and mathematical problems
            - 'general' for everything else
            
            Query: {query}
            
            Respond with just the agent type (research/math/general):
            """
        )
    
    def route(self, query: str) -> str:
        """Determine which agent to use"""
        router_chain = LLMChain(llm=self.llm, prompt=self.router_prompt)
        agent_type = router_chain.run(query=query).strip().lower()
        
        if agent_type in self.agents:
            return self.agents[agent_type].run(query)
        else:
            return self.agents['general'].run(query)

# Initialize router
router = RouterAgent(
    llm=llm,
    agents={
        'research': research_agent,
        'math': math_agent,
        'general': agent_executor
    }
)

# Use router
result = router.route("What's the population of Tokyo?")  # Goes to research
result = router.route("Calculate the compound interest on $1000 at 5% for 10 years")  # Goes to math
Document Loaders and Text Splitters
Processing documents for LLM consumption

Document Loaders

from langchain_community.document_loaders import (
    TextLoader,
    PyPDFLoader,
    CSVLoader,
    UnstructuredHTMLLoader,
    WebBaseLoader,
    DirectoryLoader,
    JSONLoader,
    UnstructuredMarkdownLoader
)
from langchain.schema import Document
from typing import Dict, List
import glob

# Text file loader
text_loader = TextLoader("document.txt", encoding='utf-8')
text_docs = text_loader.load()

# PDF loader
pdf_loader = PyPDFLoader("document.pdf")
pdf_docs = pdf_loader.load()

# Load and split pages
pdf_pages = pdf_loader.load_and_split()

# CSV loader with custom settings
csv_loader = CSVLoader(
    file_path="data.csv",
    csv_args={
        'delimiter': ',',
        'quotechar': '"',
        'fieldnames': ['name', 'description', 'price']
    }
)
csv_docs = csv_loader.load()

# Web loader
web_loader = WebBaseLoader([
    "https://example.com/page1",
    "https://example.com/page2"
])
web_docs = web_loader.load()

# Directory loader for multiple files
dir_loader = DirectoryLoader(
    'documents/',
    glob="**/*.txt",
    loader_cls=TextLoader,
    loader_kwargs={'encoding': 'utf-8'}
)
all_docs = dir_loader.load()

# JSON loader with jq schema
json_loader = JSONLoader(
    file_path='data.json',
    jq_schema='.records[]',  # Extract each record
    text_content=False,      # Load as structured data
    metadata_func=lambda record, metadata: {**metadata, "source": "json"}
)

# Custom loader
class CustomDataLoader:
    """Custom loader for proprietary format"""
    
    def __init__(self, file_path: str):
        self.file_path = file_path
    
    def load(self) -> List[Document]:
        """Load and parse custom format"""
        documents = []
        
        with open(self.file_path, 'r') as f:
            content = f.read()
            
        # Custom parsing logic
        sections = content.split('---SECTION---')
        
        for i, section in enumerate(sections):
            if section.strip():
                # Extract metadata
                lines = section.strip().split('\n')
                title = lines[0] if lines else f"Section {i}"
                content = '\n'.join(lines[1:]) if len(lines) > 1 else ""
                
                doc = Document(
                    page_content=content,
                    metadata={
                        'source': self.file_path,
                        'section': i,
                        'title': title
                    }
                )
                documents.append(doc)
        
        return documents

# Load from multiple sources
def load_knowledge_base(sources: Dict[str, str]) -> List[Document]:
    """Load documents from multiple sources"""
    all_documents = []
    
    for source_type, source_path in sources.items():
        try:
            if source_type == 'pdf':
                loader = PyPDFLoader(source_path)
            elif source_type == 'csv':
                loader = CSVLoader(source_path)
            elif source_type == 'html':
                loader = UnstructuredHTMLLoader(source_path)
            elif source_type == 'markdown':
                loader = UnstructuredMarkdownLoader(source_path)
            else:
                loader = TextLoader(source_path)
            
            documents = loader.load()
            # Add source type to metadata
            for doc in documents:
                doc.metadata['source_type'] = source_type
            
            all_documents.extend(documents)
            
        except Exception as e:
            print(f"Error loading {source_path}: {e}")
    
    return all_documents
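
# Example usage (hypothetical file paths)
knowledge_base = load_knowledge_base({
    'pdf': 'docs/manual.pdf',
    'markdown': 'docs/README.md',
    'text': 'docs/notes.txt'
})
print(f"Loaded {len(knowledge_base)} documents")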

Text Splitters

from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    TokenTextSplitter,
    MarkdownTextSplitter,
    PythonCodeTextSplitter,
    RecursiveJsonSplitter
)

# Recursive character splitter (recommended for most use cases)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
    separators=["\n\n", "\n", " ", ""]  # Split by paragraphs, then lines, then words
)

documents = text_splitter.split_documents(text_docs)

# Token-based splitter (for precise token control)
from tiktoken import encoding_for_model

encoding = encoding_for_model('gpt-3.5-turbo')
token_splitter = TokenTextSplitter(
    chunk_size=500,  # tokens
    chunk_overlap=50,
    encoding_name='cl100k_base'
)

# Markdown splitter (preserves structure)
markdown_splitter = MarkdownTextSplitter(
    chunk_size=1000,
    chunk_overlap=100
)

# Code splitter (language-aware)
python_splitter = PythonCodeTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)

# Custom splitter with metadata preservation
class MetadataPreservingSplitter:
    """Splitter that maintains document metadata"""
    
    def __init__(self, base_splitter):
        self.base_splitter = base_splitter
    
    def split_documents(self, documents: List[Document]) -> List[Document]:
        """Split documents while preserving and enriching metadata"""
        all_splits = []
        
        for doc in documents:
            splits = self.base_splitter.split_text(doc.page_content)
            
            for i, split in enumerate(splits):
                # Create new document with enriched metadata
                split_doc = Document(
                    page_content=split,
                    metadata={
                        **doc.metadata,
                        'chunk_index': i,
                        'total_chunks': len(splits),
                        'chunk_size': len(split),
                        'original_size': len(doc.page_content)
                    }
                )
                all_splits.append(split_doc)
        
        return all_splits

# Semantic splitter (splits based on meaning)
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings

semantic_splitter = SemanticChunker(
    embeddings=OpenAIEmbeddings(),
    breakpoint_threshold_type="percentile",  # or "standard_deviation", "interquartile"
    breakpoint_threshold_amount=0.5
)
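
# Sketch: chunk raw text at semantic boundaries (this makes an embeddings API call)
semantic_chunks = semantic_splitter.create_documents([text_docs[0].page_content])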

# Split by document type
def smart_split_documents(documents: List[Document]) -> List[Document]:
    """Apply appropriate splitter based on document type"""
    split_docs = []
    
    for doc in documents:
        source_type = doc.metadata.get('source_type', 'text')
        
        if source_type == 'code':
            splitter = PythonCodeTextSplitter(chunk_size=1000, chunk_overlap=200)
        elif source_type == 'markdown':
            splitter = MarkdownTextSplitter(chunk_size=1000, chunk_overlap=100)
        elif source_type == 'structured':
            splitter = RecursiveJsonSplitter(max_chunk_size=1000)
        else:
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200
            )
        
        splits = splitter.split_text(doc.page_content)
        
        for i, split in enumerate(splits):
            split_doc = Document(
                page_content=split,
                metadata={
                    **doc.metadata,
                    'split_index': i,
                    'splitter_type': type(splitter).__name__
                }
            )
            split_docs.append(split_doc)
    
    return split_docs
Vector Stores and Retrieval Chains
Implementing Retrieval-Augmented Generation (RAG) systems

Vector Store Setup

from langchain_community.vectorstores import FAISS, Chroma, Pinecone
from langchain_openai import OpenAIEmbeddings
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
import pinecone

# Initialize embeddings
embeddings = OpenAIEmbeddings()

# FAISS - Local vector store
faiss_store = FAISS.from_documents(documents, embeddings)

# Save and load FAISS
faiss_store.save_local("faiss_index")
loaded_faiss = FAISS.load_local("faiss_index", embeddings)

# Chroma - Persistent local store
chroma_store = Chroma.from_documents(
    documents,
    embeddings,
    persist_directory="./chroma_db",
    collection_name="my_collection"
)

# Pinecone - Cloud vector database
pinecone.init(api_key=os.environ['PINECONE_API_KEY'], environment='us-west1-gcp')
pinecone_store = Pinecone.from_documents(
    documents,
    embeddings,
    index_name="langchain-index",
    namespace="my-namespace"
)

# Advanced retrieval with filters
retriever = faiss_store.as_retriever(
    search_type="similarity",  # or "mmr" for diversity
    search_kwargs={
        "k": 5,  # Return top 5 results
        "score_threshold": 0.7,  # Minimum similarity score
        "filter": {"source": "technical_docs"}  # Metadata filter
    }
)

# Contextual compression retriever
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

# Multi-query retriever for better results
from langchain.retrievers.multi_query import MultiQueryRetriever

multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=faiss_store.as_retriever(),
    llm=llm
)

# Ensemble retriever combining multiple strategies
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

# BM25 for keyword search
bm25_retriever = BM25Retriever.from_documents(documents)
bm25_retriever.k = 5

# Combine with semantic search
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, faiss_store.as_retriever()],
    weights=[0.3, 0.7]  # Weight keyword vs semantic search
)

RAG Implementation

from langchain.chains import RetrievalQA, ConversationalRetrievalChain
from langchain.prompts import PromptTemplate

# Basic RAG chain
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",  # or "map_reduce", "refine", "map_rerank"
    retriever=retriever,
    return_source_documents=True,
    verbose=True
)

# Query the chain
response = qa_chain({"query": "What is LangChain?"})
print(f"Answer: {response['result']}")
print(f"Sources: {[doc.metadata for doc in response['source_documents']]}")

# Custom RAG prompt
rag_prompt = PromptTemplate(
    input_variables=["context", "question"],
    template="""
    You are an AI assistant helping users understand technical documentation.
    Use the following context to answer the question. If you don't know the answer,
    say so - don't make up information.
    
    Context:
    {context}
    
    Question: {question}
    
    Answer: """
)

custom_qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    chain_type_kwargs={"prompt": rag_prompt}
)

# Conversational RAG with memory
conversational_chain = ConversationalRetrievalChain.from_llm(
    llm=llm,
    retriever=compression_retriever,
    memory=ConversationBufferMemory(
        memory_key="chat_history",
        return_messages=True
    ),
    combine_docs_chain_kwargs={"prompt": rag_prompt}
)

# Have a conversation
result = conversational_chain({"question": "What is RAG?"})
result = conversational_chain({"question": "Can you give me an example?"})

# Advanced RAG with source citation
class CitationRAGChain:
    """RAG chain that provides inline citations"""
    
    def __init__(self, llm, retriever):
        self.llm = llm
        self.retriever = retriever
        self.citation_prompt = PromptTemplate(
            input_variables=["context", "question"],
            template="""
            Answer the question based on the given context. 
            Cite your sources using [1], [2], etc. format.
            
            Context:
            {context}
            
            Question: {question}
            
            Answer with citations: """
        )
    
    def run(self, question: str) -> Dict[str, Any]:
        # Retrieve relevant documents
        docs = self.retriever.get_relevant_documents(question)
        
        # Format context with numbering
        context_parts = []
        for i, doc in enumerate(docs, 1):
            source = doc.metadata.get('source', 'Unknown')
            context_parts.append(f"[{i}] {doc.page_content} (Source: {source})")
        
        context = "\n\n".join(context_parts)
        
        # Generate answer
        chain = LLMChain(llm=self.llm, prompt=self.citation_prompt)
        answer = chain.run(context=context, question=question)
        
        # Extract citations from answer
        import re
        citation_pattern = r'\[(\d+)\]'
        cited_indices = set(int(match) - 1 for match in re.findall(citation_pattern, answer))
        cited_docs = [docs[i] for i in cited_indices if i < len(docs)]
        
        return {
            'answer': answer,
            'source_documents': docs,
            'cited_documents': cited_docs
        }

# Use citation chain
citation_chain = CitationRAGChain(llm, retriever)
result = citation_chain.run("What are the benefits of RAG?")
print(f"Answer: {result['answer']}")
print(f"Cited sources: {[doc.metadata['source'] for doc in result['cited_documents']]}")

Hybrid Search Implementation

# Hybrid search combining multiple techniques
from typing import Dict, List, Optional, Tuple
import numpy as np

class HybridRetriever:
    """Combines semantic, keyword, and metadata-based retrieval"""
    
    def __init__(self, 
                 semantic_store: FAISS,
                 keyword_retriever: BM25Retriever,
                 metadata_store: Dict[str, Document]):
        self.semantic_store = semantic_store
        self.keyword_retriever = keyword_retriever
        self.metadata_store = metadata_store
        
    def retrieve(self, 
                 query: str, 
                 k: int = 5,
                 semantic_weight: float = 0.5,
                 keyword_weight: float = 0.3,
                 metadata_weight: float = 0.2,
                 filters: Optional[Dict] = None) -> List[Document]:
        """Retrieve documents using hybrid approach"""
        
        # Semantic search
        semantic_docs = self.semantic_store.similarity_search_with_score(query, k=k*2)
        
        # Keyword search
        keyword_docs = self.keyword_retriever.get_relevant_documents(query)[:k*2]
        
        # Score normalization
        doc_scores = {}
        
        # Add semantic scores
        if semantic_docs:
            max_semantic_score = max(score for _, score in semantic_docs)
            for doc, score in semantic_docs:
                doc_id = doc.metadata.get('doc_id', doc.page_content[:50])
                doc_scores[doc_id] = {
                    'doc': doc,
                    'semantic_score': score / max_semantic_score * semantic_weight
                }
        
        # Add keyword scores
        for i, doc in enumerate(keyword_docs):
            doc_id = doc.metadata.get('doc_id', doc.page_content[:50])
            keyword_score = (len(keyword_docs) - i) / len(keyword_docs) * keyword_weight
            
            if doc_id in doc_scores:
                doc_scores[doc_id]['keyword_score'] = keyword_score
            else:
                doc_scores[doc_id] = {
                    'doc': doc,
                    'semantic_score': 0,
                    'keyword_score': keyword_score
                }
        
        # Add metadata boost
        if filters:
            for doc_id, data in doc_scores.items():
                metadata_match = all(
                    data['doc'].metadata.get(key) == value 
                    for key, value in filters.items()
                )
                data['metadata_score'] = metadata_weight if metadata_match else 0
        
        # Calculate final scores
        final_scores = []
        for doc_id, data in doc_scores.items():
            total_score = (
                data.get('semantic_score', 0) + 
                data.get('keyword_score', 0) + 
                data.get('metadata_score', 0)
            )
            final_scores.append((data['doc'], total_score))
        
        # Sort by score and return top k
        final_scores.sort(key=lambda x: x[1], reverse=True)
        return [doc for doc, _ in final_scores[:k]]
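
# Sketch: wire the hybrid retriever to the stores built earlier (filters are illustrative)
hybrid_retriever = HybridRetriever(
    semantic_store=faiss_store,
    keyword_retriever=bm25_retriever,
    metadata_store={}
)
top_docs = hybrid_retriever.retrieve(
    "vector databases",
    k=3,
    filters={"source_type": "markdown"}
)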

# Vector store with metadata filtering
class MetadataVectorStore:
    """Vector store with advanced metadata filtering"""
    
    def __init__(self, embeddings):
        self.embeddings = embeddings
        self.vectors = []
        self.documents = []
        self.metadata_index = {}
    
    def add_documents(self, documents: List[Document]):
        """Add documents with metadata indexing"""
        for doc in documents:
            # Generate embedding
            embedding = self.embeddings.embed_query(doc.page_content)
            self.vectors.append(embedding)
            self.documents.append(doc)
            
            # Index metadata
            for key, value in doc.metadata.items():
                if key not in self.metadata_index:
                    self.metadata_index[key] = {}
                if value not in self.metadata_index[key]:
                    self.metadata_index[key][value] = []
                self.metadata_index[key][value].append(len(self.documents) - 1)
    
    def similarity_search_with_filter(self, 
                                      query: str, 
                                      k: int = 5,
                                      filters: Optional[Dict] = None) -> List[Document]:
        """Search with metadata filtering"""
        # Get candidate indices based on filters
        if filters:
            candidate_indices = None
            for key, value in filters.items():
                if key in self.metadata_index and value in self.metadata_index[key]:
                    indices = set(self.metadata_index[key][value])
                    if candidate_indices is None:
                        candidate_indices = indices
                    else:
                        candidate_indices = candidate_indices.intersection(indices)
            
            if not candidate_indices:
                return []
        else:
            candidate_indices = set(range(len(self.documents)))
        
        # Generate query embedding
        query_embedding = self.embeddings.embed_query(query)
        
        # Calculate similarities for candidates only
        similarities = []
        for idx in candidate_indices:
            similarity = np.dot(query_embedding, self.vectors[idx])
            similarities.append((idx, similarity))
        
        # Sort and return top k
        similarities.sort(key=lambda x: x[1], reverse=True)
        return [self.documents[idx] for idx, _ in similarities[:k]]
LangChain Expression Language (LCEL)
Declarative way to compose chains

LCEL Basics

from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from operator import itemgetter

# Simple LCEL chain
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
chain = prompt | llm

# Invoke the chain
result = chain.invoke({"topic": "programming"})
print(result.content)

# Chain with output parser
from langchain.output_parsers import CommaSeparatedListOutputParser

parser = CommaSeparatedListOutputParser()
prompt = PromptTemplate(
    template="List 5 {category} items separated by commas:",
    input_variables=["category"]
)

list_chain = prompt | llm | parser
items = list_chain.invoke({"category": "programming languages"})
print(items)  # ['Python', 'JavaScript', 'Java', 'C++', 'Go']

# Parallel execution
parallel_chain = RunnableParallel(
    joke=prompt | llm,
    facts=PromptTemplate.from_template("List 3 facts about {topic}") | llm,
    definition=PromptTemplate.from_template("Define {topic} in one sentence") | llm
)

results = parallel_chain.invoke({"topic": "artificial intelligence"})
print(f"Joke: {results['joke'].content}")
print(f"Facts: {results['facts'].content}")
print(f"Definition: {results['definition'].content}")

# Complex chain with retrieval
from langchain_core.output_parsers import StrOutputParser

retrieval_chain = (
    {
        "context": retriever | (lambda docs: "\n".join(doc.page_content for doc in docs)),
        "question": RunnablePassthrough()
    }
    | PromptTemplate.from_template("""
        Context: {context}
        
        Question: {question}
        
        Answer based on the context:
    """)
    | llm
    | StrOutputParser()
)

answer = retrieval_chain.invoke("What is LangChain?")

# Branching with routing
def route_question(info):
    """Route to different chains based on question type"""
    question = info["question"].lower()
    if "calculate" in question or "math" in question:
        return "math"
    elif "translate" in question:
        return "translation"
    else:
        return "general"

math_chain = PromptTemplate.from_template("Calculate: {question}") | llm
translation_chain = PromptTemplate.from_template("Translate: {question}") | llm
general_chain = PromptTemplate.from_template("Answer: {question}") | llm

# Dispatch to the selected chain with a RunnableLambda
from langchain_core.runnables import RunnableLambda

chain_map = {
    "math": math_chain,
    "translation": translation_chain,
    "general": general_chain
}

branch_chain = RunnableLambda(
    lambda x: chain_map[route_question(x)].invoke(x)
)
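
# Example: routes to math_chain, which asks the model to evaluate the expression
routed = branch_chain.invoke({"question": "Calculate 15 * 4"})
print(routed.content)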

Advanced LCEL Patterns

# Fallback chains
from langchain_core.runnables import RunnableLambda

primary_chain = prompt | ChatOpenAI(model="gpt-4")
fallback_chain = prompt | ChatOpenAI(model="gpt-3.5-turbo")

def with_fallback(primary, fallback):
    """Create chain with fallback"""
    def run_with_fallback(inputs):
        try:
            return primary.invoke(inputs)
        except Exception as e:
            print(f"Primary failed: {e}, using fallback")
            return fallback.invoke(inputs)
    
    return RunnableLambda(run_with_fallback)

robust_chain = with_fallback(primary_chain, fallback_chain)

# Streaming with LCEL
async def stream_chain():
    """Stream responses token by token"""
    chain = prompt | llm
    
    async for chunk in chain.astream({"topic": "space exploration"}):
        print(chunk.content, end="", flush=True)

# Batch processing
batch_chain = prompt | llm

topics = [
    {"topic": "quantum computing"},
    {"topic": "climate change"},
    {"topic": "space exploration"}
]

# Process in parallel (use `await batch_chain.abatch(topics)` inside async code)
batch_results = batch_chain.batch(topics)

# Custom runnable
from langchain_core.runnables import Runnable

class CustomProcessor(Runnable):
    """Custom processing step"""
    
    def invoke(self, input: Dict, config: Optional[Dict] = None) -> Dict:
        # Custom processing logic
        processed = input.copy()
        processed['word_count'] = len(input.get('text', '').split())
        processed['char_count'] = len(input.get('text', ''))
        return processed
    
    async def ainvoke(self, input: Dict, config: Optional[Dict] = None) -> Dict:
        # Async version
        return self.invoke(input, config)

# Use in chain
analysis_chain = (
    {"text": RunnablePassthrough()}
    | CustomProcessor()
    | RunnableLambda(lambda x: f"Text has {x['word_count']} words and {x['char_count']} characters")
)

# Conditional chains
def conditional_chain():
    """Chain with conditional logic"""
    
    def check_length(text: str) -> str:
        return "long" if len(text) > 100 else "short"
    
    short_prompt = PromptTemplate.from_template("Expand this short text: {text}")
    long_prompt = PromptTemplate.from_template("Summarize this long text: {text}")
    
    return (
        RunnablePassthrough.assign(
            length_type=lambda x: check_length(x["text"])
        )
        | RunnableLambda(
            lambda x: short_prompt if x["length_type"] == "short" else long_prompt
        )
        | llm
    )

# Chain with retry and run metadata
# (RunnableConfig has no retry or timeout fields; use with_retry and with_config instead)
production_chain = (
    (prompt | llm)
    .with_retry(stop_after_attempt=3)
    .with_config(tags=["production", "critical"], metadata={"version": "1.0"})
)
Streaming and Async Operations
Implementing responsive, real-time LLM applications

Streaming Responses

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.callbacks.base import AsyncCallbackHandler
from typing import Any, Dict, List, Optional
import asyncio

# Basic streaming
streaming_llm = ChatOpenAI(
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

# Stream to console
response = streaming_llm.invoke("Write a short story about AI")

# Custom streaming handler
class CustomStreamHandler(AsyncCallbackHandler):
    """Custom handler for streaming events"""
    
    def __init__(self, websocket=None):
        self.websocket = websocket
        self.tokens = []
    
    async def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
        """Called when LLM starts"""
        print("Starting generation...")
        if self.websocket:
            await self.websocket.send_json({"type": "start"})
    
    async def on_llm_new_token(self, token: str, **kwargs):
        """Called for each new token"""
        self.tokens.append(token)
        if self.websocket:
            await self.websocket.send_json({
                "type": "token",
                "content": token
            })
        else:
            print(token, end="", flush=True)
    
    async def on_llm_end(self, response, **kwargs):
        """Called when LLM finishes"""
        print("\nGeneration complete!")
        if self.websocket:
            await self.websocket.send_json({
                "type": "complete",
                "full_response": "".join(self.tokens)
            })
    
    async def on_llm_error(self, error: Exception, **kwargs):
        """Called on error"""
        print(f"Error: {error}")
        if self.websocket:
            await self.websocket.send_json({
                "type": "error",
                "error": str(error)
            })

# Use custom handler
custom_handler = CustomStreamHandler()
streaming_llm = ChatOpenAI(
    streaming=True,
    callbacks=[custom_handler]
)

# Stream with token counting
class TokenCounterHandler(AsyncCallbackHandler):
    """Count tokens while streaming"""
    
    def __init__(self):
        self.token_count = 0
        self.total_tokens = 0
    
    async def on_llm_new_token(self, token: str, **kwargs):
        self.token_count += 1
        # Rough estimation - would use tiktoken for accuracy
        self.total_tokens += len(token.split())
        
        if self.token_count % 10 == 0:
            print(f"\n[Tokens: {self.token_count}]", end="")

# Streaming with multiple handlers
multi_handler_llm = ChatOpenAI(
    streaming=True,
    callbacks=[
        StreamingStdOutCallbackHandler(),
        TokenCounterHandler(),
        custom_handler
    ]
)

Async Operations

# Async chain execution
async def async_rag_example():
    """Async RAG implementation"""
    
    # Async retrieval step: fetch documents and join them into a context string
    async def retrieve_context(inputs: dict) -> str:
        docs = await retriever.aget_relevant_documents(inputs["question"])
        return "\n\n".join(doc.page_content for doc in docs)
    
    # Async chain (RunnableLambda recognizes the coroutine function and awaits it)
    async_chain = (
        {
            "context": RunnableLambda(retrieve_context),
            "question": itemgetter("question")
        }
        | PromptTemplate.from_template("""
            Context: {context}
            Question: {question}
            Answer:
        """)
        | llm
    )
    
    # Single async call
    result = await async_chain.ainvoke({"question": "What is async programming?"})
    print(result)
    
    # Batch async calls
    questions = [
        {"question": "What is Python?"},
        {"question": "What is JavaScript?"},
        {"question": "What is Rust?"}
    ]
    
    results = await async_chain.abatch(questions)
    for i, result in enumerate(results):
        print(f"Q{i+1}: {result.content[:100]}...")

# Async streaming
async def async_stream_example():
    """Stream responses asynchronously"""
    
    chain = prompt | llm
    
    async for chunk in chain.astream({"topic": "future of technology"}):
        print(chunk.content, end="", flush=True)
        
        # Simulate processing each chunk
        await asyncio.sleep(0.01)

# Concurrent async operations
async def concurrent_llm_calls():
    """Make multiple LLM calls concurrently"""
    
    tasks = []
    prompts = [
        "Explain quantum computing in simple terms",
        "What are the benefits of renewable energy?",
        "How does machine learning work?"
    ]
    
    for prompt_text in prompts:
        task = llm.ainvoke(prompt_text)
        tasks.append(task)
    
    # Wait for all tasks to complete
    start_time = asyncio.get_event_loop().time()
    results = await asyncio.gather(*tasks)
    end_time = asyncio.get_event_loop().time()
    
    print(f"Completed {len(results)} calls in {end_time - start_time:.2f} seconds")
    
    return results

# Async agent with streaming
class AsyncStreamingAgent:
    """Agent that streams responses asynchronously"""
    
    def __init__(self, llm, tools, websocket=None):
        self.llm = llm
        self.tools = tools
        self.websocket = websocket
    
    async def arun(self, query: str):
        """Run agent with streaming"""
        
        # Stream thinking process
        if self.websocket:
            await self.websocket.send_json({
                "type": "thinking",
                "content": "Processing your request..."
            })
        
        # Determine tool to use
        tool_chain = (
            PromptTemplate.from_template(
                "Which tool should I use for: {query}\nTools: {tools}\nTool:"
            )
            | self.llm
        )
        
        tool_name = await tool_chain.ainvoke({
            "query": query,
            "tools": ", ".join([t.name for t in self.tools])
        })
        
        # Stream tool selection
        if self.websocket:
            await self.websocket.send_json({
                "type": "tool_selection",
                "tool": tool_name.content
            })
        
        # Execute tool and stream results
        # ... tool execution logic ...

# WebSocket handler for real-time streaming
async def websocket_handler(websocket, path):
    """Handle WebSocket connections for streaming"""
    
    handler = CustomStreamHandler(websocket)
    streaming_llm = ChatOpenAI(
        streaming=True,
        callbacks=[handler]
    )
    
    async for message in websocket:
        data = json.loads(message)
        
        if data["type"] == "query":
            # Stream response back through WebSocket
            await streaming_llm.ainvoke(data["content"])
        
        elif data["type"] == "stop":
            # Handle stop generation
            handler.should_stop = True

# Rate-limited async operations
from asyncio import Semaphore

class RateLimitedLLM:
    """LLM with rate limiting for async operations"""
    
    def __init__(self, llm, max_concurrent: int = 5):
        self.llm = llm
        self.semaphore = Semaphore(max_concurrent)
    
    async def ainvoke(self, prompt: str) -> str:
        async with self.semaphore:
            return await self.llm.ainvoke(prompt)
    
    async def abatch(self, prompts: List[str]) -> List[str]:
        tasks = [self.ainvoke(prompt) for prompt in prompts]
        return await asyncio.gather(*tasks)
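A minimal usage sketch for the rate limiter above; the model name, concurrency limit, and prompts are illustrative assumptions:

# Wrap any chat model and fan out calls without exceeding the concurrency cap
async def rate_limited_example():
    limited = RateLimitedLLM(ChatOpenAI(model="gpt-3.5-turbo"), max_concurrent=3)
    prompts = [f"Summarize topic #{i} in one sentence" for i in range(10)]
    responses = await limited.abatch(prompts)
    print(f"Completed {len(responses)} rate-limited calls")

# asyncio.run(rate_limited_example())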
Production Deployment Best Practices
Deploying LangChain applications at scale

Error Handling and Fallbacks

from langchain.schema import OutputParserException
from tenacity import retry, stop_after_attempt, wait_exponential
from typing import Any, Dict
from datetime import datetime
import hashlib
import logging
import time

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RobustLangChainApp:
    """Production-ready LangChain application with error handling"""
    
    def __init__(self):
        self.primary_llm = ChatOpenAI(model="gpt-4", temperature=0.7)
        self.fallback_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
        self.cache = {}
        
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def _call_llm_with_retry(self, prompt: str, llm) -> str:
        """Call LLM with exponential backoff retry"""
        try:
            response = await llm.ainvoke(prompt)
            return response.content
        except Exception as e:
            logger.error(f"LLM call failed: {e}")
            raise
    
    async def process_request(self, prompt: str) -> Dict[str, Any]:
        """Process request with fallback and caching"""
        
        # Check cache first
        cache_key = hashlib.md5(prompt.encode()).hexdigest()
        if cache_key in self.cache:
            logger.info("Returning cached response")
            return self.cache[cache_key]
        
        start_time = time.time()
        
        try:
            # Try primary LLM
            response = await self._call_llm_with_retry(prompt, self.primary_llm)
            model_used = "gpt-4"
            
        except Exception as e:
            logger.warning(f"Primary LLM failed, falling back: {e}")
            
            try:
                # Fallback to secondary LLM
                response = await self._call_llm_with_retry(prompt, self.fallback_llm)
                model_used = "gpt-3.5-turbo"
                
            except Exception as e:
                logger.error(f"All LLMs failed: {e}")
                return {
                    "error": "Service temporarily unavailable",
                    "status": "failed",
                    "duration": time.time() - start_time
                }
        
        # Prepare result
        result = {
            "response": response,
            "model_used": model_used,
            "duration": time.time() - start_time,
            "cached": False,
            "timestamp": datetime.utcnow().isoformat()
        }
        
        # Cache successful responses
        self.cache[cache_key] = result
        
        return result
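# Illustrative usage of RobustLangChainApp (the prompt string is an assumption);
# repeating the same prompt returns the cached result without another LLM call
async def robust_app_example():
    robust_app = RobustLangChainApp()
    result = await robust_app.process_request("Summarize the benefits of caching.")
    print(result.get("model_used"), f"{result['duration']:.2f}s")

# asyncio.run(robust_app_example())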

# Environment-specific configuration
class LangChainConfig:
    """Configuration management for different environments"""
    
    def __init__(self, environment: str = "development"):
        self.environment = environment
        self.config = self._load_config()
    
    def _load_config(self) -> Dict[str, Any]:
        """Load environment-specific configuration"""
        
        base_config = {
            "llm_timeout": 30,
            "max_retries": 3,
            "cache_ttl": 3600,
            "log_level": "INFO"
        }
        
        env_configs = {
            "development": {
                "llm_model": "gpt-3.5-turbo",
                "temperature": 0.9,
                "verbose": True,
                "cache_enabled": False
            },
            "staging": {
                "llm_model": "gpt-3.5-turbo",
                "temperature": 0.7,
                "verbose": True,
                "cache_enabled": True
            },
            "production": {
                "llm_model": "gpt-4",
                "temperature": 0.3,
                "verbose": False,
                "cache_enabled": True,
                "fallback_model": "gpt-3.5-turbo"
            }
        }
        
        config = {**base_config, **env_configs.get(self.environment, {})}
        
        # Override with environment variables, casting to the default's type
        # so numeric and boolean settings stay usable
        for key, default in list(config.items()):
            env_key = f"LANGCHAIN_{key.upper()}"
            if env_key not in os.environ:
                continue
            raw = os.environ[env_key]
            if isinstance(default, bool):
                config[key] = raw.lower() in ("1", "true", "yes")
            elif isinstance(default, (int, float)):
                config[key] = type(default)(raw)
            else:
                config[key] = raw
        
        return config
    
    def get_llm(self) -> ChatOpenAI:
        """Get configured LLM instance"""
        return ChatOpenAI(
            model=self.config["llm_model"],
            temperature=self.config["temperature"],
            timeout=self.config["llm_timeout"],
            max_retries=self.config["max_retries"]
        )
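
A short usage sketch for LangChainConfig; the APP_ENV variable name used to select the environment is an assumption:

config = LangChainConfig(os.environ.get("APP_ENV", "development"))
llm = config.get_llm()
print(config.environment, config.config["llm_model"], config.config["temperature"])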

Monitoring and Observability

from langchain.callbacks import LangChainTracer
from langsmith import Client
import prometheus_client
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc import trace_exporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Prometheus metrics
request_counter = prometheus_client.Counter(
    'langchain_requests_total',
    'Total LangChain requests',
    ['chain_type', 'model', 'status']
)

request_duration = prometheus_client.Histogram(
    'langchain_request_duration_seconds',
    'LangChain request duration',
    ['chain_type', 'model']
)

token_counter = prometheus_client.Counter(
    'langchain_tokens_total',
    'Total tokens used',
    ['model', 'token_type']
)

# OpenTelemetry setup
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

otlp_exporter = trace_exporter.OTLPSpanExporter(
    endpoint="localhost:4317",
    insecure=True
)

span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Custom monitoring callback
class MonitoringCallback(AsyncCallbackHandler):
    """Callback for monitoring and metrics collection"""
    
    async def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs):
        """Track chain execution start"""
        self.start_time = time.time()
        self.chain_type = serialized.get("name", "unknown")
        
        # Start OpenTelemetry span
        self.span = tracer.start_span(f"langchain.{self.chain_type}")
        self.span.set_attribute("chain.type", self.chain_type)
        self.span.set_attribute("input.size", len(str(inputs)))
    
    async def on_llm_end(self, response, **kwargs):
        """Track LLM completion"""
        duration = time.time() - self.start_time
        
        llm_output = response.llm_output or {}
        model_name = llm_output.get("model_name", "unknown")
        usage = llm_output.get("token_usage", {})
        
        # Update Prometheus metrics
        request_duration.labels(
            chain_type=self.chain_type,
            model=model_name
        ).observe(duration)
        
        # Track token usage
        if usage:
            token_counter.labels(
                model=model_name,
                token_type="prompt"
            ).inc(usage.get("prompt_tokens", 0))
            
            token_counter.labels(
                model=model_name,
                token_type="completion"
            ).inc(usage.get("completion_tokens", 0))
        
        # End span
        if hasattr(self, 'span'):
            self.span.set_attribute("tokens.prompt", usage.get("prompt_tokens", 0))
            self.span.set_attribute("tokens.completion", usage.get("completion_tokens", 0))
            self.span.set_attribute("duration.seconds", duration)
            self.span.end()
    
    async def on_chain_error(self, error: Exception, **kwargs):
        """Track errors"""
        request_counter.labels(
            chain_type=self.chain_type,
            model="unknown",
            status="error"
        ).inc()
        
        if hasattr(self, 'span'):
            self.span.record_exception(error)
            self.span.set_status(trace.Status(trace.StatusCode.ERROR, str(error)))
            self.span.end()
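
# Wiring sketch: expose the Prometheus metrics over HTTP (the port is an
# assumption) and pass MonitoringCallback per invocation so both chain-level
# and LLM-level events reach it
prometheus_client.start_http_server(9090)

monitored_chain = PromptTemplate.from_template("Summarize: {text}") | ChatOpenAI()
# await monitored_chain.ainvoke(
#     {"text": "LangChain observability"},
#     config={"callbacks": [MonitoringCallback()]},
# )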

# LangSmith integration
langsmith_client = Client()

class LangSmithMonitor:
    """Monitor LangChain apps with LangSmith"""
    
    def __init__(self, project_name: str):
        self.project_name = project_name
        self.tracer = LangChainTracer(
            project_name=project_name,
            client=langsmith_client
        )
    
    def monitor_chain(self, chain):
        """Add monitoring to a chain"""
        # Add tracer callback
        if hasattr(chain, 'callbacks'):
            chain.callbacks.append(self.tracer)
        
        return chain
    
    async def evaluate_chain(self, chain, test_cases: List[Dict]):
        """Evaluate chain performance"""
        results = []
        
        for test_case in test_cases:
            try:
                result = await chain.ainvoke(test_case["input"])
                
                # Compare with expected output
                score = self._calculate_similarity(
                    result,
                    test_case["expected_output"]
                )
                
                results.append({
                    "input": test_case["input"],
                    "output": result,
                    "expected": test_case["expected_output"],
                    "score": score,
                    "passed": score > 0.8
                })
                
            except Exception as e:
                results.append({
                    "input": test_case["input"],
                    "error": str(e),
                    "passed": False
                })
        
        # Log to LangSmith (illustrative call; adapt this to the result-logging
        # API of the langsmith SDK version you are using)
        langsmith_client.create_test_results(
            project_name=self.project_name,
            results=results
        )
        
        return results
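
A hedged usage sketch for LangSmithMonitor; the project name, chain, and test case are illustrative, and it assumes _calculate_similarity has been implemented:

async def evaluate_example():
    monitor = LangSmithMonitor(project_name="langchain-guide-demo")
    qa_chain = PromptTemplate.from_template("Answer briefly: {question}") | ChatOpenAI()
    test_cases = [{
        "input": {"question": "What is LangChain?"},
        "expected_output": "An open-source framework for building LLM applications."
    }]
    results = await monitor.evaluate_chain(qa_chain, test_cases)
    print(sum(r.get("passed", False) for r in results), "of", len(results), "passed")

# asyncio.run(evaluate_example())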

Deployment Architecture

# FastAPI application with LangChain
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uvicorn
from contextlib import asynccontextmanager
from datetime import datetime
import json
import os
import redis
import time
import uuid
from typing import Optional
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory

# Request/Response models
class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None
    stream: bool = False

class ChatResponse(BaseModel):
    response: str
    session_id: str
    model_used: str
    duration: float

# Application lifespan management
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    app.state.llm = ChatOpenAI(
        model="gpt-3.5-turbo",
        streaming=True,
        callbacks=[MonitoringCallback()]
    )
    app.state.redis = redis.Redis(
        host=os.environ.get("REDIS_HOST", "localhost"),
        decode_responses=True
    )
    app.state.memory_store = {}
    
    yield
    
    # Shutdown
    app.state.redis.close()

# Create FastAPI app
app = FastAPI(lifespan=lifespan)

# Health check endpoint
@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "version": "1.0.0",
        "model": "gpt-3.5-turbo"
    }

# Chat endpoint
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest, background_tasks: BackgroundTasks):
    """Process chat request"""
    
    try:
        # Get or create session
        session_id = request.session_id or str(uuid.uuid4())
        
        # Get conversation memory
        if session_id in app.state.memory_store:
            memory = app.state.memory_store[session_id]
        else:
            memory = ConversationBufferMemory()
            app.state.memory_store[session_id] = memory
        
        # Create chain with memory
        chain = ConversationChain(
            llm=app.state.llm,
            memory=memory
        )
        
        # Process request
        start_time = time.time()
        response = await chain.ainvoke({"input": request.message})
        duration = time.time() - start_time
        
        # Background task to save conversation
        background_tasks.add_task(
            save_conversation,
            session_id,
            request.message,
            response["response"]
        )
        
        return ChatResponse(
            response=response["response"],
            session_id=session_id,
            model_used="gpt-3.5-turbo",
            duration=duration
        )
        
    except Exception as e:
        logger.error(f"Chat error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

# Streaming endpoint
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """Stream chat responses"""
    
    async def generate():
        # Legacy chains do not stream token-by-token, so stream directly
        # from the chat model for true incremental output
        async for chunk in app.state.llm.astream(request.message):
            yield f"data: {json.dumps({'chunk': chunk.content})}\n\n"
        
        yield f"data: {json.dumps({'done': True})}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

# Background task
async def save_conversation(session_id: str, message: str, response: str):
    """Save conversation to Redis"""
    key = f"conversation:{session_id}"
    conversation = {
        "timestamp": datetime.utcnow().isoformat(),
        "message": message,
        "response": response
    }
    
    app.state.redis.lpush(key, json.dumps(conversation))
    app.state.redis.expire(key, 3600)  # 1 hour TTL
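
# Local entry point (assumes this module is saved as main.py, matching the
# Dockerfile CMD below); in production the container launches uvicorn itself
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)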

# Docker deployment
dockerfile_content = '''
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Environment variables
ENV PYTHONUNBUFFERED=1
ENV LANGCHAIN_TRACING_V2=true
ENV LANGCHAIN_PROJECT=production

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=2).raise_for_status()"

# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
'''
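
# Build-and-run sketch for the Dockerfile above; the image tag, key value,
# and host.docker.internal Redis host are assumptions:
#   docker build -t langchain-app:latest .
#   docker run -p 8000:8000 \
#     -e OPENAI_API_KEY=your-parrotrouter-api-key \
#     -e REDIS_HOST=host.docker.internal \
#     langchain-app:latest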

# Kubernetes deployment
k8s_deployment = '''
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langchain-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langchain-app
  template:
    metadata:
      labels:
        app: langchain-app
    spec:
      containers:
      - name: langchain
        image: langchain-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: langchain-secrets
              key: openai-api-key
        - name: REDIS_HOST
          value: redis-service
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: langchain-service
spec:
  selector:
    app: langchain-app
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer
'''
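
# Rollout sketch for the manifest above; the file name and secret value are
# assumptions, and the Secret must exist before the Deployment is applied:
#   kubectl create secret generic langchain-secrets \
#     --from-literal=openai-api-key=your-parrotrouter-api-key
#   kubectl apply -f k8s-deployment.yaml
#   kubectl rollout status deployment/langchain-app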
Cost Optimization Strategies
Reducing costs while maintaining quality

Token Usage Optimization

from tiktoken import encoding_for_model

class TokenOptimizer:
    """Optimize token usage in LangChain applications"""
    
    def __init__(self, model: str = "gpt-3.5-turbo"):
        self.model = model
        self.encoding = encoding_for_model(model)
        self.token_costs = {
            "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
            "gpt-4": {"input": 0.03, "output": 0.06},
            "claude-3-sonnet": {"input": 0.003, "output": 0.015}
        }
    
    def count_tokens(self, text: str) -> int:
        """Count tokens in text"""
        return len(self.encoding.encode(text))
    
    def estimate_cost(self, input_text: str, output_text: str) -> float:
        """Estimate cost for a request"""
        input_tokens = self.count_tokens(input_text)
        output_tokens = self.count_tokens(output_text)
        
        costs = self.token_costs.get(self.model, self.token_costs["gpt-3.5-turbo"])
        
        input_cost = (input_tokens / 1000) * costs["input"]
        output_cost = (output_tokens / 1000) * costs["output"]
        
        return input_cost + output_cost
    
    def optimize_prompt(self, prompt: str, max_tokens: int = 2000) -> str:
        """Optimize prompt to fit within token limit"""
        tokens = self.count_tokens(prompt)
        
        if tokens <= max_tokens:
            return prompt
        
        # Truncate intelligently
        sentences = prompt.split('. ')
        optimized = []
        current_tokens = 0
        
        for sentence in sentences:
            sentence_tokens = self.count_tokens(sentence + '. ')
            if current_tokens + sentence_tokens <= max_tokens:
                optimized.append(sentence)
                current_tokens += sentence_tokens
            else:
                break
        
        return '. '.join(optimized) + '.'
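
# Quick sanity check for TokenOptimizer (the example strings are illustrative)
optimizer = TokenOptimizer("gpt-3.5-turbo")
prompt_text = "Explain the difference between supervised and unsupervised learning."
print(optimizer.count_tokens(prompt_text), "prompt tokens")
print(f"~${optimizer.estimate_cost(prompt_text, 'A concise two-paragraph answer.'):.6f} estimated cost")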

# Caching layer for cost reduction
class CostEffectiveChain:
    """Chain with caching and model selection for cost optimization"""
    
    def __init__(self, redis_client):
        self.redis_client = redis_client
        self.token_optimizer = TokenOptimizer()
        
        # Model hierarchy from cheapest to most expensive
        self.model_hierarchy = [
            ("gpt-3.5-turbo", 0.7),
            ("gpt-4", 0.5),
            ("claude-3-opus", 0.3)
        ]
    
    def get_cached_response(self, prompt: str) -> Optional[str]:
        """Check cache for response"""
        cache_key = f"llm_cache:{hashlib.md5(prompt.encode()).hexdigest()}"
        cached = self.redis_client.get(cache_key)
        
        if cached:
            # Increment cache hit counter
            self.redis_client.incr("cache_hits")
            return json.loads(cached)["response"]
        
        return None
    
    def cache_response(self, prompt: str, response: str, ttl: int = 3600):
        """Cache response with TTL"""
        cache_key = f"llm_cache:{hashlib.md5(prompt.encode()).hexdigest()}"
        
        self.redis_client.setex(
            cache_key,
            ttl,
            json.dumps({
                "response": response,
                "timestamp": datetime.utcnow().isoformat(),
                "prompt_tokens": self.token_optimizer.count_tokens(prompt),
                "response_tokens": self.token_optimizer.count_tokens(response)
            })
        )
    
    async def run_with_cost_optimization(self, prompt: str) -> Dict[str, Any]:
        """Run chain with cost optimization strategies"""
        
        # 1. Check cache first
        cached = self.get_cached_response(prompt)
        if cached:
            return {
                "response": cached,
                "cached": True,
                "cost": 0,
                "model": "cache"
            }
        
        # 2. Optimize prompt
        optimized_prompt = self.token_optimizer.optimize_prompt(prompt)
        
        # 3. Try models in order of cost
        for model, temperature in self.model_hierarchy:
            try:
                llm = ChatOpenAI(model=model, temperature=temperature)
                
                # Estimate if this will fit in context
                if self.token_optimizer.count_tokens(optimized_prompt) > 3500:
                    continue
                
                response = await llm.ainvoke(optimized_prompt)
                response_text = response.content
                
                # Calculate actual cost
                cost = self.token_optimizer.estimate_cost(
                    optimized_prompt,
                    response_text
                )
                
                # Cache successful response
                self.cache_response(prompt, response_text)
                
                return {
                    "response": response_text,
                    "cached": False,
                    "cost": cost,
                    "model": model,
                    "tokens": {
                        "input": self.token_optimizer.count_tokens(optimized_prompt),
                        "output": self.token_optimizer.count_tokens(response_text)
                    }
                }
                
            except Exception as e:
                logger.warning(f"Model {model} failed: {e}")
                continue
        
        raise Exception("All models failed")
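
# Usage sketch for CostEffectiveChain; the Redis connection details are
# assumptions, reusing the redis client import from the deployment section
async def cost_optimized_example():
    chain = CostEffectiveChain(redis.Redis(host="localhost", decode_responses=True))
    result = await chain.run_with_cost_optimization("Explain vector databases briefly.")
    print(result["model"], f"${result['cost']:.5f}", "cached:", result["cached"])

# asyncio.run(cost_optimized_example())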

# Batch processing for cost efficiency
class BatchProcessor:
    """Process requests in batches for better pricing"""
    
    def __init__(self, llm, batch_size: int = 10, wait_time: float = 1.0):
        self.llm = llm
        self.batch_size = batch_size
        self.wait_time = wait_time
        self.pending_requests = []
        self.results = {}
    
    async def add_request(self, request_id: str, prompt: str):
        """Add request to batch"""
        self.pending_requests.append({
            "id": request_id,
            "prompt": prompt,
            "timestamp": time.time()
        })
        
        # Process if batch is full
        if len(self.pending_requests) >= self.batch_size:
            await self._process_batch()
        else:
            # Schedule batch processing
            asyncio.create_task(self._delayed_process())
    
    async def _delayed_process(self):
        """Process batch after wait time"""
        await asyncio.sleep(self.wait_time)
        if self.pending_requests:
            await self._process_batch()
    
    async def _process_batch(self):
        """Process all pending requests"""
        if not self.pending_requests:
            return
        
        batch = self.pending_requests.copy()
        self.pending_requests.clear()
        
        # Create batch prompt
        combined_prompt = self._create_batch_prompt(batch)
        
        try:
            # Single LLM call for all requests
            response = await self.llm.ainvoke(combined_prompt)
            
            # Parse and distribute results
            results = self._parse_batch_response(response.content)
            
            for i, request in enumerate(batch):
                if i < len(results):
                    self.results[request["id"]] = {
                        "response": results[i],
                        "batched": True,
                        "batch_size": len(batch)
                    }
                
        except Exception as e:
            logger.error(f"Batch processing failed: {e}")
            # Fall back to individual processing
            for request in batch:
                try:
                    response = await self.llm.ainvoke(request["prompt"])
                    self.results[request["id"]] = {
                        "response": response.content,
                        "batched": False
                    }
                except Exception as e:
                    self.results[request["id"]] = {
                        "error": str(e)
                    }
    
    def _create_batch_prompt(self, batch: List[Dict]) -> str:
        """Create combined prompt for batch"""
        prompt = "Process the following requests and provide numbered responses:\n\n"
        
        for i, request in enumerate(batch, 1):
            prompt += f"{i}. {request['prompt']}\n\n"
        
        prompt += "Provide clear, numbered responses for each request."
        return prompt
    
    def _parse_batch_response(self, response: str) -> List[str]:
        """Parse batch response into individual responses"""
        # Simple parsing - in production, use more robust parsing
        import re
        
        pattern = r'\d+\.\s*(.*?)(?=\d+\.|$)'
        matches = re.findall(pattern, response, re.DOTALL)
        
        return [match.strip() for match in matches]
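
A hedged usage sketch for BatchProcessor; request IDs and prompts are illustrative, and results are read back from the shared results dict once the delayed batch task has run:

async def batch_example():
    processor = BatchProcessor(ChatOpenAI(model="gpt-3.5-turbo"), batch_size=3)
    for i, prompt in enumerate(["Define latency.", "Define throughput.", "Define jitter."]):
        await processor.add_request(f"req-{i}", prompt)
    await asyncio.sleep(2)  # give the batch (or fallback) processing time to finish
    for request_id, result in processor.results.items():
        print(request_id, str(result.get("response", result.get("error", "")))[:80])

# asyncio.run(batch_example())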
Cost Optimization Tips
  • Use caching aggressively
  • Optimize prompts for token usage
  • Route to cheaper models when possible
  • Batch similar requests
  • Use streaming to stop generation early (see the sketch after this list)
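
A minimal sketch of the "stop early" tip: stream tokens and break once enough text has arrived; closing the stream early means the rest of the completion is typically never generated or billed (the character budget here is arbitrary):

async def stream_and_stop_early(prompt: str, max_chars: int = 400) -> str:
    llm = ChatOpenAI(model="gpt-3.5-turbo", streaming=True)
    collected = ""
    async for chunk in llm.astream(prompt):
        collected += chunk.content
        if len(collected) >= max_chars:
            break  # stop consuming; the stream is closed on exit
    return collected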
Monitoring Metrics
  • Token usage per request
  • Cache hit rate (see the sketch after this list)
  • Cost per user session
  • Model distribution
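
A small sketch of deriving the cache hit rate from Redis counters; cache_hits matches the counter incremented in CostEffectiveChain above, while cache_misses is a hypothetical counter you would increment on cache misses:

def cache_hit_rate(redis_client) -> float:
    hits = int(redis_client.get("cache_hits") or 0)
    misses = int(redis_client.get("cache_misses") or 0)  # hypothetical counter
    total = hits + misses
    return hits / total if total else 0.0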
References
Additional resources and documentation