feat(agent): support live streaming output with Markdown rendering#56
Conversation
Code Review
This pull request introduces live streaming for agent responses, utilizing rich.live to display text and thinking blocks in real-time. The core logic is refactored into a new _stream_response function that handles various content block deltas. Review feedback identifies a formatting issue where rendering Markdown for every thinking delta causes incorrect vertical alignment. Additionally, a critical bug was found where returning MessageDeltaUsage instead of the final aggregated message results in missing input_tokens and potential runtime errors during token tracking. Improvements to type hints for usage objects were also suggested.
```python
def _flush_thinking(text: str) -> None:
    """Print thinking text immediately so it appears live."""
    console.print(Markdown(text), end="", style=LIGHT_HINT_STYLE_RICH)
```
Using Markdown(text) for every thinking delta is incorrect for streaming. Markdown is a block-level element in rich and will append a newline after every call, causing the thinking output to render vertically (one delta per line). Additionally, it is inefficient to create a new Markdown object for every token. Since thinking blocks are intended to be streamed inline with dim styling, you should print the raw text directly.
Suggested change:
```diff
- console.print(Markdown(text), end="", style=LIGHT_HINT_STYLE_RICH)
+ console.print(text, end="", style=LIGHT_HINT_STYLE_RICH)
```
```python
_stop_live()

return _StreamedResponse(content_blocks, usage, stop_reason)
```
The current implementation of _stream_response has a critical bug: it returns a MessageDeltaUsage object (captured from the message_delta event) which lacks input_tokens. This causes an AttributeError in agent_loop at line 239.
Instead of manually accumulating usage and content, you should use stream.get_final_message() after the loop. This ensures you get the fully aggregated message with complete usage statistics (including cache hits/misses) and correctly parsed tool inputs.
Suggested change:
```diff
  _stop_live()
- return _StreamedResponse(content_blocks, usage, stop_reason)
+ final_msg = stream.get_final_message()
+ return _StreamedResponse(final_msg.content, final_msg.usage, final_msg.stop_reason)
```
```python
usage: anthropic.types.Usage
| anthropic.types.message_delta_usage.MessageDeltaUsage
| None,
```
The MessageDeltaUsage type should be removed from the type hint. If the response is built correctly using the final state of the stream, the usage will always be a full anthropic.types.Usage object. Including MessageDeltaUsage is misleading as that type lacks critical fields like input_tokens which are required by the token tracker.
Suggested change:
```python
usage: anthropic.types.Usage | None,
```

```python
elif event.type == "message_delta":
    stop_reason = event.delta.stop_reason or stop_reason
    usage = event.usage
```

```python
_stop_live()
```
🔴 Input and cache token tracking always reports 0 because message_start event usage is never captured
The _stream_response function only captures usage from message_delta events (line 177), but the Anthropic streaming API reports input_tokens, cache_creation_input_tokens, and cache_read_input_tokens in the message_start event, not in message_delta. The MessageDeltaUsage object returned by message_delta has all of these fields defaulting to None (verified against anthropic SDK v0.94.0).

As a result, input_tokens is always None → 0 (via the `or 0` at line 241), and cache tokens are always None → 0 (via the `getattr(..., 0) or 0` at lines 237-238).

The old code used stream.get_final_message(), which internally combined usage from both message_start and message_delta into a complete Usage object. The new manual event loop needs to also handle message_start events to capture the input token counts.
MessageDeltaUsage defaults

`MessageDeltaUsage(output_tokens=100)` yields:

- input_tokens: None
- cache_creation_input_tokens: None
- cache_read_input_tokens: None

These are all Optional[int] fields that default to None, unlike Usage, where input_tokens is required.
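A quick way to see those defaults (a sketch reflecting the behavior the review reports against anthropic SDK v0.94.0; the import path follows the type hint quoted earlier):

```python
# Demonstrates the MessageDeltaUsage defaults described above.
from anthropic.types.message_delta_usage import MessageDeltaUsage

usage = MessageDeltaUsage(output_tokens=100)
print(usage.input_tokens)                 # None
print(usage.cache_creation_input_tokens)  # None
print(usage.cache_read_input_tokens)      # None
```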
Prompt for agents
The _stream_response function in src/mini_agent/agent/agent.py needs to also handle message_start events to capture input token usage. Currently only message_delta events are handled (lines 175-179), which provides a MessageDeltaUsage object that has input_tokens=None, cache_creation_input_tokens=None, and cache_read_input_tokens=None.
The fix should:
1. Add handling for event.type == 'message_start' in the event loop (around line 107).
2. From the message_start event, extract event.message.usage which is an anthropic.types.Usage object containing the actual input_tokens and cache token counts.
3. Store this initial usage separately (e.g. as initial_usage).
4. When building the final _StreamedResponse, combine the input token info from message_start with the output_tokens from message_delta. One approach: store both usages and merge them, or just track input_tokens/cache tokens from message_start and output_tokens from message_delta separately, then construct a single Usage object to return.
The old code used stream.get_final_message() which internally handled this merging. The new manual event processing must replicate that behavior.
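A minimal sketch of that merge, following the steps above (`stream_kwargs` and the model id are illustrative placeholders, and the content_block_* handling is elided):

```python
# Sketch only: capture Usage from message_start, take output_tokens from
# message_delta, then merge into one Usage object. Kwargs are placeholders.
import anthropic

client = anthropic.Anthropic()
stream_kwargs = {
    "model": "claude-sonnet-4-5",  # placeholder model id
    "max_tokens": 1024,
    "messages": [{"role": "user", "content": "hello"}],
}

initial_usage: anthropic.types.Usage | None = None
output_tokens = 0

with client.messages.stream(**stream_kwargs) as stream:
    for event in stream:
        if event.type == "message_start":
            # Full Usage: input_tokens and the cache token counts live here.
            initial_usage = event.message.usage
        elif event.type == "message_delta":
            # MessageDeltaUsage: only output_tokens is populated.
            output_tokens = event.usage.output_tokens
        # ... content_block_* handling elided ...

# Input-side counts come from message_start, output_tokens from message_delta.
usage = (
    initial_usage.model_copy(update={"output_tokens": output_tokens})
    if initial_usage is not None
    else None
)
```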
3 issues found across 1 file
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/mini_agent/agent/agent.py">
<violation number="1" location="src/mini_agent/agent/agent.py:61">
P2: `Markdown(text)` is a block-level renderable in Rich — each call produces a self-contained block with trailing newlines. Using it for every tiny thinking delta means each token is rendered as a separate Markdown paragraph, causing vertical stacking instead of inline streaming. Since thinking output is meant to stream inline with dim styling, print the raw text directly instead.</violation>
<violation number="2" location="src/mini_agent/agent/agent.py:104">
P1: The `Live` display is not cleaned up if an exception occurs during streaming. Wrap the streaming loop in `try/finally` to ensure `_stop_live()` is always called, preventing terminal corruption on errors or `KeyboardInterrupt`.</violation>
<violation number="3" location="src/mini_agent/agent/agent.py:177">
P1: `message_delta` events carry a `MessageDeltaUsage` object that only contains `output_tokens`. The `input_tokens`, `cache_creation_input_tokens`, and `cache_read_input_tokens` fields are reported in the `message_start` event's `Usage` object, which this event loop never handles. As a result, `usage.input_tokens` downstream will either raise `AttributeError` or silently report 0, breaking all input/cache token tracking.
Add a handler for `message_start` events to capture the initial `Usage` (e.g., `event.message.usage`), then merge the input-side counts from `message_start` with the `output_tokens` from `message_delta` when constructing `_StreamedResponse`. The previous `stream.get_final_message()` call did this merge internally.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
```python
    )
    live_display.start()

    with client.messages.stream(**stream_kwargs) as stream:
```
P1: The Live display is not cleaned up if an exception occurs during streaming. Wrap the streaming loop in try/finally to ensure _stop_live() is always called, preventing terminal corruption on errors or KeyboardInterrupt.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/mini_agent/agent/agent.py, line 104:
<comment>The `Live` display is not cleaned up if an exception occurs during streaming. Wrap the streaming loop in `try/finally` to ensure `_stop_live()` is always called, preventing terminal corruption on errors or `KeyboardInterrupt`.</comment>
<file context>
@@ -35,6 +38,149 @@
+    )
+    live_display.start()
+
+    with client.messages.stream(**stream_kwargs) as stream:
+        for event in stream:
+            if event.type == "content_block_start":
</file context>
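One way to apply that fix, sketched against the hunk above (`live_display`, `_stop_live`, and `stream_kwargs` come from the PR; everything else is assumed):

```python
# Sketch: ensure the Live display is torn down on every exit path.
live_display.start()
try:
    with client.messages.stream(**stream_kwargs) as stream:
        for event in stream:
            ...  # existing event handling, unchanged
finally:
    # Runs on success, exceptions, and KeyboardInterrupt alike, so the
    # terminal is never left with a dangling Live display.
    _stop_live()
```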
Force-pushed from df7e053 to 23a23cf
Stream AI responses token-by-token while keeping the SDK-built Message object. The _display_stream_events() function handles only visual output (Live Markdown for text, inline flush for thinking), and stream.get_final_message() still provides the proper SDK-built response for tool execution — no manual block reconstruction.

- Add _display_stream_events() to drive live display from stream events
- Use stream.get_final_message() for the correctly-built Message object
- Stop the Thinking spinner before streaming so it doesn't fight Live
- Guard against None input_tokens from MessageDeltaUsage

Co-authored-by: kowyo-bot <258374017+kowyo-bot@users.noreply.github.com>
Force-pushed from 23a23cf to 82535a5
Each thinking delta was wrapped in Markdown(), which caused Rich to emit unwanted trailing newlines per chunk. Print raw strings instead and use console.print() (not bare print()) for the final newline after the thinking block ends, so Rich's line tracking stays consistent.

Co-authored-by: kowyo-bot <258374017+kowyo-bot@users.noreply.github.com>
The newline emitted by console.print() at content_block_stop can be buffered and lost when Rich's Live display starts immediately after for the next text block. Explicitly flush stdout after each block-stop newline.

Co-authored-by: kowyo-bot <258374017+kowyo-bot@users.noreply.github.com>
…etween thinking and text

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Force-pushed from 1bf0f6f to fdfa7ba
Description
Adds live streaming output so users see AI responses token-by-token as they arrive, rather than waiting for the full response. Markdown rendering is preserved during streaming via `rich.live.Live`.

How it works

- `rich.live.Live` + `Markdown` — code fences, headings, bold, and lists all appear live as the model writes them

Implementation

- `_stream_response()` iterates Anthropic `RawStreamEvent` items and drives the live display (see the sketch after this description)
- A `_StreamedResponse` thin wrapper replaces the `Message` object previously obtained from `stream.get_final_message()`
- `_flush_thinking()` prints thinking deltas with `sys.stdout.flush()` for immediacy
- The `Thinking…` spinner is stopped before streaming begins so it doesn't fight with Live

Before/After

- Before: `stream.get_final_message()` blocks for the entire response
- After: `for event in stream:` emits per-token

Type of change

Test

- `ruff check` and `ruff format` pass
- `commitizen check` passes
- Tests cover `_StreamedResponse` and stream kwarg building for all effort levels
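For illustration, a minimal sketch of the event loop the Implementation section outlines (a hand-rolled stand-in rather than the PR's exact code; the names and kwargs are placeholders):

```python
# Illustrative sketch: drive a Rich Live display from Anthropic stream
# events, then return the SDK-built message. Names here are placeholders.
import anthropic
from rich.console import Console
from rich.live import Live
from rich.markdown import Markdown

console = Console()
client = anthropic.Anthropic()

def stream_response(**stream_kwargs) -> anthropic.types.Message:
    text = ""
    live = Live(console=console, refresh_per_second=8)
    started = False
    try:
        with client.messages.stream(**stream_kwargs) as stream:
            for event in stream:
                if event.type == "content_block_delta":
                    if event.delta.type == "thinking_delta":
                        # Thinking streams inline as raw dim text, no Markdown.
                        console.print(event.delta.thinking, end="", style="dim")
                    elif event.delta.type == "text_delta":
                        if not started:
                            live.start()
                            started = True
                        text += event.delta.text
                        live.update(Markdown(text))
            # The SDK-built Message keeps complete usage and parsed tool inputs.
            return stream.get_final_message()
    finally:
        if started:
            live.stop()
```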