add new features (#41)
Adds an opt-in collect: bool = False parameter to StreamContext that accumulates all emitted content (text, reasoning, tool calls, sources, step count, finish reason) into a StreamRecord accessible via ctx.record. Useful for persisting conversation turns to a database after streaming.
feat: add collect=True to StreamContext for stream data recording
* feat: add typed custom_information support to StreamContext

  Closes #6

  - StreamContext is now Generic[_InfoT] (bound to pydantic.BaseModel)
  - New `custom_information` constructor parameter stores a read-only Pydantic model accessible via ctx.info throughout the stream lifecycle
  - Useful for carrying request-scoped metadata (user_id, rate_limit, etc.) through service layers without extra function arguments
  - 5 new tests; pyright passes with 0 errors

* fix: correct docstring type and test organization for custom_information

  - Fix `info : _InfoT` → `info : _InfoT | None` in the class Attributes docstring
  - Group custom_information tests into a TestCustomInformation class
Add ErrorEvent Pydantic model (type: "error", errorText: str), ctx.error(error_text) which emits the event then terminates the stream, and exports ErrorEvent from the top-level package. Resolves: #8
Add AbortEvent model (type: "abort", reason: str | None) and update ctx.abort() to accept an optional reason and emit the event before the [DONE] sentinel. Backward-compatible: bare ctx.abort() still works and omits the reason field from the wire output. Resolves: #13
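The backward-compatible wire behavior can be sketched with a small encoder. This is a minimal illustration, not the library's actual `AbortEvent` implementation (which is a Pydantic model); the SSE framing and field names follow the description above.

```python
import json
from typing import Optional


def encode_abort(reason: Optional[str] = None) -> str:
    """Encode an abort event as a single SSE data line.

    The ``reason`` field is omitted entirely when not given, which is
    what keeps bare ``ctx.abort()`` output identical to the old wire
    format.
    """
    payload = {"type": "abort"}
    if reason is not None:
        payload["reason"] = reason
    return f"data: {json.dumps(payload)}\n\n"
```

With this shape, `encode_abort()` yields `data: {"type": "abort"}` while `encode_abort("user cancelled")` adds the reason field, and the `[DONE]` sentinel would follow as a separate line.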
Add FileEvent model (type: "file", url, mediaType), ctx.write_file() which auto-emits start/start-step before the event, FileRecord for collection, and files list on StreamRecord with to_dict() support. Resolves: #9
Add DataEvent model (dynamic data-{name} type), ctx.write_data() with
name validation and transient flag, DataPartRecord for collection, and
data_parts list on StreamRecord with to_dict() support. Transient parts
are sent on the wire but not persisted to ctx.record.
Resolves: #10
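The dynamic `data-{name}` typing described above can be sketched as follows. This is an illustrative encoder, not the library's `DataEvent` model; the exact name-validation rule is an assumption.

```python
import json
import re
from typing import Any

# Assumed validation rule: names must be URL/identifier-safe.
_NAME_RE = re.compile(r"^[a-zA-Z0-9_-]+$")


def encode_data_event(name: str, data: Any) -> str:
    """Encode a data part as an SSE line with a dynamic data-{name} type."""
    if not _NAME_RE.match(name):
        raise ValueError(f"invalid data part name: {name!r}")
    payload = {"type": f"data-{name}", "data": data}
    return f"data: {json.dumps(payload)}\n\n"
```

For example, `encode_data_event("weather", {"temp": 21})` produces a `data-weather` part; a transient part would be emitted through the same encoder but skipped when writing to `ctx.record`.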
Add start_tool_input(), stream_tool_input_delta(), and finish_tool_input() to StreamContext. start_tool_input emits tool-input-start and returns a ToolCallHandle; each stream_tool_input_delta emits a tool-input-delta; finish_tool_input emits tool-input-available and updates the collected ToolCallRecord with the final input dict. begin_tool_call() is unchanged. Resolves: #11
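The three-event lifecycle can be sketched as plain dicts. This simulates the event sequence only; the real methods are async and emit encoded SSE, and the exact field names (`toolCallId`, `inputTextDelta`, etc.) are taken from the AI SDK wire protocol as described, so treat them as assumptions.

```python
import json
import uuid


def stream_tool_input(tool_name, json_chunks, tool_call_id=None):
    """Sketch the tool-input lifecycle described above:
    one tool-input-start, one tool-input-delta per chunk, then a
    tool-input-available carrying the fully parsed input dict."""
    tool_call_id = tool_call_id or f"call_{uuid.uuid4().hex[:8]}"
    events = [
        {"type": "tool-input-start", "toolCallId": tool_call_id, "toolName": tool_name}
    ]
    buffer = ""
    for chunk in json_chunks:
        buffer += chunk
        events.append(
            {"type": "tool-input-delta", "toolCallId": tool_call_id, "inputTextDelta": chunk}
        )
    events.append(
        {
            "type": "tool-input-available",
            "toolCallId": tool_call_id,
            "toolName": tool_name,
            "input": json.loads(buffer or "{}"),
        }
    )
    return events
```

A zero-argument tool simply produces no delta events and an empty `input` dict in the final event.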
* Initial plan
* feat: add on_finish callback for post-stream persistence

Co-authored-by: shloimy-wiesel <144027408+shloimy-wiesel@users.noreply.github.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Agent-Logs-Url: https://github.com/shloimy-wiesel/ai-sdk-stream-python/sessions/5f2f939a-91ee-46f4-9d8e-5abc6ef8a02e
…vent `messageMetadata` (#26)

feat: support start_metadata on StreamContext for start event messageMetadata

Co-authored-by: shloimy-wiesel <144027408+shloimy-wiesel@users.noreply.github.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Agent-Logs-Url: https://github.com/shloimy-wiesel/ai-sdk-stream-python/sessions/e1fc71ca-6c44-48e5-9f74-df0da0cb80f5
When a background task raises an unhandled exception, the stream now terminates with an error event instead of blocking the client forever. run() also auto-calls finish() if the coroutine returns without having called it. Resolves: #20
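The two guarantees above can be sketched as a guard around the user coroutine. This is a simplified standalone illustration (names hypothetical, and it always finishes on a normal return, whereas the real run() only finishes when the coroutine did not already do so):

```python
import asyncio


async def run_guarded(coro_fn, finish, error):
    """Run a stream-producing coroutine so the client always sees an
    ending: an error event on an unhandled exception, or an automatic
    finish() when the coroutine returns normally."""
    try:
        await coro_fn()
    except Exception as exc:  # any unhandled error must still end the stream
        await error(str(exc))
    else:
        await finish()
```

Either way the consumer's event loop is released, which is the property that prevents a client from hanging on a stream that will never close.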
Adds a new optional contrib module that eliminates the ~40-line boilerplate every OpenAI + StreamContext backend must write.

- Maps delta.content → ctx.write_text()
- Maps delta.reasoning / delta.reasoning_content → ctx.write_reasoning()
- Buffers tool call chunks by index; emits tool-input-start + tool-input-available at the end (or streams deltas when stream_tool_deltas=True)
- Extracts finish_reason and usage (when stream_options includes include_usage=True)
- No hard dependency on the openai package; uses duck typing throughout
- Returns ConsumeResult with content, tool_calls, finish_reason, usage

Closes #23
…izer

Closes #17

- Add `reasoning_tokens`, `answer_tokens`, `prompt_tokens` fields to `StreamRecord`
- Add `total_output_tokens` and `total_tokens` computed properties
- `write_text` and `write_reasoning` now auto-increment token counts via a configurable `count_func` (defaults to `len`, i.e. character count)
- Add a `StreamContext(count_func=...)` parameter to swap in any `Callable[[str], int]` (e.g. tiktoken, word count)
- Add `ctx.set_usage(prompt_tokens, reasoning_tokens, answer_tokens)` to override auto-counted values with exact LLM-reported numbers
- Include all token fields in `StreamRecord.to_dict()`
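The auto-counting behavior can be sketched with a tiny standalone class. This is an illustration of the mechanism, not the real StreamContext (the class name here is hypothetical); the default of `len`, i.e. character count, matches the description above.

```python
from typing import Callable


class TokenTally:
    """Accumulate per-channel token counts via a pluggable count_func."""

    def __init__(self, count_func: Callable[[str], int] = len):
        self.count_func = count_func
        self.reasoning_tokens = 0
        self.answer_tokens = 0

    def write_text(self, text: str) -> None:
        self.answer_tokens += self.count_func(text)

    def write_reasoning(self, text: str) -> None:
        self.reasoning_tokens += self.count_func(text)

    @property
    def total_output_tokens(self) -> int:
        return self.reasoning_tokens + self.answer_tokens


# Swap in a word counter instead of the default character count:
tally = TokenTally(count_func=lambda s: len(s.split()))
tally.write_text("hello world")
```

A tiktoken-based counter would plug in the same way, e.g. `count_func=lambda s: len(enc.encode(s))`, and `set_usage(...)` would then overwrite these approximations with exact LLM-reported numbers.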
feat: add timing (created_at/finished_at/duration_ms) to StreamRecord

* fix: address review comments on StreamRecord timing fields

  - Remove the dead-code guard in to_dict(): created_at is non-optional, so the `if self.created_at else None` branch was never reachable
  - Document created_at, finished_at, and duration_ms in the StreamRecord docstring
  - Rename test_finished_at_none_before_finish → test_finished_at_none_initially to avoid confusion (the test called finish() internally)

Co-authored-by: shloimy-wiesel <144027408+shloimy-wiesel@users.noreply.github.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: shloimy wiesel <w.63071@gmail.com>
Agent-Logs-Url: https://github.com/shloimy-wiesel/ai-sdk-stream-python/sessions/781f63ee-0c28-4c82-bcc1-73f004039b4d
…cstring

Expand the ctx.run() docstring to describe its three safety guarantees (auto-finish, auto-error, task GC prevention) with a full FastAPI example. Update all README examples to use ctx.run() instead of manual try/finally + asyncio.create_task(). Add a dedicated "ctx.run() — safe task runner" section to the StreamContext API docs.

Closes #29
Add a `collect` keyword parameter (default True) to write_text, write_reasoning, write_source, write_file, begin_tool_call, and start_tool_input. When collect=False, the event is still emitted to the SSE stream but not recorded in ctx.record. This lets developers stream ephemeral content (status messages, internal tool calls) without polluting the persisted record. Closes #30
… context

Change the per-call collect parameter default from True to None (follow the context setting). When a caller explicitly passes collect=True but the StreamContext was created with collect=False, raise RuntimeError instead of silently discarding the data. This prevents a class of silent bugs where developers think they are collecting but the context has no record.

Three-way semantics:
- None (default): collect if context-level collection is enabled
- True: require collection; raise if no record exists
- False: skip collection even if context-level collection is enabled
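The tri-state resolution can be sketched as a pure function. This is an illustrative stand-in for the internal helper (the name `should_collect` is hypothetical); the three branches follow the semantics listed above.

```python
from typing import Optional


def should_collect(context_collect: bool, per_call: Optional[bool]) -> bool:
    """Resolve the per-call collect flag against the context setting.

    None  -> follow the context-level setting
    True  -> require collection; error if the context keeps no record
    False -> never collect, even when the context would
    """
    if per_call is None:
        return context_collect
    if per_call and not context_collect:
        raise RuntimeError(
            "collect=True was requested but this StreamContext was created "
            "with collect=False, so there is no record to write to"
        )
    return per_call
```

The RuntimeError branch is the whole point of the change: an explicit collect=True that cannot be honored fails loudly instead of silently dropping data.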
The AI SDK v6 wire protocol allows the `data` field in `data-{name}`
events to be any JSON value (string, number, null, array, or object).
The previous `dict[str, Any]` constraint made common patterns like
streaming chat titles or signaling artifact completion impossible.
Closes #34
…aEvent.encode()

- write_data() now accepts collect: bool | None = None and routes through _should_collect(), consistent with all other high-level write helpers
- DataEvent.encode() uses model_dump_json() (Pydantic's serializer) instead of model_dump() + json.dumps(), fixing a TypeError for non-JSON-native types (datetime, UUID, Enum, etc.) inside the data field
…json-value fix: write_data() accepts any JSON-serializable value, not just dict
Converts AI SDK v6 UIMessage parts-based format to
list[ChatCompletionMessageParam] for the OpenAI API, completing the
input → output pipeline in contrib.openai alongside consume_openai_stream().
Part mapping:
- TextUIPart → content string or {"type":"text"} block
- FileUIPart (image) → {"type":"image_url"} content block
- ToolUIPart (output-available) → assistant tool_calls + role:"tool" message
- ToolUIPart (output-error) → assistant tool_calls + tool error message
- ToolUIPart (other states) → assistant tool_calls only
- ReasoningUIPart → dropped by default; included in <reasoning> tags when
include_reasoning=True
Closes #31.
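The TextUIPart branch of the mapping above can be sketched on plain dicts. This is a deliberately narrow illustration (the function name is hypothetical and tool/file/reasoning parts are out of scope here): a parts-based UIMessage becomes an OpenAI-style chat message whose content is the concatenated text.

```python
def convert_text_parts(messages):
    """Map AI SDK UIMessages (parts-based) to OpenAI-style chat messages,
    handling only the "text" part type."""
    out = []
    for msg in messages:
        text = "".join(
            part["text"]
            for part in msg.get("parts", [])
            if part.get("type") == "text"
        )
        out.append({"role": msg["role"], "content": text})
    return out
```

The real converter additionally branches on the other part types, e.g. emitting `tool_calls` entries plus a `role: "tool"` message for an output-available ToolUIPart.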
- Root AGENTS.md: comprehensive overview of what the library solves, state machine architecture, event flow, wire protocol, capabilities, conventions, anti-patterns
- src/ai_sdk_stream_python/AGENTS.md: internal implementation details, module map, method→event mapping, collection fields, OpenAI adapter internals
Pull request overview
This PR expands ai-sdk-stream-python toward a fuller AI SDK v6 backend toolkit by adding request-body Pydantic types, stream data collection, and optional OpenAI integration helpers alongside new event types and richer StreamContext capabilities.
Changes:
- Add StreamContext features: collection (`collect=True` + `ctx.record`), `ctx.run()` safe runner, custom info (`ctx.info`), streaming tool-input deltas, and error/abort/data/file helpers.
- Add new protocol models/events (`file`, `error`, `abort`, dynamic `data-*`) and request deserialization models (`ChatRequest`, `UIMessage`, `*UIPart`).
- Add `contrib.openai` adapter utilities with extensive tests and documentation updates.
Reviewed changes
Copilot reviewed 16 out of 17 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| uv.lock | Updates the editable package version entry (currently mismatched with pyproject.toml). |
| pyproject.toml | Bumps the project version and defines build/test/tooling configuration. |
| README.md | Documents ctx.run(), collection, and ctx.info usage and API guidance. |
| CHANGELOG.md | Records new alpha releases and feature additions. |
| AGENTS.md | Project-level knowledge base for architecture and conventions. |
| src/ai_sdk_stream_python/AGENTS.md | Library-internals documentation and state machine/event mapping details. |
| src/ai_sdk_stream_python/context.py | Core StreamContext enhancements: collection, callbacks, metadata, data/file/error/abort, tool delta streaming, run(). |
| src/ai_sdk_stream_python/events.py | Adds FileEvent, ErrorEvent, AbortEvent, and DataEvent encode behavior; updates union exports. |
| src/ai_sdk_stream_python/collect.py | Introduces StreamRecord and related record dataclasses for persistence/auditing. |
| src/ai_sdk_stream_python/types.py | Adds Pydantic models for incoming AI SDK v6 request payloads. |
| src/ai_sdk_stream_python/contrib/openai.py | Adds the OpenAI stream consumer and UIMessage→OpenAI message conversion utilities. |
| src/ai_sdk_stream_python/contrib/__init__.py | Introduces the contrib package marker/docstring. |
| src/ai_sdk_stream_python/__init__.py | Re-exports the new public API surface (records, types, new events, callback type). |
| tests/test_context.py | Extends context tests for tool delta streaming, data/file, abort/error, callbacks, metadata, and run(). |
| tests/test_collect.py | Adds comprehensive tests for collect=True and per-call collect tri-state semantics. |
| tests/test_types.py | Adds tests for request-body Pydantic models and discriminated unions. |
| tests/test_contrib_openai.py | Adds extensive tests for OpenAI adapter behavior (text/reasoning/tool calls/messages conversion). |
```python
fn_name = getattr(fn, "name", None)
if fn_name:
    buf["name"] += fn_name
```
fn.name is appended to buf["name"], which can duplicate the tool name if the stream repeats the name on multiple chunks (resulting in e.g. searchsearch). Consider only setting the name when it’s empty, or otherwise de-duplicating, while still supporting truly incremental name streaming if needed.
Suggested replacement for the `buf["name"] += fn_name` line:

```python
# Avoid duplicating the tool name across chunks.
# If we don't have a name yet, just set it.
if not buf["name"]:
    buf["name"] = fn_name
else:
    # Support incremental name streaming (e.g. "se" -> "sea" -> "search"):
    # if the new chunk starts with the existing name, treat it as an update.
    if fn_name.startswith(buf["name"]):
        buf["name"] = fn_name
    # If we already have the full name (or a longer variant), do nothing.
    elif buf["name"].endswith(fn_name):
        pass
    else:
        # Fallback: append only when it doesn't clearly duplicate.
        buf["name"] += fn_name
```
```python
if stream_tool_deltas:
    # Start streaming once the tool name is known
    if buf["handle"] is None and buf["name"]:
        buf["handle"] = await ctx.start_tool_input(
            buf["name"],
            tool_call_id=buf["id"] or None,
        )
    if buf["handle"] is not None:
        await ctx.stream_tool_input_delta(
            buf["handle"].toolCallId,
            fn_args,
        )
```
```python
# ── Finalize tool calls ──────────────────────────────────────────────────
result_tool_calls: list[dict[str, str]] = []
for idx in sorted(tool_buffer):
    buf = tool_buffer[idx]
    try:
        parsed_input: dict[str, Any] = json.loads(buf["arguments"] or "{}")
    except json.JSONDecodeError:
        parsed_input = {}

    if stream_tool_deltas:
        if buf["handle"] is not None:
            await ctx.finish_tool_input(
                buf["handle"].toolCallId,
                buf["name"],
                parsed_input,
            )
        else:
            # No arguments were streamed (zero-arg tool or empty chunks);
            # fall back to the non-streaming path.
            await ctx.begin_tool_call(
                buf["name"],
                parsed_input,
                tool_call_id=buf["id"] or None,
            )
    else:
        await ctx.begin_tool_call(
            buf["name"],
            parsed_input,
            tool_call_id=buf["id"] or None,
        )

    result_tool_calls.append(
        {
            "id": buf["id"],
            "name": buf["name"],
            "arguments": buf["arguments"],
        }
    )
```
When tc.id is missing (or arrives after start_tool_input is called), StreamContext will generate a new toolCallId but ConsumeResult.tool_calls still records id as an empty string / possibly a different value. Capture and reuse the actual ToolCallHandle.toolCallId returned by start_tool_input/begin_tool_call so emitted events and the returned ConsumeResult stay consistent.
@copilot apply changes based on the comments in this thread
…y in consume_openai_stream

Co-authored-by: shloimy-wiesel <144027408+shloimy-wiesel@users.noreply.github.com>
Agent-Logs-Url: https://github.com/shloimy-wiesel/ai-sdk-stream-python/sessions/f0d42dd6-0fe4-44ed-be7e-1e5e9abaaf43
Both review comments have been addressed. Added 4 new tests covering both edge cases in streaming and non-streaming modes. All 237 tests pass.
…xample only)

- Implemented 5 tools: getWeather, createDocument, updateDocument, editDocument, requestSuggestions
- Fixed conversation history persistence: assistant messages now saved to Redis
- Uses StreamContext for lifecycle-safe streaming
- Redis for session persistence (chat history + documents)
- Handles the delta strategy per artifact kind (text = incremental, code/sheet = full)
- All tools verified working via browser Playwright tests
In consume_openai_stream:

- Only set `buf["name"]` when empty instead of appending
- Capture `ToolCallHandle.toolCallId` from `begin_tool_call`/`start_tool_input` and use it in `ConsumeResult.tool_calls`