Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/client/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export async function startGeneration<
| { step: StepResult<TOOLS> }
| { object: GenerateObjectResult<unknown> },
createPendingMessage?: boolean,
finishStreamId?: string,
) => Promise<void>;
fail: (reason: string) => Promise<void>;
getSavedMessages: () => MessageDoc[];
Expand Down Expand Up @@ -228,6 +229,11 @@ export async function startGeneration<
| { step: StepResult<TOOLS> }
| { object: GenerateObjectResult<unknown> },
createPendingMessage?: boolean,
/**
* If provided, finish this stream atomically with the message save.
* This prevents UI flickering from separate mutations (issue #181).
*/
finishStreamId?: string,
) => {
if (threadId && saveMessages !== "none") {
const serialized =
Expand Down Expand Up @@ -265,6 +271,7 @@ export async function startGeneration<
messages: serialized.messages,
embeddings,
failPendingSteps: false,
finishStreamId,
});
const lastMessage = saved.messages.at(-1)!;
if (createPendingMessage) {
Expand Down
18 changes: 17 additions & 1 deletion src/client/streamText.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ export async function streamText<

const steps: StepResult<TOOLS>[] = [];

// Track the final step for atomic save with stream finish (issue #181)
let pendingFinalStep: StepResult<TOOLS> | undefined;

const streamer =
threadId && options.saveStreamDeltas
? new DeltaStreamer(
Expand Down Expand Up @@ -138,7 +141,14 @@ export async function streamText<
onStepFinish: async (step) => {
steps.push(step);
const createPendingMessage = await willContinue(steps, args.stopWhen);
await call.save({ step }, createPendingMessage);
if (!createPendingMessage && streamer) {
// This is the final step with streaming enabled.
// Defer saving until stream consumption completes for atomic finish (issue #181).
streamer.markFinishedExternally();
pendingFinalStep = step;
} else {
await call.save({ step }, createPendingMessage);
}
return args.onStepFinish?.(step);
},
}) as StreamTextResult<TOOLS, OUTPUT>;
Expand All @@ -153,6 +163,12 @@ export async function streamText<
await stream;
await result.consumeStream();
}

// If we deferred the final step save, do it now with atomic stream finish.
if (pendingFinalStep && streamer) {
const finishStreamId = await streamer.getOrCreateStreamId();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why the streamId wouldn't exist yet - but i guess better than streamer.streamId!?

await call.save({ step: pendingFinalStep }, false, finishStreamId);
}
const metadata: GenerationOutputMetadata = {
promptMessageId,
order,
Expand Down
44 changes: 33 additions & 11 deletions src/client/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ export class DeltaStreamer<T> {
#ongoingWrite: Promise<void> | undefined;
#cursor: number = 0;
public abortController: AbortController;
// When true, the stream will be finished externally (e.g., atomically via addMessages)
// and consumeStream should skip calling finish().
#finishedExternally: boolean = false;

constructor(
public readonly component: AgentComponent,
Expand Down Expand Up @@ -259,17 +262,16 @@ export class DeltaStreamer<T> {
// Avoid race conditions by only creating once
#creatingStreamIdPromise: Promise<string> | undefined;
public async getStreamId() {
if (this.streamId) {
return this.streamId;
}
if (this.#creatingStreamIdPromise) {
return this.#creatingStreamIdPromise;
if (!this.streamId) {
if (!this.#creatingStreamIdPromise) {
this.#creatingStreamIdPromise = this.ctx.runMutation(
this.component.streams.create,
this.metadata,
);
}
this.streamId = await this.#creatingStreamIdPromise;
}
this.#creatingStreamIdPromise = this.ctx.runMutation(
this.component.streams.create,
this.metadata,
);
this.streamId = await this.#creatingStreamIdPromise;
return this.streamId;
}

public async addParts(parts: T[]) {
Expand All @@ -290,7 +292,27 @@ export class DeltaStreamer<T> {
for await (const chunk of stream) {
await this.addParts([chunk]);
}
await this.finish();
// Skip finish if it will be handled externally (atomically with message save)
if (!this.#finishedExternally) {
await this.finish();
}
}

/**
* Mark the stream as being finished externally (e.g., atomically via addMessages).
* When called, consumeStream() will skip calling finish() since it will be
* handled elsewhere in the same mutation as message saving.
*/
public markFinishedExternally(): void {
this.#finishedExternally = true;
}

/**
* Get the stream ID, waiting for it to be created if necessary.
* Useful for passing to addMessages for atomic finish.
*/
public async getOrCreateStreamId(): Promise<string> {
return this.getStreamId();
}

async #sendDelta() {
Expand Down
1 change: 1 addition & 0 deletions src/component/_generated/component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
vectors: Array<Array<number> | null>;
};
failPendingSteps?: boolean;
finishStreamId?: string;
hideFromUserIdSearch?: boolean;
messages: Array<{
error?: string;
Expand Down
11 changes: 10 additions & 1 deletion src/component/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
vVectorId,
} from "./vector/tables.js";
import { changeRefcount } from "./files.js";
import { getStreamingMessagesWithMetadata } from "./streams.js";
import { getStreamingMessagesWithMetadata, finishHandler } from "./streams.js";
import { partial } from "convex-helpers/validators";

function publicMessage(message: Doc<"messages">): MessageDoc {
Expand Down Expand Up @@ -141,6 +141,9 @@
// if set to true, these messages will not show up in text or vector search
// results for the userId
hideFromUserIdSearch: v.optional(v.boolean()),
// If provided, finish this stream atomically with the message save.
// This prevents UI flickering from separate mutations (issue #181).
finishStreamId: v.optional(v.id("streamingMessages")),
};
export const addMessages = mutation({
args: addMessagesArgs,
Expand All @@ -161,6 +164,7 @@
const {
embeddings,
failPendingSteps,
finishStreamId,

Check warning on line 167 in src/component/messages.ts

View workflow job for this annotation

GitHub Actions / Test and lint

'finishStreamId' is assigned a value but never used. Allowed unused vars must match /^_/u
messages,
promptMessageId,
pendingMessageId,
Expand Down Expand Up @@ -303,6 +307,11 @@
// TODO: delete the associated stream data for the order/stepOrder
toReturn.push((await ctx.db.get(messageId))!);
}
// Atomically finish the stream if requested, preventing UI flickering
// from separate mutations for message save and stream finish (issue #181).
if (args.finishStreamId) {
await finishHandler(ctx, { streamId: args.finishStreamId });
Comment on lines +312 to +313
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (args.finishStreamId) {
await finishHandler(ctx, { streamId: args.finishStreamId });
if (finishStreamId) {
await finishHandler(ctx, { streamId: finishStreamId });

}
return { messages: toReturn.map(publicMessage) };
}

Expand Down
Loading