ad1107 edited this page May 24, 2025 · 2 revisions

Security Guide

Comprehensive security implementation guide for UtilsBot+ covering permissions, best practices, and security features.

Security Architecture

Multi-Layer Security Model

┌─────────────────────────────────────────────────────────────┐
│                    User Input Layer                         │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐    │
│  │  Discord    │ │ Slash Cmd   │ │   Rate Limiting     │    │
│  │ Validation  │ │ Validation  │ │   & Cooldowns       │    │
│  └─────────────┘ └─────────────┘ └─────────────────────┘    │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                 Permission Layer                            │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐    │
│  │  Developer  │ │  Whitelist  │ │    Guild/User       │    │
│  │   Checks    │ │   Checks    │ │    Permissions      │    │
│  └─────────────┘ └─────────────┘ └─────────────────────┘    │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                Application Layer                            │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐    │
│  │   Input     │ │   Output    │ │     Error           │    │
│  │ Sanitization│ │ Filtering   │ │   Handling          │    │
│  └─────────────┘ └─────────────┘ └─────────────────────┘    │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                  Data Layer                                 │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐    │
│  │  Database   │ │   API Key   │ │    Secrets          │    │
│  │ Protection  │ │ Management  │ │   Management        │    │
│  └─────────────┘ └─────────────┘ └─────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

Security Principles

  1. Defense in Depth: Multiple security layers
  2. Least Privilege: Minimal required permissions
  3. Input Validation: All user input sanitized
  4. Secure by Default: Safe configuration defaults
  5. Fail Securely: Graceful security failures
  6. Audit Everything: Complete action logging
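
As an illustration of principle 5, the following is a minimal sketch of a "fail closed" wrapper: if a permission check raises unexpectedly, the error is logged and access is denied rather than granted. The names `fail_closed` and `is_allowed` are hypothetical and are not part of UtilsBot+.

```python
import functools
import logging
from typing import Callable

logger = logging.getLogger("security")


def fail_closed(check: Callable[..., bool]) -> Callable[..., bool]:
    """Wrap a permission check so any unexpected error counts as a denial."""
    @functools.wraps(check)
    def wrapper(*args, **kwargs) -> bool:
        try:
            return bool(check(*args, **kwargs))
        except Exception:
            # Record the failure for the audit trail, then deny access
            logger.exception("permission check %s failed", check.__name__)
            return False
    return wrapper


@fail_closed
def is_allowed(user_id: int, allowed_ids: set) -> bool:
    # Hypothetical check: membership in an allow-list
    return user_id in allowed_ids
```

With this wrapper, a broken lookup (for example a missing allow-list) results in a refusal instead of an unhandled exception or an accidental grant.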

Access Control

Permission Hierarchy

Developer (Bot Owner)
├── Full system access
├── Can execute any command
├── Can manage whitelist
├── Can reload/sync commands
└── Bypasses all restrictions

Whitelisted User (Beta Access)
├── Access to all public commands
├── Subject to rate limits
├── Cannot access dev commands
└── Usage tracked in the database

Public User (Open Access)
├── Limited command access
├── Subject to rate limits
├── No dev commands
└── Basic functionality only

Blacklisted User
├── No command access
├── All interactions blocked
├── Logged for monitoring
└── Cannot use any features
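
The hierarchy above can be sketched as a single resolution function. This is an illustrative sketch only: `AccessTier` and `resolve_tier` are hypothetical names, the ID sets stand in for the settings and database lookups, and the precedence (developer first, then blacklist, then whitelist) is an assumption based on the "bypass all restrictions" rule for developers.

```python
from enum import Enum
from typing import AbstractSet


class AccessTier(Enum):
    DEVELOPER = "developer"
    WHITELISTED = "whitelisted"
    PUBLIC = "public"
    BLACKLISTED = "blacklisted"


def resolve_tier(
    user_id: int,
    dev_ids: AbstractSet[int],
    whitelisted_ids: AbstractSet[int],
    blacklisted_ids: AbstractSet[int],
) -> AccessTier:
    """Map a user ID to exactly one tier of the hierarchy."""
    if user_id in dev_ids:
        # Assumption: developers bypass every other restriction
        return AccessTier.DEVELOPER
    if user_id in blacklisted_ids:
        # A blacklist entry overrides a whitelist entry
        return AccessTier.BLACKLISTED
    if user_id in whitelisted_ids:
        return AccessTier.WHITELISTED
    return AccessTier.PUBLIC
```

Resolving the tier in one place keeps the precedence rules explicit instead of spreading them across individual decorators.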

Implementation

Developer Check Decorator:

import discord
from discord import app_commands

def dev_only():
    """Restrict command to developers only."""
    async def predicate(interaction: discord.Interaction) -> bool:
        user_id = interaction.user.id
        return user_id in settings.dev_ids
    
    return app_commands.check(predicate)

# Usage
@dev_only()
@app_commands.command(name="eval", description="Execute Python code")
async def eval_command(self, interaction: discord.Interaction, code: str):
    # Developer-only functionality
    pass

Whitelist Check Decorator:

from sqlalchemy import select

def requires_whitelist():
    """Check if user is whitelisted (if beta mode enabled)."""
    async def predicate(interaction: discord.Interaction) -> bool:
        # Developers always have access
        if interaction.user.id in settings.dev_ids:
            return True

        # If not in closed beta, allow all users
        if not settings.closed_beta:
            return True

        # Check database whitelist. discord_id is not the primary key,
        # so query by column instead of session.get()
        async with bot.db.async_session() as session:
            result = await session.execute(
                select(User).where(User.discord_id == str(interaction.user.id))
            )
            user = result.scalar_one_or_none()
            return bool(user and user.is_whitelisted and not user.is_blacklisted)

    return app_commands.check(predicate)

# Usage
@requires_whitelist()
@app_commands.command(name="ask", description="Ask AI a question")
async def ask_command(self, interaction: discord.Interaction, question: str):
    # Whitelisted functionality
    pass

Blacklist Protection:

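from sqlalchemy import select

async def check_blacklist(user_id: int) -> bool:
    """Check if user is blacklisted."""
    async with bot.db.async_session() as session:
        # discord_id is not the primary key, so query by column
        result = await session.execute(
            select(User).where(User.discord_id == str(user_id))
        )
        user = result.scalar_one_or_none()
        return bool(user and user.is_blacklisted)

# Global check in bot core. @bot.check only covers prefix commands, so for
# slash commands override CommandTree.interaction_check and pass the subclass
# to the bot constructor as tree_cls=BlacklistTree
class BlacklistTree(app_commands.CommandTree):
    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        if await check_blacklist(interaction.user.id):
            await interaction.response.send_message(
                "❌ You are blacklisted from using this bot.",
                ephemeral=True
            )
            return False
        return True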

Rate Limiting & Cooldowns

Command-Level Rate Limiting:

import discord
from discord import app_commands

# Per-command cooldown (slash commands use app_commands.checks.cooldown)
@app_commands.command()
@app_commands.checks.cooldown(3, 60.0)  # 3 uses per 60 seconds, per user
async def ai_command(self, interaction: discord.Interaction):
    # Rate-limited command
    pass

# Default limit for general commands
@app_commands.command()
@app_commands.checks.cooldown(5, 60.0)  # Default: 5 uses per minute, per user
async def general_command(self, interaction: discord.Interaction):
    pass

API-Specific Rate Limiting:

from datetime import date

async def check_api_rate_limit(user_id: str, api_name: str) -> bool:
    """Check if user has exceeded API rate limits."""
    async with bot.db.async_session() as session:
        # The dict lookup assumes APIUsage has a composite primary key
        # of (user_discord_id, api_name)
        usage = await session.get(APIUsage, {
            'user_discord_id': user_id,
            'api_name': api_name
        })

        if not usage:
            return True  # First time usage

        # Reset the counter when a new day starts, and persist the reset
        if usage.daily_reset_date < date.today():
            usage.daily_usage_count = 0
            usage.daily_reset_date = date.today()
            await session.commit()
        
        # API-specific limits
        daily_limits = {
            'gemini': 50,
            'screenshot': 20,
            'ip-api': 100
        }
        
        limit = daily_limits.get(api_name, 10)
        return usage.daily_usage_count < limit
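
The function above only checks the quota; it does not record usage. The following sketch shows one way a combined check-and-increment could look, using an in-memory dictionary as a stand-in for the `APIUsage` table. `DailyQuota` and `try_consume` are hypothetical names, and the limits are copied from the snippet above.

```python
from dataclasses import dataclass, field
from datetime import date

DAILY_LIMITS = {"gemini": 50, "screenshot": 20, "ip-api": 100}
DEFAULT_LIMIT = 10


@dataclass
class DailyQuota:
    """In-memory stand-in for the APIUsage table (illustrative only)."""
    # Maps (user_id, api_name) to (day of last use, count for that day)
    counts: dict = field(default_factory=dict)

    def try_consume(self, user_id: str, api_name: str, today: date) -> bool:
        """Return True and count the call if the user is under the limit."""
        limit = DAILY_LIMITS.get(api_name, DEFAULT_LIMIT)
        day, count = self.counts.get((user_id, api_name), (today, 0))
        if day < today:
            count = 0  # A new day started, so the counter resets
        if count >= limit:
            self.counts[(user_id, api_name)] = (today, count)
            return False
        self.counts[(user_id, api_name)] = (today, count + 1)
        return True
```

Passing `today` in explicitly keeps the reset logic easy to test; a database-backed version would perform the same steps inside one transaction.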

Input Validation

User Input Sanitization

URL Validation:

import ipaddress
import socket
from urllib.parse import urlparse

def validate_url(url: str) -> tuple[bool, str]:
    """Validate and sanitize URLs."""
    try:
        # Length limits (checked first so oversized input is never parsed)
        if len(url) > 2000:
            return False, "URL too long (max 2000 characters)"

        # Basic format validation
        if not url.startswith(('http://', 'https://')):
            return False, "URL must start with http:// or https://"

        parsed = urlparse(url)

        # Check for valid domain
        if not parsed.netloc or not parsed.hostname:
            return False, "Invalid domain in URL"

        # Prevent local/private IPs
        if is_private_ip(parsed.hostname):
            return False, "Private IP addresses are not allowed"

        return True, url

    except Exception as e:
        return False, f"Invalid URL format: {str(e)}"

def is_private_ip(hostname: str) -> bool:
    """Check if hostname resolves to a private, loopback, or link-local IP."""
    try:
        # Check every resolved address (IPv4 and IPv6). getaddrinfo blocks,
        # so run it in an executor when calling from async code.
        for info in socket.getaddrinfo(hostname, None):
            ip_obj = ipaddress.ip_address(info[4][0])
            if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local:
                return True
        return False
    except (socket.gaierror, ValueError):
        # Fail securely: treat unresolvable hosts as unsafe
        return True
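
The address test itself can be separated from DNS resolution, which makes it easy to verify against known ranges. The sketch below is a standalone illustration using only the standard library `ipaddress` module; `is_blocked_address` is a hypothetical helper that takes an already resolved address string and rejects more categories than private and loopback alone.

```python
import ipaddress


def is_blocked_address(address: str) -> bool:
    """Return True if the address should never be fetched by the bot."""
    try:
        ip = ipaddress.ip_address(address)
    except ValueError:
        # Not an IP literal at all: treat as unsafe
        return True
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local      # e.g. 169.254.169.254 cloud metadata endpoints
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified     # 0.0.0.0 and ::
    )
```

A resolver-based check would apply this function to every address a hostname resolves to and reject the URL if any of them is blocked.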

Text Input Sanitization:

import html
import re

def sanitize_text_input(text: str, max_length: int = 2000) -> str:
    """Sanitize user text input."""
    # Remove potentially dangerous patterns (before escaping, otherwise
    # the tag pattern can no longer match the escaped brackets)
    text = re.sub(r'<[^>]*>', '', text)  # Remove HTML tags
    text = re.sub(r'javascript:', '', text, flags=re.IGNORECASE)  # Remove JS
    text = re.sub(r'data:', '', text, flags=re.IGNORECASE)  # Remove data URLs

    # HTML escape whatever remains
    text = html.escape(text)

    # Remove excessive whitespace
    text = re.sub(r'\s+', ' ', text).strip()

    # Length limits (applied last so the result never exceeds max_length)
    if len(text) > max_length:
        text = text[:max_length - 3] + "..."
    
    return text

TOTP Secret Validation:

import base64
import re

def validate_totp_secret(secret: str) -> tuple[bool, str]:
    """Validate TOTP secret format."""
    try:
        # Remove whitespace and padding, convert to uppercase
        secret = re.sub(r'\s', '', secret.upper()).rstrip('=')

        # Check base32 alphabet
        if not re.match(r'^[A-Z2-7]+$', secret):
            return False, "Invalid base32 format"

        # Check length (typically 16 or 32 characters)
        if len(secret) < 16 or len(secret) > 64:
            return False, "Secret length must be between 16-64 characters"

        # Re-pad to a multiple of 8 characters and decode to validate;
        # b32decode rejects input with missing or incorrect padding
        base64.b32decode(secret + '=' * (-len(secret) % 8))
        
        return True, secret
        
    except Exception as e:
        return False, f"Invalid TOTP secret: {str(e)}"
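
For context, a validated secret is what a TOTP generator consumes. The following is a minimal sketch of the RFC 6238 algorithm (HMAC-SHA1, 30-second step) using only the standard library. The function name `totp` and its parameters are illustrative; the bot's actual TOTP command may rely on a library instead.

```python
import base64
import hashlib
import hmac
import struct
import time
from typing import Optional


def totp(secret: str, at: Optional[float] = None, digits: int = 6, step: int = 30) -> str:
    """Compute a time-based one-time password from a base32 secret."""
    # Re-pad so unpadded secrets decode as well
    padded = secret.upper() + "=" * (-len(secret) % 8)
    key = base64.b32decode(padded)

    # Number of time steps since the Unix epoch, as an 8-byte big-endian counter
    counter = int((time.time() if at is None else at) // step)
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()

    # Dynamic truncation: the low nibble of the last byte selects a 4-byte window
    offset = digest[-1] & 0x0F
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)
```

The `at` parameter exists so the function can be checked against the published RFC test vectors, which use fixed timestamps.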

Parameter Validation

Slash Command Parameter Validation:

import discord
from discord import app_commands
from typing import Literal

@app_commands.command()
@app_commands.describe(
    text="Text to encode/decode (max 2000 chars)",
    operation="Choose encode or decode"
)
async def base64_command(
    self,
    interaction: discord.Interaction,
    text: app_commands.Range[str, 1, 2000],  # Length validation
    operation: Literal["encode", "decode"]   # Choice validation
):
    # Parameter validation handled by Discord
    pass

# Custom transformers for complex validation
class URLTransformer(app_commands.Transformer):
    async def transform(self, interaction: discord.Interaction, value: str) -> str:
        is_valid, result = validate_url(value)
        if not is_valid:
            # TransformerError expects (value, opt_type, transformer), so raise
            # AppCommandError to carry the validation message to the error handler
            raise app_commands.AppCommandError(result)
        return result

@app_commands.command()
async def screenshot_command(
    self,
    interaction: discord.Interaction,
    url: app_commands.Transform[str, URLTransformer]
):
    # URL is pre-validated
    pass

API Security

External API Key Management

Environment-Based Configuration:

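# Pydantic v1 API. On Pydantic v2, BaseSettings lives in the separate
# pydantic-settings package and @validator is replaced by @field_validator.
from pydantic import BaseSettings, validator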
from typing import Optional

class Settings(BaseSettings):
    # Required API keys
    bot_token: str
    gemini_api_key: str
    
    # Optional API keys
    screenshot_api_key: Optional[str] = None
    rapidapi_key: Optional[str] = None
    sentry_dsn: Optional[str] = None
    
    @validator('bot_token')
    def validate_bot_token(cls, v):
        if not v or len(v) < 50:
            raise ValueError('Invalid Discord bot token')
        return v
    
    @validator('gemini_api_key')
    def validate_gemini_key(cls, v):
        if not v or not v.startswith('AI'):
            raise ValueError('Invalid Gemini API key format')
        return v
    
    class Config:
        env_file = '.env'
        case_sensitive = False

API Key Rotation Support:

from datetime import datetime
from typing import Optional

class APIKeyManager:
    def __init__(self):
        self.keys = {}
        self.rotation_schedule = {}

    async def get_api_key(self, service: str) -> Optional[str]:
        """Get current API key for service, rotating it first if it is due."""
        if service in self.rotation_schedule:
            if datetime.now() > self.rotation_schedule[service]:
                await self.rotate_key(service)
        
        return self.keys.get(service)
    
    async def rotate_key(self, service: str):
        """Rotate API key for service."""
        # Implementation depends on service
        pass
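
Since `rotate_key` is left unimplemented above, here is one possible shape for it, offered as a sketch under stated assumptions. `RotatingKeyStore` and the `fetch_key` callback are hypothetical: the callback stands in for whatever service-specific call issues a new key, and `max_age` is an assumed rotation interval.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Awaitable, Callable, Dict, Optional

# Hypothetical callback type: given a service name, return a fresh key
KeyFetcher = Callable[[str], Awaitable[str]]


class RotatingKeyStore:
    def __init__(self, fetch_key: KeyFetcher, max_age: timedelta):
        self._fetch_key = fetch_key
        self._max_age = max_age
        self._keys: Dict[str, str] = {}
        self._expires: Dict[str, datetime] = {}

    async def get(self, service: str, now: Optional[datetime] = None) -> str:
        """Return the current key, fetching a new one when it is missing or expired."""
        now = now or datetime.now(timezone.utc)
        if service not in self._keys or now >= self._expires[service]:
            self._keys[service] = await self._fetch_key(service)
            self._expires[service] = now + self._max_age
        return self._keys[service]
```

Injecting both the fetcher and the clock keeps the rotation policy independent of any particular provider and makes expiry behaviour straightforward to test.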

HTTP Request Security

Secure HTTP Client Configuration:

import aiohttp
import ssl

class SecureHTTPClient:
    def __init__(self):
        # Create secure SSL context
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = True
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        
        # Configure timeouts
        timeout = aiohttp.ClientTimeout(
            total=30,
            connect=10,
            sock_read=10
        )
        
        # Create session with security settings
        connector = aiohttp.TCPConnector(
            ssl=ssl_context,
            limit=100,
            limit_per_host=30
        )
        
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'UtilsBot+/2.0 (+https://github.com/ad1107/utils-bot-plus)'
            }
        )
    
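    def get(self, url: str, **kwargs):
        """Secure GET request. Use as: async with client.get(url) as response."""
        # Return the request context manager so the caller can read the body
        # before the connection is released
        return self.session.get(url, **kwargs)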

Request Sanitization:

import asyncio
from typing import Optional

async def safe_api_request(url: str, params: Optional[dict] = None) -> dict:
    """Make safe API request with validation."""
    # Validate URL
    is_valid, validated_url = validate_url(url)
    if not is_valid:
        raise ValueError(f"Invalid URL: {validated_url}")

    # Sanitize parameters
    if params:
        params = {k: sanitize_text_input(str(v)) for k, v in params.items()}

    try:
        async with secure_http_client.get(validated_url, params=params) as response:
            if response.status == 200:
                return await response.json()
            raise APIError(f"API returned status {response.status}")

    except APIError:
        # Re-raise as-is so the status message is not masked by the generic handler
        raise
    except asyncio.TimeoutError:
        raise APIError("Request timed out")
    except Exception as e:
        logger.error(f"API request failed: {e}")
        raise APIError("External service unavailable")

Data Protection

Sensitive Data Handling

Password and Secret Handling:

import secrets
import hashlib
from cryptography.fernet import Fernet

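class SecretManager:
    def __init__(self, secret_key: str):
        # secret_key must be a Fernet key: 32 random bytes, url-safe base64-encoded
        # (for example from Fernet.generate_key()); other strings raise ValueError
        self.cipher = Fernet(secret_key.encode())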
    
    def encrypt_secret(self, secret: str) -> str:
        """Encrypt sensitive data."""
        return self.cipher.encrypt(secret.encode()).decode()
    
    def decrypt_secret(self, encrypted: str) -> str:
        """Decrypt sensitive data."""
        return self.cipher.decrypt(encrypted.encode()).decode()
    
    @staticmethod
    def generate_secure_password(length: int = 16) -> str:
        """Generate cryptographically secure password."""
        alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*"
        return ''.join(secrets.choice(alphabet) for _ in range(length))
    
    @staticmethod
    def hash_data(data: str) -> str:
        """Hash data with salt."""
        salt = secrets.token_hex(16)
        hash_obj = hashlib.pbkdf2_hmac('sha256', data.encode(), salt.encode(), 100000)
        return f"{salt}:{hash_obj.hex()}"
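
`hash_data` produces a `salt:hash` string but the guide does not show how to check a value against it. The sketch below pairs it with a verification step; `verify_data` is a hypothetical helper, and `hash_data` is repeated as a module-level function so the example is self-contained.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 100_000  # Same iteration count as hash_data above


def hash_data(data: str) -> str:
    """Hash data with a fresh random salt, returning 'salt:hash'."""
    salt = secrets.token_hex(16)
    digest = hashlib.pbkdf2_hmac("sha256", data.encode(), salt.encode(), ITERATIONS)
    return f"{salt}:{digest.hex()}"


def verify_data(data: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    salt, _, expected = stored.partition(":")
    digest = hashlib.pbkdf2_hmac("sha256", data.encode(), salt.encode(), ITERATIONS)
    return hmac.compare_digest(digest.hex(), expected)
```

Because the salt is random, two hashes of the same input differ, so equality of stored strings cannot be used for verification; the salt has to be extracted and reused as shown.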

Database Security:

import json

from sqlalchemy import Column, Integer, String, Text, event
from sqlalchemy.engine import Engine

# Encrypt sensitive columns
class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    discord_id = Column(String(20), unique=True, nullable=False)
    username = Column(String(100))
    
    # Encrypted fields
    _encrypted_preferences = Column('preferences', Text)
    
    @property
    def preferences(self) -> dict:
        if self._encrypted_preferences:
            return json.loads(secret_manager.decrypt_secret(self._encrypted_preferences))
        return {}
    
    @preferences.setter
    def preferences(self, value: dict):
        self._encrypted_preferences = secret_manager.encrypt_secret(json.dumps(value))

# Database connection security
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Set secure SQLite pragmas."""
    if 'sqlite' in str(dbapi_connection):
        cursor = dbapi_connection.cursor()
        # Enable foreign key constraints
        cursor.execute("PRAGMA foreign_keys=ON")
        # Overwrite deleted content with zeros so removed data cannot be recovered
        cursor.execute("PRAGMA secure_delete=ON")
        cursor.close()

Privacy Protection

Data Minimization:

class PrivacyManager:
    @staticmethod
    def anonymize_user_data(user_data: dict) -> dict:
        """Anonymize user data for logging."""
        sensitive_fields = ['discord_id', 'username', 'email']
        anonymized = user_data.copy()
        
        for field in sensitive_fields:
            if field in anonymized:
                # str() so integer IDs can be sliced as well
                anonymized[field] = f"***{str(anonymized[field])[-4:]}"
        
        return anonymized
    
    @staticmethod
    def should_log_command(command_name: str) -> bool:
        """Determine if command should be logged."""
        # Don't log sensitive commands
        sensitive_commands = ['totp', 'password']
        return command_name not in sensitive_commands

GDPR Compliance:

async def handle_data_request(user_id: str, request_type: str):
    """Handle GDPR data requests."""
    async with bot.db.async_session() as session:
        if request_type == "export":
            # Export all user data
            user_data = await get_all_user_data(session, user_id)
            return format_data_export(user_data)
        
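        elif request_type == "delete":
            # Delete all user data and persist the deletion
            await delete_user_data(session, user_id)
            await session.commit()
            return "Data deleted successfully"

        # Reject unknown request types instead of silently returning None
        raise ValueError(f"Unsupported request type: {request_type}")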

Deployment Security

Environment Security

Secure Environment Configuration:

# .env file security
chmod 600 .env  # Restrict file permissions

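# Environment variables (placeholders; substitute real values)
export BOT_TOKEN="your_secure_token_here"
export SECRET_KEY="$(openssl rand -hex 32)"  # Generate secure key
# Note: a hex string is not a valid Fernet key. If SECRET_KEY is passed to
# SecretManager, generate it with Fernet.generate_key() instead.
export DATABASE_URL="postgresql://user:pass@localhost/db"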

# Production environment
export DEBUG=false
export LOG_LEVEL=WARNING
export SENTRY_DSN="your_sentry_dsn"

Docker Security:

# Dockerfile security best practices
FROM python:3.11-slim

# Create non-root user
RUN useradd --create-home --shell /bin/bash botuser

# Set working directory
WORKDIR /app

# Copy requirements first (layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

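# Remove unnecessary packages (requires root, so it must run before USER)
RUN apt-get autoremove -y && apt-get clean

# Change ownership and switch to non-root user
RUN chown -R botuser:botuser /app
USER botuser

# Health check (standard library only; urlopen raises on HTTP error statuses,
# which makes the check fail)
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)"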

CMD ["python", "main.py"]

Docker Compose Security:

version: '3.8'

services:
  bot:
    build: .
    restart: unless-stopped
    read_only: true  # Read-only filesystem
    tmpfs:
      - /tmp
    cap_drop:
      - ALL  # Drop all capabilities
    cap_add:
      - CHOWN  # Only required capabilities
    security_opt:
      - no-new-privileges:true
    environment:
      - BOT_TOKEN_FILE=/run/secrets/bot_token
    secrets:
      - bot_token
    networks:
      - bot-network

secrets:
  bot_token:
    external: true

networks:
  bot-network:
    driver: bridge
    # Do not set "internal: true" here: the bot needs outbound access to the Discord API
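
The compose file passes the token as `BOT_TOKEN_FILE`, so the application has to read the secret from that path. A minimal sketch of such a loader follows; `read_secret` is a hypothetical helper, and the `<NAME>_FILE` convention with a fallback to the plain variable is an assumption about how the bot would consume it.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def read_secret(name: str, environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Read a secret from <NAME>_FILE if set, otherwise from <NAME>."""
    file_path = environ.get(f"{name}_FILE")
    if file_path:
        # Docker secrets are mounted as files; strip the trailing newline
        return Path(file_path).read_text(encoding="utf-8").strip()
    return environ.get(name)
```

For example, `read_secret("BOT_TOKEN")` would return the contents of `/run/secrets/bot_token` in the container above and fall back to the `BOT_TOKEN` variable in local development.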

Server Hardening

System Security:

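# Firewall configuration (allow SSH before enabling to avoid locking yourself out)
ufw default deny incoming
ufw default allow outgoing
ufw allow ssh
ufw enable

# SSH hardening (matches each directive whether or not it is commented out)
sed -i -E 's/^#?PermitRootLogin .*/PermitRootLogin no/' /etc/ssh/sshd_config
sed -i -E 's/^#?PasswordAuthentication .*/PasswordAuthentication no/' /etc/ssh/sshd_config
systemctl restart ssh  # Apply the new sshd settings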

# Install security updates
apt update && apt upgrade -y
apt install unattended-upgrades -y

# Install fail2ban
apt install fail2ban -y
systemctl enable fail2ban

Monitoring & Auditing

Security Logging

Comprehensive Audit Trail:

import discord
import structlog
from datetime import datetime, timezone

class SecurityLogger:
    def __init__(self):
        self.logger = structlog.get_logger("security")

    async def log_command_execution(self, interaction: discord.Interaction, command: str):
        """Log command execution for audit."""
        # structlog's info()/warning() are synchronous, so they are not awaited
        self.logger.info(
            "command_executed",
            user_id=interaction.user.id,
            username=interaction.user.name,
            guild_id=interaction.guild_id if interaction.guild else None,
            command=command,
            timestamp=datetime.now(timezone.utc).isoformat(),
            channel_id=interaction.channel_id
        )

    async def log_security_event(self, event_type: str, details: dict):
        """Log security-related events."""
        self.logger.warning(
            "security_event",
            event_type=event_type,
            details=details,
            timestamp=datetime.now(timezone.utc).isoformat()
        )

    async def log_failed_permission_check(self, user_id: int, command: str, reason: str):
        """Log failed permission checks."""
        self.logger.warning(
            "permission_denied",
            user_id=user_id,
            command=command,
            reason=reason,
            timestamp=datetime.now(timezone.utc).isoformat()
        )

Anomaly Detection:

from datetime import datetime, timedelta, timezone

class AnomalyDetector:
    def __init__(self):
        self.user_patterns = {}

    async def check_unusual_activity(self, user_id: str, command: str) -> bool:
        """Detect unusual user activity patterns."""
        now = datetime.now(timezone.utc)
        
        if user_id not in self.user_patterns:
            self.user_patterns[user_id] = {
                'commands': [],
                'last_activity': now
            }
        
        pattern = self.user_patterns[user_id]
        pattern['commands'].append((command, now))
        
        # Clean old commands (last hour)
        hour_ago = now - timedelta(hours=1)
        pattern['commands'] = [
            (cmd, time) for cmd, time in pattern['commands'] 
            if time > hour_ago
        ]
        
        # Check for suspicious patterns
        if len(pattern['commands']) > 100:  # Too many commands
            await security_logger.log_security_event(
                "suspicious_activity",
                {"user_id": user_id, "command_count": len(pattern['commands'])}
            )
            return True
        
        return False

Error Monitoring

Sentry Integration:

import sentry_sdk
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

def setup_error_monitoring():
    """Configure error monitoring."""
    if settings.sentry_dsn:
        sentry_sdk.init(
            dsn=settings.sentry_dsn,
            integrations=[
                AioHttpIntegration(),
                SqlalchemyIntegration(),
            ],
            traces_sample_rate=0.1,
            profiles_sample_rate=0.1,
            before_send=filter_sensitive_data
        )

def filter_sensitive_data(event, hint):
    """Filter sensitive data from error reports."""
    # event['environment'] is only the environment name (a string), so scrub
    # the free-form 'extra' data instead, where tokens and keys can end up
    sensitive_vars = ['BOT_TOKEN', 'SECRET_KEY', 'API_KEY']
    extra = event.get('extra') or {}
    for key in list(extra):
        if any(var in str(key).upper() for var in sensitive_vars):
            extra[key] = '[REDACTED]'

    return event

Security Best Practices

Development Security

Secure Coding Practices:

  1. Input Validation: Validate all user inputs
  2. Output Encoding: Escape output to prevent injection
  3. Error Handling: Don't expose internal details
  4. Secrets Management: Never hardcode secrets
  5. Dependency Updates: Keep dependencies current
  6. Code Review: Review all code changes

Pre-commit Security Checks:

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/PyCQA/bandit
    rev: '1.7.4'
    hooks:
      - id: bandit
        args: ['-r', '.']
  
  - repo: https://github.com/Yelp/detect-secrets
    rev: v1.4.0
    hooks:
      - id: detect-secrets
        args: ['--baseline', '.secrets.baseline']

Production Security

Security Checklist:

  • All secrets in environment variables
  • SSL/TLS enabled for all connections
  • Database connections encrypted
  • Regular security updates applied
  • Monitoring and alerting configured
  • Backup and recovery tested
  • Access controls properly configured
  • Audit logging enabled

Regular Security Tasks:

#!/bin/bash
# Weekly security update script
apt update && apt list --upgradable
pip list --outdated
# -r: skip docker rmi when there are no dangling images
docker images --filter "dangling=true" -q | xargs -r docker rmi

# Monthly security audit
bandit -r . -f json -o security-report.json
safety check --json > safety-report.json
