Skip to content

fix(ai): wrap streaming responses in AsyncStreamWrapper to support async with#622

Open
devteamaegis wants to merge 4 commits into
PostHog:mainfrom
devteamaegis:fix/async-stream-context-manager
Open

fix(ai): wrap streaming responses in AsyncStreamWrapper to support async with#622
devteamaegis wants to merge 4 commits into
PostHog:mainfrom
devteamaegis:fix/async-stream-context-manager

Conversation

@devteamaegis
Copy link
Copy Markdown

Problem

When using the PostHog AI wrappers (posthog.ai.openai.AsyncOpenAI,
posthog.ai.anthropic.AsyncAnthropic) with libraries that expect streaming
responses to be async context managers (e.g. pydantic-ai, any code that
uses async with response:), a TypeError is raised immediately:

TypeError: 'async_generator' object does not support the
asynchronous context manager protocol

Root cause: both WrappedCompletions._create_streaming and
WrappedResponses._create_streaming (OpenAI), and
AsyncMessages._create_streaming (Anthropic), return a bare
async_generator() object. Async generators implement __aiter__ /
__anext__ but not __aenter__ / __aexit__.

The OpenAI and Anthropic SDKs return AsyncStream objects that implement
both protocols. The PostHog wrappers lost that compatibility.

Fixes #393.

Fix

Add posthog/ai/stream.py with a lightweight AsyncStreamWrapper class
that wraps an async generator and adds the missing context-manager protocol:

async with response as stream:   # now works
    async for chunk in stream:
        ...

Key behaviors:

  • __aenter__ returns self so async with response as r: async for c in r: works as expected.
  • __aexit__ calls aclose() on the underlying generator, ensuring the finally block (PostHog event capture) runs even when the caller breaks out early.
  • __getattr__ proxies attribute access to the underlying generator for provider-specific metadata.
  • Backward compatible: code using async for chunk in response: directly continues to work unchanged.

Changes

File Change
posthog/ai/stream.py New — AsyncStreamWrapper
posthog/ai/openai/openai_async.py Both _create_streaming methods return AsyncStreamWrapper(async_generator())
posthog/ai/anthropic/anthropic_async.py _create_streaming returns AsyncStreamWrapper(generator())
posthog/test/test_async_stream_wrapper.py 6 regression tests (no mocks, no external deps)

Tests

pytest posthog/test/test_async_stream_wrapper.py -v
# 6 passed

@devteamaegis devteamaegis requested a review from a team as a code owner May 22, 2026 18:56
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 22, 2026

Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
posthog/ai/openai/openai_async.py:3
`AsyncIterator` is imported but never referenced anywhere in the file — it was added in this PR but left unused.

```suggestion
from typing import Any, Dict, List, Optional
```

### Issue 2 of 2
posthog/test/test_async_stream_wrapper.py:88-106
**Prefer parameterised tests for similar cases**

`test_finally_block_runs_on_early_exit` and `test_finally_block_runs_on_full_exhaustion` share the same structure (build a generator with a `finally` side-effect, wrap it, drive it with `async with`, assert the flag). The team's preference is to express these as a single `@pytest.mark.parametrize` test so the assertion logic is written OnceAndOnlyOnce and new cases are cheap to add.

Reviews (1): Last reviewed commit: "test(ai): add regression tests for Async..." | Re-trigger Greptile

import time
import uuid
from typing import Any, Dict, List, Optional
from typing import Any, AsyncIterator, Dict, List, Optional
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 AsyncIterator is imported but never referenced anywhere in the file — it was added in this PR but left unused.

Suggested change
from typing import Any, AsyncIterator, Dict, List, Optional
from typing import Any, Dict, List, Optional
Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/ai/openai/openai_async.py
Line: 3

Comment:
`AsyncIterator` is imported but never referenced anywhere in the file — it was added in this PR but left unused.

```suggestion
from typing import Any, Dict, List, Optional
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +88 to +106
@pytest.mark.asyncio
async def test_finally_block_runs_on_full_exhaustion():
"""The underlying generator's finally block must also run on normal
exhaustion (``aclose()`` on an exhausted generator is a no-op)."""
finally_ran = []

async def gen_with_finally():
try:
yield 1
yield 2
finally:
finally_ran.append(True)

wrapper = AsyncStreamWrapper(gen_with_finally())
async with wrapper as stream:
async for _ in stream:
pass

assert finally_ran == [True]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Prefer parameterised tests for similar cases

test_finally_block_runs_on_early_exit and test_finally_block_runs_on_full_exhaustion share the same structure (build a generator with a finally side-effect, wrap it, drive it with async with, assert the flag). The team's preference is to express these as a single @pytest.mark.parametrize test so the assertion logic is written OnceAndOnlyOnce and new cases are cheap to add.

Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/test/test_async_stream_wrapper.py
Line: 88-106

Comment:
**Prefer parameterised tests for similar cases**

`test_finally_block_runs_on_early_exit` and `test_finally_block_runs_on_full_exhaustion` share the same structure (build a generator with a `finally` side-effect, wrap it, drive it with `async with`, assert the flag). The team's preference is to express these as a single `@pytest.mark.parametrize` test so the assertion logic is written OnceAndOnlyOnce and new cases are cheap to add.

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

@marandaneto marandaneto requested a review from a team May 23, 2026 09:28
@marandaneto
Copy link
Copy Markdown
Member

cc @PostHog/team-ai-observability

@marandaneto
Copy link
Copy Markdown
Member

Thanks for the PR — the underlying issue is real and the overall direction makes sense. Issue #393 happens because our async streaming wrappers return a bare async generator, while callers like pydantic-ai expect the provider stream to also support async with response:.

Before we merge, could you please address a few things?

  1. Close the original provider stream on context exit
    AsyncStreamWrapper.__aexit__() currently closes only the PostHog wrapper generator. It does not close the original OpenAI/Anthropic SDK stream / HTTP response. Native SDK streams call close() on context exit, so the wrapper should also close the original provider stream when available, e.g. via close() or aclose().

  2. Fix or remove the attribute proxy behavior
    __getattr__ currently proxies to the local async generator, not the original provider stream. That means the docstring/test claim about provider metadata like .response is misleading unless the wrapper also keeps a reference to the provider stream and proxies to that.

  3. Fix lint
    Ruff currently fails because AsyncIterator is imported but unused in posthog/ai/openai/openai_async.py.

  4. Add regression tests around the actual wrapped clients
    The direct AsyncStreamWrapper tests are useful, but we should also cover the real behavior:

    • AsyncOpenAI.chat.completions.create(..., stream=True) works with async with
    • AsyncOpenAI.responses.create(..., stream=True) works with async with
    • AsyncAnthropic.messages.create(..., stream=True) works with async with
    • early exit from the context closes the original provider stream
  5. Clarify / handle Anthropic .messages.stream() compatibility
    AsyncAnthropic.messages.stream() is still an async def, so native Anthropic-style usage like async with client.messages.stream(...) still returns a coroutine and fails unless awaited first. If this PR intends to restore Anthropic stream API compatibility, that needs addressing; otherwise the PR description should narrow the claim to messages.create(..., stream=True).

I verified the new wrapper tests and existing OpenAI/Anthropic tests pass, but lint currently fails on the unused import.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

PostHog with pydantic-ai streaming raises TypeError: 'async_generator'

2 participants