Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,19 @@ export const createChat = mutation({
// Query the accumulated body of a stream. When `listItems` is set, the
// component also returns the persisted chunks as a `textList` array.
export const getChatBody = query({
  args: {
    streamId: StreamIdValidator,
    listItems: v.optional(v.boolean()), // Return chunks as textList array
  },
  handler: async (ctx, { streamId, listItems }) =>
    await persistentTextStreaming.getStreamBody(
      ctx,
      streamId as StreamId,
      listItems,
    ),
});

// Create an HTTP action that generates chunks of the chat body
// and uses the component to stream them to the client and save them to the database.
// Create an HTTP action that generates chunks and uses the component to stream
// them to the client and persist to the database.
export const streamChat = httpAction(async (ctx, request) => {
const body = (await request.json()) as { streamId: string };
const generateChat = async (ctx, request, streamId, chunkAppender) => {
Expand Down Expand Up @@ -154,6 +156,53 @@ const { text, status } = useStream(
);
```

## Non-text (object/JSON) streaming

For structured or JSON streaming:

1. **Chunk appender**: Pass `true` as the second argument when a chunk is a complete object/JSON so each chunk is persisted immediately (no sentence-delimiter batching).
2. **List items**: Pass `listItems: true` to your body query so `getStreamBody` returns `textList` (array of chunks). Consume `textList` on the client.

**Backend** — stream JSON lines and return them as a list:

```ts
// In your HTTP action (e.g. streamItems):
// HTTP action that streams structured data: each item is serialized to a
// JSON line and handed to the chunk appender with `true`, which persists
// the chunk immediately instead of batching on sentence delimiters.
const streamItems = httpAction(async (ctx, request) => {
  const { streamId } = (await request.json()) as { streamId: string };
  const generate = async (ctx, request, streamId, chunkAppender) => {
    const items = [
      { id: 1, name: "a" },
      { id: 2, name: "b" },
    ];
    for (const item of items) {
      await chunkAppender(JSON.stringify(item), true); // true = persist each chunk now
    }
  };
  return await persistentTextStreaming.stream(
    ctx,
    request,
    streamId as StreamId,
    generate,
  );
});

// Query with listItems so body has textList
// Body query that defaults `listItems` to true so structured chunks come
// back as a `textList` array.
export const getItemList = query({
  args: {
    streamId: StreamIdValidator,
    listItems: v.optional(v.boolean()),
  },
  handler: async (ctx, { streamId, listItems }) => {
    // Return chunks as textList for structured data unless the caller
    // explicitly opts out.
    const asList = listItems ?? true;
    return await persistentTextStreaming.getStreamBody(
      ctx,
      streamId as StreamId,
      asList,
    );
  },
});
```

**Frontend** — subscribe and parse each chunk as JSON:

```ts
// Subscribe to the stream; `textList` holds one entry per persisted chunk.
const { textList, status } = useStream(
  api.chat.getItemList,
  new URL(`${convexSiteUrl}/chat-stream`),
  driven,
  streamId,
);
// Each chunk was written as a JSON line, so parse entries individually.
// NOTE(review): JSON.parse output is asserted, not validated — assumes the
// backend only ever appends well-formed `{ id, name }` objects.
const items = textList.map((line) => JSON.parse(line) as { id: number; name: string });
```

## Design Philosophy

This component balances HTTP streaming with database persistence to try to
Expand Down
7 changes: 5 additions & 2 deletions example/convex/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ export const sendMessage = mutation({
});

export const getHistory = internalQuery({
args: {},
handler: async (ctx) => {
args: {
listItems: v.optional(v.boolean()),
},
handler: async (ctx, { listItems}) => {
// Grab all the user messages
const allMessages = await ctx.db.query("userMessages").collect();

Expand All @@ -46,6 +48,7 @@ export const getHistory = internalQuery({
responseMessage: await streamingComponent.getStreamBody(
ctx,
userMessage.responseStreamId as StreamId,
listItems,
),
};
}),
Expand Down
7 changes: 5 additions & 2 deletions example/convex/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from "@convex-dev/persistent-text-streaming";
import { components } from "./_generated/api";
import { query } from "./_generated/server";
import { v } from "convex/values";

export const streamingComponent = new PersistentTextStreaming(
components.persistentTextStreaming,
Expand All @@ -13,11 +14,13 @@ export const streamingComponent = new PersistentTextStreaming(
export const getStreamBody = query({
args: {
streamId: StreamIdValidator,
listItems: v.optional(v.boolean()),
},
handler: async (ctx, args) => {
handler: async (ctx, { streamId, listItems }) => {
return await streamingComponent.getStreamBody(
ctx,
args.streamId as StreamId,
streamId as StreamId,
listItems,
);
},
});
27 changes: 18 additions & 9 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ export type StreamId = string & { __isStreamId: true };
export const StreamIdValidator = v.string();
export type StreamBody = {
text: string;
textList: string[];
status: StreamStatus;
};

export type ChunkAppender = (text: string) => Promise<void>;
export type ChunkAppender = (text: string, isObjectOrJson?: boolean) => Promise<void>;
export type StreamWriter<A extends GenericActionCtx<GenericDataModel>> = (
ctx: A,
request: Request,
Expand Down Expand Up @@ -70,18 +71,19 @@ export class PersistentTextStreaming {
* @example
* ```ts
* const streaming = new PersistentTextStreaming(api);
* const { text, status } = await streaming.getStreamBody(ctx, streamId);
* const { text, textList, status } = await streaming.getStreamBody(ctx, streamId);
* ```
*/
async getStreamBody(
  ctx: RunQueryCtx,
  streamId: StreamId,
  listItems?: boolean,
): Promise<StreamBody> {
  // Run the component query; when `listItems` is set the component also
  // returns the individual chunks as `textList`.
  const result = await ctx.runQuery(this.component.lib.getStreamText, {
    streamId,
    listItems,
  });
  return {
    text: result.text,
    textList: result.textList,
    status: result.status as StreamStatus,
  };
}

/**
Expand Down Expand Up @@ -110,6 +112,7 @@ export class PersistentTextStreaming {
request: Request,
streamId: StreamId,
streamWriter: StreamWriter<A>,

) {
const streamState = await ctx.runQuery(this.component.lib.getStreamStatus, {
streamId,
Expand All @@ -128,7 +131,7 @@ export class PersistentTextStreaming {
let pending = "";

const doStream = async () => {
const chunkAppender: ChunkAppender = async (text) => {
const chunkAppender: ChunkAppender = async (text, isObjectOrJson) => {
// write to this handler's response stream on every update
if (writer) {
try {
Expand All @@ -143,7 +146,10 @@ export class PersistentTextStreaming {
}
pending += text;
// write to the database periodically, like at the end of sentences
if (hasDelimeter(text)) {
// When isObjectOrJson is true, the chunk is structured data rather than
// prose, so the sentence-delimiter heuristic would be unreliable. In that
// case, persist the chunk to the database immediately, short-circuiting
// the delimiter check.
if (isObjectOrJson || hasDelimeter(text)) {
await this.addChunk(ctx, streamId, pending, false);
pending = "";
}
Expand All @@ -158,8 +164,11 @@ export class PersistentTextStreaming {
throw e;
}

// Success? Flush any last updates
await this.addChunk(ctx, streamId, pending, true);
// Do not add an empty chunk to the database.
if (pending) {
// Success? Flush any last updates
await this.addChunk(ctx, streamId, pending, true);
}
Comment on lines +167 to +171
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Always finalize the stream when pending is empty.

If the last chunkAppender call already flushed pending—which is the normal path for chunkAppender(..., true) and any chunk ending in punctuation—this block is skipped and the stream stays "streaming" forever. That leaves subscribers with the wrong status and breaks the new structured-streaming flow.

🧩 Proposed fix
-      // Do not add an empty chunk to the database.
-      if (pending) {
-        // Success? Flush any last updates
-        await this.addChunk(ctx, streamId, pending, true);
-      }
+      if (pending) {
+        // Success? Flush any last updates.
+        await this.addChunk(ctx, streamId, pending, true);
+      } else {
+        await this.setStreamStatus(ctx, streamId, "done");
+      }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Do not add an empty chunk to the database.
if (pending) {
// Success? Flush any last updates
await this.addChunk(ctx, streamId, pending, true);
}
if (pending) {
// Success? Flush any last updates.
await this.addChunk(ctx, streamId, pending, true);
} else {
await this.setStreamStatus(ctx, streamId, "done");
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 167 - 171, The stream finalization branch
only calls addChunk when pending is truthy, leaving streams stuck as "streaming"
if the last chunkAppender already flushed pending; remove the conditional and
always finalize the stream by invoking addChunk(ctx, streamId, pending, true)
(or call the dedicated finalize helper) so the stream status transitions to
final even when pending is empty—ensure addChunk (or the finalize helper)
tolerates an empty pending payload.


if (writer) {
await writer.close();
Expand Down
7 changes: 7 additions & 0 deletions src/component/lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ export const getStreamStatus = query({
export const getStreamText = query({
args: {
streamId: v.id("streams"),
listItems: v.optional(v.boolean()),
},
returns: v.object({
text: v.string(),
textList: v.array(v.string()),
status: streamStatusValidator,
}),
handler: async (ctx, args) => {
Expand All @@ -105,15 +107,20 @@ export const getStreamText = query({
throw new Error("Stream not found");
}
let text = "";
let textList: string[] = [];
if (stream.status !== "pending") {
const chunks = await ctx.db
.query("chunks")
.withIndex("byStream", (q) => q.eq("streamId", args.streamId))
.collect();
text = chunks.map((chunk) => chunk.text).join("");
if (args.listItems) {
textList = chunks.map((chunk) => chunk.text);
}
}
return {
text,
textList,
status: stream.status,
};
},
Expand Down
7 changes: 6 additions & 1 deletion src/react/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ export function useStream(
authToken?: string | null;
// If provided, these will be passed as additional headers.
headers?: Record<string, string>;
// Return the stream as an array / list of items.
listItems?: boolean; // Default is false.
},
) {
const [streamEnded, setStreamEnded] = useState(null as boolean | null);
Expand All @@ -64,6 +66,7 @@ export function useStream(
usePersistence && streamId ? { streamId } : "skip",
);
const [streamBody, setStreamBody] = useState<string>("");
const [streamBodyAsList, setStreamBodyAsList] = useState<string[]>([]);

useEffect(() => {
if (driven && streamId && !streamStarted.current) {
Expand All @@ -74,6 +77,7 @@ export function useStream(
streamId,
(text) => {
setStreamBody((prev) => prev + text);
setStreamBodyAsList((prev) => [...prev, text]);
},
{
...opts?.headers,
Expand Down Expand Up @@ -116,9 +120,10 @@ export function useStream(
}
return {
text: streamBody,
textList: streamBodyAsList,
status: status as StreamStatus,
};
}, [persistentBody, streamBody, streamEnded]);
}, [persistentBody, streamBody, streamBodyAsList, streamEnded]);

return body;
}
Expand Down