From 469f18c5e044dddaffe7acc86de6c499f6f8fb18 Mon Sep 17 00:00:00 2001
From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com>
Date: Sun, 22 Mar 2026 08:59:19 +0530
Subject: [PATCH 1/3] docs: add dedicated prefork pool guide

---
 docs/guide/prefork.md | 102 ++++++++++++++++++++++++++++++++++++++++++
 zensical.toml         |   2 +
 2 files changed, 104 insertions(+)
 create mode 100644 docs/guide/prefork.md

diff --git a/docs/guide/prefork.md b/docs/guide/prefork.md
new file mode 100644
index 0000000..a4c1ebc
--- /dev/null
+++ b/docs/guide/prefork.md
@@ -0,0 +1,102 @@
# Prefork Worker Pool

Spawn separate child processes for true CPU parallelism. Each child runs its own Python interpreter with its own GIL, so CPU-bound tasks don't block one another.

## When to Use

| Workload | Recommended pool | Why |
|----------|------------------|-----|
| I/O-bound (HTTP calls, DB queries) | `thread` (default) | Threads release the GIL during I/O |
| CPU-bound (data processing, ML) | `prefork` | Each process owns its GIL |
| Mixed workloads | `prefork` | CPU tasks benefit; I/O tasks work fine too |

## Getting Started

```python
from taskito import Queue

queue = Queue(db_path="myapp.db", workers=4)
queue.run_worker(pool="prefork", app="myapp:queue")
```

```bash
taskito worker --app myapp:queue --pool prefork
```

The `app` parameter tells each child process how to import your Queue instance. It must be an importable path in `module:attribute` format.

## How It Works

```mermaid
graph LR
    S["Scheduler
(Rust)"] -->|"Job JSON"| P["PreforkPool"] + P -->|stdin| C1["Child 1
(own GIL)"] + P -->|stdin| C2["Child 2
(own GIL)"] + P -->|stdin| CN["Child N
(own GIL)"] + + C1 -->|stdout| R1["Reader 1"] + C2 -->|stdout| R2["Reader 2"] + CN -->|stdout| RN["Reader N"] + + R1 -->|JobResult| RCH["Result Channel"] + R2 -->|JobResult| RCH + RN -->|JobResult| RCH + + RCH --> ML["Result Handler"] + ML -->|"complete / retry / DLQ"| DB[("Storage")] +``` + +1. The Rust scheduler dequeues jobs from storage +2. `PreforkPool` serializes each job as JSON and writes it to the least-loaded child's stdin pipe +3. Each child deserializes the job, executes the task wrapper (with middleware, resources, proxies), and writes the result as JSON to stdout +4. Reader threads parse results and feed them back to the scheduler +5. The scheduler updates job status in storage + +## Configuration + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `pool` | `str` | `"thread"` | Set to `"prefork"` to enable | +| `app` | `str` | — | Import path to Queue instance (required) | +| `workers` | `int` | CPU count | Number of child processes | + +## Migrating from Thread Pool + +The thread pool is the default. To switch to prefork: + +=== "Before (thread pool)" + + ```python + queue.run_worker() + ``` + +=== "After (prefork)" + + ```python + queue.run_worker(pool="prefork", app="myapp:queue") + ``` + +Everything else stays the same — task decorators, middleware, resources, events, and the scheduler all work identically. The only difference is where task code executes (child process vs. worker thread). + +## Debugging Child Processes + +Children inherit the parent's stderr, so `print()` statements and Python logging appear in the parent's terminal. 
Enable debug logging to see child lifecycle events:

```python
import logging

# A handler must be configured (e.g. via basicConfig), otherwise
# DEBUG records from the child logger are silently dropped.
logging.basicConfig(level=logging.INFO)
logging.getLogger("taskito.prefork.child").setLevel(logging.DEBUG)
```

Log output includes:

- `child ready (app=..., pid=...)` — child initialized and waiting for jobs
- `executing task_name[job_id]` — job received (DEBUG level)
- `task task_name[job_id] failed: ...` — task error
- `shutdown received` — clean shutdown
- `resource teardown error` — resource cleanup failure

## Limitations

- **Tasks must be importable**: Each child process imports the app module independently. Tasks defined inside functions or closures cannot be imported.
- **No shared state**: Children are separate processes. In-memory caches, globals, and module-level state are not shared between children.
- **Startup cost**: Each child imports the full app module on start. This happens once per child, not per job.
- **Resource re-initialization**: Worker resources (DB connections, etc.) are initialized independently in each child.
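The importability rule comes from how children bootstrap: each one resolves the `app` string as a `module:attribute` path, much like gunicorn or uvicorn do. A simplified standalone model of that lookup (illustrative only, not taskito's actual internals):

```python
import importlib

def load_app(path: str):
    """Resolve a 'module:attribute' path to the object it names."""
    module_name, sep, attr = path.partition(":")
    if not sep or not module_name or not attr:
        raise ValueError(f"expected 'module:attribute', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)

# load_app("myapp:queue") imports myapp and returns its module-level
# `queue` attribute; tasks hidden inside closures are invisible to it.
```

Anything not reachable as a module-level attribute after that import simply does not exist from the child's point of view.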
diff --git a/zensical.toml b/zensical.toml
index 2d72dfc..a0d8b3f 100644
--- a/zensical.toml
+++ b/zensical.toml
@@ -17,6 +17,7 @@ nav = [
   { "User Guide" = [
     { "Tasks" = "guide/tasks.md" },
     { "Workers" = "guide/workers.md" },
+    { "Prefork Pool" = "guide/prefork.md" },
     { "Queues & Priority" = "guide/queues.md" },
     { "Scheduling" = "guide/scheduling.md" },
     { "Retries & Dead Letters" = "guide/retries.md" },
@@ -33,6 +34,7 @@ nav = [
     { "Events & Webhooks" = "guide/events-webhooks.md" },
     { "Web Dashboard" = "guide/dashboard.md" },
     { "Native Async Tasks" = "guide/async-tasks.md" },
+    { "Result Streaming" = "guide/streaming.md" },
     { "Testing" = "guide/testing.md" },
     { "Deployment" = "guide/deployment.md" },
     { "KEDA Autoscaling" = "guide/keda.md" },

From 587efae17c61c8ab1f74f43afa406f446cdbbb90 Mon Sep 17 00:00:00 2001
From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com>
Date: Sun, 22 Mar 2026 08:59:26 +0530
Subject: [PATCH 2/3] docs: add result streaming guide with patterns

---
 docs/guide/streaming.md | 126 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 126 insertions(+)
 create mode 100644 docs/guide/streaming.md

diff --git a/docs/guide/streaming.md b/docs/guide/streaming.md
new file mode 100644
index 0000000..f1ed540
--- /dev/null
+++ b/docs/guide/streaming.md
@@ -0,0 +1,126 @@
# Result Streaming

Stream intermediate results from long-running tasks. Instead of waiting for the final result, consumers receive partial data as it becomes available.
+ +## Publishing Partial Results + +Inside a task, call `current_job.publish(data)` to emit a partial result: + +```python +from taskito import current_job + +@queue.task() +def process_batch(items): + results = [] + for i, item in enumerate(items): + result = process(item) + results.append(result) + current_job.publish({"item_id": item.id, "status": "ok", "result": result}) + current_job.update_progress(int((i + 1) / len(items) * 100)) + return {"total": len(items), "results": results} +``` + +`publish()` accepts any JSON-serializable value — dicts, lists, strings, numbers. + +## Consuming with `stream()` + +The caller iterates over partial results as they arrive: + +```python +job = process_batch.delay(items) + +for partial in job.stream(timeout=120, poll_interval=0.5): + print(f"Processed item {partial['item_id']}: {partial['status']}") + +# After stream ends, get the final result +final = job.result(timeout=5) +``` + +`stream()` polls the database for new partial results, yields each one, and stops when the job reaches a terminal state (complete, failed, dead, cancelled). 
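Conceptually, the consumer side of `stream()` behaves like the following standalone sketch (`fetch_entries` and `job_is_done` are stand-ins for taskito's storage calls, not real API):

```python
import time

def stream_results(fetch_entries, job_is_done, timeout=60.0, poll_interval=0.5):
    """Yield new 'result'-level entries until the job reaches a terminal state."""
    deadline = time.monotonic() + timeout
    last_seen = float("-inf")
    while time.monotonic() < deadline:
        for entry in fetch_entries():
            if entry["level"] == "result" and entry["ts"] > last_seen:
                last_seen = entry["ts"]
                yield entry["data"]
        # Check for completion *after* draining entries, so partial
        # results published just before the job finished are not dropped.
        if job_is_done():
            return
        time.sleep(poll_interval)
```

The key detail is that entries are drained one final time after the job completes before the loop returns.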
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `timeout` | `float` | `60.0` | Maximum seconds to wait |
| `poll_interval` | `float` | `0.5` | Seconds between polls |

## Async Streaming

Use `astream()` in async contexts:

```python
async for partial in job.astream(timeout=120, poll_interval=0.5):
    print(f"Got: {partial}")
```

## FastAPI SSE

The built-in FastAPI progress endpoint supports streaming partial results:

```
GET /jobs/{job_id}/progress?include_results=true
```

Events include partial results alongside progress:

```
data: {"status": "running", "progress": 25}
data: {"status": "running", "progress": 25, "partial_result": {"item_id": 1, "status": "ok"}}
data: {"status": "running", "progress": 50}
data: {"status": "complete", "progress": 100}
```

## Patterns

### ETL Pipeline

```python
@queue.task()
def etl_pipeline(source_tables):
    for table in source_tables:
        rows = extract(table)
        transformed = transform(rows)
        load(transformed)
        current_job.publish({
            "table": table,
            "rows_processed": len(rows),
            "status": "loaded",
        })
    return {"tables": len(source_tables)}
```

### ML Training

```python
@queue.task()
def train_model(config):
    model, optimizer = build_model(config)
    for epoch in range(config["epochs"]):
        loss = train_epoch(model, optimizer)
        current_job.publish({
            "epoch": epoch + 1,
            "loss": float(loss),
            "lr": optimizer.param_groups[0]["lr"],
        })
    save_model(model)
    return {"final_loss": float(loss)}
```

### Batch Processing with Error Tracking

```python
@queue.task()
def process_orders(order_ids):
    for oid in order_ids:
        try:
            process_order(oid)
            current_job.publish({"order_id": oid, "status": "ok"})
        except Exception as e:
            current_job.publish({"order_id": oid, "status": "error", "error": str(e)})
    return {"total": len(order_ids)}
```

## How It Works

`publish()` stores data as a task log entry with `level="result"`, reusing the
existing `task_logs` table. No new tables or Rust changes are needed.

`stream()` polls `get_task_logs(job_id)`, filters for `level == "result"`, tracks the last-seen timestamp, and yields only new entries. It stops when the job's status becomes terminal.

From 82ea90a0a644f53a4a09267516fec6c64faf9f04 Mon Sep 17 00:00:00 2001
From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com>
Date: Sun, 22 Mar 2026 08:59:33 +0530
Subject: [PATCH 3/3] docs: update comparison table with prefork, streaming, namespace, discovery

---
 docs/comparison.md | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/docs/comparison.md b/docs/comparison.md
index 248e0a5..b34d798 100644
--- a/docs/comparison.md
+++ b/docs/comparison.md
@@ -23,6 +23,11 @@ How taskito compares to other Python task queues.
 | Soft timeouts | **Yes** | No | No | No | No |
 | Custom serializers | **Yes** | Yes | No | No | No |
 | Per-task middleware | **Yes** | No | No | Yes | No |
+| Multi-process (prefork) | **Yes** | Yes | No | No | No |
+| Namespace isolation | **Yes** | No | No | No | No |
+| Result streaming | **Yes** (publish/stream) | No | No | No | No |
+| Worker discovery | **Yes** (hostname/pid/status) | Yes (flower) | No | No | No |
+| Lifecycle events | **Yes** (13 types) | Yes (signals) | No | Yes (actors) | No |
 | OpenTelemetry | **Yes** (optional) | Yes (contrib) | No | No | No |
 | CLI | **Yes** | Yes | Yes | Yes | Yes |
 | Result backend | **Built-in** (SQLite) | Redis / DB / custom | Redis | Redis / custom | Redis / SQLite |
@@ -42,7 +47,7 @@ taskito is ideal when:
 
 Consider alternatives when:
 
-- **Multi-server workers** — you need workers on separate machines (use Celery or Dramatiq)
+- **Multi-server workers** — you need workers on separate machines (taskito supports this with Postgres/Redis backends, but Celery has more mature distributed tooling)
 - **Very high throughput** — millions of jobs/sec across a cluster (use Celery + RabbitMQ)
 - **Existing Redis infrastructure** — if Redis is already in your stack, RQ or Huey are simple choices
 - **Complex routing** — you need topic exchanges, message filtering, or pub/sub patterns (use Celery + RabbitMQ)