Quick Start
Create a new Slack bot with LLM integration:
# Python setup
pip install slack-bolt openai langchain redis
pip install python-dotenv aiohttp
# Node.js setup
npm install @slack/bolt openai langchain redis
npm install dotenv express
1. Slack App Setup & Permissions
Create Slack App
- Go to api.slack.com/apps
- Click "Create New App" → "From scratch"
- Name your app and select workspace
- Navigate to "OAuth & Permissions"
Required Bot Token Scopes
# OAuth & Permissions → Bot Token Scopes
app_mentions:read   # Read messages that mention your bot
channels:history    # View messages in public channels
channels:join       # Join public channels
channels:read       # View basic channel info
chat:write          # Send messages
files:read          # Access files shared in conversations
files:write         # Upload files
groups:history      # View messages in private channels
groups:read         # View basic private channel info
im:history          # View direct messages
im:read             # View basic DM info
im:write            # Send direct messages
mpim:history        # View group DM messages
mpim:read           # View basic group DM info
mpim:write          # Send group DM messages
reactions:write     # Add emoji reactions
users:read          # View user info
commands            # Add slash commands
App Configuration
// config/slack-config.ts

/** Credentials and secrets the Slack app reads from the environment. */
export interface SlackConfig {
  signingSecret: string
  botToken: string
  appToken: string
  clientId: string
  clientSecret: string
  stateSecret: string
}

// Every value is read from process.env when this module is evaluated. The `!`
// non-null assertion only satisfies the compiler: an unset variable still
// yields `undefined` at runtime.
// NOTE(review): SLACK_STATE_SECRET is read here but does not appear in the
// sample .env listing that follows — confirm it is set.
export const slackConfig: SlackConfig = {
  signingSecret: process.env.SLACK_SIGNING_SECRET!,
  botToken: process.env.SLACK_BOT_TOKEN!,
  appToken: process.env.SLACK_APP_TOKEN!,
  clientId: process.env.SLACK_CLIENT_ID!,
  clientSecret: process.env.SLACK_CLIENT_SECRET!,
  stateSecret: process.env.SLACK_STATE_SECRET!,
}
// Environment variables (.env)
SLACK_SIGNING_SECRET=your_signing_secret
SLACK_BOT_TOKEN=xoxb-your-bot-token
SLACK_APP_TOKEN=xapp-your-app-token
SLACK_CLIENT_ID=your_client_id
SLACK_CLIENT_SECRET=your_client_secret
OPENAI_API_KEY=sk-...
PARROTROUTER_API_KEY=your_api_key
2. Event Handling
Python Implementation with Slack Bolt
# bot.py
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
import openai
from typing import Dict, Any
import re
# Initialize app
# The credentials are read from the environment (see the .env listing) so the
# names used below are actually defined; a missing variable fails fast with a
# KeyError naming it.
import os

SLACK_BOT_TOKEN = os.environ["SLACK_BOT_TOKEN"]
SLACK_SIGNING_SECRET = os.environ["SLACK_SIGNING_SECRET"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

app = App(token=SLACK_BOT_TOKEN, signing_secret=SLACK_SIGNING_SECRET)
openai.api_key = OPENAI_API_KEY
# Handle app mentions
@app.event("app_mention")
def handle_app_mention(event: Dict[str, Any], say, client, logger):
    """Reply in-thread to a message that mentions the bot.

    Args:
        event: The `app_mention` event payload; reads `user`, `text`,
            `channel`, `ts` and optionally `thread_ts`.
        say: Bolt callback used to post the reply.
        client: Slack Web API client, used to look up the bot's own user id.
        logger: Logger used to record failures.
    """
    # Resolve the thread before anything can fail so the error reply below
    # lands in the same thread a successful reply would.
    thread_ts = event.get("thread_ts") or event.get("ts")
    try:
        user = event["user"]
        text = event["text"]
        channel = event["channel"]

        # Remove bot mention from text (a literal replace; no regex needed)
        bot_user_id = client.auth_test()["user_id"]
        cleaned_text = text.replace(f"<@{bot_user_id}>", "").strip()

        response = generate_llm_response(cleaned_text, user)

        say(text=response, thread_ts=thread_ts, channel=channel)
    except Exception as e:
        logger.error(f"Error handling mention: {e}")
        say(
            text="Sorry, I encountered an error processing your request.",
            thread_ts=thread_ts,
        )
# Handle direct messages
@app.event("message")
def handle_direct_message(event: Dict[str, Any], say, client, logger):
    """Answer direct messages sent to the bot with an LLM-generated reply.

    Args:
        event: The `message` event payload.
        say: Bolt callback used to post the reply.
        client: Slack Web API client; only used when the event carries no
            `channel_type`.
        logger: Logger injected by Bolt (unused here).
    """
    # Skip if it's a bot message or any message subtype
    if event.get("subtype") or event.get("bot_id"):
        return

    # Prefer the `channel_type` field on the event: it avoids one
    # conversations.info API call per incoming message.
    channel_type = event.get("channel_type")
    if channel_type is not None:
        is_dm = channel_type == "im"
    else:
        channel_info = client.conversations_info(channel=event["channel"])
        is_dm = bool(channel_info["channel"].get("is_im"))
    if not is_dm:
        return

    user = event.get("user")
    text = event.get("text")
    if not user or not text:
        # Nothing to answer (e.g. a message without text).
        return

    say(generate_llm_response(text, user))
# Handle message in channels (optional)
@app.event("message")
def handle_channel_message(event: Dict[str, Any], say, client, logger):
    """Offer a hint when a channel message contains both "help" and "bot".

    Bot messages, messages with a subtype, and messages that mention the bot
    are ignored.
    """
    if event.get("subtype") or event.get("bot_id"):
        return

    mention_tag = "<@{}>".format(client.auth_test()["user_id"])
    if mention_tag in event.get("text", ""):
        # Already handled by app_mention event
        return

    lowered = event.get("text", "").lower()
    wants_help = all(keyword in lowered for keyword in ("help", "bot"))
    if wants_help:
        say(
            text="Need help? Just mention me with your question!",
            thread_ts=event["ts"],
        )
def generate_llm_response(text: str, user_id: str) -> str:
    """Generate a chat-completion reply to `text`, addressed to the given user.

    Args:
        text: The user's message.
        user_id: Slack user id, used to look up the user's display name.

    Returns:
        The model's reply, or a fixed apology string if the user lookup or
        the completion call raises.
    """
    # No `logger` exists at module level in this file, so the original except
    # branch raised NameError instead of returning the fallback text.
    import logging

    log = logging.getLogger(__name__)
    try:
        # Get user info for personalization; fall back when no real name is set
        user_info = app.client.users_info(user=user_id)
        user_name = user_info["user"].get("real_name") or "there"

        messages = [
            {
                "role": "system",
                "content": (
                    "You are a helpful Slack bot assistant powered by ParrotRouter.\n"
                    f"You're talking to {user_name}. Be friendly, concise, and helpful.\n"
                    "Format your responses using Slack markdown when appropriate."
                ),
            },
            {"role": "user", "content": text},
        ]

        # NOTE(review): `openai.ChatCompletion` is the pre-1.0 interface of the
        # openai package — confirm the pinned version.
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            temperature=0.7,
            max_tokens=500,
        )
        return response.choices[0].message.content
    except Exception as e:
        log.error(f"LLM Error: {e}")
        return "I'm having trouble generating a response right now. Please try again later."


# Node.js Implementation
// bot.ts
import { App, LogLevel } from '@slack/bolt'
import { WebClient } from '@slack/web-api'
import OpenAI from 'openai'
// Bolt app in Socket Mode; the tokens and signing secret are read from the
// environment.
const app = new App({
  token: process.env.SLACK_BOT_TOKEN,
  signingSecret: process.env.SLACK_SIGNING_SECRET,
  socketMode: true,
  appToken: process.env.SLACK_APP_TOKEN,
  logLevel: LogLevel.DEBUG,
})

// OpenAI client used by generateLLMResponse.
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})
// Handle app mentions: strip the bot mention, ask the LLM, reply in-thread.
app.event('app_mention', async ({ event, context, client, say }) => {
  // Resolve the reply thread up front so the error path uses it as well;
  // previously the apology was posted at channel level.
  const threadTs = event.thread_ts || event.ts
  try {
    const { user, text, channel } = event

    // Remove bot mention
    const botUserId = context.botUserId
    const cleanedText = text.replace(new RegExp(`<@${botUserId}>`, 'g'), '').trim()

    // Generate response
    const response = await generateLLMResponse(cleanedText, user)

    // Reply in thread
    await say({
      text: response,
      thread_ts: threadTs,
      channel,
    })
  } catch (error) {
    console.error('Error handling mention:', error)
    await say({
      text: 'Sorry, I encountered an error processing your request.',
      thread_ts: threadTs,
    })
  }
})
// Handle direct messages: reply with an LLM response when the message was
// posted in an IM channel.
app.message(async ({ message, say, client }) => {
  // Skip bot messages and any message that carries a subtype
  if (message.subtype || message.bot_id) return
  try {
    // Check if DM (one conversations.info call per message)
    const channelInfo = await client.conversations.info({
      channel: message.channel,
    })
    if (channelInfo.channel?.is_im) {
      const response = await generateLLMResponse(message.text!, message.user!)
      await say(response)
    }
  } catch (error) {
    // Failures are logged only; no reply is sent to the user.
    console.error('Error handling DM:', error)
  }
})
/**
 * Generate a chat-completion reply to `text`, addressed to the given user.
 *
 * @param text   The user's message.
 * @param userId Slack user id, used to look up the user's real name.
 * @returns The model's reply, or a fixed fallback string when the model
 *          returns no content or any call throws.
 */
async function generateLLMResponse(text: string, userId: string): Promise<string> {
  try {
    // Get user info
    const userInfo = await app.client.users.info({ user: userId })
    const userName = userInfo.user?.real_name || 'there'

    const completion = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: [
        {
          role: 'system',
          content:
            'You are a helpful Slack bot assistant powered by ParrotRouter.\n' +
            `You're talking to ${userName}. Be friendly, concise, and helpful.\n` +
            'Use Slack markdown for formatting.',
        },
        {
          role: 'user',
          content: text,
        },
      ],
      temperature: 0.7,
      max_tokens: 500,
    })

    // Double quotes: these strings contain an apostrophe, which terminated the
    // original single-quoted literals and made the file fail to parse.
    return completion.choices[0].message.content || "I couldn't generate a response."
  } catch (error) {
    console.error('LLM Error:', error)
    return "I'm having trouble generating a response right now."
  }
}

// 3. Slash Commands
Command Registration
In your Slack app settings, go to "Slash Commands" and create commands:
- /ask [question] - Ask the AI a question
- /summarize [url or text] - Summarize content
- /help - Show available commands
Command Implementation
# slash_commands.py
from slack_bolt import App
from slack_sdk.models.blocks import SectionBlock, DividerBlock, MarkdownTextObject
import asyncio
from typing import Dict, Any
@app.command("/ask")
def handle_ask_command(ack, respond, command):
    """Answer the question passed to `/ask` with an LLM-generated reply.

    Args:
        ack: Bolt acknowledgement callback; called before any other work.
        respond: Bolt callback that posts to the command's response URL.
        command: Slash-command payload; reads `user_id` and `text`.
    """
    # Acknowledge command request within 3 seconds
    ack()

    user_id = command["user_id"]
    # Treat a missing or whitespace-only argument the same as an empty one.
    text = (command.get("text") or "").strip()
    if not text:
        respond("Please provide a question after /ask")
        return

    # Show loading message
    respond("🤔 Thinking...")

    try:
        response = generate_llm_response(text, user_id)

        blocks = [
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"*Question:* {text}"},
            },
            {"type": "divider"},
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"*Answer:*\n{response}"},
            },
        ]

        # `text` is the plain-text fallback shown where blocks cannot be
        # rendered (e.g. notifications).
        respond(text=response, blocks=blocks, replace_original=True)
    except Exception as e:
        respond(f"Error: {str(e)}", replace_original=True)
@app.command("/summarize")
async def handle_summarize_command(ack, respond, command, client):
    """Summarize the text, or the first URL, passed to `/summarize`.

    NOTE(review): this listener is a coroutine, so `ack` and `respond` must be
    awaited; confirm `app` is an `AsyncApp` wherever this module is loaded.

    Args:
        ack: Bolt acknowledgement callback.
        respond: Bolt callback that posts to the command's response URL.
        command: Slash-command payload; reads `text`.
        client: Slack Web API client (unused here).
    """
    # slash_commands.py does not import `re` at module level.
    import re

    await ack()

    text = (command.get("text") or "").strip()
    if not text:
        await respond("Please provide text or a URL to summarize")
        return

    # Check if it's a URL
    urls = re.findall(r"https?://\S+", text)

    try:
        if urls:
            # NOTE(review): the URL is user-supplied; confirm that
            # fetch_url_content restricts which hosts it will contact.
            content = await fetch_url_content(urls[0])
            summary = await summarize_content(content)
        else:
            summary = await summarize_content(text)

        blocks = [
            {
                "type": "header",
                "text": {"type": "plain_text", "text": "📄 Summary"},
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": summary},
            },
        ]
        # `text` is the plain-text fallback for clients that cannot show blocks.
        await respond(text=summary, blocks=blocks)
    except Exception as e:
        await respond(f"Error summarizing: {str(e)}")
@app.command("/help")
def handle_help_command(ack, respond):
    """Reply to `/help` with a header block followed by the static usage guide."""
    ack()

    usage_guide = """
*Available Commands:*
• `/ask [question]` - Ask me anything
• `/summarize [text or URL]` - Get a summary of text or webpage
• `/help` - Show this help message
*How to use me:*
• Mention me in any channel with @bot-name
• Send me a direct message
• Use slash commands for specific tasks
*Tips:*
• I remember context within threads
• I can analyze images if you share them
• I format code with ```language blocks
"""

    header_block = {
        "type": "header",
        "text": {"type": "plain_text", "text": "🤖 Bot Help"},
    }
    guide_block = {
        "type": "section",
        "text": {"type": "mrkdwn", "text": usage_guide},
    }
    respond(blocks=[header_block, guide_block])


# 4. Interactive Components
Interactive Messages with Buttons
# interactive_components.py
@app.command("/feedback")
def handle_feedback_command(ack, respond, command):
    """Post a rating prompt with a "Good" and a "Bad" button."""

    def rating_button(label, style, action_id, value):
        # One Block Kit button element.
        return {
            "type": "button",
            "text": {"type": "plain_text", "text": label},
            "style": style,
            "action_id": action_id,
            "value": value,
        }

    ack()

    prompt_block = {
        "type": "section",
        "text": {"type": "mrkdwn", "text": "How would you rate my response?"},
    }
    buttons_block = {
        "type": "actions",
        "elements": [
            rating_button("👍 Good", "primary", "feedback_good", "good"),
            rating_button("👎 Bad", "danger", "feedback_bad", "bad"),
        ],
    }
    respond(blocks=[prompt_block, buttons_block])
@app.action("feedback_good")
def handle_good_feedback(ack, body, respond):
    """Record a positive rating and replace the prompt with a thank-you."""
    ack()

    rater_id = body["user"]["id"]
    rated_message_ts = body.get("message", {}).get("ts")

    # Log feedback against the rated message's timestamp (None if absent)
    log_feedback(rater_id, "good", rated_message_ts)

    respond(text="Thanks for the positive feedback! 🎉", replace_original=True)
@app.action("feedback_bad")
def handle_bad_feedback(ack, body, client):
    """Open the `feedback_modal` view to collect details on a negative rating."""

    def plain(text):
        # Block Kit plain_text object.
        return {"type": "plain_text", "text": text}

    ack()

    details_input = {
        "type": "input",
        "block_id": "feedback_input",
        "element": {
            "type": "plain_text_input",
            "action_id": "feedback_text",
            "multiline": True,
            "placeholder": plain("What could I improve?"),
        },
        "label": plain("Your Feedback"),
    }
    modal = {
        "type": "modal",
        "callback_id": "feedback_modal",
        "title": plain("Feedback Details"),
        "submit": plain("Submit"),
        "close": plain("Cancel"),
        "blocks": [details_input],
    }
    client.views_open(trigger_id=body["trigger_id"], view=modal)


# Modal Handling
@app.view("feedback_modal")
def handle_feedback_submission(ack, body, view, client):
    """Store the text submitted in the feedback modal and confirm by DM."""
    ack()

    submitter = body["user"]["id"]
    form_values = view["state"]["values"]
    feedback = form_values["feedback_input"]["feedback_text"]["value"]

    store_feedback(submitter, feedback)

    # Posting to a user id delivers the confirmation as a direct message
    confirmation = f"Thank you for your feedback! We'll use it to improve: \n\n_{feedback}_"
    client.chat_postMessage(channel=submitter, text=confirmation)
# Select Menu Example
@app.command("/settings")
def show_settings(ack, respond, command):
    """Post a section whose accessory is a static select of model choices."""
    ack()

    model_choices = (
        ("GPT-4 (Most capable)", "gpt-4"),
        ("GPT-3.5 (Faster)", "gpt-3.5-turbo"),
        ("Claude 3 (Anthropic)", "claude-3"),
    )
    model_menu = {
        "type": "static_select",
        "placeholder": {"type": "plain_text", "text": "Select model"},
        "action_id": "model_select",
        "options": [
            {"text": {"type": "plain_text", "text": label}, "value": value}
            for label, value in model_choices
        ],
    }
    settings_block = {
        "type": "section",
        "text": {"type": "mrkdwn", "text": "Configure your bot preferences:"},
        "accessory": model_menu,
    }
    respond(blocks=[settings_block])
@app.action("model_select")
def handle_model_selection(ack, body, client):
    """Persist the model chosen in the settings menu and confirm by DM."""
    ack()

    chosen_option = body["actions"][0]["selected_option"]
    selected_model = chosen_option["value"]
    user_id = body["user"]["id"]

    save_user_preference(user_id, "model", selected_model)

    confirmation = f"✅ Model preference updated to: {selected_model}"
    client.chat_postMessage(channel=user_id, text=confirmation)


# 5. Thread Management
Conversation Threading
# thread_management.py
from typing import List, Dict, Any
import redis
import json
class ThreadManager:
    """Store per-thread conversation history as JSON under `thread:<thread_ts>`.

    Args:
        redis_client: Client providing `get`, `setex` and `delete`.
        max_messages: How many of the most recent messages to keep per thread.
    """

    def __init__(self, redis_client: "redis.Redis", max_messages: int = 10):
        self.redis = redis_client
        self.ttl = 3600 * 24  # 24 hours
        self.max_messages = max_messages

    @staticmethod
    def _key(thread_ts: str) -> str:
        """Return the storage key for a thread."""
        return f"thread:{thread_ts}"

    def get_thread_context(self, thread_ts: str) -> List[Dict[str, Any]]:
        """Return the stored messages for a thread, or an empty list."""
        data = self.redis.get(self._key(thread_ts))
        return json.loads(data) if data else []

    def add_to_thread(self, thread_ts: str, role: str, content: str):
        """Append a message to the thread and rewrite the stored history.

        The history is trimmed to the newest `max_messages` entries and its
        expiry is reset to `self.ttl` seconds on every write.
        """
        # thread_management.py never imports datetime at module level, so the
        # original raised NameError here.
        from datetime import datetime

        context = self.get_thread_context(thread_ts)
        context.append(
            {
                "role": role,
                "content": content,
                "timestamp": datetime.now().isoformat(),
            }
        )
        context = context[-self.max_messages:]
        self.redis.setex(self._key(thread_ts), self.ttl, json.dumps(context))

    def clear_thread(self, thread_ts: str):
        """Delete the stored history for a thread."""
        self.redis.delete(self._key(thread_ts))
# Initialize thread manager
thread_manager = ThreadManager(redis.from_url(REDIS_URL))


@app.event("app_mention")
def handle_mention_with_context(event, say, client):
    """Answer a mention using the stored history of its thread.

    Args:
        event: The `app_mention` payload; reads `user`, `text`, `ts` and
            optionally `thread_ts`.
        say: Bolt callback used to post the reply.
        client: Slack Web API client (unused here).
    """
    user = event["user"]
    text = event["text"]
    thread_ts = event.get("thread_ts", event["ts"])

    # Store the current message BEFORE reading the context:
    # generate_contextual_response treats the last context entry as the current
    # message and skips it, so reading first made it discard the newest piece
    # of real history instead.
    thread_manager.add_to_thread(thread_ts, "user", text)
    context = thread_manager.get_thread_context(thread_ts)

    response = generate_contextual_response(text, context, user)

    thread_manager.add_to_thread(thread_ts, "assistant", response)

    say(text=response, thread_ts=thread_ts)
def generate_contextual_response(
    message: str,
    context: List[Dict[str, Any]],
    user_id: str
) -> str:
    """Generate a reply to `message` given earlier messages of the conversation.

    Args:
        message: The current user message.
        context: Earlier messages as dicts with `role` and `content`. It may
            or may not already end with the current message.
        user_id: Slack user id of the author (unused here).

    Returns:
        The model's reply text.
    """
    messages = [
        {
            "role": "system",
            "content": (
                "You are a helpful Slack bot.\n"
                "You have access to the conversation history.\n"
                "Maintain context and refer to previous messages when relevant."
            ),
        }
    ]

    # Drop the trailing entry only when it really is the current message.
    # Slicing off the last entry unconditionally discarded the newest history
    # entry whenever the caller passed a context read before storing `message`.
    history = context
    if (
        context
        and context[-1].get("role") == "user"
        and context[-1].get("content") == message
    ):
        history = context[:-1]

    messages.extend(
        {"role": entry["role"], "content": entry["content"]} for entry in history
    )
    messages.append({"role": "user", "content": message})

    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=messages,
        temperature=0.7,
    )
    return response.choices[0].message.content
# Thread summary command
@app.command("/summarize_thread")
def summarize_thread(ack, respond, command, client):
    """Summarize the first threaded message found in the channel's last 20 messages."""
    ack()

    channel = command["channel_id"]

    # Look through recent channel history for a message carrying `thread_ts`
    history = client.conversations_history(channel=channel, limit=20)
    thread_ts = next(
        (entry["thread_ts"] for entry in history["messages"] if "thread_ts" in entry),
        None,
    )
    if not thread_ts:
        respond("No recent threads found in this channel")
        return

    replies = client.conversations_replies(channel=channel, ts=thread_ts)

    # One "<user>: <text>" line per message in the thread
    transcript = "".join(
        f"{msg.get('user', 'Unknown')}: {msg.get('text', '')}\n"
        for msg in replies["messages"]
    )
    summary_prompt = "Summarize this Slack conversation:\n\n" + transcript

    summary = generate_llm_response(summary_prompt, command["user_id"])
    respond(f"*Thread Summary:*\n{summary}")


# 6. Rate Limiting
Rate Limiter Implementation
# rate_limiting.py
from typing import Dict, Tuple
import time
import redis
from functools import wraps
class RateLimiter:
    """Sliding-window rate limiter backed by Redis sorted sets.

    Each accepted request is stored as a sorted-set member scored with its
    timestamp; the window is the set of members newer than `now - window`.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis = redis_client
        # Slack API limits as (max requests, window in seconds)
        self.slack_limits = {
            "chat.postMessage": (1, 1),  # 1 per second
            "conversations.history": (50, 60),  # 50 per minute
            "users.info": (100, 60),  # 100 per minute
        }
        # LLM API limits (example)
        self.llm_limits = {
            "openai": (60, 60),  # 60 per minute
            "anthropic": (50, 60),  # 50 per minute
        }

    def check_rate_limit(self, key: str, max_requests: int, window: int) -> Tuple[bool, int]:
        """Record a request under `key` if the window still has room.

        Args:
            key: Sorted-set key identifying the limited resource.
            max_requests: Requests allowed per window.
            window: Window length in seconds.

        Returns:
            `(True, 0)` when the request is allowed, otherwise
            `(False, seconds_to_wait)`.
        """
        import uuid

        now = time.time()
        # A unique member per request: with `str(now)` alone, two requests
        # carrying the same timestamp collapsed into a single entry.
        member = f"{now}:{uuid.uuid4().hex}"

        pipeline = self.redis.pipeline()
        pipeline.zremrangebyscore(key, 0, now - window)
        pipeline.zadd(key, {member: now})
        pipeline.zcount(key, now - window, now)
        pipeline.expire(key, window)
        results = pipeline.execute()

        current_requests = results[2]
        if current_requests <= max_requests:
            return True, 0

        # Roll the rejected request back out of the window. Leaving it in made
        # every denied attempt count against the caller and push recovery back.
        self.redis.zrem(key, member)

        oldest = self.redis.zrange(key, 0, 0, withscores=True)
        if oldest:
            wait_time = int(oldest[0][1] + window - now) + 1
            return False, wait_time
        return False, window

    def rate_limit_slack(self, method: str):
        """Decorator that blocks until a call to the Slack `method` is allowed.

        Methods without an entry in `slack_limits` are not limited.
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                if method in self.slack_limits:
                    limit, window = self.slack_limits[method]
                    key = f"slack_rate:{method}"
                    # Re-check after every sleep so the call that finally runs
                    # is the one recorded in the window.
                    while True:
                        allowed, wait_time = self.check_rate_limit(key, limit, window)
                        if allowed:
                            break
                        time.sleep(wait_time)
                return func(*args, **kwargs)
            return wrapper
        return decorator

    def rate_limit_llm(self, provider: str):
        """Decorator that awaits until a call to the LLM `provider` is allowed.

        Providers without an entry in `llm_limits` are not limited.
        """
        # rate_limiting.py does not import asyncio at module level.
        import asyncio

        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                if provider in self.llm_limits:
                    limit, window = self.llm_limits[provider]
                    key = f"llm_rate:{provider}"
                    while True:
                        allowed, wait_time = self.check_rate_limit(key, limit, window)
                        if allowed:
                            break
                        await asyncio.sleep(wait_time)
                return await func(*args, **kwargs)
            return wrapper
        return decorator
# Initialize rate limiter
rate_limiter = RateLimiter(redis.from_url(REDIS_URL))


# Usage example
@rate_limiter.rate_limit_slack("chat.postMessage")
def send_message(channel: str, text: str):
    """Post `text` to `channel` through the rate-limited chat.postMessage wrapper."""
    post_result = app.client.chat_postMessage(channel=channel, text=text)
    return post_result
@rate_limiter.rate_limit_llm("openai")
async def call_openai(messages: list[dict]):
    """Run a rate-limited asynchronous chat completion.

    Args:
        messages: Chat messages to send to the model.

    Returns:
        The response object returned by `openai.ChatCompletion.acreate`.
    """
    # The annotation uses the builtin generics: rate_limiting.py imports only
    # `Dict` and `Tuple` from typing, so `List[Dict]` raised NameError when this
    # function was defined.
    return await openai.ChatCompletion.acreate(
        model="gpt-4",
        messages=messages,
    )
# User-level rate limiting
class UserRateLimiter:
    """Per-user request limiter keyed by Slack user id.

    Args:
        redis_client: Redis client that stores the per-user windows.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis = redis_client
        # Run the window check on the injected client. The original stored the
        # client but delegated to the module-level `rate_limiter`, so the
        # argument was silently ignored.
        self._limiter = RateLimiter(redis_client)
        self.user_limits = {
            "default": (10, 60),  # 10 requests per minute
            "premium": (100, 60),  # 100 requests per minute
        }

    def check_user_limit(self, user_id: str, user_type: str = "default") -> Tuple[bool, str]:
        """Check whether `user_id` may make another request.

        Args:
            user_id: Slack user id.
            user_type: Key into `user_limits`; unknown types use "default".

        Returns:
            `(True, "")` when allowed, otherwise `(False, message)` where
            `message` tells the user how long to wait.
        """
        limit, window = self.user_limits.get(user_type, self.user_limits["default"])
        key = f"user_rate:{user_id}"
        allowed, wait_time = self._limiter.check_rate_limit(key, limit, window)
        if not allowed:
            return False, f"Rate limit exceeded. Please wait {wait_time} seconds."
        return True, ""
user_limiter = UserRateLimiter(redis.from_url(REDIS_URL))


@app.event("app_mention")
def handle_mention_with_rate_limit(event, say, client, logger):
    """Apply the per-user rate limit, then delegate to `handle_app_mention`.

    Args:
        event: The `app_mention` payload; reads `user`, `ts` and optionally
            `thread_ts`.
        say: Bolt callback used to post the rate-limit notice.
        client: Slack Web API client, forwarded to `handle_app_mention`.
        logger: Logger, forwarded to `handle_app_mention`.
    """
    user_id = event["user"]

    allowed, message = user_limiter.check_user_limit(user_id)
    if not allowed:
        say(message, thread_ts=event.get("thread_ts", event["ts"]))
        return

    # handle_app_mention is declared as (event, say, client, logger); calling
    # it with only (event, say) raised TypeError on every allowed mention.
    handle_app_mention(event, say, client, logger)


# Rate Limiting Best Practices
- Implement exponential backoff for retries
- Cache user information to reduce API calls
- Use Slack's rate limit headers to adjust dynamically
- Queue non-urgent messages for batch processing
- Monitor rate limit usage and alert on high usage
7. Context Management
Advanced Context Manager
# context_management.py
from typing import List, Dict, Any, Optional
import redis
import json
from datetime import datetime, timedelta
from langchain.memory import ConversationBufferWindowMemory
from langchain.schema import BaseMessage, HumanMessage, AIMessage
class ConversationContextManager:
    """Keep conversation history in LangChain memories mirrored to Redis.

    Memories are cached per conversation id in `self.memories` and written
    to Redis as JSON under `conversation:<id>` on every added message.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.memories = {}  # In-memory cache for active conversations
        self.ttl = 3600 * 24  # 24 hours
        self.max_messages = 20  # Maximum messages to keep

    def get_or_create_memory(self, conversation_id: str) -> ConversationBufferWindowMemory:
        """Return the cached memory for a conversation, restoring it from Redis if needed."""
        if conversation_id not in self.memories:
            memory = ConversationBufferWindowMemory(
                k=self.max_messages,
                return_messages=True,
            )
            # Replay stored messages; roles other than "human"/"ai" are skipped
            for msg in self.load_conversation(conversation_id):
                if msg["role"] == "human":
                    memory.chat_memory.add_user_message(msg["content"])
                elif msg["role"] == "ai":
                    memory.chat_memory.add_ai_message(msg["content"])
            self.memories[conversation_id] = memory
        return self.memories[conversation_id]

    def add_message(
        self,
        conversation_id: str,
        role: str,
        content: str,
        metadata: Optional[Dict] = None
    ):
        """Add a message to a conversation and persist the conversation.

        Only the roles "human" and "ai" are added to memory; any other role
        leaves the memory unchanged, although the conversation is still saved.
        """
        memory = self.get_or_create_memory(conversation_id)
        if role == "human":
            memory.chat_memory.add_user_message(content)
        elif role == "ai":
            memory.chat_memory.add_ai_message(content)
        self.save_conversation(conversation_id, memory, metadata)

    def save_conversation(
        self,
        conversation_id: str,
        memory: ConversationBufferWindowMemory,
        metadata: Optional[Dict] = None
    ):
        """Serialize every message in `memory` to Redis with a fresh TTL.

        NOTE(review): each save stamps ALL messages with the current time and
        attaches the same `metadata` to each one, so stored timestamps reflect
        the last save, not when a message was added.
        """
        saved_at = datetime.now().isoformat()
        messages = []
        for message in memory.chat_memory.messages:
            msg_dict = {
                "role": "human" if isinstance(message, HumanMessage) else "ai",
                "content": message.content,
                "timestamp": saved_at,
            }
            if metadata:
                msg_dict["metadata"] = metadata
            messages.append(msg_dict)
        self.redis.setex(
            f"conversation:{conversation_id}",
            self.ttl,
            json.dumps(messages),
        )

    def load_conversation(self, conversation_id: str) -> List[Dict]:
        """Return the stored messages for a conversation, or an empty list."""
        data = self.redis.get(f"conversation:{conversation_id}")
        return json.loads(data) if data else []

    def get_context_summary(self, conversation_id: str) -> str:
        """Return a short text listing of the last five messages."""
        messages = self.get_or_create_memory(conversation_id).chat_memory.messages
        if not messages:
            return "No previous context."

        lines = [f"Previous {len(messages)} messages in conversation:"]
        for msg in messages[-5:]:
            role = "User" if isinstance(msg, HumanMessage) else "Assistant"
            # Mark truncation only when the content was actually cut
            suffix = "..." if len(msg.content) > 100 else ""
            lines.append(f"{role}: {msg.content[:100]}{suffix}")
        return "\n".join(lines) + "\n"

    def clear_old_conversations(self, days: int = 7):
        """Delete stored conversations whose last message is older than `days` days."""
        cutoff = timedelta(days=days)
        # scan_iter handles the SCAN cursor bookkeeping
        for key in self.redis.scan_iter(match="conversation:*"):
            data = self.redis.get(key)
            if not data:
                continue
            messages = json.loads(data)
            if not messages:
                continue
            last_timestamp = messages[-1].get("timestamp")
            if not last_timestamp:
                continue
            if datetime.now() - datetime.fromisoformat(last_timestamp) > cutoff:
                self.redis.delete(key)
# Initialize context manager
context_manager = ConversationContextManager(redis.from_url(REDIS_URL))


# Integration with bot
@app.event("app_mention")
def handle_mention_with_context(event, say):
    """Answer a mention using the history kept by `context_manager`.

    The conversation id is `<channel>:<thread_ts>`, so each thread has its own
    history.
    """
    user_id = event["user"]
    channel = event["channel"]
    text = event["text"]
    thread_ts = event.get("thread_ts", event["ts"])

    conversation_id = f"{channel}:{thread_ts}"

    # Record the incoming message first so it is part of the history sent below
    context_manager.add_message(
        conversation_id,
        "human",
        text,
        metadata={"user_id": user_id, "channel": channel},
    )
    memory = context_manager.get_or_create_memory(conversation_id)

    system_prompt = {
        "role": "system",
        "content": "You are a helpful Slack bot assistant powered by ParrotRouter.",
    }
    history = [
        {
            "role": "user" if isinstance(past, HumanMessage) else "assistant",
            "content": past.content,
        }
        for past in memory.chat_memory.messages
    ]

    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[system_prompt, *history],
        temperature=0.7,
    )
    ai_response = response.choices[0].message.content

    context_manager.add_message(conversation_id, "ai", ai_response)

    say(text=ai_response, thread_ts=thread_ts)


# 8. File & Image Handling
File Processing
# file_handling.py
import requests
from typing import Optional, Dict, Any
import mimetypes
import base64
from PIL import Image
import io
import PyPDF2
import docx
class FileHandler:
    """Download files shared in Slack and extract their content.

    Args:
        bot_token: Bot token sent as a Bearer token when downloading.
        timeout: Seconds to wait for a download before giving up.
    """

    # python-docx reads only the OOXML (.docx) format
    _DOCX_MIMETYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

    def __init__(self, bot_token: str, timeout: float = 30.0):
        self.bot_token = bot_token
        self.headers = {"Authorization": f"Bearer {bot_token}"}
        self.timeout = timeout

    def download_file(self, file_url: str) -> bytes:
        """Download a file from Slack; raises on HTTP errors or timeout."""
        # Without a timeout a stalled download would block the handler forever
        response = requests.get(file_url, headers=self.headers, timeout=self.timeout)
        response.raise_for_status()
        return response.content

    def _select_processor(self, mimetype: str):
        """Return the callable that handles `mimetype`, or None if unsupported."""
        if mimetype.startswith("image/"):
            return lambda content: self.process_image(content, mimetype)
        if mimetype == "application/pdf":
            return self.process_pdf
        if mimetype == self._DOCX_MIMETYPE:
            return self.process_word
        if mimetype.startswith("text/"):
            return self.process_text
        return None

    def process_file(self, file_info: Dict[str, Any]) -> Dict[str, Any]:
        """Download and process a Slack file according to its MIME type.

        Args:
            file_info: Slack file object; reads `url_private`, `mimetype`,
                `name` and `size`.

        Returns:
            A dict with `filename`, `mimetype` and `size`, plus either the
            processor's fields or an `error` entry for unsupported types.
        """
        file_url = file_info["url_private"]
        mimetype = file_info["mimetype"]
        result = {
            "filename": file_info["name"],
            "mimetype": mimetype,
            "size": file_info["size"],
        }

        # Decide before downloading so unsupported files are never fetched.
        # Legacy "application/msword" lands here too instead of failing inside
        # python-docx.
        processor = self._select_processor(mimetype)
        if processor is None:
            result["error"] = f"Unsupported file type: {mimetype}"
            return result

        result.update(processor(self.download_file(file_url)))
        return result

    def process_image(self, content: bytes, mimetype: str) -> Dict[str, Any]:
        """Downscale an image to at most 1024x1024 and base64-encode it."""
        image = Image.open(io.BytesIO(content))
        # Fall back to PNG when Pillow could not determine the source format
        image_format = image.format or "PNG"

        max_size = (1024, 1024)
        if image.size[0] > max_size[0] or image.size[1] > max_size[1]:
            image.thumbnail(max_size, Image.LANCZOS)

        buffered = io.BytesIO()
        image.save(buffered, format=image_format)
        return {
            "type": "image",
            "base64": base64.b64encode(buffered.getvalue()).decode(),
            "dimensions": image.size,
            "mode": image.mode,
            # Lets callers build a data URL with the matching MIME type
            "format": image_format,
        }

    def process_pdf(self, content: bytes) -> Dict[str, Any]:
        """Extract text from every page of a PDF."""
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(content))
        # `or ""` guards against pages that yield no text
        text = "\n".join(page.extract_text() or "" for page in pdf_reader.pages)
        return {
            "type": "pdf",
            "text": text.strip(),
            "pages": len(pdf_reader.pages),
        }

    def process_word(self, content: bytes) -> Dict[str, Any]:
        """Extract paragraph text from a .docx document."""
        doc = docx.Document(io.BytesIO(content))
        text = "\n".join(paragraph.text for paragraph in doc.paragraphs)
        return {
            "type": "word",
            "text": text.strip(),
            "paragraphs": len(doc.paragraphs),
        }

    def process_text(self, content: bytes) -> Dict[str, Any]:
        """Decode a text file as UTF-8, falling back to Latin-1."""
        try:
            text = content.decode("utf-8")
        except UnicodeDecodeError:
            text = content.decode("latin-1")
        return {"type": "text", "text": text}
# Initialize file handler
file_handler = FileHandler(SLACK_BOT_TOKEN)


@app.event("file_shared")
def handle_file_shared(event, say, client):
    """Analyze or summarize a file when it is shared.

    Images go to the vision model; PDF, Word and text files are summarized
    and their (truncated) text is stored in the conversation context.

    Args:
        event: The `file_shared` payload; reads `file_id`.
        say: Bolt callback used to post results.
        client: Slack Web API client, used to fetch the file's metadata.
    """
    # This module has no module-level `logger` and the handler does not
    # receive one, so the original except branch raised NameError.
    import logging

    log = logging.getLogger(__name__)

    file_id = event["file_id"]
    file_info = client.files_info(file=file_id)["file"]

    try:
        result = file_handler.process_file(file_info)
        if "error" in result:
            say(f"Sorry, I couldn't process this file: {result['error']}")
            return

        if result["type"] == "image":
            response = analyze_image_with_llm(result["base64"])
            say(f"*Image Analysis:*\n{response}")
        elif result["type"] in ["pdf", "word", "text"]:
            text = result["text"][:4000]  # Limit text length
            summary = summarize_document(text)
            say(f"*Document Summary:*\n{summary}")
            say("Feel free to ask me questions about this document!")

            # NOTE(review): `file_shared` payloads appear to carry `channel_id`
            # and `event_ts` rather than `channel`/`ts` — confirm. Both
            # spellings are accepted so a missing key no longer aborts the
            # handler after the summary was posted.
            channel = event.get("channel") or event.get("channel_id")
            thread_ts = (
                event.get("thread_ts") or event.get("ts") or event.get("event_ts")
            )
            # Stored as a "human" message: add_message only keeps the roles
            # "human" and "ai", so the original "system" entry was dropped.
            context_manager.add_message(
                f"{channel}:{thread_ts}",
                "human",
                f"Document content: {text}",
                metadata={"file_id": file_id, "filename": file_info["name"]},
            )
    except Exception as e:
        log.error(f"Error processing file: {e}")
        say("Sorry, I encountered an error processing this file.")
def analyze_image_with_llm(base64_image: str) -> str:
    """Describe a base64-encoded image using the vision model.

    Args:
        base64_image: Base64 text of the encoded image bytes.

    Returns:
        The model's description of the image.
    """
    # Derive the MIME type from the base64 form of the file signature. The
    # data URL was hard-coded to image/png, although the image is saved in its
    # source format. Unknown signatures keep the image/png default.
    signatures = (
        ("/9j/", "image/jpeg"),
        ("iVBORw0KGgo", "image/png"),
        ("R0lGOD", "image/gif"),
        ("UklGR", "image/webp"),
    )
    mime_type = next(
        (mime for prefix, mime in signatures if base64_image.startswith(prefix)),
        "image/png",
    )

    response = openai.ChatCompletion.create(
        model="gpt-4-vision-preview",
        messages=[
            {
                "role": "system",
                "content": "You are analyzing an image shared in Slack. Provide a helpful description.",
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "What's in this image?"},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{mime_type};base64,{base64_image}"
                        },
                    },
                ],
            },
        ],
        max_tokens=500,
    )
    return response.choices[0].message.content


# 9. Security Best Practices
Request Verification
# security.py
import hmac
import hashlib
import time
from typing import Dict, Any
class SlackSecurityHandler:
    """Verify that incoming HTTP requests were signed with the app's signing secret."""

    def __init__(self, signing_secret: str):
        self.signing_secret = signing_secret

    def verify_request(self, headers: Dict[str, str], body: str) -> bool:
        """Return True only for a fresh request carrying a valid signature.

        Args:
            headers: Request headers; header names are matched
                case-insensitively.
            body: Raw request body exactly as received.

        Returns:
            False when a header is missing or malformed, when the timestamp is
            more than five minutes from now, or when the signature differs.
        """
        # HTTP header names are case-insensitive and frameworks differ in how
        # they present them.
        lowered = {str(name).lower(): value for name, value in headers.items()}
        timestamp = lowered.get("x-slack-request-timestamp", "")
        signature = lowered.get("x-slack-signature", "")
        if not timestamp or not signature:
            return False

        # A missing or non-numeric timestamp is a failed verification, not a
        # crash: `float("")` raised ValueError here.
        try:
            request_time = float(timestamp)
        except (TypeError, ValueError):
            return False

        # Check timestamp to prevent replay attacks
        if abs(time.time() - request_time) > 60 * 5:
            return False

        sig_basestring = f"v0:{timestamp}:{body}"
        my_signature = "v0=" + hmac.new(
            self.signing_secret.encode(),
            sig_basestring.encode(),
            hashlib.sha256,
        ).hexdigest()

        # Compare as bytes: compare_digest rejects non-ASCII str arguments
        return hmac.compare_digest(my_signature.encode(), str(signature).encode())
# Input sanitization
class InputSanitizer:
    """Helpers that clean text going into and coming out of the LLM."""

    @staticmethod
    def sanitize_user_input(text: str, max_length: int = 4000) -> str:
        """Strip, remove control characters from, and truncate user input.

        Args:
            text: Raw user text.
            max_length: Maximum number of characters kept.

        Returns:
            The cleaned text. Newlines and tabs are preserved.
        """
        sanitized = text.strip()
        # Drop C0 control characters and DEL but keep "\n" and "\t": removing
        # them merged the lines of multi-line messages and code snippets.
        sanitized = "".join(
            char
            for char in sanitized
            if char in "\n\t" or (ord(char) >= 32 and ord(char) != 127)
        )
        return sanitized[:max_length]

    @staticmethod
    def sanitize_llm_output(text: str) -> str:
        """Redact token-like strings and e-mail addresses from LLM output.

        Returns:
            The text with tokens replaced by `[REDACTED]` and e-mail addresses
            replaced by `[EMAIL]`; formatting is otherwise untouched.
        """
        import re

        # Slack tokens: every xox?- variant plus the xapp- app-level token
        # used by this bot's own configuration
        text = re.sub(r'(?:xox[a-z]|xapp)-[0-9a-zA-Z-]+', '[REDACTED]', text)
        # OpenAI-style secret keys (sk-...)
        text = re.sub(r'\bsk-[A-Za-z0-9_-]{20,}', '[REDACTED]', text)
        # Remove email addresses
        text = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[EMAIL]', text)
        return text
# Secure configuration
class SecureConfig:
    """Validate and expose the secrets the bot reads from the environment.

    Raises:
        ValueError: On construction, if any required variable is unset or
            empty.
    """

    def __init__(self):
        self.required_env_vars = [
            "SLACK_SIGNING_SECRET",
            "SLACK_BOT_TOKEN",
            "OPENAI_API_KEY",
            "REDIS_URL",
        ]
        self.validate_environment()

    @staticmethod
    def _require(name: str) -> str:
        """Return the value of environment variable `name` (KeyError if unset)."""
        # security.py never imports `os` at module level, so every access
        # raised NameError; the import is kept local to this block.
        import os

        return os.environ[name]

    def validate_environment(self):
        """Ensure all required environment variables are set and non-empty."""
        import os

        missing = [var for var in self.required_env_vars if not os.environ.get(var)]
        if missing:
            raise ValueError(f"Missing required environment variables: {missing}")

    @property
    def slack_signing_secret(self) -> str:
        return self._require("SLACK_SIGNING_SECRET")

    @property
    def slack_bot_token(self) -> str:
        return self._require("SLACK_BOT_TOKEN")

    @property
    def openai_api_key(self) -> str:
        return self._require("OPENAI_API_KEY")

    @property
    def redis_url(self) -> str:
        # REDIS_URL is validated above, so it gets an accessor like the others
        return self._require("REDIS_URL")
# Audit logging
class AuditLogger:
def __init__(self, logger):
self.logger = logger
def log_interaction(
self,
user_id: str,
channel: str,
input_text: str,
output_text: str,
metadata: Dict[str, Any] = None
):
"""Log user interactions for audit trail"""
self.logger.info(
"User interaction",
extra={
"user_id": user_id,
"channel": channel,
"input_length": len(input_text),
"output_length": len(output_text),
"timestamp": datetime.now().isoformat(),
"metadata": metadata or {}
}
)
def log_error(self, error: Exception, context: Dict[str, Any]):
"""Log errors with context"""
self.logger.error(
f"Error: {str(error)}",
extra={
"error_type": type(error).__name__,
"context": context,
"timestamp": datetime.now().isoformat()
}
)Security Checklist
- ✓ Never expose tokens in logs or responses
- ✓ Verify all Slack requests with signing secret
- ✓ Sanitize all user inputs before processing
- ✓ Use environment variables for secrets
- ✓ Implement request timeouts
- ✓ Log security events for audit
- ✓ Regularly rotate API keys
- ✓ Use HTTPS for all external calls
10. Deployment Options
Docker Deployment
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# System packages: gcc only; the apt package lists are removed in the same layer
RUN apt-get update && apt-get install -y \
        gcc \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies (requirements.txt is copied on its own, ahead of the source tree)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application source
COPY . .

# Create an unprivileged user (uid 1000), give it /app, and switch to it
RUN useradd -m -u 1000 slackbot && chown -R slackbot:slackbot /app
USER slackbot

# Start the bot
CMD ["python", "bot.py"]
Docker Compose
# docker-compose.yml
version: '3.8'

services:
  # The bot itself, built from the Dockerfile in this directory
  slack-bot:
    build: .
    environment:
      - SLACK_SIGNING_SECRET=${SLACK_SIGNING_SECRET}
      - SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}
      - SLACK_APP_TOKEN=${SLACK_APP_TOKEN}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    restart: unless-stopped
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

  # Redis with append-only persistence on a named volume
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  redis_data:
AWS Lambda Deployment
# lambda_handler.py
from slack_bolt import App
from slack_bolt.adapter.aws_lambda import SlackRequestHandler
import os
# Initialize app
# Module-level Bolt app; both credentials are read from environment variables,
# so a missing variable raises KeyError at import time.
app = App(
    token=os.environ["SLACK_BOT_TOKEN"],
    signing_secret=os.environ["SLACK_SIGNING_SECRET"],
    process_before_response=True # Important for Lambda
)
# Register handlers (import from your modules)
# `handlers` is not shown here; it is expected to expose register_handlers(app).
from handlers import register_handlers
register_handlers(app)
# Lambda handler
def lambda_handler(event, context):
    """AWS Lambda entry point: hand the invocation to the module-level Bolt ``app``.

    A new ``SlackRequestHandler`` is created for each invocation, and the
    value returned by its ``handle`` call is returned to Lambda.
    """
    return SlackRequestHandler(app=app).handle(event, context)
# serverless.yml
service: slack-llm-bot

provider:
  name: aws
  runtime: python3.11
  region: us-east-1
  timeout: 30
  environment:
    SLACK_BOT_TOKEN: ${env:SLACK_BOT_TOKEN}
    SLACK_SIGNING_SECRET: ${env:SLACK_SIGNING_SECRET}
    OPENAI_API_KEY: ${env:OPENAI_API_KEY}

functions:
  slack:
    handler: lambda_handler.lambda_handler
    events:
      - http:
          path: slack/events
          method: post
          cors: true
    layers:
      - arn:aws:lambda:${aws:region}:xxx:layer:slack-bolt-python:1

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: true
Kubernetes Deployment
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: slack-llm-bot
spec:
  replicas: 2
  selector:
    matchLabels:
      app: slack-llm-bot
  template:
    metadata:
      labels:
        app: slack-llm-bot
    spec:
      containers:
        - name: bot
          image: your-registry/slack-llm-bot:latest
          # Secrets come from Kubernetes Secret objects; only REDIS_URL is inline
          env:
            - name: SLACK_BOT_TOKEN
              valueFrom:
                secretKeyRef:
                  name: slack-secrets
                  key: bot-token
            - name: SLACK_SIGNING_SECRET
              valueFrom:
                secretKeyRef:
                  name: slack-secrets
                  key: signing-secret
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: llm-secrets
                  key: openai-key
            - name: REDIS_URL
              value: redis://redis-service:6379
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          # Probes GET /health on port 3000 every 30s after a 30s initial delay
          livenessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 30
            periodSeconds: 30
---
# Exposes the bot pods (port 3000) on port 80 through a LoadBalancer
apiVersion: v1
kind: Service
metadata:
  name: slack-bot-service
spec:
  selector:
    app: slack-llm-bot
  ports:
    - port: 80
      targetPort: 3000
  type: LoadBalancer
Production Monitoring
# monitoring.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import time
# Metrics
# Counter of processed messages, labelled by event type and outcome status.
message_counter = Counter(
    'slack_bot_messages_total',
    'Total messages processed',
    ['event_type', 'status']
)
# Histogram of response times in seconds, labelled by command.
response_time = Histogram(
    'slack_bot_response_time_seconds',
    'Response time in seconds',
    ['command']
)
# Gauge for the number of active conversations (no labels).
active_conversations = Gauge(
    'slack_bot_active_conversations',
    'Number of active conversations'
)
# Counter of LLM tokens used, labelled by model and operation.
llm_tokens_used = Counter(
    'slack_bot_llm_tokens_total',
    'Total LLM tokens used',
    ['model', 'operation']
)
# Monitoring decorator
def monitor_performance(operation: str):
    """Build a decorator that records outcome counts and latency for ``operation``.

    Every call to the decorated function increments ``message_counter`` with
    ``event_type=operation`` and a ``status`` of ``'success'`` or ``'error'``,
    and observes the elapsed seconds on ``response_time`` under
    ``command=operation``. Exceptions are counted and then re-raised unchanged.

    Args:
        operation: Label value used for both metrics.

    Returns:
        A decorator for synchronous callables.
    """
    import functools  # local import keeps this block self-contained

    def decorator(func):
        # wraps() keeps the original name, docstring and signature visible.
        # Frameworks that inspect handler argument names rely on this.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so the duration is immune to clock changes.
            start_time = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                message_counter.labels(
                    event_type=operation,
                    status='success'
                ).inc()
                return result
            except Exception:
                message_counter.labels(
                    event_type=operation,
                    status='error'
                ).inc()
                raise
            finally:
                response_time.labels(command=operation).observe(
                    time.perf_counter() - start_time
                )
        return wrapper
    return decorator
# Health check endpoint
@app.route("/health")
def health_check():
    """Health endpoint: return a fixed healthy status and the current local time."""
    # NOTE(review): `app` is not defined in this snippet; `.route` implies a
    # Flask-style application object. Confirm which app this is registered on.
    # datetime is not among this snippet's imports, so it is imported here.
    from datetime import datetime

    return {"status": "healthy", "timestamp": datetime.now().isoformat()}
@app.route("/metrics")
def metrics():
return generate_latest()Complete Bot Example
# main.py - Complete Slack LLM Bot
import os
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
import openai
import redis
from datetime import datetime
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize services
# Every os.environ[...] lookup below raises KeyError at import time if the variable is unset.
app = App(
    token=os.environ["SLACK_BOT_TOKEN"],
    signing_secret=os.environ["SLACK_SIGNING_SECRET"]
)
openai.api_key = os.environ["OPENAI_API_KEY"]
redis_client = redis.from_url(os.environ["REDIS_URL"])

# Initialize managers
# NOTE(review): these four modules are not shown here; presumably they are the
# project-local modules built in earlier sections. Confirm the module names.
from context_management import ConversationContextManager
from rate_limiting import RateLimiter, UserRateLimiter
from file_handling import FileHandler
from security import InputSanitizer, AuditLogger

# The same Redis client is shared by the context manager and both rate limiters.
context_manager = ConversationContextManager(redis_client)
rate_limiter = RateLimiter(redis_client)
user_limiter = UserRateLimiter(redis_client)
file_handler = FileHandler(os.environ["SLACK_BOT_TOKEN"])
sanitizer = InputSanitizer()
audit_logger = AuditLogger(logger)
# Main event handlers
@app.event("app_mention")
def handle_app_mention(event, say, client):
    """Handle when someone mentions the bot.

    Flow: sanitize the message text, ask ``user_limiter`` whether the user may
    proceed, append the message to the conversation keyed by
    ``"{channel}:{thread_ts}"``, send the last ten stored messages to the chat
    model, then store, audit-log and post the sanitized reply in the thread.

    Any exception raised after the rate-limit check is logged with its
    traceback, passed to ``audit_logger.log_error``, and answered with a
    generic apology in the same thread.
    """
    user_id = event["user"]
    text = sanitizer.sanitize_user_input(event["text"])
    # Reply inside the existing thread, or start one on the mentioning message.
    thread_ts = event.get("thread_ts", event["ts"])

    # Check rate limits
    allowed, message = user_limiter.check_user_limit(user_id)
    if not allowed:
        say(message, thread_ts=thread_ts)
        return

    try:
        # Get conversation context
        conversation_id = f"{event['channel']}:{thread_ts}"
        context_manager.add_message(conversation_id, "human", text)

        # Build the prompt: system message followed by the ten most recent turns
        memory = context_manager.get_or_create_memory(conversation_id)
        messages = [
            {
                "role": "system",
                "content": "You are a helpful Slack bot powered by ParrotRouter."
            }
        ]
        messages.extend(
            {
                "role": "user" if msg.type == "human" else "assistant",
                "content": msg.content,
            }
            for msg in memory.chat_memory.messages[-10:]
        )

        # Generate response
        # NOTE(review): openai.ChatCompletion is the pre-1.0 interface of the
        # openai package; confirm the installed version still provides it.
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            temperature=0.7,
            max_tokens=500
        )
        ai_response = sanitizer.sanitize_llm_output(
            response.choices[0].message.content
        )

        # Add to context
        context_manager.add_message(conversation_id, "ai", ai_response)

        # Log interaction
        audit_logger.log_interaction(
            user_id=user_id,
            channel=event["channel"],
            input_text=text,
            output_text=ai_response
        )

        # Send response
        say(text=ai_response, thread_ts=thread_ts)
    except Exception as e:
        # logger.exception keeps the traceback that a plain error() call drops.
        logger.exception("Error in app_mention: %s", e)
        audit_logger.log_error(e, {"event": event})
        say("Sorry, I encountered an error. Please try again.", thread_ts=thread_ts)
# Slash commands
# Slack rejects a section block whose text exceeds this many characters.
_SECTION_TEXT_LIMIT = 3000


def _section_text(label: str, body: str) -> str:
    """Return ``*label:* body``, clipped with an ellipsis to the section-text limit."""
    rendered = f"*{label}:* {body}"
    if len(rendered) <= _SECTION_TEXT_LIMIT:
        return rendered
    return rendered[:_SECTION_TEXT_LIMIT - 1] + "…"


@app.command("/ask")
def handle_ask(ack, respond, command):
    """Answer a one-off ``/ask`` question.

    Acknowledges the command first, then sanitizes the question. An empty
    question gets a usage hint. Otherwise the question is sent to the chat
    model and the question and sanitized answer are returned as two section
    blocks separated by a divider. Sanitized input may be up to 4000
    characters, so each section's text is clipped to Slack's 3000-character
    limit to avoid an invalid-blocks failure after the model call.

    Any exception from the model call or the response is logged with its
    traceback and answered with a generic failure message.
    """
    ack()
    text = sanitizer.sanitize_user_input(command["text"])
    if not text:
        respond("Please provide a question after /ask")
        return

    try:
        # NOTE(review): openai.ChatCompletion is the pre-1.0 interface of the
        # openai package; confirm the installed version still provides it.
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "Answer concisely."},
                {"role": "user", "content": text}
            ],
            max_tokens=300
        )
        answer = sanitizer.sanitize_llm_output(
            response.choices[0].message.content
        )
        respond(blocks=[
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": _section_text("Q", text)}
            },
            {"type": "divider"},
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": _section_text("A", answer)}
            }
        ])
    except Exception as e:
        # logger.exception keeps the traceback that a plain error() call drops.
        logger.exception("Error in /ask: %s", e)
        respond("Sorry, I couldn't process your question.")
# Start the bot
if __name__ == "__main__":
    # Socket Mode: connect using the app-level token from SLACK_APP_TOKEN
    handler = SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
    handler.start()

# Or use Flask for HTTP mode
# from flask import Flask, request
# from slack_bolt.adapter.flask import SlackRequestHandler
#
# flask_app = Flask(__name__)
# handler = SlackRequestHandler(app)
#
# @flask_app.route("/slack/events", methods=["POST"])
# def slack_events():
#     return handler.handle(request)
#
# flask_app.run(port=3000)
References & Citations
Ready to Build Your Slack Bot?
Create intelligent Slack bots powered by LLMs using ParrotRouter's unified API gateway.
References
- [1] AWS. "Lambda Documentation" (2024)
- [2] Vercel. "Streaming Responses" (2024)
- [3] GitHub. "OpenAI Node.js Library" (2024)