From 801ad8dcd3b692cc46ce54519e2c3e819a5bacb2 Mon Sep 17 00:00:00 2001 From: Paschal Ezeugwu Date: Tue, 17 Mar 2026 14:50:41 +0100 Subject: [PATCH] return text as a chunk list --- README.md | 53 +++++++++++++++++++++++++++++++++++-- example/convex/messages.ts | 7 +++-- example/convex/streaming.ts | 7 +++-- src/client/index.ts | 27 ++++++++++++------- src/component/lib.ts | 7 +++++ src/react/index.ts | 7 ++++- 6 files changed, 92 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index a1d42a6..3f2513a 100644 --- a/README.md +++ b/README.md @@ -84,17 +84,19 @@ export const createChat = mutation({ export const getChatBody = query({ args: { streamId: StreamIdValidator, + listItems: v.optional(v.boolean()), // Return chunks as textList array }, handler: async (ctx, args) => { return await persistentTextStreaming.getStreamBody( ctx, args.streamId as StreamId, + args.listItems, ); }, }); -// Create an HTTP action that generates chunks of the chat body -// and uses the component to stream them to the client and save them to the database. +// Create an HTTP action that generates chunks and uses the component to stream +// them to the client and persist to the database. export const streamChat = httpAction(async (ctx, request) => { const body = (await request.json()) as { streamId: string }; const generateChat = async (ctx, request, streamId, chunkAppender) => { @@ -154,6 +156,53 @@ const { text, status } = useStream( ); ``` +## Non-text (object/JSON) streaming + +For structured or JSON streaming: + +1. **Chunk appender**: Pass `true` as the second argument when a chunk is a complete object/JSON so each chunk is persisted immediately (no sentence-delimiter batching). +2. **List items**: Pass `listItems: true` to your body query so `getStreamBody` returns `textList` (array of chunks). Consume `textList` on the client. + +**Backend** — stream JSON lines and return them as a list: + +```ts +// In your HTTP action (e.g. streamItems): +const streamItems = httpAction(async (ctx, request) => { + const body = (await request.json()) as { streamId: string }; + const generate = async (ctx, request, streamId, chunkAppender) => { + const items = [{ id: 1, name: "a" }, { id: 2, name: "b" }]; + for (const item of items) { + await chunkAppender(JSON.stringify(item), true); // true = persist each chunk now + } + }; + return await persistentTextStreaming.stream(ctx, request, body.streamId as StreamId, generate); +}); + +// Query with listItems so body has textList +export const getItemList = query({ + args: { streamId: StreamIdValidator, listItems: v.optional(v.boolean()) }, + handler: async (ctx, args) => { + return await persistentTextStreaming.getStreamBody( + ctx, + args.streamId as StreamId, + args.listItems ?? true, // return chunks as textList for structured data + ); + }, +}); +``` + +**Frontend** — subscribe and parse each chunk as JSON: + +```ts +const { textList, status } = useStream( + api.chat.getItemList, + new URL(`${convexSiteUrl}/chat-stream`), + driven, + streamId, +); +const items = textList.map((line) => JSON.parse(line) as { id: number; name: string }); +``` + ## Design Philosophy This component balances HTTP streaming with database persistence to try to diff --git a/example/convex/messages.ts b/example/convex/messages.ts index 1694d38..7ebb587 100644 --- a/example/convex/messages.ts +++ b/example/convex/messages.ts @@ -33,8 +33,10 @@ export const sendMessage = mutation({ }); export const getHistory = internalQuery({ - args: {}, - handler: async (ctx) => { + args: { + listItems: v.optional(v.boolean()), + }, + handler: async (ctx, { listItems}) => { // Grab all the user messages const allMessages = await ctx.db.query("userMessages").collect(); @@ -46,6 +48,7 @@ export const getHistory = internalQuery({ responseMessage: await streamingComponent.getStreamBody( ctx, userMessage.responseStreamId as StreamId, + listItems, ), }; }), diff --git a/example/convex/streaming.ts b/example/convex/streaming.ts index 915b4ed..c2cc0d4 100644 --- a/example/convex/streaming.ts +++ b/example/convex/streaming.ts @@ -5,6 +5,7 @@ import { } from "@convex-dev/persistent-text-streaming"; import { components } from "./_generated/api"; import { query } from "./_generated/server"; +import { v } from "convex/values"; export const streamingComponent = new PersistentTextStreaming( components.persistentTextStreaming, @@ -13,11 +14,13 @@ export const streamingComponent = new PersistentTextStreaming( export const getStreamBody = query({ args: { streamId: StreamIdValidator, + listItems: v.optional(v.boolean()), }, - handler: async (ctx, args) => { + handler: async (ctx, { streamId, listItems }) => { return await streamingComponent.getStreamBody( ctx, - args.streamId as StreamId, + streamId as StreamId, + listItems, ); }, }); diff --git a/src/client/index.ts b/src/client/index.ts index 2140955..b8b0812 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -14,10 +14,11 @@ export type StreamId = string & { __isStreamId: true }; export const StreamIdValidator = v.string(); export type StreamBody = { text: string; + textList: string[]; status: StreamStatus; }; -export type ChunkAppender = (text: string) => Promise; +export type ChunkAppender = (text: string, isObjectOrJson?: boolean) => Promise; export type StreamWriter> = ( ctx: A, request: Request, @@ -70,18 +71,19 @@ export class PersistentTextStreaming { * @example * ```ts * const streaming = new PersistentTextStreaming(api); - * const { text, status } = await streaming.getStreamBody(ctx, streamId); + * const { text, textList, status } = await streaming.getStreamBody(ctx, streamId); * ``` */ async getStreamBody( ctx: RunQueryCtx, streamId: StreamId, + listItems?: boolean, ): Promise { - const { text, status } = await ctx.runQuery( + const { text, status, textList } = await ctx.runQuery( this.component.lib.getStreamText, - { streamId }, + { streamId, listItems }, ); - return { text, status: status as StreamStatus }; + return { text, textList, status: status as StreamStatus }; } /** @@ -110,6 +112,7 @@ export class PersistentTextStreaming { request: Request, streamId: StreamId, streamWriter: StreamWriter, + ) { const streamState = await ctx.runQuery(this.component.lib.getStreamStatus, { streamId, @@ -128,7 +131,7 @@ export class PersistentTextStreaming { let pending = ""; const doStream = async () => { - const chunkAppender: ChunkAppender = async (text) => { + const chunkAppender: ChunkAppender = async (text, isObjectOrJson) => { // write to this handler's response stream on every update if (writer) { try { @@ -143,7 +146,10 @@ export class PersistentTextStreaming { } pending += text; // write to the database periodically, like at the end of sentences - if (hasDelimeter(text)) { + // When isObjectOrJson is true, the response is not a text, we do not + // need to do a delimiter check as the result may be inaccurate. + // In this case, we will add the chunk to the database, short circuiting the delimiter check. + if (isObjectOrJson || hasDelimeter(text)) { await this.addChunk(ctx, streamId, pending, false); pending = ""; } @@ -158,8 +164,11 @@ export class PersistentTextStreaming { throw e; } - // Success? Flush any last updates - await this.addChunk(ctx, streamId, pending, true); + // Do not add an empty chunk to the database. + if (pending) { + // Success? Flush any last updates + await this.addChunk(ctx, streamId, pending, true); + } if (writer) { await writer.close(); diff --git a/src/component/lib.ts b/src/component/lib.ts index cc01906..c73c3a8 100644 --- a/src/component/lib.ts +++ b/src/component/lib.ts @@ -94,9 +94,11 @@ export const getStreamStatus = query({ export const getStreamText = query({ args: { streamId: v.id("streams"), + listItems: v.optional(v.boolean()), }, returns: v.object({ text: v.string(), + textList: v.array(v.string()), status: streamStatusValidator, }), handler: async (ctx, args) => { @@ -105,15 +107,20 @@ export const getStreamText = query({ throw new Error("Stream not found"); } let text = ""; + let textList: string[] = []; if (stream.status !== "pending") { const chunks = await ctx.db .query("chunks") .withIndex("byStream", (q) => q.eq("streamId", args.streamId)) .collect(); text = chunks.map((chunk) => chunk.text).join(""); + if (args.listItems) { + textList = chunks.map((chunk) => chunk.text); + } } return { text, + textList, status: stream.status, }; }, diff --git a/src/react/index.ts b/src/react/index.ts index ef82b0e..7e613d3 100644 --- a/src/react/index.ts +++ b/src/react/index.ts @@ -39,6 +39,8 @@ export function useStream( authToken?: string | null; // If provided, these will be passed as additional headers. headers?: Record; + // Return the stream as an array / list of items. + listItems?: boolean; // Default is false. }, ) { const [streamEnded, setStreamEnded] = useState(null as boolean | null); @@ -64,6 +66,7 @@ export function useStream( usePersistence && streamId ? { streamId } : "skip", ); const [streamBody, setStreamBody] = useState(""); + const [streamBodyAsList, setStreamBodyAsList] = useState([]); useEffect(() => { if (driven && streamId && !streamStarted.current) { @@ -74,6 +77,7 @@ export function useStream( streamId, (text) => { setStreamBody((prev) => prev + text); + setStreamBodyAsList((prev) => [...prev, text]); }, { ...opts?.headers, @@ -116,9 +120,10 @@ export function useStream( } return { text: streamBody, + textList: streamBodyAsList, status: status as StreamStatus, }; - }, [persistentBody, streamBody, streamEnded]); + }, [persistentBody, streamBody, streamBodyAsList, streamEnded]); return body; }