-
Notifications
You must be signed in to change notification settings - Fork 0
11. Security Guide
Comprehensive security implementation guide for UtilsBot+ covering permissions, best practices, and security features.
- Security Architecture
- Access Control
- Input Validation
- API Security
- Data Protection
- Deployment Security
- Monitoring & Auditing
- Security Best Practices
┌─────────────────────────────────────────────────────────────┐
│ User Input Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Discord │ │ Slash Cmd │ │ Rate Limiting │ │
│ │ Validation │ │ Validation │ │ & Cooldowns │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────────┐
│ Permission Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Developer │ │ Whitelist │ │ Guild/User │ │
│ │ Checks │ │ Checks │ │ Permissions │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────────┐
│ Application Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Input │ │ Output │ │ Error │ │
│ │ Sanitization│ │ Filtering │ │ Handling │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────────┐
│ Data Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Database │ │ API Key │ │ Secrets │ │
│ │ Protection │ │ Management │ │ Management │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
- Defense in Depth: Multiple security layers
- Least Privilege: Minimal required permissions
- Input Validation: All user input sanitized
- Secure by Default: Safe configuration defaults
- Fail Securely: Graceful security failures
- Audit Everything: Complete action logging
Developer (Bot Owner)
├── Full system access
├── Can execute any command
├── Can manage whitelist
├── Can reload/sync commands
└── Bypass all restrictions
Whitelisted User (Beta Access)
├── Access to all public commands
├── Subject to rate limits
├── Cannot access dev commands
└── Database tracked usage
Public User (Open Access)
├── Limited command access
├── Subject to rate limits
├── No dev commands
└── Basic functionality only
Blacklisted User
├── No command access
├── All interactions blocked
├── Logged for monitoring
└── Cannot use any features
Developer Check Decorator:
def dev_only():
"""Restrict command to developers only."""
async def predicate(interaction: discord.Interaction) -> bool:
user_id = interaction.user.id
return user_id in settings.dev_ids
return app_commands.check(predicate)
# Usage
@dev_only()
@app_commands.command(name="eval", description="Execute Python code")
async def eval_command(self, interaction: discord.Interaction, code: str):
# Developer-only functionality
pass
Whitelist Check Decorator:
def requires_whitelist():
"""Check if user is whitelisted (if beta mode enabled)."""
async def predicate(interaction: discord.Interaction) -> bool:
# Developers always have access
if interaction.user.id in settings.dev_ids:
return True
# If not in closed beta, allow all users
if not settings.closed_beta:
return True
# Check database whitelist
async with bot.db.async_session() as session:
user = await session.get(User, {'discord_id': str(interaction.user.id)})
return user and user.is_whitelisted and not user.is_blacklisted
return app_commands.check(predicate)
# Usage
@requires_whitelist()
@app_commands.command(name="ask", description="Ask AI a question")
async def ask_command(self, interaction: discord.Interaction, question: str):
# Whitelisted functionality
pass
Blacklist Protection:
async def check_blacklist(user_id: int) -> bool:
"""Check if user is blacklisted."""
async with bot.db.async_session() as session:
user = await session.get(User, {'discord_id': str(user_id)})
return user and user.is_blacklisted
# Global check in bot core
@bot.check
async def global_blacklist_check(interaction: discord.Interaction):
if await check_blacklist(interaction.user.id):
await interaction.response.send_message(
"❌ You are blacklisted from using this bot.",
ephemeral=True
)
return False
return True
Command-Level Rate Limiting:
from discord.ext import commands
# Rate limiting decorator
@app_commands.command()
@cooldown(rate=3, per=60) # 3 commands per 60 seconds
async def ai_command(self, interaction: discord.Interaction):
# Rate-limited command
pass
# Global rate limiting
@app_commands.command()
@cooldown(rate=5, per=60) # Default: 5 commands per minute
async def general_command(self, interaction: discord.Interaction):
pass
API-Specific Rate Limiting:
async def check_api_rate_limit(user_id: str, api_name: str) -> bool:
"""Check if user has exceeded API rate limits."""
async with bot.db.async_session() as session:
usage = await session.get(APIUsage, {
'user_discord_id': user_id,
'api_name': api_name
})
if not usage:
return True # First time usage
# Check daily limits
if usage.daily_reset_date < date.today():
usage.daily_usage_count = 0
usage.daily_reset_date = date.today()
# API-specific limits
daily_limits = {
'gemini': 50,
'screenshot': 20,
'ip-api': 100
}
limit = daily_limits.get(api_name, 10)
return usage.daily_usage_count < limit
URL Validation:
import re
from urllib.parse import urlparse
def validate_url(url: str) -> tuple[bool, str]:
"""Validate and sanitize URLs."""
try:
# Basic format validation
if not url.startswith(('http://', 'https://')):
return False, "URL must start with http:// or https://"
parsed = urlparse(url)
# Check for valid domain
if not parsed.netloc:
return False, "Invalid domain in URL"
# Prevent local/private IPs
if is_private_ip(parsed.hostname):
return False, "Private IP addresses are not allowed"
# Length limits
if len(url) > 2000:
return False, "URL too long (max 2000 characters)"
return True, url
except Exception as e:
return False, f"Invalid URL format: {str(e)}"
def is_private_ip(hostname: str) -> bool:
"""Check if hostname resolves to private IP."""
import ipaddress
import socket
try:
ip = socket.gethostbyname(hostname)
ip_obj = ipaddress.ip_address(ip)
return ip_obj.is_private or ip_obj.is_loopback
except:
return False
Text Input Sanitization:
import html
import re
def sanitize_text_input(text: str, max_length: int = 2000) -> str:
"""Sanitize user text input."""
# HTML escape
text = html.escape(text)
# Remove potentially dangerous patterns
text = re.sub(r'<[^>]*>', '', text) # Remove HTML tags
text = re.sub(r'javascript:', '', text, flags=re.IGNORECASE) # Remove JS
text = re.sub(r'data:', '', text, flags=re.IGNORECASE) # Remove data URLs
# Length limits
if len(text) > max_length:
text = text[:max_length] + "..."
# Remove excessive whitespace
text = re.sub(r'\s+', ' ', text).strip()
return text
TOTP Secret Validation:
import base64
import re
def validate_totp_secret(secret: str) -> tuple[bool, str]:
"""Validate TOTP secret format."""
try:
# Remove whitespace and convert to uppercase
secret = re.sub(r'\s', '', secret.upper())
# Check base32 format
if not re.match(r'^[A-Z2-7]+=*$', secret):
return False, "Invalid base32 format"
# Check length (typically 16 or 32 characters)
if len(secret) < 16 or len(secret) > 64:
return False, "Secret length must be between 16-64 characters"
# Try to decode to validate
base64.b32decode(secret)
return True, secret
except Exception as e:
return False, f"Invalid TOTP secret: {str(e)}"
Slash Command Parameter Validation:
from discord import app_commands
from typing import Literal
@app_commands.command()
@app_commands.describe(
text="Text to encode/decode (max 2000 chars)",
operation="Choose encode or decode"
)
async def base64_command(
self,
interaction: discord.Interaction,
text: app_commands.Range[str, 1, 2000], # Length validation
operation: Literal["encode", "decode"] # Choice validation
):
# Parameter validation handled by Discord
pass
# Custom transformers for complex validation
class URLTransformer(app_commands.Transformer):
async def transform(self, interaction: discord.Interaction, value: str) -> str:
is_valid, result = validate_url(value)
if not is_valid:
raise app_commands.TransformerError(result)
return result
@app_commands.command()
async def screenshot_command(
self,
interaction: discord.Interaction,
url: app_commands.Transform[str, URLTransformer]
):
# URL is pre-validated
pass
Environment-Based Configuration:
from pydantic import BaseSettings, validator
from typing import Optional
class Settings(BaseSettings):
# Required API keys
bot_token: str
gemini_api_key: str
# Optional API keys
screenshot_api_key: Optional[str] = None
rapidapi_key: Optional[str] = None
sentry_dsn: Optional[str] = None
@validator('bot_token')
def validate_bot_token(cls, v):
if not v or len(v) < 50:
raise ValueError('Invalid Discord bot token')
return v
@validator('gemini_api_key')
def validate_gemini_key(cls, v):
if not v or not v.startswith('AI'):
raise ValueError('Invalid Gemini API key format')
return v
class Config:
env_file = '.env'
case_sensitive = False
API Key Rotation Support:
import asyncio
from datetime import datetime, timedelta
class APIKeyManager:
def __init__(self):
self.keys = {}
self.rotation_schedule = {}
async def get_api_key(self, service: str) -> str:
"""Get current API key for service."""
if service in self.rotation_schedule:
if datetime.now() > self.rotation_schedule[service]:
await self.rotate_key(service)
return self.keys.get(service)
async def rotate_key(self, service: str):
"""Rotate API key for service."""
# Implementation depends on service
pass
Secure HTTP Client Configuration:
import aiohttp
import ssl
class SecureHTTPClient:
def __init__(self):
# Create secure SSL context
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = True
ssl_context.verify_mode = ssl.CERT_REQUIRED
# Configure timeouts
timeout = aiohttp.ClientTimeout(
total=30,
connect=10,
sock_read=10
)
# Create session with security settings
connector = aiohttp.TCPConnector(
ssl=ssl_context,
limit=100,
limit_per_host=30
)
self.session = aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers={
'User-Agent': 'UtilsBot+/2.0 (+https://github.com/ad1107/utils-bot-plus)'
}
)
async def get(self, url: str, **kwargs):
"""Secure GET request."""
async with self.session.get(url, **kwargs) as response:
return response
Request Sanitization:
async def safe_api_request(url: str, params: dict = None) -> dict:
"""Make safe API request with validation."""
# Validate URL
is_valid, validated_url = validate_url(url)
if not is_valid:
raise ValueError(f"Invalid URL: {validated_url}")
# Sanitize parameters
if params:
params = {k: sanitize_text_input(str(v)) for k, v in params.items()}
try:
async with secure_http_client.get(validated_url, params=params) as response:
if response.status == 200:
return await response.json()
else:
raise APIError(f"API returned status {response.status}")
except asyncio.TimeoutError:
raise APIError("Request timed out")
except Exception as e:
logger.error(f"API request failed: {e}")
raise APIError("External service unavailable")
Password and Secret Handling:
import secrets
import hashlib
from cryptography.fernet import Fernet
class SecretManager:
def __init__(self, secret_key: str):
self.cipher = Fernet(secret_key.encode())
def encrypt_secret(self, secret: str) -> str:
"""Encrypt sensitive data."""
return self.cipher.encrypt(secret.encode()).decode()
def decrypt_secret(self, encrypted: str) -> str:
"""Decrypt sensitive data."""
return self.cipher.decrypt(encrypted.encode()).decode()
@staticmethod
def generate_secure_password(length: int = 16) -> str:
"""Generate cryptographically secure password."""
alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*"
return ''.join(secrets.choice(alphabet) for _ in range(length))
@staticmethod
def hash_data(data: str) -> str:
"""Hash data with salt."""
salt = secrets.token_hex(16)
hash_obj = hashlib.pbkdf2_hmac('sha256', data.encode(), salt.encode(), 100000)
return f"{salt}:{hash_obj.hex()}"
Database Security:
from sqlalchemy import event
from sqlalchemy.engine import Engine
# Encrypt sensitive columns
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
discord_id = Column(String(20), unique=True, nullable=False)
username = Column(String(100))
# Encrypted fields
_encrypted_preferences = Column('preferences', Text)
@property
def preferences(self) -> dict:
if self._encrypted_preferences:
return json.loads(secret_manager.decrypt_secret(self._encrypted_preferences))
return {}
@preferences.setter
def preferences(self, value: dict):
self._encrypted_preferences = secret_manager.encrypt_secret(json.dumps(value))
# Database connection security
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
"""Set secure SQLite pragmas."""
if 'sqlite' in str(dbapi_connection):
cursor = dbapi_connection.cursor()
# Enable foreign key constraints
cursor.execute("PRAGMA foreign_keys=ON")
# Set secure temp store
cursor.execute("PRAGMA secure_delete=ON")
cursor.close()
Data Minimization:
class PrivacyManager:
@staticmethod
def anonymize_user_data(user_data: dict) -> dict:
"""Anonymize user data for logging."""
sensitive_fields = ['discord_id', 'username', 'email']
anonymized = user_data.copy()
for field in sensitive_fields:
if field in anonymized:
anonymized[field] = f"***{anonymized[field][-4:]}"
return anonymized
@staticmethod
def should_log_command(command_name: str) -> bool:
"""Determine if command should be logged."""
# Don't log sensitive commands
sensitive_commands = ['totp', 'password']
return command_name not in sensitive_commands
GDPR Compliance:
async def handle_data_request(user_id: str, request_type: str):
"""Handle GDPR data requests."""
async with bot.db.async_session() as session:
if request_type == "export":
# Export all user data
user_data = await get_all_user_data(session, user_id)
return format_data_export(user_data)
elif request_type == "delete":
# Delete all user data
await delete_user_data(session, user_id)
return "Data deleted successfully"
Secure Environment Configuration:
# .env file security
chmod 600 .env # Restrict file permissions
# Environment variables validation
export BOT_TOKEN="your_secure_token_here"
export SECRET_KEY="$(openssl rand -hex 32)" # Generate secure key
export DATABASE_URL="postgresql://user:pass@localhost/db"
# Production environment
export DEBUG=false
export LOG_LEVEL=WARNING
export SENTRY_DSN="your_sentry_dsn"
Docker Security:
# Dockerfile security best practices
FROM python:3.11-slim
# Create non-root user
RUN useradd --create-home --shell /bin/bash botuser
# Set working directory
WORKDIR /app
# Copy requirements first (layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Change ownership and switch to non-root user
RUN chown -R botuser:botuser /app
USER botuser
# Remove unnecessary packages
RUN apt-get autoremove -y && apt-get clean
# Health check
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8000/health')"
CMD ["python", "main.py"]
Docker Compose Security:
version: '3.8'
services:
bot:
build: .
restart: unless-stopped
read_only: true # Read-only filesystem
tmpfs:
- /tmp
cap_drop:
- ALL # Drop all capabilities
cap_add:
- CHOWN # Only required capabilities
security_opt:
- no-new-privileges:true
environment:
- BOT_TOKEN_FILE=/run/secrets/bot_token
secrets:
- bot_token
networks:
- bot-network
secrets:
bot_token:
external: true
networks:
bot-network:
driver: bridge
internal: true # No external access
System Security:
# Firewall configuration
ufw enable
ufw default deny incoming
ufw default allow outgoing
ufw allow ssh
# SSH hardening
sed -i 's/#PermitRootLogin yes/PermitRootLogin no/' /etc/ssh/sshd_config
sed -i 's/#PasswordAuthentication yes/PasswordAuthentication no/' /etc/ssh/sshd_config
# Install security updates
apt update && apt upgrade -y
apt install unattended-upgrades -y
# Install fail2ban
apt install fail2ban -y
systemctl enable fail2ban
Comprehensive Audit Trail:
import structlog
from datetime import datetime
class SecurityLogger:
def __init__(self):
self.logger = structlog.get_logger("security")
async def log_command_execution(self, interaction: discord.Interaction, command: str):
"""Log command execution for audit."""
await self.logger.info(
"command_executed",
user_id=interaction.user.id,
username=interaction.user.name,
guild_id=interaction.guild_id if interaction.guild else None,
command=command,
timestamp=datetime.utcnow().isoformat(),
channel_id=interaction.channel_id
)
async def log_security_event(self, event_type: str, details: dict):
"""Log security-related events."""
await self.logger.warning(
"security_event",
event_type=event_type,
details=details,
timestamp=datetime.utcnow().isoformat()
)
async def log_failed_permission_check(self, user_id: int, command: str, reason: str):
"""Log failed permission checks."""
await self.logger.warning(
"permission_denied",
user_id=user_id,
command=command,
reason=reason,
timestamp=datetime.utcnow().isoformat()
)
Anomaly Detection:
class AnomalyDetector:
def __init__(self):
self.user_patterns = {}
async def check_unusual_activity(self, user_id: str, command: str) -> bool:
"""Detect unusual user activity patterns."""
now = datetime.utcnow()
if user_id not in self.user_patterns:
self.user_patterns[user_id] = {
'commands': [],
'last_activity': now
}
pattern = self.user_patterns[user_id]
pattern['commands'].append((command, now))
# Clean old commands (last hour)
hour_ago = now - timedelta(hours=1)
pattern['commands'] = [
(cmd, time) for cmd, time in pattern['commands']
if time > hour_ago
]
# Check for suspicious patterns
if len(pattern['commands']) > 100: # Too many commands
await security_logger.log_security_event(
"suspicious_activity",
{"user_id": user_id, "command_count": len(pattern['commands'])}
)
return True
return False
Sentry Integration:
import sentry_sdk
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
def setup_error_monitoring():
"""Configure error monitoring."""
if settings.sentry_dsn:
sentry_sdk.init(
dsn=settings.sentry_dsn,
integrations=[
AioHttpIntegration(),
SqlalchemyIntegration(),
],
traces_sample_rate=0.1,
profiles_sample_rate=0.1,
before_send=filter_sensitive_data
)
def filter_sensitive_data(event, hint):
"""Filter sensitive data from error reports."""
# Remove sensitive environment variables
if 'environment' in event:
sensitive_vars = ['BOT_TOKEN', 'SECRET_KEY', 'API_KEY']
for var in sensitive_vars:
if var in event['environment']:
event['environment'][var] = '[REDACTED]'
return event
Secure Coding Practices:
- Input Validation: Validate all user inputs
- Output Encoding: Escape output to prevent injection
- Error Handling: Don't expose internal details
- Secrets Management: Never hardcode secrets
- Dependency Updates: Keep dependencies current
- Code Review: Review all code changes
Pre-commit Security Checks:
# .pre-commit-config.yaml
repos:
- repo: https://github.com/PyCQA/bandit
rev: '1.7.4'
hooks:
- id: bandit
args: ['-r', '.']
- repo: https://github.com/Yelp/detect-secrets
rev: v1.4.0
hooks:
- id: detect-secrets
args: ['--baseline', '.secrets.baseline']
Security Checklist:
- All secrets in environment variables
- SSL/TLS enabled for all connections
- Database connections encrypted
- Regular security updates applied
- Monitoring and alerting configured
- Backup and recovery tested
- Access controls properly configured
- Audit logging enabled
Regular Security Tasks:
# Weekly security update script
#!/bin/bash
apt update && apt list --upgradable
pip list --outdated
docker images --filter "dangling=true" -q | xargs docker rmi
# Monthly security audit
bandit -r . -f json -o security-report.json
safety check --json --output safety-report.json
- Deployment Guide - Secure deployment practices
- Configuration Guide - Security configuration
- Troubleshooting - Security issue resolution
- Developer Guide - Secure development practices