Advanced
January 18, 2024 · 20 min read

Complete Guide to LLM Function Calling

Master function calling with OpenAI, Claude, and other APIs. Includes schema design, error handling, and real-world examples for building tool-using AI applications.

How Function Calling Works

Modern LLMs can generate structured function calls when provided with function schemas. The process involves three key steps[1]:

Function Calling Flow

1. Define Functions: Register available functions with names, descriptions, and parameter schemas.

2. LLM Decides: The model analyzes user input and generates appropriate function calls.

3. Execute & Return: Execute the function and optionally return results to the model.
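
The three steps can be sketched without any provider SDK. In this minimal sketch the model's output is simulated by a hand-written dictionary, and `add_numbers`, `TOOL_REGISTRY`, `TOOL_SCHEMAS`, and `dispatch` are hypothetical names chosen for illustration, not part of any library.

```python
import json
from typing import Any, Callable, Dict

# Step 1: define a function and register it with a schema (hypothetical example tool).
def add_numbers(a: float, b: float) -> float:
    return a + b

TOOL_REGISTRY: Dict[str, Callable[..., Any]] = {"add_numbers": add_numbers}

TOOL_SCHEMAS = [{
    "name": "add_numbers",
    "description": "Add two numbers and return the sum.",
    "parameters": {
        "type": "object",
        "properties": {"a": {"type": "number"}, "b": {"type": "number"}},
        "required": ["a", "b"],
    },
}]

# Step 2: a real model would return a structured call; here it is simulated.
simulated_call = {"name": "add_numbers", "arguments": json.dumps({"a": 2, "b": 3})}

# Step 3: execute the call and serialize the result so it can be sent back.
def dispatch(call: Dict[str, str]) -> str:
    func = TOOL_REGISTRY[call["name"]]
    args = json.loads(call["arguments"])
    return json.dumps({"result": func(**args)})

tool_output = dispatch(simulated_call)
```

Note that the arguments arrive as a JSON string rather than a parsed object, which is why the dispatch step decodes them before calling the function.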

Provider Comparison

Each provider implements function calling slightly differently, though the core concepts remain similar[2]:

OpenAI Function Calling
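from openai import OpenAI
import json

client = OpenAI()

# Mock implementation; in production, call a real weather API
def get_weather(location: str, unit: str = "fahrenheit") -> dict:
    return {"location": location, "temperature": 72, "unit": unit}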

# Define function schema
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather in a given location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "The city and state, e.g. San Francisco, CA",
                    },
                    "unit": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"],
                        "description": "The unit of temperature",
                    },
                },
                "required": ["location"],
            },
        },
    }
]

# Make API call with functions
response = client.chat.completions.create(
    model="gpt-4-turbo-preview",
    messages=[
        {"role": "user", "content": "What's the weather like in Boston?"}
    ],
    tools=tools,
    tool_choice="auto",  # Let model decide when to use tools
)

# Check if model wants to call a function
message = response.choices[0].message

if message.tool_calls:
    messages = [
        {"role": "user", "content": "What's the weather like in Boston?"},
        message,  # Include assistant's tool call
    ]

    for tool_call in message.tool_calls:
        function_name = tool_call.function.name
        function_args = json.loads(tool_call.function.arguments)

        print(f"Calling {function_name} with args: {function_args}")

        # Execute function (mock implementation)
        if function_name == "get_weather":
            result = get_weather(**function_args)
        else:
            result = {"error": f"Unknown function: {function_name}"}

        # Every tool call needs a matching tool message
        messages.append({
            "role": "tool",
            "tool_call_id": tool_call.id,
            "content": json.dumps(result),
        })

    # Send all results back to the model in one follow-up request
    final_response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=messages,
    )

    print(final_response.choices[0].message.content)
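
The schema layout is one place where providers differ. As a rough sketch, and assuming Anthropic's Messages API still takes a flat tool object with `name`, `description`, and `input_schema` (worth checking against the current provider documentation), an OpenAI-style tool list could be converted like this. The helper name `openai_tools_to_anthropic` is hypothetical.

```python
from typing import Any, Dict, List

def openai_tools_to_anthropic(tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Convert OpenAI-style tool definitions to an Anthropic-style layout."""
    converted = []
    for tool in tools:
        if tool.get("type") != "function":
            continue  # Skip anything that is not a function tool
        fn = tool["function"]
        converted.append({
            "name": fn["name"],
            "description": fn.get("description", ""),
            # Same JSON Schema object, stored under a different key
            "input_schema": fn.get("parameters", {"type": "object", "properties": {}}),
        })
    return converted

example_tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather in a given location",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"],
        },
    },
}]

anthropic_tools = openai_tools_to_anthropic(example_tools)
```

Because both layouts carry a plain JSON Schema object for the parameters, the schema design advice in this guide applies regardless of provider.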

Schema Design Best Practices

Well-designed function schemas are crucial for reliable function calling[2][5]. Follow these principles for optimal results:

Function Schema Guidelines

1. Clear Naming

❌ func1, do_thing, process

✓ search_products, calculate_tax, send_email

2. Detailed Descriptions

{
  "name": "search_products",
  "description": "Search for products in the inventory. Returns up to 10 products matching the query. Searches across product name, description, and tags. Results are sorted by relevance.",
  "parameters": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "Search query. Can include product names, features, or categories"
      },
      "category": {
        "type": "string",
        "enum": ["electronics", "clothing", "home", "sports", "books"],
        "description": "Optional category filter to narrow results"
      },
      "max_price": {
        "type": "number",
        "description": "Maximum price in USD. Must be positive number"
      },
      "in_stock_only": {
        "type": "boolean",
        "description": "If true, only return products currently in stock",
        "default": true
      }
    },
    "required": ["query"]
  }
}
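
Naming and description guidelines are easy to check mechanically before a schema ever reaches a model. The following is a minimal sketch of such a check; `lint_function_schema` and the 20-character threshold are assumptions made for illustration, not an established rule.

```python
from typing import Any, Dict, List

def lint_function_schema(schema: Dict[str, Any], min_description_len: int = 20) -> List[str]:
    """Return a list of human-readable problems found in a function schema."""
    problems: List[str] = []

    if len(schema.get("description", "")) < min_description_len:
        problems.append("function description is missing or too short")

    params = schema.get("parameters", {})
    properties = params.get("properties", {})

    # Every parameter should explain itself to the model
    for name, spec in properties.items():
        if not spec.get("description"):
            problems.append(f"parameter '{name}' has no description")

    # Every required name must actually be defined
    for name in params.get("required", []):
        if name not in properties:
            problems.append(f"required parameter '{name}' is not defined")

    return problems

weak_schema = {
    "name": "process",
    "description": "Does stuff",
    "parameters": {
        "type": "object",
        "properties": {"q": {"type": "string"}},
        "required": ["q", "limit"],
    },
}

problems = lint_function_schema(weak_schema)
```

Running a check like this in CI catches schemas that drift as functions gain parameters.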

3. Type Constraints

Strings
"email": {
  "type": "string",
  "format": "email",
  "pattern": "^[\\w\\.-]+@[\\w\\.-]+\\.\\w+$"
}
Numbers
"quantity": {
  "type": "integer",
  "minimum": 1,
  "maximum": 100
}
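
Constraints in a schema guide the model but do not guarantee compliant output, so it is worth re-checking arguments on the server. Below is a small hand-rolled sketch covering only `type`, `minimum`, `maximum`, `enum`, and `pattern`; `check_constraints` is a hypothetical helper, and a full JSON Schema validator would be the sturdier choice in production.

```python
import re
from typing import Any, Dict, List

def check_constraints(value: Any, spec: Dict[str, Any]) -> List[str]:
    """Check one argument value against a small subset of JSON Schema keywords."""
    expected = spec.get("type")
    # bool is a subclass of int in Python, so exclude it explicitly
    if expected == "integer" and (not isinstance(value, int) or isinstance(value, bool)):
        return ["expected an integer"]
    if expected == "string" and not isinstance(value, str):
        return ["expected a string"]

    errors: List[str] = []
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if is_number and "minimum" in spec and value < spec["minimum"]:
        errors.append(f"must be >= {spec['minimum']}")
    if is_number and "maximum" in spec and value > spec["maximum"]:
        errors.append(f"must be <= {spec['maximum']}")
    if "enum" in spec and value not in spec["enum"]:
        errors.append(f"must be one of {spec['enum']}")
    # JSON Schema patterns are unanchored, which matches re.search semantics
    if isinstance(value, str) and "pattern" in spec and not re.search(spec["pattern"], value):
        errors.append("does not match the required pattern")
    return errors

quantity_spec = {"type": "integer", "minimum": 1, "maximum": 100}
email_spec = {"type": "string", "pattern": r"^[\w.-]+@[\w.-]+\.\w+$"}

too_small = check_constraints(0, quantity_spec)
bad_email = check_constraints("not-an-email", email_spec)
```

Returning messages rather than raising lets the caller pass the problems back to the model, which can then retry with corrected arguments.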

Error Handling and Validation

Robust error handling is essential for production function calling systems[1]. Implement multiple layers of validation:

Comprehensive Error Handling
from typing import Dict, Any, Optional, List, Union
from pydantic import BaseModel, ValidationError, Field
import json
import logging
import time

logger = logging.getLogger(__name__)

# Define schemas with Pydantic for type safety
class WeatherParams(BaseModel):
    location: str = Field(..., description="City and state")
    unit: str = Field("fahrenheit", pattern="^(celsius|fahrenheit)$")

class DatabaseQueryParams(BaseModel):
    table: str = Field(..., pattern="^[a-zA-Z_][a-zA-Z0-9_]*$")
    filters: Dict[str, Any] = Field(default_factory=dict)
    limit: int = Field(10, ge=1, le=100)

class FunctionCallHandler:
    """Handles function calls with comprehensive error handling"""
    
    def __init__(self):
        self.functions = {
            "get_weather": (self.get_weather, WeatherParams),
            "query_database": (self.query_database, DatabaseQueryParams),
        }
        self.call_history = []
    
    def validate_and_execute(
        self,
        function_name: str,
        arguments: Union[str, dict],
        tool_call_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Validate arguments and execute function with error handling"""
        
        # Log the call
        call_record = {
            "function": function_name,
            "arguments": arguments,
            "tool_call_id": tool_call_id,
            "timestamp": time.time()
        }
        self.call_history.append(call_record)
        
        try:
            # Check if function exists
            if function_name not in self.functions:
                return self._error_response(
                    f"Unknown function: {function_name}",
                    "FUNCTION_NOT_FOUND"
                )
            
            func, schema = self.functions[function_name]
            
            # Parse arguments if string
            if isinstance(arguments, str):
                try:
                    arguments = json.loads(arguments)
                except json.JSONDecodeError as e:
                    return self._error_response(
                        f"Invalid JSON arguments: {e}",
                        "INVALID_JSON"
                    )
            
            # Validate with Pydantic
            try:
                validated_args = schema(**arguments)
            except ValidationError as e:
                errors = []
                for error in e.errors():
                    field = " -> ".join(str(x) for x in error["loc"])
                    errors.append(f"{field}: {error['msg']}")
                
                return self._error_response(
                    f"Validation failed: {'; '.join(errors)}",
                    "VALIDATION_ERROR"
                )
            
            # Execute function with timeout
            import signal
            
            def timeout_handler(signum, frame):
                raise TimeoutError("Function execution timed out")
            
            # Set timeout (Unix only and main thread only; use threading.Timer for cross-platform)
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(30)  # 30 second timeout
            
            try:
                result = func(validated_args)
                
                return {
                    "success": True,
                    "result": result,
                    "function": function_name
                }
                
            except TimeoutError:
                return self._error_response(
                    "Function execution timed out after 30 seconds",
                    "TIMEOUT"
                )
            except Exception as e:
                logger.exception(f"Function {function_name} failed")
                return self._error_response(
                    f"Execution failed: {str(e)}",
                    "EXECUTION_ERROR"
                )
            finally:
                # Always cancel the pending alarm, including after a failure
                signal.alarm(0)
                
        except Exception as e:
            logger.exception("Unexpected error in function call handler")
            return self._error_response(
                f"Unexpected error: {str(e)}",
                "UNKNOWN_ERROR"
            )
    
    def _error_response(self, message: str, code: str) -> Dict[str, Any]:
        """Create standardized error response"""
        return {
            "success": False,
            "error": {
                "message": message,
                "code": code
            }
        }
    
    def get_weather(self, params: WeatherParams) -> Dict[str, Any]:
        """Mock weather function"""
        # In production, call actual weather API
        return {
            "location": params.location,
            "temperature": 72,
            "unit": params.unit,
            "conditions": "Partly cloudy"
        }
    
    def query_database(self, params: DatabaseQueryParams) -> List[Dict]:
        """Mock database query"""
        # Validate table name against whitelist
        allowed_tables = ["users", "products", "orders"]
        if params.table not in allowed_tables:
            raise ValueError(f"Access denied for table: {params.table}")
        
        # In production, use parameterized queries
        return [
            {"id": 1, "name": "Sample"},
            {"id": 2, "name": "Data"}
        ]

# Usage with OpenAI
handler = FunctionCallHandler()

def process_tool_calls(message) -> List[Dict[str, Any]]:
    """Process all tool calls from a message"""
    results = []
    
    if hasattr(message, 'tool_calls') and message.tool_calls:
        for tool_call in message.tool_calls:
            result = handler.validate_and_execute(
                function_name=tool_call.function.name,
                arguments=tool_call.function.arguments,
                tool_call_id=tool_call.id
            )
            results.append(result)
    
    return results
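
The handler above relies on `signal.alarm`, which is available only on Unix and only in the main thread. One portable alternative, sketched here under the assumption that abandoning a slow call is acceptable, is to run the function in a thread pool and wait on the future with a timeout. `call_with_timeout` and `slow_tool` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Any, Callable, Dict

_pool = ThreadPoolExecutor(max_workers=4)

def call_with_timeout(func: Callable[..., Any], timeout_s: float, **kwargs: Any) -> Dict[str, Any]:
    """Run func in a worker thread and give up waiting after timeout_s seconds."""
    future = _pool.submit(func, **kwargs)
    try:
        return {"success": True, "result": future.result(timeout=timeout_s)}
    except FutureTimeoutError:
        # The worker thread keeps running; Python cannot forcibly stop it
        return {
            "success": False,
            "error": {"code": "TIMEOUT", "message": f"Timed out after {timeout_s} seconds"},
        }
    except Exception as exc:
        return {"success": False, "error": {"code": "EXECUTION_ERROR", "message": str(exc)}}

def slow_tool(delay: float) -> str:
    time.sleep(delay)
    return "done"

fast = call_with_timeout(slow_tool, timeout_s=1.0, delay=0.01)
slow = call_with_timeout(slow_tool, timeout_s=0.05, delay=0.3)
```

The trade-off is that a timed-out function continues to occupy a worker until it finishes, so functions with side effects should still enforce their own limits, for example through HTTP client timeouts.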

Parallel Function Execution

Modern LLMs can request multiple function calls in a single response. Execute them in parallel for better performance[1][3]:

Parallel Execution Implementation
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any
import time

class ParallelFunctionExecutor:
    """Execute multiple function calls in parallel"""
    
    def __init__(self, max_workers: int = 10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.async_functions = {}
        self.sync_functions = {}
    
    def register_function(
        self,
        name: str,
        func: callable,
        is_async: bool = False
    ):
        """Register a function for parallel execution"""
        if is_async:
            self.async_functions[name] = func
        else:
            self.sync_functions[name] = func
    
    async def execute_parallel_async(
        self,
        tool_calls: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Execute multiple async functions in parallel"""
        async def unknown_function(name: str, tool_call_id: Optional[str]) -> Dict[str, Any]:
            return {
                "tool_call_id": tool_call_id,
                "error": f"Unknown function: {name}",
                "success": False
            }
        
        tasks = []
        
        for call in tool_calls:
            func_name = call["function"]["name"]
            args = call["function"]["arguments"]
            
            if func_name in self.async_functions:
                task = self._execute_async_with_metadata(
                    self.async_functions[func_name],
                    args,
                    call.get("id")
                )
            elif func_name in self.sync_functions:
                # Run sync function in thread pool
                task = self._execute_sync_in_thread(
                    self.sync_functions[func_name],
                    args,
                    call.get("id")
                )
            else:
                # Report unknown functions instead of dropping them, so that
                # results stay aligned one-to-one with tool_calls
                task = unknown_function(func_name, call.get("id"))
            tasks.append(task)
        
        # Wait for all tasks to complete (results keep the order of tasks)
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Process results
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    "tool_call_id": tool_calls[i].get("id"),
                    "error": str(result),
                    "success": False
                })
            else:
                processed_results.append(result)
        
        return processed_results
    
    async def _execute_async_with_metadata(
        self,
        func: callable,
        args: Dict[str, Any],
        tool_call_id: str
    ) -> Dict[str, Any]:
        """Execute async function with timing and metadata"""
        start_time = time.time()
        
        try:
            result = await func(**args)
            execution_time = time.time() - start_time
            
            return {
                "tool_call_id": tool_call_id,
                "result": result,
                "success": True,
                "execution_time": execution_time
            }
        except Exception as e:
            return {
                "tool_call_id": tool_call_id,
                "error": str(e),
                "success": False,
                "execution_time": time.time() - start_time
            }
    
    async def _execute_sync_in_thread(
        self,
        func: callable,
        args: Dict[str, Any],
        tool_call_id: str
    ) -> Dict[str, Any]:
        """Execute sync function in thread pool"""
        loop = asyncio.get_running_loop()
        
        def wrapped():
            start_time = time.time()
            try:
                result = func(**args)
                return {
                    "tool_call_id": tool_call_id,
                    "result": result,
                    "success": True,
                    "execution_time": time.time() - start_time
                }
            except Exception as e:
                return {
                    "tool_call_id": tool_call_id,
                    "error": str(e),
                    "success": False,
                    "execution_time": time.time() - start_time
                }
        
        return await loop.run_in_executor(self.executor, wrapped)
    
    def execute_parallel_sync(
        self,
        tool_calls: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Execute multiple functions in parallel (sync version)"""
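        futures = {}  # Maps each future to its tool call ID
        results = []
        
        for call in tool_calls:
            func_name = call["function"]["name"]
            args = call["function"]["arguments"]
            
            if func_name in self.sync_functions:
                future = self.executor.submit(
                    self._execute_with_metadata,
                    self.sync_functions[func_name],
                    args,
                    call.get("id")
                )
                futures[future] = call.get("id")
            else:
                # Report unknown functions instead of silently skipping them
                results.append({
                    "tool_call_id": call.get("id"),
                    "error": f"Unknown function: {func_name}",
                    "success": False
                })
        
        # Collect results as they complete (order may differ from tool_calls).
        # Futures yielded by as_completed are already done, so result() returns at once.
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                results.append({
                    "tool_call_id": futures[future],
                    "error": str(e),
                    "success": False
                })
        
        return results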
    
    def _execute_with_metadata(
        self,
        func: callable,
        args: Dict[str, Any],
        tool_call_id: str
    ) -> Dict[str, Any]:
        """Execute function with metadata"""
        start_time = time.time()
        
        try:
            result = func(**args)
            return {
                "tool_call_id": tool_call_id,
                "result": result,
                "success": True,
                "execution_time": time.time() - start_time
            }
        except Exception as e:
            return {
                "tool_call_id": tool_call_id,
                "error": str(e),
                "success": False,
                "execution_time": time.time() - start_time
            }

# Example usage
executor = ParallelFunctionExecutor()

# Register functions (get_weather, search_products, and calculate_shipping
# are assumed to be defined elsewhere)
executor.register_function("get_weather", get_weather)
executor.register_function("search_products", search_products)
executor.register_function("calculate_shipping", calculate_shipping)

# Simulate multiple tool calls from LLM
tool_calls = [
    {
        "id": "call_123",
        "function": {
            "name": "get_weather",
            "arguments": {"location": "Boston", "unit": "fahrenheit"}
        }
    },
    {
        "id": "call_124",
        "function": {
            "name": "search_products",
            "arguments": {"query": "laptop", "max_price": 1000}
        }
    },
    {
        "id": "call_125",
        "function": {
            "name": "calculate_shipping",
            "arguments": {"weight": 5, "destination": "CA"}
        }
    }
]

# Execute in parallel
results = executor.execute_parallel_sync(tool_calls)

# Process results
for result in results:
    if result["success"]:
        print(f"✓ {result['tool_call_id']}: {result['result']}")
        print(f"  Execution time: {result['execution_time']:.2f}s")
    else:
        print(f"✗ {result['tool_call_id']}: {result['error']}")
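
When the results have to go back to the model, each one must be paired with the ID of the call that produced it. The sketch below shows one way to do that with `asyncio.gather`, which returns results in the order the awaitables were passed, and `asyncio.to_thread` (Python 3.9+). `run_tool_calls` and the demo registry are hypothetical, and the message layout follows the OpenAI-style `tool` messages used earlier in this guide.

```python
import asyncio
import json
from typing import Any, Callable, Dict, List

async def run_tool_calls(
    tool_calls: List[Dict[str, Any]],
    registry: Dict[str, Callable[..., Any]],
) -> List[Dict[str, str]]:
    """Run blocking tool functions concurrently and build one tool message per call."""

    async def run_one(call: Dict[str, Any]) -> Dict[str, str]:
        name = call["function"]["name"]
        func = registry.get(name)
        if func is None:
            payload: Dict[str, Any] = {"error": f"Unknown function: {name}"}
        else:
            try:
                # Run the blocking function in a worker thread
                result = await asyncio.to_thread(func, **call["function"]["arguments"])
                payload = {"result": result}
            except Exception as exc:
                payload = {"error": str(exc)}
        return {"role": "tool", "tool_call_id": call["id"], "content": json.dumps(payload)}

    # gather preserves input order, so messages line up with tool_calls
    return list(await asyncio.gather(*(run_one(c) for c in tool_calls)))

demo_registry = {"double": lambda x: x * 2}
demo_calls = [
    {"id": "call_1", "function": {"name": "double", "arguments": {"x": 21}}},
    {"id": "call_2", "function": {"name": "missing", "arguments": {}}},
]

tool_messages = asyncio.run(run_tool_calls(demo_calls, demo_registry))
```

Errors are returned as tool messages rather than raised, so the model receives a response for every call it made.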

Complex Multi-Step Orchestration

Build sophisticated workflows by chaining function calls and maintaining context between steps[3]:

Workflow Orchestration Example
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import json

class WorkflowState(Enum):
    STARTED = "started"
    IN_PROGRESS = "in_progress"
    WAITING_FOR_INPUT = "waiting_for_input"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class WorkflowStep:
    name: str
    function: str
    depends_on: Optional[List[str]] = None
    condition: Optional[str] = None

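class WorkflowOrchestrator:
    """Orchestrate complex multi-step function calling workflows.

    The helpers _evaluate_condition, _prepare_final_result, and
    _get_tool_definition are not shown here and must be implemented.
    """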
    
    def __init__(self, llm_client, function_handler):
        self.llm_client = llm_client
        self.function_handler = function_handler
        self.workflows = {}
        self.context = {}
    
    def define_workflow(self, name: str, steps: List[WorkflowStep]):
        """Define a reusable workflow"""
        self.workflows[name] = steps
    
    async def execute_workflow(
        self,
        workflow_name: str,
        initial_input: str,
        max_steps: int = 10
    ) -> Dict[str, Any]:
        """Execute a predefined workflow"""
        
        if workflow_name not in self.workflows:
            raise ValueError(f"Unknown workflow: {workflow_name}")
        
        workflow = self.workflows[workflow_name]
        state = WorkflowState.STARTED
        context = {
            "initial_input": initial_input,
            "steps_completed": [],
            "results": {},
            "messages": [
                {"role": "user", "content": initial_input}
            ]
        }
        
        for step_count in range(max_steps):
            # Find next step to execute
            next_step = self._find_next_step(workflow, context)
            
            if not next_step:
                state = WorkflowState.COMPLETED
                break
            
            state = WorkflowState.IN_PROGRESS
            
            # Check conditions. A step whose condition fails is recorded as
            # skipped; otherwise it would be selected again on every iteration.
            if next_step.condition and not self._evaluate_condition(
                next_step.condition, context
            ):
                context["steps_completed"].append(next_step.name)
                context["results"][next_step.name] = {"skipped": True}
                continue
            
            # Execute step
            result = await self._execute_step(next_step, context)
            
            # Update context
            context["steps_completed"].append(next_step.name)
            context["results"][next_step.name] = result
            
            # Check if workflow needs user input
            if result.get("requires_input"):
                state = WorkflowState.WAITING_FOR_INPUT
                break
        
        return {
            "state": state.value,
            "context": context,
            "final_result": self._prepare_final_result(context)
        }
    
    def _find_next_step(
        self,
        workflow: List[WorkflowStep],
        context: Dict[str, Any]
    ) -> Optional[WorkflowStep]:
        """Find the next step that can be executed"""
        
        completed = set(context["steps_completed"])
        
        for step in workflow:
            # Skip completed steps
            if step.name in completed:
                continue
            
            # Check dependencies
            if step.depends_on:
                deps_met = all(dep in completed for dep in step.depends_on)
                if not deps_met:
                    continue
            
            return step
        
        return None
    
    async def _execute_step(
        self,
        step: WorkflowStep,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Execute a single workflow step"""
        
        # Prepare prompt with context
        prompt = self._build_step_prompt(step, context)
        
        # Get LLM to decide on function parameters
        response = await self.llm_client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=context["messages"] + [
                {"role": "user", "content": prompt}
            ],
            tools=[self._get_tool_definition(step.function)],
            tool_choice={"type": "function", "function": {"name": step.function}}
        )
        
        message = response.choices[0].message
        context["messages"].append(message)
        
        # Execute function calls
        if message.tool_calls:
            results = []
            for tool_call in message.tool_calls:
                result = self.function_handler.validate_and_execute(
                    tool_call.function.name,
                    tool_call.function.arguments,
                    tool_call.id
                )
                results.append(result)
                
                # Add result to context
                context["messages"].append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": json.dumps(result)
                })
            
            return {"step": step.name, "results": results}
        
        return {"step": step.name, "error": "No function call generated"}
    
    def _build_step_prompt(
        self,
        step: WorkflowStep,
        context: Dict[str, Any]
    ) -> str:
        """Build prompt for workflow step"""
        
        prompt = f"Execute step: {step.name}\n"
        
        # Add relevant context from previous steps
        if step.depends_on:
            prompt += "\nContext from previous steps:\n"
            for dep in step.depends_on:
                if dep in context["results"]:
                    prompt += f"- {dep}: {json.dumps(context['results'][dep])}\n"
        
        return prompt

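# Example: E-commerce order workflow
# Assumes llm_client is an async client (e.g. openai.AsyncOpenAI) and
# function_handler is a FunctionCallHandler instance
orchestrator = WorkflowOrchestrator(llm_client, function_handler)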

# Define order processing workflow
order_workflow = [
    WorkflowStep(
        name="validate_customer",
        function="lookup_customer",
        depends_on=[]
    ),
    WorkflowStep(
        name="check_inventory",
        function="check_product_availability",
        depends_on=["validate_customer"]
    ),
    WorkflowStep(
        name="calculate_pricing",
        function="calculate_total_price",
        depends_on=["check_inventory"]
    ),
    WorkflowStep(
        name="process_payment",
        function="charge_payment",
        depends_on=["calculate_pricing"],
        condition="context['results']['calculate_pricing']['total'] > 0"
    ),
    WorkflowStep(
        name="create_order",
        function="create_order_record",
        depends_on=["process_payment"]
    ),
    WorkflowStep(
        name="send_confirmation",
        function="send_email",
        depends_on=["create_order"]
    )
]

orchestrator.define_workflow("process_order", order_workflow)

# Execute workflow (run inside an async function, or wrap with asyncio.run)
result = await orchestrator.execute_workflow(
    "process_order",
    "I want to order 2 laptops for customer ID 12345"
)
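
Dependency chains like the one above can be validated before any step runs. As a sketch, the standard library's `graphlib.TopologicalSorter` (Python 3.9+) yields an order that respects every `depends_on` entry and raises on cycles. `plan_execution_order` is a hypothetical helper that takes a plain mapping of step name to dependencies rather than `WorkflowStep` objects.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List

def plan_execution_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return step names in an order that satisfies all dependencies."""
    # Reject references to steps that were never defined
    for step, deps in dependencies.items():
        unknown = [d for d in deps if d not in dependencies]
        if unknown:
            raise ValueError(f"Step '{step}' depends on undefined steps: {unknown}")
    try:
        # The mapping is node -> predecessors, which matches depends_on
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        raise ValueError(f"Workflow contains a dependency cycle: {exc.args[1]}") from exc

order_dependencies = {
    "validate_customer": [],
    "check_inventory": ["validate_customer"],
    "calculate_pricing": ["check_inventory"],
    "process_payment": ["calculate_pricing"],
    "create_order": ["process_payment"],
    "send_confirmation": ["create_order"],
}

execution_plan = plan_execution_order(order_dependencies)
```

Checking the graph up front turns a misconfigured workflow into an immediate error instead of a run that silently stops when no step is eligible.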

Real-World Examples

Here are practical examples of function calling in production scenarios[1][3]:

Database Query Assistant
SQL Generation
tools = [{
    "type": "function",
    "function": {
        "name": "execute_sql_query",
        "description": "Execute a SQL query on the database",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "SQL query to execute"
                },
                "database": {
                    "type": "string",
                    "enum": ["customers", "products", "orders"],
                    "description": "Target database"
                }
            },
            "required": ["query", "database"]
        }
    }
}]

# Natural language to SQL
# User: "Show me all orders over $1000 from last month"
# The LLM generates a call equivalent to:
# execute_sql_query(
#     query="SELECT * FROM orders WHERE total_amount > 1000 AND created_at >= DATE_SUB(CURDATE(), INTERVAL 1 MONTH)",
#     database="orders"
# )
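
Since the model writes the SQL, the executing side should make writes impossible rather than hope none are generated. The sketch below uses SQLite as a stand-in for the real database (the query above uses MySQL syntax) and relies on SQLite's `PRAGMA query_only`; `run_read_only_query` is a hypothetical helper, and other databases would typically achieve the same effect with a read-only role.

```python
import sqlite3
from typing import Any, Dict

def run_read_only_query(conn: sqlite3.Connection, query: str, row_limit: int = 100) -> Dict[str, Any]:
    """Execute a model-generated query on a connection that rejects writes."""
    conn.execute("PRAGMA query_only = ON")  # Writes now fail at the database level
    try:
        cursor = conn.execute(query)
        # Cap the number of rows returned to the model
        return {"success": True, "rows": cursor.fetchmany(row_limit)}
    except sqlite3.Error as exc:
        return {"success": False, "error": str(exc)}

# Demo data in an in-memory database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, total_amount REAL)")
conn.executemany("INSERT INTO orders (total_amount) VALUES (?)", [(250.0,), (1500.0,)])
conn.commit()

allowed = run_read_only_query(conn, "SELECT id, total_amount FROM orders WHERE total_amount > 1000")
blocked = run_read_only_query(conn, "DELETE FROM orders")
remaining = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```

Enforcing this in the database means the protection does not depend on correctly parsing whatever SQL the model produces.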
API Integration Hub
External APIs
tools = [{
    "type": "function",
    "function": {
        "name": "call_external_api",
        "description": "Make HTTP request to external API",
        "parameters": {
            "type": "object",
            "properties": {
                "service": {
                    "type": "string",
                    "enum": ["stripe", "sendgrid", "twilio"],
                    "description": "API service to call"
                },
                "action": {
                    "type": "string",
                    "description": "Action to perform"
                },
                "params": {
                    "type": "object",
                    "description": "API-specific parameters"
                }
            },
            "required": ["service", "action", "params"]
        }
    }
}]

# Multi-service orchestration
# User: "Refund order #123 and notify customer"
# The LLM generates multiple calls:
# 1. call_external_api(service="stripe", action="refund", params={"charge_id": "ch_123"})
# 2. call_external_api(service="sendgrid", action="send_email", params={"to": "customer@example.com", "template": "refund_confirmation"})
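
The `action` and `params` fields in this schema are open-ended, so the model could request any action on a listed service. One way to narrow that, sketched here with a hypothetical `ALLOWED_ACTIONS` table and `check_api_call` helper, is to accept only known service and action pairs with exactly the expected parameters.

```python
from typing import Any, Dict, Set, Tuple

# Hypothetical allowlist: (service, action) -> parameter names that must be present
ALLOWED_ACTIONS: Dict[Tuple[str, str], Set[str]] = {
    ("stripe", "refund"): {"charge_id"},
    ("sendgrid", "send_email"): {"to", "template"},
}

def check_api_call(service: str, action: str, params: Dict[str, Any]) -> Tuple[bool, str]:
    """Accept a call only if the pair is allowlisted and the parameters match."""
    required = ALLOWED_ACTIONS.get((service, action))
    if required is None:
        return False, f"Action '{action}' is not allowed for service '{service}'"

    provided = set(params)
    missing = sorted(required - provided)
    if missing:
        return False, f"Missing parameters: {', '.join(missing)}"

    unexpected = sorted(provided - required)
    if unexpected:
        return False, f"Unexpected parameters: {', '.join(unexpected)}"

    return True, "ok"

refund_check = check_api_call("stripe", "refund", {"charge_id": "ch_123"})
payout_check = check_api_call("stripe", "create_payout", {"amount": 5000})
```

A stricter design would expose one narrowly scoped function per action, so that the schema itself carries these constraints.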

Security Best Practices

Function calling introduces security risks that must be carefully managed[2][5]:

Security Risks
  • Injection attacks: Malicious function parameters
  • Privilege escalation: Accessing unauthorized functions
  • Data exfiltration: Extracting sensitive information
  • Resource exhaustion: Expensive or infinite loops
  • Side effects: Unintended state mutations
Mitigation Strategies
  • Input sanitization: Validate all parameters
  • Whitelisting: Only allow specific functions
  • Rate limiting: Prevent abuse
  • Audit logging: Track all function calls
  • Sandboxing: Isolate execution environment
Security Implementation
import re
from typing import Set, Dict, Any, Optional, Tuple
import hashlib
import time

class SecureFunctionCaller:
    """Secure function calling with comprehensive safety checks"""
    
    def __init__(self):
        self.allowed_functions: Set[str] = set()
        self.rate_limits: Dict[str, Dict] = {}
        self.audit_log = []
        
    def register_safe_function(
        self,
        name: str,
        func: callable,
        rate_limit: int = 100,  # calls per minute
        requires_auth: bool = False
    ):
        """Register a function as safe to call"""
        self.allowed_functions.add(name)
        self.rate_limits[name] = {
            "limit": rate_limit,
            "window": 60,  # seconds
            "calls": []
        }
    
    def validate_function_call(
        self,
        function_name: str,
        arguments: Dict[str, Any],
        user_id: Optional[str] = None
    ) -> Tuple[bool, Optional[str]]:
        """Validate function call for security"""
        
        # Check if function is allowed
        if function_name not in self.allowed_functions:
            return False, "Function not in whitelist"
        
        # Check rate limits
        if not self._check_rate_limit(function_name, user_id):
            return False, "Rate limit exceeded"
        
        # Validate arguments
        validation_result = self._validate_arguments(function_name, arguments)
        if not validation_result[0]:
            return validation_result
        
        # Log the call
        self._audit_log_call(function_name, arguments, user_id)
        
        return True, None
    
    def _check_rate_limit(
        self,
        function_name: str,
        user_id: Optional[str]
    ) -> bool:
        """Check if call is within rate limits"""
        limits = self.rate_limits[function_name]
        current_time = time.time()
        
        # Clean old calls
        limits["calls"] = [
            call for call in limits["calls"]
            if current_time - call < limits["window"]
        ]
        
        # Check limit
        if len(limits["calls"]) >= limits["limit"]:
            return False
        
        # Record call
        limits["calls"].append(current_time)
        return True
    
    def _validate_arguments(
        self,
        function_name: str,
        arguments: Dict[str, Any]
    ) -> Tuple[bool, Optional[str]]:
        """Validate function arguments for security issues"""
        
        # SQL injection prevention
        if function_name == "execute_sql_query":
            query = arguments.get("query", "")
            
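            # Block dangerous SQL keywords and comment markers.
            # A blocklist is only a coarse first filter; also run queries
            # with read-only database credentials.
            dangerous_keywords = [
                "DROP", "DELETE", "TRUNCATE", "ALTER",
                "GRANT", "REVOKE"
            ]
            comment_markers = ["--", "/*", "*/"]
            
            for keyword in dangerous_keywords:
                # Match whole words so that values such as "dropdown" pass
                if re.search(rf"\b{keyword}\b", query, re.IGNORECASE):
                    return False, f"Dangerous SQL keyword detected: {keyword}"
            
            for marker in comment_markers:
                if marker in query:
                    return False, f"SQL comment marker detected: {marker}"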
        
        # Path traversal prevention
        for key, value in arguments.items():
            if isinstance(value, str):
                if "../" in value or "..\\" in value:
                    return False, "Path traversal attempt detected"
                
                # Check for null bytes
                if "\x00" in value or "\0" in value:
                    return False, "Null byte injection detected"
        
        # Command injection prevention
        if function_name in ["execute_command", "run_script"]:
            command = arguments.get("command", "")
            
            # Only allow alphanumeric and safe characters
            if not re.match(r'^[a-zA-Z0-9\s\-_./]+$', command):
                return False, "Invalid characters in command"
        
        return True, None
    
    def _audit_log_call(
        self,
        function_name: str,
        arguments: Dict[str, Any],
        user_id: Optional[str]
    ):
        """Log function call for audit trail"""
        
        # Hash sensitive data
        safe_args = {}
        sensitive_keys = ["password", "api_key", "secret"]
        
        for key, value in arguments.items():
            if any(sensitive in key.lower() for sensitive in sensitive_keys):
                safe_args[key] = hashlib.sha256(
                    str(value).encode()
                ).hexdigest()[:8] + "..."
            else:
                safe_args[key] = value
        
        self.audit_log.append({
            "timestamp": time.time(),
            "function": function_name,
            "arguments": safe_args,
            "user_id": user_id
        })

# Usage
secure_caller = SecureFunctionCaller()

# Register safe functions
secure_caller.register_safe_function(
    "search_products",
    search_products,
    rate_limit=1000
)

secure_caller.register_safe_function(
    "get_user_profile",
    get_user_profile,
    rate_limit=100,
    requires_auth=True
)

# Validate before calling
is_valid, error = secure_caller.validate_function_call(
    "search_products",
    {"query": "'; DROP TABLE products; --"},
    user_id="user123"
)

if not is_valid:
    print(f"Security check failed: {error}")
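
Substring checks such as the `"../"` test in the validator can miss absolute paths and other encodings of the same intent. A complementary approach is to resolve the requested path and confirm it still sits inside an allowed directory. The following is a minimal sketch, assuming file-handling functions are meant to operate under a single base directory; `is_within_base` and the `/srv/data` directory are hypothetical names for illustration and are not part of `SecureFunctionCaller`.

```python
from pathlib import Path


def is_within_base(base_dir: str, user_path: str) -> bool:
    """Return True if user_path, resolved against base_dir, stays inside base_dir.

    Hypothetical helper for illustration only.
    """
    base = Path(base_dir).resolve()
    # Joining with an absolute user_path discards base, so "/etc/passwd"
    # resolves outside the base directory and is rejected below.
    candidate = (base / user_path).resolve()
    return candidate == base or base in candidate.parents


# Example calls with an assumed base directory
allowed = is_within_base("/srv/data", "reports/q1.csv")
blocked = is_within_base("/srv/data", "../../etc/passwd")
```

Because the comparison happens after resolution, relative segments and symlinks are normalized before the decision is made, which a plain substring match cannot do.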

Testing Strategies

Test function calling at several levels: schema validation, argument validation, parallel execution, error handling, security checks, and the end-to-end flow[3]:

Testing Framework
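import asyncio
import json
import time

import pytest
from pydantic import BaseModel
from unittest.mock import Mock, patch

# FunctionCallHandler, ParallelFunctionExecutor, and SecureFunctionCaller
# are the classes defined earlier in this guide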

class TestFunctionCalling:
    """Comprehensive test suite for function calling"""
    
    @pytest.fixture
    def mock_llm_client(self):
        """Mock LLM client for testing"""
        client = Mock()
        return client
    
    @pytest.fixture
    def function_handler(self):
        """Create function handler for tests"""
        handler = FunctionCallHandler()
        return handler
    
    def test_function_schema_validation(self):
        """Test that function schemas are valid"""
        schema = {
            "name": "test_function",
            "description": "Test function",
            "parameters": {
                "type": "object",
                "properties": {
                    "param1": {"type": "string"},
                    "param2": {"type": "number"}
                },
                "required": ["param1"]
            }
        }
        
        # Validate schema structure
        assert schema["name"].replace("_", "").isalnum()
        assert len(schema["description"]) > 0
        assert schema["parameters"]["type"] == "object"
    
    def test_argument_validation(self, function_handler):
        """Test argument validation"""
        
        # Valid arguments
        result = function_handler.validate_and_execute(
            "get_weather",
            {"location": "Boston", "unit": "celsius"}
        )
        assert result["success"] is True
        
        # Missing required argument
        result = function_handler.validate_and_execute(
            "get_weather",
            {"unit": "celsius"}  # Missing location
        )
        assert result["success"] is False
        assert "location" in result["error"]["message"]
        
        # Invalid enum value
        result = function_handler.validate_and_execute(
            "get_weather",
            {"location": "Boston", "unit": "kelvin"}  # Invalid unit
        )
        assert result["success"] is False
    
    @pytest.mark.asyncio
    async def test_parallel_execution(self):
        """Test parallel function execution"""
        executor = ParallelFunctionExecutor()
        
        # Mock functions with delays
        async def slow_function(delay: float):
            await asyncio.sleep(delay)
            return f"Completed after {delay}s"
        
        executor.register_function("slow1", lambda: slow_function(0.1), True)
        executor.register_function("slow2", lambda: slow_function(0.2), True)
        executor.register_function("slow3", lambda: slow_function(0.3), True)
        
        tool_calls = [
            {"id": "1", "function": {"name": "slow1", "arguments": {}}},
            {"id": "2", "function": {"name": "slow2", "arguments": {}}},
            {"id": "3", "function": {"name": "slow3", "arguments": {}}}
        ]
        
        # Execute in parallel
        start_time = time.time()
        results = await executor.execute_parallel_async(tool_calls)
        total_time = time.time() - start_time
        
        # Should complete in ~0.3s (not 0.6s sequential)
        assert total_time < 0.4
        assert all(r["success"] for r in results)
    
    def test_error_handling(self, function_handler):
        """Test error handling scenarios"""
        
        # Function that raises exception
        def failing_function():
            raise ValueError("Intentional error")
        
        function_handler.functions["failing"] = (failing_function, BaseModel)
        
        result = function_handler.validate_and_execute("failing", {})
        assert result["success"] is False
        assert "Intentional error" in result["error"]["message"]
    
    def test_security_validation(self):
        """Test security checks"""
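        secure_caller = SecureFunctionCaller()
        # Register under the names the validator inspects: the SQL keyword
        # check only runs for "execute_sql_query"
        secure_caller.register_safe_function("execute_sql_query", Mock())
        secure_caller.register_safe_function("read_file", Mock())
        
        # SQL injection attempt
        is_valid, error = secure_caller.validate_function_call(
            "execute_sql_query",
            {"query": "SELECT * FROM users; DROP TABLE users;"}
        )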
        assert is_valid is False
        assert "dangerous" in error.lower()
        
        # Path traversal attempt
        is_valid, error = secure_caller.validate_function_call(
            "read_file",
            {"path": "../../etc/passwd"}
        )
        assert is_valid is False
        assert "traversal" in error.lower()
    
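    # openai>=1.0 removed openai.ChatCompletion, so patch the client's
    # Completions.create method instead
    @patch('openai.resources.chat.completions.Completions.create')
    def test_end_to_end_flow(self, mock_create, function_handler):
        """Test complete function calling flow"""
        
        # Mock(name=...) names the mock itself rather than setting a .name
        # attribute, so assign .name after construction
        function_mock = Mock(arguments='{"location": "Boston"}')
        function_mock.name = "get_weather"
        
        # Mock LLM response with function call
        mock_create.return_value = Mock(
            choices=[
                Mock(
                    message=Mock(
                        tool_calls=[Mock(id="call_123", function=function_mock)]
                    )
                )
            ]
        )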
        
        # Process the response
        response = mock_create()
        message = response.choices[0].message
        
        # Execute function calls
        results = []
        for tool_call in message.tool_calls:
            result = function_handler.validate_and_execute(
                tool_call.function.name,
                json.loads(tool_call.function.arguments),
                tool_call.id
            )
            results.append(result)
        
        assert len(results) == 1
        assert results[0]["success"] is True
        assert "temperature" in results[0]["result"]

# Run tests only when this file is executed directly, not when pytest imports it
if __name__ == "__main__":
    pytest.main([__file__, "-v"])

Performance Optimization

Optimize function calling for production workloads[3]:

Optimization Techniques
  • Schema caching: Cache parsed schemas
  • Connection pooling: Reuse API connections
  • Batch processing: Group similar calls
  • Async execution: Non-blocking I/O
  • Result caching: Cache deterministic results
  • Lazy loading: Load functions on demand
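
As one illustration of the result caching item, the sketch below caches results keyed on the function name plus its arguments, with a time-to-live so stale entries expire. It assumes the wrapped function is deterministic for the duration of the TTL and that its arguments are JSON-serializable; `ResultCache` and `fake_lookup` are hypothetical names, not part of any class defined in this guide.

```python
import json
import time
from typing import Any, Callable, Dict, Tuple


class ResultCache:
    """TTL cache keyed on function name plus canonicalized arguments (illustrative)."""

    def __init__(
        self,
        ttl_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.ttl_seconds = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._store: Dict[str, Tuple[float, Any]] = {}
        self.hits = 0
        self.misses = 0

    @staticmethod
    def _key(function_name: str, arguments: Dict[str, Any]) -> str:
        # sort_keys makes argument order irrelevant to the cache key
        return function_name + ":" + json.dumps(arguments, sort_keys=True, default=str)

    def call(self, function_name: str, func: Callable[..., Any], arguments: Dict[str, Any]) -> Any:
        key = self._key(function_name, arguments)
        now = self._clock()
        entry = self._store.get(key)
        if entry is not None and now - entry[0] < self.ttl_seconds:
            self.hits += 1
            return entry[1]
        self.misses += 1
        result = func(**arguments)
        self._store[key] = (now, result)
        return result


# Usage with a stand-in function (hypothetical)
def fake_lookup(location: str, unit: str = "celsius") -> Dict[str, str]:
    return {"location": location, "unit": unit}


cache = ResultCache(ttl_seconds=60)
first = cache.call("get_weather", fake_lookup, {"location": "Boston", "unit": "celsius"})
second = cache.call("get_weather", fake_lookup, {"unit": "celsius", "location": "Boston"})
```

The `hits` and `misses` counters also give a direct way to compute a cache hit rate.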

Monitoring Metrics
  • Latency: Function execution time
  • Success rate: Successful vs failed calls
  • Concurrency: Parallel execution count
  • Error types: Validation vs execution
  • Token usage: Function description overhead
  • Cache hit rate: Reused results
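
Several of these metrics can be collected with a thin wrapper around each function execution. The sketch below records latency, success rate, and error types in memory; it is a simplified illustration, and `FunctionCallMetrics` is a hypothetical name. A production system would more likely export these values to an existing metrics backend.

```python
import time
from collections import Counter, defaultdict
from typing import Any, Callable, Dict, List


class FunctionCallMetrics:
    """In-memory collector for per-function latency, outcomes, and error types (illustrative)."""

    def __init__(self) -> None:
        self.latencies: Dict[str, List[float]] = defaultdict(list)
        self.successes: Counter = Counter()
        self.failures: Counter = Counter()
        self.error_types: Counter = Counter()

    def record(self, function_name: str, func: Callable[..., Any], arguments: Dict[str, Any]) -> Any:
        """Run func(**arguments), recording its latency and outcome."""
        start = time.perf_counter()
        try:
            result = func(**arguments)
        except Exception as exc:
            self.failures[function_name] += 1
            self.error_types[type(exc).__name__] += 1
            raise  # metrics must not swallow the caller's error handling
        else:
            self.successes[function_name] += 1
            return result
        finally:
            # Runs for both successful and failed calls
            self.latencies[function_name].append(time.perf_counter() - start)

    def success_rate(self, function_name: str) -> float:
        total = self.successes[function_name] + self.failures[function_name]
        return self.successes[function_name] / total if total else 0.0


# Usage with a stand-in function (hypothetical)
metrics = FunctionCallMetrics()
doubled = metrics.record("double", lambda x: x * 2, {"x": 21})
```

Recording the exception class name separately from the failure count makes it possible to distinguish validation errors from execution errors when the two raise different exception types.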

Conclusion

Function calling lets LLMs go beyond text generation and act on external systems through well-defined tools. The patterns in this guide (schema design, argument validation, security checks, testing, and monitoring) provide a foundation for function calling systems that are robust, secure, and scalable[4].

References

  [1] OpenAI. "Function Calling Guide" (2024)
  [2] Martin Fowler. "Function calling using LLMs" (2025)
  [3] Apideck. "An introduction to function calling and tool use" (2025)
  [4] Anthropic. "Tool Use (Function Calling)" (2024)
  [5] Google AI. "AI Edge Function Calling SDK" (2025)
  [6] Future AGI. "LLM Function Calling & API Integration: Practical Guide" (2025)
  [7] Daily Dose of Data Science. "Function Calling & MCP for LLMs" (2025)
  [8] Google Cloud. "Function Calling with Gemini" (2024)
  [9] LangChain. "Tools and Function Calling" (2024)
  [10] OpenAI Cookbook. "How to Call Functions with Chat Models" (2024)
  [11] Microsoft. "Azure OpenAI Function Calling" (2024)