302 changes: 302 additions & 0 deletions docs/http-streaming-requirements.md
@@ -0,0 +1,302 @@
# Technical Requirements: HTTP Streaming for @convex-dev/agent

## Status: Draft
## Date: 2026-02-23

---

## 1. Executive Summary

The current streaming architecture relies exclusively on Convex's reactive query system (WebSocket-based delta polling). This document specifies requirements for adding HTTP streaming support, including delta filtering logic, stream ID lifecycle management, and backwards compatibility constraints.

---

## 2. Current Architecture

### 2.1 Streaming Transport (WebSocket Delta Polling)

The existing system persists stream data as discrete deltas in the database, which clients poll via Convex reactive queries. There is no HTTP streaming transport.

**Flow:**
1. `DeltaStreamer` (client action) writes compressed parts via `streams.addDelta` mutations
2. React hooks (`useDeltaStreams`) issue two reactive queries per render cycle:
   - `kind: "list"` — discovers active `streamingMessages` for the thread
   - `kind: "deltas"` — fetches new deltas using per-stream cursors
3. `deriveUIMessagesFromDeltas()` materializes `UIMessage[]` from accumulated deltas

**Key files:**
- `src/client/streaming.ts` — `DeltaStreamer` class, compression, `syncStreams()`
- `src/component/streams.ts` — Backend mutations/queries (`create`, `addDelta`, `listDeltas`, `finish`, `abort`)
- `src/react/useDeltaStreams.ts` — Client-side cursor tracking and delta accumulation
- `src/deltas.ts` — Delta-to-UIMessage materialization

### 2.2 Stream State Machine

```
  create()         addDelta() (with heartbeat)
     │                  │
     ▼                  ▼
┌───────────┐      ┌───────────┐
│ streaming │─────▶│ streaming │──── heartbeat every ~2.5 min
└───────────┘      └───────────┘
     │                  │
     │ finish()         │ abort() / timeout (10 min)
     ▼                  ▼
┌──────────┐       ┌─────────┐
│ finished │       │ aborted │
└──────────┘       └─────────┘
     │ cleanup (5 min delay)
 [deleted]
```

### 2.3 Data Formats

Two delta formats are supported, declared per-stream:

| Format | Description | Primary Use |
|--------|-------------|-------------|
| `UIMessageChunk` | AI SDK v6 native format (`text-delta`, `tool-input-delta`, `reasoning-delta`, etc.) | Default for new streams |
| `TextStreamPart` | Legacy AI SDK format | Backwards compatibility |

---

## 3. HTTP Streaming Requirements

### 3.1 Transport Layer

**REQ-HTTP-1**: Provide an HTTP streaming endpoint that emits deltas as Server-Sent Events (SSE) or newline-delimited JSON (NDJSON), enabling clients that cannot use Convex WebSocket subscriptions (e.g., non-JS environments, CLI tools, third-party integrations).

**REQ-HTTP-2**: The HTTP endpoint must support resumption. A client that disconnects and reconnects with a cursor value must receive only deltas it hasn't seen, not replay the full stream.

**REQ-HTTP-3**: The HTTP endpoint must respect the same per-request delta limits as the WebSocket path:
- `MAX_DELTAS_PER_REQUEST = 1000` (total across all streams)
- `MAX_DELTAS_PER_STREAM = 100` (per stream per request)

**REQ-HTTP-4**: The HTTP endpoint must support filtering by stream status (`streaming`, `finished`, `aborted`) matching the existing `listStreams` query interface.

**REQ-HTTP-5**: The HTTP endpoint must emit a terminal event when the stream reaches `finished` or `aborted` state, so clients know to stop polling/listening.

### 3.2 Response Format

**REQ-HTTP-6**: Each SSE/NDJSON frame must include:
```typescript
{
  streamId: string; // ID of the streaming message
  start: number; // Inclusive cursor position
  end: number; // Exclusive cursor position
  parts: any[]; // Delta parts (UIMessageChunk[] or TextStreamPart[])
}
```

This matches the existing `StreamDelta` type (`src/validators.ts:628-634`).
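
As a rough sketch of how such a frame could be serialized for either candidate transport, assuming one stream per connection: the helper names and the SSE event names `delta` and `done` are hypothetical, and only the `StreamDelta` field names come from the requirement above. Using the exclusive `end` cursor as the SSE `id` means a browser `EventSource` would send it back in the `Last-Event-ID` header on reconnect, which is one possible way to satisfy REQ-HTTP-2.

```typescript
// Field names follow REQ-HTTP-6; `unknown[]` stands in for the part types.
interface StreamDelta {
  streamId: string;
  start: number; // inclusive cursor position
  end: number; // exclusive cursor position
  parts: unknown[];
}

// NDJSON: one JSON document per line. JSON.stringify never emits a raw
// newline, so each frame occupies exactly one line.
function toNdjsonFrame(delta: StreamDelta): string {
  return JSON.stringify(delta) + "\n";
}

// SSE: `id` carries the exclusive end cursor so a reconnecting client can
// resume from it. The event name "delta" is an assumption.
function toSseFrame(delta: StreamDelta): string {
  return `id: ${delta.end}\nevent: delta\ndata: ${JSON.stringify(delta)}\n\n`;
}

// Terminal event for REQ-HTTP-5. The event name "done" is an assumption.
function toSseTerminalFrame(
  streamId: string,
  status: "finished" | "aborted",
): string {
  return `event: done\ndata: ${JSON.stringify({ streamId, status })}\n\n`;
}
```

If a single connection multiplexes several streams (see Open Questions), a single numeric `id` is no longer enough to resume, and the cursor would have to be carried per stream.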

**REQ-HTTP-7**: Stream metadata must be available either as an initial frame or via a separate endpoint, containing:
```typescript
{
  streamId: string;
  status: "streaming" | "finished" | "aborted";
  format: "UIMessageChunk" | "TextStreamPart" | undefined;
  order: number;
  stepOrder: number;
  userId?: string;
  agentName?: string;
  model?: string;
  provider?: string;
  providerOptions?: ProviderOptions;
}
```

This matches the existing `StreamMessage` type (`src/validators.ts:607-626`).

---

## 4. Delta Stream Filtering Logic

### 4.1 Server-Side Filtering

**REQ-FILT-1**: The `listDeltas` query must continue to filter by stream ID + cursor position using the `streamId_start_end` index:
```
.withIndex("streamId_start_end", (q) =>
  q.eq("streamId", cursor.streamId).gte("start", cursor.cursor))
```

**REQ-FILT-2**: Stream discovery (`list` query) must filter by:
- `threadId` (required) — scoped to a single thread
- `state.kind` (optional, defaults to `["streaming"]`) — which statuses to include
- `startOrder` (optional, defaults to 0) — minimum message order position

This uses the compound index `threadId_state_order_stepOrder`.

**REQ-FILT-3**: For HTTP streaming, add support for filtering deltas by a single `streamId` (not requiring `threadId`), for clients that already know which stream they want to follow.

### 4.2 Client-Side Filtering

**REQ-FILT-4**: The `useDeltaStreams` hook's cursor management must be preserved:
- Per-stream cursor tracking via `Record<string, number>`
- Gap detection: assert `previousEnd === delta.start` for consecutive deltas
- Stale delta rejection: skip deltas where `delta.start < oldCursor`
- Cache-friendly `startOrder` rounding (round down to nearest 10)
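
The cursor rules above can be sketched as two small pure functions. This is a simplified illustration under stated assumptions, not the hook's actual code: `acceptDeltas` and `roundStartOrder` are hypothetical names, and the sketch handles one stream at a time.

```typescript
interface StreamDelta {
  streamId: string;
  start: number; // inclusive
  end: number; // exclusive
  parts: unknown[];
}

// Round down to the nearest 10 so nearby startOrder values share one
// cached query. Assumes a non-negative order.
function roundStartOrder(order: number): number {
  return Math.floor(order / 10) * 10;
}

// Hypothetical helper: applies a batch of deltas for a single stream.
// Stale deltas (already covered by the cursor) are skipped, and a gap
// between the cursor and the next delta is treated as an error.
function acceptDeltas(
  cursor: number,
  incoming: StreamDelta[],
): { accepted: StreamDelta[]; cursor: number } {
  const accepted: StreamDelta[] = [];
  let next = cursor;
  const sorted = incoming.slice().sort((a, b) => a.start - b.start);
  for (const delta of sorted) {
    if (delta.start < next) continue; // stale: already seen
    if (delta.start !== next) {
      throw new Error(
        `Gap in stream ${delta.streamId}: expected ${next}, got ${delta.start}`,
      );
    }
    accepted.push(delta);
    next = delta.end;
  }
  return { accepted, cursor: next };
}
```

An HTTP client would need the same bookkeeping, since a resumed connection can legitimately redeliver a delta that the client had already processed before disconnecting.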

**REQ-FILT-5**: Support `skipStreamIds` filtering to allow callers to exclude specific streams (used when streams are already materialized from stored messages).

### 4.3 Delta Compression

**REQ-FILT-6**: Delta compression must happen before persistence (in `DeltaStreamer.#createDelta`). Two compression strategies:

1. **UIMessageChunk compression** (`compressUIMessageChunks`):
   - Merge consecutive `text-delta` parts with same `id` by concatenating `.delta`
   - Merge consecutive `reasoning-delta` parts with same `id` by concatenating `.delta`

2. **TextStreamPart compression** (`compressTextStreamParts`):
   - Merge consecutive `text-delta` parts with same `id` by concatenating `.text`
   - Merge consecutive `reasoning-delta` parts with same `id` by concatenating `.text`
   - Strip `Uint8Array` data from `file` parts (not suitable for delta transport)
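
The merge rule for the `UIMessageChunk` case can be illustrated with a simplified stand-in. This is not the library's `compressUIMessageChunks`: the `Chunk` type below is a reduced assumption that models only the fields the rule touches, and `mergeConsecutiveDeltas` is a hypothetical name.

```typescript
// Reduced chunk shape: only the fields the merge rule depends on.
interface Chunk {
  type: string;
  id?: string;
  delta?: string;
}

// Merges runs of adjacent text-delta or reasoning-delta chunks that share
// the same type and id by concatenating their `.delta` strings. All other
// chunks are copied through unchanged and break any run.
function mergeConsecutiveDeltas(chunks: Chunk[]): Chunk[] {
  const out: Chunk[] = [];
  for (const chunk of chunks) {
    const prev = out[out.length - 1];
    const mergeable =
      chunk.type === "text-delta" || chunk.type === "reasoning-delta";
    if (
      mergeable &&
      prev !== undefined &&
      prev.type === chunk.type &&
      prev.id === chunk.id
    ) {
      out[out.length - 1] = {
        ...prev,
        delta: (prev.delta ?? "") + (chunk.delta ?? ""),
      };
    } else {
      out.push({ ...chunk });
    }
  }
  return out;
}
```

The `TextStreamPart` variant follows the same pattern but concatenates `.text` instead of `.delta`. Only adjacent parts are merged, so the relative order of text, reasoning, and tool parts is preserved.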

**REQ-FILT-7**: Throttling must remain configurable per-stream:
- Default: `250ms` between delta writes
- Configurable via `StreamingOptions.throttleMs`
- Chunking granularity: `"word"`, `"line"`, `RegExp`, or custom `ChunkDetector` (default: `/[\p{P}\s]/u` — punctuation + whitespace)

---

## 5. Stream ID Tracking

### 5.1 Stream ID Lifecycle

**REQ-SID-1**: Stream IDs are Convex document IDs (`Id<"streamingMessages">`) generated lazily on first delta write:
- `DeltaStreamer.getStreamId()` creates the stream via `streams.create` mutation
- Race-condition safe: only one creation promise via `#creatingStreamIdPromise`
- Stream ID is `undefined` until the first `addParts()` call

**REQ-SID-2**: The `streams.create` mutation must:
1. Insert a `streamingMessages` document with `state: { kind: "streaming", lastHeartbeat: Date.now() }`
2. Schedule a timeout function at `TIMEOUT_INTERVAL` (10 minutes)
3. Patch the document with the `timeoutFnId`

**REQ-SID-3**: Stream IDs must be passed to `addMessages` via `finishStreamId` for atomic stream finish + message persistence (prevents UI flicker from separate mutations).

### 5.2 Client-Side Stream ID Management

**REQ-SID-4**: React hooks must track multiple concurrent streams per thread:
- `useDeltaStreams` returns `Array<{ streamMessage: StreamMessage; deltas: StreamDelta[] }>`
- Each stream accumulates deltas independently
- Streams are sorted by `[order, stepOrder]` for display

**REQ-SID-5**: When a thread changes (`threadId` differs from previous render):
- Clear all accumulated delta streams (`state.deltaStreams = undefined`)
- Reset all cursors (`setCursors({})`)
- Reset `startOrder`

**REQ-SID-6**: Stream identity in UIMessages uses the convention `id: "stream:{streamId}"` to distinguish streaming messages from persisted messages.

### 5.3 Heartbeat & Timeout

**REQ-SID-7**: Heartbeat behavior:
- Triggered on every `addDelta` call
- Debounced: only writes if >2.5 minutes since last heartbeat (`TIMEOUT_INTERVAL / 4`)
- Updates `state.lastHeartbeat` and reschedules the timeout function

**REQ-SID-8**: Timeout behavior:
- After 10 minutes of inactivity, `timeoutStream` internal mutation fires
- Checks if `lastHeartbeat + TIMEOUT_INTERVAL < Date.now()`
- If expired: aborts the stream with reason `"timeout"`
- If not expired: reschedules for the remaining time
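
REQ-SID-7 and REQ-SID-8 reduce to two time comparisons, sketched here as pure functions so the boundary cases are easy to check. The function and type names are hypothetical; the constant value and the comparisons follow the text above.

```typescript
const TIMEOUT_INTERVAL = 10 * 60 * 1000; // 10 minutes, per REQ-SID-2

// Heartbeat debounce: write only if more than TIMEOUT_INTERVAL / 4
// (2.5 minutes) has passed since the last recorded heartbeat.
function shouldWriteHeartbeat(lastHeartbeat: number, now: number): boolean {
  return now - lastHeartbeat > TIMEOUT_INTERVAL / 4;
}

type TimeoutDecision =
  | { kind: "abort"; reason: "timeout" }
  | { kind: "reschedule"; delayMs: number };

// Timeout check: abort if the heartbeat has expired, otherwise reschedule
// the check for the time remaining until expiry.
function decideTimeout(lastHeartbeat: number, now: number): TimeoutDecision {
  const expiresAt = lastHeartbeat + TIMEOUT_INTERVAL;
  if (expiresAt < now) {
    return { kind: "abort", reason: "timeout" };
  }
  return { kind: "reschedule", delayMs: expiresAt - now };
}
```

Because the heartbeat is only refreshed by `addDelta` calls, an HTTP reader disconnecting has no effect on either function: the stream's lifetime depends on the writer alone.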

**REQ-SID-9**: Cleanup behavior:
- `finish()` schedules `deleteStream` after `DELETE_STREAM_DELAY` (5 minutes)
- `deleteStream` removes the `streamingMessages` document and all associated `streamDeltas`
- 5-minute delay allows clients to fetch final deltas before cleanup

---

## 6. Backwards Compatibility Requirements

### 6.1 Transport Compatibility

**REQ-BC-1**: The existing WebSocket/reactive-query streaming path must remain the default and primary transport. HTTP streaming is additive, not a replacement.

**REQ-BC-2**: All existing public APIs must remain unchanged:
- `syncStreams()` function signature and return type (`SyncStreamsReturnValue`)
- `listStreams()` function signature
- `abortStream()` function signature
- `vStreamMessagesReturnValue` validator

**REQ-BC-3**: The `StreamArgs` union type must be extended (not replaced) to support HTTP streaming parameters:
```typescript
// Existing (preserved):
type StreamArgs =
  | { kind: "list"; startOrder: number }
  | { kind: "deltas"; cursors: Array<{ streamId: string; cursor: number }> }
  // New (additive):
  | { kind: "http"; streamId: string; cursor?: number }
```
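
To illustrate why an additive variant is safe for existing callers, the sketch below dispatches on `kind` with an exhaustiveness check, so the `list` and `deltas` branches are untouched and the compiler flags any handler that forgets the new case. The function `cursorsFor` and the choice to default a missing `cursor` to 0 are assumptions made for illustration.

```typescript
type StreamArgs =
  | { kind: "list"; startOrder: number }
  | { kind: "deltas"; cursors: Array<{ streamId: string; cursor: number }> }
  | { kind: "http"; streamId: string; cursor?: number };

// Hypothetical helper: normalizes any variant into the cursors to read from.
function cursorsFor(
  args: StreamArgs,
): Array<{ streamId: string; cursor: number }> {
  switch (args.kind) {
    case "list":
      return []; // discovery only, no deltas requested
    case "deltas":
      return args.cursors;
    case "http":
      // Assumption: a missing cursor means "start from the beginning".
      return [{ streamId: args.streamId, cursor: args.cursor ?? 0 }];
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a new
      // variant is added to StreamArgs without a matching case.
      const unreachable: never = args;
      throw new Error(`Unhandled StreamArgs: ${JSON.stringify(unreachable)}`);
    }
  }
}
```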

### 6.2 Data Format Compatibility

**REQ-BC-4**: Both `UIMessageChunk` and `TextStreamPart` delta formats must be supported in perpetuity. The `format` field on `streamingMessages` is `v.optional(...)`, so streams created before format tracking was added (format = `undefined`) must default to `TextStreamPart` behavior.

**REQ-BC-5**: Forward compatibility for new `TextStreamPart` types from future AI SDK versions must be maintained via the `default` case in `updateFromTextStreamParts` (`src/deltas.ts:520-527`):
```typescript
default: {
  console.warn(`Received unexpected part: ${JSON.stringify(part)}`);
  break;
}
```

**REQ-BC-6**: The `readUIMessageStream` error suppression for `"no tool invocation found"` must be preserved (`src/deltas.ts:77-81`). This handles tool approval continuation streams that have `tool-result` without the original `tool-call`.

### 6.3 React Hook Compatibility

**REQ-BC-7**: Existing React hooks must not change behavior:
- `useThreadMessages` — paginated messages + streaming
- `useUIMessages` — UIMessage-first with metadata
- `useSmoothText` — animated text rendering

**REQ-BC-8**: New HTTP-streaming React hooks (if any) must be additive exports from `@convex-dev/agent/react`, not replacements.

### 6.4 Schema Compatibility

**REQ-BC-9**: No breaking changes to the component schema tables:
- `streamingMessages` — no field removals or type changes
- `streamDeltas` — no field removals or type changes
- Indexes must not be dropped (can add new ones)

**REQ-BC-10**: The `vStreamDelta` and `vStreamMessage` validators must remain structurally compatible. New optional fields may be added but existing fields must not change type or be removed.

### 6.5 Export Surface Compatibility

**REQ-BC-11**: All four export surfaces must remain stable:
- `@convex-dev/agent` — main exports
- `@convex-dev/agent/react` — React hooks
- `@convex-dev/agent/validators` — Convex validators
- `@convex-dev/agent/test` — testing utilities

HTTP streaming additions should be exported from the main surface or from a new `@convex-dev/agent/http` surface, rather than mixed into the other existing surfaces, where they could break tree-shaking.

---

## 7. Non-Functional Requirements

**REQ-NF-1**: HTTP streaming latency must not exceed the WebSocket path latency by more than 100ms for equivalent payload sizes.

**REQ-NF-2**: HTTP streaming must support concurrent streams per thread (matching current behavior of up to 100 active streams per thread, per the `list` query's `.take(100)`).

**REQ-NF-3**: HTTP streaming must gracefully handle client disconnection without leaving orphaned streams (existing heartbeat/timeout mechanism applies).

**REQ-NF-4**: Delta writes must remain throttled at the configured `throttleMs` regardless of transport, to avoid excessive database writes.

---

## 8. Open Questions

1. **SSE vs NDJSON**: Should the HTTP transport use SSE (native browser support, automatic reconnection) or NDJSON (simpler, works with `fetch` + `ReadableStream`)?

2. **Authentication**: How should HTTP streaming endpoints authenticate? Convex actions have auth context, but raw HTTP endpoints may need token-based auth.

3. **Multi-stream HTTP**: Should a single HTTP connection support multiplexed streams (like the current WebSocket path with multi-cursor queries), or should each HTTP connection follow a single stream?

4. **Convex HTTP actions**: Should HTTP streaming be implemented as Convex HTTP actions (which have a 2-minute timeout and limited streaming support), or as a separate server/proxy?

5. **Atomic finish over HTTP**: The current `finishStreamId` pattern enables atomic stream finish + message save. How should this translate to the HTTP transport where the client may not be the writer?
4 changes: 4 additions & 0 deletions example/convex/_generated/api.d.ts
@@ -13,12 +13,14 @@ import type * as agents_config from "../agents/config.js";
import type * as agents_fashion from "../agents/fashion.js";
import type * as agents_simple from "../agents/simple.js";
import type * as agents_story from "../agents/story.js";
import type * as agents_streamingDemo from "../agents/streamingDemo.js";
import type * as agents_weather from "../agents/weather.js";
import type * as chat_approval from "../chat/approval.js";
import type * as chat_basic from "../chat/basic.js";
import type * as chat_human from "../chat/human.js";
import type * as chat_streamAbort from "../chat/streamAbort.js";
import type * as chat_streaming from "../chat/streaming.js";
import type * as chat_streamingDemo from "../chat/streamingDemo.js";
import type * as chat_streamingReasoning from "../chat/streamingReasoning.js";
import type * as chat_withoutAgent from "../chat/withoutAgent.js";
import type * as crons from "../crons.js";
@@ -62,12 +64,14 @@ declare const fullApi: ApiFromModules<{
  "agents/fashion": typeof agents_fashion;
  "agents/simple": typeof agents_simple;
  "agents/story": typeof agents_story;
  "agents/streamingDemo": typeof agents_streamingDemo;
  "agents/weather": typeof agents_weather;
  "chat/approval": typeof chat_approval;
  "chat/basic": typeof chat_basic;
  "chat/human": typeof chat_human;
  "chat/streamAbort": typeof chat_streamAbort;
  "chat/streaming": typeof chat_streaming;
  "chat/streamingDemo": typeof chat_streamingDemo;
  "chat/streamingReasoning": typeof chat_streamingReasoning;
  "chat/withoutAgent": typeof chat_withoutAgent;
  crons: typeof crons;
61 changes: 61 additions & 0 deletions example/convex/agents/streamingDemo.ts
@@ -0,0 +1,61 @@
// Agent for the full streaming demo with tool approval support.
// Combines streaming patterns with approval-gated tools.
import { Agent, createTool, stepCountIs } from "@convex-dev/agent";
import { components } from "../_generated/api";
import { defaultConfig } from "./config";
import { z } from "zod/v4";

// Tool that always requires approval
const deleteFileTool = createTool({
  description: "Delete a file from the system",
  inputSchema: z.object({
    filename: z.string().describe("The name of the file to delete"),
  }),
  needsApproval: () => true,
  execute: async (_ctx, input) => {
    return `Successfully deleted file: ${input.filename}`;
  },
});

// Tool with conditional approval (requires approval for amounts > $100)
const transferMoneyTool = createTool({
  description: "Transfer money to an account",
  inputSchema: z.object({
    amount: z.number().describe("The amount to transfer"),
    toAccount: z.string().describe("The destination account"),
  }),
  needsApproval: async (_ctx, input) => {
    return input.amount > 100;
  },
  execute: async (_ctx, input) => {
    return `Transferred $${input.amount} to account ${input.toAccount}`;
  },
});

// Tool that doesn't need approval
const checkBalanceTool = createTool({
  description: "Check the account balance",
  inputSchema: z.object({
    accountId: z.string().describe("The account to check"),
  }),
  execute: async (_ctx, _input) => {
    return `Balance: $1,234.56`;
  },
});

export const streamingDemoAgent = new Agent(components.agent, {
  name: "Streaming Demo Agent",
  instructions:
    "You are a concise assistant who responds with emojis " +
    "and abbreviations like lmao, lol, iirc, afaik, etc. where appropriate. " +
    "You can delete files, transfer money, and check account balances. " +
    "Always confirm what action you took after it completes.",
  tools: {
    deleteFile: deleteFileTool,
    transferMoney: transferMoneyTool,
    checkBalance: checkBalanceTool,
  },
  stopWhen: stepCountIs(5),
  ...defaultConfig,
  callSettings: { temperature: 0 },
Comment on lines +59 to +60
⚠️ Potential issue | 🟡 Minor

callSettings override replaces the entire object from defaultConfig, silently dropping any other settings.

If defaultConfig.callSettings contains properties beyond temperature (e.g., maxTokens), callSettings: { temperature: 0 } will silently discard them.

♻️ Proposed fix
-  ...defaultConfig,
-  callSettings: { temperature: 0 },
+  ...defaultConfig,
+  callSettings: { ...defaultConfig.callSettings, temperature: 0 },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/convex/agents/streamingDemo.ts` around lines 59 - 60, The override
for callSettings currently replaces defaultConfig.callSettings entirely; instead
merge the existing defaultConfig.callSettings with the new temperature so other
settings (e.g., maxTokens) are preserved: update the object where defaultConfig
and callSettings are combined to construct callSettings by shallow-merging
defaultConfig.callSettings with temperature set to 0 (ensure you handle the case
where defaultConfig.callSettings may be undefined), leaving other defaultConfig
properties unchanged.

});