Traffik provides rate limiting for Starlette-based applications like FastAPI, with support for both HTTP and WebSocket connections. It uses a token bucket algorithm for smooth, burst-friendly rate limiting and offers multiple backend options, including in-memory storage for development and Redis for production environments.
The library features a dynamic backend system for multi-tenant applications, customizable throttling strategies, and comprehensive error handling. Whether you need simple per-endpoint limits or complex multi-tenant rate limiting, Traffik provides the flexibility to handle your use case.
Traffik was inspired by fastapi-limiter, and some of the code is adapted from it. However, Traffik aims to provide a more flexible and extensible solution with a focus on ease of use and advanced features like dynamic backend resolution.
- 🚀 Easy Integration: Simple decorator and dependency-based throttling
- 🪣 Token Bucket Algorithm: Smooth, burst-friendly rate limiting with gradual token refill
- 🔄 Multiple Backends: In-memory (development) and Redis (production) support
- 🌐 Protocol Support: Both HTTP and WebSocket throttling
- 🏢 Dynamic Backend Resolution: Multi-tenant support with runtime backend switching
- 🔧 Flexible Configuration: Time-based limits with multiple time units
- 🎯 Per-Route Throttling: Individual limits for different endpoints
- 📊 Client Identification: Customizable client identification strategies
- 🛡️ Thread-Safe Design: Immutable throttles with proper async locking
- ⚡ High Performance: Optimized scripts and efficient in-memory operations
Traffik uses a token bucket algorithm for rate limiting, which provides several advantages over traditional fixed-window approaches:
Think of a bucket that holds tokens:
- Bucket Capacity: Your rate limit (e.g., 100 requests)
- Token Refill Rate: Tokens are added continuously over time
- Request Processing: Each request consumes one token
- Burst Handling: Allows temporary bursts up to bucket capacity
# Configuration
limit = 100 # Bucket holds 100 tokens max
expires_after = 3600000 # 1 hour in milliseconds
refill_rate = 100 / 3600000 # ≈ 0.0000278 tokens per millisecond (≈ 0.0278 tokens per second)
# Behavior:
# - Bucket starts full (100 tokens)
# - Client can make 100 requests immediately (burst)
# - After burst, tokens refill at ~1.67 per minute
# - Sustained rate: 100 requests per hour (≈ 0.028 requests per second)
- Smooth Rate Limiting: No sudden resets at window boundaries
- Burst Tolerance: Allows legitimate traffic spikes
- Fairness: Gradual token replenishment prevents starvation
- Predictable: Wait times are calculated precisely
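The last point can be made concrete: because tokens refill continuously, the exact wait until the next token is a simple division. Below is a minimal, illustrative calculation (not Traffik's internal code; the names are just for this example):

```python
# Illustrative token-bucket math, all times in milliseconds.
limit = 100                          # bucket capacity
expires_after = 3_600_000            # 1 hour window
refill_rate = limit / expires_after  # tokens added per millisecond

def wait_for_next_token(tokens_left: float) -> int:
    """Milliseconds until at least one full token is available."""
    if tokens_left >= 1.0:
        return 0
    return int((1.0 - tokens_left) / refill_rate)

print(wait_for_next_token(0.0))  # 36000 ms -> one token every 36 s, i.e. 100 per hour
```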
We recommend using uv, although it is not a strict requirement.
Visit the uv documentation for installation instructions.
uv add traffik
# or using pip
pip install traffik
Let's also install the fastapi extra if you haven't already:
uv add traffik[fastapi]
# or using pip
pip install traffik[fastapi]
uv add traffik[redis]
# or using pip
pip install traffik[redis]
uv add traffik[all]
# or using pip
pip install traffik[all]
git clone https://github.com/your-username/traffik.git
cd traffik
uv sync --extra dev
# or using pip
pip install -e .[dev]
For quick testing across different platforms and Python versions:
# Run fast tests
./docker-test.sh test-fast
# Run full test suite
./docker-test.sh test
# Start development environment
./docker-test.sh dev
# Test across Python versions
./docker-test.sh test-matrix
Testing Documentation:
- DOCKER.md - Complete Docker testing guide
- TESTING.md - Quick testing guide
- TESTING_COMPLETE.md - Comprehensive testing reference
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.requests import Request
from starlette.responses import JSONResponse
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend
# Create backend
throttle_backend = InMemoryBackend(prefix="myapp", persistent=False)
throttle = HTTPThrottle(
limit=5, # 5 requests
seconds=10, # per 10 seconds
)
async def throttled_endpoint(request: Request):
"""
Endpoint that is throttled.
"""
await throttle(request)
return JSONResponse({"message": "Success"})
app = Starlette(
routes=[
Route("/throttled", throttled_endpoint, methods=["GET"]),
],
lifespan=throttle_backend.lifespan, # Use `throttle_backend.lifespan` for cleanup
)
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
from traffik.backends.inmemory import InMemoryBackend
from traffik.throttles import HTTPThrottle
# Create backend
throttle_backend = InMemoryBackend()
# Create FastAPI app lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
async with throttle_backend(app):
yield
app = FastAPI(lifespan=lifespan)
# Create throttle
throttle = HTTPThrottle(
limit=10, # 10 requests
seconds=60, # per 60 seconds
)
@app.get("/api/hello", dependencies=[Depends(throttle)])
async def say_hello():
return {"message": "Hello World"}
Currently, the `throttled` decorator is only available for FastAPI applications.
from fastapi import FastAPI
from contextlib import asynccontextmanager
from traffik.throttles import HTTPThrottle
from traffik.decorators import throttled # Requires `traffik[fastapi]` or `traffik[all]`
from traffik.backends.redis import RedisBackend
throttle_backend = RedisBackend(
connection="redis://localhost:6379/0",
prefix="myapp", # Key prefix
persistent=True, # Survive restarts
)
# Setup FastAPI app with lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
async with throttle_backend(app):
yield
app = FastAPI(lifespan=lifespan)
@app.get("/api/limited")
@throttled(HTTPThrottle(limit=5, minutes=1))
async def limited_endpoint():
return {"data": "Limited access"}
WebSocket throttling limits the rate of messages a client can send over a WebSocket connection:
from traffik.throttles import WebSocketThrottle
from starlette.websockets import WebSocket
from starlette.exceptions import HTTPException
ws_throttle = WebSocketThrottle(limit=3, seconds=10)
async def ws_endpoint(websocket: WebSocket) -> None:
await websocket.accept()
while True:
try:
data = await websocket.receive_json()
await ws_throttle(websocket) # Check rate limit
await websocket.send_json({
"status": "success",
"data": data,
})
except HTTPException as exc:
await websocket.send_json({
"status": "error",
"status_code": exc.status_code,
"detail": exc.detail,
})
break
except Exception:
await websocket.send_json({
"status": "error",
"detail": "Internal error"
})
break
await websocket.close()
Use this WebSocket endpoint in your application:
from starlette.applications import Starlette
from starlette.routing import WebSocketRoute
from fastapi import FastAPI
# For Starlette
app = Starlette(routes=[WebSocketRoute("/ws/limited", ws_endpoint)])
# For FastAPI
app = FastAPI()
app.websocket("/ws/limited")(ws_endpoint)
Perfect for development, testing, and single-process applications:
from traffik.backends.inmemory import InMemoryBackend
inmemory_throttle_backend = InMemoryBackend(
prefix="myapp", # Key prefix
persistent=False, # Don't persist across restarts
)
Pros:
- No external dependencies
- Fast and simple
- Great for testing
Cons:
- Not suitable for multi-process/distributed systems
- Data is lost on restart (even with persistent=True)
Recommended for production environments:
from traffik.backends.redis import RedisBackend
# From URL
redis_throttle_backend = RedisBackend(
connection="redis://localhost:6379/0",
prefix="myapp",
persistent=True, # Survive restarts
)
# From Redis instance
import redis.asyncio as redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
redis_throttle_backend = RedisBackend(
connection=redis_client,
prefix="myapp",
)
Pros:
- Distributed throttling across multiple processes
- Persistence across restarts
- Production-ready
Cons:
- Requires Redis server
- Additional infrastructure dependency
You can create custom backends by subclassing `ThrottleBackend` and implementing the token bucket algorithm. Here's a simplified example:
import time
import typing

from traffik.backends.base import ThrottleBackend
from traffik.throttles import HTTPThrottle
from traffik.types import HTTPConnectionT
class CustomBackend(ThrottleBackend[typing.Dict, HTTPConnectionT]):
"""Custom backend implementing token bucket algorithm"""
def __init__(self, storage_config: str = "custom://config", **kwargs):
# Initialize your storage connection
self._storage = {}
super().__init__(connection=self._storage, **kwargs)
async def get_wait_period(self, key: str, limit: int, expires_after: int) -> int:
"""Token bucket implementation for custom storage"""
now = int(time.monotonic() * 1000)
# Get or create token bucket record
record = self._storage.get(key, {
"tokens": float(limit),
"last_refill": now
})
# Calculate tokens to add based on elapsed time
time_elapsed = now - record["last_refill"]
refill_rate = limit / expires_after # tokens per millisecond
tokens_to_add = time_elapsed * refill_rate
# Refill tokens (capped at limit)
record["tokens"] = min(float(limit), record["tokens"] + tokens_to_add)
record["last_refill"] = now
# Check if request can be served
if record["tokens"] >= 1.0:
record["tokens"] -= 1.0
self._storage[key] = record
return 0 # Allow request
# Calculate wait time for next token
tokens_needed = 1.0 - record["tokens"]
wait_time = int(tokens_needed / refill_rate)
self._storage[key] = record
return wait_time
async def reset(self) -> None:
"""Reset all throttle records"""
pattern = str(self.key_pattern)
keys_to_delete = [k for k in self._storage.keys() if k.startswith(pattern.replace("*", ""))]
for key in keys_to_delete:
del self._storage[key]
async def close(self) -> None:
"""Clean up resources"""
self._storage.clear()
# Usage
custom_backend = CustomBackend(prefix="myapp")
throttle = HTTPThrottle(limit=100, minutes=1, backend=custom_backend)
Throttles support multiple time units that can be combined:
HTTPThrottle(
limit=100,
milliseconds=500, # 500ms
seconds=30, # + 30 seconds
minutes=5, # + 5 minutes
hours=1, # + 1 hour
# Total: 1 hour, 5 minutes, 30.5 seconds
)
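Since the keyword arguments are summed into a single window, the throttle above is equivalent to expressing the whole period in milliseconds (a quick sanity check, reusing `HTTPThrottle` from above):

```python
# 1 h + 5 min + 30 s + 500 ms
#   = 3_600_000 + 300_000 + 30_000 + 500
#   = 3_930_500 ms
equivalent_throttle = HTTPThrottle(limit=100, milliseconds=3_930_500)
```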
By default, clients are identified by IP address and path. You can customize this:
from starlette.requests import HTTPConnection
from traffik.throttles import HTTPThrottle
async def custom_identifier(connection: HTTPConnection):
# Use user ID from JWT token
user_id = extract_user_id(connection.headers.get("authorization"))
return f"user:{user_id}:{connection.scope['path']}"
throttle = HTTPThrottle(
limit=10,
minutes=1,
identifier=custom_identifier, # Override default (backend) identifier
)
You can exclude certain connections from throttling by writing a custom identifier that returns `traffik.UNLIMITED` for those connections. This is useful when you have throttles you want to skip for specific clients and/or routes.
import typing
from starlette.requests import HTTPConnection
from traffik import UNLIMITED, HTTPThrottle
def extract_user_id(authorization: str) -> str:
# Dummy function to extract user ID from JWT token
# Replace with actual JWT decoding logic
return authorization.split(" ")[1] if authorization else "anonymous"
def extract_user_role(authorization: str) -> str:
# Dummy function to extract user role from JWT token
# Replace with actual JWT decoding logic
return "admin" if "admin" in authorization else "user"
async def user_identifier(connection: HTTPConnection) -> str:
# Use user ID from JWT token
user_id = extract_user_id(connection.headers.get("authorization"))
return f"user:{user_id}:{connection.scope['path']}"
async def no_throttle_admin_identifier(connection: HTTPConnection) -> typing.Any:
user_role = extract_user_role(connection.headers.get("authorization"))
if user_role == "admin":
return UNLIMITED # Skip throttling for admin users
    return await user_identifier(connection)
throttle = HTTPThrottle(
limit=10,
minutes=1,
identifier=no_throttle_admin_identifier, # Override default (backend) identifier
)
Customize what happens when a client is throttled:
from starlette.requests import HTTPConnection
from starlette.exceptions import HTTPException
import traffik
async def custom_throttled_handler(
connection: HTTPConnection,
wait_period: int,
*args, **kwargs
):
raise HTTPException(
status_code=429,
detail=f"Too many requests. Try again in {wait_period // 1000} seconds.",
headers={"Retry-After": str(wait_period // 1000)},
)
throttle = traffik.HTTPThrottle(
limit=5,
minutes=1,
handle_throttled=custom_throttled_handler,
)
Different limits can be applied to the same endpoint using multiple throttles. This is useful for burst and sustained limits. Take the FastAPI example below:
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend
throttle_backend = InMemoryBackend(prefix="myapp", persistent=False)
@asynccontextmanager
async def lifespan(app: FastAPI):
async with throttle_backend(app):
yield
app = FastAPI(lifespan=lifespan)
# Burst limit: 10 requests per minute
burst_throttle = HTTPThrottle(limit=10, minutes=1)
# Sustained limit: 100 requests per hour
sustained_throttle = HTTPThrottle(limit=100, hours=1)
async def search_db(query: str):
# Simulate a database search
return {"query": query, "result": ...}
@app.get(
"/api/data",
dependencies=[
Depends(burst_throttle),
Depends(sustained_throttle)
],
)
async def search(query: str):
data = await search_db(query)
return {"message": "Data retrieved", "data": data}
from traffik.throttles import HTTPThrottle
from starlette.requests import Request
async def get_user_id(request: Request):
# Extract user ID from request state
return request.state.user.id if hasattr(request.state, 'user') else None
async def user_identifier(request: Request) -> str:
# Extract user ID from JWT or session
user_id = await get_user_id(request)
return f"user:{user_id}"
user_throttle = HTTPThrottle(
limit=100,
hours=1,
identifier=user_identifier,
)
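Attach it to routes like any other throttle, for example as a FastAPI dependency. The route and handler below are illustrative, and lifespan setup is omitted for brevity:

```python
from fastapi import Depends, FastAPI

app = FastAPI()  # configure the lifespan with your backend as shown earlier

@app.get("/api/profile", dependencies=[Depends(user_throttle)])
async def get_profile():
    return {"profile": {}}
```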
The `dynamic_backend=True` feature enables runtime backend switching, perfect for multi-tenant SaaS applications where different tenants require isolated rate limiting storage.
from fastapi import FastAPI, Request, Depends, HTTPException
from traffik.throttles import HTTPThrottle
from traffik.backends.redis import RedisBackend
from traffik.backends.inmemory import InMemoryBackend
import jwt
# Shared throttle instance for all tenants
api_quota_throttle = HTTPThrottle(
uid="api_quota",
limit=1000, # 1000 requests
hours=1, # per hour
dynamic_backend=True # Enable runtime backend resolution
)
# Tenant configuration
TENANT_CONFIG = {
"enterprise": {
"redis_url": "redis://enterprise-cluster:6379/0",
"quota_multiplier": 5.0, # 5x higher limits
},
"premium": {
"redis_url": "redis://premium-redis:6379/0",
"quota_multiplier": 2.0, # 2x higher limits
},
"free": {
"redis_url": None, # Use in-memory for free tier
"quota_multiplier": 1.0,
}
}
def extract_tenant_from_jwt(authorization: str) -> dict:
"""Extract tenant info from JWT token"""
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(401, "Missing or invalid authorization")
token = authorization.split(" ")[1]
try:
payload = jwt.decode(token, "your-secret", algorithms=["HS256"])
tenant_tier = payload.get("tenant_tier", "free")
tenant_id = payload.get("tenant_id", "unknown")
return {"tier": tenant_tier, "id": tenant_id}
except jwt.InvalidTokenError:
raise HTTPException(401, "Invalid token")
async def tenant_middleware(request: Request, call_next):
"""Middleware to set up tenant-specific backend context"""
# Extract tenant from request
auth_header = request.headers.get("authorization", "")
tenant_info = extract_tenant_from_jwt(auth_header)
# Get tenant configuration
tenant_config = TENANT_CONFIG.get(tenant_info["tier"], TENANT_CONFIG["free"])
# Create tenant-specific backend
if tenant_config["redis_url"]:
# Premium/Enterprise: Dedicated Redis instance
backend = RedisBackend(
connection=tenant_config["redis_url"],
prefix=f"tenant_{tenant_info['id']}",
persistent=True
)
else:
# Free tier: In-memory backend
backend = InMemoryBackend(
prefix=f"tenant_{tenant_info['id']}",
persistent=False
)
# Set tenant context for request
request.state.tenant = tenant_info
# Execute request within backend context
async with backend:
response = await call_next(request)
return response
app = FastAPI()
app.middleware("http")(tenant_middleware)
@app.get("/api/data")
async def get_data(request: Request, _: None = Depends(api_quota_throttle)):
"""API endpoint with tenant-aware rate limiting"""
tenant = request.state.tenant
return {
"message": f"Data for {tenant['tier']} tenant {tenant['id']}",
"remaining_quota": "Calculated based on tenant tier"
}
# Usage example:
# curl -H "Authorization: Bearer <jwt-with-tenant-info>" http://localhost:8000/api/data
from traffik.throttles import HTTPThrottle
from traffik.backends.redis import RedisBackend
from traffik.backends.inmemory import InMemoryBackend
# Shared throttle for testing different backends
test_throttle = HTTPThrottle(
uid="test_throttle",
limit=5,
seconds=10,
dynamic_backend=True
)
async def test_backend_switching():
"""Test the same throttle with different backends"""
# Test with Redis backend
redis_backend = RedisBackend("redis://localhost:6379/1", prefix="test_redis")
async with redis_backend:
for i in range(3):
await test_throttle(mock_request)
print(f"Redis backend - Request {i+1} successful")
# Test with in-memory backend (completely separate state)
inmemory_backend = InMemoryBackend(prefix="test_memory")
async with inmemory_backend:
for i in range(3):
await test_throttle(mock_request) # Fresh counter
print(f"In-memory backend - Request {i+1} successful")
# Nested contexts for A/B testing
backend_a = InMemoryBackend(prefix="variant_a")
backend_b = InMemoryBackend(prefix="variant_b")
async with backend_a:
await test_throttle(mock_request) # Uses backend_a
async with backend_b:
await test_throttle(mock_request) # Switches to backend_b
await test_throttle(mock_request) # Still backend_b
await test_throttle(mock_request) # Back to backend_a
When to Use Dynamic Backends:
- ✅ Multi-tenant SaaS with tenant-specific storage requirements
- ✅ A/B testing different rate limiting strategies
- ✅ Environment-specific backend selection (dev/staging/prod)
- ✅ Request-type based storage (e.g., different limits for API vs Web requests)
When NOT to Use:
- ❌ Simple shared storage across services (use the explicit `backend` parameter instead; see the sketch after this list)
- ❌ Single-tenant applications (adds unnecessary complexity)
- ❌ When backend choice is known at application startup
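For the first of those cases, a single backend known up front can simply be passed to each throttle via the explicit `backend` parameter, with no per-request resolution:

```python
from traffik.backends.redis import RedisBackend
from traffik.throttles import HTTPThrottle

shared_backend = RedisBackend(
    connection="redis://localhost:6379/0",
    prefix="myapp",
)

# Backend is fixed at definition time; `dynamic_backend` is not needed.
throttle = HTTPThrottle(limit=100, minutes=1, backend=shared_backend)
```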
Performance Impact:
- Small overhead: Backend resolution on each request
- Memory efficiency: Only one throttle instance needed per limit type
- Context switching: May cause data fragmentation if inconsistent
For Starlette applications, you can manage the backend lifecycle using the `lifespan` context manager on the throttle backend. This ensures that the backend is properly initialized and cleaned up when the application starts and stops.
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.requests import Request
from starlette.responses import JSONResponse
from traffik.backends.inmemory import InMemoryBackend
from traffik.throttles import HTTPThrottle
# Create backend
throttle_backend = InMemoryBackend(prefix="myapp", persistent=False)
throttle = HTTPThrottle(
limit=5, # 5 requests
seconds=10, # per 10 seconds
)
async def throttled_endpoint(request: Request):
"""
Endpoint that is throttled.
"""
await throttle(request)
return JSONResponse({"message": "Success"})
app = Starlette(
routes=[
Route("/throttled", throttled_endpoint, methods=["GET"]),
],
lifespan=throttle_backend.lifespan, # Use `throttle_backend.lifespan` for cleanup
)
For FastAPI applications, you can use the `lifespan` attribute of the throttle backend to manage the backend lifecycle. However, you can also use the `asynccontextmanager` decorator to create a lifespan context manager for your FastAPI application. This allows you to perform other setup and teardown tasks when the application starts and stops.
from fastapi import FastAPI
from contextlib import asynccontextmanager
from traffik.backends.inmemory import InMemoryBackend

# Any backend works here; in-memory is shown for brevity
backend = InMemoryBackend(prefix="myapp")
@asynccontextmanager
async def lifespan(app: FastAPI):
# Other setup tasks can go here
async with backend(app):
yield
# Shutdown - backend cleanup handled automatically
app = FastAPI(lifespan=lifespan)
Traffik provides powerful middleware capabilities that allow you to apply rate limiting across multiple endpoints with sophisticated filtering and routing logic. The middleware system is ideal for applying consistent rate limiting policies across your application while maintaining flexibility for specific requirements.
from fastapi import FastAPI
from traffik.middleware import ThrottleMiddleware, MiddlewareThrottle
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend
app = FastAPI()
backend = InMemoryBackend(prefix="api")
# Create a throttle instance
api_throttle = HTTPThrottle(
uid="api_global",
limit=100,
minutes=1
)
# Wrap it in middleware throttle - applies to all endpoints
basic_middleware_throttle = MiddlewareThrottle(api_throttle)
# Add middleware
app.add_middleware(
ThrottleMiddleware,
middleware_throttles=[basic_middleware_throttle],
backend=backend
)
@app.get("/api/users")
async def get_users():
return {"users": []}
@app.get("/api/posts")
async def get_posts():
return {"posts": []}
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.routing import Route
from starlette.responses import JSONResponse
from traffik.middleware import ThrottleMiddleware, MiddlewareThrottle
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend
async def users_endpoint(request):
return JSONResponse({"users": []})
async def posts_endpoint(request):
return JSONResponse({"posts": []})
backend = InMemoryBackend(prefix="api")
# Create throttle instance
api_throttle = HTTPThrottle(
uid="api_throttle",
limit=50,
minutes=1
)
# Wrap in middleware throttle
middleware_throttle = MiddlewareThrottle(api_throttle)
app = Starlette(
routes=[
Route("/api/users", users_endpoint),
Route("/api/posts", posts_endpoint),
],
    middleware=[
        Middleware(
            ThrottleMiddleware,
            middleware_throttles=[middleware_throttle],
            backend=backend,
        )
    ]
)
Apply different limits to different HTTP methods:
# Strict limits for write operations
write_throttle = HTTPThrottle(
uid="write_operations",
limit=10,
minutes=1
)
# Generous limits for read operations
read_throttle = HTTPThrottle(
uid="read_operations",
limit=1000,
minutes=1
)
# Create middleware throttles with method filtering
write_middleware = MiddlewareThrottle(
write_throttle,
methods={"POST", "PUT", "DELETE"} # Only write methods
)
read_middleware = MiddlewareThrottle(
read_throttle,
methods={"GET", "HEAD"} # Only read methods
)
app.add_middleware(
ThrottleMiddleware,
middleware_throttles=[write_middleware, read_middleware],
backend=backend
)
Use string patterns or regex to target specific endpoints:
# Create throttle instances
api_throttle = HTTPThrottle(uid="api_endpoints", limit=100, minutes=1)
admin_throttle = HTTPThrottle(uid="admin_endpoints", limit=5, minutes=1)
static_throttle = HTTPThrottle(uid="static_files", limit=10000, minutes=1)
# Create middleware throttles with path filtering
api_middleware = MiddlewareThrottle(
api_throttle,
path="/api/" # Matches paths starting with /api/
)
admin_middleware = MiddlewareThrottle(
admin_throttle,
path="/admin/" # Matches paths starting with /admin/
)
# For complex patterns, use regex strings
static_middleware = MiddlewareThrottle(
static_throttle,
path=r"^/(static|assets|media)/.*" # Regex pattern for static files
)
app.add_middleware(
ThrottleMiddleware,
middleware_throttles=[admin_middleware, api_middleware, static_middleware],
backend=backend
)
Implement complex business logic with custom hooks:
from starlette.requests import HTTPConnection
# Create throttle instances
auth_throttle = HTTPThrottle(uid="authenticated_users", limit=200, minutes=1)
intensive_throttle = HTTPThrottle(uid="intensive_operations", limit=20, minutes=1)
external_throttle = HTTPThrottle(uid="external_api_calls", limit=50, minutes=5)
async def authenticated_users_only(connection: HTTPConnection) -> bool:
"""Only apply throttle to authenticated users"""
auth_header = connection.headers.get("authorization")
return auth_header is not None and auth_header.startswith("Bearer ")
async def high_priority_endpoints(connection: HTTPConnection) -> bool:
"""Apply strict limits to resource-intensive endpoints"""
intensive_paths = ["/api/reports/", "/api/analytics/", "/api/exports/", "/api/search"]
path = connection.scope["path"]
return any(path.startswith(intensive_path) for intensive_path in intensive_paths)
async def external_api_calls(connection: HTTPConnection) -> bool:
"""Identify requests that trigger external API calls"""
headers = dict(connection.headers)
return "x-external-api" in headers
# Create middleware throttles with custom hooks
auth_middleware = MiddlewareThrottle(
auth_throttle,
hook=authenticated_users_only
)
intensive_middleware = MiddlewareThrottle(
intensive_throttle,
hook=high_priority_endpoints
)
external_middleware = MiddlewareThrottle(
external_throttle,
hook=external_api_calls
)
app.add_middleware(
ThrottleMiddleware,
middleware_throttles=[intensive_middleware, external_middleware, auth_middleware],
backend=backend
)
Combine multiple filters for precise targeting:
# Create throttle instance
complex_throttle = HTTPThrottle(
uid="authenticated_api_posts",
limit=25,
minutes=1
)
async def authenticated_users_only(connection: HTTPConnection) -> bool:
auth_header = connection.headers.get("authorization")
return auth_header is not None and auth_header.startswith("Bearer ")
# Complex middleware: POST requests to API endpoints by authenticated users
complex_middleware = MiddlewareThrottle(
complex_throttle,
path="/api/", # Path starts with /api/
methods={"POST"}, # Only POST requests
hook=authenticated_users_only # Only authenticated users
)
# This throttle will only apply to requests that match ALL criteria:
# - Path starts with /api/
# - Method is POST
# - Hook function returns True (user is authenticated)
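To put it into effect, register it like the earlier middleware throttles (assuming `app` and `backend` from the previous setup):

```python
app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[complex_middleware],
    backend=backend,
)
```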
Use middleware with dynamic backends for multi-tenant applications:
from fastapi import FastAPI, Request
from traffik.middleware import ThrottleMiddleware, MiddlewareThrottle
from traffik.throttles import HTTPThrottle
from traffik.backends.redis import RedisBackend
from traffik.backends.inmemory import InMemoryBackend
import jwt
# Create tenant-aware throttles
api_throttle = HTTPThrottle(
uid="api_quota",
limit=1000,
hours=1,
dynamic_backend=True # Enable dynamic backend resolution
)
admin_throttle = HTTPThrottle(
uid="admin_quota",
limit=100,
hours=1,
dynamic_backend=True
)
# Create middleware throttles with path filtering
api_middleware = MiddlewareThrottle(api_throttle, path="/api/")
admin_middleware = MiddlewareThrottle(admin_throttle, path="/admin/")
async def tenant_context_middleware(request: Request, call_next):
"""Set up tenant-specific backend context"""
# Extract tenant from JWT (simplified)
auth_header = request.headers.get("authorization", "")
tenant_id = "default"
if auth_header.startswith("Bearer "):
try:
token = auth_header.split(" ")[1]
payload = jwt.decode(token, "secret", algorithms=["HS256"])
tenant_id = payload.get("tenant_id", "default")
except jwt.InvalidTokenError:
pass
# Choose backend based on tenant
if tenant_id.startswith("enterprise_"):
backend = RedisBackend(
connection="redis://enterprise-redis:6379/0",
prefix=f"tenant_{tenant_id}"
)
else:
backend = InMemoryBackend(prefix=f"tenant_{tenant_id}")
# Execute request within tenant's backend context
async with backend:
response = await call_next(request)
return response
app = FastAPI()
# Add tenant middleware first
app.middleware("http")(tenant_context_middleware)
# Add throttle middleware
app.add_middleware(
ThrottleMiddleware,
middleware_throttles=[admin_middleware, api_middleware],
# No backend specified - uses dynamic backend from context
)
The middleware automatically handles WebSocket connections properly:
from fastapi import FastAPI, WebSocket
from traffik.middleware import ThrottleMiddleware, MiddlewareThrottle
from traffik.throttles import WebSocketThrottle
# Create throttle for WebSocket connections
ws_throttle = WebSocketThrottle(
uid="websocket_connections",
limit=10,
minutes=1
)
# Create middleware throttle
ws_middleware = MiddlewareThrottle(ws_throttle)
app = FastAPI()
app.add_middleware(
ThrottleMiddleware,
middleware_throttles=[ws_middleware],
backend=backend
)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# WebSocket throttling is handled by middleware
await websocket.send_text("Hello WebSocket!")
await websocket.close()
# Regular HTTP endpoints also covered
@app.get("/api/data")
async def get_data():
return {"data": "value"}
Create sophisticated exemption rules:
async def admin_exemption_hook(connection: HTTPConnection) -> bool:
"""Exempt admin users from rate limiting"""
auth_header = connection.headers.get("authorization", "")
if not auth_header.startswith("Bearer "):
return True # Apply throttle to non-authenticated users
try:
token = auth_header.split(" ")[1]
payload = jwt.decode(token, "secret", algorithms=["HS256"])
user_role = payload.get("role", "user")
return user_role != "admin" # False = exempt admin, True = throttle others
except jwt.InvalidTokenError:
return True # Apply throttle to invalid tokens
# Create throttle for non-admin users
user_throttle = HTTPThrottle(uid="non_admin_users", limit=100, minutes=1)
# Throttle that exempts admin users
user_middleware = MiddlewareThrottle(
user_throttle,
hook=admin_exemption_hook
)
The middleware uses the same exception handling as individual throttles:
from traffik.exceptions import ConnectionThrottled
from starlette.responses import JSONResponse
# Custom exception handler for middleware throttling
@app.exception_handler(ConnectionThrottled)
async def throttled_handler(request: Request, exc: ConnectionThrottled):
return JSONResponse(
status_code=429,
content={
"error": "Rate limit exceeded",
"message": f"Too many requests. Try again in {exc.retry_after} seconds.",
"retry_after": exc.retry_after,
"limit_type": "middleware_throttle"
},
headers={"Retry-After": str(int(exc.retry_after))}
)
Place throttle middleware early in the stack for optimal performance:
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware
app = FastAPI()
# Create throttle
api_throttle = HTTPThrottle(uid="api_rate_limit", limit=1000, minutes=1)
api_middleware = MiddlewareThrottle(api_throttle)
# Add throttling early to reject requests before heavy processing
app.add_middleware(
ThrottleMiddleware,
middleware_throttles=[api_middleware],
backend=backend
)
# Add other middleware after throttling
app.add_middleware(GZipMiddleware)
app.add_middleware(CORSMiddleware, allow_origins=["*"])
Use Redis backend for production middleware deployments:
from traffik.backends.redis import RedisBackend
# Production Redis setup
production_backend = RedisBackend(
connection="redis://redis-cluster:6379/0",
prefix="prod_api",
persistent=True
)
# Create production throttles
production_api_throttle = HTTPThrottle(
uid="production_api",
limit=10000,
hours=1
)
expensive_operations_throttle = HTTPThrottle(
uid="expensive_operations",
limit=100,
minutes=1
)
# Create middleware throttles with filtering
api_middleware = MiddlewareThrottle(production_api_throttle, path="/api/")
expensive_middleware = MiddlewareThrottle(
expensive_operations_throttle,
path=r"^/api/(search|analytics)/.*" # Expensive endpoints
)
app.add_middleware(
ThrottleMiddleware,
middleware_throttles=[expensive_middleware, api_middleware], # Order matters!
backend=production_backend
)
- Specific Before General: Place more specific throttles before general ones in the middleware_throttles list
- Early Placement: Add throttle middleware early in the middleware stack
- Production Backends: Use Redis for multi-instance deployments
- Monitoring: Log throttle hits for monitoring and tuning (see the sketch after this list)
- Graceful Degradation: Provide meaningful error messages to clients
- Testing: Thoroughly test filter combinations and edge cases
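For the monitoring point above, a simple option is a custom `handle_throttled` hook that logs the hit before raising the usual 429 exception. A sketch (the logger name and message format are up to you):

```python
import logging

from starlette.requests import HTTPConnection
from traffik.exceptions import ConnectionThrottled
from traffik.throttles import HTTPThrottle

logger = logging.getLogger("myapp.throttling")

async def logging_throttled_handler(connection: HTTPConnection, wait_period: int, *args, **kwargs):
    # Record the throttle hit, then raise the standard throttled exception.
    logger.warning(
        "Throttled %s on %s (retry in %d ms)",
        connection.scope.get("client"),
        connection.scope.get("path"),
        wait_period,
    )
    raise ConnectionThrottled(wait_period=wait_period)

monitored_throttle = HTTPThrottle(limit=100, minutes=1, handle_throttled=logging_throttled_handler)
```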
Traffik provides specific exceptions for different error conditions:
- `TraffikException` - Base exception for all Traffik-related errors
- `ConfigurationError` - Raised when throttle or backend configuration is invalid
- `AnonymousConnection` - Raised when connection identifier cannot be determined
- `ConnectionThrottled` - HTTP 429 exception raised when rate limits are exceeded
Handle configuration errors:
from traffik.exceptions import ConfigurationError
from traffik.backends.redis import RedisBackend
try:
backend = RedisBackend(connection="invalid://url")
await backend.initialize()
except ConfigurationError as e:
print(f"Backend configuration error: {e}")
Handle anonymous connections:
from starlette.requests import HTTPConnection
from traffik.exceptions import AnonymousConnection
from traffik.backends.base import connection_identifier
async def safe_identifier(connection: HTTPConnection) -> str:
try:
return await connection_identifier(connection)
except AnonymousConnection:
return f"anonymous:{connection.scope['path']}"
Raise throttled exception:
from traffik.exceptions import ConnectionThrottled
async def custom_throttle_handler(connection, wait_period, *args, **kwargs):
raise ConnectionThrottled(
wait_period=wait_period,
detail=f"Rate limited. Retry in {wait_period}s",
headers={"X-Custom-Header": "throttled"}
)
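Then wire it into a throttle the same way as the earlier custom handler:

```python
from traffik.throttles import HTTPThrottle

throttle = HTTPThrottle(limit=5, minutes=1, handle_throttled=custom_throttle_handler)
```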
Here's an example of how to test throttling behavior using the `InMemoryBackend`. You can write something similar for your custom backend too.
import pytest
from httpx import AsyncClient, ASGITransport # pip install httpx
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.requests import Request
from starlette.responses import JSONResponse
from traffik.backends.inmemory import InMemoryBackend
from traffik.throttles import HTTPThrottle
@pytest.fixture(scope="function")
async def backend() -> InMemoryBackend:
return InMemoryBackend(prefix="test", persistent=False)
@pytest.mark.anyio
async def test_throttling(backend: InMemoryBackend):
throttle = HTTPThrottle(limit=2, seconds=1)
async def throttled_endpoint(request: Request):
"""
Endpoint that is throttled.
"""
await throttle(request)
return JSONResponse({"message": "Success"})
app = Starlette(
routes=[
Route("/throttled", throttled_endpoint, methods=["GET"]),
],
lifespan=backend.lifespan, # Use `backend.lifespan` for cleanup
)
# Test that first 2 requests succeed
async with AsyncClient(
transport=ASGITransport(app),
base_url="http://127.0.0.1:123",
) as client:
response1 = await client.get("/throttled")
response2 = await client.get("/throttled")
assert response1.status_code == 200
assert response2.status_code == 200
# Third request should be throttled
response3 = await client.get("/throttled")
assert response3.status_code == 429
- Purpose: Rate limiting for HTTP requests
- Usage: As FastAPI dependency or decorator
- Key Generation: Based on route, client IP, and throttle instance
- Purpose: Rate limiting for WebSocket connections
- Usage: Call directly in WebSocket handlers
- Key Generation: Based on WebSocket path, client, and optional context
- Purpose: Base class for custom throttle implementations
- Customizable: Override the `get_key()` method for custom key generation
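A sketch of what a custom throttle could look like. The base class name (`BaseThrottle` from `traffik.throttles`) and the exact `get_key()` signature are assumptions here; check the package source for the real interface:

```python
from starlette.requests import HTTPConnection
from traffik.throttles import BaseThrottle  # assumed import path and name

class PerAPIKeyThrottle(BaseThrottle):
    """Throttle keyed by an API key header instead of client IP + path."""

    async def get_key(self, connection: HTTPConnection, *args, **kwargs) -> str:
        # Assumed signature: receives the connection being throttled.
        api_key = connection.headers.get("x-api-key", "anonymous")
        return f"api_key:{api_key}"
```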
- Storage: Python dictionary
- Suitable for: Development, testing, single-process apps
- Persistence: Optional (not recommended for production)
- Storage: Redis database
- Suitable for: Production, multi-process, distributed systems
- Persistence: Built-in Redis persistence
We welcome contributions! Please see our Contributing Guide for details.
git clone https://github.com/ti-oluwa/traffik.git
cd traffik
uv sync --extra dev
# Run tests
uv run pytest
# Run linting
uv run ruff check src/ tests/
# Run formatting
uv run ruff format src/ tests/
This project is licensed under the MIT License - see the LICENSE file for details.
See CHANGELOG.md for version history and changes.