fix(ai): wrap streaming responses in AsyncStreamWrapper to support async with#622
fix(ai): wrap streaming responses in AsyncStreamWrapper to support async with#622devteamaegis wants to merge 4 commits into
Conversation
Prompt To Fix All With AIFix the following 2 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 2
posthog/ai/openai/openai_async.py:3
`AsyncIterator` is imported but never referenced anywhere in the file — it was added in this PR but left unused.
```suggestion
from typing import Any, Dict, List, Optional
```
### Issue 2 of 2
posthog/test/test_async_stream_wrapper.py:88-106
**Prefer parameterised tests for similar cases**
`test_finally_block_runs_on_early_exit` and `test_finally_block_runs_on_full_exhaustion` share the same structure (build a generator with a `finally` side-effect, wrap it, drive it with `async with`, assert the flag). The team's preference is to express these as a single `@pytest.mark.parametrize` test so the assertion logic is written OnceAndOnlyOnce and new cases are cheap to add.
Reviews (1): Last reviewed commit: "test(ai): add regression tests for Async..." | Re-trigger Greptile |
| import time | ||
| import uuid | ||
| from typing import Any, Dict, List, Optional | ||
| from typing import Any, AsyncIterator, Dict, List, Optional |
There was a problem hiding this comment.
AsyncIterator is imported but never referenced anywhere in the file — it was added in this PR but left unused.
| from typing import Any, AsyncIterator, Dict, List, Optional | |
| from typing import Any, Dict, List, Optional |
Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/ai/openai/openai_async.py
Line: 3
Comment:
`AsyncIterator` is imported but never referenced anywhere in the file — it was added in this PR but left unused.
```suggestion
from typing import Any, Dict, List, Optional
```
How can I resolve this? If you propose a fix, please make it concise.| @pytest.mark.asyncio | ||
| async def test_finally_block_runs_on_full_exhaustion(): | ||
| """The underlying generator's finally block must also run on normal | ||
| exhaustion (``aclose()`` on an exhausted generator is a no-op).""" | ||
| finally_ran = [] | ||
|
|
||
| async def gen_with_finally(): | ||
| try: | ||
| yield 1 | ||
| yield 2 | ||
| finally: | ||
| finally_ran.append(True) | ||
|
|
||
| wrapper = AsyncStreamWrapper(gen_with_finally()) | ||
| async with wrapper as stream: | ||
| async for _ in stream: | ||
| pass | ||
|
|
||
| assert finally_ran == [True] |
There was a problem hiding this comment.
Prefer parameterised tests for similar cases
test_finally_block_runs_on_early_exit and test_finally_block_runs_on_full_exhaustion share the same structure (build a generator with a finally side-effect, wrap it, drive it with async with, assert the flag). The team's preference is to express these as a single @pytest.mark.parametrize test so the assertion logic is written OnceAndOnlyOnce and new cases are cheap to add.
Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/test/test_async_stream_wrapper.py
Line: 88-106
Comment:
**Prefer parameterised tests for similar cases**
`test_finally_block_runs_on_early_exit` and `test_finally_block_runs_on_full_exhaustion` share the same structure (build a generator with a `finally` side-effect, wrap it, drive it with `async with`, assert the flag). The team's preference is to express these as a single `@pytest.mark.parametrize` test so the assertion logic is written OnceAndOnlyOnce and new cases are cheap to add.
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
|
cc @PostHog/team-ai-observability |
|
Thanks for the PR — the underlying issue is real and the overall direction makes sense. Issue #393 happens because our async streaming wrappers return a bare async generator, while callers like pydantic-ai expect the provider stream to also support Before we merge, could you please address a few things?
I verified the new wrapper tests and existing OpenAI/Anthropic tests pass, but lint currently fails on the unused import. |
Problem
When using the PostHog AI wrappers (
posthog.ai.openai.AsyncOpenAI,posthog.ai.anthropic.AsyncAnthropic) with libraries that expect streamingresponses to be async context managers (e.g. pydantic-ai, any code that
uses
async with response:), aTypeErroris raised immediately:Root cause: both
WrappedCompletions._create_streamingandWrappedResponses._create_streaming(OpenAI), andAsyncMessages._create_streaming(Anthropic), return a bareasync_generator()object. Async generators implement__aiter__/__anext__but not__aenter__/__aexit__.The OpenAI and Anthropic SDKs return
AsyncStreamobjects that implementboth protocols. The PostHog wrappers lost that compatibility.
Fixes #393.
Fix
Add
posthog/ai/stream.pywith a lightweightAsyncStreamWrapperclassthat wraps an async generator and adds the missing context-manager protocol:
Key behaviors:
__aenter__returnsselfsoasync with response as r: async for c in r:works as expected.__aexit__callsaclose()on the underlying generator, ensuring thefinallyblock (PostHog event capture) runs even when the caller breaks out early.__getattr__proxies attribute access to the underlying generator for provider-specific metadata.async for chunk in response:directly continues to work unchanged.Changes
posthog/ai/stream.pyAsyncStreamWrapperposthog/ai/openai/openai_async.py_create_streamingmethods returnAsyncStreamWrapper(async_generator())posthog/ai/anthropic/anthropic_async.py_create_streamingreturnsAsyncStreamWrapper(generator())posthog/test/test_async_stream_wrapper.pyTests