55 changes: 55 additions & 0 deletions docs/architecture.md
@@ -247,6 +247,61 @@ graph LR

**Layer 3 — Resource Proxies**: `ProxyHandler` implementations know how to deconstruct live objects (file handles, HTTP sessions, cloud clients) into a JSON-serializable recipe, and how to reconstruct them on the worker before the task function is called. Recipes are optionally HMAC-signed for tamper detection.
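The signing step can be sketched in plain Python. The recipe layout, key handling, and field names below are illustrative assumptions, not taskito's actual recipe format:

```python
import hashlib
import hmac
import json

def sign_recipe(recipe: dict, key: bytes) -> dict:
    """Attach an HMAC-SHA256 signature to a JSON-serializable recipe."""
    payload = json.dumps(recipe, sort_keys=True, separators=(",", ":")).encode()
    return {"recipe": recipe, "sig": hmac.new(key, payload, hashlib.sha256).hexdigest()}

def verify_recipe(signed: dict, key: bytes) -> dict:
    """Return the recipe if the signature checks out; raise on tampering."""
    payload = json.dumps(signed["recipe"], sort_keys=True, separators=(",", ":")).encode()
    expected = hmac.new(key, payload, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected, signed["sig"]):
        raise ValueError("recipe signature mismatch")
    return signed["recipe"]
```

Using `hmac.compare_digest` keeps the comparison constant-time, and sorting keys makes the signature independent of dict insertion order.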

## Failure Model

Taskito provides **at-least-once delivery**. Here's what happens when things go wrong:

### Worker crash mid-task

The job stays in `running` status. The scheduler's stale reaper detects it after `timeout_ms` elapses, marks it failed, and retries (if retries remain) or moves to the dead letter queue. No manual intervention needed.

### Parent process crash

All worker threads stop. Jobs in `running` keep that status until the scheduler restarts and its stale reaper reclaims them. Jobs in `pending` are unaffected — they'll be dispatched normally on restart.

### Database unavailable

Scheduler polls fail without crashing the process; each failed poll is logged via `log::error!`. No new jobs are dispatched while the database is unreachable. In-flight jobs complete normally, and their results are cached in memory until the database becomes available.

### Network partition (Postgres/Redis)

Same behavior as database unavailable. The scheduler retries on the next poll cycle (default: every 50ms). Connection pools handle reconnection automatically.
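That retry-on-next-tick behavior can be pictured as a loop that logs and moves on. A toy Python version (not taskito's Rust scheduler, just the shape of the logic), exercised against a store that fails twice before recovering:

```python
import logging

def poll_until_success(dequeue, max_polls: int, poll_interval_ms: int = 50) -> list:
    """Keep polling; a failed poll is logged and simply retried on the next cycle."""
    for _ in range(max_polls):
        try:
            return dequeue()
        except ConnectionError as exc:
            # The real scheduler sleeps poll_interval_ms between cycles;
            # this sketch only records it.
            logging.error("poll failed, retrying in %dms: %s", poll_interval_ms, exc)
    return []
```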

### Duplicate execution

`claim_execution` prevents two workers from picking up the same job simultaneously. But if a worker crashes *after* starting execution, the job will be retried — potentially executing the same task twice. Design tasks to be [idempotent](guide/guarantees.md) to handle this safely.
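A common way to get idempotency is to key side effects on the job ID, so a retried execution becomes a no-op. A sketch using stdlib SQLite; the `job_id` argument is a stand-in for however your task obtains its job's ID:

```python
import sqlite3

def charge_once(db: sqlite3.Connection, job_id: str, amount: int) -> bool:
    """Apply the charge only if this job ID hasn't been processed before."""
    before = db.total_changes
    db.execute(
        "INSERT OR IGNORE INTO charges (job_id, amount) VALUES (?, ?)",
        (job_id, amount),
    )
    db.commit()
    # 0 changes means the PRIMARY KEY already existed: a duplicate execution.
    return db.total_changes - before == 1
```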

### Recovery timeline

```mermaid
sequenceDiagram
participant C as Client
participant DB as Database
participant S as Scheduler
participant W as Worker

C->>DB: enqueue(job)
S->>DB: dequeue + claim_execution
S->>W: dispatch job
W->>W: execute task...
Note over W: Worker crashes at T=5s
Note over S: Scheduler continues polling...
Note over S: T=300s: reap_stale_jobs() detects<br/>job.started_at + timeout_ms < now
S->>DB: mark failed, schedule retry
S->>DB: dequeue (same job, retry_count=1)
S->>W: dispatch to different worker
W->>DB: complete + clear claim
```

### Partial writes

If a task completes successfully but the result write to the database fails (e.g., disk full, connection lost), the job stays in `running` status. The stale reaper eventually marks it failed and retries it. The task will execute again — make sure it's [idempotent](guide/guarantees.md).

### Jobs without timeouts

!!! warning
If a job has no `timeout_ms` set and the worker crashes, the job stays in `running` **forever**. The stale reaper only detects jobs that have exceeded their timeout. Always set a timeout on production tasks.
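In code, that means setting a timeout wherever tasks are defined. A hedged sketch; the parameter names (`timeout_ms`, `default_timeout`, `max_retries`) follow the ones used elsewhere in these docs, so check the API reference for exact spellings:

```python
from taskito import Queue

# Queue-wide default so no task can run unbounded (assumed parameter name).
queue = Queue(db_path="app.db", default_timeout=300_000)

@queue.task(timeout_ms=60_000, max_retries=3)
def sync_report(report_id: str) -> None:
    ...
```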

## Serialization

taskito uses a pluggable serializer for task arguments and results. The default is `CloudpickleSerializer`, which supports lambdas, closures, and complex Python objects.
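If cloudpickle's flexibility isn't needed, a serializer can be as small as a `dumps`/`loads` pair. This JSON sketch assumes the pluggable interface is duck-typed on those two methods; verify against the actual serializer protocol before relying on it:

```python
import json

class JsonSerializer:
    """Minimal serializer: JSON-safe arguments and results only."""

    def dumps(self, obj) -> bytes:
        return json.dumps(obj).encode()

    def loads(self, data: bytes):
        return json.loads(data.decode())
```

JSON-only payloads also have the side benefit of being inspectable directly in the database.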
73 changes: 44 additions & 29 deletions docs/comparison.md
@@ -1,37 +1,38 @@
# Comparison

**TL;DR**: Taskito is Celery without the broker. Rust scheduler, no Redis/RabbitMQ, lower latency, better concurrency. Start with SQLite, scale to Postgres when needed.

## Feature Matrix

| Feature | taskito | Celery | RQ | Dramatiq | Huey | TaskIQ |
|---|---|---|---|---|---|---|
| Broker required | **No** | Redis / RabbitMQ | Redis | Redis / RabbitMQ | Redis | Redis / RabbitMQ / NATS |
| Core language | **Rust + Python** | Python | Python | Python | Python | Python |
| Priority queues | **Yes** | Yes | No | No | Yes | Yes |
| Rate limiting | **Yes** | Yes | No | Yes | No | No |
| Dead letter queue | **Yes** | No | Yes | No | No | No |
| Task chaining | **Yes** (chain/group/chord) | Yes (canvas) | No | Yes (pipelines) | No | Yes (pipelines) |
| Job cancellation | **Yes** | Yes (revoke) | No | No | Yes | No |
| Progress tracking | **Yes** | Yes (custom) | No | No | No | No |
| Unique tasks | **Yes** | No (manual) | No | No | Yes | No |
| Batch enqueue | **Yes** | No | No | No | No | No |
| Retry with backoff | **Yes** (exponential + jitter) | Yes | Yes | Yes | Yes | Yes |
| Periodic/cron tasks | **Yes** (6-field with seconds) | Yes (celery-beat) | Yes (rq-scheduler) | Yes (APScheduler) | Yes | Yes (taskiq-cron) |
| Async support | **Yes** | Yes | No | No | No | Yes (native) |
| Cancel running tasks | **Yes** (cooperative) | Yes (revoke) | No | No | No | No |
| Soft timeouts | **Yes** | No | No | No | No | No |
| Custom serializers | **Yes** | Yes | No | No | No | Yes |
| Per-task middleware | **Yes** | No | No | Yes | No | Yes |
| Multi-process (prefork) | **Yes** | Yes | No | No | No | No |
| Namespace isolation | **Yes** | No | No | No | No | No |
| Result streaming | **Yes** (publish/stream) | No | No | No | No | No |
| Worker discovery | **Yes** (hostname/pid/status) | Yes (flower) | No | No | No | No |
| Lifecycle events | **Yes** (13 types) | Yes (signals) | No | Yes (actors) | No | No |
| Async canvas | **Yes** | No | No | No | No | No |
| OpenTelemetry | **Yes** (optional) | Yes (contrib) | No | No | No | Yes (built-in) |
| CLI | **Yes** | Yes | Yes | Yes | Yes | Yes |
| Result backend | **Built-in** (SQLite) | Redis / DB / custom | Redis | Redis / custom | Redis / SQLite | Redis / custom |
| Setup complexity | **`pip install`** | Broker + backend | Redis server | Broker | Redis server | Broker + backend |

## When to Use taskito

@@ -117,3 +118,17 @@ Huey is a lightweight task queue with Redis or SQLite backends.

**Choose taskito** if you want higher performance and more features with SQLite.
**Choose Huey** if you need a mature, well-documented SQLite-backed queue.

### vs TaskIQ

TaskIQ is a modern, async-native task queue with pluggable brokers (Redis, RabbitMQ, NATS).

| | taskito | TaskIQ |
|---|---|---|
| **Broker** | None (DB-backed) | Redis / RabbitMQ / NATS |
| **Async** | Native + sync | Async-first |
| **Scheduler** | Rust (Tokio) | Python |
| **GIL** | Rust scheduler bypasses GIL | Python scheduler competes for GIL |
| **Setup** | `pip install taskito` | Install broker + taskiq + broker plugin |

**Choose taskito** if you want zero infrastructure. **Choose TaskIQ** if you're fully async and already run Redis or NATS.
36 changes: 36 additions & 0 deletions docs/examples/benchmark.md
@@ -167,6 +167,42 @@ Final stats: {'pending': 0, 'running': 0, 'completed': 20100, 'failed': 0, 'dead
| **r2d2 pool** | Up to 8 concurrent SQLite connections |
| **Diesel ORM** | Compiled SQL queries, no runtime query building |

## How It Compares

Rough directional comparison on the same hardware (8-core, single machine). These are not scientific benchmarks — run the script above on your own hardware for accurate numbers.

| Metric | taskito (SQLite) | taskito (Postgres) | Celery + Redis | Dramatiq + Redis |
|--------|-----------------|-------------------|---------------|-----------------|
| Enqueue throughput | ~55,000/s | ~20,000/s | ~5,000/s | ~3,000/s |
| Processing (noop, 8 workers) | ~4,000/s | ~3,500/s | ~2,000/s | ~1,500/s |
| p50 latency | 1.1ms | 2.5ms | 5–10ms | 8–15ms |
| p99 latency | 3.4ms | 8ms | 20–50ms | 30–80ms |
| Memory (idle worker) | ~30 MB | ~35 MB | ~80 MB | ~60 MB |
| Setup | `pip install taskito` | + Postgres | + Redis + Celery | + Redis + Dramatiq |
| External services | 0 | 1 (Postgres) | 2 (Redis + result backend) | 1 (Redis) |

!!! note
Celery numbers are from public benchmarks and community reports. Your mileage will vary depending on workload, serializer, and broker configuration. Run your own benchmarks before making decisions.

**Why is taskito faster?**

- Rust scheduler avoids GIL contention — scheduling and dispatch never block Python
- SQLite WAL mode with batch inserts — disk I/O is minimized
- Direct DB polling — jobs travel enqueue → DB → dequeue, skipping the extra network hop through a broker and the separate write to a result backend
- OS thread pool with per-task GIL acquisition — no multiprocessing overhead for I/O-bound tasks

## Tune for Your Workload

| Symptom | Config to change | Why |
|---------|-----------------|-----|
| Low throughput (I/O tasks) | Increase `workers` | More threads = more concurrent I/O |
| Low throughput (CPU tasks) | Use `pool="prefork"` | Each process gets its own GIL |
| High latency | Decrease `scheduler_poll_interval_ms` | Scheduler checks for ready jobs more often |
| Database too busy | Increase `scheduler_poll_interval_ms` | Less frequent polling reduces DB load |
| Memory growing | Set `result_ttl` | Auto-cleanup old results and metrics |
| Jobs timing out | Increase `default_timeout` | Give tasks more time to complete |
| Jobs piling up | Add more workers or use Postgres | SQLite's single-writer limit may become the bottleneck |
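Pulling a few of those knobs together into one configuration. Parameter names follow this table and the deployment guide; treat the values as starting points to benchmark, not recommendations:

```python
from taskito import Queue

queue = Queue(
    db_path="/var/lib/app/taskito.db",
    workers=16,                      # more threads for I/O-bound tasks
    pool="prefork",                  # per-process GIL for CPU-bound tasks
    scheduler_poll_interval_ms=25,   # lower = lower latency, busier DB
    result_ttl=3600,                 # auto-clean results after an hour
)
```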

## Tuning

Adjust these for your workload:
23 changes: 13 additions & 10 deletions docs/examples/data-pipeline.md
@@ -16,8 +16,10 @@ data-pipeline/
```python
"""ETL pipeline with task dependencies and named queues."""

import csv
import json

import httpx

from taskito import Queue, current_job

@@ -33,15 +35,15 @@ queue = Queue(
@queue.task(queue="extract", max_retries=5, retry_backoff=2.0)
def extract_api(endpoint: str) -> list[dict]:
"""Pull records from an API endpoint with retries."""
response = httpx.get(endpoint, timeout=30)
response.raise_for_status()
return response.json()

@queue.task(queue="extract")
def extract_csv(file_path: str) -> list[dict]:
"""Read records from a CSV file."""
with open(file_path, newline="") as f:
return list(csv.DictReader(f))

# ── Transform Tasks ──────────────────────────────────────

@@ -50,7 +52,6 @@ def normalize(records: list[dict], schema: str) -> list[dict]:
"""Normalize records against a schema with progress tracking."""
results = []
for i, record in enumerate(records):
results.append({**record, "schema": schema, "normalized": True})
if (i + 1) % 50 == 0:
current_job.update_progress(int((i + 1) / len(records) * 100))
@@ -72,9 +73,11 @@ def deduplicate(records: list[dict]) -> list[dict]:

@queue.task(queue="load")
def load_to_warehouse(records: list[dict], table: str) -> dict:
"""Load records into the data warehouse."""
time.sleep(1)
return {"table": table, "rows_inserted": len(records)}
"""Load records into the data warehouse (writes JSON to disk as stand-in)."""
dest = f"/tmp/{table.replace('.', '_')}.json"
with open(dest, "w") as f:
json.dump(records, f, indent=2)
return {"table": table, "rows_inserted": len(records), "dest": dest}

# ── DAG Construction ─────────────────────────────────────

45 changes: 31 additions & 14 deletions docs/examples/notifications.md
@@ -15,7 +15,9 @@ notifications/
```python
"""Notification tasks with priority and deduplication."""

import os

import httpx

from taskito import Queue

@@ -29,33 +31,48 @@ queue = Queue(

# ── Notification Tasks ───────────────────────────────────

@queue.task(priority=10, max_retries=3, retry_backoff=2.0)
def send_urgent_email(to: str, subject: str, body: str) -> dict:
"""High-priority email — runs before bulk notifications."""
response = httpx.post(
"https://api.mailgun.net/v3/YOUR_DOMAIN/messages",
auth=("api", os.environ["MAILGUN_API_KEY"]),
data={"from": "noreply@example.com", "to": to, "subject": subject, "text": body},
)
response.raise_for_status()
return {"to": to, "subject": subject, "sent": True}

@queue.task(priority=0, max_retries=3, retry_backoff=2.0)
def send_bulk_email(to: str, subject: str, body: str) -> dict:
"""Low-priority bulk email."""
response = httpx.post(
"https://api.mailgun.net/v3/YOUR_DOMAIN/messages",
auth=("api", os.environ["MAILGUN_API_KEY"]),
data={"from": "noreply@example.com", "to": to, "subject": subject, "text": body},
)
response.raise_for_status()
return {"to": to, "subject": subject, "sent": True}

@queue.task(priority=5, max_retries=5, retry_backoff=2.0)
def send_push(user_id: str, title: str, message: str) -> dict:
"""Push notification with retries."""
response = httpx.post(
"https://fcm.googleapis.com/fcm/send",
headers={"Authorization": f"key={os.environ['FCM_SERVER_KEY']}"},
json={"to": f"/topics/user-{user_id}", "notification": {"title": title, "body": message}},
)
response.raise_for_status()
return {"user_id": user_id, "title": title, "sent": True}

@queue.task(max_retries=3, retry_backoff=2.0)
def send_sms(phone: str, message: str) -> dict:
"""SMS notification."""
time.sleep(0.5)
print(f"[SMS] {phone}: {message}")
"""SMS notification via Twilio."""
response = httpx.post(
f"https://api.twilio.com/2010-04-01/Accounts/{os.environ['TWILIO_ACCOUNT_SID']}/Messages.json",
auth=(os.environ["TWILIO_ACCOUNT_SID"], os.environ["TWILIO_AUTH_TOKEN"]),
data={"From": os.environ["TWILIO_FROM_NUMBER"], "To": phone, "Body": message},
)
response.raise_for_status()
return {"phone": phone, "sent": True}

# ── Periodic Digest ──────────────────────────────────────
14 changes: 14 additions & 0 deletions docs/guide/deployment.md
@@ -331,6 +331,20 @@ taskito's [Postgres backend](postgres.md) addresses all of these limitations whi

Increasing the pool beyond ~16 typically doesn't help, since SQLite write serialization is the bottleneck.

## Sizing Your Deployment

| Throughput | Backend | Workers | Pool | Notes |
|-----------|---------|---------|------|-------|
| < 100 jobs/s | SQLite | 4 | thread | Default config works fine |
| 100–1K jobs/s | SQLite | 8–16 | thread or prefork | Increase `workers`, monitor WAL size |
| 1K–5K jobs/s | SQLite | 16 | prefork | Prefork for CPU-bound; SQLite handles this well with WAL |
| 5K–20K jobs/s | Postgres | 16–32 | prefork | Switch to Postgres for concurrent writers |
| 20K–50K jobs/s | Postgres | 32+ | prefork | Multiple worker processes, tune `pool_size` |
| > 50K jobs/s | — | — | — | Consider Celery + RabbitMQ for this scale |

!!! note
These are rough guidelines for noop tasks. Real throughput depends on task duration, payload size, and I/O patterns. Run the [benchmark](../examples/benchmark.md) on your hardware to get accurate numbers.

## Checklist

- [ ] Use an absolute path for `db_path`
27 changes: 27 additions & 0 deletions docs/guide/events-webhooks.md
@@ -205,6 +205,33 @@ for event in [EventType.JOB_ENQUEUED, EventType.JOB_COMPLETED, EventType.JOB_FAI
queue.on_event(event, audit_log)
```

## Event Ordering

Events fire in the order the scheduler processes results — typically the order jobs complete. For jobs that complete nearly simultaneously, ordering is **not guaranteed** across different workers or threads.

Within a single job's lifecycle, events always fire in this order:

1. `JOB_ENQUEUED` (at enqueue time)
2. `JOB_COMPLETED` / `JOB_FAILED` / `JOB_CANCELLED` (at completion)
3. `JOB_RETRYING` (if retried, before the next attempt)
4. `JOB_DEAD` (if all retries exhausted)
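Because events for different jobs can fire concurrently from the dispatch pool, handlers that aggregate state should synchronize. A self-contained sketch of the pattern (no taskito APIs involved, just a callable suitable for registration):

```python
import threading

class CompletionCounter:
    """Event callback that stays correct under concurrent dispatch threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.completed = 0

    def __call__(self, event) -> None:
        # Lock around the read-modify-write so concurrent events don't race.
        with self._lock:
            self.completed += 1
```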

## Backpressure

Events are dispatched to a thread pool (default size: 4, configurable via `event_workers=N`). If callbacks are slow and events arrive faster than they can be processed, they queue in memory.

For high-volume event scenarios:

```python
queue = Queue(event_workers=16) # More threads for slow callbacks
```

If a callback raises an exception, it is logged and the event is dropped — it does not retry or block other callbacks.
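Since a raised exception drops the event, one defensive pattern is to wrap callbacks so failures are captured for later inspection rather than lost. A sketch:

```python
import logging

def safe_callback(fn, on_error=None):
    """Wrap an event callback so exceptions are recorded, never silently lost."""
    def wrapper(event):
        try:
            fn(event)
        except Exception as exc:  # broad on purpose: this is a callback boundary
            logging.exception("event callback %s failed", getattr(fn, "__name__", fn))
            if on_error is not None:
                on_error(event, exc)
    return wrapper
```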

## Webhook Failure

Webhooks retry with exponential backoff (up to `max_retries`). After all retries are exhausted, the webhook delivery is **logged and dropped** — there is no dead-letter queue for webhooks. Monitor webhook failures via the `on_failure` callback or structured logging.
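Receivers should authenticate webhook payloads before trusting them. A framework-agnostic sketch; the HMAC-SHA256 scheme here is an assumption about taskito's signing format, so confirm the actual header name and algorithm in the webhook docs:

```python
import hashlib
import hmac

def verify_webhook(body: bytes, signature_hex: str, secret: bytes) -> bool:
    """Constant-time check of an assumed hex-encoded HMAC-SHA256 signature."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_hex)
```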

### Webhook Receiver (Flask)

A minimal Flask app that receives and verifies taskito webhooks: