# TaskQueue Pro

Enterprise-grade distributed task queue system with scheduling, monitoring, and auto-scaling capabilities.
## Features

- ✅ **Distributed Task Processing**: Scale workers horizontally across multiple machines
- ✅ **Priority Queues**: High/medium/low priority task routing
- ✅ **Task Scheduling**: Cron-like scheduling with timezone support
- ✅ **Retry Mechanisms**: Exponential backoff, max retries, dead letter queues
- ✅ **Result Backend**: Store task results in Redis, PostgreSQL, or S3
- ✅ **Task Chains & Groups**: Compose complex workflows
- ✅ **Rate Limiting**: Control task execution rate per worker
- ✅ **Auto-scaling**: Scale workers based on queue depth
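As a rough illustration of the retry behaviour described above, the delay before retry attempt *n* under exponential backoff can be computed like this. This is a standalone sketch, not the library's actual implementation; `backoff_delay` is a hypothetical helper:

```python
import random

def backoff_delay(attempt, base=60, cap=3600, jitter=False):
    """Exponential backoff: base * 2^attempt, capped, with optional jitter."""
    delay = min(base * (2 ** attempt), cap)
    if jitter:
        # Full jitter spreads retries out to avoid thundering herds
        delay = random.uniform(0, delay)
    return delay

# First three retries without jitter
print([backoff_delay(a) for a in range(3)])  # → [60, 120, 240]
```

After enough failures the delay saturates at the cap, at which point a dead letter queue is the usual escape hatch.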
## Supported Backends

| Backend | Type | Use Case |
|---|---|---|
| Redis | Broker + Result | High performance, low latency |
| RabbitMQ | Broker | Reliable message delivery |
| PostgreSQL | Result | Persistent result storage |
| Amazon SQS | Broker | Cloud-native, serverless |
| Kafka | Broker | High throughput streaming |
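How a broker might be selected from a connection URL can be sketched as follows. The `BROKER_SCHEMES` mapping and the class names are illustrative assumptions, not the project's real internals:

```python
from urllib.parse import urlparse

# Hypothetical scheme-to-broker mapping for the backends in the table above
BROKER_SCHEMES = {
    "redis": "RedisBroker",
    "amqp": "RabbitMQBroker",
    "sqs": "SQSBroker",
    "kafka": "KafkaBroker",
}

def broker_for_url(url):
    """Pick a broker implementation based on the URL scheme."""
    scheme = urlparse(url).scheme
    try:
        return BROKER_SCHEMES[scheme]
    except KeyError:
        raise ValueError(f"Unsupported broker scheme: {scheme!r}")

print(broker_for_url("redis://localhost:6379/0"))  # → RedisBroker
```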
## Worker Features

- ✅ **Multi-processing**: Process multiple tasks concurrently
- ✅ **Auto-reload**: Restart workers on code changes
- ✅ **Health Checks**: Monitor worker health and restart if needed
- ✅ **Prefetch Multiplier**: Control task prefetching
- ✅ **Task Time Limits**: Hard and soft timeouts
- ✅ **Worker Pools**: ThreadPool, ProcessPool, GreenletPool
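At its core, the concurrency model above means running tasks through a bounded pool. A minimal sketch using only the standard library (this is not the project's worker code; `run_concurrently` is a made-up helper):

```python
from concurrent.futures import ThreadPoolExecutor

def run_concurrently(fn, args_list, concurrency=4):
    """Execute fn over args_list with a bounded worker pool, preserving order."""
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        return list(pool.map(fn, args_list))

print(run_concurrently(lambda n: n * n, range(5)))  # → [0, 1, 4, 9, 16]
```

A thread pool suits IO-bound tasks; for CPU-bound work a process pool (as in the ProcessPool option above) avoids the GIL.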
## Monitoring

- ✅ **Web Dashboard**: Real-time task and worker monitoring
- ✅ **Prometheus Metrics**: Export metrics for Grafana
- ✅ **Task Tracing**: Track task execution flow
- ✅ **Alerts**: Get notified on failures
- ✅ **Log Aggregation**: Centralized logging
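The dashboard's success-rate panel is, at its core, a ratio of counters. A toy stand-in for that data source (purely illustrative, not the project's metrics code):

```python
from collections import Counter

class TaskMetrics:
    """Tiny in-process counter, standing in for the dashboard's data source."""

    def __init__(self):
        self.counts = Counter()

    def record(self, outcome):
        self.counts[outcome] += 1

    def success_rate(self):
        total = self.counts["success"] + self.counts["failure"]
        return self.counts["success"] / total if total else 0.0

metrics = TaskMetrics()
for outcome in ["success"] * 9 + ["failure"]:
    metrics.record(outcome)
print(metrics.success_rate())  # → 0.9
```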
## Architecture

```
┌─────────────────────────────────────────────────────────────────┐
│                          TASKQUEUE PRO                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌────────────┐     ┌────────────┐     ┌────────────┐          │
│   │   Client   │     │   Client   │     │   Client   │          │
│   │   (API)    │     │   (CLI)    │     │   (SDK)    │          │
│   └─────┬──────┘     └─────┬──────┘     └─────┬──────┘          │
│         │                  │                  │                 │
│         └──────────────────┼──────────────────┘                 │
│                            ▼                                    │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                 BROKER (Redis/RabbitMQ)                 │   │
│   │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐  │   │
│   │  │High Priority│    │Med Priority │    │Low Priority │  │   │
│   │  │   Queue     │    │   Queue     │    │   Queue     │  │   │
│   │  └─────────────┘    └─────────────┘    └─────────────┘  │   │
│   └───────────────────────────┬─────────────────────────────┘   │
│                               │                                 │
│         ┌─────────────────────┼─────────────────────┐           │
│         ▼                     ▼                     ▼           │
│   ┌────────────┐        ┌────────────┐        ┌────────────┐    │
│   │  Worker 1  │        │  Worker 2  │        │  Worker N  │    │
│   │ (Process)  │        │ (Process)  │        │ (Process)  │    │
│   │ ┌────────┐ │        │ ┌────────┐ │        │ ┌────────┐ │    │
│   │ │Prefetch│ │        │ │Prefetch│ │        │ │Prefetch│ │    │
│   │ │  = 4   │ │        │ │  = 4   │ │        │ │  = 4   │ │    │
│   │ └────────┘ │        │ └────────┘ │        │ └────────┘ │    │
│   └─────┬──────┘        └─────┬──────┘        └─────┬──────┘    │
│         │                     │                     │           │
│         └─────────────────────┼─────────────────────┘           │
│                               ▼                                 │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                     RESULT BACKEND                      │   │
│   │           (Redis / PostgreSQL / S3 / MongoDB)           │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└───────────────────────────────┬─────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                      MONITORING DASHBOARD                       │
│   ┌────────────┐     ┌────────────┐     ┌────────────┐          │
│   │ Task Stats │     │Worker Stats│     │Queue Depth │          │
│   │ Real-time  │     │   Health   │     │   Chart    │          │
│   └────────────┘     └────────────┘     └────────────┘          │
│   ┌────────────┐     ┌────────────┐     ┌────────────┐          │
│   │Success Rate│     │  Latency   │     │Failed Tasks│          │
│   │   Graph    │     │ Histogram  │     │Retry Queue │          │
│   └────────────┘     └────────────┘     └────────────┘          │
└─────────────────────────────────────────────────────────────────┘
```
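The three priority queues in the diagram can be modelled with a single heap, where workers always pop the highest-priority task first. A self-contained sketch of that dispatch order (the queue names and tasks are made up for illustration):

```python
import heapq
import itertools

# Lower number = served first
PRIORITY = {"high": 0, "medium": 1, "low": 2}
_counter = itertools.count()  # tie-breaker keeps FIFO order within a priority

queue = []

def enqueue(task_name, priority):
    heapq.heappush(queue, (PRIORITY[priority], next(_counter), task_name))

def dequeue():
    return heapq.heappop(queue)[2]

enqueue("send_newsletter", "low")
enqueue("charge_card", "high")
enqueue("resize_image", "medium")

print([dequeue() for _ in range(3)])
# → ['charge_card', 'resize_image', 'send_newsletter']
```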
## Installation

```bash
pip install taskqueue-pro
```

## Quick Start

```python
from taskqueue import TaskQueue, task

# Create task queue
tq = TaskQueue(broker_url="redis://localhost:6379/0")

# Define a task
@task(queue="default", max_retries=3)
def send_email(to, subject, body):
    """Send email task."""
    import smtplib
    # Email sending logic goes here
    print(f"Sending email to {to}")
    return {"status": "sent", "to": to}

# Enqueue task
result = send_email.delay("user@example.com", "Hello", "World!")
print(f"Task ID: {result.id}")

# Get result
print(result.get(timeout=10))
```

## Running Workers

```bash
# Start a worker
taskqueue worker --queues default,high --concurrency 4

# Start with auto-reload (development)
taskqueue worker --queues default --autoreload

# Start the scheduler
taskqueue scheduler --config scheduler.yml
```

## Task Scheduling

```python
from taskqueue import crontab

@task(queue="reports")
def generate_daily_report():
    """Generate daily report."""
    print("Generating report...")
    return {"status": "completed"}

# Schedule task to run every hour using cron syntax
generate_daily_report.schedule(crontab(hour="*/1"))

# Or use human-readable syntax
generate_daily_report.every("1 hour")
```

## Task Chains

```python
from taskqueue import chain

# Chain tasks: each task receives the previous task's result
workflow = chain(
    download_file.s("https://example.com/data.csv"),
    process_data.s(),
    send_notification.s(),
)
result = workflow.delay()
```

## Task Groups

```python
from taskqueue import group

# Run tasks in parallel
tasks = group([
    send_email.s(f"user{i}@example.com", "Hello", "Body")
    for i in range(10)
])
result = tasks.delay()
results = result.get()  # Get all results
```

## Rate Limiting

```python
import requests

@task(queue="api", rate_limit="10/m")  # 10 calls per minute
def call_external_api(endpoint):
    """Rate-limited API call."""
    return requests.get(endpoint).json()
```

## Task Routing

```python
# Route specific tasks to specific queues
@task(queue="gpu", routing_key="ml.*")
def train_model(dataset):
    """ML training task - routed to GPU workers."""
    pass

@task(queue="io", routing_key="file.*")
def process_large_file(filepath):
    """File processing task - routed to IO workers."""
    pass
```

## Running with Docker

```bash
# Clone repository
git clone https://github.com/efealtiparmakoglu/taskqueue-pro.git
cd taskqueue-pro

# Start infrastructure
docker-compose up -d redis postgres

# Start workers
docker-compose up -d worker

# Start API server
docker-compose up -d api

# Access dashboard
open http://localhost:8080
```

## Monitoring Dashboard

```bash
# Start dashboard
taskqueue dashboard --port 8080
```

## Custom Metrics

```python
from taskqueue.metrics import task_completed, task_failed

@task()
def my_task():
    try:
        # Task logic goes here
        task_completed.inc()
    except Exception:
        task_failed.inc()
        raise
```

## Health Checks

```bash
# Check worker health
curl http://localhost:8080/health

# Get queue stats
curl http://localhost:8080/api/queues

# Get worker stats
curl http://localhost:8080/api/workers
```

## Testing

```python
# Test tasks synchronously
result = send_email.run("test@example.com", "Subject", "Body")

# Mock tasks for testing
from taskqueue.testing import task_mock

@task_mock
def test_task():
    assert send_email.delay().get()["status"] == "sent"
```

## Configuration

```yaml
# config/taskqueue.yml
broker:
  url: redis://localhost:6379/0

result_backend:
  url: postgresql://user:pass@localhost/taskqueue

worker:
  concurrency: 4
  prefetch_multiplier: 4
  task_time_limit: 3600
  max_tasks_per_child: 1000

queues:
  - name: default
    priority: 5
  - name: high
    priority: 10
  - name: low
    priority: 1

monitoring:
  enabled: true
  port: 8080

metrics:
  enabled: true
  port: 9090
```

## Production Checklist

- Use Redis with persistence enabled
- Configure task result expiration
- Set up monitoring and alerts
- Enable task rate limiting
- Configure dead letter queues
- Set up log aggregation
- Enable SSL/TLS for broker connections
- Configure worker auto-scaling
- Set up backups for the result backend
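Task rate limiting (one of the checklist items above) is commonly implemented as a token bucket. A minimal sketch of the idea, not TaskQueue Pro's actual limiter:

```python
import time

class TokenBucket:
    """Token-bucket limiter: e.g. rate=10/60 allows roughly 10 tasks per minute."""

    def __init__(self, rate, capacity):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity
        self.updated = time.monotonic()

    def allow(self):
        # Refill proportionally to elapsed time, then try to spend one token
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=10 / 60, capacity=10)
print(sum(bucket.allow() for _ in range(20)))  # → 10 (initial burst), rest throttled
```

A worker would call `allow()` before pulling the next task from a rate-limited queue and requeue or sleep when it returns `False`.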
## Task Options Reference

```python
@task(
    queue="default",                    # Queue name
    priority=5,                         # Task priority (1-10)
    max_retries=3,                      # Max retry attempts
    retry_delay=60,                     # Initial retry delay (seconds)
    retry_backoff=True,                 # Use exponential backoff
    time_limit=3600,                    # Hard time limit (seconds)
    soft_time_limit=3000,               # Soft time limit (seconds)
    rate_limit="100/h",                 # Rate limit
    result_expires=86400,               # Result expiration (seconds)
    bind=True,                          # Pass the task instance as first arg
    ignore_result=False,                # Set True to skip storing the result
    store_errors_even_if_ignored=True,  # Store errors even when results are ignored
)
def my_task(self):
    pass
```

## Author

- Efe Altıparmakoğlu - @efealtiparmakoglu
## License

MIT License - see the LICENSE file for details.