A comprehensive Python library providing battle-tested decorators and patterns for congestion control in high-performance web applications. Designed to prevent system overload, ensure reliability, and maintain optimal performance under various load conditions.
- SingleFlight Pattern: Eliminates cache stampedes by deduplicating concurrent identical requests
- Rate Limiting: Sliding window rate limiting to prevent abuse and control resource usage
- Circuit Breaker: Prevents cascading failures by temporarily stopping requests to failing services
- Dual Support: Works seamlessly with both FastAPI (async) and Flask (sync) applications
- Composable: Stack multiple decorators for comprehensive congestion control
- Thread-Safe: Safe for concurrent access in production environments
- Configurable: Customizable TTL, timeouts, rate limits, and key functions
- Well-Tested: Comprehensive test suite with 100% coverage
SingleFlight can reduce redundant computations by up to 70% during cache stampedes, Rate Limiting prevents system overload from abusive requests, and Circuit Breaker stops cascading failures.
| Scenario | Without Controls | With CongestionControls | Improvement |
|---|---|---|---|
| Cache Stampede (10 concurrent requests) | 10 DB calls | 1 DB call | 90% reduction |
| Rate Limited API | Unlimited abuse | Controlled access | Predictable load |
| Failing Service Calls | Cascading failures | Isolated failures | System stability |
# Clone the repository
git clone https://github.com/DarshanAguru/congestioncontrols.git
cd congestioncontrols
# Install dependencies
pip install -r requirements.txt

# Copy the congestionControls folder to your project
# Add it to your Python path

congestionControls/
├── __init__.py          # Package exports
├── singleflight.py      # SingleFlight implementation
├── rate_limiter.py      # RateLimiter implementation
├── circuit_breaker.py   # Circuit Breaker implementation
├── bulkhead.py          # Future: Bulkhead pattern
├── retry.py             # Future: Retry mechanisms
└── timeout.py           # Future: Request timeouts
tests/
├── test_singleflight.py     # SingleFlight tests
├── test_rate_limiter.py     # RateLimiter tests
├── test_circuit_breaker.py  # Circuit Breaker tests
└── ...
.github/
├── ISSUE_TEMPLATE/            # Issue templates
├── PULL_REQUEST_TEMPLATE.md   # PR template
└── ...
from congestionControls import singleflight

# FastAPI example
@singleflight(key_func=lambda user_id: f"user:{user_id}", ttl=0.2)
async def get_user_profile(user_id: int):
    # Expensive database call - only executes once per key
    return await db.get_user(user_id)

# Multiple concurrent calls to get_user_profile(123) will share the result

from congestionControls import rate_limiter
# Flask example
@rate_limiter(key_func=lambda user_id, action: f"user:{user_id}",
              max_calls=10, window_seconds=60)
def api_action(user_id: int, action: str):
    # Rate limited to 10 calls per user per minute
    return perform_action(user_id, action)

from congestionControls import circuit_breaker
# API call with circuit breaker
@circuit_breaker(failure_threshold=5, recovery_time=60.0)
def call_external_service():
    # If this fails 5 times, circuit opens for 60 seconds
    return external_api_call()

from congestionControls import singleflight, rate_limiter, circuit_breaker
@singleflight(key_func=lambda user_id: f"user:{user_id}")
@rate_limiter(key_func=lambda user_id: f"user:{user_id}", max_calls=5, window_seconds=60)
@circuit_breaker(failure_threshold=3, recovery_time=30.0)
async def get_user_data(user_id: int):
    # Deduplicated, rate limited, and circuit protected
    return await fetch_user_data(user_id)

singleflight(
    key_func: Optional[Callable[..., str]] = None,
    ttl: float = 0.2,
    timeout: float = 5.0
) -> SingleFlightParameters:
- key_func: Function to generate cache keys from function arguments
- ttl: Time-to-live for cached results in seconds
- timeout: Maximum wait time for ongoing flights in seconds
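To make the roles of `ttl` and `timeout` concrete, here is a minimal asyncio sketch of the single-flight idea. It is not the library's implementation: the class `SingleFlightSketch`, its `do` method, and the `demo` helper are hypothetical names used only for illustration, and the sketch assumes that `ttl` bounds how long a finished result is reused and that `timeout` bounds how long a caller waits on an in-progress flight.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Tuple


class SingleFlightSketch:
    """Illustrative only: coalesces concurrent loads that share a key."""

    def __init__(self, ttl: float = 0.2, timeout: float = 5.0) -> None:
        self.ttl = ttl
        self.timeout = timeout
        self._in_flight: Dict[str, "asyncio.Task[Any]"] = {}
        self._cache: Dict[str, Tuple[float, Any]] = {}  # key -> (expiry, value)

    async def do(self, key: str, loader: Callable[[], Awaitable[Any]]) -> Any:
        cached = self._cache.get(key)
        if cached is not None and cached[0] > time.monotonic():
            return cached[1]  # result is still fresh, no new flight needed

        task = self._in_flight.get(key)
        if task is None:
            # The first caller for this key starts the flight; later callers join it.
            task = asyncio.create_task(self._run(key, loader))
            self._in_flight[key] = task

        # shield() keeps one timed-out waiter from cancelling the shared flight.
        return await asyncio.wait_for(asyncio.shield(task), timeout=self.timeout)

    async def _run(self, key: str, loader: Callable[[], Awaitable[Any]]) -> Any:
        try:
            value = await loader()
            self._cache[key] = (time.monotonic() + self.ttl, value)
            return value
        finally:
            self._in_flight.pop(key, None)


async def demo() -> int:
    """Fire 10 concurrent requests for one key and count the real loads."""
    calls = 0

    async def load_user() -> dict:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.05)  # stand-in for a slow database query
        return {"id": 123}

    flight = SingleFlightSketch(ttl=0.2, timeout=1.0)
    results = await asyncio.gather(
        *(flight.do("user:123", load_user) for _ in range(10))
    )
    assert all(r == {"id": 123} for r in results)
    return calls
```

Running `asyncio.run(demo())` returns `1`: all ten callers share the single load. A waiter that exceeds `timeout` gets `asyncio.TimeoutError`, while the shared flight keeps running for the remaining callers.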
Example:
@singleflight(key_func=lambda *args, **kwargs: str(args[0]), ttl=1.0, timeout=10.0)
async def expensive_operation(user_id):
    return await compute_result(user_id)

rate_limiter(
    key_func: Optional[Callable[..., str]] = None,
    max_calls: int = 10,
    window_seconds: float = 60.0
) -> RateLimiterParameters:
- key_func: Function to generate rate limit keys from function arguments
- max_calls: Maximum calls allowed per time window
- window_seconds: Sliding window duration in seconds
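The interaction between `max_calls` and `window_seconds` can be sketched with a per-key queue of timestamps. This is one common way to build a sliding window and is not necessarily how `rate_limiter` is implemented; `SlidingWindowSketch`, `allow`, and the injectable `clock` argument are hypothetical names added so the example can run without real waiting.

```python
import threading
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowSketch:
    """Illustrative only: allows at most max_calls per key in any window."""

    def __init__(self, max_calls: int = 10, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self._clock = clock  # injectable so the demo can control time
        self._lock = threading.Lock()
        self._calls: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self._clock()
        with self._lock:
            stamps = self._calls[key]
            # Drop timestamps that have slid out of the window.
            while stamps and now - stamps[0] >= self.window_seconds:
                stamps.popleft()
            if len(stamps) >= self.max_calls:
                return False  # window is full for this key
            stamps.append(now)
            return True


# Demo with a fake clock: 2 calls per 60 seconds, tracked per key.
fake_now = [0.0]
limiter = SlidingWindowSketch(max_calls=2, window_seconds=60.0,
                              clock=lambda: fake_now[0])

first_three = [limiter.allow("user:1") for _ in range(3)]  # third call is rejected
other_user = limiter.allow("user:2")                       # separate key, separate budget
fake_now[0] = 60.0                                         # the old calls leave the window
after_window = limiter.allow("user:1")
```

Because each key has its own queue, one user exhausting their budget does not affect another, which is the behaviour the `key_func` parameter is meant to control.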
Example:
@rate_limiter(key_func=lambda user_id: f"user:{user_id}", max_calls=100, window_seconds=3600)
def api_endpoint(user_id):
    return process_request(user_id)

circuit_breaker(
    failure_threshold: int = 5,
    recovery_time: float = 60.0,
    success_threshold: int = 1,
    expected_exception: type = Exception,
    logger: Optional[logging.Logger] = None
) -> CircuitBreakerParameters:
- failure_threshold: Number of consecutive failures to open the circuit
- recovery_time: Time in seconds to wait before attempting recovery
- success_threshold: Number of consecutive successes in HALF_OPEN to close the circuit
- expected_exception: Exception type to consider as failures
- logger: Logger instance for logging state changes

States:
- CLOSED: Normal operation, requests pass through
- OPEN: Failure threshold exceeded, requests are blocked
- HALF_OPEN: Recovery period, allows limited requests to test service health
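The three states and their transitions can be sketched as a small state machine. This is an illustration of the pattern under the parameter meanings listed above, not the library's code: `CircuitBreakerSketch`, `CircuitOpenError`, the `call` method, and the injectable `clock` are hypothetical, and the sketch leaves out locking and logging for brevity.

```python
import time
from enum import Enum
from typing import Any, Callable, Type


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(RuntimeError):
    """Hypothetical error raised while the circuit rejects calls."""


class CircuitBreakerSketch:
    """Illustrative only, and not thread-safe."""

    def __init__(self, failure_threshold: int = 5, recovery_time: float = 60.0,
                 success_threshold: int = 1,
                 expected_exception: Type[BaseException] = Exception,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.success_threshold = success_threshold
        self.expected_exception = expected_exception
        self._clock = clock
        self.state = State.CLOSED
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state is State.OPEN:
            if self._clock() - self._opened_at < self.recovery_time:
                raise CircuitOpenError("circuit is open")
            # Recovery time has elapsed: let a trial request through.
            self.state = State.HALF_OPEN
            self._successes = 0
        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_failure(self) -> None:
        self._failures += 1
        # Any failure during HALF_OPEN reopens the circuit immediately.
        if self.state is State.HALF_OPEN or self._failures >= self.failure_threshold:
            self.state = State.OPEN
            self._opened_at = self._clock()

    def _on_success(self) -> None:
        if self.state is State.HALF_OPEN:
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = State.CLOSED
                self._failures = 0
        else:
            self._failures = 0  # failures only count when consecutive


def flaky() -> None:
    raise ConnectionError("service down")


fake_now = [0.0]
breaker = CircuitBreakerSketch(failure_threshold=2, recovery_time=30.0,
                               clock=lambda: fake_now[0])
for _ in range(2):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass
state_after_failures = breaker.state  # two consecutive failures open the circuit

try:
    breaker.call(lambda: "ok")
    rejected = False
except CircuitOpenError:
    rejected = True  # calls are blocked while the circuit is OPEN

fake_now[0] = 30.0  # recovery_time has passed
recovered = breaker.call(lambda: "ok")  # trial call in HALF_OPEN succeeds
```

With `success_threshold=1`, the single successful trial call is enough to move the breaker from HALF_OPEN back to CLOSED.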
Example:
@circuit_breaker(failure_threshold=3, recovery_time=30.0, expected_exception=ConnectionError)
def call_external_api():
    return external_service_call()

Run the comprehensive test suite:
# Install test dependencies
pip install pytest pytest-asyncio
# Run all tests
pytest
# Run specific test files
pytest tests/test_singleflight.py
pytest tests/test_rate_limiter.py
pytest tests/test_circuit_breaker.py
# Run with verbose output
pytest -v

Run the live demonstration:

python example.py

# key_func takes the same arguments as the decorated function,
# as in the api_action example
@rate_limiter(key_func=lambda ip, endpoint: f"ip:{ip}", max_calls=100, window_seconds=60)
@singleflight(key_func=lambda ip, endpoint: f"{endpoint}:{ip}")
async def api_handler(ip: str, endpoint: str):
    return await process_api_request(ip, endpoint)

@singleflight(key_func=lambda user_id: f"user_data:{user_id}", ttl=5.0)
async def get_user_with_posts(user_id: int):
    # Complex join query - cached briefly to prevent stampedes
    return await db.get_user_with_posts(user_id)

import requests

# A dict is unhashable, so the key is built from its sorted items instead of hash(params)
@circuit_breaker(failure_threshold=3, recovery_time=60.0, expected_exception=requests.RequestException)
@rate_limiter(key_func=lambda api_key, params: f"external:{api_key}", max_calls=50, window_seconds=60)
@singleflight(key_func=lambda api_key, params: f"ext_call:{api_key}:{sorted(params.items())}")
async def call_external_api(api_key: str, params: dict):
    return await external_api_call(api_key, params)

import grpc

@circuit_breaker(failure_threshold=5, recovery_time=30.0, expected_exception=grpc.RpcError)
async def call_user_service(user_id: int):
    # Circuit breaker protects against failing user service
    return await user_service_client.get_user(user_id)

import os
# Default configurations
DEFAULT_SINGLEFLIGHT_TTL = float(os.getenv('SINGLEFLIGHT_TTL', '0.2'))
DEFAULT_RATE_LIMIT_MAX = int(os.getenv('RATE_LIMIT_MAX', '100'))
DEFAULT_RATE_WINDOW = float(os.getenv('RATE_WINDOW', '60.0'))

# Complex key generation
def complex_key_func(*args, **kwargs):
    # Include relevant parameters in key
    key_parts = [str(arg) for arg in args]
    key_parts.extend(f"{k}:{v}" for k, v in sorted(kwargs.items()))
    return ":".join(key_parts)

@singleflight(key_func=complex_key_func)
def my_function(a, b, c=None):
    pass

- Fork the repository
- Create a feature branch: `git checkout -b feature/amazing-feature`
- Write tests for your changes
- Ensure all tests pass: `pytest`
- Commit your changes: `git commit -m 'Add amazing feature'`
- Push to the branch: `git push origin feature/amazing-feature`
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- Built for the Python community to solve real-world congestion control problems
Darshan Aguru
- Email: agurudf@gmail.com
- Website: thisdarshiii.in
- GitHub: @DarshanAguru
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Documentation: This README and inline code documentation

Made with ❤️ for reliable, high-performance Python applications