From ca3259a14553ef8a34f185031373fe1f7bf4fbb9 Mon Sep 17 00:00:00 2001 From: Seth Raphael Date: Mon, 23 Feb 2026 21:24:13 -0800 Subject: [PATCH 1/4] Add HTTP streaming demo, integration tests, and fix streaming bugs Add a streaming demo example that showcases all three streaming patterns (delta, one-shot, HTTP) side-by-side, along with comprehensive integration tests for DeltaStreamer and delta consumption. Fixes discovered during testing: - HTTP streaming showing duplicate content: clear httpText when streaming ends, hide pending DB message during HTTP stream, render tool call parts inline in the HTTP bubble - One-shot mode using useMutation instead of useAction - useSmoothText animating non-streaming messages: track hasStreamed and snap cursor to full text when streaming was never activated - DeltaStreamer.finish()/fail() not fully draining the self-chaining write queue (use while loop instead of single await) - Unhandled rejection in #sendDelta: return instead of re-throw after onAsyncAbort since the fire-and-forget caller cannot catch it Also adds backwards-compat test suite and HTTP streaming requirements doc. 
Co-Authored-By: Claude Opus 4.6 --- docs/http-streaming-requirements.md | 302 ++++++ example/convex/_generated/api.d.ts | 2 + example/convex/chat/streamingDemo.ts | 189 ++++ example/ui/chat/StreamingDemo.tsx | 592 +++++++++++ example/ui/main.tsx | 16 + src/backwards-compat.test.ts | 1007 ++++++++++++++++++ src/client/streaming.integration.test.ts | 1211 ++++++++++++++++++++++ src/client/streaming.ts | 12 +- src/react/useSmoothText.ts | 17 + 9 files changed, 3345 insertions(+), 3 deletions(-) create mode 100644 docs/http-streaming-requirements.md create mode 100644 example/convex/chat/streamingDemo.ts create mode 100644 example/ui/chat/StreamingDemo.tsx create mode 100644 src/backwards-compat.test.ts create mode 100644 src/client/streaming.integration.test.ts diff --git a/docs/http-streaming-requirements.md b/docs/http-streaming-requirements.md new file mode 100644 index 00000000..5050d41e --- /dev/null +++ b/docs/http-streaming-requirements.md @@ -0,0 +1,302 @@ +# Technical Requirements: HTTP Streaming for @convex-dev/agent + +## Status: Draft +## Date: 2026-02-23 + +--- + +## 1. Executive Summary + +The current streaming architecture relies exclusively on Convex's reactive query system (WebSocket-based delta polling). This document specifies requirements for adding HTTP streaming support, including delta filtering logic, stream ID lifecycle management, and backwards compatibility constraints. + +--- + +## 2. Current Architecture + +### 2.1 Streaming Transport (WebSocket Delta Polling) + +The existing system persists stream data as discrete deltas in the database, which clients poll via Convex reactive queries. There is no HTTP streaming transport. + +**Flow:** +1. `DeltaStreamer` (client action) writes compressed parts via `streams.addDelta` mutations +2. 
React hooks (`useDeltaStreams`) issue two reactive queries per render cycle: + - `kind: "list"` — discovers active `streamingMessages` for the thread + - `kind: "deltas"` — fetches new deltas using per-stream cursors +3. `deriveUIMessagesFromDeltas()` materializes `UIMessage[]` from accumulated deltas + +**Key files:** +- `src/client/streaming.ts` — `DeltaStreamer` class, compression, `syncStreams()` +- `src/component/streams.ts` — Backend mutations/queries (`create`, `addDelta`, `listDeltas`, `finish`, `abort`) +- `src/react/useDeltaStreams.ts` — Client-side cursor tracking and delta accumulation +- `src/deltas.ts` — Delta-to-UIMessage materialization + +### 2.2 Stream State Machine + +``` + create() addDelta() (with heartbeat) + │ │ + ▼ ▼ +┌──────────┐ ┌──────────┐ +│ streaming │─────▶│ streaming │──── heartbeat every ~2.5 min +└──────────┘ └──────────┘ + │ │ + │ finish() │ abort() / timeout (10 min) + ▼ ▼ +┌──────────┐ ┌─────────┐ +│ finished │ │ aborted │ +└──────────┘ └─────────┘ + │ + │ cleanup (5 min delay) + ▼ + [deleted] +``` + +### 2.3 Data Formats + +Two delta formats are supported, declared per-stream: + +| Format | Description | Primary Use | +|--------|-------------|-------------| +| `UIMessageChunk` | AI SDK v6 native format (`text-delta`, `tool-input-delta`, `reasoning-delta`, etc.) | Default for new streams | +| `TextStreamPart` | Legacy AI SDK format | Backwards compatibility | + +--- + +## 3. HTTP Streaming Requirements + +### 3.1 Transport Layer + +**REQ-HTTP-1**: Provide an HTTP streaming endpoint that emits deltas as Server-Sent Events (SSE) or newline-delimited JSON (NDJSON), enabling clients that cannot use Convex WebSocket subscriptions (e.g., non-JS environments, CLI tools, third-party integrations). + +**REQ-HTTP-2**: The HTTP endpoint must support resumption. A client that disconnects and reconnects with a cursor value must receive only deltas it hasn't seen, not replay the full stream. 
+ +**REQ-HTTP-3**: The HTTP endpoint must respect the same rate-limiting constants as the WebSocket path: +- `MAX_DELTAS_PER_REQUEST = 1000` (total across all streams) +- `MAX_DELTAS_PER_STREAM = 100` (per stream per request) + +**REQ-HTTP-4**: The HTTP endpoint must support filtering by stream status (`streaming`, `finished`, `aborted`) matching the existing `listStreams` query interface. + +**REQ-HTTP-5**: The HTTP endpoint must emit a terminal event when the stream reaches `finished` or `aborted` state, so clients know to stop polling/listening. + +### 3.2 Response Format + +**REQ-HTTP-6**: Each SSE/NDJSON frame must include: +```typescript +{ + streamId: string; // ID of the streaming message + start: number; // Inclusive cursor position + end: number; // Exclusive cursor position + parts: any[]; // Delta parts (UIMessageChunk[] or TextStreamPart[]) +} +``` + +This matches the existing `StreamDelta` type (`src/validators.ts:628-634`). + +**REQ-HTTP-7**: Stream metadata must be available either as an initial frame or via a separate endpoint, containing: +```typescript +{ + streamId: string; + status: "streaming" | "finished" | "aborted"; + format: "UIMessageChunk" | "TextStreamPart" | undefined; + order: number; + stepOrder: number; + userId?: string; + agentName?: string; + model?: string; + provider?: string; + providerOptions?: ProviderOptions; +} +``` + +This matches the existing `StreamMessage` type (`src/validators.ts:607-626`). + +--- + +## 4. 
Delta Stream Filtering Logic + +### 4.1 Server-Side Filtering + +**REQ-FILT-1**: The `listDeltas` query must continue to filter by stream ID + cursor position using the `streamId_start_end` index: +``` +.withIndex("streamId_start_end", (q) => + q.eq("streamId", cursor.streamId).gte("start", cursor.cursor)) +``` + +**REQ-FILT-2**: Stream discovery (`list` query) must filter by: +- `threadId` (required) — scoped to a single thread +- `state.kind` (optional, defaults to `["streaming"]`) — which statuses to include +- `startOrder` (optional, defaults to 0) — minimum message order position + +This uses the compound index `threadId_state_order_stepOrder`. + +**REQ-FILT-3**: For HTTP streaming, add support for filtering deltas by a single `streamId` (not requiring `threadId`), for clients that already know which stream they want to follow. + +### 4.2 Client-Side Filtering + +**REQ-FILT-4**: The `useDeltaStreams` hook's cursor management must be preserved: +- Per-stream cursor tracking via `Record` +- Gap detection: assert `previousEnd === delta.start` for consecutive deltas +- Stale delta rejection: skip deltas where `delta.start < oldCursor` +- Cache-friendly `startOrder` rounding (round down to nearest 10) + +**REQ-FILT-5**: Support `skipStreamIds` filtering to allow callers to exclude specific streams (used when streams are already materialized from stored messages). + +### 4.3 Delta Compression + +**REQ-FILT-6**: Delta compression must happen before persistence (in `DeltaStreamer.#createDelta`). Two compression strategies: + +1. **UIMessageChunk compression** (`compressUIMessageChunks`): + - Merge consecutive `text-delta` parts with same `id` by concatenating `.delta` + - Merge consecutive `reasoning-delta` parts with same `id` by concatenating `.delta` + +2. 
**TextStreamPart compression** (`compressTextStreamParts`): + - Merge consecutive `text-delta` parts with same `id` by concatenating `.text` + - Merge consecutive `reasoning-delta` parts with same `id` by concatenating `.text` + - Strip `Uint8Array` data from `file` parts (not suitable for delta transport) + +**REQ-FILT-7**: Throttling must remain configurable per-stream: +- Default: `250ms` between delta writes +- Configurable via `StreamingOptions.throttleMs` +- Chunking granularity: `"word"`, `"line"`, `RegExp`, or custom `ChunkDetector` (default: `/[\p{P}\s]/u` — punctuation + whitespace) + +--- + +## 5. Stream ID Tracking + +### 5.1 Stream ID Lifecycle + +**REQ-SID-1**: Stream IDs are Convex document IDs (`Id<"streamingMessages">`) generated lazily on first delta write: +- `DeltaStreamer.getStreamId()` creates the stream via `streams.create` mutation +- Race-condition safe: only one creation promise via `#creatingStreamIdPromise` +- Stream ID is `undefined` until the first `addParts()` call + +**REQ-SID-2**: The `streams.create` mutation must: +1. Insert a `streamingMessages` document with `state: { kind: "streaming", lastHeartbeat: Date.now() }` +2. Schedule a timeout function at `TIMEOUT_INTERVAL` (10 minutes) +3. Patch the document with the `timeoutFnId` + +**REQ-SID-3**: Stream IDs must be passed to `addMessages` via `finishStreamId` for atomic stream finish + message persistence (prevents UI flicker from separate mutations). 
+ +### 5.2 Client-Side Stream ID Management + +**REQ-SID-4**: React hooks must track multiple concurrent streams per thread: +- `useDeltaStreams` returns `Array<{ streamMessage: StreamMessage; deltas: StreamDelta[] }>` +- Each stream accumulates deltas independently +- Streams are sorted by `[order, stepOrder]` for display + +**REQ-SID-5**: When a thread changes (`threadId` differs from previous render): +- Clear all accumulated delta streams (`state.deltaStreams = undefined`) +- Reset all cursors (`setCursors({})`) +- Reset `startOrder` + +**REQ-SID-6**: Stream identity in UIMessages uses the convention `id: "stream:{streamId}"` to distinguish streaming messages from persisted messages. + +### 5.3 Heartbeat & Timeout + +**REQ-SID-7**: Heartbeat behavior: +- Triggered on every `addDelta` call +- Debounced: only writes if >2.5 minutes since last heartbeat (`TIMEOUT_INTERVAL / 4`) +- Updates `state.lastHeartbeat` and reschedules the timeout function + +**REQ-SID-8**: Timeout behavior: +- After 10 minutes of inactivity, `timeoutStream` internal mutation fires +- Checks if `lastHeartbeat + TIMEOUT_INTERVAL < Date.now()` +- If expired: aborts the stream with reason `"timeout"` +- If not expired: reschedules for the remaining time + +**REQ-SID-9**: Cleanup behavior: +- `finish()` schedules `deleteStream` after `DELETE_STREAM_DELAY` (5 minutes) +- `deleteStream` removes the `streamingMessages` document and all associated `streamDeltas` +- 5-minute delay allows clients to fetch final deltas before cleanup + +--- + +## 6. Backwards Compatibility Requirements + +### 6.1 Transport Compatibility + +**REQ-BC-1**: The existing WebSocket/reactive-query streaming path must remain the default and primary transport. HTTP streaming is additive, not a replacement. 
+ +**REQ-BC-2**: All existing public APIs must remain unchanged: +- `syncStreams()` function signature and return type (`SyncStreamsReturnValue`) +- `listStreams()` function signature +- `abortStream()` function signature +- `vStreamMessagesReturnValue` validator + +**REQ-BC-3**: The `StreamArgs` union type must be extended (not replaced) to support HTTP streaming parameters: +```typescript +// Existing (preserved): +type StreamArgs = + | { kind: "list"; startOrder: number } + | { kind: "deltas"; cursors: Array<{ streamId: string; cursor: number }> } +// New (additive): + | { kind: "http"; streamId: string; cursor?: number } +``` + +### 6.2 Data Format Compatibility + +**REQ-BC-4**: Both `UIMessageChunk` and `TextStreamPart` delta formats must be supported in perpetuity. The `format` field on `streamingMessages` is `v.optional(...)`, so streams created before format tracking was added (format = `undefined`) must default to `TextStreamPart` behavior. + +**REQ-BC-5**: Forward compatibility for new `TextStreamPart` types from future AI SDK versions must be maintained via the `default` case in `updateFromTextStreamParts` (`src/deltas.ts:520-527`): +```typescript +default: { + console.warn(`Received unexpected part: ${JSON.stringify(part)}`); + break; +} +``` + +**REQ-BC-6**: The `readUIMessageStream` error suppression for `"no tool invocation found"` must be preserved (`src/deltas.ts:77-81`). This handles tool approval continuation streams that have `tool-result` without the original `tool-call`. + +### 6.3 React Hook Compatibility + +**REQ-BC-7**: Existing React hooks must not change behavior: +- `useThreadMessages` — paginated messages + streaming +- `useUIMessages` — UIMessage-first with metadata +- `useSmoothText` — animated text rendering + +**REQ-BC-8**: New HTTP-streaming React hooks (if any) must be additive exports from `@convex-dev/agent/react`, not replacements. 
+ +### 6.4 Schema Compatibility + +**REQ-BC-9**: No breaking changes to the component schema tables: +- `streamingMessages` — no field removals or type changes +- `streamDeltas` — no field removals or type changes +- Indexes must not be dropped (can add new ones) + +**REQ-BC-10**: The `vStreamDelta` and `vStreamMessage` validators must remain structurally compatible. New optional fields may be added but existing fields must not change type or be removed. + +### 6.5 Export Surface Compatibility + +**REQ-BC-11**: All four export surfaces must remain stable: +- `@convex-dev/agent` — main exports +- `@convex-dev/agent/react` — React hooks +- `@convex-dev/agent/validators` — Convex validators +- `@convex-dev/agent/test` — testing utilities + +HTTP streaming additions should be exported from the main surface or a new `@convex-dev/agent/http` surface (not mixed into existing surfaces that would break tree-shaking). + +--- + +## 7. Non-Functional Requirements + +**REQ-NF-1**: HTTP streaming latency must not exceed the WebSocket path latency by more than 100ms for equivalent payload sizes. + +**REQ-NF-2**: HTTP streaming must support concurrent streams per thread (matching current behavior of up to 100 active streams per thread, per the `list` query's `.take(100)`). + +**REQ-NF-3**: HTTP streaming must gracefully handle client disconnection without leaving orphaned streams (existing heartbeat/timeout mechanism applies). + +**REQ-NF-4**: Delta writes must remain throttled at the configured `throttleMs` regardless of transport, to avoid excessive database writes. + +--- + +## 8. Open Questions + +1. **SSE vs NDJSON**: Should the HTTP transport use SSE (native browser support, automatic reconnection) or NDJSON (simpler, works with `fetch` + `ReadableStream`)? + +2. **Authentication**: How should HTTP streaming endpoints authenticate? Convex actions have auth context, but raw HTTP endpoints may need token-based auth. + +3. 
**Multi-stream HTTP**: Should a single HTTP connection support multiplexed streams (like the current WebSocket path with multi-cursor queries), or should each HTTP connection follow a single stream? + +4. **Convex HTTP actions**: Should HTTP streaming be implemented as Convex HTTP actions (which have a 2-minute timeout and limited streaming support), or as a separate server/proxy? + +5. **Atomic finish over HTTP**: The current `finishStreamId` pattern enables atomic stream finish + message save. How should this translate to the HTTP transport where the client may not be the writer? diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index b7cf3c58..84bd78fa 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -19,6 +19,7 @@ import type * as chat_basic from "../chat/basic.js"; import type * as chat_human from "../chat/human.js"; import type * as chat_streamAbort from "../chat/streamAbort.js"; import type * as chat_streaming from "../chat/streaming.js"; +import type * as chat_streamingDemo from "../chat/streamingDemo.js"; import type * as chat_streamingReasoning from "../chat/streamingReasoning.js"; import type * as chat_withoutAgent from "../chat/withoutAgent.js"; import type * as crons from "../crons.js"; @@ -68,6 +69,7 @@ declare const fullApi: ApiFromModules<{ "chat/human": typeof chat_human; "chat/streamAbort": typeof chat_streamAbort; "chat/streaming": typeof chat_streaming; + "chat/streamingDemo": typeof chat_streamingDemo; "chat/streamingReasoning": typeof chat_streamingReasoning; "chat/withoutAgent": typeof chat_withoutAgent; crons: typeof crons; diff --git a/example/convex/chat/streamingDemo.ts b/example/convex/chat/streamingDemo.ts new file mode 100644 index 00000000..9f11b0c8 --- /dev/null +++ b/example/convex/chat/streamingDemo.ts @@ -0,0 +1,189 @@ +/** + * Full Streaming Demo + * + * Demonstrates ALL streaming patterns in one place: + * 1. 
Async delta streaming (recommended) - mutation saves prompt, action streams + * 2. HTTP streaming - direct text stream over HTTP response + * 3. One-shot streaming - single action call with delta persistence + * 4. Stream lifecycle management - abort, status transitions, cleanup + * + * This is intended as a comprehensive reference for integrating streaming + * with @convex-dev/agent. + */ +import { paginationOptsValidator } from "convex/server"; +import { + createThread, + listUIMessages, + syncStreams, + abortStream, + listStreams, + vStreamArgs, +} from "@convex-dev/agent"; +import { components, internal } from "../_generated/api"; +import { + action, + httpAction, + internalAction, + mutation, + query, +} from "../_generated/server"; +import { v } from "convex/values"; +import { authorizeThreadAccess } from "../threads"; +import { agent } from "../agents/simple"; + +// ============================================================================ +// Pattern 1: Async Delta Streaming (RECOMMENDED) +// +// Two-phase approach: +// Phase 1 (mutation): Save the user message and schedule the action. +// Phase 2 (action): Stream the AI response, saving deltas to the DB. +// +// Clients subscribe via `useUIMessages` with `stream: true` and see real-time +// delta updates through Convex's reactive query system. 
+// ============================================================================ + +export const sendMessage = mutation({ + args: { prompt: v.string(), threadId: v.string() }, + handler: async (ctx, { prompt, threadId }) => { + await authorizeThreadAccess(ctx, threadId); + const { messageId } = await agent.saveMessage(ctx, { + threadId, + prompt, + skipEmbeddings: true, + }); + await ctx.scheduler.runAfter( + 0, + internal.chat.streamingDemo.streamResponse, + { threadId, promptMessageId: messageId }, + ); + }, +}); + +export const streamResponse = internalAction({ + args: { promptMessageId: v.string(), threadId: v.string() }, + handler: async (ctx, { promptMessageId, threadId }) => { + const result = await agent.streamText( + ctx, + { threadId }, + { promptMessageId }, + { saveStreamDeltas: { chunking: "word", throttleMs: 100 } }, + ); + await result.consumeStream(); + }, +}); + +// ============================================================================ +// Pattern 2: HTTP Streaming +// +// Streams text directly over an HTTP response. Useful for single-client +// consumption (e.g., curl, fetch, or SSE-like patterns). +// +// Note: deltas are NOT saved by default here. To save deltas AND stream over +// HTTP simultaneously, add `saveStreamDeltas: true` to the options. +// ============================================================================ + +export const streamOverHttp = httpAction(async (ctx, request) => { + const body = (await request.json()) as { + threadId?: string; + prompt: string; + }; + const threadId = + body.threadId ?? 
(await createThread(ctx, components.agent)); + const result = await agent.streamText(ctx, { threadId }, body); + const response = result.toTextStreamResponse(); + response.headers.set("X-Message-Id", result.promptMessageId!); + return response; +}); + +// ============================================================================ +// Pattern 3: One-Shot Streaming +// +// Single action call that both streams and persists deltas. Simpler than +// the two-phase approach but does not support optimistic client updates. +// ============================================================================ + +export const streamOneShot = action({ + args: { prompt: v.string(), threadId: v.string() }, + handler: async (ctx, { prompt, threadId }) => { + await authorizeThreadAccess(ctx, threadId); + await agent.streamText( + ctx, + { threadId }, + { prompt }, + { saveStreamDeltas: true }, + ); + }, +}); + +// ============================================================================ +// Queries: Messages + Stream Sync +// ============================================================================ + +/** + * The main query used by useUIMessages. Returns paginated messages PLUS + * live stream deltas so the React hook can merge them into a single list. + */ +export const listThreadMessages = query({ + args: { + threadId: v.string(), + paginationOpts: paginationOptsValidator, + streamArgs: vStreamArgs, + }, + handler: async (ctx, args) => { + const { threadId, streamArgs } = args; + await authorizeThreadAccess(ctx, threadId); + const streams = await syncStreams(ctx, components.agent, { + threadId, + streamArgs, + }); + const paginated = await listUIMessages(ctx, components.agent, args); + return { ...paginated, streams }; + }, +}); + +/** + * Returns only active streaming messages (no paginated history). + * Useful for a lightweight "is anything streaming?" indicator. 
+ */ +export const listActiveStreams = query({ + args: { threadId: v.string() }, + handler: async (ctx, { threadId }) => { + await authorizeThreadAccess(ctx, threadId); + return listStreams(ctx, components.agent, { threadId }); + }, +}); + +// ============================================================================ +// Stream Lifecycle Management +// ============================================================================ + +/** + * Abort a stream by its order (message position in thread). + * This is the user-facing "Stop" button action. + */ +export const abortStreamByOrder = mutation({ + args: { threadId: v.string(), order: v.number() }, + handler: async (ctx, { threadId, order }) => { + await authorizeThreadAccess(ctx, threadId); + return abortStream(ctx, components.agent, { + threadId, + order, + reason: "User requested abort", + }); + }, +}); + +/** + * Query all streams for a thread including finished and aborted ones. + * Useful for debugging and the demo's stream inspector panel. 
+ */ +export const listAllStreams = query({ + args: { threadId: v.string() }, + handler: async (ctx, { threadId }) => { + await authorizeThreadAccess(ctx, threadId); + return listStreams(ctx, components.agent, { + threadId, + includeStatuses: ["streaming", "finished", "aborted"], + }); + }, +}); diff --git a/example/ui/chat/StreamingDemo.tsx b/example/ui/chat/StreamingDemo.tsx new file mode 100644 index 00000000..069fdd1a --- /dev/null +++ b/example/ui/chat/StreamingDemo.tsx @@ -0,0 +1,592 @@ +/** + * Full Streaming Demo UI + * + * Demonstrates ALL streaming patterns with a comprehensive UI: + * + * - Async delta streaming with real-time message updates + * - HTTP streaming via fetch with text decoding + * - Stream lifecycle visualization (streaming / finished / aborted) + * - Abort in-progress streams + * - Stream inspector panel showing active/finished/aborted streams + * - Smooth text animation via useSmoothText + * - Optimistic message sending + */ +import { useAction, useMutation, useQuery } from "convex/react"; +import { api } from "../../convex/_generated/api"; +import { + optimisticallySendMessage, + useSmoothText, + useUIMessages, + type UIMessage, +} from "@convex-dev/agent/react"; +import { useCallback, useEffect, useRef, useState } from "react"; +import { cn } from "@/lib/utils"; +import { useDemoThread } from "@/hooks/use-demo-thread"; + +type StreamMode = "delta" | "http" | "oneshot"; + +export default function StreamingDemo() { + const { threadId, resetThread } = useDemoThread("Streaming Demo"); + + return ( + <> +
+

+ Full Streaming Demo +

+
+
+ {threadId ? ( + void resetThread()} /> + ) : ( +
+ Loading... +
+ )} +
+ + ); +} + +function DemoApp({ + threadId, + reset, +}: { + threadId: string; + reset: () => void; +}) { + const [streamMode, setStreamMode] = useState("delta"); + + return ( +
+ {/* Left: Chat area */} +
+ + +
+ + {/* Right: Stream Inspector */} +
+ +
+
+ ); +} + +// ============================================================================ +// Mode Selector +// ============================================================================ + +function ModeSelector({ + mode, + onChange, +}: { + mode: StreamMode; + onChange: (m: StreamMode) => void; +}) { + const modes: { value: StreamMode; label: string; desc: string }[] = [ + { + value: "delta", + label: "Delta Streaming", + desc: "Async mutation + action with delta persistence (recommended)", + }, + { + value: "http", + label: "HTTP Streaming", + desc: "Direct text stream over HTTP response", + }, + { + value: "oneshot", + label: "One-Shot", + desc: "Single action call with delta persistence", + }, + ]; + + return ( +
+ {modes.map((m) => ( + + ))} +
+ ); +} + +// ============================================================================ +// Chat Panel +// ============================================================================ + +function ChatPanel({ + threadId, + mode, + reset, +}: { + threadId: string; + mode: StreamMode; + reset: () => void; +}) { + const { + results: messages, + status, + loadMore, + } = useUIMessages( + api.chat.streamingDemo.listThreadMessages, + { threadId }, + { initialNumItems: 20, stream: true }, + ); + + const sendDelta = useMutation( + api.chat.streamingDemo.sendMessage, + ).withOptimisticUpdate( + optimisticallySendMessage(api.chat.streamingDemo.listThreadMessages), + ); + const sendOneShot = useAction(api.chat.streamingDemo.streamOneShot); + const abortByOrder = useMutation( + api.chat.streamingDemo.abortStreamByOrder, + ); + + const [prompt, setPrompt] = useState("Hello! Tell me a joke."); + const [httpText, setHttpText] = useState(""); + const [httpStreaming, setHttpStreaming] = useState(false); + const messagesEndRef = useRef(null); + + const scrollToBottom = useCallback(() => { + messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); + }, []); + + useEffect(() => { + scrollToBottom(); + }, [messages, httpText, scrollToBottom]); + + // Clear the HTTP stream text once streaming ends. The final message is + // saved to the DB during streaming (via onStepFinish), so by the time + // the HTTP stream closes, the stored message is already in useUIMessages. + useEffect(() => { + if (httpText && !httpStreaming) { + setHttpText(""); + } + }, [httpText, httpStreaming]); + + const isStreaming = messages.some((m) => m.status === "streaming"); + + async function handleSend() { + const text = prompt.trim(); + if (!text) return; + setPrompt(""); + + if (mode === "delta") { + await sendDelta({ threadId, prompt: text }); + } else if (mode === "oneshot") { + // Don't await — the action runs server-side while deltas stream + // to the client via reactive queries. 
+ sendOneShot({ threadId, prompt: text }).catch((e) => + console.error("oneshot error:", e), + ); + } else if (mode === "http") { + await streamOverHttp(threadId, text, setHttpText, setHttpStreaming); + } + } + + return ( + <> +
+ {messages.length > 0 || httpText ? ( +
+ {status === "CanLoadMore" && ( + + )} + {messages + .filter( + (m) => + // While HTTP streaming, hide the pending assistant message — + // its content is shown in the HTTP stream bubble instead. + !(httpText && m.role === "assistant" && m.status === "pending"), + ) + .map((m) => ( + + ))} + {httpText && (() => { + // Grab tool parts from the pending assistant message + const pending = messages.find( + (m) => m.role === "assistant" && m.status === "pending", + ); + const toolParts = pending?.parts.filter((p) => + p.type.startsWith("tool-"), + ) ?? []; + return ( +
+
+ + [HTTP stream{httpStreaming ? " - live" : " - done"}] + + {toolParts.map((p: any) => ( +
+ {p.type} + {p.state && ( + ({p.state}) + )} + {p.output && ( +
+ {typeof p.output === "string" + ? p.output + : JSON.stringify(p.output)} +
+ )} +
+ ))} +
{httpText}
+
+
+ ); + })()} +
+
+ ) : ( +
+ Pick a streaming mode above and start chatting. +
+ )} +
+ +
+
{ + e.preventDefault(); + void handleSend(); + }} + > + setPrompt(e.target.value)} + className="flex-1 px-4 py-2 rounded-lg border border-gray-300 focus:outline-none focus:ring-2 focus:ring-indigo-400 bg-gray-50" + placeholder="Type a message..." + /> + {isStreaming || httpStreaming ? ( + + ) : ( + + )} + +
+
+ Mode:{" "} + + {mode === "delta" + ? "Async Delta Streaming" + : mode === "http" + ? "HTTP Streaming" + : "One-Shot Streaming"} + +
+
+ + ); +} + +// ============================================================================ +// HTTP Streaming Helper +// ============================================================================ + +async function streamOverHttp( + threadId: string, + prompt: string, + onText: (text: string) => void, + onStreaming: (streaming: boolean) => void, +) { + const convexUrl = import.meta.env.VITE_CONVEX_URL as string; + // Derive the HTTP actions URL from the Convex deployment URL + const httpUrl = convexUrl.replace(/\.cloud$/, ".site"); + onStreaming(true); + onText(""); + + try { + const res = await fetch(`${httpUrl}/streamText`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ threadId, prompt }), + }); + + if (!res.ok || !res.body) { + onText(`Error: ${res.status} ${res.statusText}`); + onStreaming(false); + return; + } + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let accumulated = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + accumulated += decoder.decode(value, { stream: true }); + onText(accumulated); + } + } catch (err) { + onText(`Stream error: ${err}`); + } finally { + onStreaming(false); + } +} + +// ============================================================================ +// Message Bubble +// ============================================================================ + +function MessageBubble({ message }: { message: UIMessage }) { + const isUser = message.role === "user"; + const [visibleText] = useSmoothText(message.text, { + startStreaming: message.status === "streaming", + }); + const [reasoningText] = useSmoothText( + message.parts + .filter((p) => p.type === "reasoning") + .map((p) => p.text) + .join("\n") ?? "", + { startStreaming: message.status === "streaming" }, + ); + + const toolParts = message.parts.filter((p) => p.type.startsWith("tool-")); + + return ( +
+
+ {/* Status badge */} + {message.status !== "success" && message.role !== "user" && ( + + [{message.status}] + + )} + + {/* Reasoning */} + {reasoningText && ( +
+ {reasoningText} +
+ )} + + {/* Tool calls */} + {toolParts.map((p: any) => ( +
+ {p.type} + {p.state && ( + ({p.state}) + )} + {p.output && ( +
+ {typeof p.output === "string" + ? p.output + : JSON.stringify(p.output)} +
+ )} +
+ ))} + + {/* Main text */} +
{visibleText || (isUser ? message.text : "...")}
+
+
+ ); +} + +// ============================================================================ +// Stream Inspector Panel +// ============================================================================ + +function StreamInspector({ threadId }: { threadId: string }) { + const allStreams = useQuery(api.chat.streamingDemo.listAllStreams, { + threadId, + }); + + const streaming = allStreams?.filter((s) => s.status === "streaming") ?? []; + const finished = allStreams?.filter((s) => s.status === "finished") ?? []; + const aborted = allStreams?.filter((s) => s.status === "aborted") ?? []; + + return ( +
+

+ Stream Inspector +

+ + + + + + {allStreams?.length === 0 && ( +

+ No streams yet. Send a message to see stream lifecycle. +

+ )} + +
+

How it works:

+
    +
  • + Delta Streaming: Mutation saves prompt, schedules + an action. The action streams AI response and saves deltas to the + database. Clients subscribe via reactive queries. +
  • +
  • + HTTP Streaming: Direct text stream over HTTP. + Response chunks are decoded by the browser. No database + persistence of intermediate deltas. +
  • +
  • + One-Shot: Single action call. Simpler but no + optimistic updates. +
  • +
  • + Abort: Transitions the stream to "aborted" state. + Clients see the partial response with failed status. +
  • +
  • + Fallback: When streaming finishes, the full + message is saved to the database. The deduplication logic prefers + finalized messages over streaming ones. +
  • +
+
+
+ ); +} + +function StreamSection({ + label, + streams, + color, +}: { + label: string; + streams: any[]; + color: "green" | "blue" | "red"; +}) { + if (streams.length === 0) return null; + + const dotColors = { + green: "bg-green-400", + blue: "bg-blue-400", + red: "bg-red-400", + }; + const bgColors = { + green: "bg-green-50 border-green-200", + blue: "bg-blue-50 border-blue-200", + red: "bg-red-50 border-red-200", + }; + + return ( +
+
+
+ + {label} ({streams.length}) + +
+
+ {streams.map((s: any) => ( +
+
+ id: {s.streamId.slice(0, 12)}... +
+
+ order: {s.order}, step: {s.stepOrder} +
+ {s.agentName &&
agent: {s.agentName}
} + {s.model &&
model: {s.model}
} +
+ ))} +
+
+ ); +} diff --git a/example/ui/main.tsx b/example/ui/main.tsx index 3e0e6386..22aa909d 100644 --- a/example/ui/main.tsx +++ b/example/ui/main.tsx @@ -12,6 +12,7 @@ import RagBasic from "./rag/RagBasic"; import { StrictMode } from "react"; import StreamArray from "./objects/StreamArray"; import ChatApproval from "./chat/ChatApproval"; +import StreamingDemo from "./chat/StreamingDemo"; const convex = new ConvexReactClient(import.meta.env.VITE_CONVEX_URL as string); @@ -48,6 +49,7 @@ export function App() { } /> } /> } /> + } /> @@ -152,6 +154,20 @@ function Index() { with an optional reason.

+
  • + + Full Streaming Demo + +

    + Comprehensive demo of all streaming patterns: async delta + streaming, HTTP streaming, one-shot streaming, stream abort, + lifecycle visualization, and fallback behavior. Includes a + stream inspector panel. +

    +
  • More examples coming soon! diff --git a/src/backwards-compat.test.ts b/src/backwards-compat.test.ts new file mode 100644 index 00000000..aeb1aabd --- /dev/null +++ b/src/backwards-compat.test.ts @@ -0,0 +1,1007 @@ +/** + * Backwards Compatibility & Performance Tests for @convex-dev/agent v0.6.0 + * + * Tests that: + * 1. Legacy v5 message formats (args → input) are handled correctly + * 2. Deprecated APIs (textEmbeddingModel, maxSteps, handler/args in createTool) still work + * 3. Delta streaming performance: compression ratios, throttling, materialization speed + * 4. Both UIMessageChunk and TextStreamPart delta formats work correctly + * 5. Tool definition backwards compatibility (args/inputSchema, handler/execute) + */ +import { describe, expect, test, vi, beforeEach } from "vitest"; +import { streamText } from "ai"; +import type { GenericSchema, SchemaDefinition } from "convex/server"; +import type { TestConvex } from "convex-test"; +import { + toModelMessageContent, + serializeMessage, +} from "./mapping.js"; +import { + compressUIMessageChunks, + compressTextStreamParts, + DeltaStreamer, + DEFAULT_STREAMING_OPTIONS, +} from "./client/streaming.js"; +import { + getParts, + deriveUIMessagesFromDeltas, + deriveUIMessagesFromTextStreamParts, +} from "./deltas.js"; +import type { StreamDelta, StreamMessage } from "./validators.js"; +import type { ActionCtx, AgentComponent } from "./client/types.js"; +import { api } from "./component/_generated/api.js"; +import { createThread } from "./client/index.js"; +import { mockModel } from "./client/mockModel.js"; +import { components, initConvexTest } from "./client/setup.test.js"; + +// ============================================================================ +// 1. 
Legacy v5 Message Format Backwards Compatibility +// ============================================================================ + +describe("Legacy v5 message format backwards compatibility", () => { + test("tool-call with only 'args' field is deserialized correctly", () => { + // v5 stored tool calls with `args` instead of `input` + const legacyToolCall = { + type: "tool-call" as const, + toolCallId: "tc-legacy-1", + toolName: "search", + args: { query: "hello world" }, + }; + + // toModelMessageContent should handle the legacy format + const [deserialized] = toModelMessageContent([legacyToolCall as any]); + expect(deserialized).toBeDefined(); + expect((deserialized as any).type).toBe("tool-call"); + expect((deserialized as any).input).toEqual({ query: "hello world" }); + expect((deserialized as any).toolCallId).toBe("tc-legacy-1"); + expect((deserialized as any).toolName).toBe("search"); + }); + + test("tool-call with both 'args' and 'input' fields prefers 'input'", () => { + const dualToolCall = { + type: "tool-call" as const, + toolCallId: "tc-dual-1", + toolName: "search", + input: { query: "from input" }, + args: { query: "from args" }, + }; + + const [deserialized] = toModelMessageContent([dualToolCall as any]); + expect((deserialized as any).input).toEqual({ query: "from input" }); + }); + + test("tool-call with neither 'args' nor 'input' defaults to empty object", () => { + const emptyToolCall = { + type: "tool-call" as const, + toolCallId: "tc-empty-1", + toolName: "search", + }; + + const [deserialized] = toModelMessageContent([emptyToolCall as any]); + expect((deserialized as any).input).toEqual({}); + }); + + test("round-trip serialization preserves both 'input' and 'args' fields", async () => { + const ctx = { + runAction: async () => undefined, + runMutation: async () => undefined, + storage: { + store: async () => "storageId", + getUrl: async () => "https://example.com/file", + delete: async () => undefined, + }, + } as unknown as ActionCtx; + 
const component = api as unknown as AgentComponent; + + const message = { + role: "assistant" as const, + content: [ + { + type: "tool-call" as const, + toolCallId: "tc-rt-1", + toolName: "search", + input: { query: "test" }, + }, + ], + }; + + const { message: serialized } = await serializeMessage( + ctx, + component, + message, + ); + const content = serialized.content as any[]; + + // Serialized should have both args and input for backwards compat + expect(content[0].input).toEqual({ query: "test" }); + expect(content[0].args).toEqual({ query: "test" }); + }); + + test("tool-result with legacy 'result' field is normalized to 'output'", () => { + const legacyToolResult = { + type: "tool-result" as const, + toolCallId: "tc-legacy-2", + toolName: "search", + result: "found 3 results", + }; + + const [deserialized] = toModelMessageContent([legacyToolResult as any]); + expect((deserialized as any).output).toEqual({ + type: "text", + value: "found 3 results", + }); + }); + + test("mimeType field is accepted alongside mediaType", async () => { + const ctx = { + runAction: async () => undefined, + runMutation: async () => undefined, + storage: { + store: async () => "storageId", + getUrl: async () => "https://example.com/file", + delete: async () => undefined, + }, + } as unknown as ActionCtx; + const component = api as unknown as AgentComponent; + + // Legacy format with mimeType + const message = { + role: "user" as const, + content: [ + { + type: "file" as const, + data: new ArrayBuffer(5), + mimeType: "image/png", + }, + ], + }; + + const { message: serialized } = await serializeMessage( + ctx, + component, + message as any, + ); + const content = serialized.content as any[]; + // Should be stored with mediaType (or both) + expect( + content[0].mediaType || content[0].mimeType, + ).toBe("image/png"); + }); +}); + +// ============================================================================ +// 2. 
Deprecated API Surface +// ============================================================================ + +describe("Deprecated API surface backwards compatibility", () => { + test("textEmbeddingModel config is accepted and used", () => { + // This test verifies the type accepts textEmbeddingModel + // The actual embedding functionality requires a running Convex backend + const config = { + textEmbeddingModel: { modelId: "test" }, + embeddingModel: undefined, + }; + + // Both should be accepted - textEmbeddingModel as fallback + expect(config.textEmbeddingModel).toBeDefined(); + }); + + test("maxSteps config is still supported in Config type", () => { + // maxSteps is kept for backwards compatibility + const config = { + maxSteps: 5, + }; + expect(config.maxSteps).toBe(5); + }); + + test("createTool with deprecated 'args' shows deprecation but works at runtime", async () => { + // Import dynamically to test runtime behavior + const { createTool } = await import("./client/createTool.js"); + const { z } = await import("zod/v4"); + + const consoleSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + // Legacy v5 pattern - should work at runtime via backwards compat. + // createTool's types intentionally reject the deprecated args/handler, + // so we call through a Function-typed reference to test runtime behavior. 
+ const createToolCompat = createTool as (...args: any[]) => any; + const legacyTool = createToolCompat({ + description: "Test legacy tool", + args: z.object({ query: z.string() }), + handler: async ( + _ctx: Record, + input: { query: string }, + ) => { + return input.query.toUpperCase(); + }, + }); + + expect(legacyTool).toBeDefined(); + + consoleSpy.mockRestore(); + }); + + test("createTool with v6 'inputSchema' and 'execute' works correctly", async () => { + const { createTool } = await import("./client/createTool.js"); + const { z } = await import("zod/v4"); + + const modernTool = createTool({ + description: "Test modern tool", + inputSchema: z.object({ query: z.string() }), + execute: async (_ctx, input, _options) => { + return (input as { query: string }).query.toUpperCase(); + }, + }); + + expect(modernTool).toBeDefined(); + expect(modernTool.inputSchema).toBeDefined(); + }); +}); + +// ============================================================================ +// 3. Delta Streaming Performance Tests +// ============================================================================ + +describe("Delta streaming performance characteristics", () => { + let t: TestConvex>; + let threadId: string; + + const defaultTestOptions = { + throttleMs: 0, + abortSignal: undefined, + compress: null, + onAsyncAbort: async (_reason: string) => {}, + }; + + const testMetadata = { + order: 0, + stepOrder: 0, + agentName: "perf-test", + model: "mock-model", + provider: "mock", + providerOptions: {}, + format: "UIMessageChunk" as const, + }; + + beforeEach(async () => { + t = initConvexTest(); + await t.run(async (ctx) => { + threadId = await createThread(ctx, components.agent, {}); + }); + }); + + test("compression reduces UIMessageChunk delta count by merging consecutive text-deltas", () => { + // Simulate per-character streaming (worst case for bandwidth) + const chars = "Hello, this is a test of delta compression efficiency."; + const parts = chars.split("").map((char) => ({ 
+ type: "text-delta" as const, + id: "txt-0", + delta: char, + })); + + const compressed = compressUIMessageChunks(parts); + + // All consecutive text-deltas with same ID should merge into one + expect(compressed).toHaveLength(1); + const first = compressed[0]; + expect(first.type).toBe("text-delta"); + if (first.type === "text-delta") { + expect(first.delta).toBe(chars); + } + + // Compression ratio + const ratio = parts.length / compressed.length; + expect(ratio).toBeGreaterThan(1); + }); + + test("compression handles interleaved text and reasoning deltas", () => { + const parts = [ + { type: "text-delta" as const, id: "txt-0", delta: "Hello" }, + { type: "text-delta" as const, id: "txt-0", delta: " world" }, + { type: "reasoning-delta" as const, id: "r-0", delta: "Thinking" }, + { type: "reasoning-delta" as const, id: "r-0", delta: " about" }, + { type: "text-delta" as const, id: "txt-1", delta: "More" }, + { type: "text-delta" as const, id: "txt-1", delta: " text" }, + ]; + + const compressed = compressUIMessageChunks(parts); + + // Should produce 3 merged groups: text-0, reasoning-0, text-1 + expect(compressed).toHaveLength(3); + expect(compressed[0]).toEqual({ + type: "text-delta", + id: "txt-0", + delta: "Hello world", + }); + expect(compressed[1]).toEqual({ + type: "reasoning-delta", + id: "r-0", + delta: "Thinking about", + }); + expect(compressed[2]).toEqual({ + type: "text-delta", + id: "txt-1", + delta: "More text", + }); + }); + + test("TextStreamPart compression merges consecutive text-deltas", () => { + const parts = [ + { type: "text-delta" as const, id: "txt-0", text: "Hello" }, + { type: "text-delta" as const, id: "txt-0", text: " " }, + { type: "text-delta" as const, id: "txt-0", text: "world" }, + ] as any[]; + + const compressed = compressTextStreamParts(parts); + expect(compressed).toHaveLength(1); + expect((compressed[0] as any).text).toBe("Hello world"); + }); + + test("TextStreamPart compression strips Uint8Array from file parts", () => { + 
const filePart = { + type: "file" as const, + file: { + data: new Uint8Array([1, 2, 3]), + uint8Array: new Uint8Array([1, 2, 3]), + mediaType: "application/octet-stream", + }, + }; + + const compressed = compressTextStreamParts([filePart as any]); + // File part is preserved but with Uint8Array stripped + const fileParts = compressed.filter((p) => p.type === "file"); + expect(fileParts.length).toBeGreaterThan(0); + }); + + test("large delta set materializes correctly", () => { + // Simulate 100 deltas from a long streaming response + const streamId = "perf-stream"; + const deltas: StreamDelta[] = []; + let cursor = 0; + + for (let i = 0; i < 100; i++) { + const parts = [ + { type: "text-delta", id: "txt-0", delta: `chunk-${i} ` }, + ]; + deltas.push({ + streamId, + start: cursor, + end: cursor + parts.length, + parts, + }); + cursor += parts.length; + } + + const { parts, cursor: finalCursor } = getParts(deltas, 0); + expect(parts).toHaveLength(100); + expect(finalCursor).toBe(100); + + // All text should be reconstructable + const fullText = parts + .map((p: any) => p.delta) + .join(""); + expect(fullText).toContain("chunk-0"); + expect(fullText).toContain("chunk-99"); + }); + + test("delta materialization performance: deriveUIMessagesFromDeltas handles many deltas", async () => { + const streamId = "perf-stream-2"; + const streamMessage: StreamMessage = { + streamId, + order: 0, + stepOrder: 0, + status: "finished", + format: "UIMessageChunk", + }; + + // Build 50 deltas with text content + const deltas: StreamDelta[] = []; + let cursor = 0; + // First delta: start parts + deltas.push({ + streamId, + start: cursor, + end: cursor + 3, + parts: [ + { type: "start" }, + { type: "start-step" }, + { type: "text-start", id: "txt-0" }, + ], + }); + cursor += 3; + + for (let i = 0; i < 50; i++) { + deltas.push({ + streamId, + start: cursor, + end: cursor + 1, + parts: [{ type: "text-delta", id: "txt-0", delta: `word${i} ` }], + }); + cursor += 1; + } + + // Final delta: 
end parts + deltas.push({ + streamId, + start: cursor, + end: cursor + 3, + parts: [ + { type: "text-end", id: "txt-0" }, + { type: "finish-step" }, + { type: "finish" }, + ], + }); + + const start = performance.now(); + const messages = await deriveUIMessagesFromDeltas( + "perf-thread", + [streamMessage], + deltas, + ); + const elapsed = performance.now() - start; + + expect(messages).toHaveLength(1); + expect(messages[0].role).toBe("assistant"); + expect(messages[0].text).toContain("word0"); + expect(messages[0].text).toContain("word49"); + + // Should materialize quickly (< 100ms for 50 deltas) + expect(elapsed).toBeLessThan(100); + }); + + test("DeltaStreamer with compression produces fewer deltas", async () => { + await t.run(async (ctx) => { + // Without compression + const noCompressStreamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions, compress: null }, + { ...testMetadata, threadId, order: 0 }, + ); + const r1 = streamText({ + model: mockModel({ + content: [{ type: "text", text: "Hello beautiful world of streaming" }], + }), + prompt: "Test", + }); + await noCompressStreamer.consumeStream(r1.toUIMessageStream()); + const noCompressId = noCompressStreamer.streamId!; + + const noCompressDeltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: 0, streamId: noCompressId }] }, + ); + + // With compression + const compressStreamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions, compress: compressUIMessageChunks }, + { ...testMetadata, threadId, order: 1 }, + ); + const r2 = streamText({ + model: mockModel({ + content: [{ type: "text", text: "Hello beautiful world of streaming" }], + }), + prompt: "Test", + }); + await compressStreamer.consumeStream(r2.toUIMessageStream()); + const compressId = compressStreamer.streamId!; + + const compressDeltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: 0, streamId: compressId 
}] }, + ); + + // Compressed deltas should have fewer or equal total parts + const noCompressParts = getParts(noCompressDeltas).parts; + const compressParts = getParts(compressDeltas).parts; + expect(compressParts.length).toBeLessThanOrEqual( + noCompressParts.length, + ); + }); + }); + + test("DEFAULT_STREAMING_OPTIONS has expected defaults", () => { + expect(DEFAULT_STREAMING_OPTIONS.throttleMs).toBe(250); + expect(DEFAULT_STREAMING_OPTIONS.returnImmediately).toBe(false); + expect(DEFAULT_STREAMING_OPTIONS.chunking).toBeInstanceOf(RegExp); + }); +}); + +// ============================================================================ +// 4. Both UIMessageChunk and TextStreamPart Formats +// ============================================================================ + +describe("Dual format delta support", () => { + test("UIMessageChunk format: full reconstruction with text and reasoning", async () => { + const streamId = "uimc-1"; + const streamMessage: StreamMessage = { + streamId, + order: 0, + stepOrder: 0, + status: "finished", + format: "UIMessageChunk", + }; + + const deltas: StreamDelta[] = [ + { + streamId, + start: 0, + end: 5, + parts: [ + { type: "start" }, + { type: "start-step" }, + { type: "reasoning-start", id: "r-0" }, + { type: "reasoning-delta", id: "r-0", delta: "Let me think..." }, + { type: "reasoning-end", id: "r-0" }, + ], + }, + { + streamId, + start: 5, + end: 9, + parts: [ + { type: "text-start", id: "txt-0" }, + { type: "text-delta", id: "txt-0", delta: "Here is the answer." 
}, + { type: "text-end", id: "txt-0" }, + { type: "finish-step" }, + ], + }, + { + streamId, + start: 9, + end: 10, + parts: [{ type: "finish" }], + }, + ]; + + const messages = await deriveUIMessagesFromDeltas( + "test-thread", + [streamMessage], + deltas, + ); + + expect(messages).toHaveLength(1); + const msg = messages[0]; + expect(msg.role).toBe("assistant"); + expect(msg.text).toContain("Here is the answer."); + expect(msg.status).toBe("success"); + + // Check reasoning parts + const reasoningParts = msg.parts.filter((p) => p.type === "reasoning"); + expect(reasoningParts.length).toBeGreaterThan(0); + expect((reasoningParts[0] as any).text).toContain("Let me think"); + }); + + test("TextStreamPart format: reconstruction with tool calls", () => { + const streamId = "tsp-1"; + const streamMessage: StreamMessage = { + streamId, + order: 0, + stepOrder: 0, + status: "finished", + // No format = TextStreamPart (legacy) + }; + + const deltas: StreamDelta[] = [ + { + streamId, + start: 0, + end: 1, + parts: [ + { type: "text-delta", id: "txt-0", text: "Calling search... 
" }, + ], + }, + { + streamId, + start: 1, + end: 2, + parts: [ + { + type: "tool-call", + toolCallId: "tc-tsp-1", + toolName: "search", + input: { query: "test" }, + }, + ], + }, + { + streamId, + start: 2, + end: 3, + parts: [ + { + type: "tool-result", + toolCallId: "tc-tsp-1", + toolName: "search", + output: "Found 5 results", + }, + ], + }, + { + streamId, + start: 3, + end: 4, + parts: [ + { + type: "text-delta", + id: "txt-1", + text: "Results processed.", + }, + ], + }, + ]; + + const [messages, , changed] = deriveUIMessagesFromTextStreamParts( + "test-thread", + [streamMessage], + [], + deltas, + ); + + expect(messages).toHaveLength(1); + expect(changed).toBe(true); + + const msg = messages[0]; + expect(msg.text).toContain("Calling search..."); + expect(msg.text).toContain("Results processed."); + + const toolParts = msg.parts.filter((p: any) => + p.type.startsWith("tool-"), + ); + expect(toolParts.length).toBeGreaterThan(0); + }); + + test("streams without format field default to TextStreamPart", async () => { + const streamId = "no-format"; + const streamMessage: StreamMessage = { + streamId, + order: 0, + stepOrder: 0, + status: "finished", + // format is intentionally omitted + }; + + const deltas: StreamDelta[] = [ + { + streamId, + start: 0, + end: 1, + parts: [{ type: "text-delta", id: "txt-0", text: "Hello from TextStreamPart" }], + }, + ]; + + // deriveUIMessagesFromDeltas should detect missing format and use TextStreamPart path + const messages = await deriveUIMessagesFromDeltas( + "test-thread", + [streamMessage], + deltas, + ); + + expect(messages).toHaveLength(1); + expect(messages[0].text).toContain("Hello from TextStreamPart"); + }); +}); + +// ============================================================================ +// 5. 
Stream Status Mapping +// ============================================================================ + +describe("Stream status mapping backwards compatibility", () => { + test("streaming → streaming status", async () => { + const msg: StreamMessage = { + streamId: "s1", + order: 0, + stepOrder: 0, + status: "streaming", + }; + const messages = await deriveUIMessagesFromDeltas("t1", [msg], []); + expect(messages[0].status).toBe("streaming"); + }); + + test("finished → success status", async () => { + const msg: StreamMessage = { + streamId: "s2", + order: 0, + stepOrder: 0, + status: "finished", + }; + const messages = await deriveUIMessagesFromDeltas("t1", [msg], []); + expect(messages[0].status).toBe("success"); + }); + + test("aborted → failed status", async () => { + const msg: StreamMessage = { + streamId: "s3", + order: 0, + stepOrder: 0, + status: "aborted", + }; + const messages = await deriveUIMessagesFromDeltas("t1", [msg], []); + expect(messages[0].status).toBe("failed"); + }); +}); + +// ============================================================================ +// 6. 
Delta Cursor Mechanics +// ============================================================================ + +describe("Delta cursor mechanics and gap handling", () => { + test("getParts handles empty delta array", () => { + const { parts, cursor } = getParts([], 0); + expect(parts).toHaveLength(0); + expect(cursor).toBe(0); + }); + + test("getParts handles deltas starting after cursor (gap)", () => { + const deltas: StreamDelta[] = [ + { streamId: "s1", start: 5, end: 8, parts: [{ type: "a" }] }, + ]; + // Cursor at 0, delta starts at 5 - there's a gap + const { parts, cursor } = getParts(deltas, 0); + expect(parts).toHaveLength(0); + expect(cursor).toBe(0); + }); + + test("getParts handles overlapping deltas (already consumed)", () => { + const deltas: StreamDelta[] = [ + { + streamId: "s1", + start: 0, + end: 3, + parts: [{ type: "old1" }, { type: "old2" }, { type: "old3" }], + }, + { + streamId: "s1", + start: 3, + end: 5, + parts: [{ type: "new1" }, { type: "new2" }], + }, + ]; + + // Cursor at 3 - first delta should be skipped + const { parts, cursor } = getParts<{ type: string }>(deltas, 3); + expect(parts).toHaveLength(2); + expect(parts[0].type).toBe("new1"); + expect(cursor).toBe(5); + }); + + test("getParts handles unsorted deltas by sorting them", () => { + const deltas: StreamDelta[] = [ + { + streamId: "s1", + start: 3, + end: 6, + parts: [{ type: "second" }], + }, + { + streamId: "s1", + start: 0, + end: 3, + parts: [{ type: "first" }], + }, + ]; + + const { parts, cursor } = getParts<{ type: string }>(deltas, 0); + expect(parts).toHaveLength(2); + expect(parts[0].type).toBe("first"); + expect(parts[1].type).toBe("second"); + expect(cursor).toBe(6); + }); +}); + +// ============================================================================ +// 7. 
Multi-Stream Delta Materialization +// ============================================================================ + +describe("Multi-stream delta materialization", () => { + test("multiple streams produce sorted UIMessages", async () => { + const streams: StreamMessage[] = [ + { streamId: "s2", order: 2, stepOrder: 0, status: "finished" }, + { streamId: "s1", order: 1, stepOrder: 0, status: "finished" }, + { streamId: "s3", order: 3, stepOrder: 0, status: "streaming" }, + ]; + + const deltas: StreamDelta[] = [ + { + streamId: "s1", + start: 0, + end: 1, + parts: [{ type: "text-delta", id: "t", text: "First" }], + }, + { + streamId: "s2", + start: 0, + end: 1, + parts: [{ type: "text-delta", id: "t", text: "Second" }], + }, + { + streamId: "s3", + start: 0, + end: 1, + parts: [{ type: "text-delta", id: "t", text: "Third" }], + }, + ]; + + const messages = await deriveUIMessagesFromDeltas("t1", streams, deltas); + + expect(messages).toHaveLength(3); + // Messages should be sorted by order + expect(messages[0].order).toBe(1); + expect(messages[1].order).toBe(2); + expect(messages[2].order).toBe(3); + }); + + test("streams at same order but different stepOrders produce separate messages", async () => { + const streams: StreamMessage[] = [ + { + streamId: "s1", + order: 1, + stepOrder: 0, + status: "finished", + format: "UIMessageChunk", + }, + { + streamId: "s2", + order: 1, + stepOrder: 1, + status: "finished", + format: "UIMessageChunk", + }, + ]; + + const deltas: StreamDelta[] = [ + { + streamId: "s1", + start: 0, + end: 3, + parts: [ + { type: "start" }, + { type: "text-start", id: "txt-0" }, + { type: "text-delta", id: "txt-0", delta: "Step 0" }, + ], + }, + { + streamId: "s2", + start: 0, + end: 3, + parts: [ + { type: "start" }, + { type: "text-start", id: "txt-0" }, + { type: "text-delta", id: "txt-0", delta: "Step 1" }, + ], + }, + ]; + + const messages = await deriveUIMessagesFromDeltas("t1", streams, deltas); + expect(messages).toHaveLength(2); + }); +}); 
+ +// ============================================================================ +// 8. Integration: Full Streaming Lifecycle +// ============================================================================ + +describe("Full streaming lifecycle integration", () => { + let t: TestConvex>; + let threadId: string; + + const defaultTestOptions = { + throttleMs: 0, + abortSignal: undefined, + compress: null, + onAsyncAbort: async (_reason: string) => {}, + }; + + const testMetadata = { + order: 0, + stepOrder: 0, + agentName: "lifecycle-test", + model: "mock-model", + provider: "mock", + providerOptions: {}, + format: "UIMessageChunk" as const, + }; + + beforeEach(async () => { + t = initConvexTest(); + await t.run(async (ctx) => { + threadId = await createThread(ctx, components.agent, {}); + }); + }); + + test("end-to-end: stream → persist → reconstruct produces correct text", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + const testText = "The quick brown fox jumps over the lazy dog"; + const result = streamText({ + model: mockModel({ + content: [{ type: "text", text: testText }], + }), + prompt: "Test", + }); + + await streamer.consumeStream(result.toUIMessageStream()); + const streamId = streamer.streamId!; + + // Fetch and reconstruct + const streams = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["finished"], + }); + const deltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: 0, streamId }] }, + ); + + const messages = await deriveUIMessagesFromDeltas( + threadId, + streams, + deltas, + ); + + expect(messages).toHaveLength(1); + // The reconstructed text should contain all words + for (const word of testText.split(" ")) { + expect(messages[0].text).toContain(word); + } + }); + }); + + test("end-to-end: stream with reasoning → persist → reconstruct", 
async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + const result = streamText({ + model: mockModel({ + content: [ + { type: "reasoning", text: "I need to think about this carefully" }, + { type: "text", text: "After careful thought, here is my answer" }, + ], + }), + prompt: "Test", + }); + + await streamer.consumeStream(result.toUIMessageStream()); + const streamId = streamer.streamId!; + + const streams = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["finished"], + }); + const deltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: 0, streamId }] }, + ); + + const messages = await deriveUIMessagesFromDeltas( + threadId, + streams, + deltas, + ); + + expect(messages).toHaveLength(1); + const msg = messages[0]; + + // Text content + expect(msg.text).toContain("After careful thought"); + + // Reasoning parts + const reasoning = msg.parts.filter((p) => p.type === "reasoning"); + expect(reasoning.length).toBeGreaterThan(0); + expect((reasoning[0] as any).text).toContain("think about this"); + }); + }); +}); diff --git a/src/client/streaming.integration.test.ts b/src/client/streaming.integration.test.ts new file mode 100644 index 00000000..a2cfee09 --- /dev/null +++ b/src/client/streaming.integration.test.ts @@ -0,0 +1,1211 @@ +import { beforeEach, describe, expect, test } from "vitest"; +import { createThread } from "./index.js"; +import type { GenericSchema, SchemaDefinition } from "convex/server"; +import { streamText } from "ai"; +import { components, initConvexTest } from "./setup.test.js"; +import { mockModel } from "./mockModel.js"; +import { + compressUIMessageChunks, + DeltaStreamer, + mergeTransforms, +} from "./streaming.js"; +import { + getParts, + deriveUIMessagesFromDeltas, + deriveUIMessagesFromTextStreamParts, +} from "../deltas.js"; +import type { 
TestConvex } from "convex-test"; +import type { StreamDelta, StreamMessage } from "../validators.js"; +import { dedupeMessages } from "../react/useUIMessages.js"; + +const defaultTestOptions = { + throttleMs: 0, + abortSignal: undefined, + compress: null, + onAsyncAbort: async (_reason: string) => { + // In integration tests, async aborts can happen when the stream + // finishes before a pending delta write completes. This is expected. + }, +}; + +const testMetadata = { + order: 0, + stepOrder: 0, + agentName: "test agent", + model: "test model", + provider: "test provider", + providerOptions: {}, + format: "UIMessageChunk" as const, +}; + +// ============================================================================ +// HTTP Streaming Initiation +// ============================================================================ + +describe("HTTP Streaming Initiation", () => { + let t: TestConvex>; + let threadId: string; + + beforeEach(async () => { + t = initConvexTest(); + await t.run(async (ctx) => { + threadId = await createThread(ctx, components.agent, {}); + }); + }); + + test("DeltaStreamer creates a stream on first addParts call", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + expect(streamer.streamId).toBeUndefined(); + + await streamer.addParts([{ type: "start" }]); + expect(streamer.streamId).toBeDefined(); + }); + }); + + test("DeltaStreamer.getStreamId creates the stream lazily", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + expect(streamer.streamId).toBeUndefined(); + const streamId = await streamer.getStreamId(); + expect(streamId).toBeDefined(); + expect(streamer.streamId).toBe(streamId); + }); + }); + + test("DeltaStreamer.getStreamId returns the same ID on repeated calls", async () 
=> { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + const id1 = await streamer.getStreamId(); + const id2 = await streamer.getStreamId(); + expect(id1).toBe(id2); + }); + }); + + test("Stream is created with streaming state", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + await streamer.getStreamId(); + + const streams = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["streaming"], + }); + expect(streams).toHaveLength(1); + expect(streams[0].status).toBe("streaming"); + expect(streams[0].agentName).toBe("test agent"); + expect(streams[0].model).toBe("test model"); + }); + }); + + test("consumeStream processes full AI SDK stream to deltas", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + const result = streamText({ + model: mockModel({ + content: [{ type: "text", text: "Hello world" }], + }), + prompt: "Test", + }); + + await streamer.consumeStream(result.toUIMessageStream()); + // Ensure the AI SDK result is also fully consumed + await result.consumeStream(); + expect(streamer.streamId).toBeDefined(); + + // Verify deltas were saved + const deltas = await ctx.runQuery(components.agent.streams.listDeltas, { + threadId, + cursors: [{ cursor: 0, streamId: streamer.streamId! 
}], + }); + expect(deltas.length).toBeGreaterThan(0); + + // Verify we can reconstruct the text from deltas + const { parts } = getParts(deltas); + const textParts = parts.filter( + (p: any) => p.type === "text-delta", + ); + expect(textParts.length).toBeGreaterThan(0); + }); + }); + + test("consumeStream transitions stream to finished state", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + const result = streamText({ + model: mockModel({ + content: [{ type: "text", text: "Done" }], + }), + prompt: "Test", + }); + + await streamer.consumeStream(result.toUIMessageStream()); + + // Stream should now be finished + const streamingStreams = await ctx.runQuery( + components.agent.streams.list, + { threadId, statuses: ["streaming"] }, + ); + expect(streamingStreams).toHaveLength(0); + + const finishedStreams = await ctx.runQuery( + components.agent.streams.list, + { threadId, statuses: ["finished"] }, + ); + expect(finishedStreams).toHaveLength(1); + expect(finishedStreams[0].status).toBe("finished"); + }); + }); + + test("markFinishedExternally prevents consumeStream from calling finish", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + await streamer.getStreamId(); + streamer.markFinishedExternally(); + + const result = streamText({ + model: mockModel({ + content: [{ type: "text", text: "Hello" }], + }), + prompt: "Test", + }); + + await streamer.consumeStream(result.toUIMessageStream()); + + // Stream should still be in streaming state since finish was skipped + const streamingStreams = await ctx.runQuery( + components.agent.streams.list, + { threadId, statuses: ["streaming"] }, + ); + expect(streamingStreams).toHaveLength(1); + }); + }); +}); + +// 
============================================================================ +// Stream Exclusion Logic +// ============================================================================ + +describe("Stream Exclusion Logic", () => { + let t: TestConvex>; + let threadId: string; + + beforeEach(async () => { + t = initConvexTest(); + await t.run(async (ctx) => { + threadId = await createThread(ctx, components.agent, {}); + }); + }); + + test("list defaults to only streaming status", async () => { + await t.run(async (ctx) => { + // Create a stream and finish it + const streamer1 = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order: 0 }, + ); + const r1 = streamText({ + model: mockModel({ content: [{ type: "text", text: "Finished" }] }), + prompt: "Test", + }); + await streamer1.consumeStream(r1.toUIMessageStream()); + + // Create a still-streaming stream + const streamer2 = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order: 1 }, + ); + await streamer2.getStreamId(); + await streamer2.addParts([{ type: "start" }]); + + // Default list: only streaming + const defaultStreams = await ctx.runQuery( + components.agent.streams.list, + { threadId }, + ); + expect(defaultStreams).toHaveLength(1); + expect(defaultStreams[0].status).toBe("streaming"); + expect(defaultStreams[0].order).toBe(1); + }); + }); + + test("list with includeStatuses filters correctly", async () => { + await t.run(async (ctx) => { + // Create and finish a stream + const finishedStreamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order: 0 }, + ); + const r = streamText({ + model: mockModel({ content: [{ type: "text", text: "Done" }] }), + prompt: "Test", + }); + await finishedStreamer.consumeStream(r.toUIMessageStream()); + + // Create and abort a stream + const abortedStreamer = new DeltaStreamer( + components.agent, + 
ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order: 1 }, + ); + await abortedStreamer.getStreamId(); + await abortedStreamer.fail("test abort"); + + // Create a still-streaming stream + const activeStreamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order: 2 }, + ); + await activeStreamer.getStreamId(); + + // Query for all statuses + const allStreams = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["streaming", "finished", "aborted"], + }); + expect(allStreams).toHaveLength(3); + + // Query for only finished + const finishedStreams = await ctx.runQuery( + components.agent.streams.list, + { threadId, statuses: ["finished"] }, + ); + expect(finishedStreams).toHaveLength(1); + expect(finishedStreams[0].status).toBe("finished"); + + // Query for only aborted + const abortedStreams = await ctx.runQuery( + components.agent.streams.list, + { threadId, statuses: ["aborted"] }, + ); + expect(abortedStreams).toHaveLength(1); + expect(abortedStreams[0].status).toBe("aborted"); + + // Query for streaming + aborted + const streamingAndAborted = await ctx.runQuery( + components.agent.streams.list, + { threadId, statuses: ["streaming", "aborted"] }, + ); + expect(streamingAndAborted).toHaveLength(2); + }); + }); + + test("startOrder filters out streams with lower order", async () => { + await t.run(async (ctx) => { + // Create streams at different orders + for (const order of [0, 1, 2, 3]) { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order }, + ); + await streamer.getStreamId(); + } + + // startOrder=2 should only return streams with order >= 2 + const filtered = await ctx.runQuery(components.agent.streams.list, { + threadId, + startOrder: 2, + statuses: ["streaming"], + }); + expect(filtered).toHaveLength(2); + expect(filtered.every((s) => s.order >= 2)).toBe(true); + }); + }); + + 
test("streams from different threads are isolated", async () => { + let threadId2: string; + await t.run(async (ctx) => { + threadId2 = await createThread(ctx, components.agent, {}); + + // Create a stream in thread 1 + const s1 = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order: 0 }, + ); + await s1.getStreamId(); + + // Create a stream in thread 2 + const s2 = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId: threadId2, order: 0 }, + ); + await s2.getStreamId(); + + // Each thread should only see its own streams + const t1Streams = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["streaming"], + }); + expect(t1Streams).toHaveLength(1); + + const t2Streams = await ctx.runQuery(components.agent.streams.list, { + threadId: threadId2, + statuses: ["streaming"], + }); + expect(t2Streams).toHaveLength(1); + + expect(t1Streams[0].streamId).not.toBe(t2Streams[0].streamId); + }); + }); + + test("dedupeMessages prefers finalized over streaming over pending", () => { + type M = { + order: number; + stepOrder: number; + status: "pending" | "success" | "failed" | "streaming"; + }; + + const messages: M[] = [ + { order: 1, stepOrder: 0, status: "pending" }, + { order: 2, stepOrder: 0, status: "success" }, + { order: 3, stepOrder: 0, status: "pending" }, + ]; + const streamMessages: M[] = [ + { order: 1, stepOrder: 0, status: "streaming" }, + { order: 2, stepOrder: 0, status: "streaming" }, + { order: 3, stepOrder: 0, status: "success" }, + ]; + + const result = dedupeMessages(messages, streamMessages); + expect(result).toHaveLength(3); + // pending replaced by streaming + expect(result[0].status).toBe("streaming"); + // success kept over streaming + expect(result[1].status).toBe("success"); + // pending replaced by success + expect(result[2].status).toBe("success"); + }); +}); + +// 
============================================================================ +// Delta Stream Consumption +// ============================================================================ + +describe("Delta Stream Consumption", () => { + let t: TestConvex>; + let threadId: string; + + beforeEach(async () => { + t = initConvexTest(); + await t.run(async (ctx) => { + threadId = await createThread(ctx, components.agent, {}); + }); + }); + + test("cursor-based incremental delta fetching", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + const result = streamText({ + model: mockModel({ + content: [{ type: "text", text: "One Two Three Four" }], + }), + prompt: "Test", + }); + await streamer.consumeStream(result.toUIMessageStream()); + const streamId = streamer.streamId!; + + // Fetch all deltas from start + const allDeltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: 0, streamId }] }, + ); + expect(allDeltas.length).toBeGreaterThan(0); + const { parts: allParts, cursor: endCursor } = getParts(allDeltas); + + // Fetch from midpoint cursor - should only get remaining deltas + const midCursor = Math.floor(endCursor / 2); + const laterDeltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: midCursor, streamId }] }, + ); + const { parts: laterParts } = getParts(laterDeltas, midCursor); + + // Later parts should be a subset of all parts + expect(laterParts.length).toBeLessThanOrEqual(allParts.length); + + // Fetching from the end cursor should yield nothing + const noDeltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: endCursor, streamId }] }, + ); + expect(noDeltas).toHaveLength(0); + }); + }); + + test("multi-stream delta fetching with separate cursors", async () => { + await t.run(async (ctx) => { + // 
Create two streams with different content + const streamer1 = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order: 0 }, + ); + const r1 = streamText({ + model: mockModel({ content: [{ type: "text", text: "Stream One" }] }), + prompt: "Test 1", + }); + await streamer1.consumeStream(r1.toUIMessageStream()); + + const streamer2 = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order: 1 }, + ); + const r2 = streamText({ + model: mockModel({ content: [{ type: "text", text: "Stream Two" }] }), + prompt: "Test 2", + }); + await streamer2.consumeStream(r2.toUIMessageStream()); + + const id1 = streamer1.streamId!; + const id2 = streamer2.streamId!; + + // Fetch deltas for both streams simultaneously + const deltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { + threadId, + cursors: [ + { cursor: 0, streamId: id1 }, + { cursor: 0, streamId: id2 }, + ], + }, + ); + + // Should have deltas for both streams + const s1Deltas = deltas.filter((d) => d.streamId === id1); + const s2Deltas = deltas.filter((d) => d.streamId === id2); + expect(s1Deltas.length).toBeGreaterThan(0); + expect(s2Deltas.length).toBeGreaterThan(0); + }); + }); + + test("deriveUIMessagesFromDeltas reconstructs messages from UIMessageChunk format", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + const result = streamText({ + model: mockModel({ + content: [{ type: "text", text: "Hello from deltas" }], + }), + prompt: "Test", + }); + await streamer.consumeStream(result.toUIMessageStream()); + const streamId = streamer.streamId!; + + // Fetch stream messages and deltas + const streams = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["finished"], + }); + const deltas = await ctx.runQuery( + 
components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: 0, streamId }] }, + ); + + // Derive UI messages + const uiMessages = await deriveUIMessagesFromDeltas( + threadId, + streams, + deltas, + ); + expect(uiMessages).toHaveLength(1); + expect(uiMessages[0].role).toBe("assistant"); + expect(uiMessages[0].text).toContain("Hello"); + expect(uiMessages[0].text).toContain("from"); + expect(uiMessages[0].text).toContain("deltas"); + }); + }); + + test("compression merges consecutive text deltas", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { + throttleMs: 1000, + abortSignal: undefined, + compress: compressUIMessageChunks, + onAsyncAbort: async () => { + throw new Error("async abort"); + }, + }, + { ...testMetadata, threadId }, + ); + + const result = streamText({ + model: mockModel({ + content: [ + { type: "text", text: "A B C" }, + { type: "reasoning", text: "X Y Z" }, + ], + }), + prompt: "Test", + }); + await streamer.consumeStream(result.toUIMessageStream()); + const streamId = streamer.streamId!; + + const deltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: 0, streamId }] }, + ); + type DeltaPart = { type: string; delta?: string; id?: string }; + const { parts } = getParts(deltas); + + // Compressed: all text-deltas for one text section should be merged + const textDeltas = parts.filter((p) => p.type === "text-delta"); + // With compression and throttleMs=1000, text deltas should be merged + expect(textDeltas.length).toBeLessThanOrEqual(1); + if (textDeltas.length === 1) { + expect(textDeltas[0].delta).toBe("A B C"); + } + + // Reasoning deltas should also be merged + const reasoningDeltas = parts.filter( + (p) => p.type === "reasoning-delta", + ); + expect(reasoningDeltas.length).toBeLessThanOrEqual(1); + }); + }); + + test("getParts validates delta continuity", () => { + const streamId = "test-stream"; + + // Normal continuous 
deltas + const deltas: StreamDelta[] = [ + { streamId, start: 0, end: 3, parts: [{ type: "text-delta" }] }, + { streamId, start: 3, end: 6, parts: [{ type: "text-delta" }] }, + { streamId, start: 6, end: 9, parts: [{ type: "text-delta" }] }, + ]; + const { parts, cursor } = getParts(deltas); + expect(parts).toHaveLength(3); + expect(cursor).toBe(9); + }); + + test("getParts handles gap in deltas gracefully", () => { + const streamId = "test-stream"; + + // Deltas with a gap (missing 3-6) + const deltas: StreamDelta[] = [ + { streamId, start: 0, end: 3, parts: [{ type: "a" }] }, + { streamId, start: 6, end: 9, parts: [{ type: "b" }] }, + ]; + const { parts, cursor } = getParts(deltas); + // Should stop at the gap + expect(parts).toHaveLength(1); + expect(cursor).toBe(3); + }); + + test("getParts skips already-consumed deltas", () => { + const streamId = "test-stream"; + const deltas: StreamDelta[] = [ + { streamId, start: 0, end: 3, parts: [{ type: "old" }] }, + { streamId, start: 3, end: 6, parts: [{ type: "new" }] }, + ]; + // Start from cursor=3 to skip first delta + const { parts, cursor } = getParts<{ type: string }>(deltas, 3); + expect(parts).toHaveLength(1); + expect(parts[0].type).toBe("new"); + expect(cursor).toBe(6); + }); + + test("TextStreamPart format delta reconstruction with tool calls", () => { + const streamId = "s1"; + const streamMessage: StreamMessage = { + streamId, + order: 1, + stepOrder: 0, + status: "streaming", + }; + const deltas: StreamDelta[] = [ + { + streamId, + start: 0, + end: 1, + parts: [{ type: "text-delta", id: "txt-0", text: "Let me call a tool. 
" }], + }, + { + streamId, + start: 1, + end: 2, + parts: [ + { + type: "tool-call", + toolCallId: "tc1", + toolName: "search", + input: { query: "hello" }, + }, + ], + }, + { + streamId, + start: 2, + end: 3, + parts: [ + { + type: "tool-result", + toolCallId: "tc1", + toolName: "search", + output: "Found 3 results", + }, + ], + }, + { + streamId, + start: 3, + end: 4, + parts: [ + { type: "text-delta", id: "txt-1", text: "Here are the results." }, + ], + }, + ]; + + const [messages, , changed] = deriveUIMessagesFromTextStreamParts( + "thread1", + [streamMessage], + [], + deltas, + ); + + expect(messages).toHaveLength(1); + expect(changed).toBe(true); + + const msg = messages[0]; + expect(msg.text).toContain("Let me call a tool."); + expect(msg.text).toContain("Here are the results."); + + const toolParts = msg.parts.filter((p: any) => + p.type.startsWith("tool-"), + ); + expect(toolParts.length).toBeGreaterThan(0); + }); +}); + +// ============================================================================ +// Fallback Behavior between HTTP and Delta Streams +// ============================================================================ + +describe("Fallback Behavior", () => { + let t: TestConvex>; + let threadId: string; + + beforeEach(async () => { + t = initConvexTest(); + await t.run(async (ctx) => { + threadId = await createThread(ctx, components.agent, {}); + }); + }); + + test("aborted stream transitions to aborted state", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + await streamer.getStreamId(); + + await streamer.fail("User canceled"); + + const aborted = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["aborted"], + }); + expect(aborted).toHaveLength(1); + expect(aborted[0].status).toBe("aborted"); + + // No streaming streams left + const streaming = await 
ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["streaming"], + }); + expect(streaming).toHaveLength(0); + }); + }); + + test("abort via abortByOrder aborts all streams at that order", async () => { + await t.run(async (ctx) => { + // Create two streams at the same order (different stepOrders) + const s1 = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order: 5, stepOrder: 0 }, + ); + await s1.getStreamId(); + + const s2 = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order: 5, stepOrder: 1 }, + ); + await s2.getStreamId(); + + // Abort by order + const result = await ctx.runMutation( + components.agent.streams.abortByOrder, + { threadId, order: 5, reason: "batch abort" }, + ); + expect(result).toBe(true); + + const streaming = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["streaming"], + }); + expect(streaming).toHaveLength(0); + + const aborted = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["aborted"], + }); + expect(aborted).toHaveLength(2); + }); + }); + + test("fail on already-aborted stream is a no-op", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + await streamer.getStreamId(); + + // First abort + await streamer.fail("First abort"); + + // Second abort is a no-op (no error thrown) + await streamer.fail("Second abort"); + + const aborted = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["aborted"], + }); + expect(aborted).toHaveLength(1); + }); + }); + + test("finish on non-existent stream is a no-op", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + // Calling finish 
without ever creating a stream should be safe + await streamer.finish(); + expect(streamer.streamId).toBeUndefined(); + }); + }); + + test("deriveUIMessagesFromDeltas maps stream status correctly", async () => { + // Streaming status + const streamingMsg: StreamMessage = { + streamId: "s1", + order: 0, + stepOrder: 0, + status: "streaming", + }; + const finishedMsg: StreamMessage = { + streamId: "s2", + order: 1, + stepOrder: 0, + status: "finished", + }; + const abortedMsg: StreamMessage = { + streamId: "s3", + order: 2, + stepOrder: 0, + status: "aborted", + }; + + const msgs = await deriveUIMessagesFromDeltas( + "t1", + [streamingMsg, finishedMsg, abortedMsg], + [], + ); + expect(msgs[0].status).toBe("streaming"); + expect(msgs[1].status).toBe("success"); + expect(msgs[2].status).toBe("failed"); + }); + + test("dedupeMessages handles fallback from streaming to finalized gracefully", () => { + type M = { + order: number; + stepOrder: number; + status: "pending" | "success" | "failed" | "streaming"; + text: string; + }; + + // Simulate: full messages from DB include finalized versions, streaming + // messages are still around from the delta stream + const dbMessages: M[] = [ + { order: 1, stepOrder: 0, status: "success", text: "Final answer" }, + { order: 2, stepOrder: 0, status: "pending", text: "Thinking..." }, + ]; + const streamMessages: M[] = [ + { order: 1, stepOrder: 0, status: "streaming", text: "Final ans..." }, + { order: 2, stepOrder: 0, status: "streaming", text: "Thinking..." 
}, + ]; + + const result = dedupeMessages(dbMessages, streamMessages); + + // Order 1: finalized DB version preferred over streaming + expect(result[0].status).toBe("success"); + expect(result[0].text).toBe("Final answer"); + + // Order 2: streaming preferred over pending DB version + expect(result[1].status).toBe("streaming"); + }); + + test("mergeTransforms adds smoothStream when streaming is enabled", () => { + // No streaming options - returns existing transforms + expect(mergeTransforms(undefined, undefined)).toBeUndefined(); + + // Boolean true - adds smoothStream + const transforms = mergeTransforms(true, undefined); + expect(transforms).toBeDefined(); + expect(Array.isArray(transforms)).toBe(true); + expect((transforms as any[]).length).toBe(1); + + // With existing transforms - appends + const existing = [(chunk: any) => chunk]; + const merged = mergeTransforms(true, existing); + expect(Array.isArray(merged)).toBe(true); + expect((merged as any[]).length).toBe(2); + + // Custom chunking + const custom = mergeTransforms({ chunking: "word" }, undefined); + expect(custom).toBeDefined(); + expect(Array.isArray(custom)).toBe(true); + }); +}); + +// ============================================================================ +// Stream Lifecycle Integration +// ============================================================================ + +describe("Stream Lifecycle Integration", () => { + let t: TestConvex>; + let threadId: string; + + beforeEach(async () => { + t = initConvexTest(); + await t.run(async (ctx) => { + threadId = await createThread(ctx, components.agent, {}); + }); + }); + + test("full lifecycle: create -> stream -> finish -> derive messages", async () => { + await t.run(async (ctx) => { + // 1. Create the stream + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + // 2. 
Stream content + const result = streamText({ + model: mockModel({ + content: [ + { type: "text", text: "Once upon a time" }, + { type: "reasoning", text: "I should tell a story" }, + ], + }), + prompt: "Tell me a story", + }); + await streamer.consumeStream(result.toUIMessageStream()); + const streamId = streamer.streamId!; + + // 3. Verify finish state + const finished = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["finished"], + }); + expect(finished).toHaveLength(1); + + // 4. Derive UI messages from stored deltas + const deltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: 0, streamId }] }, + ); + const uiMessages = await deriveUIMessagesFromDeltas( + threadId, + finished, + deltas, + ); + + expect(uiMessages).toHaveLength(1); + const msg = uiMessages[0]; + expect(msg.role).toBe("assistant"); + expect(msg.text).toContain("Once"); + expect(msg.text).toContain("upon"); + expect(msg.text).toContain("time"); + expect(msg.status).toBe("success"); + + // Check that reasoning parts are present + const reasoningParts = msg.parts.filter( + (p: any) => p.type === "reasoning", + ); + expect(reasoningParts.length).toBeGreaterThan(0); + }); + }); + + test("full lifecycle: create -> partial stream -> abort -> derive aborted messages", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + + // Stream some content then abort + await streamer.addParts([ + { type: "start" }, + { type: "start-step" }, + { type: "text-start", id: "txt-0" }, + { type: "text-delta", id: "txt-0", delta: "Partial" }, + ]); + await streamer.fail("User aborted"); + + const streamId = streamer.streamId!; + + // Verify aborted state + const aborted = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["aborted"], + }); + expect(aborted).toHaveLength(1); + 
expect(aborted[0].status).toBe("aborted"); + + // Even aborted streams have their deltas stored + const deltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: 0, streamId }] }, + ); + expect(deltas.length).toBeGreaterThan(0); + }); + }); + + test("multiple concurrent streams in same thread", async () => { + await t.run(async (ctx) => { + const streamers = []; + for (let i = 0; i < 3; i++) { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId, order: i }, + ); + const r = streamText({ + model: mockModel({ + content: [{ type: "text", text: `Message ${i}` }], + }), + prompt: "Test", + }); + await streamer.consumeStream(r.toUIMessageStream()); + streamers.push(streamer); + } + + // All should be finished + const finished = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["finished"], + }); + expect(finished).toHaveLength(3); + + // Derive all messages + const allDeltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { + threadId, + cursors: streamers.map((s) => ({ + cursor: 0, + streamId: s.streamId!, + })), + }, + ); + const uiMessages = await deriveUIMessagesFromDeltas( + threadId, + finished, + allDeltas, + ); + expect(uiMessages).toHaveLength(3); + }); + }); + + test("stream deletion removes both stream and its deltas", async () => { + await t.run(async (ctx) => { + const streamer = new DeltaStreamer( + components.agent, + ctx, + { ...defaultTestOptions }, + { ...testMetadata, threadId }, + ); + const r = streamText({ + model: mockModel({ content: [{ type: "text", text: "Delete me" }] }), + prompt: "Test", + }); + await streamer.consumeStream(r.toUIMessageStream()); + const streamId = streamer.streamId!; + + // Verify deltas exist + const beforeDeltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: 0, streamId }] }, + ); + 
expect(beforeDeltas.length).toBeGreaterThan(0); + + // Delete the stream + await ctx.runMutation(components.agent.streams.deleteStreamSync, { + streamId, + }); + + // Both stream and deltas should be gone + const afterStreams = await ctx.runQuery(components.agent.streams.list, { + threadId, + statuses: ["streaming", "finished", "aborted"], + }); + expect(afterStreams).toHaveLength(0); + + const afterDeltas = await ctx.runQuery( + components.agent.streams.listDeltas, + { threadId, cursors: [{ cursor: 0, streamId }] }, + ); + expect(afterDeltas).toHaveLength(0); + }); + }); +}); + +// ============================================================================ +// Compression +// ============================================================================ + +describe("Compression", () => { + test("compressUIMessageChunks merges consecutive text-delta parts", () => { + const parts = [ + { type: "text-delta" as const, id: "1", delta: "Hello" }, + { type: "text-delta" as const, id: "1", delta: " " }, + { type: "text-delta" as const, id: "1", delta: "World" }, + ]; + const compressed = compressUIMessageChunks(parts); + expect(compressed).toHaveLength(1); + expect(compressed[0]).toEqual({ + type: "text-delta", + id: "1", + delta: "Hello World", + }); + }); + + test("compressUIMessageChunks does not merge different IDs", () => { + const parts = [ + { type: "text-delta" as const, id: "1", delta: "Hello" }, + { type: "text-delta" as const, id: "2", delta: "World" }, + ]; + const compressed = compressUIMessageChunks(parts); + expect(compressed).toHaveLength(2); + }); + + test("compressUIMessageChunks merges consecutive reasoning-delta parts", () => { + const parts = [ + { type: "reasoning-delta" as const, id: "r1", delta: "Think" }, + { type: "reasoning-delta" as const, id: "r1", delta: "ing" }, + ]; + const compressed = compressUIMessageChunks(parts); + expect(compressed).toHaveLength(1); + const first = compressed[0]; + expect(first.type).toBe("reasoning-delta"); + if 
(first.type === "reasoning-delta") { + expect(first.delta).toBe("Thinking"); + } + }); + + test("compressUIMessageChunks preserves non-delta parts", () => { + const parts: Parameters[0] = [ + { type: "text-start", id: "1" }, + { type: "text-delta", id: "1", delta: "A" }, + { type: "text-delta", id: "1", delta: "B" }, + { type: "text-end", id: "1" }, + ]; + const compressed = compressUIMessageChunks(parts); + expect(compressed).toHaveLength(3); // start, merged text, finish + }); +}); diff --git a/src/client/streaming.ts b/src/client/streaming.ts index 6cdc2b2b..ffcaeb7d 100644 --- a/src/client/streaming.ts +++ b/src/client/streaming.ts @@ -339,7 +339,7 @@ export class DeltaStreamer { e instanceof Error ? e.message : "unknown error", ); this.abortController.abort(); - throw e; + return; } // Now that we've sent the delta, check if we need to send another one. if ( @@ -374,7 +374,11 @@ export class DeltaStreamer { if (!this.streamId) { return; } - await this.#ongoingWrite; + // Drain the entire self-chaining write queue. Each #sendDelta may + // spawn a follow-up by reassigning #ongoingWrite, so loop until idle. 
+ while (this.#ongoingWrite) { + await this.#ongoingWrite; + } await this.#sendDelta(); await this.ctx.runMutation(this.component.streams.finish, { streamId: this.streamId, @@ -389,7 +393,9 @@ export class DeltaStreamer { if (!this.streamId) { return; } - await this.#ongoingWrite; + while (this.#ongoingWrite) { + await this.#ongoingWrite; + } await this.ctx.runMutation(this.component.streams.abort, { streamId: this.streamId, reason, diff --git a/src/react/useSmoothText.ts b/src/react/useSmoothText.ts index a8e4000c..7a1d68f5 100644 --- a/src/react/useSmoothText.ts +++ b/src/react/useSmoothText.ts @@ -43,8 +43,25 @@ export function useSmoothText( lastUpdateLength: text.length, charsPerMs: charsPerSec / 1000, initial: true, + hasStreamed: startStreaming, }); + // Track if streaming has ever been activated + if (startStreaming) { + smoothState.current.hasStreamed = true; + } + + // If streaming was never activated, snap to the full text immediately. + // This prevents non-streaming messages (e.g. HTTP-streamed results that + // arrive all at once in the DB) from being animated. 
+ if (!smoothState.current.hasStreamed && smoothState.current.cursor < text.length) { + smoothState.current.cursor = text.length; + smoothState.current.lastUpdateLength = text.length; + if (visibleText !== text) { + setVisibleText(text); + } + } + const isStreaming = smoothState.current.cursor < text.length; useEffect(() => { From aa8a63fe2ab403601b2ac09d9f16777087c71f47 Mon Sep 17 00:00:00 2001 From: Seth Raphael Date: Mon, 23 Feb 2026 22:30:51 -0800 Subject: [PATCH 2/4] Add HTTP streaming as first-class library feature - Add `streamId` to `GenerationOutputMetadata` for deduplication - Add `httpStreamText()` and `httpStreamUIMessages()` standalone helpers - Add `agent.asHttpAction()` factory method for HTTP streaming endpoints - Add `useHttpStream()` React hook with `streamId`/`messageId`/`abort()` - Extract standalone `generateText()` (Agent.generateText now delegates) - Export standalone `streamText()` and `generateText()` from barrel - Add `consumeTextStream()` and `supportsStreaming()` React utilities - Update example to use `asHttpAction()`, `useHttpStream()`, `skipStreamIds` - Fix HTTP streaming duplicate bubble regression (gate on `httpStreaming`) - Add tests for all new code paths (16 new tests) Co-Authored-By: Claude Opus 4.6 --- example/convex/chat/streamingDemo.ts | 20 +- example/convex/http.ts | 9 +- example/ui/chat/StreamingDemo.tsx | 83 ++----- src/client/generateText.ts | 88 +++++++ src/client/http.test.ts | 355 +++++++++++++++++++++++++++ src/client/http.ts | 157 ++++++++++++ src/client/index.ts | 189 +++++++++++--- src/client/streamText.ts | 1 + src/client/types.ts | 8 + src/react/httpStreamUtils.test.ts | 94 +++++++ src/react/httpStreamUtils.ts | 45 ++++ src/react/index.ts | 8 +- src/react/useHttpStream.ts | 146 +++++++++++ 13 files changed, 1093 insertions(+), 110 deletions(-) create mode 100644 src/client/generateText.ts create mode 100644 src/client/http.test.ts create mode 100644 src/client/http.ts create mode 100644 
src/react/httpStreamUtils.test.ts create mode 100644 src/react/httpStreamUtils.ts create mode 100644 src/react/useHttpStream.ts diff --git a/example/convex/chat/streamingDemo.ts b/example/convex/chat/streamingDemo.ts index 9f11b0c8..bf649aea 100644 --- a/example/convex/chat/streamingDemo.ts +++ b/example/convex/chat/streamingDemo.ts @@ -12,7 +12,6 @@ */ import { paginationOptsValidator } from "convex/server"; import { - createThread, listUIMessages, syncStreams, abortStream, @@ -78,22 +77,17 @@ export const streamResponse = internalAction({ // Streams text directly over an HTTP response. Useful for single-client // consumption (e.g., curl, fetch, or SSE-like patterns). // +// Uses `agent.asHttpAction()` — a factory method that returns a handler +// for httpAction(). It parses the JSON body, creates a thread if needed, +// streams the response, and sets X-Message-Id / X-Stream-Id headers. +// // Note: deltas are NOT saved by default here. To save deltas AND stream over // HTTP simultaneously, add `saveStreamDeltas: true` to the options. // ============================================================================ -export const streamOverHttp = httpAction(async (ctx, request) => { - const body = (await request.json()) as { - threadId?: string; - prompt: string; - }; - const threadId = - body.threadId ?? 
(await createThread(ctx, components.agent)); - const result = await agent.streamText(ctx, { threadId }, body); - const response = result.toTextStreamResponse(); - response.headers.set("X-Message-Id", result.promptMessageId!); - return response; -}); +export const streamOverHttp = httpAction( + agent.asHttpAction(), +); // ============================================================================ // Pattern 3: One-Shot Streaming diff --git a/example/convex/http.ts b/example/convex/http.ts index 39e109e5..0401139d 100644 --- a/example/convex/http.ts +++ b/example/convex/http.ts @@ -1,5 +1,6 @@ import { httpRouter } from "convex/server"; import { streamOverHttp } from "./chat/streaming"; +import { streamOverHttp as streamOverHttpDemo } from "./chat/streamingDemo"; import { corsRouter } from "convex-helpers/server/cors"; const http = httpRouter(); @@ -7,7 +8,7 @@ const http = httpRouter(); const cors = corsRouter(http, { allowCredentials: true, allowedHeaders: ["Authorization", "Content-Type"], - exposedHeaders: ["Content-Type", "Content-Length", "X-Message-Id"], + exposedHeaders: ["Content-Type", "Content-Length", "X-Message-Id", "X-Stream-Id"], }); cors.route({ @@ -16,5 +17,11 @@ cors.route({ handler: streamOverHttp, }); +cors.route({ + path: "/streamTextDemo", + method: "POST", + handler: streamOverHttpDemo, +}); + // Convex expects the router to be the default export of `convex/http.js`. 
export default http; diff --git a/example/ui/chat/StreamingDemo.tsx b/example/ui/chat/StreamingDemo.tsx index 069fdd1a..3cb3ea2e 100644 --- a/example/ui/chat/StreamingDemo.tsx +++ b/example/ui/chat/StreamingDemo.tsx @@ -15,6 +15,7 @@ import { useAction, useMutation, useQuery } from "convex/react"; import { api } from "../../convex/_generated/api"; import { optimisticallySendMessage, + useHttpStream, useSmoothText, useUIMessages, type UIMessage, @@ -136,6 +137,11 @@ function ChatPanel({ mode: StreamMode; reset: () => void; }) { + const convexUrl = import.meta.env.VITE_CONVEX_URL as string; + const httpUrl = convexUrl.replace(/\.cloud$/, ".site"); + + const httpStream = useHttpStream({ url: `${httpUrl}/streamTextDemo` }); + const { results: messages, status, @@ -143,7 +149,11 @@ function ChatPanel({ } = useUIMessages( api.chat.streamingDemo.listThreadMessages, { threadId }, - { initialNumItems: 20, stream: true }, + { + initialNumItems: 20, + stream: true, + skipStreamIds: httpStream.streamId ? [httpStream.streamId] : [], + }, ); const sendDelta = useMutation( @@ -157,10 +167,11 @@ function ChatPanel({ ); const [prompt, setPrompt] = useState("Hello! Tell me a joke."); - const [httpText, setHttpText] = useState(""); - const [httpStreaming, setHttpStreaming] = useState(false); const messagesEndRef = useRef(null); + const httpText = httpStream.text; + const httpStreaming = httpStream.isStreaming; + const scrollToBottom = useCallback(() => { messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); }, []); @@ -169,15 +180,6 @@ function ChatPanel({ scrollToBottom(); }, [messages, httpText, scrollToBottom]); - // Clear the HTTP stream text once streaming ends. The final message is - // saved to the DB during streaming (via onStepFinish), so by the time - // the HTTP stream closes, the stored message is already in useUIMessages. 
- useEffect(() => { - if (httpText && !httpStreaming) { - setHttpText(""); - } - }, [httpText, httpStreaming]); - const isStreaming = messages.some((m) => m.status === "streaming"); async function handleSend() { @@ -194,7 +196,7 @@ function ChatPanel({ console.error("oneshot error:", e), ); } else if (mode === "http") { - await streamOverHttp(threadId, text, setHttpText, setHttpStreaming); + await httpStream.send({ threadId, prompt: text }); } } @@ -216,12 +218,12 @@ function ChatPanel({ (m) => // While HTTP streaming, hide the pending assistant message — // its content is shown in the HTTP stream bubble instead. - !(httpText && m.role === "assistant" && m.status === "pending"), + !(httpStreaming && httpText && m.role === "assistant" && m.status === "pending"), ) .map((m) => ( ))} - {httpText && (() => { + {httpStreaming && httpText && (() => { // Grab tool parts from the pending assistant message const pending = messages.find( (m) => m.role === "assistant" && m.status === "pending", @@ -294,6 +296,9 @@ function ChatPanel({ type="button" className="px-4 py-2 rounded-lg bg-red-100 text-red-700 hover:bg-red-200 transition font-medium" onClick={() => { + if (httpStreaming) { + httpStream.abort(); + } const streaming = messages.find( (m) => m.status === "streaming", ); @@ -317,7 +322,7 @@ function ChatPanel({ type="button" className="px-3 py-2 rounded-lg bg-gray-100 text-gray-600 hover:bg-gray-200 transition text-sm" onClick={() => { - setHttpText(""); + httpStream.abort(); reset(); }} > @@ -339,52 +344,6 @@ function ChatPanel({ ); } -// ============================================================================ -// HTTP Streaming Helper -// ============================================================================ - -async function streamOverHttp( - threadId: string, - prompt: string, - onText: (text: string) => void, - onStreaming: (streaming: boolean) => void, -) { - const convexUrl = import.meta.env.VITE_CONVEX_URL as string; - // Derive the HTTP actions URL 
from the Convex deployment URL - const httpUrl = convexUrl.replace(/\.cloud$/, ".site"); - onStreaming(true); - onText(""); - - try { - const res = await fetch(`${httpUrl}/streamText`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ threadId, prompt }), - }); - - if (!res.ok || !res.body) { - onText(`Error: ${res.status} ${res.statusText}`); - onStreaming(false); - return; - } - - const reader = res.body.getReader(); - const decoder = new TextDecoder(); - let accumulated = ""; - - while (true) { - const { done, value } = await reader.read(); - if (done) break; - accumulated += decoder.decode(value, { stream: true }); - onText(accumulated); - } - } catch (err) { - onText(`Stream error: ${err}`); - } finally { - onStreaming(false); - } -} - // ============================================================================ // Message Bubble // ============================================================================ diff --git a/src/client/generateText.ts b/src/client/generateText.ts new file mode 100644 index 00000000..435e98d1 --- /dev/null +++ b/src/client/generateText.ts @@ -0,0 +1,88 @@ +import type { + GenerateTextResult, + StepResult, + ToolSet, +} from "ai"; +import { generateText as generateTextAi } from "ai"; +import type { + ActionCtx, + AgentComponent, + AgentPrompt, + GenerationOutputMetadata, + Options, + Output, +} from "./types.js"; +import { startGeneration } from "./start.js"; +import type { Agent } from "./index.js"; +import { errorToString, willContinue } from "./utils.js"; + +/** + * This behaves like {@link generateText} from the "ai" package except that + * it adds context based on the userId and threadId and saves the input and + * resulting messages to the thread, if specified. + * + * This is the standalone version — it requires all arguments explicitly. + * For the Agent-bound version that inherits model/tools/instructions from + * the Agent config, use `agent.generateText()`. 
+ */ +export async function generateText< + TOOLS extends ToolSet, + OUTPUT extends Output = never, +>( + ctx: ActionCtx, + component: AgentComponent, + /** + * The arguments to the generateText function, similar to the ai sdk's + * {@link generateText} function, along with Agent prompt options. + */ + generateTextArgs: AgentPrompt & + Omit< + Parameters>[0], + "model" | "prompt" | "messages" + > & { + /** + * The tools to use for the tool calls. + */ + tools?: TOOLS; + }, + /** + * The {@link Options} to use for fetching contextual messages + * and saving input/output messages. + */ + options: Options & { + agentName: string; + userId?: string | null; + threadId?: string; + agentForToolCtx?: Agent; + }, +): Promise & GenerationOutputMetadata> { + const { args, promptMessageId, order, ...call } = + await startGeneration(ctx, component, generateTextArgs, options); + + const steps: StepResult[] = []; + try { + const result = (await generateTextAi({ + ...args, + prepareStep: async (options) => { + const result = await generateTextArgs.prepareStep?.(options); + call.updateModel(result?.model ?? 
options.model); + return result; + }, + onStepFinish: async (step) => { + steps.push(step); + await call.save({ step }, await willContinue(steps, args.stopWhen)); + return generateTextArgs.onStepFinish?.(step); + }, + })) as GenerateTextResult; + const metadata: GenerationOutputMetadata = { + promptMessageId, + order, + savedMessages: call.getSavedMessages(), + messageId: promptMessageId, + }; + return Object.assign(result, metadata); + } catch (error) { + await call.fail(errorToString(error)); + throw error; + } +} diff --git a/src/client/http.test.ts b/src/client/http.test.ts new file mode 100644 index 00000000..1882b587 --- /dev/null +++ b/src/client/http.test.ts @@ -0,0 +1,355 @@ +import { describe, expect, test } from "vitest"; +import { Agent, createThread } from "./index.js"; +import { + anyApi, + actionGeneric, + defineSchema, + type DataModelFromSchemaDefinition, +} from "convex/server"; +import type { ApiFromModules, ActionBuilder } from "convex/server"; +import { components, initConvexTest } from "./setup.test.js"; +import { mockModel } from "./mockModel.js"; +import { generateText } from "./generateText.js"; +import { streamText } from "./streamText.js"; + +const schema = defineSchema({}); +type DataModel = DataModelFromSchemaDefinition; +const action = actionGeneric as ActionBuilder; + +const model = mockModel({ + content: [{ type: "text", text: "Hello from mock" }], +}); + +const agent = new Agent(components.agent, { + name: "test-http", + instructions: "You are a test agent for HTTP streaming", + languageModel: model, +}); + +// ============================================================================ +// Exported test actions (used by convex-test) +// ============================================================================ + +export const testStreamTextStandalone = action({ + args: {}, + handler: async (ctx) => { + const threadId = await createThread(ctx, components.agent, {}); + const result = await streamText( + ctx, + components.agent, + { 
+ model: mockModel({ + content: [{ type: "text", text: "standalone stream text" }], + }), + prompt: "test prompt", + }, + { + agentName: "standalone-test", + threadId, + saveStreamDeltas: true, + }, + ); + await result.consumeStream(); + return { + text: await result.text, + promptMessageId: result.promptMessageId, + streamId: result.streamId, + order: result.order, + }; + }, +}); + +export const testStreamTextWithoutDeltas = action({ + args: {}, + handler: async (ctx) => { + const threadId = await createThread(ctx, components.agent, {}); + const result = await streamText( + ctx, + components.agent, + { + model: mockModel({ + content: [{ type: "text", text: "no deltas stream" }], + }), + prompt: "test prompt", + }, + { + agentName: "standalone-test", + threadId, + }, + ); + await result.consumeStream(); + return { + text: await result.text, + promptMessageId: result.promptMessageId, + streamId: result.streamId, + order: result.order, + }; + }, +}); + +export const testGenerateTextStandalone = action({ + args: {}, + handler: async (ctx) => { + const threadId = await createThread(ctx, components.agent, {}); + const result = await generateText( + ctx, + components.agent, + { + model: mockModel({ + content: [{ type: "text", text: "standalone generate text" }], + }), + prompt: "test prompt", + }, + { + agentName: "standalone-test", + threadId, + }, + ); + return { + text: result.text, + promptMessageId: result.promptMessageId, + order: result.order, + }; + }, +}); + +export const testAgentGenerateTextDelegation = action({ + args: {}, + handler: async (ctx) => { + const { thread } = await agent.createThread(ctx, { userId: "user1" }); + const result = await thread.generateText({ + prompt: "test prompt", + }); + return { + text: result.text, + promptMessageId: result.promptMessageId, + order: result.order, + savedMessages: result.savedMessages?.map((m) => m._id), + }; + }, +}); + +export const testAsHttpActionParsesBody = action({ + args: {}, + handler: async (ctx) => { + 
const threadId = await createThread(ctx, components.agent, {}); + const handler = agent.asHttpAction(); + const request = new Request("https://example.com/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ threadId, prompt: "Hello" }), + }); + const response = await handler(ctx as any, request); + const text = await response.text(); + return { + status: response.status, + hasText: text.length > 0, + hasMessageId: response.headers.has("X-Message-Id"), + }; + }, +}); + +export const testAsHttpActionCreatesThread = action({ + args: {}, + handler: async (ctx) => { + const handler = agent.asHttpAction(); + const request = new Request("https://example.com/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ prompt: "Hello" }), + }); + const response = await handler(ctx as any, request); + const text = await response.text(); + return { + status: response.status, + hasText: text.length > 0, + hasMessageId: response.headers.has("X-Message-Id"), + }; + }, +}); + +export const testAsHttpActionWithCorsHeaders = action({ + args: {}, + handler: async (ctx) => { + const threadId = await createThread(ctx, components.agent, {}); + const handler = agent.asHttpAction({ + corsHeaders: { + "Access-Control-Allow-Origin": "*", + "Access-Control-Expose-Headers": "X-Message-Id, X-Stream-Id", + }, + }); + const request = new Request("https://example.com/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ threadId, prompt: "Hello" }), + }); + const response = await handler(ctx as any, request); + await response.text(); + return { + status: response.status, + corsOrigin: response.headers.get("Access-Control-Allow-Origin"), + corsExpose: response.headers.get("Access-Control-Expose-Headers"), + hasMessageId: response.headers.has("X-Message-Id"), + }; + }, +}); + +export const testAsHttpActionWithSaveDeltas = action({ + args: {}, + handler: 
async (ctx) => { + const threadId = await createThread(ctx, components.agent, {}); + const handler = agent.asHttpAction({ + saveStreamDeltas: true, + }); + const request = new Request("https://example.com/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ threadId, prompt: "Hello" }), + }); + const response = await handler(ctx as any, request); + await response.text(); + return { + status: response.status, + hasStreamId: response.headers.has("X-Stream-Id"), + hasMessageId: response.headers.has("X-Message-Id"), + }; + }, +}); + +export const testAsHttpActionUIMessages = action({ + args: {}, + handler: async (ctx) => { + const threadId = await createThread(ctx, components.agent, {}); + const handler = agent.asHttpAction({ + format: "ui-messages", + }); + const request = new Request("https://example.com/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ threadId, prompt: "Hello" }), + }); + const response = await handler(ctx as any, request); + const text = await response.text(); + return { + status: response.status, + hasText: text.length > 0, + hasMessageId: response.headers.has("X-Message-Id"), + // UI message stream format is different from plain text (it contains + // structured data, not just the raw text) + textDiffers: !text.includes("Hello from mock") || text.length > "Hello from mock".length, + }; + }, +}); + +const testApi: ApiFromModules<{ + fns: { + testStreamTextStandalone: typeof testStreamTextStandalone; + testStreamTextWithoutDeltas: typeof testStreamTextWithoutDeltas; + testGenerateTextStandalone: typeof testGenerateTextStandalone; + testAgentGenerateTextDelegation: typeof testAgentGenerateTextDelegation; + testAsHttpActionParsesBody: typeof testAsHttpActionParsesBody; + testAsHttpActionCreatesThread: typeof testAsHttpActionCreatesThread; + testAsHttpActionWithCorsHeaders: typeof testAsHttpActionWithCorsHeaders; + testAsHttpActionWithSaveDeltas: 
typeof testAsHttpActionWithSaveDeltas; + testAsHttpActionUIMessages: typeof testAsHttpActionUIMessages; + }; +}>["fns"] = anyApi["http.test"] as any; + +// ============================================================================ +// Tests +// ============================================================================ + +describe("Standalone streamText", () => { + test("returns streamId when saveStreamDeltas is enabled", async () => { + const t = initConvexTest(schema); + const result = await t.action(testApi.testStreamTextStandalone, {}); + expect(result.text).toBe("standalone stream text"); + expect(result.promptMessageId).toBeDefined(); + expect(result.streamId).toBeDefined(); + expect(result.order).toBeTypeOf("number"); + }); + + test("streamId is undefined when saveStreamDeltas is not set", async () => { + const t = initConvexTest(schema); + const result = await t.action(testApi.testStreamTextWithoutDeltas, {}); + expect(result.text).toBe("no deltas stream"); + expect(result.streamId).toBeUndefined(); + }); +}); + +describe("Standalone generateText", () => { + test("generates text and returns metadata", async () => { + const t = initConvexTest(schema); + const result = await t.action(testApi.testGenerateTextStandalone, {}); + expect(result.text).toBe("standalone generate text"); + expect(result.promptMessageId).toBeDefined(); + expect(result.order).toBeTypeOf("number"); + }); +}); + +describe("Agent.generateText delegation to standalone", () => { + test("agent.generateText delegates correctly", async () => { + const t = initConvexTest(schema); + const result = await t.action( + testApi.testAgentGenerateTextDelegation, + {}, + ); + expect(result.text).toBe("Hello from mock"); + expect(result.promptMessageId).toBeDefined(); + expect(result.savedMessages).toBeDefined(); + expect(result.savedMessages!.length).toBeGreaterThan(0); + }); +}); + +describe("agent.asHttpAction()", () => { + test("parses JSON body and streams text", async () => { + const t = 
initConvexTest(schema); + const result = await t.action(testApi.testAsHttpActionParsesBody, {}); + expect(result.status).toBe(200); + expect(result.hasText).toBe(true); + expect(result.hasMessageId).toBe(true); + }); + + test("creates thread when not provided in body", async () => { + const t = initConvexTest(schema); + const result = await t.action( + testApi.testAsHttpActionCreatesThread, + {}, + ); + expect(result.status).toBe(200); + expect(result.hasText).toBe(true); + expect(result.hasMessageId).toBe(true); + }); + + test("adds CORS headers when corsHeaders is specified", async () => { + const t = initConvexTest(schema); + const result = await t.action( + testApi.testAsHttpActionWithCorsHeaders, + {}, + ); + expect(result.status).toBe(200); + expect(result.corsOrigin).toBe("*"); + expect(result.corsExpose).toBe("X-Message-Id, X-Stream-Id"); + expect(result.hasMessageId).toBe(true); + }); + + test("sets X-Stream-Id when saveStreamDeltas is enabled", async () => { + const t = initConvexTest(schema); + const result = await t.action( + testApi.testAsHttpActionWithSaveDeltas, + {}, + ); + expect(result.status).toBe(200); + expect(result.hasStreamId).toBe(true); + expect(result.hasMessageId).toBe(true); + }); + + test("returns UI message stream format when format is ui-messages", async () => { + const t = initConvexTest(schema); + const result = await t.action(testApi.testAsHttpActionUIMessages, {}); + expect(result.status).toBe(200); + expect(result.hasText).toBe(true); + expect(result.hasMessageId).toBe(true); + expect(result.textDiffers).toBe(true); + }); +}); diff --git a/src/client/http.ts b/src/client/http.ts new file mode 100644 index 00000000..05c118ac --- /dev/null +++ b/src/client/http.ts @@ -0,0 +1,157 @@ +import type { streamText as streamTextAi, ToolSet } from "ai"; +import { streamText } from "./streamText.js"; +import { createThread } from "./threads.js"; +import type { + ActionCtx, + AgentComponent, + AgentPrompt, + Options, + Output, +} from 
"./types.js"; +import type { StreamingOptions } from "./streaming.js"; + +/** + * Options for HTTP streaming helpers. + */ +export type HttpStreamOptions = Options & { + /** The agent name attributed to messages. */ + agentName: string; + /** The user to associate with the thread / messages. */ + userId?: string | null; + /** The thread to continue. If omitted, a new thread is created. */ + threadId?: string; + /** + * Whether to save incremental data (deltas) from streaming responses + * to the database alongside the HTTP stream. Defaults to false. + */ + saveStreamDeltas?: boolean | StreamingOptions; + /** + * Extra headers to add to the response (e.g. CORS headers). + */ + corsHeaders?: Record; +}; + +type StreamTextInputArgs< + TOOLS extends ToolSet, + OUTPUT extends Output, +> = AgentPrompt & + Omit< + Parameters>[0], + "model" | "prompt" | "messages" + > & { + tools?: TOOLS; + }; + +/** + * Stream text over HTTP, returning a standard `Response` with a + * readable text stream body. Wraps the standalone {@link streamText} + * and formats the result with `toTextStreamResponse()`. + * + * Response headers include: + * - `X-Message-Id` — the prompt message ID (for client-side tracking) + * - `X-Stream-Id` — the delta stream ID (for deduplication with `skipStreamIds`) + * + * @example + * ```ts + * import { httpStreamText } from "@convex-dev/agent"; + * + * export const chat = httpAction(async (ctx, request) => { + * const { prompt, threadId } = await request.json(); + * return httpStreamText(ctx, components.agent, { prompt }, { + * agentName: "myAgent", + * threadId, + * model: openai.chat("gpt-4o-mini"), + * }); + * }); + * ``` + */ +export async function httpStreamText< + TOOLS extends ToolSet = ToolSet, + OUTPUT extends Output = never, +>( + ctx: ActionCtx, + component: AgentComponent, + streamTextArgs: StreamTextInputArgs, + options: HttpStreamOptions, +): Promise { + const threadId = + options.threadId ?? 
(await createThread(ctx, component)); + + const result = await streamText( + ctx, + component, + streamTextArgs, + { + ...options, + threadId, + }, + ); + + const response = result.toTextStreamResponse(); + applyHeaders(response, result, options.corsHeaders); + return response; +} + +/** + * Stream UI messages over HTTP, returning a standard `Response` + * using AI SDK's `toUIMessageStreamResponse()` format. This provides + * richer streaming data including tool calls, reasoning, and sources. + * + * @example + * ```ts + * import { httpStreamUIMessages } from "@convex-dev/agent"; + * + * export const chat = httpAction(async (ctx, request) => { + * const { prompt, threadId } = await request.json(); + * return httpStreamUIMessages(ctx, components.agent, { prompt }, { + * agentName: "myAgent", + * threadId, + * model: openai.chat("gpt-4o-mini"), + * }); + * }); + * ``` + */ +export async function httpStreamUIMessages< + TOOLS extends ToolSet = ToolSet, + OUTPUT extends Output = never, +>( + ctx: ActionCtx, + component: AgentComponent, + streamTextArgs: StreamTextInputArgs, + options: HttpStreamOptions, +): Promise { + const threadId = + options.threadId ?? 
(await createThread(ctx, component)); + + const result = await streamText( + ctx, + component, + streamTextArgs, + { + ...options, + threadId, + }, + ); + + const response = result.toUIMessageStreamResponse(); + applyHeaders(response, result, options.corsHeaders); + return response; +} + +function applyHeaders( + response: Response, + result: { promptMessageId?: string; streamId?: string }, + corsHeaders?: Record, +) { + if (result.promptMessageId) { + response.headers.set("X-Message-Id", result.promptMessageId); + } + if (result.streamId) { + response.headers.set("X-Stream-Id", result.streamId); + } + if (corsHeaders) { + for (const [key, value] of Object.entries(corsHeaders)) { + response.headers.set(key, value); + } + } +} diff --git a/src/client/index.ts b/src/client/index.ts index 1995e096..51888276 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -17,7 +17,7 @@ import type { ToolChoice, ToolSet, } from "ai"; -import { generateObject, generateText, stepCountIs, streamObject } from "ai"; +import { generateObject, stepCountIs, streamObject } from "ai"; const MIGRATION_URL = "node_modules/@convex-dev/agent/MIGRATION.md"; const warnedDeprecations = new Set(); @@ -98,10 +98,16 @@ import type { AgentPrompt, Output, } from "./types.js"; +import { generateText as standaloneGenerateText } from "./generateText.js"; import { streamText } from "./streamText.js"; -import { errorToString, willContinue } from "./utils.js"; +import { errorToString } from "./utils.js"; export { stepCountIs } from "ai"; +export { + httpStreamText, + httpStreamUIMessages, + type HttpStreamOptions, +} from "./http.js"; export { docsToModelMessages, toModelMessage, @@ -161,7 +167,9 @@ export { embedMessages, embedMany, } from "./search.js"; +export { generateText } from "./generateText.js"; export { startGeneration } from "./start.js"; +export { streamText } from "./streamText.js"; export { DEFAULT_STREAMING_OPTIONS, DeltaStreamer, @@ -483,39 +491,25 @@ export class Agent< 
GenerateTextResult & GenerationOutputMetadata > { - const { args, promptMessageId, order, ...call } = await this.start( + type Tools = TOOLS extends undefined ? AgentTools : TOOLS; + return standaloneGenerateText( ctx, - generateTextArgs, - { ...threadOpts, ...options }, + this.component, + { + ...generateTextArgs, + model: generateTextArgs.model ?? this.options.languageModel, + tools: (generateTextArgs.tools ?? this.options.tools) as Tools, + system: generateTextArgs.system ?? this.options.instructions, + stopWhen: (generateTextArgs.stopWhen ?? this.options.stopWhen) as any, + }, + { + ...threadOpts, + ...this.options, + agentName: this.options.name, + agentForToolCtx: this, + ...options, + }, ); - - type Tools = TOOLS extends undefined ? AgentTools : TOOLS; - const steps: StepResult[] = []; - try { - const result = (await generateText({ - ...args, - prepareStep: async (options) => { - const result = await generateTextArgs.prepareStep?.(options); - call.updateModel(result?.model ?? options.model); - return result; - }, - onStepFinish: async (step) => { - steps.push(step); - await call.save({ step }, await willContinue(steps, args.stopWhen)); - return generateTextArgs.onStepFinish?.(step); - }, - })) as GenerateTextResult; - const metadata: GenerationOutputMetadata = { - promptMessageId, - order, - savedMessages: call.getSavedMessages(), - messageId: promptMessageId, - }; - return Object.assign(result, metadata); - } catch (error) { - await call.fail(errorToString(error)); - throw error; - } } /** @@ -1600,6 +1594,135 @@ export class Agent< }); } + /** + * Create a handler function for an HTTP streaming endpoint. + * Returns a plain function `(ctx, request) => Promise` that + * you wrap in your app's `httpAction()`. + * + * The handler parses the JSON body for `{ threadId?, prompt?, messages? }`, + * creates a thread if needed, streams the response, and returns it with + * `X-Message-Id` and `X-Stream-Id` headers. 
+   *
+   * @example
+   * ```ts
+   * import { httpAction } from "./_generated/server";
+   *
+   * // In convex/http.ts:
+   * http.route({
+   *   path: "/chat",
+   *   method: "POST",
+   *   handler: httpAction(myAgent.asHttpAction()),
+   * });
+   * ```
+   */
+  asHttpAction(
+    spec?: MaybeCustomCtx & {
+      /**
+       * Whether to save incremental data (deltas) from streaming responses
+       * to the database alongside the HTTP stream. Defaults to false.
+       */
+      saveStreamDeltas?: boolean | StreamingOptions;
+      /**
+       * When to stop generating text.
+       * Defaults to the {@link Agent["options"].stopWhen} option.
+       */
+      stopWhen?: StopCondition<AgentTools> | Array<StopCondition<AgentTools>>;
+      /**
+       * The response format:
+       * - `"text"` (default) — plain text stream via `toTextStreamResponse()`
+       * - `"ui-messages"` — rich stream via `toUIMessageStreamResponse()`
+       */
+      format?: "text" | "ui-messages";
+      /**
+       * Extra headers to add to the response (e.g. CORS headers).
+       */
+      corsHeaders?: Record<string, string>;
+      /**
+       * Optional authorization callback. Receives the raw request and
+       * returns `{ userId?, threadId? }` to override the body values.
+       * Throw to reject the request.
+       */
+      authorize?: (
+        ctx: GenericActionCtx,
+        request: Request,
+      ) => Promise<{ userId?: string; threadId?: string } | void>;
+    } & Options,
+  ): (
+    ctx: GenericActionCtx,
+    request: Request,
+  ) => Promise<Response> {
+    return async (ctx_: GenericActionCtx, request: Request) => {
+      const body = (await request.json()) as {
+        threadId?: string;
+        prompt?: string;
+        messages?: any[];
+        [key: string]: unknown;
+      };
+
+      let userId: string | undefined;
+      let threadId: string | undefined = body.threadId;
+
+      if (spec?.authorize) {
+        const authResult = await spec.authorize(ctx_, request);
+        if (authResult?.userId) userId = authResult.userId;
+        if (authResult?.threadId) threadId = authResult.threadId;
+      }
+
+      const ctx = (
+        spec?.customCtx
+          ? 
{ + ...ctx_, + ...spec.customCtx( + ctx_, + { userId, threadId }, + { prompt: body.prompt } as any, + ), + } + : ctx_ + ) as GenericActionCtx & CustomCtx; + + if (!threadId) { + threadId = await createThread(ctx, this.component, { + userId: userId ?? null, + }); + } + + const result = await this.streamText( + ctx as ActionCtx & CustomCtx, + { threadId, userId }, + { + prompt: body.prompt, + messages: body.messages?.map(toModelMessage), + stopWhen: spec?.stopWhen, + }, + { + contextOptions: spec?.contextOptions, + storageOptions: spec?.storageOptions, + saveStreamDeltas: spec?.saveStreamDeltas, + }, + ); + + const response = + spec?.format === "ui-messages" + ? result.toUIMessageStreamResponse() + : result.toTextStreamResponse(); + + if (result.promptMessageId) { + response.headers.set("X-Message-Id", result.promptMessageId); + } + if (result.streamId) { + response.headers.set("X-Stream-Id", result.streamId); + } + if (spec?.corsHeaders) { + for (const [key, value] of Object.entries(spec.corsHeaders)) { + response.headers.set(key, value); + } + } + + return response; + }; + } + /** * @deprecated Use {@link saveMessages} directly instead. */ diff --git a/src/client/streamText.ts b/src/client/streamText.ts index b1276832..34e8d317 100644 --- a/src/client/streamText.ts +++ b/src/client/streamText.ts @@ -181,6 +181,7 @@ export async function streamText< promptMessageId, order, savedMessages: call.getSavedMessages(), + streamId: streamer?.streamId, messageId: promptMessageId, }; return Object.assign(result, metadata); diff --git a/src/client/types.ts b/src/client/types.ts index ea07439a..a17e6a1c 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -268,6 +268,14 @@ export type GenerationOutputMetadata = { * If you passed promptMessageId, it will not include that message. */ savedMessages?: MessageDoc[]; + /** + * The ID of the delta stream, if `saveStreamDeltas` was enabled. 
+ * Useful for HTTP streaming deduplication: pass this as a `skipStreamIds` + * entry to `useUIMessages` / `useThreadMessages` to avoid showing the + * same content twice (once from the HTTP stream and once from the + * persisted delta stream). + */ + streamId?: string; /** * @deprecated Use promptMessageId instead. * The ID of the prompt message for the generation. diff --git a/src/react/httpStreamUtils.test.ts b/src/react/httpStreamUtils.test.ts new file mode 100644 index 00000000..b519fd7c --- /dev/null +++ b/src/react/httpStreamUtils.test.ts @@ -0,0 +1,94 @@ +import { describe, expect, test } from "vitest"; +import { consumeTextStream, supportsStreaming } from "./httpStreamUtils.js"; + +function makeReadableStream( + chunks: string[], +): ReadableStream { + const encoder = new TextEncoder(); + return new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(encoder.encode(chunk)); + } + controller.close(); + }, + }); +} + +describe("consumeTextStream", () => { + test("decodes chunks and calls onChunk", async () => { + const chunks: string[] = []; + const stream = makeReadableStream(["Hello ", "world", "!"]); + await consumeTextStream(stream.getReader(), { + onChunk: (text) => chunks.push(text), + }); + expect(chunks).toEqual(["Hello ", "world", "!"]); + }); + + test("handles empty stream", async () => { + const chunks: string[] = []; + const stream = makeReadableStream([]); + await consumeTextStream(stream.getReader(), { + onChunk: (text) => chunks.push(text), + }); + expect(chunks).toEqual([]); + }); + + test("handles single large chunk", async () => { + const longText = "A".repeat(10000); + const chunks: string[] = []; + const stream = makeReadableStream([longText]); + await consumeTextStream(stream.getReader(), { + onChunk: (text) => chunks.push(text), + }); + expect(chunks.join("")).toBe(longText); + }); + + test("stops when abort signal is already aborted", async () => { + const chunks: string[] = []; + const 
controller = new AbortController(); + // Abort before consuming + controller.abort(); + + const stream = makeReadableStream(["first ", "second"]); + + await consumeTextStream(stream.getReader(), { + onChunk: (text) => chunks.push(text), + signal: controller.signal, + }); + + // Should not consume any chunks since signal is already aborted + expect(chunks).toEqual([]); + }); + + test("handles multi-byte UTF-8 characters split across chunks", async () => { + // The emoji "😀" is 4 bytes in UTF-8: F0 9F 98 80 + const emoji = new Uint8Array([0xf0, 0x9f, 0x98, 0x80]); + const chunks: string[] = []; + + // Split the emoji across two chunks + const stream = new ReadableStream({ + start(ctrl) { + ctrl.enqueue(emoji.slice(0, 2)); // First 2 bytes + ctrl.enqueue(emoji.slice(2, 4)); // Last 2 bytes + ctrl.close(); + }, + }); + + await consumeTextStream(stream.getReader(), { + onChunk: (text) => chunks.push(text), + }); + + // TextDecoder with stream: true handles this correctly + // First chunk produces empty string (incomplete character) + // Second chunk produces the full emoji + flush produces nothing + expect(chunks.join("")).toContain("\u{1F600}"); + }); +}); + +describe("supportsStreaming", () => { + test("returns true in browser-like environment", () => { + // Node test environment has ReadableStream and fetch + expect(supportsStreaming()).toBe(true); + }); +}); diff --git a/src/react/httpStreamUtils.ts b/src/react/httpStreamUtils.ts new file mode 100644 index 00000000..9c371d81 --- /dev/null +++ b/src/react/httpStreamUtils.ts @@ -0,0 +1,45 @@ +/** + * Consume a ReadableStream of Uint8Array chunks, decoding them as text + * and calling `onChunk` for each decoded segment. + * + * Handles multi-byte characters correctly using `TextDecoder({ stream: true })`. 
+ */
+export async function consumeTextStream(
+  reader: ReadableStreamDefaultReader<Uint8Array>,
+  options: {
+    onChunk: (text: string) => void;
+    signal?: AbortSignal;
+  },
+): Promise<void> {
+  const decoder = new TextDecoder();
+
+  try {
+    while (true) {
+      if (options.signal?.aborted) break;
+      const { done, value } = await reader.read();
+      if (done) break;
+      const text = decoder.decode(value, { stream: true });
+      options.onChunk(text);
+    }
+    // Flush any remaining bytes in the decoder
+    const remaining = decoder.decode();
+    if (remaining) {
+      options.onChunk(remaining);
+    }
+  } finally {
+    reader.releaseLock();
+  }
+}
+
+/**
+ * Returns `true` when running in a browser that supports
+ * `ReadableStream` on `Response.body`. Returns `false` during SSR
+ * or in environments where streaming fetch is unavailable.
+ */
+export function supportsStreaming(): boolean {
+  return (
+    typeof globalThis !== "undefined" &&
+    typeof globalThis.ReadableStream !== "undefined" &&
+    typeof globalThis.fetch !== "undefined"
+  );
+}
diff --git a/src/react/index.ts b/src/react/index.ts
index 2ede9a07..c49bdb2f 100644
--- a/src/react/index.ts
+++ b/src/react/index.ts
@@ -12,9 +12,15 @@ export {
 } from "./useThreadMessages.js";
 export { type UIMessagesQuery, useUIMessages } from "./useUIMessages.js";
 export { useStreamingUIMessages } from "./useStreamingUIMessages.js";
+export { useHttpStream } from "./useHttpStream.js";
+export { consumeTextStream, supportsStreaming } from "./httpStreamUtils.js";
 
 /**
- * @deprecated use useThreadMessages or useStreamingThreadMessages instead
+ * @deprecated Use {@link useHttpStream} instead for HTTP streaming with
+ * deduplication support (`streamId`, `messageId`, `abort()`), or use
+ * `useUIMessages` / `useThreadMessages` with `stream: true` for
+ * WebSocket delta streaming.
+ *
  * Use this hook to stream text from a server action, using the
  * toTextStreamResponse or equivalent HTTP streaming endpoint returning text.
 * @param url The URL of the server action to stream text from.
diff --git a/src/react/useHttpStream.ts b/src/react/useHttpStream.ts
new file mode 100644
index 00000000..f59f85db
--- /dev/null
+++ b/src/react/useHttpStream.ts
@@ -0,0 +1,146 @@
+"use client";
+import { useCallback, useRef, useState } from "react";
+import { consumeTextStream } from "./httpStreamUtils.js";
+
+/**
+ * React hook for consuming an HTTP text stream from a Convex HTTP action.
+ *
+ * Returns `streamId` and `messageId` from response headers so you can
+ * pass them to `useUIMessages` via `skipStreamIds` for deduplication.
+ *
+ * @example
+ * ```tsx
+ * const httpStream = useHttpStream({ url: `${siteUrl}/chat` });
+ * const messages = useUIMessages(api.chat.listMessages, { threadId }, {
+ *   stream: true,
+ *   skipStreamIds: httpStream.streamId ? [httpStream.streamId] : [],
+ * });
+ *
+ * // Send a message
+ * await httpStream.send({ threadId, prompt: "Hello!" });
+ * ```
+ */
+export function useHttpStream(options: {
+  /** The full URL of the HTTP streaming endpoint. */
+  url: string;
+  /**
+   * Auth token to send as `Authorization: Bearer <token>`.
+   * e.g. from `useAuthToken()` via `@convex-dev/auth/react`.
+   */
+  token?: string;
+  /** Additional headers to include in the request. */
+  headers?: Record<string, string>;
+}): {
+  /** The accumulated text received so far. */
+  text: string;
+  /** Whether a stream is currently active. */
+  isStreaming: boolean;
+  /** The last error encountered, if any. */
+  error: Error | null;
+  /**
+   * The delta stream ID from the `X-Stream-Id` response header.
+   * Pass to `skipStreamIds` on `useUIMessages` to avoid duplicating
+   * this stream's content.
+   */
+  streamId: string | null;
+  /** The prompt message ID from the `X-Message-Id` response header. */
+  messageId: string | null;
+  /**
+   * Send a request to the streaming endpoint.
+   * The body is JSON-serialized and sent as a POST request. 
+   */
+  send: (body: {
+    threadId?: string;
+    prompt?: string;
+    [key: string]: unknown;
+  }) => Promise<void>;
+  /** Abort the current stream, if any. */
+  abort: () => void;
+} {
+  const [text, setText] = useState("");
+  const [isStreaming, setIsStreaming] = useState(false);
+  const [error, setError] = useState<Error | null>(null);
+  const [streamId, setStreamId] = useState<string | null>(null);
+  const [messageId, setMessageId] = useState<string | null>(null);
+  const abortControllerRef = useRef<AbortController | null>(null);
+
+  const abort = useCallback(() => {
+    abortControllerRef.current?.abort();
+    abortControllerRef.current = null;
+  }, []);
+
+  const send = useCallback(
+    async (body: {
+      threadId?: string;
+      prompt?: string;
+      [key: string]: unknown;
+    }) => {
+      // Abort any existing stream
+      abort();
+
+      const controller = new AbortController();
+      abortControllerRef.current = controller;
+
+      setText("");
+      setError(null);
+      setStreamId(null);
+      setMessageId(null);
+      setIsStreaming(true);
+
+      try {
+        const response = await fetch(options.url, {
+          method: "POST",
+          headers: {
+            "Content-Type": "application/json",
+            ...(options.token
+              ? { Authorization: `Bearer ${options.token}` }
+              : {}),
+            ...options.headers,
+          },
+          body: JSON.stringify(body),
+          signal: controller.signal,
+        });
+
+        if (!response.ok) {
+          throw new Error(`HTTP error! 
status: ${response.status}`); + } + + // Extract metadata headers + const responseStreamId = response.headers.get("X-Stream-Id"); + const responseMessageId = response.headers.get("X-Message-Id"); + if (responseStreamId) setStreamId(responseStreamId); + if (responseMessageId) setMessageId(responseMessageId); + + if (!response.body) { + throw new Error("Response body is not readable"); + } + + const reader = response.body.getReader(); + let accumulated = ""; + + await consumeTextStream(reader, { + onChunk: (chunk) => { + accumulated += chunk; + setText(accumulated); + }, + signal: controller.signal, + }); + } catch (e) { + if (e instanceof Error && e.name === "AbortError") { + // Intentional abort — not an error + return; + } + const err = e instanceof Error ? e : new Error(String(e)); + setError(err); + } finally { + setIsStreaming(false); + if (abortControllerRef.current === controller) { + abortControllerRef.current = null; + } + } + }, + [options.url, options.token, options.headers, abort], + ); + + return { text, isStreaming, error, streamId, messageId, send, abort }; +} From 5518ef9a80dc8ea80c8af8144f11b15c8c2fee14 Mon Sep 17 00:00:00 2001 From: Seth Raphael Date: Mon, 23 Feb 2026 22:53:01 -0800 Subject: [PATCH 3/4] Add tool approval to streaming demo with HTTP continuation support - New streaming demo agent with approval tools (deleteFile, transferMoney, checkBalance) - Backend: submitApproval, triggerContinuation, continueAfterApprovals endpoints - UI: approval buttons, denial reason input, auto-continuation, disabled input while pending - asHttpAction() now accepts promptMessageId for HTTP streaming continuations - HTTP mode continuations stay in HTTP mode instead of falling back to delta streaming Co-Authored-By: Claude Opus 4.6 --- example/convex/_generated/api.d.ts | 2 + example/convex/agents/streamingDemo.ts | 61 ++++++ example/convex/chat/streamingDemo.ts | 78 +++++++- example/ui/chat/StreamingDemo.tsx | 257 ++++++++++++++++++++++--- 
src/client/http.test.ts | 39 ++++ src/client/index.ts | 2 + 6 files changed, 409 insertions(+), 30 deletions(-) create mode 100644 example/convex/agents/streamingDemo.ts diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index 84bd78fa..9f706627 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -13,6 +13,7 @@ import type * as agents_config from "../agents/config.js"; import type * as agents_fashion from "../agents/fashion.js"; import type * as agents_simple from "../agents/simple.js"; import type * as agents_story from "../agents/story.js"; +import type * as agents_streamingDemo from "../agents/streamingDemo.js"; import type * as agents_weather from "../agents/weather.js"; import type * as chat_approval from "../chat/approval.js"; import type * as chat_basic from "../chat/basic.js"; @@ -63,6 +64,7 @@ declare const fullApi: ApiFromModules<{ "agents/fashion": typeof agents_fashion; "agents/simple": typeof agents_simple; "agents/story": typeof agents_story; + "agents/streamingDemo": typeof agents_streamingDemo; "agents/weather": typeof agents_weather; "chat/approval": typeof chat_approval; "chat/basic": typeof chat_basic; diff --git a/example/convex/agents/streamingDemo.ts b/example/convex/agents/streamingDemo.ts new file mode 100644 index 00000000..c1824862 --- /dev/null +++ b/example/convex/agents/streamingDemo.ts @@ -0,0 +1,61 @@ +// Agent for the full streaming demo with tool approval support. +// Combines streaming patterns with approval-gated tools. 
+import { Agent, createTool, stepCountIs } from "@convex-dev/agent"; +import { components } from "../_generated/api"; +import { defaultConfig } from "./config"; +import { z } from "zod/v4"; + +// Tool that always requires approval +const deleteFileTool = createTool({ + description: "Delete a file from the system", + inputSchema: z.object({ + filename: z.string().describe("The name of the file to delete"), + }), + needsApproval: () => true, + execute: async (_ctx, input) => { + return `Successfully deleted file: ${input.filename}`; + }, +}); + +// Tool with conditional approval (requires approval for amounts > $100) +const transferMoneyTool = createTool({ + description: "Transfer money to an account", + inputSchema: z.object({ + amount: z.number().describe("The amount to transfer"), + toAccount: z.string().describe("The destination account"), + }), + needsApproval: async (_ctx, input) => { + return input.amount > 100; + }, + execute: async (_ctx, input) => { + return `Transferred $${input.amount} to account ${input.toAccount}`; + }, +}); + +// Tool that doesn't need approval +const checkBalanceTool = createTool({ + description: "Check the account balance", + inputSchema: z.object({ + accountId: z.string().describe("The account to check"), + }), + execute: async (_ctx, _input) => { + return `Balance: $1,234.56`; + }, +}); + +export const streamingDemoAgent = new Agent(components.agent, { + name: "Streaming Demo Agent", + instructions: + "You are a concise assistant who responds with emojis " + + "and abbreviations like lmao, lol, iirc, afaik, etc. where appropriate. " + + "You can delete files, transfer money, and check account balances. 
" + + "Always confirm what action you took after it completes.", + tools: { + deleteFile: deleteFileTool, + transferMoney: transferMoneyTool, + checkBalance: checkBalanceTool, + }, + stopWhen: stepCountIs(5), + ...defaultConfig, + callSettings: { temperature: 0 }, +}); diff --git a/example/convex/chat/streamingDemo.ts b/example/convex/chat/streamingDemo.ts index bf649aea..5a5c42f5 100644 --- a/example/convex/chat/streamingDemo.ts +++ b/example/convex/chat/streamingDemo.ts @@ -28,7 +28,7 @@ import { } from "../_generated/server"; import { v } from "convex/values"; import { authorizeThreadAccess } from "../threads"; -import { agent } from "../agents/simple"; +import { streamingDemoAgent } from "../agents/streamingDemo"; // ============================================================================ // Pattern 1: Async Delta Streaming (RECOMMENDED) @@ -45,7 +45,7 @@ export const sendMessage = mutation({ args: { prompt: v.string(), threadId: v.string() }, handler: async (ctx, { prompt, threadId }) => { await authorizeThreadAccess(ctx, threadId); - const { messageId } = await agent.saveMessage(ctx, { + const { messageId } = await streamingDemoAgent.saveMessage(ctx, { threadId, prompt, skipEmbeddings: true, @@ -61,7 +61,7 @@ export const sendMessage = mutation({ export const streamResponse = internalAction({ args: { promptMessageId: v.string(), threadId: v.string() }, handler: async (ctx, { promptMessageId, threadId }) => { - const result = await agent.streamText( + const result = await streamingDemoAgent.streamText( ctx, { threadId }, { promptMessageId }, @@ -86,7 +86,7 @@ export const streamResponse = internalAction({ // ============================================================================ export const streamOverHttp = httpAction( - agent.asHttpAction(), + streamingDemoAgent.asHttpAction(), ); // ============================================================================ @@ -100,7 +100,7 @@ export const streamOneShot = action({ args: { prompt: v.string(), 
threadId: v.string() }, handler: async (ctx, { prompt, threadId }) => { await authorizeThreadAccess(ctx, threadId); - await agent.streamText( + await streamingDemoAgent.streamText( ctx, { threadId }, { prompt }, @@ -109,6 +109,74 @@ export const streamOneShot = action({ }, }); +// ============================================================================ +// Tool Approval +// +// When the model calls a tool with `needsApproval`, generation pauses. +// The client shows Approve/Deny buttons. After all pending approvals are +// resolved, the client triggers continuation via delta streaming. +// +// In HTTP mode, the HTTP stream ends when approval is requested. The +// continuation runs via delta streaming, which the UI already subscribes to. +// ============================================================================ + +/** + * Submit an approval decision for a single tool call. + */ +export const submitApproval = mutation({ + args: { + threadId: v.string(), + approvalId: v.string(), + approved: v.boolean(), + reason: v.optional(v.string()), + }, + returns: v.object({ messageId: v.string() }), + handler: async (ctx, { threadId, approvalId, approved, reason }) => { + await authorizeThreadAccess(ctx, threadId); + const { messageId } = approved + ? await streamingDemoAgent.approveToolCall(ctx, { threadId, approvalId, reason }) + : await streamingDemoAgent.denyToolCall(ctx, { threadId, approvalId, reason }); + return { messageId }; + }, +}); + +/** + * Schedule continuation after all approvals are resolved. + */ +export const triggerContinuation = mutation({ + args: { + threadId: v.string(), + lastApprovalMessageId: v.string(), + }, + handler: async (ctx, { threadId, lastApprovalMessageId }) => { + await authorizeThreadAccess(ctx, threadId); + await ctx.scheduler.runAfter( + 0, + internal.chat.streamingDemo.continueAfterApprovals, + { threadId, lastApprovalMessageId }, + ); + }, +}); + +/** + * Continue generation after all approvals in a step have been resolved. 
+ */ +export const continueAfterApprovals = internalAction({ + args: { + threadId: v.string(), + lastApprovalMessageId: v.string(), + }, + handler: async (ctx, { threadId, lastApprovalMessageId }) => { + const result = await streamingDemoAgent.streamText( + ctx, + { threadId }, + { promptMessageId: lastApprovalMessageId }, + { saveStreamDeltas: { chunking: "word", throttleMs: 100 } }, + ); + await result.consumeStream(); + }, +}); + // ============================================================================ // Queries: Messages + Stream Sync // ============================================================================ diff --git a/example/ui/chat/StreamingDemo.tsx b/example/ui/chat/StreamingDemo.tsx index 3cb3ea2e..849b58ec 100644 --- a/example/ui/chat/StreamingDemo.tsx +++ b/example/ui/chat/StreamingDemo.tsx @@ -10,6 +10,7 @@ * - Stream inspector panel showing active/finished/aborted streams * - Smooth text animation via useSmoothText * - Optimistic message sending + * - Tool approval flow (approve/deny buttons, auto-continuation) */ import { useAction, useMutation, useQuery } from "convex/react"; import { api } from "../../convex/_generated/api"; @@ -23,6 +24,7 @@ import { import { useCallback, useEffect, useRef, useState } from "react"; import { cn } from "@/lib/utils"; import { useDemoThread } from "@/hooks/use-demo-thread"; +import type { ToolUIPart } from "ai"; type StreamMode = "delta" | "http" | "oneshot"; @@ -166,7 +168,60 @@ function ChatPanel({ api.chat.streamingDemo.abortStreamByOrder, ); - const [prompt, setPrompt] = useState("Hello! Tell me a joke."); + // Tool approval mutations + const submitApproval = useMutation(api.chat.streamingDemo.submitApproval); + const triggerContinuation = useMutation(api.chat.streamingDemo.triggerContinuation); + + // Track the last approval messageId so we can use it for continuation. + const lastApprovalMessageIdRef = useRef(null); + // Track whether we've already triggered continuation for this batch. 
+ const continuationTriggeredRef = useRef(false); + // Track the mode used when the request was sent, so continuation uses the same mode. + const requestModeRef = useRef(mode); + + const hasPendingApprovals = messages.some((m) => + m.parts.some( + (p) => p.type.startsWith("tool-") && (p as ToolUIPart).state === "approval-requested", + ), + ); + + // When all approvals are resolved (hasPendingApprovals goes false) + // and we have a saved messageId, trigger continuation. + // In HTTP mode, continuation also goes over HTTP. Otherwise, delta streaming. + useEffect(() => { + if ( + !hasPendingApprovals && + lastApprovalMessageIdRef.current && + !continuationTriggeredRef.current + ) { + continuationTriggeredRef.current = true; + const messageId = lastApprovalMessageIdRef.current; + lastApprovalMessageIdRef.current = null; + if (requestModeRef.current === "http") { + void httpStream.send({ threadId, promptMessageId: messageId }); + } else { + void triggerContinuation({ + threadId, + lastApprovalMessageId: messageId, + }); + } + } + if (hasPendingApprovals) { + continuationTriggeredRef.current = false; + } + }, [hasPendingApprovals, threadId, triggerContinuation, httpStream]); + + async function handleApproval(args: { + threadId: string; + approvalId: string; + approved: boolean; + reason?: string; + }) { + const { messageId } = await submitApproval(args); + lastApprovalMessageIdRef.current = messageId; + } + + const [prompt, setPrompt] = useState("Delete the file important.txt"); const messagesEndRef = useRef(null); const httpText = httpStream.text; @@ -186,6 +241,7 @@ function ChatPanel({ const text = prompt.trim(); if (!text) return; setPrompt(""); + requestModeRef.current = mode; if (mode === "delta") { await sendDelta({ threadId, prompt: text }); @@ -221,7 +277,12 @@ function ChatPanel({ !(httpStreaming && httpText && m.role === "assistant" && m.status === "pending"), ) .map((m) => ( - + ))} {httpStreaming && httpText && (() => { // Grab tool parts from the pending 
assistant message @@ -288,8 +349,9 @@ function ChatPanel({ type="text" value={prompt} onChange={(e) => setPrompt(e.target.value)} - className="flex-1 px-4 py-2 rounded-lg border border-gray-300 focus:outline-none focus:ring-2 focus:ring-indigo-400 bg-gray-50" - placeholder="Type a message..." + className="flex-1 px-4 py-2 rounded-lg border border-gray-300 focus:outline-none focus:ring-2 focus:ring-indigo-400 bg-gray-50 disabled:opacity-50 disabled:cursor-not-allowed" + placeholder={hasPendingApprovals ? "Respond to pending approvals first..." : "Type a message..."} + disabled={hasPendingApprovals} /> {isStreaming || httpStreaming ? ( @@ -323,6 +385,8 @@ function ChatPanel({ className="px-3 py-2 rounded-lg bg-gray-100 text-gray-600 hover:bg-gray-200 transition text-sm" onClick={() => { httpStream.abort(); + lastApprovalMessageIdRef.current = null; + continuationTriggeredRef.current = false; reset(); }} > @@ -348,7 +412,20 @@ function ChatPanel({ // Message Bubble // ============================================================================ -function MessageBubble({ message }: { message: UIMessage }) { +function MessageBubble({ + message, + threadId, + onApproval, +}: { + message: UIMessage; + threadId: string; + onApproval: (args: { + threadId: string; + approvalId: string; + approved: boolean; + reason?: string; + }) => Promise; +}) { const isUser = message.role === "user"; const [visibleText] = useSmoothText(message.text, { startStreaming: message.status === "streaming", @@ -361,7 +438,9 @@ function MessageBubble({ message }: { message: UIMessage }) { { startStreaming: message.status === "streaming" }, ); - const toolParts = message.parts.filter((p) => p.type.startsWith("tool-")); + const toolParts = message.parts.filter( + (p): p is ToolUIPart => p.type.startsWith("tool-"), + ); return (
    @@ -396,24 +475,14 @@ function MessageBubble({ message }: { message: UIMessage }) {
    )} - {/* Tool calls */} - {toolParts.map((p: any) => ( -
    - {p.type} - {p.state && ( - ({p.state}) - )} - {p.output && ( -
    - {typeof p.output === "string" - ? p.output - : JSON.stringify(p.output)} -
    - )} -
    + {/* Tool calls with approval UI */} + {toolParts.map((tool) => ( + ))} {/* Main text */} @@ -423,6 +492,144 @@ function MessageBubble({ message }: { message: UIMessage }) { ); } +// ============================================================================ +// Tool Call Display with Approval UI +// ============================================================================ + +function ToolCallDisplay({ + tool, + threadId, + onApproval, +}: { + tool: ToolUIPart; + threadId: string; + onApproval: (args: { + threadId: string; + approvalId: string; + approved: boolean; + reason?: string; + }) => Promise; +}) { + const [denialReason, setDenialReason] = useState(""); + const [showReasonInput, setShowReasonInput] = useState(false); + const toolName = tool.type.replace("tool-", ""); + const approvalId = getToolApprovalId(tool); + const approvalReason = getToolApprovalReason(tool); + + return ( +
    +
    + {toolName}({JSON.stringify(tool.input)}) +
    + + {tool.state === "approval-requested" && approvalId && ( +
    +
    + Approval required +
    + {showReasonInput ? ( +
    + setDenialReason(e.target.value)} + placeholder="Reason for denial..." + className="flex-1 px-2 py-1 text-sm rounded border border-gray-300" + /> + + +
    + ) : ( +
    + + +
    + )} +
    + )} + + {tool.state === "approval-responded" && ( +
    Approved - executing...
    + )} + + {tool.state === "output-denied" && ( +
    + Denied + {approvalReason && `: ${approvalReason}`} +
    + )} + + {tool.state === "output-available" && ( +
    + Result: {JSON.stringify("output" in tool ? tool.output : undefined)} +
    + )} + + {tool.state === "output-error" && ( +
    + Error: {"errorText" in tool ? tool.errorText : "Unknown error"} +
    + )} + + {(tool.state === "input-available" || tool.state === "input-streaming") && ( +
    Processing...
    + )} +
    + ); +} + +function getToolApprovalId(tool: ToolUIPart): string | undefined { + if (tool.state !== "approval-requested" || !("approval" in tool)) { + return undefined; + } + const approval = tool.approval as { id?: unknown } | undefined; + return typeof approval?.id === "string" ? approval.id : undefined; +} + +function getToolApprovalReason(tool: ToolUIPart): string | undefined { + if ( + (tool.state !== "output-denied" && tool.state !== "approval-requested") || + !("approval" in tool) + ) { + return undefined; + } + const approval = tool.approval as { reason?: unknown } | undefined; + return typeof approval?.reason === "string" ? approval.reason : undefined; +} + // ============================================================================ // Stream Inspector Panel // ============================================================================ diff --git a/src/client/http.test.ts b/src/client/http.test.ts index 1882b587..fcc53105 100644 --- a/src/client/http.test.ts +++ b/src/client/http.test.ts @@ -241,6 +241,33 @@ export const testAsHttpActionUIMessages = action({ }, }); +export const testAsHttpActionWithPromptMessageId = action({ + args: {}, + handler: async (ctx) => { + const threadId = await createThread(ctx, components.agent, {}); + // First, save a user message to get a promptMessageId + const { messageId } = await agent.saveMessage(ctx, { + threadId, + prompt: "Hello", + skipEmbeddings: true, + }); + // Then use the HTTP action with promptMessageId (continuation pattern) + const handler = agent.asHttpAction(); + const request = new Request("https://example.com/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ threadId, promptMessageId: messageId }), + }); + const response = await handler(ctx as any, request); + const text = await response.text(); + return { + status: response.status, + hasText: text.length > 0, + hasMessageId: response.headers.has("X-Message-Id"), + }; + }, +}); + const testApi: 
ApiFromModules<{ fns: { testStreamTextStandalone: typeof testStreamTextStandalone; @@ -252,6 +279,7 @@ const testApi: ApiFromModules<{ testAsHttpActionWithCorsHeaders: typeof testAsHttpActionWithCorsHeaders; testAsHttpActionWithSaveDeltas: typeof testAsHttpActionWithSaveDeltas; testAsHttpActionUIMessages: typeof testAsHttpActionUIMessages; + testAsHttpActionWithPromptMessageId: typeof testAsHttpActionWithPromptMessageId; }; }>["fns"] = anyApi["http.test"] as any; @@ -352,4 +380,15 @@ describe("agent.asHttpAction()", () => { expect(result.hasMessageId).toBe(true); expect(result.textDiffers).toBe(true); }); + + test("accepts promptMessageId for continuations", async () => { + const t = initConvexTest(schema); + const result = await t.action( + testApi.testAsHttpActionWithPromptMessageId, + {}, + ); + expect(result.status).toBe(200); + expect(result.hasText).toBe(true); + expect(result.hasMessageId).toBe(true); + }); }); diff --git a/src/client/index.ts b/src/client/index.ts index 51888276..997c022b 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -1655,6 +1655,7 @@ export class Agent< const body = (await request.json()) as { threadId?: string; prompt?: string; + promptMessageId?: string; messages?: any[]; [key: string]: unknown; }; @@ -1692,6 +1693,7 @@ export class Agent< { threadId, userId }, { prompt: body.prompt, + promptMessageId: body.promptMessageId, messages: body.messages?.map(toModelMessage), stopWhen: spec?.stopWhen, }, From 0e4d9f01dbcc56ba82e4fc87650fb6e4ffaedd01 Mon Sep 17 00:00:00 2001 From: Seth Raphael Date: Mon, 23 Feb 2026 23:01:07 -0800 Subject: [PATCH 4/4] Fix CodeRabbit review issues: URL derivation warning, test unhandled rejection - Add console.warn when Convex URL doesn't end with .cloud (HTTP streaming) - Replace throwing onAsyncAbort in integration test with no-op to prevent unhandled promise rejections from fire-and-forget delta writes Co-Authored-By: Claude Opus 4.6 --- example/ui/chat/StreamingDemo.tsx | 3 +++ 
src/client/streaming.integration.test.ts | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/example/ui/chat/StreamingDemo.tsx b/example/ui/chat/StreamingDemo.tsx index 849b58ec..94f93b43 100644 --- a/example/ui/chat/StreamingDemo.tsx +++ b/example/ui/chat/StreamingDemo.tsx @@ -140,6 +140,9 @@ function ChatPanel({ reset: () => void; }) { const convexUrl = import.meta.env.VITE_CONVEX_URL as string; + if (!convexUrl.endsWith(".cloud")) { + console.warn("Unexpected Convex URL format; HTTP streaming may not work:", convexUrl); + } const httpUrl = convexUrl.replace(/\.cloud$/, ".site"); const httpStream = useHttpStream({ url: `${httpUrl}/streamTextDemo` }); diff --git a/src/client/streaming.integration.test.ts b/src/client/streaming.integration.test.ts index a2cfee09..deb8e64c 100644 --- a/src/client/streaming.integration.test.ts +++ b/src/client/streaming.integration.test.ts @@ -597,7 +597,7 @@ describe("Delta Stream Consumption", () => { abortSignal: undefined, compress: compressUIMessageChunks, onAsyncAbort: async () => { - throw new Error("async abort"); + // No-op — async aborts can happen in integration tests }, }, { ...testMetadata, threadId },