
API design: run() return type Task[None] is misleading and encourages incorrect double-wrapping #45

@shloimy-wiesel

Description

Correction Notice

This issue was originally filed with an incorrect root cause analysis. The initial claim — that await ctx.run(work) blocks the FastAPI handler — was wrong. This has been corrected below.


What Actually Happens with await ctx.run(work)

run() is an async def that internally calls asyncio.create_task(_safe()) and returns the task immediately:

async def run(self, coro) -> asyncio.Task[None]:
    async def _safe() -> None:
        try:
            await coro(self)
        except Exception as exc:
            if not self.is_finished:
                await self.error(str(exc))
        finally:
            if not self.is_finished:
                await self.finish()

    task = asyncio.create_task(_safe())
    self._task = task  # prevent GC
    return task  # returns the Task object, does NOT await it

When the caller does await ctx.run(work):

  • await runs the coroutine run() to completion — which just schedules _safe and returns the Task
  • The Task itself runs in the background; await here does NOT wait for _safe() to finish
  • The FastAPI handler falls through immediately to return StreamingResponse(...)

Both of these patterns are behaviorally identical:

await ctx.run(work)                    # awaits the coroutine, not the task
asyncio.create_task(ctx.run(work))     # also just schedules the coroutine

Confirmed via curl: backend SSE events arrive progressively at ~6ms per token. No buffering occurs with either pattern.
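The scheduling-without-waiting behavior is easy to reproduce in isolation. A minimal sketch, using hypothetical stand-ins for ctx.run() and the work coroutine (not the library's actual code):

```python
import asyncio

events: list[str] = []

# Hypothetical stand-in for ctx.run(): an async def that schedules a
# background task and returns it, mirroring the shape described above.
async def run(work) -> asyncio.Task:
    async def _safe() -> None:
        await work()
    return asyncio.create_task(_safe())

async def handler() -> None:
    async def work() -> None:
        await asyncio.sleep(0.05)
        events.append("work done")

    await run(work)                    # awaits the coroutine, not the task
    events.append("handler returned")  # reached before work completes
    await asyncio.sleep(0.1)           # keep the loop alive for the task

asyncio.run(handler())
print(events)  # ['handler returned', 'work done']
```

"handler returned" is recorded first: awaiting run() only drives the coroutine that schedules _safe, exactly as with the real ctx.run().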


Actual API Design Issue

Even though both patterns work, the return type -> asyncio.Task[None] is misleading:

  1. It looks like the caller should store and await the task — but doing so (task = await ctx.run(work); await task) would ACTUALLY block, since await task awaits the background task to completion.

  2. asyncio.create_task(ctx.run(work)) is unnecessarily double-wrapped: ctx.run() already calls create_task internally. Wrapping it in another create_task creates an outer task that immediately exits after scheduling the inner _safe task. It works but is wasteful and confusing.

  3. The intended usage is ambiguous — should callers await the returned Task? Store it? Ignore it? The API doesn't make this clear.

Concrete confusion this causes

A caller who correctly reads the return type might write:

task = await ctx.run(work)
await task  # "wait for work to complete before returning"
return StreamingResponse(ctx.stream(), ...)
# This DOES block — stream gets everything in one burst

Or they might search for workarounds and land on the double-wrapped pattern:

asyncio.create_task(ctx.run(work))  # works but wraps create_task in create_task
return StreamingResponse(ctx.stream(), ...)

Suggested Fix

Return None instead of asyncio.Task[None]. The task is already stored on self._task to prevent GC — the return value serves no purpose for the caller:

def run(self, coro: Callable[[StreamContext], Awaitable[None]]) -> None:
    """Start work as a background task (fire-and-forget)."""
    async def _safe() -> None:
        ...

    task = asyncio.create_task(_safe())
    self._task = task  # GC protection — caller doesn't need the reference

Callers then write the natural, unambiguous pattern:

ctx.run(work)  # clearly fire-and-forget, can't be misused
return StreamingResponse(ctx.stream(), headers=ctx.response_headers)

If waiting for completion is needed (e.g. tests, batch scenarios), a separate await ctx.run_and_wait(work) could be provided with explicit semantics.
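A sketch of what that split could look like. StreamContext here is a minimal hypothetical stand-in, not the library's actual class, and run_and_wait is the proposed (not existing) API:

```python
import asyncio
from typing import Awaitable, Callable

done: list[str] = []

# Minimal stand-in sketching the proposed split:
# run() is fire-and-forget, run_and_wait() blocks with explicit semantics.
class StreamContext:
    def __init__(self) -> None:
        self._task: asyncio.Task | None = None

    def run(self, coro: Callable[["StreamContext"], Awaitable[None]]) -> None:
        """Start work as a background task (fire-and-forget)."""
        self._task = asyncio.create_task(coro(self))

    async def run_and_wait(
        self, coro: Callable[["StreamContext"], Awaitable[None]]
    ) -> None:
        """Start work and wait for it to complete (tests, batch jobs)."""
        self.run(coro)
        assert self._task is not None
        await self._task

async def work(ctx: StreamContext) -> None:
    await asyncio.sleep(0.01)
    done.append("finished")

async def main() -> None:
    ctx = StreamContext()
    await ctx.run_and_wait(work)  # explicitly waits for work to complete

asyncio.run(main())
print(done)  # ['finished']
```

With this split, neither method returns a Task, so there is nothing for the caller to accidentally await or double-wrap.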


Additional Findings

Two other issues found while reading the source:

1. Race condition in finish() callback — The on_finish callback runs without lock protection. Concurrent write_text() calls append to self._record via put_nowait() without a lock, so the record can be mutated while on_finish is reading it.

2. StateStore lock can introduce event delivery jitter: StateStore.get() / set() acquire an asyncio.Lock. Heavy concurrent ctx.store access can block the stream generator's await queue.get(), causing uneven event delivery timing.
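For the first finding, one common mitigation, sketched here with hypothetical names (Recorder stands in for the relevant library internals), is to snapshot the record under a lock before invoking the callback:

```python
import asyncio

seen: list[str] = []

# Hypothetical sketch: writes and the on_finish read are serialized by a
# lock, and the callback receives a snapshot so concurrent write_text()
# appends cannot mutate the list while the callback iterates it.
class Recorder:
    def __init__(self, on_finish) -> None:
        self._record: list[str] = []
        self._lock = asyncio.Lock()
        self._on_finish = on_finish

    async def write_text(self, text: str) -> None:
        async with self._lock:
            self._record.append(text)

    async def finish(self) -> None:
        async with self._lock:
            snapshot = list(self._record)  # copy before releasing the lock
        self._on_finish(snapshot)          # callback sees a stable list

async def main() -> None:
    rec = Recorder(on_finish=seen.extend)
    await asyncio.gather(rec.write_text("a"), rec.write_text("b"))
    await rec.finish()

asyncio.run(main())
print(seen)  # ['a', 'b']
```

Copying before releasing the lock keeps the critical section short, which also limits the kind of queue-blocking jitter described in the second finding.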


Environment

  • ai-sdk-stream-python==0.2.0a7
  • FastAPI + uvicorn
  • Long-running async work (LLM inference via OpenAI-compatible streaming endpoint)
