A simple, precise Python rate limiter with built-in context manager support for hassle-free API throttling.
- Simple and Intuitive: Easy-to-use context manager interface
- Precise Rate Limiting: Token bucket algorithm ensures accurate timing
- Burst Control: Configure initial tokens to control startup burst behaviour
- Call Tracking: Optional tracking of calls, delays, and efficiency metrics
- Thread-Safe: Safe for use in multi-threaded applications
- Zero Dependencies: No runtime dependencies, lightweight and fast
- Flexible Configuration: Support for any positive rate (calls per second)
- Comprehensive Testing: Extensive test suite with 100% coverage
```bash
pip install easylimit
```

```python
# Simplest: 2 calls per second (defaults to a 1-second period)
from easylimit import RateLimiter

limiter = RateLimiter(limit=2)

with limiter:
    make_api_call()
```

```python
from datetime import timedelta

from easylimit import RateLimiter

# Minimal period-based usage (recommended)
limiter = RateLimiter(limit=120, period=timedelta(minutes=1))

with limiter:
    make_api_call()

# Control initial burst behaviour
controlled_burst = RateLimiter(limit=10, initial_tokens=3)  # Start with 3 tokens instead of 10

# Minimal legacy usage (deprecated)
legacy = RateLimiter(max_calls_per_second=2)

# Unlimited variant
unlimited = RateLimiter.unlimited()
```

See runnable examples under examples/, including:

- period_based_basic.py: context manager usage
- period_based_manual_acquire.py: manual `try_acquire` / `acquire`
- unlimited_basic.py: unlimited limiter, with and without tracking
- initial_tokens_basic.py: controlling initial burst behaviour
- initial_tokens_advanced.py: advanced initial tokens scenarios
- legacy_basic.py: legacy API
See examples/legacy_basic.py.
See examples/period_based_basic.py and examples/period_based_manual_acquire.py for similar usage.
See examples/period_based_manual_acquire.py.
For scenarios where rate limiting is optional or needs to be disabled:
See examples/unlimited_basic.py.
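A hedged sketch of conditional rate limiting built on `RateLimiter.unlimited()` (the `enable_limiting` flag and `fetch_user` call are illustrative placeholders):

```python
from easylimit import RateLimiter

def build_limiter(enable_limiting: bool) -> RateLimiter:
    # Both branches return the same API surface, so calling code never needs to check the flag again
    if enable_limiting:
        return RateLimiter(limit=5)      # throttled: 5 calls per period
    return RateLimiter.unlimited()       # no-op limiter, e.g. for tests or internal endpoints

limiter = build_limiter(enable_limiting=False)

with limiter:
    fetch_user()  # illustrative call; never delayed when the limiter is unlimited
```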
The rate limiter supports async context managers and async acquisition methods for use in asyncio applications.
See examples/async_demo.py for a complete demonstration of async usage patterns.
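A minimal async sketch, assuming the async context manager and `async_acquire` method documented in the API reference below (`make_async_api_call` is a placeholder coroutine):

```python
import asyncio

from easylimit import RateLimiter

limiter = RateLimiter(limit=2)

async def main() -> None:
    # Async context manager: waits for a token without blocking the event loop
    async with limiter:
        await make_async_api_call()

    # Manual async acquisition with a timeout
    if await limiter.async_acquire(timeout=1.0):
        await make_async_api_call()

asyncio.run(main())
```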
See the examples/ directory for tracked usage patterns and efficiency calculations.
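As a small illustration of the tracking API described below (`make_api_call` is a placeholder):

```python
from easylimit import RateLimiter

limiter = RateLimiter(limit=5, track_calls=True)

for _ in range(10):
    with limiter:
        make_api_call()

print(f"Total calls: {limiter.call_count}")
print(f"Efficiency over the last 60 s: {limiter.get_efficiency(window_seconds=60):.1f}%")

stats = limiter.stats
print(f"Average delay per call: {stats.average_delay_seconds:.3f} s")
```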
- `limit`: Maximum number of calls per period (float, required unless using the legacy API)
- `period`: Time period for the rate limit using `timedelta` (defaults to 1 second)
- `initial_tokens`: Initial number of tokens in the bucket (defaults to `limit` for a full bucket)
- `track_calls`: Enable call tracking (default: False)
- `history_window_seconds`: How long to keep call history for windowed queries (default: 3600)
The `initial_tokens` parameter allows you to control the initial burst behaviour:

- `None` (default): Start with a full bucket (backward compatible)
- `0`: Start with an empty bucket (no initial burst)
- Any value between 0 and `limit`: Start with a specific number of tokens
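A brief sketch of the three options:

```python
from easylimit import RateLimiter

full_bucket = RateLimiter(limit=10)                      # default: full bucket, burst of up to 10
no_burst = RateLimiter(limit=10, initial_tokens=0)       # empty bucket: the first call waits for a refill
partial_burst = RateLimiter(limit=10, initial_tokens=3)  # allow a burst of 3, then the steady rate
```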
See src/easylimit/rate_limiter.py for the full API reference.
- `unlimited(track_calls: bool = False) -> RateLimiter`: Create an unlimited rate limiter that performs no rate limiting.
  - `track_calls`: Enable call tracking (default: False for optimal performance)
  - Returns: RateLimiter instance configured for unlimited calls

  This method returns a RateLimiter instance that maintains the same API surface whilst allowing unlimited calls per second. Ideal for conditional rate limiting scenarios.
- `acquire(timeout: Optional[float] = None) -> bool`: Acquire a token, blocking if necessary.
  - `timeout`: Maximum time to wait for a token (`None` for no timeout)
  - Returns: `True` if token was acquired, `False` if timeout occurred
- `try_acquire() -> bool`: Try to acquire a token without blocking.
  - Returns: `True` if token was acquired, `False` otherwise
- `async_acquire(timeout: Optional[float] = None) -> bool`: Async version of acquire that doesn't block the event loop.
  - `timeout`: Maximum time to wait for a token (`None` for no timeout)
  - Returns: `True` if token was acquired, `False` if timeout occurred
- `async_try_acquire() -> bool`: Async version of try_acquire.
  - Returns: `True` if token was acquired, `False` otherwise
- `available_tokens() -> float`: Get the current number of available tokens.
  - Returns: Number of tokens currently available
- `call_count -> int`: Total number of calls made through this rate limiter.
  - Returns: Total call count
  - Raises: `ValueError` if call tracking is not enabled
- `stats -> CallStats`: Detailed statistics about calls and timing.
  - Returns: `CallStats` object with comprehensive metrics
  - Raises: `ValueError` if call tracking is not enabled
- `reset_call_count() -> None`: Reset the call counter and all tracking data to zero.
  - Raises: `ValueError` if call tracking is not enabled
- `calls_in_window(window_seconds: int) -> int`: Return the number of calls made in the last N seconds.
  - `window_seconds`: Time window in seconds (must be positive)
  - Returns: Number of calls in the specified window
  - Raises: `ValueError` if call tracking is not enabled or `window_seconds` is not positive
- `get_efficiency(window_seconds: int = 60) -> float`: Calculate rate limit efficiency as a percentage (0.0 to 100.0).
  - `window_seconds`: Time window for efficiency calculation (default: 60)
  - Returns: Efficiency percentage (0.0 = no calls, 100.0 = maximum rate utilisation)
  - Raises: `ValueError` if call tracking is not enabled or `window_seconds` is not positive
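Putting a few of these methods together, a hedged sketch of manual token acquisition (complementing the context manager usage shown next; `make_api_call` is a placeholder):

```python
from easylimit import RateLimiter

limiter = RateLimiter(limit=3, track_calls=True)

# Non-blocking attempt: skip the call if no token is available right now
if limiter.try_acquire():
    make_api_call()

# Blocking attempt, giving up after two seconds
if limiter.acquire(timeout=2.0):
    make_api_call()
else:
    print("Timed out waiting for a token")

print(f"Tokens remaining: {limiter.available_tokens():.2f}")
print(f"Calls in the last 30 s: {limiter.calls_in_window(30)}")
```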
The RateLimiter can be used as both a synchronous and an asynchronous context manager.

Synchronous:

```python
with limiter:
    # This block will only execute after acquiring a token
    make_api_call()
```

Asynchronous:

```python
async with limiter:
    # This block will only execute after acquiring a token
    await make_async_api_call()
```

When call tracking is enabled, the `stats` property returns a `CallStats` object:
```python
from easylimit import CallStats

@dataclass
class CallStats:
    """Statistics about rate limiter usage."""
    total_calls: int                     # Total number of calls made
    total_delay_seconds: float           # Cumulative delay time
    average_delay_seconds: float         # Average delay per call
    max_delay_seconds: float             # Maximum delay encountered
    calls_per_second_average: float      # Average call rate
    efficiency_percentage: float         # Rate limit efficiency (0-100%)
    tracking_start_time: datetime        # When tracking began
    last_call_time: Optional[datetime]   # Timestamp of most recent call
```

easylimit uses a token bucket algorithm to provide precise rate limiting:
- Token Bucket: A bucket holds tokens, with a maximum capacity equal to the configured `limit`
- Token Refill: Tokens are added to the bucket at a constant rate over time
- Token Consumption: Each operation consumes one token from the bucket
- Rate Limiting: When the bucket is empty, operations must wait for new tokens
This approach allows for:
- Burst Handling: Initial burst of calls up to the bucket capacity
- Sustained Rate: Steady rate limiting after the initial burst
- Precise Timing: Accurate rate control based on elapsed time
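As a rough, illustrative sketch of the algorithm (not easylimit's internal implementation, which also handles locking and tracking), a token bucket can be expressed as:

```python
import time

class TokenBucket:
    """Illustrative token bucket for explanation purposes only."""

    def __init__(self, capacity: float, refill_rate: float) -> None:
        self.capacity = capacity            # maximum tokens the bucket can hold
        self.refill_rate = refill_rate      # tokens added per second
        self.tokens = capacity              # start with a full bucket
        self.last_refill = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.last_refill
        # Add tokens in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def try_consume(self) -> bool:
        # Consume one token if available; otherwise the caller must wait and retry
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```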
When track_calls=True, the rate limiter maintains detailed statistics:
- Call Counting: Each successful token acquisition increments the call counter
- Delay Tracking: Records the time spent waiting for tokens
- Timestamp History: Maintains a sliding window of call timestamps for windowed queries
- Memory Management: Automatically cleans up old timestamps beyond the history window
- Thread Safety: All tracking operations are protected by the same lock as rate limiting
Efficiency Calculation: Efficiency is calculated as (actual_calls / max_possible_calls) * 100% for a given time window, helping you understand how well you're utilising your rate limit.
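For example, with a limit of 5 calls per second and a 60-second window, the maximum possible is 300 calls; making 150 actual calls gives an efficiency of (150 / 300) * 100% = 50%.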
easylimit is fully thread-safe and can be used safely in multi-threaded applications. All operations, including call tracking, are protected by internal locking mechanisms using threading.RLock().
The rate limiter supports mixed sync and async usage safely. Internal state is protected by a single lock (threading.RLock), with async paths delegating state mutations to a background thread via asyncio.to_thread to avoid blocking the event loop. This ensures that mixed sync/async usage is safe and consistent.
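A hedged sketch of mixed usage under that model (`make_api_call` and `make_async_api_call` are placeholders):

```python
import asyncio
import threading

from easylimit import RateLimiter

limiter = RateLimiter(limit=10)

def sync_worker() -> None:
    with limiter:                 # sync path: blocks the calling thread until a token is available
        make_api_call()

async def async_worker() -> None:
    async with limiter:           # async path: awaits a token without blocking the event loop
        await make_async_api_call()

# Both paths draw tokens from the same bucket and are guarded by the same lock
threading.Thread(target=sync_worker).start()
asyncio.run(async_worker())
```

The multi-threaded example below shows the sync path with tracking enabled: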
```python
import threading
from easylimit import RateLimiter

# Thread-safe rate limiting with tracking
limiter = RateLimiter(limit=5, track_calls=True)

def worker(worker_id):
    for i in range(10):
        with limiter:
            # Thread-safe API call with automatic tracking
            make_api_call(f"worker_{worker_id}_call_{i}")

# Create multiple threads
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# All tracking data is consistent across threads
print(f"Total calls from all threads: {limiter.call_count}")
print(f"Thread-safe efficiency: {limiter.get_efficiency():.1f}%")
```

```bash
# Clone the repository
git clone https://github.com/man8/easylimit.git
cd easylimit
git clone https://github.com/man8/easylimit.git
cd easylimit
# Install uv (if not already installed)
curl -LsSf https://astral.sh/uv/install.sh | sh
# Create an in-project virtualenv (.venv)
uv venv --python 3.13
# (Optional) activate it; uv run does not require activation
source .venv/bin/activate
# Install dependencies
uv sync --all-extras --dev
```

```bash
# Run all tests (default)
uv run pytest
# Run only non-legacy tests
uv run pytest -m 'not legacy'
# Run only legacy tests
uv run pytest -m legacy
# Run with coverage
uv run pytest --cov=easylimit --cov-report=html
```

```bash
# Lint code
uv run ruff check .
# Format code
uv run ruff format .
# Type checking
uv run mypy src/
```

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Make your changes
- Add tests for your changes
- Ensure all tests pass (`uv run pytest`)
- Commit your changes (`git commit -m 'Add some amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
See CHANGELOG.md for version history and release notes.