7 changes: 6 additions & 1 deletion docs/comparison.md
@@ -23,6 +23,11 @@ How taskito compares to other Python task queues.
| Soft timeouts | **Yes** | No | No | No | No |
| Custom serializers | **Yes** | Yes | No | No | No |
| Per-task middleware | **Yes** | No | No | Yes | No |
| Multi-process (prefork) | **Yes** | Yes | No | No | No |
| Namespace isolation | **Yes** | No | No | No | No |
| Result streaming | **Yes** (publish/stream) | No | No | No | No |
| Worker discovery | **Yes** (hostname/pid/status) | Yes (flower) | No | No | No |
| Lifecycle events | **Yes** (13 types) | Yes (signals) | No | Yes (actors) | No |
| OpenTelemetry | **Yes** (optional) | Yes (contrib) | No | No | No |
| CLI | **Yes** | Yes | Yes | Yes | Yes |
| Result backend | **Built-in** (SQLite) | Redis / DB / custom | Redis | Redis / custom | Redis / SQLite |
@@ -42,7 +47,7 @@ taskito is ideal when:

Consider alternatives when:

- **Multi-server workers** — you need workers on separate machines (use Celery or Dramatiq)
- **Multi-server workers** — you need workers on separate machines (taskito supports this with Postgres/Redis backends, but Celery has more mature distributed tooling)
- **Very high throughput** — millions of jobs/sec across a cluster (use Celery + RabbitMQ)
- **Existing Redis infrastructure** — if Redis is already in your stack, RQ or Huey are simple choices
- **Complex routing** — you need topic exchanges, message filtering, or pub/sub patterns (use Celery + RabbitMQ)
102 changes: 102 additions & 0 deletions docs/guide/prefork.md
@@ -0,0 +1,102 @@
# Prefork Worker Pool

Spawn separate child processes for true CPU parallelism. Each child runs its own interpreter with its own GIL, so CPU-bound tasks don't block each other.

## When to Use

| Workload | Recommended pool | Why |
|----------|-----------------|-----|
| I/O-bound (HTTP calls, DB queries) | `thread` (default) | Threads release the GIL during I/O |
| CPU-bound (data processing, ML) | `prefork` | Each process owns its GIL |
| Mixed workloads | `prefork` | CPU tasks benefit; I/O tasks work fine too |
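The GIL distinction above is the same one behind the standard library's `multiprocessing` module: a process pool gives each worker its own interpreter, so CPU-bound functions run in parallel where a thread pool would serialize them. A minimal stdlib sketch (not taskito code):

```python
from multiprocessing import Pool


def cpu_task(n):
    # Pure CPU work: no I/O, so in a thread pool this would hold the GIL
    # the whole time. In a process pool each worker has its own GIL.
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    with Pool(4) as pool:
        # Four tasks run on four processes concurrently.
        results = pool.map(cpu_task, [100_000] * 4)
```

taskito's prefork pool applies the same principle, but with long-lived children that import your app once and receive jobs over pipes.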

## Getting Started

```python
from taskito import Queue

queue = Queue(db_path="myapp.db", workers=4)
queue.run_worker(pool="prefork", app="myapp:queue")
```

```bash
taskito worker --app myapp:queue --pool prefork
```

The `app` parameter tells each child process how to import your Queue instance. It must be an importable path in `module:attribute` format.
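Resolving a `module:attribute` string is a common pattern (gunicorn and uvicorn use the same format). A child process can do it with `importlib`; this is a sketch of the idea, not taskito's actual loader:

```python
import importlib


def load_app(path):
    # Split "module:attribute", import the module, and fetch the attribute.
    # For app="myapp:queue" this imports myapp and returns myapp.queue.
    module_name, _, attr = path.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Works for any importable target, e.g. the stdlib:
dumps = load_app("json:dumps")
```

Because each child performs this import independently, anything your module does at import time (logging setup, resource registration) runs once per child.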

## How It Works

```mermaid
graph LR
S["Scheduler<br/>(Rust)"] -->|"Job JSON"| P["PreforkPool"]
P -->|stdin| C1["Child 1<br/>(own GIL)"]
P -->|stdin| C2["Child 2<br/>(own GIL)"]
P -->|stdin| CN["Child N<br/>(own GIL)"]

C1 -->|stdout| R1["Reader 1"]
C2 -->|stdout| R2["Reader 2"]
CN -->|stdout| RN["Reader N"]

R1 -->|JobResult| RCH["Result Channel"]
R2 -->|JobResult| RCH
RN -->|JobResult| RCH

RCH --> ML["Result Handler"]
ML -->|"complete / retry / DLQ"| DB[("Storage")]
```

1. The Rust scheduler dequeues jobs from storage
2. `PreforkPool` serializes each job as JSON and writes it to the least-loaded child's stdin pipe
3. Each child deserializes the job, executes the task wrapper (with middleware, resources, proxies), and writes the result as JSON to stdout
4. Reader threads parse results and feed them back to the scheduler
5. The scheduler updates job status in storage
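The stdin/stdout JSON protocol in steps 2-4 can be demonstrated with a self-contained `subprocess` sketch. The job and result shapes here are illustrative, not taskito's actual wire format:

```python
import json
import subprocess
import sys

# A stand-in "child": reads one JSON job per stdin line,
# writes one JSON result per stdout line.
CHILD = r"""
import json, sys
for line in sys.stdin:
    job = json.loads(line)
    result = {"job_id": job["job_id"], "status": "complete",
              "result": job["a"] + job["b"]}
    sys.stdout.write(json.dumps(result) + "\n")
    sys.stdout.flush()
"""

proc = subprocess.Popen(
    [sys.executable, "-c", CHILD],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
)

# Parent side: serialize a job, write it to the child's stdin pipe...
proc.stdin.write(json.dumps({"job_id": 1, "a": 2, "b": 3}) + "\n")
proc.stdin.flush()

# ...and read the result line back, as a reader thread would.
reply = json.loads(proc.stdout.readline())

proc.stdin.close()
proc.wait()
```

Line-delimited JSON keeps framing trivial: one line in, one line out, no length prefixes needed.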

## Configuration

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `pool` | `str` | `"thread"` | Set to `"prefork"` to enable |
| `app` | `str` | — | Import path to Queue instance (required) |
| `workers` | `int` | CPU count | Number of child processes |

## Migrating from Thread Pool

The thread pool is the default. To switch to prefork:

=== "Before (thread pool)"

```python
queue.run_worker()
```

=== "After (prefork)"

```python
queue.run_worker(pool="prefork", app="myapp:queue")
```

Everything else stays the same — task decorators, middleware, resources, events, and the scheduler all work identically. The only difference is where task code executes (child process vs. worker thread).

## Debugging Child Processes

Children inherit the parent's stderr, so `print()` statements and Python logging appear in the parent's terminal.

Enable debug logging to see child lifecycle events:

```python
import logging
logging.getLogger("taskito.prefork.child").setLevel(logging.DEBUG)
```

Log output includes:
- `child ready (app=..., pid=...)` — child initialized and waiting for jobs
- `executing task_name[job_id]` — job received (DEBUG level)
- `task task_name[job_id] failed: ...` — task error
- `shutdown received` — clean shutdown
- `resource teardown error` — resource cleanup failure

## Limitations

- **Tasks must be importable**: Each child process imports the app module independently. Tasks defined inside functions or closures cannot be imported.
- **No shared state**: Children are separate processes. In-memory caches, globals, or module-level state are not shared between children.
- **Startup cost**: Each child imports the full app module on start. This happens once per child, not per job.
- **Resource re-initialization**: Worker resources (DB connections, etc.) are initialized independently in each child.
126 changes: 126 additions & 0 deletions docs/guide/streaming.md
@@ -0,0 +1,126 @@
# Result Streaming

Stream intermediate results from long-running tasks. Instead of waiting for the final result, consumers receive partial data as it becomes available.

## Publishing Partial Results

Inside a task, call `current_job.publish(data)` to emit a partial result:

```python
from taskito import current_job

@queue.task()
def process_batch(items):
results = []
for i, item in enumerate(items):
result = process(item)
results.append(result)
current_job.publish({"item_id": item.id, "status": "ok", "result": result})
current_job.update_progress(int((i + 1) / len(items) * 100))
return {"total": len(items), "results": results}
```

`publish()` accepts any JSON-serializable value — dicts, lists, strings, numbers.

## Consuming with `stream()`

The caller iterates over partial results as they arrive:

```python
job = process_batch.delay(items)

for partial in job.stream(timeout=120, poll_interval=0.5):
print(f"Processed item {partial['item_id']}: {partial['status']}")

# After stream ends, get the final result
final = job.result(timeout=5)
```

`stream()` polls the database for new partial results, yields each one, and stops when the job reaches a terminal state (complete, failed, dead, cancelled).

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `timeout` | `float` | `60.0` | Maximum seconds to wait |
| `poll_interval` | `float` | `0.5` | Seconds between polls |

## Async Streaming

Use `astream()` in async contexts:

```python
async for partial in job.astream(timeout=120, poll_interval=0.5):
print(f"Got: {partial}")
```

## FastAPI SSE

The built-in FastAPI progress endpoint supports streaming partial results:

```
GET /jobs/{job_id}/progress?include_results=true
```

Events include partial results alongside progress:

```
data: {"status": "running", "progress": 25}
data: {"status": "running", "progress": 25, "partial_result": {"item_id": 1, "status": "ok"}}
data: {"status": "running", "progress": 50}
data: {"status": "complete", "progress": 100}
```
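On the client side, each SSE `data:` line carries one JSON payload. A minimal parser for the event shapes shown above (a sketch; real clients should use an SSE library that handles reconnection and multi-line events):

```python
import json


def parse_sse(lines):
    # Yield the decoded JSON payload of each "data: ..." line,
    # skipping comments, blank separators, and other SSE fields.
    for line in lines:
        if line.startswith("data: "):
            yield json.loads(line[len("data: "):])


events = list(parse_sse([
    'data: {"status": "running", "progress": 25}',
    'data: {"status": "complete", "progress": 100}',
]))
```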

## Patterns

### ETL Pipeline

```python
@queue.task()
def etl_pipeline(source_tables):
for table in source_tables:
rows = extract(table)
transformed = transform(rows)
load(transformed)
current_job.publish({
"table": table,
"rows_processed": len(rows),
"status": "loaded",
})
return {"tables": len(source_tables)}
```

### ML Training

```python
@queue.task()
def train_model(config):
    model, optimizer = build_model(config)  # build returns both
    for epoch in range(config["epochs"]):
        loss = train_epoch(model, optimizer)
        current_job.publish({
            "epoch": epoch + 1,
            "loss": float(loss),
            "lr": optimizer.param_groups[0]["lr"],
        })
    save_model(model)
    return {"final_loss": float(loss)}
```

### Batch Processing with Error Tracking

```python
@queue.task()
def process_orders(order_ids):
for oid in order_ids:
try:
process_order(oid)
current_job.publish({"order_id": oid, "status": "ok"})
except Exception as e:
current_job.publish({"order_id": oid, "status": "error", "error": str(e)})
return {"total": len(order_ids)}
```

## How It Works

`publish()` stores data as a task log entry with `level="result"`, reusing the existing `task_logs` table. No new tables or Rust changes are needed.

`stream()` polls `get_task_logs(job_id)`, filters for `level == "result"`, tracks the last-seen timestamp, and yields only new entries. It stops when the job's status becomes terminal.
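That polling loop can be sketched in a few lines of pure Python. The `get_logs` and `get_status` callables stand in for the real storage queries; entry fields and state names follow the description above but are otherwise assumptions:

```python
import time

TERMINAL_STATES = {"complete", "failed", "dead", "cancelled"}


def stream_partials(get_logs, get_status, timeout=60.0, poll_interval=0.5):
    # Minimal sketch of stream(): poll the log store, yield only entries
    # not seen on a previous poll, stop at a terminal job state or timeout.
    deadline = time.monotonic() + timeout
    seen = 0
    while time.monotonic() < deadline:
        entries = [e for e in get_logs() if e["level"] == "result"]
        for entry in entries[seen:]:
            yield entry["data"]
        seen = len(entries)
        if get_status() in TERMINAL_STATES:
            return
        time.sleep(poll_interval)
```

Checking the status *after* draining new entries ensures partials published just before completion are not dropped.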
2 changes: 2 additions & 0 deletions zensical.toml
@@ -17,6 +17,7 @@ nav = [
{ "User Guide" = [
{ "Tasks" = "guide/tasks.md" },
{ "Workers" = "guide/workers.md" },
{ "Prefork Pool" = "guide/prefork.md" },
{ "Queues & Priority" = "guide/queues.md" },
{ "Scheduling" = "guide/scheduling.md" },
{ "Retries & Dead Letters" = "guide/retries.md" },
@@ -33,6 +34,7 @@ nav = [
{ "Events & Webhooks" = "guide/events-webhooks.md" },
{ "Web Dashboard" = "guide/dashboard.md" },
{ "Native Async Tasks" = "guide/async-tasks.md" },
{ "Result Streaming" = "guide/streaming.md" },
{ "Testing" = "guide/testing.md" },
{ "Deployment" = "guide/deployment.md" },
{ "KEDA Autoscaling" = "guide/keda.md" },