Skip to content

fix(sdk): support async generator functions in control() decorator#116

Open
KazChe wants to merge 1 commit intoagentcontrol:mainfrom
KazChe:fix/async-generator-control-decorator
Open

fix(sdk): support async generator functions in control() decorator#116
KazChe wants to merge 1 commit intoagentcontrol:mainfrom
KazChe:fix/async-generator-control-decorator

Conversation

@KazChe
Copy link

@KazChe KazChe commented Mar 12, 2026

Summary

  • adds async generator (streaming) support to the control() decorator, fixing crashes and silent bypasses when applied to async def functions with yield — the standard pattern for LLM response streaming.

Scope

  • User-facing/API changes: @control() now correctly wraps async generator functions, preserving inspect.isasyncgenfunction() identity
  • Internal changes: New async_gen_wrapper path in control() that runs pre-check before first chunk, yields chunks in real-time while accumulating, and runs post-check on full accumulated output after stream completes
  • Out of scope: Sync generator support, per-chunk evaluation

Risk and Rollout

  • Risk level: low
  • Rollback plan: Revert commit — the change is additive (new code path) with no modifications to existing sync/async wrapper logic

Testing

  • Added or updated automated tests
  • Ran make check (lint + typecheck pass; server tests have pre-existing postgres failures unrelated to this change)
  • Manually verified behavior

Checklist

The control() decorator silently broke on streaming (async generator)
functions — the standard pattern for LLM response streaming.This adds
an async_gen_wrapper path that runs pre-check before the first chunk,
yields chunks in real-time while accumulating output, and runs
post-check on the full accumulated output after the stream completes.
fixes agentcontrol#113
@abhinav-galileo abhinav-galileo self-requested a review March 12, 2026 14:43
@KazChe
Copy link
Author

KazChe commented Mar 13, 2026

Hihi, sdk-ts-ci check failin due to a missing SPEAKEASY_API_KEY secret that'snot available to fork PRs and unrelated to the changes in this PR (python sdk only)

# Yield chunks while accumulating full output for post-check
accumulated: list[str] = []
async for chunk in func(*args, **kwargs):
accumulated.append(str(chunk))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

str(chunk) is lossy - if chunks are dicts, dataclasses, or framework message objects, the original data can't be recovered for the post-check. Can we handle non-string chunks here?

@lan17
Copy link
Contributor

lan17 commented Mar 16, 2026

Hihi, sdk-ts-ci check failin due to a missing SPEAKEASY_API_KEY secret that'snot available to fork PRs and unrelated to the changes in this PR (python sdk only)

thanks for pointing this out, we will work on fixing this.

@namrataghadi-galileo
Copy link
Contributor

Please also add tests for:
Early termination: break after first chunk, agen.aclose(), task cancellation.
Structured chunks: dict/bytes/object chunks, not only strings.
Generator failure mid-stream: exception after some chunks to confirm cleanup/state/logging behavior.

# PRE-EXECUTION: Check controls with check_stage="pre"
await _run_control_check(ctx, "pre", ctx.pre_payload(), controls)

# Yield chunks while accumulating full output for post-check
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This path only runs the post-stage check after the async for finishes. If the caller breaks early, cancels the task, or the client disconnects mid-stream, Python closes the wrapper and runs finally, but never reaches the post-check. That leaves a real bypass for streaming consumers, which is exactly where partial reads are common. This path needs explicit handling/documentation and tests for break, cancellation, and aclose().


# Yield chunks while accumulating full output for post-check
accumulated: list[str] = []
async for chunk in func(*args, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike the existing non-streaming wrapper, this implementation yields every chunk to the caller before enforcing the post-stage result. A post-stage deny or steer therefore raises only after the full response has already been delivered, so it no longer provides the same fail-closed behavior as control() on normal async functions. If that tradeoff is intentional, it needs to be called out very clearly; otherwise the stream must be buffered or evaluated chunk-by-chunk.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

control() decorator can break on streaming (async generator) functions for agents that have stream=true for latency.

4 participants