Quick Start
Create a new Slack bot with LLM integration:
# Python setup
# NOTE: the Python examples in this guide call the legacy
# `openai.ChatCompletion` API, which was removed in openai 1.0, so the SDK is
# pinned below 1.0 here.
pip install slack-bolt "openai<1.0" langchain redis
pip install python-dotenv aiohttp

# Node.js setup
npm install @slack/bolt openai langchain redis
npm install dotenv express
1. Slack App Setup & Permissions
Create Slack App
- Go to api.slack.com/apps
- Click "Create New App" → "From scratch"
- Name your app and select workspace
- Navigate to "OAuth & Permissions"
Required Bot Token Scopes
# OAuth & Permissions → Bot Token Scopes app_mentions:read # Read messages that mention your bot channels:history # View messages in public channels channels:join # Join public channels channels:read # View basic channel info chat:write # Send messages files:read # Access files shared in conversations files:write # Upload files groups:history # View messages in private channels groups:read # View basic private channel info im:history # View direct messages im:read # View basic DM info im:write # Send direct messages mpim:history # View group DM messages mpim:read # View basic group DM info mpim:write # Send group DM messages reactions:write # Add emoji reactions users:read # View user info commands # Add slash commands
App Configuration
// config/slack-config.ts

/** Credentials and secrets the Slack app needs at startup. */
export interface SlackConfig {
  signingSecret: string
  botToken: string
  appToken: string
  clientId: string
  clientSecret: string
  stateSecret: string
}

// Values are read from the process environment. The non-null assertions only
// silence the type checker: a missing variable is still `undefined` at runtime.
export const slackConfig: SlackConfig = {
  signingSecret: process.env.SLACK_SIGNING_SECRET!,
  botToken: process.env.SLACK_BOT_TOKEN!,
  appToken: process.env.SLACK_APP_TOKEN!,
  clientId: process.env.SLACK_CLIENT_ID!,
  clientSecret: process.env.SLACK_CLIENT_SECRET!,
  stateSecret: process.env.SLACK_STATE_SECRET!,
}

// Environment variables (.env)
// Keep these in a separate `.env` file (not in this TypeScript module) and do
// not commit it:
//
//   SLACK_SIGNING_SECRET=your_signing_secret
//   SLACK_BOT_TOKEN=xoxb-your-bot-token
//   SLACK_APP_TOKEN=xapp-your-app-token
//   SLACK_CLIENT_ID=your_client_id
//   SLACK_CLIENT_SECRET=your_client_secret
//   SLACK_STATE_SECRET=your_state_secret
//   OPENAI_API_KEY=sk-...
//   PARROTROUTER_API_KEY=your_api_key
2. Event Handling
Python Implementation with Slack Bolt
# bot.py
import logging
import os
import re
from typing import Any, Dict, Optional

import openai
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler

# Module-level logger for code that runs outside a Bolt listener (listeners
# receive their own `logger` argument from Bolt).
logger = logging.getLogger(__name__)

# Credentials come from the environment (see the .env example in section 1).
# Indexing os.environ fails at startup when a variable is missing.
SLACK_BOT_TOKEN = os.environ["SLACK_BOT_TOKEN"]
SLACK_SIGNING_SECRET = os.environ["SLACK_SIGNING_SECRET"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

# Initialize app
app = App(token=SLACK_BOT_TOKEN, signing_secret=SLACK_SIGNING_SECRET)
openai.api_key = OPENAI_API_KEY

# Cached result of auth.test so the bot's user ID is fetched once per process
# rather than on every incoming event.
_bot_user_id: Optional[str] = None


def _get_bot_user_id(client) -> str:
    """Return the bot's own user ID, calling ``auth.test`` only on first use."""
    global _bot_user_id
    if _bot_user_id is None:
        _bot_user_id = client.auth_test()["user_id"]
    return _bot_user_id


def _is_ignorable(event: Dict[str, Any]) -> bool:
    """Return True for events with a subtype or a bot author (edits, joins, bots)."""
    return bool(event.get("subtype") or event.get("bot_id"))


# Handle app mentions
@app.event("app_mention")
def handle_app_mention(event: Dict[str, Any], say, client, logger):
    """Reply in-thread with an LLM answer when someone mentions the bot."""
    # Computed before the try block so the error reply can target the thread too.
    thread_ts = event.get("thread_ts", event["ts"])
    try:
        user = event["user"]
        text = event["text"]
        channel = event["channel"]

        # Remove bot mention from text
        bot_user_id = _get_bot_user_id(client)
        cleaned_text = re.sub(rf"<@{re.escape(bot_user_id)}>", "", text).strip()

        # Generate LLM response
        response = generate_llm_response(cleaned_text, user)

        # Reply in thread
        say(text=response, thread_ts=thread_ts, channel=channel)
    except Exception as e:
        logger.error(f"Error handling mention: {e}")
        say(
            text="Sorry, I encountered an error processing your request.",
            thread_ts=thread_ts,
        )


# Bolt for Python runs only the first listener that matches an event, so a
# second @app.event("message") registration would never fire. A single
# listener is registered here and routes to the two handlers below.
@app.event("message")
def handle_message(event: Dict[str, Any], say, client, logger):
    """Route message events to the DM handler or the channel handler."""
    if event.get("channel_type") == "im":
        handle_direct_message(event, say, client, logger)
    else:
        handle_channel_message(event, say, client, logger)


# Handle direct messages
def handle_direct_message(event: Dict[str, Any], say, client, logger):
    """Answer a direct message to the bot with an LLM response."""
    # Skip if it's a bot message or any message subtype
    if _is_ignorable(event):
        return

    # `channel_type` is part of the message event payload, so no extra
    # conversations.info call is needed to detect a DM.
    if event.get("channel_type") != "im":
        return

    text = event.get("text", "")
    if not text:
        return

    response = generate_llm_response(text, event["user"])
    say(response)


# Handle message in channels (optional)
def handle_channel_message(event: Dict[str, Any], say, client, logger):
    """React to keyword messages in channels where the bot is present."""
    if _is_ignorable(event):
        return

    raw_text = event.get("text", "")

    # Messages that mention the bot are already handled by the app_mention event
    if f"<@{_get_bot_user_id(client)}>" in raw_text:
        return

    # Optional: Respond to specific keywords
    text = raw_text.lower()
    if "help" in text and "bot" in text:
        say(
            text="Need help? Just mention me with your question!",
            thread_ts=event["ts"],
        )


def generate_llm_response(text: str, user_id: str) -> str:
    """Generate a reply to ``text`` using the LLM.

    Args:
        text: The user's message with any bot mention already removed.
        user_id: Slack user ID, used to look up a display name for the prompt.

    Returns:
        The model's reply, or a fixed apology string if the LLM call fails.
    """
    # A failed profile lookup should not block the answer; fall back to a
    # neutral name, as the Node.js implementation does.
    try:
        user_info = app.client.users_info(user=user_id)
        user_name = user_info["user"].get("real_name") or "there"
    except Exception as e:
        logger.warning(f"Could not look up user {user_id}: {e}")
        user_name = "there"

    messages = [
        {
            "role": "system",
            "content": (
                "You are a helpful Slack bot assistant powered by ParrotRouter. "
                f"You're talking to {user_name}. "
                "Be friendly, concise, and helpful. "
                "Format your responses using Slack markdown when appropriate."
            ),
        },
        {"role": "user", "content": text},
    ]

    try:
        # Legacy Chat Completions call; requires openai<1.0.
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            temperature=0.7,
            max_tokens=500,
        )
        return response.choices[0].message.content
    except Exception as e:
        logger.error(f"LLM Error: {e}")
        return "I'm having trouble generating a response right now. Please try again later."


if __name__ == "__main__":
    # Socket Mode needs the app-level token (xapp-...).
    SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"]).start()
Node.js Implementation
// bot.ts
import { App, LogLevel } from '@slack/bolt'
import { WebClient } from '@slack/web-api'
import OpenAI from 'openai'

const app = new App({
  token: process.env.SLACK_BOT_TOKEN,
  signingSecret: process.env.SLACK_SIGNING_SECRET,
  socketMode: true,
  appToken: process.env.SLACK_APP_TOKEN,
  logLevel: LogLevel.DEBUG,
})

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})

// Handle app mentions
app.event('app_mention', async ({ event, context, client, say }) => {
  const { user, text, channel, thread_ts, ts } = event
  // Reply in the originating thread, or start one on the mention itself.
  const replyThread = thread_ts || ts

  try {
    // Remove bot mention
    const botUserId = context.botUserId
    const cleanedText = text.replace(new RegExp(`<@${botUserId}>`, 'g'), '').trim()

    // Generate response
    const response = await generateLLMResponse(cleanedText, user)

    // Reply in thread
    await say({
      text: response,
      thread_ts: replyThread,
      channel,
    })
  } catch (error) {
    console.error('Error handling mention:', error)
    // Keep the error message in the same thread as the question.
    await say({
      text: 'Sorry, I encountered an error processing your request.',
      thread_ts: replyThread,
    })
  }
})

// Handle direct messages
app.message(async ({ message, say, client }) => {
  // Skip bot messages and every message subtype (edits, joins, ...)
  if (message.subtype || message.bot_id) return
  // Nothing to answer when the event carries no text or author.
  if (!message.text || !message.user) return

  try {
    // Check if DM
    const channelInfo = await client.conversations.info({
      channel: message.channel,
    })

    if (channelInfo.channel?.is_im) {
      const response = await generateLLMResponse(message.text, message.user)
      await say(response)
    }
  } catch (error) {
    console.error('Error handling DM:', error)
  }
})

/**
 * Generate a reply to `text` using the LLM.
 *
 * @param text   The user's message with any bot mention already removed.
 * @param userId Slack user ID, used to look up a display name for the prompt.
 * @returns The model's reply, or a fixed fallback string when the call fails
 *          or the model returns no content.
 */
async function generateLLMResponse(text: string, userId: string): Promise<string> {
  try {
    // Get user info
    const userInfo = await app.client.users.info({ user: userId })
    const userName = userInfo.user?.real_name || 'there'

    const completion = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: [
        {
          role: 'system',
          content: `You are a helpful Slack bot assistant powered by ParrotRouter.
You're talking to ${userName}. Be friendly, concise, and helpful.
Use Slack markdown for formatting.`,
        },
        {
          role: 'user',
          content: text,
        },
      ],
      temperature: 0.7,
      max_tokens: 500,
    })

    // Double-quoted: these strings contain apostrophes, which would end a
    // single-quoted literal early.
    return completion.choices[0].message.content || "I couldn't generate a response."
  } catch (error) {
    console.error('LLM Error:', error)
    return "I'm having trouble generating a response right now."
  }
}

// Start the app (Socket Mode, so no public HTTP endpoint is required).
;(async () => {
  await app.start()
  console.log('Slack bot is running')
})()
3. Slash Commands
Command Registration
In your Slack app settings, go to "Slash Commands" and create commands:
- /ask [question] - Ask the AI a question
- /summarize [url or text] - Summarize content
- /help - Show available commands
Command Implementation
# slash_commands.py
import asyncio
import logging
import re
from typing import Any, Dict

from slack_bolt import App
from slack_sdk.models.blocks import SectionBlock, DividerBlock, MarkdownTextObject

# NOTE: `app` and `generate_llm_response` are the objects created in bot.py.
# `fetch_url_content` and `summarize_content` are awaited below, so they are
# expected to be coroutine functions; they are not shown in this snippet.

logger = logging.getLogger(__name__)

URL_PATTERN = re.compile(r"https?://\S+")

# Slack rejects section blocks whose text is longer than 3000 characters.
MAX_SECTION_TEXT = 3000


def _fit_section(text: str, limit: int = MAX_SECTION_TEXT) -> str:
    """Return ``text`` cut to at most ``limit`` characters, ending in an ellipsis if cut."""
    if len(text) <= limit:
        return text
    return text[: limit - 1] + "…"


@app.command("/ask")
def handle_ask_command(ack, respond, command):
    """Handle /ask: answer the question that follows the command."""
    # Acknowledge command request within 3 seconds
    ack()

    user_id = command["user_id"]
    text = (command.get("text") or "").strip()

    if not text:
        respond("Please provide a question after /ask")
        return

    # Show loading message
    respond("🤔 Thinking...")

    try:
        response = generate_llm_response(text, user_id)

        # Format response with blocks
        blocks = [
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": _fit_section(f"*Question:* {text}")},
            },
            {"type": "divider"},
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": _fit_section(f"*Answer:*\n{response}")},
            },
        ]

        # Update with formatted response
        respond(blocks=blocks, replace_original=True)
    except Exception as e:
        logger.exception("Error handling /ask")
        respond(f"Error: {str(e)}", replace_original=True)


async def _summarize_input(text: str) -> str:
    """Summarize the first URL found in ``text``, or ``text`` itself if it has none."""
    urls = URL_PATTERN.findall(text)
    if urls:
        content = await fetch_url_content(urls[0])
        return await summarize_content(content)
    return await summarize_content(text)


@app.command("/summarize")
def handle_summarize_command(ack, respond, command, client):
    """Handle /summarize: summarize the given text or the page behind a URL.

    This is a plain function because `app` is the synchronous Bolt `App`, whose
    `ack` and `respond` are not awaitable. The async helpers are driven with
    `asyncio.run` instead.
    """
    ack()

    text = (command.get("text") or "").strip()

    if not text:
        respond("Please provide text or a URL to summarize")
        return

    try:
        summary = asyncio.run(_summarize_input(text))
    except Exception as e:
        logger.exception("Error handling /summarize")
        respond(f"Error summarizing: {str(e)}")
        return

    blocks = [
        {
            "type": "header",
            "text": {"type": "plain_text", "text": "📄 Summary"},
        },
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": _fit_section(summary)},
        },
    ]
    respond(blocks=blocks)


@app.command("/help")
def handle_help_command(ack, respond):
    """Handle /help: list the available commands and usage tips."""
    ack()

    help_text = """
*Available Commands:*
• `/ask [question]` - Ask me anything
• `/summarize [text or URL]` - Get a summary of text or webpage
• `/help` - Show this help message

*How to use me:*
• Mention me in any channel with @bot-name
• Send me a direct message
• Use slash commands for specific tasks

*Tips:*
• I remember context within threads
• I can analyze images if you share them
• I format code with ```language blocks
"""

    blocks = [
        {
            "type": "header",
            "text": {"type": "plain_text", "text": "🤖 Bot Help"},
        },
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": help_text},
        },
    ]
    respond(blocks=blocks)
4. Interactive Components
Interactive Messages with Buttons
# interactive_components.py


def _feedback_button(label, style, rating):
    """Build one feedback button whose action_id is ``feedback_<rating>``."""
    return {
        "type": "button",
        "text": {"type": "plain_text", "text": label},
        "style": style,
        "action_id": f"feedback_{rating}",
        "value": rating,
    }


def _plain(text):
    """Wrap ``text`` in a Block Kit plain_text object."""
    return {"type": "plain_text", "text": text}


@app.command("/feedback")
def handle_feedback_command(ack, respond, command):
    """Post a rating prompt with a "Good" and a "Bad" button."""
    ack()

    prompt = {
        "type": "section",
        "text": {"type": "mrkdwn", "text": "How would you rate my response?"},
    }
    buttons = {
        "type": "actions",
        "elements": [
            _feedback_button("👍 Good", "primary", "good"),
            _feedback_button("👎 Bad", "danger", "bad"),
        ],
    }
    respond(blocks=[prompt, buttons])


@app.action("feedback_good")
def handle_good_feedback(ack, body, respond):
    """Record a positive rating and replace the prompt with a thank-you."""
    ack()

    rated_message = body.get("message", {})
    # Log feedback
    log_feedback(body["user"]["id"], "good", rated_message.get("ts"))

    # Update message
    respond(text="Thanks for the positive feedback! 🎉", replace_original=True)


@app.action("feedback_bad")
def handle_bad_feedback(ack, body, client):
    """Open a modal that asks for details after a negative rating."""
    ack()

    details_input = {
        "type": "input",
        "block_id": "feedback_input",
        "element": {
            "type": "plain_text_input",
            "action_id": "feedback_text",
            "multiline": True,
            "placeholder": _plain("What could I improve?"),
        },
        "label": _plain("Your Feedback"),
    }
    modal = {
        "type": "modal",
        "callback_id": "feedback_modal",
        "title": _plain("Feedback Details"),
        "submit": _plain("Submit"),
        "close": _plain("Cancel"),
        "blocks": [details_input],
    }
    client.views_open(trigger_id=body["trigger_id"], view=modal)
Modal Handling
@app.view("feedback_modal")
def handle_feedback_submission(ack, body, view, client):
    """Store the text submitted in the feedback modal and confirm by DM."""
    ack()

    # Extract values
    submitter = body["user"]["id"]
    form_values = view["state"]["values"]
    feedback = form_values["feedback_input"]["feedback_text"]["value"]

    # Store feedback
    store_feedback(submitter, feedback)

    # Send confirmation DM
    confirmation = f"Thank you for your feedback! We'll use it to improve: \n\n_{feedback}_"
    client.chat_postMessage(channel=submitter, text=confirmation)


# Select Menu Example
# (label, value) pairs offered in the model picker, in display order.
_MODEL_CHOICES = (
    ("GPT-4 (Most capable)", "gpt-4"),
    ("GPT-3.5 (Faster)", "gpt-3.5-turbo"),
    ("Claude 3 (Anthropic)", "claude-3"),
)


@app.command("/settings")
def show_settings(ack, respond, command):
    """Show a section with a static select for choosing the model."""
    ack()

    options = [
        {"text": {"type": "plain_text", "text": label}, "value": value}
        for label, value in _MODEL_CHOICES
    ]
    picker = {
        "type": "static_select",
        "placeholder": {"type": "plain_text", "text": "Select model"},
        "action_id": "model_select",
        "options": options,
    }
    settings_section = {
        "type": "section",
        "text": {"type": "mrkdwn", "text": "Configure your bot preferences:"},
        "accessory": picker,
    }
    respond(blocks=[settings_section])


@app.action("model_select")
def handle_model_selection(ack, body, client):
    """Save the model picked in the select menu and confirm by DM."""
    ack()

    chosen = body["actions"][0]["selected_option"]
    selected_model = chosen["value"]
    user_id = body["user"]["id"]

    # Save user preference
    save_user_preference(user_id, "model", selected_model)

    # Send confirmation
    client.chat_postMessage(
        channel=user_id,
        text=f"✅ Model preference updated to: {selected_model}",
    )
5. Thread Management
Conversation Threading
# thread_management.py
import json
from datetime import datetime, timezone
from typing import Any, Dict, List

import redis

# NOTE: `app`, `openai`, `generate_llm_response` and `REDIS_URL` are expected
# to be provided by the surrounding application (see bot.py).


class ThreadManager:
    """Stores a short, expiring message history per Slack thread in Redis."""

    def __init__(self, redis_client: redis.Redis, max_messages: int = 10):
        """
        Args:
            redis_client: Connected Redis client used for storage.
            max_messages: Number of most recent messages kept per thread.
        """
        self.redis = redis_client
        self.ttl = 3600 * 24  # 24 hours
        self.max_messages = max_messages

    def get_thread_context(self, thread_ts: str) -> List[Dict[str, Any]]:
        """Return the stored history for a thread, oldest first (empty if none)."""
        data = self.redis.get(f"thread:{thread_ts}")
        return json.loads(data) if data else []

    def add_to_thread(self, thread_ts: str, role: str, content: str):
        """Append a message to the thread history and refresh its expiry."""
        context = self.get_thread_context(thread_ts)
        context.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

        # Keep only the most recent messages for context
        context = context[-self.max_messages:]

        self.redis.setex(f"thread:{thread_ts}", self.ttl, json.dumps(context))

    def clear_thread(self, thread_ts: str):
        """Delete the stored history for a thread."""
        self.redis.delete(f"thread:{thread_ts}")


# Initialize thread manager
thread_manager = ThreadManager(redis.from_url(REDIS_URL))


# NOTE: Bolt for Python runs only the first listener that matches an event.
# Register this handler instead of (not in addition to) another app_mention
# listener.
@app.event("app_mention")
def handle_mention_with_context(event, say, client):
    """Answer a mention using the thread's stored conversation history."""
    user = event["user"]
    text = event["text"]
    thread_ts = event.get("thread_ts", event["ts"])

    # History as it was *before* the current message
    context = thread_manager.get_thread_context(thread_ts)

    # Generate response with context
    response = generate_contextual_response(text, context, user)

    # Record both turns only after the LLM call succeeded, so a failed call
    # does not leave an unanswered user turn in the history.
    thread_manager.add_to_thread(thread_ts, "user", text)
    thread_manager.add_to_thread(thread_ts, "assistant", response)

    # Reply in thread
    say(text=response, thread_ts=thread_ts)


def generate_contextual_response(
    message: str,
    context: List[Dict[str, Any]],
    user_id: str
) -> str:
    """Generate a reply to ``message`` given the prior conversation.

    Args:
        message: The current user message.
        context: Prior messages as dicts with "role" and "content" keys. It may
            or may not already end with the current message; a trailing copy
            is not sent twice.
        user_id: Slack user ID of the sender (currently unused).

    Returns:
        The model's reply text.
    """
    history = list(context)
    # Drop the last entry only when it really is the current message.
    # Unconditionally slicing off the last element would discard the most
    # recent prior turn when the caller passes history without it.
    if history:
        last = history[-1]
        if last.get("role") == "user" and last.get("content") == message:
            history.pop()

    messages = [
        {
            "role": "system",
            "content": (
                "You are a helpful Slack bot. You have access to the "
                "conversation history.\n"
                "Maintain context and refer to previous messages when relevant."
            ),
        }
    ]
    messages.extend(
        {"role": entry["role"], "content": entry["content"]} for entry in history
    )
    messages.append({"role": "user", "content": message})

    # Legacy Chat Completions call; requires openai<1.0.
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=messages,
        temperature=0.7,
    )
    return response.choices[0].message.content


# Thread summary command
@app.command("/summarize_thread")
def summarize_thread(ack, respond, command, client):
    """Summarize the most recent thread among the channel's last 20 messages."""
    ack()

    channel = command["channel_id"]

    # Find most recent thread in channel
    conversations = client.conversations_history(channel=channel, limit=20)
    thread_ts = next(
        (m["thread_ts"] for m in conversations["messages"] if "thread_ts" in m),
        None,
    )

    if not thread_ts:
        respond("No recent threads found in this channel")
        return

    # Get thread replies
    thread_messages = client.conversations_replies(channel=channel, ts=thread_ts)

    # Create summary
    transcript = "".join(
        f"{msg.get('user', 'Unknown')}: {msg.get('text', '')}\n"
        for msg in thread_messages["messages"]
    )
    summary_prompt = "Summarize this Slack conversation:\n\n" + transcript

    summary = generate_llm_response(summary_prompt, command["user_id"])
    respond(f"*Thread Summary:*\n{summary}")
6. Rate Limiting
Rate Limiter Implementation
# rate_limiting.py
import asyncio
import logging
import time
import uuid
from functools import wraps
from typing import Dict, List, Tuple

import redis

# NOTE: `app`, `openai`, `handle_app_mention` and `REDIS_URL` are expected to
# be provided by the surrounding application (see bot.py).


class RateLimiter:
    """Sliding-window rate limiter backed by Redis sorted sets."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

        # Slack API limits as (max requests, window in seconds)
        self.slack_limits = {
            "chat.postMessage": (1, 1),         # 1 per second
            "conversations.history": (50, 60),  # 50 per minute
            "users.info": (100, 60),            # 100 per minute
        }

        # LLM API limits (example)
        self.llm_limits = {
            "openai": (60, 60),     # 60 per minute
            "anthropic": (50, 60),  # 50 per minute
        }

    def check_rate_limit(self, key: str, max_requests: int, window: int) -> Tuple[bool, int]:
        """Record a request under ``key`` if the window still has room.

        Args:
            key: Redis key identifying the limited resource.
            max_requests: Maximum requests allowed per window.
            window: Window length in seconds.

        Returns:
            ``(True, 0)`` when the request is allowed (and has been recorded),
            otherwise ``(False, seconds_to_wait)``.
        """
        now = time.time()
        # A unique member per request: two requests with the same timestamp
        # would otherwise collapse into a single sorted-set entry.
        member = f"{now}:{uuid.uuid4().hex}"

        pipeline = self.redis.pipeline()
        pipeline.zremrangebyscore(key, 0, now - window)
        pipeline.zadd(key, {member: now})
        pipeline.zcount(key, now - window, now)
        pipeline.expire(key, window)
        results = pipeline.execute()

        current_requests = results[2]
        if current_requests <= max_requests:
            return True, 0

        # Denied: withdraw this attempt so that rejected requests do not use up
        # the window and keep pushing the caller's next slot further away.
        self.redis.zrem(key, member)

        oldest = self.redis.zrange(key, 0, 0, withscores=True)
        if oldest:
            wait_time = int(oldest[0][1] + window - now) + 1
            return False, max(wait_time, 1)
        return False, window

    def rate_limit_slack(self, method: str):
        """Decorator that blocks (sleeps) until ``method`` is within its Slack limit."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                spec = self.slack_limits.get(method)
                if spec is not None:
                    limit, window = spec
                    key = f"slack_rate:{method}"
                    # Re-check after every sleep: another caller may have taken
                    # the slot in the meantime.
                    while True:
                        allowed, wait_time = self.check_rate_limit(key, limit, window)
                        if allowed:
                            break
                        time.sleep(wait_time)
                return func(*args, **kwargs)
            return wrapper
        return decorator

    def rate_limit_llm(self, provider: str):
        """Decorator for coroutines that waits until ``provider`` is within its limit."""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                spec = self.llm_limits.get(provider)
                if spec is not None:
                    limit, window = spec
                    key = f"llm_rate:{provider}"
                    while True:
                        allowed, wait_time = self.check_rate_limit(key, limit, window)
                        if allowed:
                            break
                        await asyncio.sleep(wait_time)
                return await func(*args, **kwargs)
            return wrapper
        return decorator


# Initialize rate limiter
rate_limiter = RateLimiter(redis.from_url(REDIS_URL))


# Usage example
@rate_limiter.rate_limit_slack("chat.postMessage")
def send_message(channel: str, text: str):
    """Rate-limited message sending"""
    return app.client.chat_postMessage(channel=channel, text=text)


@rate_limiter.rate_limit_llm("openai")
async def call_openai(messages: List[Dict]):
    """Rate-limited OpenAI call (legacy async API; requires openai<1.0)."""
    return await openai.ChatCompletion.acreate(model="gpt-4", messages=messages)


# User-level rate limiting
class UserRateLimiter:
    """Per-user request limits layered on top of ``RateLimiter``."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        # Uses the Redis client it was given instead of the module-level
        # `rate_limiter`.
        self._limiter = RateLimiter(redis_client)
        self.user_limits = {
            "default": (10, 60),   # 10 requests per minute
            "premium": (100, 60),  # 100 requests per minute
        }

    def check_user_limit(self, user_id: str, user_type: str = "default") -> Tuple[bool, str]:
        """Check a user's limit.

        Returns:
            ``(True, "")`` when allowed, otherwise ``(False, message)`` where
            ``message`` tells the user how long to wait.
        """
        limit, window = self.user_limits.get(user_type, self.user_limits["default"])
        key = f"user_rate:{user_id}"

        allowed, wait_time = self._limiter.check_rate_limit(key, limit, window)
        if not allowed:
            return False, f"Rate limit exceeded. Please wait {wait_time} seconds."
        return True, ""


user_limiter = UserRateLimiter(redis.from_url(REDIS_URL))


# NOTE: Bolt for Python runs only the first listener that matches an event.
# Register this wrapper as the app_mention listener and leave
# `handle_app_mention` unregistered.
@app.event("app_mention")
def handle_mention_with_rate_limit(event, say, client=None, logger=None):
    """Apply the per-user limit, then delegate to ``handle_app_mention``.

    Bolt injects ``client`` and ``logger`` by name; the defaults only keep
    direct two-argument calls working.
    """
    user_id = event["user"]

    # Check user rate limit
    allowed, message = user_limiter.check_user_limit(user_id)
    if not allowed:
        say(message, thread_ts=event.get("thread_ts", event["ts"]))
        return

    # handle_app_mention takes (event, say, client, logger); pass all four.
    handle_app_mention(
        event,
        say,
        client if client is not None else app.client,
        logger if logger is not None else logging.getLogger(__name__),
    )
Rate Limiting Best Practices
- • Implement exponential backoff for retries
- • Cache user information to reduce API calls
- • Use Slack's rate limit headers to adjust dynamically
- • Queue non-urgent messages for batch processing
- • Monitor rate limit usage and alert on high usage
7. Context Management
Advanced Context Manager
# context_management.py
import json
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import redis
from langchain.memory import ConversationBufferWindowMemory
from langchain.schema import AIMessage, BaseMessage, HumanMessage, SystemMessage

# NOTE: `app`, `openai` and `REDIS_URL` are expected to be provided by the
# surrounding application (see bot.py).

# Stored role name -> LangChain message class
_MESSAGE_CLASSES = {
    "human": HumanMessage,
    "ai": AIMessage,
    "system": SystemMessage,
}

# Stored role name -> chat-completion role
_LLM_ROLES = {"human": "user", "ai": "assistant", "system": "system"}


def _role_of(message: BaseMessage) -> str:
    """Return the stored role name ("human", "system" or "ai") for a message."""
    if isinstance(message, HumanMessage):
        return "human"
    if isinstance(message, SystemMessage):
        return "system"
    return "ai"


def _build_message(
    role: str,
    content: str,
    timestamp: Optional[str] = None,
    metadata: Optional[Dict] = None,
) -> BaseMessage:
    """Create a message that carries its own timestamp and metadata.

    Raises:
        ValueError: If ``role`` is not "human", "ai" or "system".
    """
    try:
        message_cls = _MESSAGE_CLASSES[role]
    except KeyError:
        raise ValueError(f"Unsupported role: {role!r}") from None

    extra: Dict[str, Any] = {"timestamp": timestamp or datetime.now().isoformat()}
    if metadata:
        extra["metadata"] = metadata
    return message_cls(content=content, additional_kwargs=extra)


class ConversationContextManager:
    """Keeps per-conversation chat history in memory, mirrored to Redis."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.memories = {}  # In-memory cache for active conversations
        self.ttl = 3600 * 24  # 24 hours
        self.max_messages = 20  # Maximum messages to keep

    def get_or_create_memory(self, conversation_id: str) -> ConversationBufferWindowMemory:
        """Return the cached memory for a conversation, loading it from Redis on first use."""
        if conversation_id not in self.memories:
            memory = ConversationBufferWindowMemory(
                k=self.max_messages,
                return_messages=True
            )

            # Restore messages, keeping their original timestamps and metadata
            for stored in self.load_conversation(conversation_id):
                if stored.get("role") not in _MESSAGE_CLASSES:
                    continue  # ignore records written with an unknown role
                memory.chat_memory.add_message(
                    _build_message(
                        stored["role"],
                        stored["content"],
                        stored.get("timestamp"),
                        stored.get("metadata"),
                    )
                )

            self.memories[conversation_id] = memory

        return self.memories[conversation_id]

    def add_message(
        self,
        conversation_id: str,
        role: str,
        content: str,
        metadata: Optional[Dict] = None
    ):
        """Append a message to a conversation and persist the conversation.

        Args:
            conversation_id: Key of the conversation.
            role: "human", "ai" or "system".
            content: Message text.
            metadata: Optional data stored with this message only.

        Raises:
            ValueError: If ``role`` is not supported.
        """
        memory = self.get_or_create_memory(conversation_id)
        history = memory.chat_memory.messages
        memory.chat_memory.add_message(_build_message(role, content, metadata=metadata))

        # The memory's window does not shrink the underlying message list, so
        # trim it here; otherwise history sent to the LLM grows without bound.
        excess = len(history) - self.max_messages
        if excess > 0:
            del history[:excess]

        # Save to Redis
        self.save_conversation(conversation_id, memory)

    def save_conversation(
        self,
        conversation_id: str,
        memory: ConversationBufferWindowMemory,
        metadata: Optional[Dict] = None
    ):
        """Serialize a conversation to Redis with the configured TTL.

        Each message keeps the timestamp and metadata it was created with.
        ``metadata`` is only a fallback for messages that carry none.
        """
        messages = []
        for message in memory.chat_memory.messages:
            extra = message.additional_kwargs or {}
            record = {
                "role": _role_of(message),
                "content": message.content,
                "timestamp": extra.get("timestamp") or datetime.now().isoformat(),
            }
            message_metadata = extra.get("metadata") or metadata
            if message_metadata:
                record["metadata"] = message_metadata
            messages.append(record)

        self.redis.setex(
            f"conversation:{conversation_id}",
            self.ttl,
            json.dumps(messages)
        )

    def load_conversation(self, conversation_id: str) -> List[Dict]:
        """Load the stored message records for a conversation (empty if none)."""
        data = self.redis.get(f"conversation:{conversation_id}")
        return json.loads(data) if data else []

    def get_context_summary(self, conversation_id: str) -> str:
        """Return a short text digest of the last five messages."""
        memory = self.get_or_create_memory(conversation_id)
        messages = memory.chat_memory.messages

        if not messages:
            return "No previous context."

        lines = [f"Previous {len(messages)} messages in conversation:"]
        for msg in messages[-5:]:  # Last 5 messages
            role = "User" if isinstance(msg, HumanMessage) else "Assistant"
            snippet = msg.content[:100]
            # Mark truncation only when text was actually cut off
            suffix = "..." if len(msg.content) > 100 else ""
            lines.append(f"{role}: {snippet}{suffix}")
        return "\n".join(lines) + "\n"

    def clear_old_conversations(self, days: int = 7):
        """Delete stored conversations whose last message is older than ``days`` days."""
        cutoff = datetime.now() - timedelta(days=days)
        cursor = 0

        while True:
            cursor, keys = self.redis.scan(cursor, match="conversation:*")

            for key in keys:
                data = self.redis.get(key)
                if not data:
                    continue
                try:
                    messages = json.loads(data)
                    last_timestamp = messages[-1].get("timestamp") if messages else None
                    if last_timestamp and datetime.fromisoformat(last_timestamp) < cutoff:
                        self.redis.delete(key)
                except (ValueError, TypeError, AttributeError):
                    # Skip records that are not in the expected format rather
                    # than aborting the whole sweep.
                    continue

            if cursor == 0:
                break


# Initialize context manager
context_manager = ConversationContextManager(redis.from_url(REDIS_URL))


# Integration with bot
# NOTE: Bolt for Python runs only the first listener that matches an event.
# Register a single app_mention listener.
@app.event("app_mention")
def handle_mention_with_context(event, say):
    """Handle mention with full context management"""
    user_id = event["user"]
    channel = event["channel"]
    text = event["text"]
    thread_ts = event.get("thread_ts", event["ts"])

    # Create conversation ID (channel + thread)
    conversation_id = f"{channel}:{thread_ts}"

    # Add user message to context
    context_manager.add_message(
        conversation_id,
        "human",
        text,
        metadata={"user_id": user_id, "channel": channel}
    )

    # Get conversation memory
    memory = context_manager.get_or_create_memory(conversation_id)

    # Generate response with context
    messages = [
        {
            "role": "system",
            "content": "You are a helpful Slack bot assistant powered by ParrotRouter."
        }
    ]

    # Add conversation history
    messages.extend(
        {"role": _LLM_ROLES[_role_of(msg)], "content": msg.content}
        for msg in memory.chat_memory.messages
    )

    # Legacy Chat Completions call; requires openai<1.0.
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=messages,
        temperature=0.7
    )

    ai_response = response.choices[0].message.content

    # Add AI response to context
    context_manager.add_message(conversation_id, "ai", ai_response)

    # Send response
    say(text=ai_response, thread_ts=thread_ts)
8. File & Image Handling
File Processing
# file_handling.py
import base64
import io
import logging
import mimetypes
from typing import Optional, Dict, Any

import docx
import PyPDF2
import requests
from PIL import Image

# NOTE: `app`, `openai`, `SLACK_BOT_TOKEN`, `context_manager` and
# `summarize_document` are expected to be provided by the surrounding
# application.

logger = logging.getLogger(__name__)

# Seconds to wait for Slack's file server before giving up
DOWNLOAD_TIMEOUT = 30

DOCX_MIMETYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"


class FileHandler:
    """Downloads files shared in Slack and extracts text or image data from them."""

    def __init__(self, bot_token: str):
        self.bot_token = bot_token
        self.headers = {"Authorization": f"Bearer {bot_token}"}

    def download_file(self, file_url: str) -> bytes:
        """Download a private Slack file using the bot token.

        Raises:
            requests.HTTPError: If Slack answers with an error status.
            requests.Timeout: If the download exceeds ``DOWNLOAD_TIMEOUT``.
        """
        response = requests.get(file_url, headers=self.headers, timeout=DOWNLOAD_TIMEOUT)
        response.raise_for_status()
        return response.content

    def process_file(self, file_info: Dict[str, Any]) -> Dict[str, Any]:
        """Download a file and extract its content based on MIME type.

        Returns:
            A dict with "filename", "mimetype" and "size", plus either the
            type-specific keys added by the ``process_*`` methods or an
            "error" key for unsupported types.
        """
        mimetype = file_info["mimetype"]
        result = {
            "filename": file_info["name"],
            "mimetype": mimetype,
            "size": file_info["size"],
        }

        # Download file
        file_content = self.download_file(file_info["url_private"])

        # Process based on file type
        if mimetype.startswith("image/"):
            result.update(self.process_image(file_content, mimetype))
        elif mimetype == "application/pdf":
            result.update(self.process_pdf(file_content))
        elif mimetype == DOCX_MIMETYPE:
            # Legacy .doc ("application/msword") is not a .docx package and
            # cannot be read by python-docx, so it is reported as unsupported.
            result.update(self.process_word(file_content))
        elif mimetype.startswith("text/"):
            result.update(self.process_text(file_content))
        else:
            result["error"] = f"Unsupported file type: {mimetype}"

        return result

    def process_image(self, content: bytes, mimetype: str) -> Dict[str, Any]:
        """Downscale an image to at most 1024x1024 and base64-encode it.

        Returns:
            A dict with "type", "base64", "encoded_mimetype" (the MIME type of
            the encoded bytes), "dimensions" and "mode".
        """
        image = Image.open(io.BytesIO(content))
        # The image is re-encoded in its own format, falling back to PNG.
        image_format = image.format or "PNG"

        # Resize if too large
        max_size = (1024, 1024)
        if image.size[0] > max_size[0] or image.size[1] > max_size[1]:
            image.thumbnail(max_size, Image.LANCZOS)

        # Convert to base64
        buffered = io.BytesIO()
        image.save(buffered, format=image_format)
        base64_image = base64.b64encode(buffered.getvalue()).decode()

        return {
            "type": "image",
            "base64": base64_image,
            "encoded_mimetype": f"image/{image_format.lower()}",
            "dimensions": image.size,
            "mode": image.mode,
        }

    def process_pdf(self, content: bytes) -> Dict[str, Any]:
        """Extract the text of every page of a PDF."""
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(content))
        # Guard against pages that yield no text
        page_texts = [(page.extract_text() or "") + "\n" for page in pdf_reader.pages]

        return {
            "type": "pdf",
            "text": "".join(page_texts).strip(),
            "pages": len(pdf_reader.pages),
        }

    def process_word(self, content: bytes) -> Dict[str, Any]:
        """Extract paragraph text from a .docx document."""
        doc = docx.Document(io.BytesIO(content))
        text = "\n".join(paragraph.text for paragraph in doc.paragraphs)

        return {
            "type": "word",
            "text": text.strip(),
            "paragraphs": len(doc.paragraphs),
        }

    def process_text(self, content: bytes) -> Dict[str, Any]:
        """Decode a text file as UTF-8, falling back to Latin-1."""
        try:
            text = content.decode("utf-8")
        except UnicodeDecodeError:
            text = content.decode("latin-1")

        return {"type": "text", "text": text}


# Initialize file handler
file_handler = FileHandler(SLACK_BOT_TOKEN)


@app.event("file_shared")
def handle_file_shared(event, say, client):
    """Analyze an image or summarize a document when a file is shared."""
    file_id = event["file_id"]

    try:
        # Get file info
        file_info = client.files_info(file=file_id)["file"]

        # Process file
        result = file_handler.process_file(file_info)

        if "error" in result:
            say(f"Sorry, I couldn't process this file: {result['error']}")
            return

        # Handle based on file type
        if result["type"] == "image":
            # Use vision model
            response = analyze_image_with_llm(
                result["base64"], result.get("encoded_mimetype", "image/png")
            )
            say(f"*Image Analysis:*\n{response}")

        elif result["type"] in ["pdf", "word", "text"]:
            # Summarize or answer questions about the document
            text = result["text"][:4000]  # Limit text length
            summary = summarize_document(text)
            say(f"*Document Summary:*\n{summary}")

            # Offer to answer questions
            say("Feel free to ask me questions about this document!")

            # Store document in context.
            # The file_shared payload carries `channel_id` and `event_ts`
            # rather than `channel` and `ts`; the older names are kept as
            # fallbacks.
            channel_id = event.get("channel_id") or event.get("channel")
            thread_ts = (
                event.get("thread_ts") or event.get("ts") or event.get("event_ts")
            )
            # Stored under the "human" role: add_message in the context manager
            # snippet only records "human" and "ai" messages.
            context_manager.add_message(
                f"{channel_id}:{thread_ts}",
                "human",
                f"Document content: {text}",
                metadata={"file_id": file_id, "filename": file_info["name"]}
            )

    except Exception as e:
        logger.error(f"Error processing file: {e}")
        say("Sorry, I encountered an error processing this file.")


def analyze_image_with_llm(base64_image: str, mimetype: str = "image/png") -> str:
    """Describe an image using a vision model.

    Args:
        base64_image: Base64-encoded image bytes.
        mimetype: MIME type of the encoded bytes, used in the data URL.

    Returns:
        The model's description of the image.
    """
    # Legacy Chat Completions call; requires openai<1.0.
    response = openai.ChatCompletion.create(
        model="gpt-4-vision-preview",
        messages=[
            {
                "role": "system",
                "content": "You are analyzing an image shared in Slack. Provide a helpful description."
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "What's in this image?"},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:{mimetype};base64,{base64_image}"}
                    }
                ]
            }
        ],
        max_tokens=500
    )

    return response.choices[0].message.content
9. Security Best Practices
Request Verification
# security.py
import hashlib
import hmac
import os
import re
import time
from datetime import datetime
from typing import Any, Dict, Optional

# Compiled once at import time; both are applied to every LLM response.
# Covers xoxb-/xoxp-/xoxa-/xoxr-/xoxs- tokens and xapp- app-level tokens.
_SLACK_TOKEN_RE = re.compile(r'(?:xox[baprs]|xapp)-[0-9a-zA-Z-]+')
_EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')


class SlackSecurityHandler:
    """Verify that incoming HTTP requests were signed by Slack."""

    # Requests older (or newer) than this are rejected as possible replays.
    MAX_REQUEST_AGE_SECONDS = 60 * 5

    def __init__(self, signing_secret: str):
        self.signing_secret = signing_secret

    def verify_request(self, headers: Dict[str, str], body: str) -> bool:
        """Verify the Slack request signature.

        Args:
            headers: Request headers; names are matched case-insensitively.
            body: Raw, unparsed request body.

        Returns:
            True only when the timestamp is recent and the signature matches.
            Missing or malformed headers yield False instead of raising.
        """
        # HTTP header names are case-insensitive and frameworks differ in how
        # they present them, so normalize before lookup.
        normalized = {str(name).lower(): value for name, value in headers.items()}
        timestamp = normalized.get("x-slack-request-timestamp", "")
        signature = normalized.get("x-slack-signature", "")
        if not timestamp or not signature:
            return False

        try:
            request_time = float(timestamp)
        except (TypeError, ValueError):
            return False

        # Written as `not (age <= limit)` so a NaN timestamp is rejected too
        # (every comparison with NaN is False).
        if not abs(time.time() - request_time) <= self.MAX_REQUEST_AGE_SECONDS:
            return False

        # Create signature base string
        sig_basestring = f"v0:{timestamp}:{body}"

        # Calculate expected signature
        expected = "v0=" + hmac.new(
            self.signing_secret.encode(),
            sig_basestring.encode(),
            hashlib.sha256,
        ).hexdigest()

        # Compare as bytes: compare_digest raises TypeError on non-ASCII str.
        return hmac.compare_digest(expected.encode(), str(signature).encode())


# Input sanitization
class InputSanitizer:
    """Sanitize text entering the bot and text leaving the LLM."""

    MAX_INPUT_LENGTH = 4000

    @staticmethod
    def sanitize_user_input(text: str) -> str:
        """Trim, strip control characters from, and length-limit user input.

        Newlines and tabs are preserved so multi-line messages keep their
        structure; all other characters below U+0020 are removed.
        """
        sanitized = text.strip()
        sanitized = ''.join(
            char for char in sanitized if char in '\n\t' or ord(char) >= 32
        )
        return sanitized[:InputSanitizer.MAX_INPUT_LENGTH]

    @staticmethod
    def sanitize_llm_output(text: str) -> str:
        """Redact Slack tokens and email addresses from LLM output.

        Slack formatting characters are left untouched so the model's
        markdown still renders.
        """
        text = _SLACK_TOKEN_RE.sub('[REDACTED]', text)
        text = _EMAIL_RE.sub('[EMAIL]', text)
        return text


# Secure configuration
class SecureConfig:
    """Expose secrets from the environment, failing fast when any is missing."""

    def __init__(self):
        self.required_env_vars = [
            "SLACK_SIGNING_SECRET",
            "SLACK_BOT_TOKEN",
            "OPENAI_API_KEY",
            "REDIS_URL",
        ]
        self.validate_environment()

    def validate_environment(self):
        """Ensure all required environment variables are set.

        Raises:
            ValueError: listing every variable that is unset or empty.
        """
        missing = [var for var in self.required_env_vars if not os.environ.get(var)]
        if missing:
            raise ValueError(f"Missing required environment variables: {missing}")

    @property
    def slack_signing_secret(self) -> str:
        return os.environ["SLACK_SIGNING_SECRET"]

    @property
    def slack_bot_token(self) -> str:
        return os.environ["SLACK_BOT_TOKEN"]

    @property
    def openai_api_key(self) -> str:
        return os.environ["OPENAI_API_KEY"]


# Audit logging
class AuditLogger:
    """Write structured audit records through a standard logger."""

    def __init__(self, logger):
        self.logger = logger

    def log_interaction(
        self,
        user_id: str,
        channel: str,
        input_text: str,
        output_text: str,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Log a user interaction for the audit trail.

        Only the lengths of the input and output are recorded, never the
        text itself.
        """
        self.logger.info(
            "User interaction",
            extra={
                "user_id": user_id,
                "channel": channel,
                "input_length": len(input_text),
                "output_length": len(output_text),
                "timestamp": datetime.now().isoformat(),
                "metadata": metadata or {},
            },
        )

    def log_error(self, error: Exception, context: Dict[str, Any]):
        """Log an error together with its type and caller-supplied context."""
        self.logger.error(
            f"Error: {str(error)}",
            extra={
                "error_type": type(error).__name__,
                "context": context,
                "timestamp": datetime.now().isoformat(),
            },
        )
Security Checklist
- ✓ Never expose tokens in logs or responses
- ✓ Verify all Slack requests with signing secret
- ✓ Sanitize all user inputs before processing
- ✓ Use environment variables for secrets
- ✓ Implement request timeouts
- ✓ Log security events for audit
- ✓ Regularly rotate API keys
- ✓ Use HTTPS for all external calls
10. Deployment Options
Docker Deployment
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies.
# --no-install-recommends keeps apt from pulling in optional packages that
# only enlarge the image.
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies first so this layer is reused while
# requirements.txt is unchanged.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Create the non-root user before copying the application, so ownership can
# be set during COPY instead of by a recursive chown that would store the
# application files in a second layer.
RUN useradd -m -u 1000 slackbot && chown slackbot:slackbot /app

# Copy application
COPY --chown=slackbot:slackbot . .

USER slackbot

# Run the bot
CMD ["python", "bot.py"]
Docker Compose
# docker-compose.yml
version: '3.8'

services:
  # The bot itself, built from the Dockerfile in this directory.
  slack-bot:
    build: .
    restart: unless-stopped
    depends_on:
      - redis
    # Secrets are taken from the host environment (or an .env file next to
    # this compose file); only the Redis URL is fixed.
    environment:
      SLACK_SIGNING_SECRET: ${SLACK_SIGNING_SECRET}
      SLACK_BOT_TOKEN: ${SLACK_BOT_TOKEN}
      SLACK_APP_TOKEN: ${SLACK_APP_TOKEN}
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      REDIS_URL: redis://redis:6379
    # Rotate container logs: at most three files of 10 MB each.
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

  # Redis with append-only persistence on a named volume.
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    restart: unless-stopped
    volumes:
      - redis_data:/data

volumes:
  redis_data:
AWS Lambda Deployment
# lambda_handler.py from slack_bolt import App from slack_bolt.adapter.aws_lambda import SlackRequestHandler import os # Initialize app app = App( token=os.environ["SLACK_BOT_TOKEN"], signing_secret=os.environ["SLACK_SIGNING_SECRET"], process_before_response=True # Important for Lambda ) # Register handlers (import from your modules) from handlers import register_handlers register_handlers(app) # Lambda handler def lambda_handler(event, context): slack_handler = SlackRequestHandler(app=app) return slack_handler.handle(event, context) # serverless.yml service: slack-llm-bot provider: name: aws runtime: python3.11 region: us-east-1 timeout: 30 environment: SLACK_BOT_TOKEN: ${env:SLACK_BOT_TOKEN} SLACK_SIGNING_SECRET: ${env:SLACK_SIGNING_SECRET} OPENAI_API_KEY: ${env:OPENAI_API_KEY} functions: slack: handler: lambda_handler.lambda_handler events: - http: path: slack/events method: post cors: true layers: - arn:aws:lambda:${aws:region}:xxx:layer:slack-bolt-python:1 plugins: - serverless-python-requirements custom: pythonRequirements: dockerizePip: true
Kubernetes Deployment
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: slack-llm-bot
spec:
  replicas: 2
  selector:
    matchLabels: {app: slack-llm-bot}
  template:
    metadata:
      labels: {app: slack-llm-bot}
    spec:
      containers:
        - name: bot
          image: your-registry/slack-llm-bot:latest
          # Credentials come from Kubernetes Secrets; only the Redis URL is
          # a plain value.
          env:
            - name: SLACK_BOT_TOKEN
              valueFrom:
                secretKeyRef: {name: slack-secrets, key: bot-token}
            - name: SLACK_SIGNING_SECRET
              valueFrom:
                secretKeyRef: {name: slack-secrets, key: signing-secret}
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef: {name: llm-secrets, key: openai-key}
            - name: REDIS_URL
              value: redis://redis-service:6379
          resources:
            requests: {memory: "256Mi", cpu: "100m"}
            limits: {memory: "512Mi", cpu: "500m"}
          # Polls GET /health on port 3000 every 30 s, starting 30 s after
          # the container starts.
          livenessProbe:
            httpGet: {path: /health, port: 3000}
            initialDelaySeconds: 30
            periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
  name: slack-bot-service
spec:
  type: LoadBalancer
  selector: {app: slack-llm-bot}
  # External port 80 forwards to the bot's port 3000.
  ports:
    - port: 80
      targetPort: 3000
Production Monitoring
# monitoring.py
import functools
import time
from datetime import datetime

from prometheus_client import Counter, Histogram, Gauge, generate_latest

# Metrics
message_counter = Counter(
    'slack_bot_messages_total',
    'Total messages processed',
    ['event_type', 'status']
)

response_time = Histogram(
    'slack_bot_response_time_seconds',
    'Response time in seconds',
    ['command']
)

active_conversations = Gauge(
    'slack_bot_active_conversations',
    'Number of active conversations'
)

llm_tokens_used = Counter(
    'slack_bot_llm_tokens_total',
    'Total LLM tokens used',
    ['model', 'operation']
)


# Monitoring decorator
def monitor_performance(operation: str):
    """Return a decorator that records outcome and latency for *operation*.

    Each call increments ``message_counter`` with status ``success`` or
    ``error`` and observes the elapsed wall-clock time in ``response_time``.
    Exceptions raised by the wrapped function are re-raised unchanged.
    """
    def decorator(func):
        # functools.wraps keeps the wrapped function's name, docstring and
        # __wrapped__ reference, so code that introspects a handler still
        # sees the original function rather than `wrapper(*args, **kwargs)`.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
            except Exception:
                message_counter.labels(
                    event_type=operation,
                    status='error'
                ).inc()
                raise
            else:
                message_counter.labels(
                    event_type=operation,
                    status='success'
                ).inc()
                return result
            finally:
                # Runs on both paths, so failures are timed as well.
                response_time.labels(command=operation).observe(
                    time.time() - start_time
                )
        return wrapper
    return decorator


# Health check endpoint
# NOTE(review): `app` must be a web app exposing `.route` (e.g. Flask);
# confirm which object is in scope where this snippet is used.
@app.route("/health")
def health_check():
    """Report liveness together with the current server time."""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}


@app.route("/metrics")
def metrics():
    """Expose all registered metrics in the Prometheus text format."""
    return generate_latest()
Complete Bot Example
# main.py - Complete Slack LLM Bot
import os
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
import openai
import redis
from datetime import datetime
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize services
app = App(
    token=os.environ["SLACK_BOT_TOKEN"],
    signing_secret=os.environ["SLACK_SIGNING_SECRET"]
)

openai.api_key = os.environ["OPENAI_API_KEY"]
redis_client = redis.from_url(os.environ["REDIS_URL"])

# Initialize managers
from context_management import ConversationContextManager
from rate_limiting import RateLimiter, UserRateLimiter
from file_handling import FileHandler
from security import InputSanitizer, AuditLogger

context_manager = ConversationContextManager(redis_client)
rate_limiter = RateLimiter(redis_client)
user_limiter = UserRateLimiter(redis_client)
file_handler = FileHandler(os.environ["SLACK_BOT_TOKEN"])
sanitizer = InputSanitizer()
audit_logger = AuditLogger(logger)

# Number of most recent stored messages replayed to the model.
HISTORY_WINDOW = 10

# Slack rejects a section block whose text is longer than this.
SECTION_TEXT_LIMIT = 3000

# Maps stored message types to chat roles; anything else is treated as the
# assistant's own reply.
# NOTE(review): assumes stored messages expose a LangChain-style `.type`
# ("human" / "ai" / "system") - confirm against ConversationContextManager.
ROLE_BY_MESSAGE_TYPE = {"human": "user", "system": "system"}


# Main event handlers
@app.event("app_mention")
def handle_app_mention(event, say, client):
    """Answer a mention in-thread, using the thread's stored history."""
    user_id = event["user"]
    text = sanitizer.sanitize_user_input(event["text"])
    thread_ts = event.get("thread_ts", event["ts"])

    # Check rate limits
    allowed, message = user_limiter.check_user_limit(user_id)
    if not allowed:
        say(message, thread_ts=thread_ts)
        return

    try:
        # Get conversation context
        conversation_id = f"{event['channel']}:{thread_ts}"
        context_manager.add_message(conversation_id, "human", text)

        memory = context_manager.get_or_create_memory(conversation_id)
        messages = [
            {
                "role": "system",
                "content": "You are a helpful Slack bot powered by ParrotRouter."
            }
        ]

        # Add conversation history. System entries (such as stored document
        # content) keep the system role instead of being presented to the
        # model as something the assistant said.
        for msg in memory.chat_memory.messages[-HISTORY_WINDOW:]:
            role = ROLE_BY_MESSAGE_TYPE.get(msg.type, "assistant")
            messages.append({"role": role, "content": msg.content})

        # Generate response
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            temperature=0.7,
            max_tokens=500
        )

        ai_response = sanitizer.sanitize_llm_output(
            response.choices[0].message.content
        )

        # Add to context
        context_manager.add_message(conversation_id, "ai", ai_response)

        # Log interaction
        audit_logger.log_interaction(
            user_id=user_id,
            channel=event["channel"],
            input_text=text,
            output_text=ai_response
        )

        # Send response
        say(text=ai_response, thread_ts=thread_ts)

    except Exception as e:
        # Top-level boundary: logger.exception keeps the traceback.
        logger.exception(f"Error in app_mention: {e}")
        audit_logger.log_error(e, {"event": event})
        say("Sorry, I encountered an error. Please try again.", thread_ts=thread_ts)


# Slash commands
@app.command("/ask")
def handle_ask(ack, respond, command):
    """Answer a one-off question without conversation history."""
    ack()

    text = sanitizer.sanitize_user_input(command["text"])
    if not text:
        respond("Please provide a question after /ask")
        return

    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "Answer concisely."},
                {"role": "user", "content": text}
            ],
            max_tokens=300
        )

        answer = sanitizer.sanitize_llm_output(
            response.choices[0].message.content
        )

        # Sanitized input may be up to 4000 characters, which exceeds the
        # section text limit; truncate so Slack does not reject the blocks.
        question_text = f"*Q:* {text}"[:SECTION_TEXT_LIMIT]
        answer_text = f"*A:* {answer}"[:SECTION_TEXT_LIMIT]

        respond(blocks=[
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": question_text}
            },
            {"type": "divider"},
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": answer_text}
            }
        ])

    except Exception as e:
        logger.exception(f"Error in /ask: {e}")
        respond("Sorry, I couldn't process your question.")


# Start the bot
if __name__ == "__main__":
    # Socket Mode
    handler = SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
    handler.start()

    # Or use Flask for HTTP mode
    # from flask import Flask, request
    # from slack_bolt.adapter.flask import SlackRequestHandler
    #
    # flask_app = Flask(__name__)
    # handler = SlackRequestHandler(app)
    #
    # @flask_app.route("/slack/events", methods=["POST"])
    # def slack_events():
    #     return handler.handle(request)
    #
    # flask_app.run(port=3000)
References & Citations
Ready to Build Your Slack Bot?
Create intelligent Slack bots powered by LLMs using ParrotRouter's unified API gateway.
References
- [1] AWS. "Lambda Documentation" (2024)
- [2] Vercel. "Streaming Responses" (2024)
- [3] GitHub. "OpenAI Node.js Library" (2024)