## Correction Notice

This issue was originally filed with an incorrect root cause analysis. The initial claim, that `await ctx.run(work)` blocks the FastAPI handler, was wrong. The corrected analysis follows.
## What Actually Happens with `await ctx.run(work)`

`run()` is an `async def` that internally calls `asyncio.create_task(_safe())` and returns the task immediately:

```python
async def run(self, coro) -> asyncio.Task[None]:
    async def _safe() -> None:
        try:
            await coro(self)
        except Exception as exc:
            if not self.is_finished:
                await self.error(str(exc))
        finally:
            if not self.is_finished:
                await self.finish()

    task = asyncio.create_task(_safe())
    self._task = task  # prevent GC
    return task  # returns the Task object, does NOT await it
```
When the caller does `await ctx.run(work)`:

- `await` runs the coroutine `run()` to completion, which merely schedules `_safe` and returns the Task
- the Task itself runs in the background; `await` here does NOT wait for `_safe()` to finish
- the FastAPI handler falls through immediately to `return StreamingResponse(...)`
Both of these patterns are behaviorally identical:

```python
await ctx.run(work)                 # awaits the coroutine, not the task
asyncio.create_task(ctx.run(work))  # also just schedules the coroutine
```
Confirmed via curl: backend SSE events arrive progressively at ~6ms per token. No buffering occurs with either pattern.
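The non-blocking behavior can be verified outside the library with a minimal stand-in (the `Ctx` class below is hypothetical and mirrors only the scheduling shape of `run()` described above):

```python
import asyncio
import time

class Ctx:
    """Minimal stand-in (hypothetical) reproducing run()'s scheduling shape."""
    async def run(self, coro):
        async def _safe():
            await coro(self)
        task = asyncio.create_task(_safe())
        self._task = task  # prevent GC, as in the real implementation
        return task

async def main():
    async def work(ctx):
        await asyncio.sleep(0.2)  # stands in for long-running streaming work

    ctx = Ctx()
    start = time.perf_counter()
    task = await ctx.run(work)  # awaits the coroutine run(), not the Task
    elapsed = time.perf_counter() - start

    assert elapsed < 0.1   # returned immediately: the caller is not blocked
    assert not task.done() # the work is still running in the background
    await task             # only an explicit await on the Task blocks
    return elapsed

elapsed = asyncio.run(main())
print(f"await ctx.run(work) returned after {elapsed * 1000:.2f} ms")
```

Because `run()` contains no `await` of its own, awaiting it runs it inline and returns before the scheduled task ever gets a chance to start.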
## Actual API Design Issue

Even though both patterns work, the return type `-> asyncio.Task[None]` is misleading:

- It looks like the caller should store and await the task, but doing so (`task = await ctx.run(work); await task`) would ACTUALLY block, since `await task` waits for the background task to run to completion.
- `asyncio.create_task(ctx.run(work))` is unnecessarily double-wrapped: `ctx.run()` already calls `create_task` internally. Wrapping it in another `create_task` creates an outer task that exits immediately after scheduling the inner `_safe` task. It works, but it is wasteful and confusing.
- The intended usage is ambiguous: should callers `await` the returned Task? Store it? Ignore it? The API doesn't make this clear.
### Concrete confusion this causes

A caller who correctly reads the return type might write:

```python
task = await ctx.run(work)
await task  # "wait for work to complete before returning"
return StreamingResponse(ctx.stream(), ...)
# This DOES block: the stream gets everything in one burst
```
Or they might search for workarounds and land on the double-wrapped pattern:

```python
asyncio.create_task(ctx.run(work))  # works, but wraps create_task in create_task
return StreamingResponse(ctx.stream(), ...)
```
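The burst behavior of the blocking pattern is easy to reproduce with a plain queue (a minimal simulation, not the library's actual stream): awaiting the producer task before consuming means every event is already queued, so the consumer sees no inter-event pacing at all.

```python
import asyncio
import time

async def main():
    q: asyncio.Queue = asyncio.Queue()

    async def work():
        for i in range(3):
            await asyncio.sleep(0.05)  # simulated per-token latency
            q.put_nowait(i)
        q.put_nowait(None)  # sentinel: stream finished

    # The problematic pattern: await the work BEFORE consuming the stream.
    task = asyncio.create_task(work())
    await task  # blocks ~150 ms; every event is queued by the time it returns

    gaps = []
    last = time.perf_counter()
    while (item := await q.get()) is not None:
        now = time.perf_counter()
        gaps.append(now - last)
        last = now
    return gaps

gaps = asyncio.run(main())
assert max(gaps) < 0.01  # all events arrive in one burst, no per-token spacing
```

With the fire-and-forget pattern instead, the consumer would observe roughly the producer's 50 ms spacing between events.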
## Suggested Fix

Return `None` instead of `asyncio.Task[None]`. The task is already stored on `self._task` to prevent GC; the return value serves no purpose for the caller:

```python
def run(self, coro: Callable[[StreamContext], Awaitable[None]]) -> None:
    """Start work as a background task (fire-and-forget)."""
    async def _safe() -> None:
        ...

    task = asyncio.create_task(_safe())
    self._task = task  # GC protection; caller doesn't need the reference
```

Callers then write the natural, unambiguous pattern:

```python
ctx.run(work)  # clearly fire-and-forget, can't be misused
return StreamingResponse(ctx.stream(), headers=ctx.response_headers)
```
If waiting for completion is needed (e.g. in tests or batch scenarios), a separate `await ctx.run_and_wait(work)` could be provided with explicit semantics.
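A sketch of what the split API could look like (names and internals are assumed from this issue, not taken from the library's actual code):

```python
import asyncio
from typing import Awaitable, Callable

class StreamContext:
    """Sketch only; mirrors the names used in this issue, not the real class."""
    def __init__(self) -> None:
        self._task: "asyncio.Task | None" = None

    def run(self, coro: Callable[["StreamContext"], Awaitable[None]]) -> None:
        """Fire-and-forget: schedule the work, keep a reference to prevent GC."""
        async def _safe() -> None:
            await coro(self)
        self._task = asyncio.create_task(_safe())

    async def run_and_wait(
        self, coro: Callable[["StreamContext"], Awaitable[None]]
    ) -> None:
        """Explicit-wait variant for tests and batch scenarios."""
        self.run(coro)
        assert self._task is not None
        await self._task

async def main():
    events = []

    async def work(ctx: StreamContext) -> None:
        events.append("ran")

    ctx = StreamContext()
    ctx.run(work)        # returns None: nothing to misuse
    assert events == []  # hasn't run yet; we never yielded to the loop

    ctx2 = StreamContext()
    await ctx2.run_and_wait(work)  # explicitly waits for completion
    # both scheduled tasks have now had a chance to run
    assert events.count("ran") == 2
    return events

events = asyncio.run(main())
```

Making `run()` synchronous removes the misleading `await` entirely, while `run_and_wait()` gives the blocking semantics an honest name.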
## Additional Findings

Two other issues found while reading the source:

1. Race condition in the `finish()` callback: the `on_finish` callback runs without lock protection, while concurrent `write_text()` calls append to `self._record` via `put_nowait()` without a lock, so the record can be mutated while `on_finish` is reading it.
2. `StateStore` lock can introduce event delivery jitter: `StateStore.get()` / `set()` acquire an `asyncio.Lock`. Heavy concurrent `ctx.store` access can block the stream generator's `await queue.get()`, causing uneven event delivery timing.
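One possible mitigation for the first finding (a sketch only; `_record`, `write_text`, and `on_finish` are the names used in this issue, and the internals are assumed) is to take a snapshot of the record under a lock before invoking the callback:

```python
import asyncio

class Ctx:
    """Sketch: only the record/lock/snapshot idea is shown, not the real class."""
    def __init__(self, on_finish) -> None:
        self._record: "list[str]" = []
        self._lock = asyncio.Lock()
        self._on_finish = on_finish

    async def write_text(self, text: str) -> None:
        async with self._lock:  # serialize writers against finish()
            self._record.append(text)

    async def finish(self) -> None:
        async with self._lock:
            snapshot = list(self._record)  # stable copy for the callback
        self._on_finish(snapshot)          # callback runs outside the lock

async def main():
    seen = []
    ctx = Ctx(on_finish=seen.extend)
    await asyncio.gather(ctx.write_text("a"), ctx.write_text("b"))
    await ctx.finish()
    return seen

seen = asyncio.run(main())
```

The callback then reads a copy that no concurrent writer can mutate, and runs outside the lock so it cannot deadlock against writers.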
## Environment

- `ai-sdk-stream-python==0.2.0a7`
- FastAPI + uvicorn
- Long-running async work (LLM inference via an OpenAI-compatible streaming endpoint)