Advanced rate limiting and queue management for Celery workers using Redis-based token bucket algorithms.
Celery Throttle provides a robust solution for processing tasks with strict rate controls, ensuring efficient resource usage and compliance with API rate limits or processing constraints.
- **Works with Your Existing Tasks** - Apply rate limiting to any `@app.task` function without modification
- **Named Queues** - Use meaningful names like `"email_notifications"` instead of auto-generated UUIDs
- **Precise Rate Limiting** - Redis Lua scripts ensure atomic, thread-safe rate control
- **Dynamic Queue Management** - Create, modify, and remove queues on the fly
- **Burst Control** - Optional burst allowance for handling traffic spikes
- **Multi-Service Support** - Redis key prefixing enables multiple isolated services
- **Modern Configuration** - Streamlined setup with pydantic-settings
```bash
pip install celery-throttle
```

Requirements:
- Python 3.12+
- Redis server
- Celery 5.5+
```python
from celery import Celery
from celery_throttle import CeleryThrottle

# Your existing Celery app and tasks
app = Celery('myapp')

@app.task
def send_email(to_email, subject, body):
    # Your email sending logic
    return {"status": "sent", "to": to_email}

@app.task
def call_external_api(endpoint, data):
    # Your API calling logic
    return {"status": "success", "endpoint": endpoint}

# Add rate limiting
throttle = CeleryThrottle(celery_app=app)

# Create named queues with different rate limits
throttle.create_queue("10/1m", "email_queue")  # 10 emails per minute
throttle.create_queue("100/1h", "api_queue")   # 100 API calls per hour

# Submit tasks with rate limiting
throttle.submit_task("email_queue", "myapp.send_email", "user@example.com", "Hello!", "Welcome!")
throttle.submit_task("api_queue", "myapp.call_external_api", "/users/123", {"action": "update"})
```

You need two components:
```bash
# Terminal 1: Celery worker (processes the actual tasks)
celery -A myapp worker --loglevel=info --prefetch-multiplier=1

# Terminal 2: Rate limiter dispatcher (manages the queues)
celery-throttle dispatcher --celery-app=myapp:app
```

Important Worker Settings:

- `--prefetch-multiplier=1` - Required to prevent task prefetching, which defeats rate limiting
That's it! Your tasks are now rate-limited.
Flexible rate limit syntax with various time units and optional burst allowances.
- Seconds: `"10/60s"` (10 requests per 60 seconds)
- Minutes: `"10/5m"` (10 requests per 5 minutes)
- Hours: `"4000/3h"` (4000 requests per 3 hours)

Format:

```
requests/period[time_unit][:burst_allowance]
```

- `requests`: Number of requests allowed
- `period`: Time period value
- `time_unit`: `s` (seconds), `m` (minutes), or `h` (hours)
- `burst_allowance`: Optional burst token capacity (defaults to 1)
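To make the format above concrete, here is a hedged sketch of how such a spec could be parsed; `parse_rate_limit`, `RateLimit`, and `dispatch_interval` are hypothetical names for illustration, not part of the library's API:

```python
import re
from dataclasses import dataclass

# Hypothetical helper; celery-throttle's real parser may differ.
@dataclass
class RateLimit:
    requests: int          # requests allowed per period
    period_seconds: float  # period converted to seconds
    burst: int             # burst token capacity (defaults to 1)

    @property
    def dispatch_interval(self) -> float:
        # With smooth distribution, one task is released every period/requests seconds
        return self.period_seconds / self.requests

_UNITS = {"s": 1, "m": 60, "h": 3600}
_PATTERN = re.compile(r"^(\d+)/(\d+)([smh])(?::(\d+))?$")

def parse_rate_limit(spec: str) -> RateLimit:
    match = _PATTERN.match(spec)
    if not match:
        raise ValueError(f"Invalid rate limit: {spec!r}")
    requests, period, unit, burst = match.groups()
    return RateLimit(
        requests=int(requests),
        period_seconds=int(period) * _UNITS[unit],
        burst=int(burst) if burst else 1,
    )
```

Under this reading, `"10/60s:5"` yields 10 requests per 60 seconds with a burst capacity of 5, and a smooth-distribution interval of one task every 6 seconds.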
```python
# Basic rate limits
throttle.create_queue("10/60s", "queue1")    # 10 requests per 60 seconds
throttle.create_queue("5/2m", "queue2")      # 5 requests per 2 minutes
throttle.create_queue("1000/1h", "queue3")   # 1000 requests per 1 hour

# Rate limits with burst allowance
throttle.create_queue("10/60s:5", "queue4")   # 10/min with 5 burst tokens
throttle.create_queue("100/1h:20", "queue5")  # 100/hour with 20 burst tokens
```

Tasks are distributed evenly over time to prevent resource bursting:

```python
throttle.create_queue("10/60s")  # 1 task every 6 seconds
throttle.create_queue("100/1h")  # 1 task every 36 seconds
```

Allow traffic spikes while maintaining overall rate limits:

```python
throttle.create_queue("10/60s:5")  # Allow up to 5 immediate tasks, then smooth distribution
```

```python
from celery_throttle import CeleryThrottle

# Load from environment variables (CELERY_THROTTLE_* prefix)
throttle = CeleryThrottle(celery_app=app)

# Or use kwargs
throttle = CeleryThrottle(
    celery_app=app,
    target_queue="my-rate-limited-queue",
    queue_prefix="myapp"
)
```

For more control, pass an explicit config object:

```python
from celery_throttle import CeleryThrottle
from celery_throttle.config import CeleryThrottleConfig, RedisConfig

config = CeleryThrottleConfig(
    app_name="my-rate-limited-app",
    target_queue="rate-limited-queue",
    queue_prefix="my_app",
    redis=RedisConfig(
        host="localhost",
        port=6379,
        db=1
    )
)

throttle = CeleryThrottle(celery_app=app, config=config)
```

Or configure via environment variables:

```bash
export CELERY_THROTTLE_REDIS_HOST=localhost
export CELERY_THROTTLE_REDIS_PORT=6379
export CELERY_THROTTLE_REDIS_DB=1
export CELERY_THROTTLE_APP_NAME=my-app
export CELERY_THROTTLE_TARGET_QUEUE=rate-limited-tasks
export CELERY_THROTTLE_QUEUE_PREFIX=my_app
```

Configuration Precedence:

1. Kwargs (highest priority)
2. Config object
3. Environment variables
4. Default values
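The precedence order can be sketched as a simple lookup chain; `resolve_setting` and the default values shown are hypothetical illustrations, not the library's actual resolution code:

```python
import os

# Hypothetical sketch of the documented precedence:
# kwargs > config object > environment variables > defaults.
DEFAULTS = {"app_name": "celery-throttle", "target_queue": "rate-limited", "queue_prefix": ""}
ENV_PREFIX = "CELERY_THROTTLE_"

def resolve_setting(name: str, kwargs: dict, config: dict) -> str:
    if name in kwargs:       # 1. explicit kwargs win
        return kwargs[name]
    if name in config:       # 2. then the config object
        return config[name]
    env_value = os.environ.get(ENV_PREFIX + name.upper())
    if env_value is not None:  # 3. then environment variables
        return env_value
    return DEFAULTS[name]    # 4. finally the built-in defaults
```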
See CONFIGURATION.md for detailed configuration options.
```bash
# Start the task dispatcher
celery-throttle dispatcher --celery-app=myapp:app

# With custom Redis settings
celery-throttle --redis-host=localhost --redis-port=6379 dispatcher --celery-app=myapp:app

# With custom interval (default: 0.1s)
celery-throttle dispatcher --celery-app=myapp:app --interval=0.5
```

```bash
# Create queues
celery-throttle queue create "10/1m"    # Auto-generated name
celery-throttle queue create "5/30s:3"  # With burst allowance

# List all queues
celery-throttle queue list

# Show queue details
celery-throttle queue show <queue-name>

# Update rate limit
celery-throttle queue update <queue-name> "20/1m"

# Activate/deactivate queues
celery-throttle queue activate <queue-name>
celery-throttle queue deactivate <queue-name>

# Remove queues
celery-throttle queue remove <queue-name>
celery-throttle queue cleanup-empty  # Remove empty queues
celery-throttle queue cleanup-all    # Remove all queues
```

```python
# Get queue statistics
stats = throttle.get_queue_stats("email_queue")
print(f"Waiting: {stats.tasks_waiting}")
print(f"Processing: {stats.tasks_processing}")
print(f"Completed: {stats.tasks_completed}")
print(f"Failed: {stats.tasks_failed}")

# Get rate limit status
status = throttle.get_rate_limit_status("email_queue")
print(f"Available tokens: {status['available_tokens']}")
print(f"Next token in: {status['next_token_in']} seconds")

# List all queues
for queue in throttle.list_queues():
    status = "active" if queue.active else "inactive"
    print(f"{queue.name}: {queue.rate_limit} ({status})")
```

- Queue Creation - Queues are created with rate limits and stored in Redis
- Task Submission - Tasks are processed immediately or queued based on token availability
- Token Management - Redis Lua scripts atomically manage token buckets for precise rate limiting
- Task Dispatch - Background dispatcher efficiently schedules queued tasks when tokens become available
- Worker Processing - Celery workers process tasks with strict rate limit compliance
- TokenBucketRateLimiter - Atomic Redis-based rate limiting with Lua scripts
- UniversalQueueManager - Dynamic queue creation and lifecycle management
- RateLimitedTaskProcessor - Celery integration with injectable app support
- RateLimitedTaskSubmitter - Task submission with rate limit checking
- RateLimitedTaskDispatcher - Efficient scheduling of queued tasks
- CLI - Complete command-line management interface
```python
# Different API endpoints with different limits
throttle.create_queue("300/15m", "twitter_api")  # Twitter limit
throttle.create_queue("5000/1h", "github_api")   # GitHub limit
throttle.create_queue("1/1s", "slack_api")       # Slack limit

# Submit tasks
throttle.submit_task("twitter_api", "myapp.post_tweet", "Hello world")
throttle.submit_task("github_api", "myapp.create_issue", "bug", "Fix this")
throttle.submit_task("slack_api", "myapp.send_message", "general", "Hi!")
```

```python
# Process large dataset over time
throttle.create_queue("100/5m", "batch_processing")

# Submit batch of tasks
tasks = [
    ("myapp.process_item", (i, f"item_{i}"), {})
    for i in range(5000)
]
results = throttle.submit_multiple_tasks("batch_processing", tasks)
print(f"Immediately processed: {results['submitted']}")
print(f"Queued for later: {results['queued']}")
```

```python
# Service A - Email notifications
email_config = CeleryThrottleConfig(
    app_name="email_service",
    target_queue="email_rate_limited",
    queue_prefix="email"
)
email_throttle = CeleryThrottle(celery_app=app, config=email_config)
email_throttle.create_queue("100/1m", "notifications")

# Service B - API calls
api_config = CeleryThrottleConfig(
    app_name="api_service",
    target_queue="api_rate_limited",
    queue_prefix="api"
)
api_throttle = CeleryThrottle(celery_app=app, config=api_config)
api_throttle.create_queue("1000/1h", "external_calls")

# Each service has isolated Redis keys and Celery queues
```

See EXAMPLES.md for more detailed examples.
You can extend `RateLimitedTaskProcessor` to add instrumentation, metrics, or custom logging:

```python
from celery_throttle import CeleryThrottle, RateLimitedTaskProcessor
import logging
import time

logger = logging.getLogger(__name__)

class MetricsTaskProcessor(RateLimitedTaskProcessor):
    def execute_task(self, queue_name: str, task_name: str, args: tuple, kwargs: dict):
        # Add metrics, timing, custom logging, etc.
        start_time = time.time()
        logger.info(f"Starting {task_name} from {queue_name}")
        try:
            result = super().execute_task(queue_name, task_name, args, kwargs)
            duration = time.time() - start_time
            logger.info(f"Completed {task_name} in {duration:.2f}s")
            # Send metrics to your monitoring system
            return result
        except Exception as e:
            logger.error(f"Failed {task_name}: {e}")
            # Send error metrics
            raise

# Use custom processor
throttle = CeleryThrottle(celery_app=app, task_processor_cls=MetricsTaskProcessor)
```

Note: This is optional and only needed if you want to instrument task execution. For most use cases, the default processor is sufficient.
For optimal rate limiting, use these Celery worker settings:
```bash
celery -A myapp worker \
  --prefetch-multiplier=1 \
  --without-mingle \
  --without-gossip \
  --loglevel=info
```

Why these settings?

- `--prefetch-multiplier=1` - Required; prevents workers from prefetching multiple tasks
- `--without-mingle` - Improves startup time
- `--without-gossip` - Reduces network chatter
For better isolation, run dedicated workers:
```bash
# Worker for rate-limited tasks only
celery -A myapp worker --queues=rate-limited-queue --prefetch-multiplier=1

# Worker for regular tasks
celery -A myapp worker --queues=celery
```

Use different queue prefixes to isolate services:

```python
# Service A
throttle_a = CeleryThrottle(celery_app=app, queue_prefix="service_a")

# Service B
throttle_b = CeleryThrottle(celery_app=app, queue_prefix="service_b")

# Redis keys: service_a:* and service_b:* are completely isolated
```

```bash
# Install development dependencies
pip install "celery-throttle[dev]"

# Run tests
pytest

# Run with coverage
pytest --cov=celery_throttle

# Run specific test file
pytest tests/test_rate_limiter.py -v
```

Contributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the MIT License - see the LICENSE file for details.
- Quick Reference - Fast lookup for common operations
- Configuration Guide - Detailed configuration options
- Examples - Comprehensive usage examples
- Changelog - Version history and changes
- API Reference - Complete API documentation (coming soon)
- GitHub Repository
- Issue Tracker
- PyPI Package (coming soon)