What is Function Calling?
How Function Calling Works
Modern LLMs can generate structured function calls when provided with function schemas. The process involves three key steps[1]:
1. Define Functions: Register available functions with names, descriptions, and parameter schemas.
2. LLM Decides: The model analyzes user input and generates appropriate function calls.
3. Execute & Return: Execute the function and optionally return results to the model.
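The three steps can be sketched without any provider SDK. This is a minimal illustration, not a real integration: `REGISTRY`, `fake_model_decision`, and `dispatch` are hypothetical names, and the model's decision is hard-coded where a real system would call an LLM API.

```python
import json

# Step 1: define a function and register it with a name, description, and schema.
def get_weather(location: str, unit: str = "fahrenheit") -> dict:
    """Mock implementation; a real one would call a weather service."""
    return {"location": location, "temperature": 72, "unit": unit}

REGISTRY = {
    "get_weather": {
        "callable": get_weather,
        "description": "Get the current weather in a given location",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"],
        },
    }
}

# Step 2: the model decides. Hard-coded stand-in for an LLM response.
def fake_model_decision(user_input: str) -> dict:
    return {
        "name": "get_weather",
        "arguments": json.dumps({"location": "Boston, MA"}),
    }

# Step 3: execute the requested function and serialize the result
# so it can be sent back to the model.
def dispatch(call: dict) -> str:
    entry = REGISTRY[call["name"]]
    args = json.loads(call["arguments"])
    return json.dumps(entry["callable"](**args))

tool_output = dispatch(fake_model_decision("What's the weather like in Boston?"))
print(tool_output)
```

Note that the arguments travel as a JSON string, which is why the dispatcher parses them before calling the function.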
Provider Comparison
Each provider implements function calling slightly differently, though the core concepts remain similar[2]:
from openai import OpenAI
import json

client = OpenAI()

def get_weather(location: str, unit: str = "fahrenheit") -> dict:
    """Mock implementation; replace with a real weather API call."""
    return {"location": location, "temperature": 72, "unit": unit}

# Define function schema
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather in a given location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "The city and state, e.g. San Francisco, CA",
                    },
                    "unit": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"],
                        "description": "The unit of temperature",
                    },
                },
                "required": ["location"],
            },
        },
    }
]

messages = [
    {"role": "user", "content": "What's the weather like in Boston?"}
]

# Make API call with functions
response = client.chat.completions.create(
    model="gpt-4-turbo-preview",
    messages=messages,
    tools=tools,
    tool_choice="auto",  # Let model decide when to use tools
)

# Check if model wants to call a function
message = response.choices[0].message
if message.tool_calls:
    messages.append(message)  # Include assistant's tool call
    for tool_call in message.tool_calls:
        function_name = tool_call.function.name
        function_args = json.loads(tool_call.function.arguments)
        print(f"Calling {function_name} with args: {function_args}")

        # Execute function (mock implementation)
        if function_name == "get_weather":
            result = get_weather(**function_args)
        else:
            result = {"error": f"Unknown function: {function_name}"}

        # Send result back to model: one tool message per tool call
        messages.append({
            "role": "tool",
            "tool_call_id": tool_call.id,
            "content": json.dumps(result),
        })

    final_response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=messages,
    )
    print(final_response.choices[0].message.content)
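To illustrate how providers differ while the core concept stays the same: Anthropic's Messages API describes a tool with top-level `name`, `description`, and `input_schema` fields instead of nesting them under `function` and `parameters`. The converter below is a small sketch under that assumption; `openai_tool_to_anthropic` is a hypothetical helper name, and it covers only the schema shape, not how each API returns tool calls.

```python
def openai_tool_to_anthropic(tool: dict) -> dict:
    """Convert an OpenAI-style tool definition to the Anthropic tool shape."""
    fn = tool["function"]
    return {
        "name": fn["name"],
        "description": fn.get("description", ""),
        # The JSON Schema for the arguments is reused unchanged.
        "input_schema": fn.get("parameters", {"type": "object", "properties": {}}),
    }

openai_tool = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather in a given location",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"],
        },
    },
}

anthropic_tool = openai_tool_to_anthropic(openai_tool)
print(anthropic_tool["name"], sorted(anthropic_tool))
```

Because both formats carry a plain JSON Schema for the arguments, keeping one canonical schema and converting at the edge is usually enough.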
Schema Design Best Practices
Well-designed function schemas are crucial for reliable function calling[2][5]. Follow these principles for optimal results:
1. Clear Naming
❌ func1, do_thing, process
✓ search_products, calculate_tax, send_email
2. Detailed Descriptions
{
  "name": "search_products",
  "description": "Search for products in the inventory. Returns up to 10 products matching the query. Searches across product name, description, and tags. Results are sorted by relevance.",
  "parameters": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "Search query. Can include product names, features, or categories"
      },
      "category": {
        "type": "string",
        "enum": ["electronics", "clothing", "home", "sports", "books"],
        "description": "Optional category filter to narrow results"
      },
      "max_price": {
        "type": "number",
        "description": "Maximum price in USD. Must be a positive number"
      },
      "in_stock_only": {
        "type": "boolean",
        "description": "If true, only return products currently in stock",
        "default": true
      }
    },
    "required": ["query"]
  }
}
3. Type Constraints
"email": { "type": "string", "format": "email", "pattern": "^[\\w.-]+@[\\w.-]+\\.\\w+$" }
"quantity": { "type": "integer", "minimum": 1, "maximum": 100 }
Error Handling and Validation
Robust error handling is essential for production function calling systems[1]. Implement multiple layers of validation:
import json
import logging
import signal
import time
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, Field, ValidationError

logger = logging.getLogger(__name__)

# Define schemas with Pydantic for type safety
class WeatherParams(BaseModel):
    location: str = Field(..., description="City and state")
    unit: str = Field("fahrenheit", pattern="^(celsius|fahrenheit)$")

class DatabaseQueryParams(BaseModel):
    table: str = Field(..., pattern="^[a-zA-Z_][a-zA-Z0-9_]*$")
    filters: Dict[str, Any] = Field(default_factory=dict)
    limit: int = Field(10, ge=1, le=100)

class FunctionCallHandler:
    """Handles function calls with comprehensive error handling"""

    def __init__(self):
        self.functions = {
            "get_weather": (self.get_weather, WeatherParams),
            "query_database": (self.query_database, DatabaseQueryParams),
        }
        self.call_history = []

    def validate_and_execute(
        self,
        function_name: str,
        arguments: Union[str, dict],
        tool_call_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Validate arguments and execute function with error handling"""
        # Log the call
        call_record = {
            "function": function_name,
            "arguments": arguments,
            "tool_call_id": tool_call_id,
            "timestamp": time.time()
        }
        self.call_history.append(call_record)

        try:
            # Check if function exists
            if function_name not in self.functions:
                return self._error_response(
                    f"Unknown function: {function_name}",
                    "FUNCTION_NOT_FOUND"
                )
            func, schema = self.functions[function_name]

            # Parse arguments if string
            if isinstance(arguments, str):
                try:
                    arguments = json.loads(arguments)
                except json.JSONDecodeError as e:
                    return self._error_response(
                        f"Invalid JSON arguments: {e}",
                        "INVALID_JSON"
                    )

            # Validate with Pydantic
            try:
                validated_args = schema(**arguments)
            except ValidationError as e:
                errors = []
                for error in e.errors():
                    field = " -> ".join(str(x) for x in error["loc"])
                    errors.append(f"{field}: {error['msg']}")
                return self._error_response(
                    f"Validation failed: {'; '.join(errors)}",
                    "VALIDATION_ERROR"
                )

            # Execute function with timeout.
            # SIGALRM is Unix only and works only in the main thread;
            # use threading.Timer for cross-platform code.
            def timeout_handler(signum, frame):
                raise TimeoutError("Function execution timed out")

            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(30)  # 30 second timeout
            try:
                result = func(validated_args)
                return {
                    "success": True,
                    "result": result,
                    "function": function_name
                }
            except TimeoutError:
                return self._error_response(
                    "Function execution timed out after 30 seconds",
                    "TIMEOUT"
                )
            except Exception as e:
                logger.exception(f"Function {function_name} failed")
                return self._error_response(
                    f"Execution failed: {str(e)}",
                    "EXECUTION_ERROR"
                )
            finally:
                signal.alarm(0)  # Always cancel the pending alarm
        except Exception as e:
            logger.exception("Unexpected error in function call handler")
            return self._error_response(
                f"Unexpected error: {str(e)}",
                "UNKNOWN_ERROR"
            )

    def _error_response(self, message: str, code: str) -> Dict[str, Any]:
        """Create standardized error response"""
        return {
            "success": False,
            "error": {
                "message": message,
                "code": code
            }
        }

    def get_weather(self, params: WeatherParams) -> Dict[str, Any]:
        """Mock weather function"""
        # In production, call actual weather API
        return {
            "location": params.location,
            "temperature": 72,
            "unit": params.unit,
            "conditions": "Partly cloudy"
        }

    def query_database(self, params: DatabaseQueryParams) -> List[Dict]:
        """Mock database query"""
        # Validate table name against whitelist
        allowed_tables = ["users", "products", "orders"]
        if params.table not in allowed_tables:
            raise ValueError(f"Access denied for table: {params.table}")
        # In production, use parameterized queries
        return [
            {"id": 1, "name": "Sample"},
            {"id": 2, "name": "Data"}
        ]

# Usage with OpenAI
handler = FunctionCallHandler()

def process_tool_calls(message) -> List[Dict[str, Any]]:
    """Process all tool calls from a message"""
    results = []
    if hasattr(message, 'tool_calls') and message.tool_calls:
        for tool_call in message.tool_calls:
            result = handler.validate_and_execute(
                function_name=tool_call.function.name,
                arguments=tool_call.function.arguments,
                tool_call_id=tool_call.id
            )
            results.append(result)
    return results
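The signal-based timeout is Unix only and must run in the main thread. A cross-platform alternative can be sketched with `concurrent.futures`: the call runs in a worker thread and the caller waits a bounded time for the result. This is one possible approach, not the handler's own implementation; `run_with_timeout` is a hypothetical helper, and a timed-out function keeps running in its thread because Python threads cannot be forcibly stopped.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

_pool = ThreadPoolExecutor(max_workers=4)

def run_with_timeout(func, timeout_s: float, *args, **kwargs) -> dict:
    """Run func in a worker thread and wait at most timeout_s seconds."""
    future = _pool.submit(func, *args, **kwargs)
    try:
        return {"success": True, "result": future.result(timeout=timeout_s)}
    except FutureTimeout:
        # cancel() only helps if the task has not started yet;
        # an already running function is left to finish in the background.
        future.cancel()
        return {
            "success": False,
            "error": {
                "message": f"Function execution timed out after {timeout_s} seconds",
                "code": "TIMEOUT",
            },
        }
    except Exception as exc:
        return {
            "success": False,
            "error": {"message": f"Execution failed: {exc}", "code": "EXECUTION_ERROR"},
        }

fast = run_with_timeout(lambda: 1 + 1, 1.0)
slow = run_with_timeout(time.sleep, 0.05, 0.5)  # sleeps longer than the timeout
print(fast["success"], slow["error"]["code"])
```

For functions that must be stopped for real, a separate process that can be terminated is the safer isolation boundary.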
Parallel Function Execution
Modern LLMs can request multiple function calls in a single response. Execute them in parallel for better performance[1][3]:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List

class ParallelFunctionExecutor:
    """Execute multiple function calls in parallel"""

    def __init__(self, max_workers: int = 10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.async_functions = {}
        self.sync_functions = {}

    def register_function(
        self,
        name: str,
        func: Callable,
        is_async: bool = False
    ):
        """Register a function for parallel execution"""
        if is_async:
            self.async_functions[name] = func
        else:
            self.sync_functions[name] = func

    async def execute_parallel_async(
        self,
        tool_calls: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Execute multiple async functions in parallel"""
        tasks = []
        scheduled_ids = []  # tool call ids, kept aligned with tasks
        for call in tool_calls:
            func_name = call["function"]["name"]
            args = call["function"]["arguments"]
            if func_name in self.async_functions:
                task = self._execute_async_with_metadata(
                    self.async_functions[func_name],
                    args,
                    call.get("id")
                )
            elif func_name in self.sync_functions:
                # Run sync function in thread pool
                task = self._execute_sync_in_thread(
                    self.sync_functions[func_name],
                    args,
                    call.get("id")
                )
            else:
                continue  # Unregistered functions are skipped
            tasks.append(task)
            scheduled_ids.append(call.get("id"))

        # Wait for all tasks to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    "tool_call_id": scheduled_ids[i],
                    "error": str(result),
                    "success": False
                })
            else:
                processed_results.append(result)
        return processed_results

    async def _execute_async_with_metadata(
        self,
        func: Callable,
        args: Dict[str, Any],
        tool_call_id: str
    ) -> Dict[str, Any]:
        """Execute async function with timing and metadata"""
        start_time = time.time()
        try:
            result = await func(**args)
            execution_time = time.time() - start_time
            return {
                "tool_call_id": tool_call_id,
                "result": result,
                "success": True,
                "execution_time": execution_time
            }
        except Exception as e:
            return {
                "tool_call_id": tool_call_id,
                "error": str(e),
                "success": False,
                "execution_time": time.time() - start_time
            }

    async def _execute_sync_in_thread(
        self,
        func: Callable,
        args: Dict[str, Any],
        tool_call_id: str
    ) -> Dict[str, Any]:
        """Execute sync function in thread pool"""
        loop = asyncio.get_running_loop()

        def wrapped():
            return self._execute_with_metadata(func, args, tool_call_id)

        return await loop.run_in_executor(self.executor, wrapped)

    def execute_parallel_sync(
        self,
        tool_calls: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Execute multiple functions in parallel (sync version)"""
        futures = {}  # future -> tool call id
        for call in tool_calls:
            func_name = call["function"]["name"]
            args = call["function"]["arguments"]
            if func_name in self.sync_functions:
                future = self.executor.submit(
                    self._execute_with_metadata,
                    self.sync_functions[func_name],
                    args,
                    call.get("id")
                )
                futures[future] = call.get("id")

        # Collect results as they complete (completion order, not request order)
        results = []
        for future in as_completed(futures):
            try:
                result = future.result(timeout=30)
                results.append(result)
            except Exception as e:
                results.append({
                    "tool_call_id": futures[future],
                    "error": str(e),
                    "success": False
                })
        return results

    def _execute_with_metadata(
        self,
        func: Callable,
        args: Dict[str, Any],
        tool_call_id: str
    ) -> Dict[str, Any]:
        """Execute function with metadata"""
        start_time = time.time()
        try:
            result = func(**args)
            return {
                "tool_call_id": tool_call_id,
                "result": result,
                "success": True,
                "execution_time": time.time() - start_time
            }
        except Exception as e:
            return {
                "tool_call_id": tool_call_id,
                "error": str(e),
                "success": False,
                "execution_time": time.time() - start_time
            }

# Example usage
# get_weather, search_products, and calculate_shipping are assumed to be
# plain functions defined elsewhere that accept keyword arguments
executor = ParallelFunctionExecutor()

# Register functions
executor.register_function("get_weather", get_weather)
executor.register_function("search_products", search_products)
executor.register_function("calculate_shipping", calculate_shipping)

# Simulate multiple tool calls from LLM
tool_calls = [
    {
        "id": "call_123",
        "function": {
            "name": "get_weather",
            "arguments": {"location": "Boston", "unit": "fahrenheit"}
        }
    },
    {
        "id": "call_124",
        "function": {
            "name": "search_products",
            "arguments": {"query": "laptop", "max_price": 1000}
        }
    },
    {
        "id": "call_125",
        "function": {
            "name": "calculate_shipping",
            "arguments": {"weight": 5, "destination": "CA"}
        }
    }
]

# Execute in parallel
results = executor.execute_parallel_sync(tool_calls)

# Process results
for result in results:
    if result["success"]:
        print(f"✓ {result['tool_call_id']}: {result['result']}")
        print(f"  Execution time: {result['execution_time']:.2f}s")
    else:
        print(f"✗ {result['tool_call_id']}: {result['error']}")
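After parallel execution, each result still has to go back to the model as its own `tool` message, matched to the request by `tool_call_id`. The helper below is a minimal sketch of that step; `to_tool_messages` is a hypothetical name, and it assumes the result dictionaries carry the `tool_call_id`, `success`, `result`, and `error` keys used above.

```python
import json

def to_tool_messages(results: list) -> list:
    """Turn executor results into one chat 'tool' message per tool call."""
    messages = []
    for r in results:
        if r.get("success"):
            payload = {"result": r["result"]}
        else:
            # Report failures too, so the model can react to them.
            payload = {"error": r.get("error", "unknown error")}
        messages.append({
            "role": "tool",
            "tool_call_id": r["tool_call_id"],
            "content": json.dumps(payload),
        })
    return messages

sample_results = [
    {"tool_call_id": "call_123", "success": True, "result": {"temperature": 72}},
    {"tool_call_id": "call_124", "success": False, "error": "timeout"},
]
tool_messages = to_tool_messages(sample_results)
for m in tool_messages:
    print(m["tool_call_id"], m["content"])
```

Matching by id rather than by position matters here because results collected as they complete can arrive in any order.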
Complex Multi-Step Orchestration
Build sophisticated workflows by chaining function calls and maintaining context between steps[3]:
import asyncio
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional

class WorkflowState(Enum):
    STARTED = "started"
    IN_PROGRESS = "in_progress"
    WAITING_FOR_INPUT = "waiting_for_input"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class WorkflowStep:
    name: str
    function: str
    depends_on: List[str] = field(default_factory=list)
    condition: Optional[str] = None

class WorkflowOrchestrator:
    """Orchestrate complex multi-step function calling workflows.

    The helper methods _evaluate_condition, _get_tool_definition, and
    _prepare_final_result are not shown here.
    """

    def __init__(self, llm_client, function_handler):
        self.llm_client = llm_client
        self.function_handler = function_handler
        self.workflows = {}
        self.context = {}

    def define_workflow(self, name: str, steps: List[WorkflowStep]):
        """Define a reusable workflow"""
        self.workflows[name] = steps

    async def execute_workflow(
        self,
        workflow_name: str,
        initial_input: str,
        max_steps: int = 10
    ) -> Dict[str, Any]:
        """Execute a predefined workflow"""
        if workflow_name not in self.workflows:
            raise ValueError(f"Unknown workflow: {workflow_name}")
        workflow = self.workflows[workflow_name]
        state = WorkflowState.STARTED
        context = {
            "initial_input": initial_input,
            "steps_completed": [],
            "results": {},
            "messages": [
                {"role": "user", "content": initial_input}
            ]
        }

        for step_count in range(max_steps):
            # Find next step to execute
            next_step = self._find_next_step(workflow, context)
            if not next_step:
                state = WorkflowState.COMPLETED
                break
            state = WorkflowState.IN_PROGRESS

            # Check conditions
            if next_step.condition and not self._evaluate_condition(
                next_step.condition, context
            ):
                # Record the step as skipped so it is not selected again
                context["steps_completed"].append(next_step.name)
                context["results"][next_step.name] = {
                    "step": next_step.name,
                    "skipped": True
                }
                continue

            # Execute step
            result = await self._execute_step(next_step, context)

            # Update context
            context["steps_completed"].append(next_step.name)
            context["results"][next_step.name] = result

            # Check if workflow needs user input
            if result.get("requires_input"):
                state = WorkflowState.WAITING_FOR_INPUT
                break

        return {
            "state": state.value,
            "context": context,
            "final_result": self._prepare_final_result(context)
        }

    def _find_next_step(
        self,
        workflow: List[WorkflowStep],
        context: Dict[str, Any]
    ) -> Optional[WorkflowStep]:
        """Find the next step that can be executed"""
        completed = set(context["steps_completed"])
        for step in workflow:
            # Skip completed steps
            if step.name in completed:
                continue
            # Check dependencies
            if step.depends_on:
                deps_met = all(dep in completed for dep in step.depends_on)
                if not deps_met:
                    continue
            return step
        return None

    async def _execute_step(
        self,
        step: WorkflowStep,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Execute a single workflow step"""
        # Prepare prompt with context
        prompt = self._build_step_prompt(step, context)

        # Get LLM to decide on function parameters
        # (awaiting the call requires an async client)
        response = await self.llm_client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=context["messages"] + [
                {"role": "user", "content": prompt}
            ],
            tools=[self._get_tool_definition(step.function)],
            tool_choice={"type": "function", "function": {"name": step.function}}
        )
        message = response.choices[0].message
        context["messages"].append(message)

        # Execute function calls
        if message.tool_calls:
            results = []
            for tool_call in message.tool_calls:
                result = self.function_handler.validate_and_execute(
                    tool_call.function.name,
                    tool_call.function.arguments,
                    tool_call.id
                )
                results.append(result)
                # Add result to context
                context["messages"].append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": json.dumps(result)
                })
            return {"step": step.name, "results": results}
        return {"step": step.name, "error": "No function call generated"}

    def _build_step_prompt(
        self,
        step: WorkflowStep,
        context: Dict[str, Any]
    ) -> str:
        """Build prompt for workflow step"""
        prompt = f"Execute step: {step.name}\n"
        # Add relevant context from previous steps
        if step.depends_on:
            prompt += "\nContext from previous steps:\n"
            for dep in step.depends_on:
                if dep in context["results"]:
                    prompt += f"- {dep}: {json.dumps(context['results'][dep])}\n"
        return prompt

# Example: E-commerce order workflow
# llm_client (an async client such as openai.AsyncOpenAI) and
# function_handler are assumed to be created elsewhere
orchestrator = WorkflowOrchestrator(llm_client, function_handler)

# Define order processing workflow
order_workflow = [
    WorkflowStep(
        name="validate_customer",
        function="lookup_customer",
        depends_on=[]
    ),
    WorkflowStep(
        name="check_inventory",
        function="check_product_availability",
        depends_on=["validate_customer"]
    ),
    WorkflowStep(
        name="calculate_pricing",
        function="calculate_total_price",
        depends_on=["check_inventory"]
    ),
    WorkflowStep(
        name="process_payment",
        function="charge_payment",
        depends_on=["calculate_pricing"],
        condition="context['results']['calculate_pricing']['total'] > 0"
    ),
    WorkflowStep(
        name="create_order",
        function="create_order_record",
        depends_on=["process_payment"]
    ),
    WorkflowStep(
        name="send_confirmation",
        function="send_email",
        depends_on=["create_order"]
    )
]
orchestrator.define_workflow("process_order", order_workflow)

# Execute workflow (await is only valid inside a coroutine)
async def main():
    result = await orchestrator.execute_workflow(
        "process_order",
        "I want to order 2 laptops for customer ID 12345"
    )
    print(result["state"])

asyncio.run(main())
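The `depends_on` lists in a workflow like this form a dependency graph, and it can help to check the execution order before running anything. The sketch below uses the standard library's `graphlib` (Python 3.9+) for that; `execution_order` and `has_cycle` are hypothetical helpers, separate from the orchestrator above.

```python
from graphlib import CycleError, TopologicalSorter

def execution_order(dependencies: dict) -> list:
    """Return step names ordered so every step follows its prerequisites.

    dependencies maps a step name to the list of steps it depends on.
    """
    return list(TopologicalSorter(dependencies).static_order())

def has_cycle(dependencies: dict) -> bool:
    """Report whether the dependency graph can never be completed."""
    try:
        execution_order(dependencies)
        return False
    except CycleError:
        return True

order_dependencies = {
    "validate_customer": [],
    "check_inventory": ["validate_customer"],
    "calculate_pricing": ["check_inventory"],
    "process_payment": ["calculate_pricing"],
    "create_order": ["process_payment"],
    "send_confirmation": ["create_order"],
}

order = execution_order(order_dependencies)
print(order)
print(has_cycle({"a": ["b"], "b": ["a"]}))
```

Validating the graph up front catches circular dependencies at definition time instead of leaving the workflow to stall at run time.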
Real-World Examples
Here are practical examples of function calling in production scenarios[1][3]:
tools = [{
    "type": "function",
    "function": {
        "name": "execute_sql_query",
        "description": "Execute a SQL query on the database",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "SQL query to execute"
                },
                "database": {
                    "type": "string",
                    "enum": ["customers", "products", "orders"],
                    "description": "Target database"
                }
            },
            "required": ["query", "database"]
        }
    }
}]

# Natural language to SQL
user: "Show me all orders over $1000 from last month"

# LLM generates:
execute_sql_query(
    query="SELECT * FROM orders WHERE total_amount > 1000 AND created_at >= DATE_SUB(CURDATE(), INTERVAL 1 MONTH)",
    database="orders"
)

tools = [{
    "type": "function",
    "function": {
        "name": "call_external_api",
        "description": "Make HTTP request to external API",
        "parameters": {
            "type": "object",
            "properties": {
                "service": {
                    "type": "string",
                    "enum": ["stripe", "sendgrid", "twilio"],
                    "description": "API service to call"
                },
                "action": {
                    "type": "string",
                    "description": "Action to perform"
                },
                "params": {
                    "type": "object",
                    "description": "API-specific parameters"
                }
            },
            "required": ["service", "action", "params"]
        }
    }
}]

# Multi-service orchestration
user: "Refund order #123 and notify customer"

# LLM generates multiple calls:
1. call_external_api(service="stripe", action="refund", params={"charge_id": "ch_123"})
2. call_external_api(service="sendgrid", action="send_email", params={"to": "customer@example.com", "template": "refund_confirmation"})
Security Best Practices
Function calling introduces security risks that must be carefully managed[2][5]:
Common risks:
- Injection attacks: Malicious function parameters
- Privilege escalation: Accessing unauthorized functions
- Data exfiltration: Extracting sensitive information
- Resource exhaustion: Expensive or infinite loops
- Side effects: Unintended state mutations

Mitigations:
- Input sanitization: Validate all parameters
- Whitelisting: Only allow specific functions
- Rate limiting: Prevent abuse
- Audit logging: Track all function calls
- Sandboxing: Isolate execution environment
import hashlib
import re
import time
from typing import Any, Callable, Dict, Optional, Set, Tuple

class SecureFunctionCaller:
    """Secure function calling with comprehensive safety checks"""

    def __init__(self):
        self.allowed_functions: Set[str] = set()
        self.functions: Dict[str, Callable] = {}
        self.auth_required: Set[str] = set()
        self.rate_limits: Dict[str, Dict] = {}
        self.audit_log = []

    def register_safe_function(
        self,
        name: str,
        func: Callable,
        rate_limit: int = 100,  # calls per minute
        requires_auth: bool = False
    ):
        """Register a function as safe to call"""
        self.allowed_functions.add(name)
        self.functions[name] = func
        if requires_auth:
            self.auth_required.add(name)
        self.rate_limits[name] = {
            "limit": rate_limit,
            "window": 60,  # seconds
            "calls": []
        }

    def validate_function_call(
        self,
        function_name: str,
        arguments: Dict[str, Any],
        user_id: Optional[str] = None
    ) -> Tuple[bool, Optional[str]]:
        """Validate function call for security"""
        # Check if function is allowed
        if function_name not in self.allowed_functions:
            return False, "Function not in whitelist"

        # Reject anonymous calls to functions registered with requires_auth
        if function_name in self.auth_required and user_id is None:
            return False, "Authentication required"

        # Check rate limits
        if not self._check_rate_limit(function_name, user_id):
            return False, "Rate limit exceeded"

        # Validate arguments
        validation_result = self._validate_arguments(function_name, arguments)
        if not validation_result[0]:
            return validation_result

        # Log the call
        self._audit_log_call(function_name, arguments, user_id)
        return True, None

    def _check_rate_limit(
        self,
        function_name: str,
        user_id: Optional[str]
    ) -> bool:
        """Check if call is within rate limits.

        Limits are tracked per function; user_id is accepted but not
        used for bucketing here.
        """
        limits = self.rate_limits[function_name]
        current_time = time.time()

        # Clean old calls
        limits["calls"] = [
            call for call in limits["calls"]
            if current_time - call < limits["window"]
        ]

        # Check limit
        if len(limits["calls"]) >= limits["limit"]:
            return False

        # Record call
        limits["calls"].append(current_time)
        return True

    def _validate_arguments(
        self,
        function_name: str,
        arguments: Dict[str, Any]
    ) -> Tuple[bool, Optional[str]]:
        """Validate function arguments for security issues"""
        # SQL injection prevention (naive substring check; prefer
        # parameterized queries and least-privilege database accounts)
        if function_name == "execute_sql_query":
            query = arguments.get("query", "")
            # Block dangerous SQL keywords
            dangerous_keywords = [
                "DROP", "DELETE", "TRUNCATE", "ALTER",
                "GRANT", "REVOKE", "--", "/*", "*/"
            ]
            for keyword in dangerous_keywords:
                if keyword in query.upper():
                    return False, f"Dangerous SQL keyword detected: {keyword}"

        # Path traversal prevention
        for key, value in arguments.items():
            if isinstance(value, str):
                if "../" in value or "..\\" in value:
                    return False, "Path traversal attempt detected"
                # Check for null bytes
                if "\x00" in value:
                    return False, "Null byte injection detected"

        # Command injection prevention
        if function_name in ["execute_command", "run_script"]:
            command = arguments.get("command", "")
            # Only allow alphanumeric and safe characters
            if not re.match(r'^[a-zA-Z0-9\s\-_./]+$', command):
                return False, "Invalid characters in command"

        return True, None

    def _audit_log_call(
        self,
        function_name: str,
        arguments: Dict[str, Any],
        user_id: Optional[str]
    ):
        """Log function call for audit trail"""
        # Hash sensitive data
        safe_args = {}
        sensitive_keys = ["password", "api_key", "secret"]
        for key, value in arguments.items():
            if any(sensitive in key.lower() for sensitive in sensitive_keys):
                safe_args[key] = hashlib.sha256(
                    str(value).encode()
                ).hexdigest()[:8] + "..."
            else:
                safe_args[key] = value
        self.audit_log.append({
            "timestamp": time.time(),
            "function": function_name,
            "arguments": safe_args,
            "user_id": user_id
        })

# Usage
# search_products, get_user_profile, and execute_sql_query are assumed
# to be defined elsewhere
secure_caller = SecureFunctionCaller()

# Register safe functions
secure_caller.register_safe_function(
    "search_products",
    search_products,
    rate_limit=1000
)
secure_caller.register_safe_function(
    "get_user_profile",
    get_user_profile,
    rate_limit=100,
    requires_auth=True
)
secure_caller.register_safe_function(
    "execute_sql_query",
    execute_sql_query,
    rate_limit=100
)

# Validate before calling (the SQL keyword check applies to execute_sql_query)
is_valid, error = secure_caller.validate_function_call(
    "execute_sql_query",
    {"query": "'; DROP TABLE products; --"},
    user_id="user123"
)
if not is_valid:
    print(f"Security check failed: {error}")
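Keyword blocklists are easy to bypass, so input sanitization is usually paired with parameterized queries: the model supplies only values, and the SQL text is fixed in code. The sketch below shows the idea with the standard library's `sqlite3` and an in-memory database; the `products` table and the `search_products_by_name` helper are assumptions made for illustration.

```python
import sqlite3

def search_products_by_name(conn, name_fragment: str, limit: int = 10) -> list:
    """Search products with a fixed SQL statement and bound parameters."""
    limit = max(1, min(int(limit), 100))  # clamp model-supplied limits
    rows = conn.execute(
        "SELECT id, name, price FROM products WHERE name LIKE ? ORDER BY id LIMIT ?",
        (f"%{name_fragment}%", limit),
    ).fetchall()
    return [{"id": r[0], "name": r[1], "price": r[2]} for r in rows]

# Demo database (in memory, discarded when the connection closes)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL)")
conn.executemany(
    "INSERT INTO products (name, price) VALUES (?, ?)",
    [("Laptop Pro", 999.0), ("Desk Lamp", 25.0)],
)

matches = search_products_by_name(conn, "laptop")
# The injection string is treated as plain text to search for, not as SQL.
injected = search_products_by_name(conn, "'; DROP TABLE products; --")
remaining = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
print(matches, injected, remaining)
```

Exposing narrow, purpose-built functions like this one gives the model far less room for damage than a tool that accepts arbitrary SQL.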
Testing Strategies
Comprehensive testing ensures your function calling system works reliably[3]:
import asyncio
import json
import time
from unittest.mock import Mock

import pytest
from pydantic import BaseModel

# FunctionCallHandler, ParallelFunctionExecutor, and SecureFunctionCaller
# are assumed to be importable from the modules shown earlier in this guide

class TestFunctionCalling:
    """Comprehensive test suite for function calling"""

    @pytest.fixture
    def mock_llm_client(self):
        """Mock LLM client for testing"""
        client = Mock()
        return client

    @pytest.fixture
    def function_handler(self):
        """Create function handler for tests"""
        handler = FunctionCallHandler()
        return handler

    def test_function_schema_validation(self):
        """Test that function schemas are valid"""
        schema = {
            "name": "test_function",
            "description": "Test function",
            "parameters": {
                "type": "object",
                "properties": {
                    "param1": {"type": "string"},
                    "param2": {"type": "number"}
                },
                "required": ["param1"]
            }
        }
        # Validate schema structure
        assert schema["name"].replace("_", "").isalnum()
        assert len(schema["description"]) > 0
        assert schema["parameters"]["type"] == "object"

    def test_argument_validation(self, function_handler):
        """Test argument validation"""
        # Valid arguments
        result = function_handler.validate_and_execute(
            "get_weather",
            {"location": "Boston", "unit": "celsius"}
        )
        assert result["success"] is True

        # Missing required argument
        result = function_handler.validate_and_execute(
            "get_weather",
            {"unit": "celsius"}  # Missing location
        )
        assert result["success"] is False
        assert "location" in result["error"]["message"]

        # Invalid enum value
        result = function_handler.validate_and_execute(
            "get_weather",
            {"location": "Boston", "unit": "kelvin"}  # Invalid unit
        )
        assert result["success"] is False

    @pytest.mark.asyncio
    async def test_parallel_execution(self):
        """Test parallel function execution (requires pytest-asyncio)"""
        executor = ParallelFunctionExecutor()

        # Mock functions with delays
        async def slow_function(delay: float):
            await asyncio.sleep(delay)
            return f"Completed after {delay}s"

        executor.register_function("slow1", lambda: slow_function(0.1), True)
        executor.register_function("slow2", lambda: slow_function(0.2), True)
        executor.register_function("slow3", lambda: slow_function(0.3), True)
        tool_calls = [
            {"id": "1", "function": {"name": "slow1", "arguments": {}}},
            {"id": "2", "function": {"name": "slow2", "arguments": {}}},
            {"id": "3", "function": {"name": "slow3", "arguments": {}}}
        ]

        # Execute in parallel
        start_time = time.time()
        results = await executor.execute_parallel_async(tool_calls)
        total_time = time.time() - start_time

        # Should complete in ~0.3s (not 0.6s sequential)
        assert total_time < 0.4
        assert all(r["success"] for r in results)

    def test_error_handling(self, function_handler):
        """Test error handling scenarios"""
        # Function that raises exception; the handler passes the
        # validated model as a single positional argument
        def failing_function(params):
            raise ValueError("Intentional error")

        function_handler.functions["failing"] = (failing_function, BaseModel)
        result = function_handler.validate_and_execute("failing", {})
        assert result["success"] is False
        assert "Intentional error" in result["error"]["message"]

    def test_security_validation(self):
        """Test security checks"""
        secure_caller = SecureFunctionCaller()
        # The SQL keyword check applies to the function named execute_sql_query
        secure_caller.register_safe_function("execute_sql_query", Mock())
        secure_caller.register_safe_function("read_file", Mock())

        # SQL injection attempt
        is_valid, error = secure_caller.validate_function_call(
            "execute_sql_query",
            {"query": "SELECT * FROM users; DROP TABLE users;"}
        )
        assert is_valid is False
        assert "dangerous" in error.lower()

        # Path traversal attempt
        is_valid, error = secure_caller.validate_function_call(
            "read_file",
            {"path": "../../etc/passwd"}
        )
        assert is_valid is False
        assert "traversal" in error.lower()

    def test_end_to_end_flow(self, function_handler):
        """Test complete function calling flow"""
        # Mock's `name` keyword names the mock itself, so the attribute
        # is set after construction
        function_mock = Mock(arguments='{"location": "Boston"}')
        function_mock.name = "get_weather"

        # Stand-in for client.chat.completions.create:
        # mock LLM response with function call
        mock_create = Mock(
            return_value=Mock(
                choices=[
                    Mock(
                        message=Mock(
                            tool_calls=[
                                Mock(id="call_123", function=function_mock)
                            ]
                        )
                    )
                ]
            )
        )

        # Process the response
        response = mock_create()
        message = response.choices[0].message

        # Execute function calls
        results = []
        for tool_call in message.tool_calls:
            result = function_handler.validate_and_execute(
                tool_call.function.name,
                json.loads(tool_call.function.arguments),
                tool_call.id
            )
            results.append(result)

        assert len(results) == 1
        assert results[0]["success"] is True
        assert "temperature" in results[0]["result"]

# Run tests
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
Performance Optimization
Optimize function calling for production workloads[3]:
Optimization techniques:
- Schema caching: Cache parsed schemas
- Connection pooling: Reuse API connections
- Batch processing: Group similar calls
- Async execution: Non-blocking I/O
- Result caching: Cache deterministic results
- Lazy loading: Load functions on demand

Metrics to monitor:
- Latency: Function execution time
- Success rate: Successful vs failed calls
- Concurrency: Parallel execution count
- Error types: Validation vs execution
- Token usage: Function description overhead
- Cache hit rate: Reused results
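Result caching and the cache hit rate metric can be sketched together. The class below is a minimal illustration that assumes the wrapped functions are deterministic and take JSON-serializable keyword arguments; `ResultCache` and its methods are hypothetical names, not part of any library.

```python
import json
import time

class ResultCache:
    """Cache function results by name and arguments, with a time-to-live."""

    def __init__(self, ttl_seconds: float = 60.0):
        self.ttl = ttl_seconds
        self._store = {}  # key -> (stored_at, result)
        self.hits = 0
        self.misses = 0

    @staticmethod
    def _key(name: str, args: dict) -> str:
        # sort_keys makes the key independent of argument order
        return f"{name}:{json.dumps(args, sort_keys=True)}"

    def call(self, name: str, func, args: dict):
        key = self._key(name, args)
        now = time.monotonic()
        entry = self._store.get(key)
        if entry is not None and now - entry[0] < self.ttl:
            self.hits += 1
            return entry[1]
        self.misses += 1
        result = func(**args)
        self._store[key] = (now, result)
        return result

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

calls_made = []

def calculate_tax(amount: float, rate: float) -> float:
    calls_made.append((amount, rate))  # record real executions
    return round(amount * rate, 2)

cache = ResultCache(ttl_seconds=60)
first = cache.call("calculate_tax", calculate_tax, {"amount": 100.0, "rate": 0.08})
second = cache.call("calculate_tax", calculate_tax, {"rate": 0.08, "amount": 100.0})
print(first, second, len(calls_made), cache.hit_rate)
```

Functions with side effects or time-dependent output, such as sending email or fetching live prices, should bypass a cache like this.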
Conclusion
Function calling transforms LLMs from text generators into powerful agents capable of real-world actions. By following the patterns and best practices in this guide, you can build robust, secure, and scalable function calling systems[4].
References
- [1] OpenAI. "Function Calling Guide" (2024)
- [2] Martin Fowler. "Function calling using LLMs" (2025)
- [3] Apideck. "An introduction to function calling and tool use" (2025)
- [4] Anthropic. "Tool Use (Function Calling)" (2024)
- [5] Google AI. "AI Edge Function Calling SDK" (2025)
- [6] Future AGI. "LLM Function Calling & API Integration: Practical Guide" (2025)
- [7] Daily Dose of Data Science. "Function Calling & MCP for LLMs" (2025)
- [8] Google Cloud. "Function Calling with Gemini" (2024)
- [9] LangChain. "Tools and Function Calling" (2024)
- [10] OpenAI Cookbook. "How to Call Functions with Chat Models" (2024)
- [11] Microsoft. "Azure OpenAI Function Calling" (2024)