Slack Bot with LLM Integration - Complete Guide

Build intelligent Slack bots that leverage LLM capabilities for natural conversations, automated workflows, and enhanced team productivity. This guide covers everything from setup to production deployment.

Quick Start

Create a new Slack bot with LLM integration:

# Python setup
# Note: the Python examples use the pre-1.0 OpenAI SDK interface
# (openai.ChatCompletion), so pin the version accordingly
pip install slack-bolt "openai<1.0" langchain redis
pip install python-dotenv aiohttp requests
pip install pillow PyPDF2 python-docx prometheus-client  # file handling & monitoring

# Node.js setup
npm install @slack/bolt openai langchain redis
npm install dotenv express
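
Before wiring in the LLM, it's worth verifying your tokens with a minimal Socket Mode bot. A sketch (it assumes the app already exists and Socket Mode is enabled, per section 1):

# smoke_test.py - confirm tokens and event delivery work
import os
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler

app = App(token=os.environ["SLACK_BOT_TOKEN"])

@app.event("app_mention")
def echo(event, say):
    # Reply in-thread with a fixed message
    say("👋 I'm alive!", thread_ts=event.get("thread_ts", event["ts"]))

if __name__ == "__main__":
    SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"]).start()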

1. Slack App Setup & Permissions

Create Slack App

  1. Go to api.slack.com/apps
  2. Click "Create New App" → "From scratch"
  3. Name your app and select workspace
  4. Navigate to "OAuth & Permissions"

Required Bot Token Scopes

# OAuth & Permissions → Bot Token Scopes
app_mentions:read     # Read messages that mention your bot
channels:history      # View messages in public channels
channels:join         # Join public channels
channels:read         # View basic channel info
chat:write           # Send messages
files:read           # Access files shared in conversations
files:write          # Upload files
groups:history       # View messages in private channels
groups:read          # View basic private channel info
im:history           # View direct messages
im:read              # View basic DM info
im:write             # Send direct messages
mpim:history         # View group DM messages
mpim:read            # View basic group DM info
mpim:write           # Send group DM messages
reactions:write      # Add emoji reactions
users:read           # View user info
commands             # Add slash commands
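
Alternatively, Slack can create the app from a manifest ("Create New App" → "From an app manifest"). An abridged manifest covering the scopes above might look like this (adjust the name, scopes, and event subscriptions to your bot):

# manifest.yml (abridged)
display_information:
  name: LLM Assistant
features:
  bot_user:
    display_name: llm-assistant
    always_online: true
oauth_config:
  scopes:
    bot:
      - app_mentions:read
      - channels:history
      - chat:write
      - commands
      - files:read
      - im:history
      - im:write
      - reactions:write
      - users:read
settings:
  event_subscriptions:
    bot_events:
      - app_mention
      - message.im
      - file_shared
  socket_mode_enabled: true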

App Configuration

// config/slack-config.ts
export interface SlackConfig {
  signingSecret: string
  botToken: string
  appToken: string
  clientId: string
  clientSecret: string
  stateSecret: string
}

export const slackConfig: SlackConfig = {
  signingSecret: process.env.SLACK_SIGNING_SECRET!,
  botToken: process.env.SLACK_BOT_TOKEN!,
  appToken: process.env.SLACK_APP_TOKEN!,
  clientId: process.env.SLACK_CLIENT_ID!,
  clientSecret: process.env.SLACK_CLIENT_SECRET!,
  stateSecret: process.env.SLACK_STATE_SECRET!,
}

// Environment variables (.env)
SLACK_SIGNING_SECRET=your_signing_secret
SLACK_BOT_TOKEN=xoxb-your-bot-token
SLACK_APP_TOKEN=xapp-your-app-token
SLACK_CLIENT_ID=your_client_id
SLACK_CLIENT_SECRET=your_client_secret
OPENAI_API_KEY=sk-...
PARROTROUTER_API_KEY=your_api_key
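
python-dotenv (installed in the quick start) loads this file into the process environment at startup:

# settings.py - load .env before anything reads os.environ
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory

assert os.environ.get("SLACK_BOT_TOKEN"), "SLACK_BOT_TOKEN is not set"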

2. Event Handling

Python Implementation with Slack Bolt

# bot.py
import os
import re
import logging
from typing import Dict, Any

from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
import openai

logger = logging.getLogger(__name__)

# Initialize app from environment variables
app = App(
    token=os.environ["SLACK_BOT_TOKEN"],
    signing_secret=os.environ["SLACK_SIGNING_SECRET"]
)
openai.api_key = os.environ["OPENAI_API_KEY"]

# Handle app mentions
@app.event("app_mention")
def handle_app_mention(event: Dict[str, Any], say, client, logger):
    """Handle when someone mentions the bot"""
    try:
        # Extract user and text
        user = event["user"]
        text = event["text"]
        channel = event["channel"]
        thread_ts = event.get("thread_ts", event["ts"])
        
        # Remove bot mention from text (client.auth_test() is an extra API
        # call per event; consider caching the bot user ID at startup)
        bot_user_id = client.auth_test()["user_id"]
        cleaned_text = re.sub(f'<@{bot_user_id}>', '', text).strip()
        
        # Generate LLM response
        response = generate_llm_response(cleaned_text, user)
        
        # Reply in thread
        say(
            text=response,
            thread_ts=thread_ts,
            channel=channel
        )
        
    except Exception as e:
        logger.error(f"Error handling mention: {e}")
        say("Sorry, I encountered an error processing your request.")

# Handle direct messages
@app.event("message")
def handle_direct_message(event: Dict[str, Any], say, client, logger):
    """Handle direct messages to the bot"""
    # Skip bot messages and message subtypes (edits, joins, etc.)
    if event.get("subtype") or event.get("bot_id"):
        return
    
    # Message events carry a channel_type field; "im" means a DM,
    # which avoids an extra conversations.info API call per message
    if event.get("channel_type") == "im":
        user = event["user"]
        text = event["text"]
        
        # Generate response
        response = generate_llm_response(text, user)
        
        # Reply
        say(response)

# Handle messages in channels (optional). Note that Bolt runs every
# matching listener, so this fires alongside handle_direct_message
@app.event("message")
def handle_channel_message(event: Dict[str, Any], say, client, logger):
    """Handle messages in channels where bot is present"""
    # Skip bot messages, subtypes, and DMs (handled above)
    if event.get("subtype") or event.get("bot_id") or event.get("channel_type") == "im":
        return
    
    # Skip explicit mentions; those are handled by the app_mention event
    bot_user_id = client.auth_test()["user_id"]
    if f"<@{bot_user_id}>" in event.get("text", ""):
        return
    
    # Optional: Respond to specific keywords
    text = event.get("text", "").lower()
    if "help" in text and "bot" in text:
        say(
            text="Need help? Just mention me with your question!",
            thread_ts=event["ts"]
        )

def generate_llm_response(text: str, user_id: str) -> str:
    """Generate response using LLM"""
    try:
        # Get user info for personalization
        user_info = app.client.users_info(user=user_id)
        user_name = user_info["user"]["real_name"]
        
        # Create conversation with context
        messages = [
            {
                "role": "system",
                "content": f"""You are a helpful Slack bot assistant powered by ParrotRouter. 
                You're talking to {user_name}. Be friendly, concise, and helpful.
                Format your responses using Slack markdown when appropriate."""
            },
            {
                "role": "user",
                "content": text
            }
        ]
        
        # Call LLM API
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            temperature=0.7,
            max_tokens=500
        )
        
        return response.choices[0].message.content
        
    except Exception as e:
        logger.error(f"LLM Error: {e}")
        return "I'm having trouble generating a response right now. Please try again later."

Node.js Implementation

// bot.ts
import { App, LogLevel } from '@slack/bolt'
import { WebClient } from '@slack/web-api'
import OpenAI from 'openai'

const app = new App({
  token: process.env.SLACK_BOT_TOKEN,
  signingSecret: process.env.SLACK_SIGNING_SECRET,
  socketMode: true,
  appToken: process.env.SLACK_APP_TOKEN,
  logLevel: LogLevel.DEBUG,
})

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})

// Handle app mentions
app.event('app_mention', async ({ event, context, client, say }) => {
  try {
    const { user, text, channel, thread_ts, ts } = event
    
    // Remove bot mention
    const botUserId = context.botUserId
    const cleanedText = text.replace(new RegExp(`<@${botUserId}>`, 'g'), '').trim()
    
    // Generate response
    const response = await generateLLMResponse(cleanedText, user)
    
    // Reply in thread
    await say({
      text: response,
      thread_ts: thread_ts || ts,
      channel,
    })
  } catch (error) {
    console.error('Error handling mention:', error)
    await say('Sorry, I encountered an error processing your request.')
  }
})

// Handle direct messages
app.message(async ({ message, say, client }) => {
  // Skip bot messages
  if (message.subtype || message.bot_id) return
  
  try {
    // Check if DM
    const channelInfo = await client.conversations.info({
      channel: message.channel,
    })
    
    if (channelInfo.channel?.is_im) {
      const response = await generateLLMResponse(message.text!, message.user!)
      await say(response)
    }
  } catch (error) {
    console.error('Error handling DM:', error)
  }
})

async function generateLLMResponse(text: string, userId: string): Promise<string> {
  try {
    // Get user info
    const userInfo = await app.client.users.info({ user: userId })
    const userName = userInfo.user?.real_name || 'there'
    
    const completion = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: [
        {
          role: 'system',
          content: `You are a helpful Slack bot assistant powered by ParrotRouter.
          You're talking to ${userName}. Be friendly, concise, and helpful.
          Use Slack markdown for formatting.`,
        },
        {
          role: 'user',
          content: text,
        },
      ],
      temperature: 0.7,
      max_tokens: 500,
    })
    
    return completion.choices[0].message.content || "I couldn't generate a response."
  } catch (error) {
    console.error('LLM Error:', error)
    return "I'm having trouble generating a response right now."
  }
}

3. Slash Commands

Command Registration

In your Slack app settings, go to "Slash Commands" and create each command (or declare them in your app manifest, as sketched after the list):

  • /ask [question] - Ask the AI a question
  • /summarize [url or text] - Summarize content
  • /help - Show available commands
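
If you manage the app via a manifest, the same commands can be declared there (the descriptions mirror the list above):

# manifest.yml (excerpt)
features:
  slash_commands:
    - command: /ask
      description: Ask the AI a question
      usage_hint: "[question]"
    - command: /summarize
      description: Summarize content
      usage_hint: "[url or text]"
    - command: /help
      description: Show available commands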

Command Implementation

# slash_commands.py
import re
from typing import Dict, Any

from slack_bolt import App

@app.command("/ask")
def handle_ask_command(ack, respond, command):
    """Handle /ask command"""
    # Acknowledge command request within 3 seconds
    ack()
    
    user_id = command["user_id"]
    text = command["text"]
    
    if not text:
        respond("Please provide a question after /ask")
        return
    
    # Show loading message
    respond("🤔 Thinking...")
    
    try:
        # Generate response
        response = generate_llm_response(text, user_id)
        
        # Format response with blocks
        blocks = [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Question:* {text}"
                }
            },
            {"type": "divider"},
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Answer:*\n{response}"
                }
            }
        ]
        
        # Update with formatted response
        respond(blocks=blocks, replace_original=True)
        
    except Exception as e:
        respond(f"Error: {str(e)}", replace_original=True)

@app.command("/summarize")
async def handle_summarize_command(ack, respond, command, client):
    """Handle /summarize command"""
    ack()
    
    text = command["text"]
    
    if not text:
        respond("Please provide text or a URL to summarize")
        return
    
    # Check if it's a URL
    url_pattern = re.compile(r'https?://\S+')
    urls = url_pattern.findall(text)
    
    try:
        if urls:
            # Handle URL summarization
            content = await fetch_url_content(urls[0])
            summary = await summarize_content(content)
        else:
            # Summarize provided text
            summary = await summarize_content(text)
        
        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": "📄 Summary"
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": summary
                }
            }
        ]
        
        respond(blocks=blocks)
        
    except Exception as e:
        respond(f"Error summarizing: {str(e)}")

@app.command("/help")
def handle_help_command(ack, respond):
    """Handle /help command"""
    ack()
    
    help_text = """
*Available Commands:*
• `/ask [question]` - Ask me anything
• `/summarize [text or URL]` - Get a summary of text or webpage
• `/help` - Show this help message

*How to use me:*
• Mention me in any channel with @bot-name
• Send me a direct message
• Use slash commands for specific tasks

*Tips:*
• I remember context within threads
• I can analyze images if you share them
• I format code with ```language blocks
    """
    
    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "🤖 Bot Help"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": help_text
            }
        }
    ]
    
    respond(blocks=blocks)

4. Interactive Components

Interactive Messages with Buttons

# interactive_components.py
@app.command("/feedback")
def handle_feedback_command(ack, respond, command):
    """Handle feedback command with interactive buttons"""
    ack()
    
    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "How would you rate my response?"
            }
        },
        {
            "type": "actions",
            "elements": [
                {
                    "type": "button",
                    "text": {
                        "type": "plain_text",
                        "text": "👍 Good"
                    },
                    "style": "primary",
                    "action_id": "feedback_good",
                    "value": "good"
                },
                {
                    "type": "button",
                    "text": {
                        "type": "plain_text",
                        "text": "👎 Bad"
                    },
                    "style": "danger",
                    "action_id": "feedback_bad",
                    "value": "bad"
                }
            ]
        }
    ]
    
    respond(blocks=blocks)

@app.action("feedback_good")
def handle_good_feedback(ack, body, respond):
    """Handle positive feedback"""
    ack()
    
    user = body["user"]["id"]
    
    # Log feedback (log_feedback is a helper you implement, e.g. a DB insert)
    log_feedback(user, "good", body.get("message", {}).get("ts"))
    
    # Update message
    respond(
        text="Thanks for the positive feedback! 🎉",
        replace_original=True
    )

@app.action("feedback_bad")
def handle_bad_feedback(ack, body, client):
    """Handle negative feedback - open modal for details"""
    ack()
    
    client.views_open(
        trigger_id=body["trigger_id"],
        view={
            "type": "modal",
            "callback_id": "feedback_modal",
            "title": {
                "type": "plain_text",
                "text": "Feedback Details"
            },
            "submit": {
                "type": "plain_text",
                "text": "Submit"
            },
            "close": {
                "type": "plain_text",
                "text": "Cancel"
            },
            "blocks": [
                {
                    "type": "input",
                    "block_id": "feedback_input",
                    "element": {
                        "type": "plain_text_input",
                        "action_id": "feedback_text",
                        "multiline": True,
                        "placeholder": {
                            "type": "plain_text",
                            "text": "What could I improve?"
                        }
                    },
                    "label": {
                        "type": "plain_text",
                        "text": "Your Feedback"
                    }
                }
            ]
        }
    )

Modal Handling

@app.view("feedback_modal")
def handle_feedback_submission(ack, body, view, client):
    """Handle modal submission"""
    ack()
    
    # Extract values
    user = body["user"]["id"]
    feedback = view["state"]["values"]["feedback_input"]["feedback_text"]["value"]
    
    # Store feedback (store_feedback is a helper you implement)
    store_feedback(user, feedback)
    
    # Send confirmation DM
    client.chat_postMessage(
        channel=user,
        text=f"Thank you for your feedback! We'll use it to improve: \n\n_{feedback}_"
    )

# Select Menu Example
@app.command("/settings")
def show_settings(ack, respond, command):
    """Show settings with select menu"""
    ack()
    
    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "Configure your bot preferences:"
            },
            "accessory": {
                "type": "static_select",
                "placeholder": {
                    "type": "plain_text",
                    "text": "Select model"
                },
                "action_id": "model_select",
                "options": [
                    {
                        "text": {
                            "type": "plain_text",
                            "text": "GPT-4 (Most capable)"
                        },
                        "value": "gpt-4"
                    },
                    {
                        "text": {
                            "type": "plain_text",
                            "text": "GPT-3.5 (Faster)"
                        },
                        "value": "gpt-3.5-turbo"
                    },
                    {
                        "text": {
                            "type": "plain_text",
                            "text": "Claude 3 (Anthropic)"
                        },
                        "value": "claude-3"
                    }
                ]
            }
        }
    ]
    
    respond(blocks=blocks)

@app.action("model_select")
def handle_model_selection(ack, body, client):
    """Handle model selection"""
    ack()
    
    selected_model = body["actions"][0]["selected_option"]["value"]
    user_id = body["user"]["id"]
    
    # Save user preference (save_user_preference is a helper you implement)
    save_user_preference(user_id, "model", selected_model)
    
    # Send confirmation
    client.chat_postMessage(
        channel=user_id,
        text=f"✅ Model preference updated to: {selected_model}"
    )

5. Thread Management

Conversation Threading

# thread_management.py
import os
import json
from datetime import datetime
from typing import List, Dict, Any

import redis
import openai

class ThreadManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.ttl = 3600 * 24  # 24 hours
    
    def get_thread_context(self, thread_ts: str) -> List[Dict[str, Any]]:
        """Get conversation history for a thread"""
        key = f"thread:{thread_ts}"
        data = self.redis.get(key)
        
        if data:
            return json.loads(data)
        return []
    
    def add_to_thread(self, thread_ts: str, role: str, content: str):
        """Add message to thread context"""
        context = self.get_thread_context(thread_ts)
        context.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })
        
        # Keep only last 10 messages for context
        if len(context) > 10:
            context = context[-10:]
        
        self.redis.setex(
            f"thread:{thread_ts}",
            self.ttl,
            json.dumps(context)
        )
    
    def clear_thread(self, thread_ts: str):
        """Clear thread context"""
        self.redis.delete(f"thread:{thread_ts}")

# Initialize thread manager
thread_manager = ThreadManager(redis.from_url(os.environ["REDIS_URL"]))

@app.event("app_mention")
def handle_mention_with_context(event, say, client):
    """Handle mentions with thread context"""
    user = event["user"]
    text = event["text"]
    thread_ts = event.get("thread_ts", event["ts"])
    
    # Get thread context
    context = thread_manager.get_thread_context(thread_ts)
    
    # Add current message to context
    thread_manager.add_to_thread(thread_ts, "user", text)
    
    # Generate response with context
    response = generate_contextual_response(text, context, user)
    
    # Add response to context
    thread_manager.add_to_thread(thread_ts, "assistant", response)
    
    # Reply in thread
    say(
        text=response,
        thread_ts=thread_ts
    )

def generate_contextual_response(
    message: str, 
    context: List[Dict[str, Any]], 
    user_id: str
) -> str:
    """Generate response with conversation context"""
    # Build messages for LLM
    messages = [
        {
            "role": "system",
            "content": """You are a helpful Slack bot. 
            You have access to the conversation history.
            Maintain context and refer to previous messages when relevant."""
        }
    ]
    
    # Add context messages
    for ctx_msg in context[:-1]:  # Exclude current message
        messages.append({
            "role": ctx_msg["role"],
            "content": ctx_msg["content"]
        })
    
    # Add current message
    messages.append({
        "role": "user",
        "content": message
    })
    
    # Generate response
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=messages,
        temperature=0.7
    )
    
    return response.choices[0].message.content

# Thread summary command
@app.command("/summarize_thread")
def summarize_thread(ack, respond, command, client):
    """Summarize a conversation thread"""
    ack()
    
    # Get thread messages
    channel = command["channel_id"]
    
    # Find most recent thread in channel
    conversations = client.conversations_history(
        channel=channel,
        limit=20
    )
    
    thread_ts = None
    for message in conversations["messages"]:
        if "thread_ts" in message:
            thread_ts = message["thread_ts"]
            break
    
    if not thread_ts:
        respond("No recent threads found in this channel")
        return
    
    # Get thread replies
    thread_messages = client.conversations_replies(
        channel=channel,
        ts=thread_ts
    )
    
    # Create summary
    summary_prompt = "Summarize this Slack conversation:\n\n"
    for msg in thread_messages["messages"]:
        summary_prompt += f"{msg.get('user', 'Unknown')}: {msg.get('text', '')}\n"
    
    summary = generate_llm_response(summary_prompt, command["user_id"])
    
    respond(f"*Thread Summary:*\n{summary}")

6. Rate Limiting

Rate Limiter Implementation

# rate_limiting.py
import os
import time
import asyncio
from functools import wraps
from typing import Dict, List, Tuple

import redis
import openai

class RateLimiter:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        
        # Slack API limits
        self.slack_limits = {
            "chat.postMessage": (1, 1),  # 1 per second
            "conversations.history": (50, 60),  # 50 per minute
            "users.info": (100, 60),  # 100 per minute
        }
        
        # LLM API limits (example)
        self.llm_limits = {
            "openai": (60, 60),  # 60 per minute
            "anthropic": (50, 60),  # 50 per minute
        }
    
    def check_rate_limit(self, key: str, max_requests: int, window: int) -> Tuple[bool, int]:
        """Check if request is within rate limit"""
        now = time.time()
        pipeline = self.redis.pipeline()
        pipeline.zremrangebyscore(key, 0, now - window)
        pipeline.zadd(key, {str(now): now})
        pipeline.zcount(key, now - window, now)
        pipeline.expire(key, window)
        
        results = pipeline.execute()
        current_requests = results[2]
        
        if current_requests > max_requests:
            # Calculate wait time
            oldest = self.redis.zrange(key, 0, 0, withscores=True)
            if oldest:
                wait_time = int(oldest[0][1] + window - now) + 1
                return False, wait_time
            return False, window
        
        return True, 0
    
    def rate_limit_slack(self, method: str):
        """Decorator for Slack API rate limiting"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                if method in self.slack_limits:
                    limit, window = self.slack_limits[method]
                    key = f"slack_rate:{method}"
                    
                    allowed, wait_time = self.check_rate_limit(key, limit, window)
                    if not allowed:
                        time.sleep(wait_time)
                
                return func(*args, **kwargs)
            return wrapper
        return decorator
    
    def rate_limit_llm(self, provider: str):
        """Decorator for LLM API rate limiting"""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                if provider in self.llm_limits:
                    limit, window = self.llm_limits[provider]
                    key = f"llm_rate:{provider}"
                    
                    allowed, wait_time = self.check_rate_limit(key, limit, window)
                    if not allowed:
                        await asyncio.sleep(wait_time)
                
                return await func(*args, **kwargs)
            return wrapper
        return decorator

# Initialize rate limiter
rate_limiter = RateLimiter(redis.from_url(os.environ["REDIS_URL"]))

# Usage example
@rate_limiter.rate_limit_slack("chat.postMessage")
def send_message(channel: str, text: str):
    """Rate-limited message sending"""
    return app.client.chat_postMessage(
        channel=channel,
        text=text
    )

@rate_limiter.rate_limit_llm("openai")
async def call_openai(messages: List[Dict]):
    """Rate-limited OpenAI call"""
    return await openai.ChatCompletion.acreate(
        model="gpt-4",
        messages=messages
    )

# User-level rate limiting
class UserRateLimiter:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.user_limits = {
            "default": (10, 60),  # 10 requests per minute
            "premium": (100, 60),  # 100 requests per minute
        }
    
    def check_user_limit(self, user_id: str, user_type: str = "default") -> Tuple[bool, str]:
        """Check user-specific rate limits"""
        limit, window = self.user_limits.get(user_type, self.user_limits["default"])
        key = f"user_rate:{user_id}"
        
        allowed, wait_time = rate_limiter.check_rate_limit(key, limit, window)
        
        if not allowed:
            return False, f"Rate limit exceeded. Please wait {wait_time} seconds."
        
        return True, ""

user_limiter = UserRateLimiter(redis.from_url(os.environ["REDIS_URL"]))

@app.event("app_mention")
def handle_mention_with_rate_limit(event, say):
    """Handle mention with user rate limiting"""
    user_id = event["user"]
    
    # Check user rate limit
    allowed, message = user_limiter.check_user_limit(user_id)
    if not allowed:
        say(message, thread_ts=event.get("thread_ts", event["ts"]))
        return
    
    # Process request
    handle_app_mention(event, say)

Rate Limiting Best Practices

  • Implement exponential backoff for retries (see the sketch after this list)
  • Cache user information to reduce API calls
  • Use Slack's rate limit headers (Retry-After on 429 responses) to adjust dynamically
  • Queue non-urgent messages for batch processing
  • Monitor rate limit usage and alert on high usage
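
A minimal backoff helper for the first point could look like this (the bare except is simplified; narrow it to the retryable errors your SDKs actually raise):

# backoff.py - retry a callable with exponential backoff and jitter
import time
import random

def with_backoff(func, max_retries: int = 5, base_delay: float = 1.0):
    """Call func(), retrying with exponentially growing delays."""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries; surface the error
            # Sleep base_delay * 2^attempt, plus up to 1s of jitter
            time.sleep(base_delay * (2 ** attempt) + random.random())

# Usage: wrap a rate-limited Slack call
# with_backoff(lambda: app.client.chat_postMessage(channel="C123", text="hi"))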

7. Context Management

Advanced Context Manager

# context_management.py
import os
import json
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

import redis
from langchain.memory import ConversationBufferWindowMemory
from langchain.schema import HumanMessage, AIMessage
class ConversationContextManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.memories = {}  # In-memory cache for active conversations
        self.ttl = 3600 * 24  # 24 hours
        self.max_messages = 20  # Maximum messages to keep
    
    def get_or_create_memory(self, conversation_id: str) -> ConversationBufferWindowMemory:
        """Get or create conversation memory"""
        if conversation_id not in self.memories:
            # Try to load from Redis
            stored_messages = self.load_conversation(conversation_id)
            
            memory = ConversationBufferWindowMemory(
                k=self.max_messages,
                return_messages=True
            )
            
            # Restore messages
            for msg in stored_messages:
                if msg["role"] == "human":
                    memory.chat_memory.add_user_message(msg["content"])
                elif msg["role"] == "ai":
                    memory.chat_memory.add_ai_message(msg["content"])
            
            self.memories[conversation_id] = memory
        
        return self.memories[conversation_id]
    
    def add_message(
        self, 
        conversation_id: str, 
        role: str, 
        content: str,
        metadata: Optional[Dict] = None
    ):
        """Add message to conversation"""
        memory = self.get_or_create_memory(conversation_id)
        
        if role == "human":
            memory.chat_memory.add_user_message(content)
        elif role == "ai":
            memory.chat_memory.add_ai_message(content)
        
        # Save to Redis
        self.save_conversation(conversation_id, memory, metadata)
    
    def save_conversation(
        self, 
        conversation_id: str, 
        memory: ConversationBufferWindowMemory,
        metadata: Optional[Dict] = None
    ):
        """Save conversation to Redis"""
        messages = []
        
        for message in memory.chat_memory.messages:
            msg_dict = {
                "role": "human" if isinstance(message, HumanMessage) else "ai",
                "content": message.content,
                "timestamp": datetime.now().isoformat()
            }
            
            if metadata:
                msg_dict["metadata"] = metadata
            
            messages.append(msg_dict)
        
        key = f"conversation:{conversation_id}"
        self.redis.setex(
            key,
            self.ttl,
            json.dumps(messages)
        )
    
    def load_conversation(self, conversation_id: str) -> List[Dict]:
        """Load conversation from Redis"""
        key = f"conversation:{conversation_id}"
        data = self.redis.get(key)
        
        if data:
            return json.loads(data)
        return []
    
    def get_context_summary(self, conversation_id: str) -> str:
        """Get a summary of the conversation context"""
        memory = self.get_or_create_memory(conversation_id)
        messages = memory.chat_memory.messages
        
        if not messages:
            return "No previous context."
        
        # Create a brief summary from the last few messages
        summary = f"Previous {len(messages)} messages in conversation:\n"
        for msg in messages[-5:]:  # Last 5 messages
            role = "User" if isinstance(msg, HumanMessage) else "Assistant"
            summary += f"{role}: {msg.content[:100]}...\n"
        
        return summary
    
    def clear_old_conversations(self, days: int = 7):
        """Clear conversations older than specified days"""
        pattern = "conversation:*"
        cursor = 0
        
        while True:
            cursor, keys = self.redis.scan(cursor, match=pattern)
            
            for key in keys:
                data = self.redis.get(key)
                if data:
                    messages = json.loads(data)
                    if messages:
                        last_timestamp = messages[-1].get("timestamp")
                        if last_timestamp:
                            last_date = datetime.fromisoformat(last_timestamp)
                            if datetime.now() - last_date > timedelta(days=days):
                                self.redis.delete(key)
            
            if cursor == 0:
                break

# Initialize context manager
context_manager = ConversationContextManager(redis.from_url(os.environ["REDIS_URL"]))

# Integration with bot (again, register a single app_mention listener)
@app.event("app_mention")
def handle_mention_with_full_context(event, say):
    """Handle mention with full context management"""
    user_id = event["user"]
    channel = event["channel"]
    text = event["text"]
    thread_ts = event.get("thread_ts", event["ts"])
    
    # Create conversation ID (channel + thread)
    conversation_id = f"{channel}:{thread_ts}"
    
    # Add user message to context
    context_manager.add_message(
        conversation_id,
        "human",
        text,
        metadata={"user_id": user_id, "channel": channel}
    )
    
    # Get conversation memory
    memory = context_manager.get_or_create_memory(conversation_id)
    
    # Generate response with context
    messages = [
        {
            "role": "system",
            "content": "You are a helpful Slack bot assistant powered by ParrotRouter."
        }
    ]
    
    # Add conversation history
    for msg in memory.chat_memory.messages:
        if isinstance(msg, HumanMessage):
            messages.append({"role": "user", "content": msg.content})
        else:
            messages.append({"role": "assistant", "content": msg.content})
    
    # Generate response
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=messages,
        temperature=0.7
    )
    
    ai_response = response.choices[0].message.content
    
    # Add AI response to context
    context_manager.add_message(conversation_id, "ai", ai_response)
    
    # Send response
    say(text=ai_response, thread_ts=thread_ts)

8. File & Image Handling

File Processing

# file_handling.py
import os
import io
import base64
from typing import Optional, Dict, Any

import requests
from PIL import Image
import PyPDF2
import docx  # pip install python-docx

class FileHandler:
    def __init__(self, bot_token: str):
        self.bot_token = bot_token
        self.headers = {"Authorization": f"Bearer {bot_token}"}
    
    def download_file(self, file_url: str) -> bytes:
        """Download file from Slack"""
        response = requests.get(file_url, headers=self.headers)
        response.raise_for_status()
        return response.content
    
    def process_file(self, file_info: Dict[str, Any]) -> Dict[str, Any]:
        """Process different file types"""
        file_url = file_info["url_private"]
        mimetype = file_info["mimetype"]
        filename = file_info["name"]
        
        # Download file
        file_content = self.download_file(file_url)
        
        result = {
            "filename": filename,
            "mimetype": mimetype,
            "size": file_info["size"]
        }
        
        # Process based on file type
        if mimetype.startswith("image/"):
            result.update(self.process_image(file_content, mimetype))
        elif mimetype == "application/pdf":
            result.update(self.process_pdf(file_content))
        elif mimetype in ["application/vnd.openxmlformats-officedocument.wordprocessingml.document", 
                         "application/msword"]:
            result.update(self.process_word(file_content))
        elif mimetype.startswith("text/"):
            result.update(self.process_text(file_content))
        else:
            result["error"] = f"Unsupported file type: {mimetype}"
        
        return result
    
    def process_image(self, content: bytes, mimetype: str) -> Dict[str, Any]:
        """Process image for vision models"""
        # Open image
        image = Image.open(io.BytesIO(content))
        
        # Resize if too large (Image.LANCZOS was removed in Pillow 10;
        # use Image.Resampling.LANCZOS instead)
        max_size = (1024, 1024)
        if image.size[0] > max_size[0] or image.size[1] > max_size[1]:
            image.thumbnail(max_size, Image.Resampling.LANCZOS)
        
        # Convert to base64
        buffered = io.BytesIO()
        image.save(buffered, format=image.format or "PNG")
        base64_image = base64.b64encode(buffered.getvalue()).decode()
        
        return {
            "type": "image",
            "base64": base64_image,
            "dimensions": image.size,
            "mode": image.mode
        }
    
    def process_pdf(self, content: bytes) -> Dict[str, Any]:
        """Extract text from PDF"""
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(content))
        text = ""
        
        for page_num in range(len(pdf_reader.pages)):
            page = pdf_reader.pages[page_num]
            text += page.extract_text() + "\n"
        
        return {
            "type": "pdf",
            "text": text.strip(),
            "pages": len(pdf_reader.pages)
        }
    
    def process_word(self, content: bytes) -> Dict[str, Any]:
        """Extract text from Word document"""
        doc = docx.Document(io.BytesIO(content))
        text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
        
        return {
            "type": "word",
            "text": text.strip(),
            "paragraphs": len(doc.paragraphs)
        }
    
    def process_text(self, content: bytes) -> Dict[str, Any]:
        """Process text file"""
        try:
            text = content.decode('utf-8')
        except UnicodeDecodeError:
            text = content.decode('latin-1')
        
        return {
            "type": "text",
            "text": text
        }

# Initialize file handler
file_handler = FileHandler(os.environ["SLACK_BOT_TOKEN"])

@app.event("file_shared")
def handle_file_shared(event, say, client):
    """Handle file sharing events"""
    file_id = event["file_id"]
    
    # Get file info
    file_info = client.files_info(file=file_id)["file"]
    
    # Process file
    try:
        result = file_handler.process_file(file_info)
        
        if "error" in result:
            say(f"Sorry, I couldn't process this file: {result['error']}")
            return
        
        # Handle based on file type
        if result["type"] == "image":
            # Use vision model
            response = analyze_image_with_llm(result["base64"])
            say(f"*Image Analysis:*\n{response}")
            
        elif result["type"] in ["pdf", "word", "text"]:
            # Summarize or answer questions about the document
            text = result["text"][:4000]  # Limit text length
            
            summary = summarize_document(text)
            say(f"*Document Summary:*\n{summary}")
            
            # Offer to answer questions
            say("Feel free to ask me questions about this document!")
            
            # Store document in context (file_shared payloads carry event_ts,
            # not ts; extend add_message if you want to persist "system" entries)
            thread_ts = event.get("event_ts")
            context_manager.add_message(
                f"{channel}:{thread_ts}",
                "system",
                f"Document content: {text}",
                metadata={"file_id": file_id, "filename": file_info["name"]}
            )
    
    except Exception as e:
        logger.error(f"Error processing file: {e}")
        say("Sorry, I encountered an error processing this file.")

def analyze_image_with_llm(base64_image: str) -> str:
    """Analyze image using vision model"""
    response = openai.ChatCompletion.create(
        model="gpt-4-vision-preview",  # swap in a current vision-capable model if this one is unavailable
        messages=[
            {
                "role": "system",
                "content": "You are analyzing an image shared in Slack. Provide a helpful description."
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "What's in this image?"
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{base64_image}"
                        }
                    }
                ]
            }
        ],
        max_tokens=500
    )
    
    return response.choices[0].message.content

9. Security Best Practices

Request Verification

# security.py
import os
import re
import hmac
import hashlib
import time
from datetime import datetime
from typing import Dict, Any

class SlackSecurityHandler:
    def __init__(self, signing_secret: str):
        self.signing_secret = signing_secret
    
    def verify_request(self, headers: Dict[str, str], body: str) -> bool:
        """Verify Slack request signature"""
        timestamp = headers.get("X-Slack-Request-Timestamp", "")
        signature = headers.get("X-Slack-Signature", "")
        
        if not timestamp or not signature:
            return False
        
        # Check timestamp to prevent replay attacks (5-minute window)
        if abs(time.time() - float(timestamp)) > 60 * 5:
            return False
        
        # Create signature base string
        sig_basestring = f"v0:{timestamp}:{body}"
        
        # Calculate expected signature
        my_signature = "v0=" + hmac.new(
            self.signing_secret.encode(),
            sig_basestring.encode(),
            hashlib.sha256
        ).hexdigest()
        
        # Compare signatures
        return hmac.compare_digest(my_signature, signature)

# Input sanitization
class InputSanitizer:
    @staticmethod
    def sanitize_user_input(text: str) -> str:
        """Sanitize user input before processing"""
        # Remove potential injection attempts
        sanitized = text.strip()
        
        # Remove control characters
        sanitized = ''.join(char for char in sanitized if ord(char) >= 32)
        
        # Limit length
        max_length = 4000
        if len(sanitized) > max_length:
            sanitized = sanitized[:max_length]
        
        return sanitized
    
    @staticmethod
    def sanitize_llm_output(text: str) -> str:
        """Sanitize LLM output before sending to Slack"""
        # Remove any potential Slack tokens or secrets
        import re
        
        # Remove anything that looks like a token
        text = re.sub(r'xox[baprs]-[0-9a-zA-Z-]+', '[REDACTED]', text)
        
        # Remove email addresses
        text = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[EMAIL]', text)
        
        # Escape special Slack characters if needed
        # But preserve formatting
        
        return text

# Secure configuration
class SecureConfig:
    def __init__(self):
        self.required_env_vars = [
            "SLACK_SIGNING_SECRET",
            "SLACK_BOT_TOKEN",
            "OPENAI_API_KEY",
            "REDIS_URL"
        ]
        
        self.validate_environment()
    
    def validate_environment(self):
        """Ensure all required environment variables are set"""
        missing = []
        
        for var in self.required_env_vars:
            if not os.environ.get(var):
                missing.append(var)
        
        if missing:
            raise ValueError(f"Missing required environment variables: {missing}")
    
    @property
    def slack_signing_secret(self) -> str:
        return os.environ["SLACK_SIGNING_SECRET"]
    
    @property
    def slack_bot_token(self) -> str:
        return os.environ["SLACK_BOT_TOKEN"]
    
    @property
    def openai_api_key(self) -> str:
        return os.environ["OPENAI_API_KEY"]

# Audit logging
class AuditLogger:
    def __init__(self, logger):
        self.logger = logger
    
    def log_interaction(
        self, 
        user_id: str, 
        channel: str, 
        input_text: str, 
        output_text: str,
        metadata: Dict[str, Any] = None
    ):
        """Log user interactions for audit trail"""
        self.logger.info(
            "User interaction",
            extra={
                "user_id": user_id,
                "channel": channel,
                "input_length": len(input_text),
                "output_length": len(output_text),
                "timestamp": datetime.now().isoformat(),
                "metadata": metadata or {}
            }
        )
    
    def log_error(self, error: Exception, context: Dict[str, Any]):
        """Log errors with context"""
        self.logger.error(
            f"Error: {str(error)}",
            extra={
                "error_type": type(error).__name__,
                "context": context,
                "timestamp": datetime.now().isoformat()
            }
        )

Security Checklist

  • ✓ Never expose tokens in logs or responses
  • ✓ Verify all Slack requests with signing secret
  • ✓ Sanitize all user inputs before processing
  • ✓ Use environment variables for secrets
  • ✓ Implement request timeouts and bounded retries (see the sketch after this list)
  • ✓ Log security events for audit
  • ✓ Regularly rotate API keys
  • ✓ Use HTTPS for all external calls
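
For the timeout item, a shared requests session with a default timeout and bounded retries takes only a few lines (a sketch; tune the numbers for your environment):

# http_client.py - requests session with a default timeout and bounded retries
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session(timeout: float = 10.0) -> requests.Session:
    session = requests.Session()
    retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[429, 500, 502, 503])
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    # requests has no session-wide timeout; wrap request() to enforce one
    original = session.request
    session.request = lambda *a, **kw: original(*a, timeout=kw.pop("timeout", timeout), **kw)
    return session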

10. Deployment Options

Docker Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create non-root user
RUN useradd -m -u 1000 slackbot && chown -R slackbot:slackbot /app
USER slackbot

# Run the bot
CMD ["python", "bot.py"]

Docker Compose

# docker-compose.yml
version: '3.8'

services:
  slack-bot:
    build: .
    environment:
      - SLACK_SIGNING_SECRET=${SLACK_SIGNING_SECRET}
      - SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}
      - SLACK_APP_TOKEN=${SLACK_APP_TOKEN}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    restart: unless-stopped
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"
  
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  redis_data:

AWS Lambda Deployment

# lambda_handler.py
from slack_bolt import App
from slack_bolt.adapter.aws_lambda import SlackRequestHandler
import os

# Initialize app
app = App(
    token=os.environ["SLACK_BOT_TOKEN"],
    signing_secret=os.environ["SLACK_SIGNING_SECRET"],
    process_before_response=True  # Important for Lambda
)

# Register handlers (import from your modules)
from handlers import register_handlers
register_handlers(app)

# Lambda handler
def lambda_handler(event, context):
    slack_handler = SlackRequestHandler(app=app)
    return slack_handler.handle(event, context)

# serverless.yml
service: slack-llm-bot

provider:
  name: aws
  runtime: python3.11
  region: us-east-1
  timeout: 30
  environment:
    SLACK_BOT_TOKEN: ${env:SLACK_BOT_TOKEN}
    SLACK_SIGNING_SECRET: ${env:SLACK_SIGNING_SECRET}
    OPENAI_API_KEY: ${env:OPENAI_API_KEY}

functions:
  slack:
    handler: lambda_handler.lambda_handler
    events:
      - http:
          path: slack/events
          method: post
          cors: true
    layers:
      - arn:aws:lambda:${aws:region}:xxx:layer:slack-bolt-python:1

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: true

Kubernetes Deployment

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: slack-llm-bot
spec:
  replicas: 2
  selector:
    matchLabels:
      app: slack-llm-bot
  template:
    metadata:
      labels:
        app: slack-llm-bot
    spec:
      containers:
      - name: bot
        image: your-registry/slack-llm-bot:latest
        env:
        - name: SLACK_BOT_TOKEN
          valueFrom:
            secretKeyRef:
              name: slack-secrets
              key: bot-token
        - name: SLACK_SIGNING_SECRET
          valueFrom:
            secretKeyRef:
              name: slack-secrets
              key: signing-secret
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: llm-secrets
              key: openai-key
        - name: REDIS_URL
          value: redis://redis-service:6379
        resources:
          requests:
            memory: "256Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 3000
          initialDelaySeconds: 30
          periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
  name: slack-bot-service
spec:
  selector:
    app: slack-llm-bot
  ports:
  - port: 80
    targetPort: 3000
  type: LoadBalancer
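
The Deployment reads credentials from two Secrets. Create them out-of-band rather than committing them to YAML (the key names match the secretKeyRef entries above):

# Create the Secrets referenced by the deployment
kubectl create secret generic slack-secrets \
  --from-literal=bot-token="$SLACK_BOT_TOKEN" \
  --from-literal=signing-secret="$SLACK_SIGNING_SECRET"

kubectl create secret generic llm-secrets \
  --from-literal=openai-key="$OPENAI_API_KEY"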

Production Monitoring

# monitoring.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import time

# Metrics
message_counter = Counter(
    'slack_bot_messages_total',
    'Total messages processed',
    ['event_type', 'status']
)

response_time = Histogram(
    'slack_bot_response_time_seconds',
    'Response time in seconds',
    ['command']
)

active_conversations = Gauge(
    'slack_bot_active_conversations',
    'Number of active conversations'
)

llm_tokens_used = Counter(
    'slack_bot_llm_tokens_total',
    'Total LLM tokens used',
    ['model', 'operation']
)

# Monitoring decorator
def monitor_performance(operation: str):
    def decorator(func):
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                message_counter.labels(
                    event_type=operation,
                    status='success'
                ).inc()
                return result
            except Exception as e:
                message_counter.labels(
                    event_type=operation,
                    status='error'
                ).inc()
                raise
            finally:
                response_time.labels(command=operation).observe(
                    time.time() - start_time
                )
        return wrapper
    return decorator

# Health check and metrics endpoints. Bolt's App has no @app.route of its
# own, so expose these through a small Flask app (pip install flask)
from datetime import datetime
from flask import Flask

flask_app = Flask(__name__)

@flask_app.route("/health")
def health_check():
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}

@flask_app.route("/metrics")
def metrics():
    return generate_latest()

Complete Bot Example

# main.py - Complete Slack LLM Bot
import os
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
import openai
import redis
from datetime import datetime
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize services
app = App(
    token=os.environ["SLACK_BOT_TOKEN"],
    signing_secret=os.environ["SLACK_SIGNING_SECRET"]
)
openai.api_key = os.environ["OPENAI_API_KEY"]
redis_client = redis.from_url(os.environ["REDIS_URL"])

# Initialize managers
from context_management import ConversationContextManager
from rate_limiting import RateLimiter, UserRateLimiter
from file_handling import FileHandler
from security import InputSanitizer, AuditLogger

context_manager = ConversationContextManager(redis_client)
rate_limiter = RateLimiter(redis_client)
user_limiter = UserRateLimiter(redis_client)
file_handler = FileHandler(os.environ["SLACK_BOT_TOKEN"])
sanitizer = InputSanitizer()
audit_logger = AuditLogger(logger)

# Main event handlers
@app.event("app_mention")
def handle_app_mention(event, say, client):
    """Handle when someone mentions the bot"""
    user_id = event["user"]
    text = sanitizer.sanitize_user_input(event["text"])
    thread_ts = event.get("thread_ts", event["ts"])
    
    # Check rate limits
    allowed, message = user_limiter.check_user_limit(user_id)
    if not allowed:
        say(message, thread_ts=thread_ts)
        return
    
    try:
        # Get conversation context
        conversation_id = f"{event['channel']}:{thread_ts}"
        context_manager.add_message(conversation_id, "human", text)
        
        # Generate response
        memory = context_manager.get_or_create_memory(conversation_id)
        messages = [
            {
                "role": "system",
                "content": "You are a helpful Slack bot powered by ParrotRouter."
            }
        ]
        
        # Add conversation history
        for msg in memory.chat_memory.messages[-10:]:
            role = "user" if msg.type == "human" else "assistant"
            messages.append({"role": role, "content": msg.content})
        
        # Generate response
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            temperature=0.7,
            max_tokens=500
        )
        
        ai_response = sanitizer.sanitize_llm_output(
            response.choices[0].message.content
        )
        
        # Add to context
        context_manager.add_message(conversation_id, "ai", ai_response)
        
        # Log interaction
        audit_logger.log_interaction(
            user_id=user_id,
            channel=event["channel"],
            input_text=text,
            output_text=ai_response
        )
        
        # Send response
        say(text=ai_response, thread_ts=thread_ts)
        
    except Exception as e:
        logger.error(f"Error in app_mention: {e}")
        audit_logger.log_error(e, {"event": event})
        say("Sorry, I encountered an error. Please try again.", thread_ts=thread_ts)

# Slash commands
@app.command("/ask")
def handle_ask(ack, respond, command):
    ack()
    
    text = sanitizer.sanitize_user_input(command["text"])
    if not text:
        respond("Please provide a question after /ask")
        return
    
    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "Answer concisely."},
                {"role": "user", "content": text}
            ],
            max_tokens=300
        )
        
        answer = sanitizer.sanitize_llm_output(
            response.choices[0].message.content
        )
        
        respond(blocks=[
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"*Q:* {text}"}
            },
            {"type": "divider"},
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"*A:* {answer}"}
            }
        ])
        
    except Exception as e:
        logger.error(f"Error in /ask: {e}")
        respond("Sorry, I couldn't process your question.")

# Start the bot
if __name__ == "__main__":
    # Socket Mode
    handler = SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
    handler.start()
    
    # Or use Flask for HTTP mode
    # from flask import Flask, request
    # from slack_bolt.adapter.flask import SlackRequestHandler
    # 
    # flask_app = Flask(__name__)
    # handler = SlackRequestHandler(app)
    # 
    # @flask_app.route("/slack/events", methods=["POST"])
    # def slack_events():
    #     return handler.handle(request)
    # 
    # flask_app.run(port=3000)
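
For reference, a requirements.txt covering the imports used throughout this guide:

# requirements.txt
slack-bolt
openai<1.0
langchain
redis
python-dotenv
aiohttp
requests
pillow
PyPDF2
python-docx
prometheus-client
flask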

References & Citations

Ready to Build Your Slack Bot?

Create intelligent Slack bots powered by LLMs using ParrotRouter's unified API gateway.

References
  1. [1] AWS. "Lambda Documentation" (2024)
  2. [2] Vercel. "Streaming Responses" (2024)
  3. [3] GitHub. "OpenAI Node.js Library" (2024)