Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/api/context.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,34 @@ while job.status == "running":
time.sleep(1)
```

### `current_job.publish()`

```python
current_job.publish(data: Any) -> None
```

Publish a partial result visible to [`job.stream()`](result.md#jobstream) consumers. Use this to stream intermediate data from long-running tasks.

`data` must be JSON-serializable. It is stored as a task log entry with `level="result"`, distinguishing it from regular logs.

```python
@queue.task()
def process_batch(items):
for i, item in enumerate(items):
result = process(item)
current_job.publish({"item_id": item.id, "status": "ok"})
current_job.update_progress(int((i + 1) / len(items) * 100))
return {"total": len(items)}
```

Consumer side:

```python
job = process_batch.delay(items)
for partial in job.stream(timeout=120):
print(f"Processed: {partial}")
```

## How It Works

**Sync tasks (thread pool):**
Expand Down
38 changes: 38 additions & 0 deletions docs/api/result.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,41 @@ Async version of `result()`. Uses `asyncio.sleep()` instead of `time.sleep()`, s
job = add.delay(2, 3)
result = await job.aresult(timeout=10)
```

### `job.stream()`

```python
job.stream(
timeout: float = 60.0,
poll_interval: float = 0.5,
) -> Iterator[Any]
```

Iterate over partial results published by the task via [`current_job.publish()`](context.md#current_jobpublish). Yields each result as it arrives, stops when the job reaches a terminal state.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `timeout` | `float` | `60.0` | Maximum seconds to wait |
| `poll_interval` | `float` | `0.5` | Seconds between polls |

```python
job = batch_process.delay(items)
for partial in job.stream(timeout=120):
print(f"Got: {partial}")
```

### `job.astream()`

```python
job.astream(
    timeout: float = 60.0,
    poll_interval: float = 0.5,
) -> AsyncIterator[Any]
```

Async version of `stream()`. Uses `asyncio.sleep` so it won't block the event loop.

```python
async for partial in job.astream(timeout=120):
print(f"Got: {partial}")
```
278 changes: 2 additions & 276 deletions docs/changelog.md

Large diffs are not rendered by default.

282 changes: 282 additions & 0 deletions docs/changelog/archive.md

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions py_src/taskito/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,25 @@ def log(
extra_str = None
_queue_ref._inner.write_task_log(ctx.job_id, ctx.task_name, level, message, extra_str)

def publish(self, data: Any) -> None:
    """Emit a partial result for ``job.stream()`` consumers to pick up.

    Intended for long-running tasks (batch processing, ETL pipelines,
    ML training steps) that want to expose intermediate output before
    the job completes.

    Args:
        data: A JSON-serializable value. It is recorded as a task log
            entry with ``level="result"`` so stream consumers can tell
            it apart from ordinary log lines.
    """
    ctx = self._require_context()
    if _queue_ref is None:
        raise RuntimeError("Queue reference not set.")
    try:
        serialized = json.dumps(data)
    except (TypeError, ValueError):
        # Best effort: fall back to ``str()`` for non-JSON-serializable values.
        serialized = str(data)
    _queue_ref._inner.write_task_log(
        ctx.job_id, ctx.task_name, "result", "", serialized
    )

def check_cancelled(self) -> None:
"""Check if cancellation has been requested for this job.

Expand Down
40 changes: 36 additions & 4 deletions py_src/taskito/contrib/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,14 @@ async def get_job_result(
if self._should_register("job-progress"):

@self.get("/jobs/{job_id}/progress")
async def stream_progress(job_id: str) -> StreamingResponse:
"""SSE stream of progress updates until job reaches terminal state."""
async def stream_progress(
job_id: str, include_results: bool = False
) -> StreamingResponse:
"""SSE stream of progress updates until job reaches terminal state.

Pass ``?include_results=true`` to also stream partial results
published via ``current_job.publish()``.
"""
job = queue.get_job(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Job not found")
Expand All @@ -324,15 +330,41 @@ async def stream_progress(job_id: str) -> StreamingResponse:

async def event_stream() -> AsyncGenerator[str, None]:
terminal = {"complete", "failed", "dead", "cancelled"}
last_seen_at: int = 0
while True:
refreshed = queue.get_job(job_id)
if refreshed is None:
yield f"data: {json.dumps({'status': 'not_found'})}\n\n"
return

d = refreshed.to_dict()
payload = json.dumps({"status": d["status"], "progress": d["progress"]})
yield f"data: {payload}\n\n"
event: dict[str, Any] = {
"status": d["status"],
"progress": d["progress"],
}
yield f"data: {json.dumps(event)}\n\n"

if include_results:
logs = queue._inner.get_task_logs(job_id)
for entry in logs:
if entry["level"] != "result":
continue
logged_at = entry.get("logged_at", 0)
if logged_at <= last_seen_at:
continue
last_seen_at = logged_at
extra = entry.get("extra")
if extra:
try:
partial = json.loads(extra)
except (json.JSONDecodeError, TypeError):
partial = extra
result_event = {
"status": d["status"],
"progress": d["progress"],
"partial_result": partial,
}
yield f"data: {json.dumps(result_event)}\n\n"

if d["status"] in terminal:
return
Expand Down
89 changes: 89 additions & 0 deletions py_src/taskito/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

from __future__ import annotations

import asyncio
import json
import time
from collections.abc import AsyncIterator, Iterator
from typing import TYPE_CHECKING, Any

from taskito.async_support.result import AsyncJobResultMixin
Expand Down Expand Up @@ -156,6 +159,92 @@ def result(
time.sleep(min(current_interval, max(0, deadline - time.monotonic())))
current_interval = min(current_interval * 1.5, max_poll_interval)

# Job statuses after which no further partial results can arrive; used by
# ``stream()``/``astream()`` to decide when to stop polling.
_TERMINAL_STATUSES = frozenset({"complete", "failed", "dead", "cancelled"})

def stream(
    self,
    timeout: float = 60.0,
    poll_interval: float = 0.5,
) -> Iterator[Any]:
    """Iterate over partial results published by the task via ``current_job.publish()``.

    Yields each partial result as it becomes available. Stops when the
    job reaches a terminal state (complete, failed, dead, cancelled) or
    when ``timeout`` elapses.

    Args:
        timeout: Maximum seconds to wait for results.
        poll_interval: Seconds between polls.

    Yields:
        Deserialized partial result data (whatever was passed to ``publish()``).
    """
    deadline = time.monotonic() + timeout
    # Number of "result" log entries already consumed. Tracking a count
    # instead of the last-seen ``logged_at`` timestamp avoids silently
    # dropping entries that share a timestamp with an already-seen one
    # (several ``publish()`` calls within a single clock tick).
    # NOTE(review): assumes get_task_logs returns entries in stable
    # insertion order — confirm against the backend implementation.
    consumed = 0

    while time.monotonic() < deadline:
        logs = self._queue._inner.get_task_logs(self.id)
        results = [entry for entry in logs if entry["level"] == "result"]
        for entry in results[consumed:]:
            consumed += 1
            extra = entry.get("extra")
            if extra:
                try:
                    yield json.loads(extra)
                except (json.JSONDecodeError, TypeError):
                    # ``publish()`` falls back to ``str()`` for non-JSON
                    # data; surface the raw string in that case.
                    yield extra

        self.refresh()
        if self._py_job.status in self._TERMINAL_STATUSES:
            return

        time.sleep(min(poll_interval, max(0, deadline - time.monotonic())))

async def astream(
    self,
    timeout: float = 60.0,
    poll_interval: float = 0.5,
) -> AsyncIterator[Any]:
    """Async iterate over partial results published by the task.

    Async version of :meth:`stream`. Uses ``asyncio.sleep`` so it won't
    block the event loop. Stops when the job reaches a terminal state
    or when ``timeout`` elapses.

    Args:
        timeout: Maximum seconds to wait for results.
        poll_interval: Seconds between polls.

    Yields:
        Deserialized partial result data.
    """
    deadline = time.monotonic() + timeout
    # Number of "result" log entries already consumed. A count, rather
    # than the last-seen ``logged_at`` timestamp, avoids dropping entries
    # that share a timestamp with an already-seen one (several
    # ``publish()`` calls within a single clock tick).
    # NOTE(review): assumes get_task_logs returns entries in stable
    # insertion order — confirm against the backend implementation.
    consumed = 0

    while time.monotonic() < deadline:
        logs = self._queue._inner.get_task_logs(self.id)
        results = [entry for entry in logs if entry["level"] == "result"]
        for entry in results[consumed:]:
            consumed += 1
            extra = entry.get("extra")
            if extra:
                try:
                    yield json.loads(extra)
                except (json.JSONDecodeError, TypeError):
                    # ``publish()`` falls back to ``str()`` for non-JSON
                    # data; surface the raw string in that case.
                    yield extra

        self.refresh()
        if self._py_job.status in self._TERMINAL_STATUSES:
            return

        await asyncio.sleep(min(poll_interval, max(0, deadline - time.monotonic())))

def to_dict(self) -> dict[str, Any]:
"""Convert to a plain dictionary for JSON serialization.

Expand Down
Loading
Loading