# Security Guide

Comprehensive security implementation guide for UtilsBot+ covering permissions, best practices, and security features.

## 📋 Table of Contents

- [Security Architecture](#security-architecture)
- [Access Control](#access-control)
- [Input Validation](#input-validation)
- [API Security](#api-security)
- [Data Protection](#data-protection)
- [Deployment Security](#deployment-security)
- [Monitoring & Auditing](#monitoring--auditing)
- [Security Best Practices](#security-best-practices)

---

## Security Architecture

### Multi-Layer Security Model

```
┌─────────────────────────────────────────────────────────────┐
│                      User Input Layer                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │   Discord   │  │  Slash Cmd  │  │    Rate Limiting    │  │
│  │ Validation  │  │ Validation  │  │     & Cooldowns     │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                      Permission Layer                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │  Developer  │  │  Whitelist  │  │     Guild/User      │  │
│  │   Checks    │  │   Checks    │  │     Permissions     │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                      Application Layer                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │    Input    │  │   Output    │  │        Error        │  │
│  │ Sanitization│  │  Filtering  │  │      Handling       │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                         Data Layer                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │  Database   │  │   API Key   │  │       Secrets       │  │
│  │ Protection  │  │ Management  │  │     Management      │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
```

### Security Principles

1. **Defense in Depth**: Multiple security layers
2. **Least Privilege**: Minimal required permissions
3. **Input Validation**: All user input sanitized
4. **Secure by Default**: Safe configuration defaults
5. **Fail Securely**: Graceful security failures
6. **Audit Everything**: Complete action logging

---

## Access Control

### Permission Hierarchy

```
Developer (Bot Owner)
├── Full system access
├── Can execute any command
├── Can manage whitelist
├── Can reload/sync commands
└── Bypass all restrictions

Whitelisted User (Beta Access)
├── Access to all public commands
├── Subject to rate limits
├── Cannot access dev commands
└── Database tracked usage

Public User (Open Access)
├── Limited command access
├── Subject to rate limits
├── No dev commands
└── Basic functionality only

Blacklisted User
├── No command access
├── All interactions blocked
├── Logged for monitoring
└── Cannot use any features
```

### Implementation

**Developer Check Decorator:**

```python
import discord
from discord import app_commands

def dev_only():
    """Restrict command to developers only."""
    async def predicate(interaction: discord.Interaction) -> bool:
        user_id = interaction.user.id
        return user_id in settings.dev_ids
    return app_commands.check(predicate)

# Usage
@dev_only()
@app_commands.command(name="eval", description="Execute Python code")
async def eval_command(self, interaction: discord.Interaction, code: str):
    # Developer-only functionality
    pass
```

**Whitelist Check Decorator:**

```python
from sqlalchemy import select

def requires_whitelist():
    """Check if user is whitelisted (if beta mode enabled)."""
    async def predicate(interaction: discord.Interaction) -> bool:
        # Developers always have access
        if interaction.user.id in settings.dev_ids:
            return True

        # If not in closed beta, allow all users
        if not settings.closed_beta:
            return True

        # Check database whitelist. discord_id is a unique column, not the
        # primary key, so query it with select() rather than session.get().
        async with bot.db.async_session() as session:
            result = await session.execute(
                select(User).where(User.discord_id == str(interaction.user.id))
            )
            user = result.scalar_one_or_none()
            return bool(user and user.is_whitelisted and not user.is_blacklisted)

    return app_commands.check(predicate)

# Usage
@requires_whitelist()
@app_commands.command(name="ask", description="Ask AI a question")
async def ask_command(self, interaction: discord.Interaction, question: str):
    # Whitelisted functionality
    pass
```

**Blacklist Protection:**

```python
async def check_blacklist(user_id: int) -> bool:
    """Check if user is blacklisted."""
    async with bot.db.async_session() as session:
        result = await session.execute(
            select(User).where(User.discord_id == str(user_id))
        )
        user = result.scalar_one_or_none()
        return user is not None and user.is_blacklisted

# Global check in bot core. Slash commands do not pass through `@bot.check`,
# so override CommandTree.interaction_check and pass the subclass to the bot
# with `tree_cls=...` (the class name here is illustrative).
class UtilsBotTree(app_commands.CommandTree):
    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        if await check_blacklist(interaction.user.id):
            await interaction.response.send_message(
                "❌ You are blacklisted from using this bot.",
                ephemeral=True
            )
            return False
        return True
```

### Rate Limiting & Cooldowns

**Command-Level Rate Limiting:**

```python
from discord import app_commands

# Rate limiting decorator
@app_commands.command()
@app_commands.checks.cooldown(3, 60.0)  # 3 commands per 60 seconds
async def ai_command(self, interaction: discord.Interaction):
    # Rate-limited command
    pass

# Default limit for general commands
@app_commands.command()
@app_commands.checks.cooldown(5, 60.0)  # Default: 5 commands per minute
async def general_command(self, interaction: discord.Interaction):
    pass
```

**API-Specific Rate Limiting:**

```python
from datetime import date

async def check_api_rate_limit(user_id: str, api_name: str) -> bool:
    """Check if user has exceeded API rate limits."""
    async with bot.db.async_session() as session:
        # session.get() with a dict requires these two columns to form
        # the composite primary key of APIUsage
        usage = await session.get(APIUsage, {
            'user_discord_id': user_id,
            'api_name': api_name
        })

        if not usage:
            return True  # First time usage

        # Check daily limits
        if usage.daily_reset_date < date.today():
            usage.daily_usage_count = 0
            usage.daily_reset_date = date.today()
            await session.commit()  # Persist the daily reset

        # API-specific limits
        daily_limits = {
            'gemini': 50,
            'screenshot': 20,
            'ip-api': 100
        }

        limit = daily_limits.get(api_name, 10)
        return usage.daily_usage_count < limit
```

---

## Input Validation

### User Input Sanitization

**URL Validation:**

```python
import ipaddress
import socket
from typing import Optional
from urllib.parse import urlparse

def validate_url(url: str) -> tuple[bool, str]:
    """Validate and sanitize URLs."""
    try:
        # Basic format validation
        if not url.startswith(('http://', 'https://')):
            return False, "URL must start with http:// or https://"

        # Length limits
        if len(url) > 2000:
            return False, "URL too long (max 2000 characters)"

        parsed = urlparse(url)

        # Check for valid domain
        if not parsed.netloc:
            return False, "Invalid domain in URL"

        # Prevent local/private IPs
        if is_private_ip(parsed.hostname):
            return False, "Private or unresolvable addresses are not allowed"

        return True, url

    except Exception as e:
        return False, f"Invalid URL format: {str(e)}"

def is_private_ip(hostname: Optional[str]) -> bool:
    """Check if hostname resolves to a private, loopback, or link-local IP."""
    if not hostname:
        return True
    try:
        ip = socket.gethostbyname(hostname)
        ip_obj = ipaddress.ip_address(ip)
        return ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local
    except (socket.gaierror, ValueError):
        # Fail closed: treat hosts that cannot be resolved as unsafe
        return True
```

**Text Input Sanitization:**

```python
import html
import re

def sanitize_text_input(text: str, max_length: int = 2000) -> str:
    """Sanitize user text input."""
    # Remove potentially dangerous patterns first. Escaping before this
    # step would turn "<" into "&lt;" and the tag pattern would never match.
    text = re.sub(r'<[^>]*>', '', text)  # Remove HTML tags
    text = re.sub(r'javascript:', '', text, flags=re.IGNORECASE)  # Remove JS
    text = re.sub(r'data:', '', text, flags=re.IGNORECASE)  # Remove data URLs

    # HTML escape whatever remains
    text = html.escape(text)

    # Length limits
    if len(text) > max_length:
        text = text[:max_length] + "..."
    # Remove excessive whitespace
    text = re.sub(r'\s+', ' ', text).strip()

    return text
```

**TOTP Secret Validation:**

```python
import base64
import re

def validate_totp_secret(secret: str) -> tuple[bool, str]:
    """Validate TOTP secret format."""
    try:
        # Remove whitespace and convert to uppercase
        secret = re.sub(r'\s', '', secret.upper())

        # Check base32 format
        if not re.match(r'^[A-Z2-7]+=*$', secret):
            return False, "Invalid base32 format"

        # Check length (typically 16 or 32 characters)
        if len(secret) < 16 or len(secret) > 64:
            return False, "Secret length must be between 16-64 characters"

        # Try to decode to validate. Secrets are often shared without
        # padding, so pad to a multiple of 8 before decoding.
        unpadded = secret.rstrip('=')
        base64.b32decode(unpadded + '=' * (-len(unpadded) % 8))

        return True, secret

    except Exception as e:
        return False, f"Invalid TOTP secret: {str(e)}"
```

### Parameter Validation

**Slash Command Parameter Validation:**

```python
import discord
from discord import app_commands
from typing import Literal

@app_commands.command()
@app_commands.describe(
    text="Text to encode/decode (max 2000 chars)",
    operation="Choose encode or decode"
)
async def base64_command(
    self,
    interaction: discord.Interaction,
    text: app_commands.Range[str, 1, 2000],  # Length validation
    operation: Literal["encode", "decode"]   # Choice validation
):
    # Parameter validation handled by Discord
    pass

# Custom transformers for complex validation
class URLTransformer(app_commands.Transformer):
    async def transform(self, interaction: discord.Interaction, value: str) -> str:
        is_valid, result = validate_url(value)
        if not is_valid:
            # TransformerError takes (value, opt_type, transformer), so raise
            # the base AppCommandError to keep the validation message.
            raise app_commands.AppCommandError(result)
        return result

@app_commands.command()
async def screenshot_command(
    self,
    interaction: discord.Interaction,
    url: app_commands.Transform[str, URLTransformer]
):
    # URL is pre-validated
    pass
```

---

## API Security

### External API Key Management

**Environment-Based Configuration:**

```python
# pydantic v1 API; in pydantic v2, BaseSettings lives in the
# separate pydantic-settings package.
from pydantic import BaseSettings, validator
from typing import Optional

class Settings(BaseSettings):
    # Required API keys
    bot_token: str
    gemini_api_key: str

    # Optional API keys
    screenshot_api_key: Optional[str] = None
    rapidapi_key: Optional[str] = None
    sentry_dsn: Optional[str] = None

    @validator('bot_token')
    def validate_bot_token(cls, v):
        if not v or len(v) < 50:
            raise ValueError('Invalid Discord bot token')
        return v

    @validator('gemini_api_key')
    def validate_gemini_key(cls, v):
        if not v or not v.startswith('AI'):
            raise ValueError('Invalid Gemini API key format')
        return v

    class Config:
        env_file = '.env'
        case_sensitive = False
```

**API Key Rotation Support:**

```python
from datetime import datetime
from typing import Optional

class APIKeyManager:
    def __init__(self):
        self.keys = {}
        self.rotation_schedule = {}

    async def get_api_key(self, service: str) -> Optional[str]:
        """Get current API key for service (None if not configured)."""
        if service in self.rotation_schedule:
            if datetime.now() > self.rotation_schedule[service]:
                await self.rotate_key(service)

        return self.keys.get(service)

    async def rotate_key(self, service: str):
        """Rotate API key for service."""
        # Implementation depends on service
        pass
```

### HTTP Request Security

**Secure HTTP Client Configuration:**

```python
import aiohttp
import ssl

class SecureHTTPClient:
    def __init__(self):
        # Create secure SSL context
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = True
        ssl_context.verify_mode = ssl.CERT_REQUIRED

        # Configure timeouts
        timeout = aiohttp.ClientTimeout(
            total=30,
            connect=10,
            sock_read=10
        )

        # Create session with security settings
        connector = aiohttp.TCPConnector(
            ssl=ssl_context,
            limit=100,
            limit_per_host=30
        )

        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'UtilsBot+/2.0 (+https://github.com/ad1107/utils-bot-plus)'
            }
        )

    def get(self, url: str, **kwargs):
        """Secure GET request; use as `async with client.get(url) as response:`."""
        # Return the request context manager itself so the response is
        # still open while the caller reads it.
        return self.session.get(url, **kwargs)

    async def close(self):
        """Close the underlying session on shutdown."""
        await self.session.close()
```

**Request Sanitization:**

```python
import asyncio
from typing import Optional

async def safe_api_request(url: str, params: Optional[dict] = None) -> dict:
    """Make safe API request with validation."""
    # Validate URL
    is_valid, validated_url = validate_url(url)
    if not is_valid:
        raise ValueError(f"Invalid URL: {validated_url}")

    # Sanitize parameters
    if params:
        params = {k: sanitize_text_input(str(v)) for k, v in params.items()}

    try:
        async with secure_http_client.get(validated_url, params=params) as response:
            if response.status == 200:
                return await response.json()
            raise APIError(f"API returned status {response.status}")

    except APIError:
        # Re-raise our own errors instead of masking them below
        raise
    except asyncio.TimeoutError:
        raise APIError("Request timed out")
    except Exception as e:
        logger.error(f"API request failed: {e}")
        raise APIError("External service unavailable")
```

---

## Data Protection

### Sensitive Data Handling

**Password and Secret Handling:**

```python
import secrets
import hashlib
from cryptography.fernet import Fernet

class SecretManager:
    def __init__(self, secret_key: str):
        # Fernet expects a url-safe base64-encoded 32-byte key,
        # for example one produced by Fernet.generate_key()
        self.cipher = Fernet(secret_key.encode())

    def encrypt_secret(self, secret: str) -> str:
        """Encrypt sensitive data."""
        return self.cipher.encrypt(secret.encode()).decode()

    def decrypt_secret(self, encrypted: str) -> str:
        """Decrypt sensitive data."""
        return self.cipher.decrypt(encrypted.encode()).decode()

    @staticmethod
    def generate_secure_password(length: int = 16) -> str:
        """Generate cryptographically secure password."""
        alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*"
        return ''.join(secrets.choice(alphabet) for _ in range(length))

    @staticmethod
    def hash_data(data: str) -> str:
        """Hash data with salt."""
        salt = secrets.token_hex(16)
        hash_obj = hashlib.pbkdf2_hmac('sha256', data.encode(), salt.encode(), 100000)
        return f"{salt}:{hash_obj.hex()}"
```

**Database Security:**

```python
import json

from sqlalchemy import Column, Integer, String, Text, event
from sqlalchemy.engine import Engine

# Encrypt sensitive columns
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    discord_id = Column(String(20), unique=True, nullable=False)
    username = Column(String(100))

    # Encrypted fields
    _encrypted_preferences = Column('preferences', Text)

    @property
    def preferences(self) -> dict:
        if self._encrypted_preferences:
            return json.loads(secret_manager.decrypt_secret(self._encrypted_preferences))
        return {}

    @preferences.setter
    def preferences(self, value: dict):
        self._encrypted_preferences = secret_manager.encrypt_secret(json.dumps(value))

# Database connection security
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Set secure SQLite pragmas."""
    if 'sqlite' in str(dbapi_connection):
        cursor = dbapi_connection.cursor()
        # Enable foreign key constraints
        cursor.execute("PRAGMA foreign_keys=ON")
        # Overwrite deleted content with zeros
        cursor.execute("PRAGMA secure_delete=ON")
        cursor.close()
```

### Privacy Protection

**Data Minimization:**

```python
class PrivacyManager:
    @staticmethod
    def anonymize_user_data(user_data: dict) -> dict:
        """Anonymize user data for logging."""
        sensitive_fields = ['discord_id', 'username', 'email']
        anonymized = user_data.copy()

        for field in sensitive_fields:
            if field in anonymized:
                # str() so integer IDs can be sliced as well
                anonymized[field] = f"***{str(anonymized[field])[-4:]}"

        return anonymized

    @staticmethod
    def should_log_command(command_name: str) -> bool:
        """Determine if command should be logged."""
        # Don't log sensitive commands
        sensitive_commands = ['totp', 'password']
        return command_name not in sensitive_commands
```

**GDPR Compliance:**

```python
async def handle_data_request(user_id: str, request_type: str):
    """Handle GDPR data requests."""
    async with bot.db.async_session() as session:
        if request_type == "export":
            # Export all user data
            user_data = await get_all_user_data(session, user_id)
            return format_data_export(user_data)

        elif request_type == "delete":
            # Delete all user data
            await delete_user_data(session, user_id)
            return "Data deleted successfully"
```

---

## Deployment Security

### Environment Security

**Secure Environment Configuration:**

```bash
# .env file security
chmod 600 .env  # Restrict file permissions

# Environment variables
export BOT_TOKEN="your_secure_token_here"
export SECRET_KEY="$(openssl rand -hex 32)"  # Generate secure key
export DATABASE_URL="postgresql://user:pass@localhost/db"

# Production environment
export DEBUG=false
export LOG_LEVEL=WARNING
export SENTRY_DSN="your_sentry_dsn"
```

**Docker Security:**

```dockerfile
# Dockerfile security best practices
FROM python:3.11-slim

# Create non-root user
RUN useradd --create-home --shell /bin/bash botuser

# Set working directory
WORKDIR /app

# Copy requirements first (layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Remove unnecessary packages (needs root, so it runs before USER)
RUN apt-get autoremove -y && apt-get clean

# Copy application code
COPY . .

# Change ownership and switch to non-root user
RUN chown -R botuser:botuser /app
USER botuser

# Health check
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8000/health')"

CMD ["python", "main.py"]
```

**Docker Compose Security:**

```yaml
version: '3.8'
services:
  bot:
    build: .
    restart: unless-stopped
    read_only: true  # Read-only filesystem
    tmpfs:
      - /tmp
    cap_drop:
      - ALL  # Drop all capabilities
    cap_add:
      - CHOWN  # Only required capabilities
    security_opt:
      - no-new-privileges:true
    environment:
      - BOT_TOKEN_FILE=/run/secrets/bot_token
    secrets:
      - bot_token
    networks:
      - bot-network

secrets:
  bot_token:
    external: true

networks:
  bot-network:
    driver: bridge
    # "internal: true" is omitted here: it would also block the bot's
    # outbound connections to Discord. No ports are published, so the
    # container still accepts no inbound traffic.
```

### Server Hardening

**System Security:**

```bash
# Firewall configuration (add rules before enabling to keep SSH reachable)
ufw default deny incoming
ufw default allow outgoing
ufw allow ssh
ufw enable

# SSH hardening
sed -i 's/#PermitRootLogin yes/PermitRootLogin no/' /etc/ssh/sshd_config
sed -i 's/#PasswordAuthentication yes/PasswordAuthentication no/' /etc/ssh/sshd_config

# Install security updates
apt update && apt upgrade -y
apt install unattended-upgrades -y

# Install fail2ban
apt install fail2ban -y
systemctl enable fail2ban
```

---

## Monitoring & Auditing

### Security Logging

**Comprehensive Audit Trail:**

```python
import discord
import structlog
from datetime import datetime

class SecurityLogger:
    def __init__(self):
        self.logger = structlog.get_logger("security")

    # structlog's info()/warning() are synchronous, so they are called
    # without await; the methods stay async for existing callers.
    async def log_command_execution(self, interaction: discord.Interaction, command: str):
        """Log command execution for audit."""
        self.logger.info(
            "command_executed",
            user_id=interaction.user.id,
            username=interaction.user.name,
            guild_id=interaction.guild_id if interaction.guild else None,
            command=command,
            timestamp=datetime.utcnow().isoformat(),
            channel_id=interaction.channel_id
        )

    async def log_security_event(self, event_type: str, details: dict):
        """Log security-related events."""
        self.logger.warning(
            "security_event",
            event_type=event_type,
            details=details,
            timestamp=datetime.utcnow().isoformat()
        )

    async def log_failed_permission_check(self, user_id: int, command: str, reason: str):
        """Log failed permission checks."""
        self.logger.warning(
            "permission_denied",
            user_id=user_id,
            command=command,
            reason=reason,
            timestamp=datetime.utcnow().isoformat()
        )
```

**Anomaly Detection:**

```python
from datetime import datetime, timedelta

class AnomalyDetector:
    def __init__(self):
        self.user_patterns = {}

    async def check_unusual_activity(self, user_id: str, command: str) -> bool:
        """Detect unusual user activity patterns."""
        now = datetime.utcnow()

        if user_id not in self.user_patterns:
            self.user_patterns[user_id] = {
                'commands': [],
                'last_activity': now
            }

        pattern = self.user_patterns[user_id]
        pattern['commands'].append((command, now))
        pattern['last_activity'] = now

        # Clean old commands (keep the last hour)
        hour_ago = now - timedelta(hours=1)
        pattern['commands'] = [
            (cmd, time) for cmd, time in pattern['commands']
            if time > hour_ago
        ]

        # Check for suspicious patterns
        if len(pattern['commands']) > 100:  # Too many commands
            await security_logger.log_security_event(
                "suspicious_activity",
                {"user_id": user_id, "command_count": len(pattern['commands'])}
            )
            return True

        return False
```

### Error Monitoring

**Sentry Integration:**

```python
import sentry_sdk
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

def setup_error_monitoring():
    """Configure error monitoring."""
    if settings.sentry_dsn:
        sentry_sdk.init(
            dsn=settings.sentry_dsn,
            integrations=[
                AioHttpIntegration(),
                SqlalchemyIntegration(),
            ],
            traces_sample_rate=0.1,
            profiles_sample_rate=0.1,
            before_send=filter_sensitive_data
        )

def filter_sensitive_data(event, hint):
    """Filter sensitive data from error reports."""
    # Remove sensitive environment variables
    if 'environment' in event:
        sensitive_vars = ['BOT_TOKEN', 'SECRET_KEY', 'API_KEY']
        for var in sensitive_vars:
            if var in event['environment']:
                event['environment'][var] = '[REDACTED]'

    return event
```

---

## Security Best Practices

### Development Security

**Secure Coding Practices:**

1. **Input Validation**: Validate all user inputs
2. **Output Encoding**: Escape output to prevent injection
3. **Error Handling**: Don't expose internal details
4. **Secrets Management**: Never hardcode secrets
5. **Dependency Updates**: Keep dependencies current
6. **Code Review**: Review all code changes

**Pre-commit Security Checks:**

```yaml
# .pre-commit-config.yaml
repos:
  - repo: https://github.com/PyCQA/bandit
    rev: '1.7.4'
    hooks:
      - id: bandit
        args: ['-r', '.']

  - repo: https://github.com/Yelp/detect-secrets
    rev: v1.4.0
    hooks:
      - id: detect-secrets
        args: ['--baseline', '.secrets.baseline']
```

### Production Security

**Security Checklist:**

- [ ] All secrets in environment variables
- [ ] SSL/TLS enabled for all connections
- [ ] Database connections encrypted
- [ ] Regular security updates applied
- [ ] Monitoring and alerting configured
- [ ] Backup and recovery tested
- [ ] Access controls properly configured
- [ ] Audit logging enabled

**Regular Security Tasks:**

```bash
#!/bin/bash
# Weekly security update script
apt update && apt list --upgradable
pip list --outdated
# -r: skip docker rmi when there are no dangling images
docker images --filter "dangling=true" -q | xargs -r docker rmi

# Monthly security audit
bandit -r . -f json -o security-report.json
safety check --json --output safety-report.json
```

---

## 🔗 Related Pages

- **[Deployment Guide](Deployment-Guide)** - Secure deployment practices
- **[Configuration Guide](Configuration-Guide)** - Security configuration
- **[Troubleshooting](Troubleshooting)** - Security issue resolution
- **[Developer Guide](Developer-Guide)** - Secure development practices

---

## 📝 What's Next?

- [Configure secure deployment](Deployment-Guide#security-considerations)
- [Set up monitoring and alerts](Troubleshooting#monitoring)
- [Review configuration security](Configuration-Guide#security-settings)
- [Learn about development security](Developer-Guide#security-best-practices)
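
---

**Reading Docker Secrets at Startup:**

The Docker Compose example under Deployment Security sets `BOT_TOKEN_FILE` rather than `BOT_TOKEN`, while the `Settings` class under API Security reads `bot_token` from the environment. Something has to bridge the two. The following is a minimal sketch, not UtilsBot+ code: `load_secret` is a hypothetical helper, and the `<NAME>_FILE` lookup is an assumed convention based on that Compose example.

```python
import os
from pathlib import Path
from typing import Optional


def load_secret(name: str, default: Optional[str] = None) -> Optional[str]:
    """Return a secret from the file named by <NAME>_FILE, else from <NAME>.

    Hypothetical helper for illustration; not part of UtilsBot+.
    """
    file_path = os.environ.get(f"{name}_FILE")
    if file_path:
        # Docker mounts secrets as files; strip the trailing newline
        return Path(file_path).read_text(encoding="utf-8").strip()
    # Fall back to the plain environment variable (e.g. local development)
    return os.environ.get(name, default)
```

One possible use is to call `load_secret("BOT_TOKEN")` before constructing `Settings` and pass the result in explicitly, so the same code runs with a plain `.env` file locally and with Docker secrets in production.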