From 469f18c5e044dddaffe7acc86de6c499f6f8fb18 Mon Sep 17 00:00:00 2001
From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com>
Date: Sun, 22 Mar 2026 08:59:19 +0530
Subject: [PATCH 1/3] docs: add dedicated prefork pool guide
---
docs/guide/prefork.md | 102 ++++++++++++++++++++++++++++++++++++++++++
zensical.toml | 2 +
2 files changed, 104 insertions(+)
create mode 100644 docs/guide/prefork.md
diff --git a/docs/guide/prefork.md b/docs/guide/prefork.md
new file mode 100644
index 0000000..a4c1ebc
--- /dev/null
+++ b/docs/guide/prefork.md
@@ -0,0 +1,102 @@
+# Prefork Worker Pool
+
+Spawn separate child processes for true CPU parallelism. Each child has its own Python GIL, so CPU-bound tasks don't block each other.
+
+## When to Use
+
+| Workload | Recommended pool | Why |
+|----------|-----------------|-----|
+| I/O-bound (HTTP calls, DB queries) | `thread` (default) | Threads release the GIL during I/O |
+| CPU-bound (data processing, ML) | `prefork` | Each process owns its GIL |
+| Mixed workloads | `prefork` | CPU tasks benefit; I/O tasks work fine too |
+
+## Getting Started
+
+```python
+queue = Queue(db_path="myapp.db", workers=4)
+queue.run_worker(pool="prefork", app="myapp:queue")
+```
+
+```bash
+taskito worker --app myapp:queue --pool prefork
+```
+
+The `app` parameter tells each child process how to import your Queue instance. It must be an importable path in `module:attribute` format.
+
+## How It Works
+
+```mermaid
+graph LR
+ S["Scheduler
(Rust)"] -->|"Job JSON"| P["PreforkPool"]
+ P -->|stdin| C1["Child 1
(own GIL)"]
+ P -->|stdin| C2["Child 2
(own GIL)"]
+ P -->|stdin| CN["Child N
(own GIL)"]
+
+ C1 -->|stdout| R1["Reader 1"]
+ C2 -->|stdout| R2["Reader 2"]
+ CN -->|stdout| RN["Reader N"]
+
+ R1 -->|JobResult| RCH["Result Channel"]
+ R2 -->|JobResult| RCH
+ RN -->|JobResult| RCH
+
+ RCH --> ML["Result Handler"]
+ ML -->|"complete / retry / DLQ"| DB[("Storage")]
+```
+
+1. The Rust scheduler dequeues jobs from storage
+2. `PreforkPool` serializes each job as JSON and writes it to the least-loaded child's stdin pipe
+3. Each child deserializes the job, executes the task wrapper (with middleware, resources, proxies), and writes the result as JSON to stdout
+4. Reader threads parse results and feed them back to the scheduler
+5. The scheduler updates job status in storage
+
+## Configuration
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `pool` | `str` | `"thread"` | Set to `"prefork"` to enable |
+| `app` | `str` | — | Import path to Queue instance (required) |
+| `workers` | `int` | CPU count | Number of child processes |
+
+## Migrating from Thread Pool
+
+The thread pool is the default. To switch to prefork:
+
+=== "Before (thread pool)"
+
+ ```python
+ queue.run_worker()
+ ```
+
+=== "After (prefork)"
+
+ ```python
+ queue.run_worker(pool="prefork", app="myapp:queue")
+ ```
+
+Everything else stays the same — task decorators, middleware, resources, events, and the scheduler all work identically. The only difference is where task code executes (child process vs. worker thread).
+
+## Debugging Child Processes
+
+Children inherit the parent's stderr, so `print()` statements and Python logging appear in the parent's terminal.
+
+Enable debug logging to see child lifecycle events:
+
+```python
+import logging
+logging.getLogger("taskito.prefork.child").setLevel(logging.DEBUG)
+```
+
+Log output includes:
+- `child ready (app=..., pid=...)` — child initialized and waiting for jobs
+- `executing task_name[job_id]` — job received (DEBUG level)
+- `task task_name[job_id] failed: ...` — task error
+- `shutdown received` — clean shutdown
+- `resource teardown error` — resource cleanup failure
+
+## Limitations
+
+- **Tasks must be importable**: Each child process imports the app module independently. Tasks defined inside functions or closures cannot be imported.
+- **No shared state**: Children are separate processes. In-memory caches, globals, or module-level state are not shared between children.
+- **Startup cost**: Each child imports the full app module on start. This happens once per child, not per job.
+- **Resource re-initialization**: Worker resources (DB connections, etc.) are initialized independently in each child.
diff --git a/zensical.toml b/zensical.toml
index 2d72dfc..a0d8b3f 100644
--- a/zensical.toml
+++ b/zensical.toml
@@ -17,6 +17,7 @@ nav = [
{ "User Guide" = [
{ "Tasks" = "guide/tasks.md" },
{ "Workers" = "guide/workers.md" },
+ { "Prefork Pool" = "guide/prefork.md" },
{ "Queues & Priority" = "guide/queues.md" },
{ "Scheduling" = "guide/scheduling.md" },
{ "Retries & Dead Letters" = "guide/retries.md" },
@@ -33,6 +34,7 @@ nav = [
{ "Events & Webhooks" = "guide/events-webhooks.md" },
{ "Web Dashboard" = "guide/dashboard.md" },
{ "Native Async Tasks" = "guide/async-tasks.md" },
+ { "Result Streaming" = "guide/streaming.md" },
{ "Testing" = "guide/testing.md" },
{ "Deployment" = "guide/deployment.md" },
{ "KEDA Autoscaling" = "guide/keda.md" },
From 587efae17c61c8ab1f74f43afa406f446cdbbb90 Mon Sep 17 00:00:00 2001
From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com>
Date: Sun, 22 Mar 2026 08:59:26 +0530
Subject: [PATCH 2/3] docs: add result streaming guide with patterns
---
docs/guide/streaming.md | 126 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 126 insertions(+)
create mode 100644 docs/guide/streaming.md
diff --git a/docs/guide/streaming.md b/docs/guide/streaming.md
new file mode 100644
index 0000000..f1ed540
--- /dev/null
+++ b/docs/guide/streaming.md
@@ -0,0 +1,126 @@
+# Result Streaming
+
+Stream intermediate results from long-running tasks. Instead of waiting for the final result, consumers receive partial data as it becomes available.
+
+## Publishing Partial Results
+
+Inside a task, call `current_job.publish(data)` to emit a partial result:
+
+```python
+from taskito import current_job
+
+@queue.task()
+def process_batch(items):
+ results = []
+ for i, item in enumerate(items):
+ result = process(item)
+ results.append(result)
+ current_job.publish({"item_id": item.id, "status": "ok", "result": result})
+ current_job.update_progress(int((i + 1) / len(items) * 100))
+ return {"total": len(items), "results": results}
+```
+
+`publish()` accepts any JSON-serializable value — dicts, lists, strings, numbers.
+
+## Consuming with `stream()`
+
+The caller iterates over partial results as they arrive:
+
+```python
+job = process_batch.delay(items)
+
+for partial in job.stream(timeout=120, poll_interval=0.5):
+ print(f"Processed item {partial['item_id']}: {partial['status']}")
+
+# After stream ends, get the final result
+final = job.result(timeout=5)
+```
+
+`stream()` polls the database for new partial results, yields each one, and stops when the job reaches a terminal state (complete, failed, dead, cancelled).
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `timeout` | `float` | `60.0` | Maximum seconds to wait |
+| `poll_interval` | `float` | `0.5` | Seconds between polls |
+
+## Async Streaming
+
+Use `astream()` in async contexts:
+
+```python
+async for partial in job.astream(timeout=120, poll_interval=0.5):
+ print(f"Got: {partial}")
+```
+
+## FastAPI SSE
+
+The built-in FastAPI progress endpoint supports streaming partial results:
+
+```
+GET /jobs/{job_id}/progress?include_results=true
+```
+
+Events include partial results alongside progress:
+
+```
+data: {"status": "running", "progress": 25}
+data: {"status": "running", "progress": 25, "partial_result": {"item_id": 1, "status": "ok"}}
+data: {"status": "running", "progress": 50}
+data: {"status": "complete", "progress": 100}
+```
+
+## Patterns
+
+### ETL Pipeline
+
+```python
+@queue.task()
+def etl_pipeline(source_tables):
+ for table in source_tables:
+ rows = extract(table)
+ transformed = transform(rows)
+ load(transformed)
+ current_job.publish({
+ "table": table,
+ "rows_processed": len(rows),
+ "status": "loaded",
+ })
+ return {"tables": len(source_tables)}
+```
+
+### ML Training
+
+```python
+@queue.task()
+def train_model(config):
+ model = build_model(config)
+ for epoch in range(config["epochs"]):
+ loss = train_epoch(model)
+ current_job.publish({
+ "epoch": epoch + 1,
+ "loss": float(loss),
+ "lr": optimizer.param_groups[0]["lr"],
+ })
+ save_model(model)
+ return {"final_loss": float(loss)}
+```
+
+### Batch Processing with Error Tracking
+
+```python
+@queue.task()
+def process_orders(order_ids):
+ for oid in order_ids:
+ try:
+ process_order(oid)
+ current_job.publish({"order_id": oid, "status": "ok"})
+ except Exception as e:
+ current_job.publish({"order_id": oid, "status": "error", "error": str(e)})
+ return {"total": len(order_ids)}
+```
+
+## How It Works
+
+`publish()` stores data as a task log entry with `level="result"`, reusing the existing `task_logs` table. No new tables or Rust changes are needed.
+
+`stream()` polls `get_task_logs(job_id)`, filters for `level == "result"`, tracks the last-seen timestamp, and yields only new entries. It stops when the job's status becomes terminal.
From 82ea90a0a644f53a4a09267516fec6c64faf9f04 Mon Sep 17 00:00:00 2001
From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com>
Date: Sun, 22 Mar 2026 08:59:33 +0530
Subject: [PATCH 3/3] docs: update comparison table with prefork, streaming,
namespace, discovery
---
docs/comparison.md | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/docs/comparison.md b/docs/comparison.md
index 248e0a5..b34d798 100644
--- a/docs/comparison.md
+++ b/docs/comparison.md
@@ -23,6 +23,11 @@ How taskito compares to other Python task queues.
| Soft timeouts | **Yes** | No | No | No | No |
| Custom serializers | **Yes** | Yes | No | No | No |
| Per-task middleware | **Yes** | No | No | Yes | No |
+| Multi-process (prefork) | **Yes** | Yes | No | No | No |
+| Namespace isolation | **Yes** | No | No | No | No |
+| Result streaming | **Yes** (publish/stream) | No | No | No | No |
+| Worker discovery | **Yes** (hostname/pid/status) | Yes (flower) | No | No | No |
+| Lifecycle events | **Yes** (13 types) | Yes (signals) | No | Yes (actors) | No |
| OpenTelemetry | **Yes** (optional) | Yes (contrib) | No | No | No |
| CLI | **Yes** | Yes | Yes | Yes | Yes |
| Result backend | **Built-in** (SQLite) | Redis / DB / custom | Redis | Redis / custom | Redis / SQLite |
@@ -42,7 +47,7 @@ taskito is ideal when:
Consider alternatives when:
-- **Multi-server workers** — you need workers on separate machines (use Celery or Dramatiq)
+- **Multi-server workers** — you need workers on separate machines (taskito supports this with Postgres/Redis backends, but Celery has more mature distributed tooling)
- **Very high throughput** — millions of jobs/sec across a cluster (use Celery + RabbitMQ)
- **Existing Redis infrastructure** — if Redis is already in your stack, RQ or Huey are simple choices
- **Complex routing** — you need topic exchanges, message filtering, or pub/sub patterns (use Celery + RabbitMQ)