diff --git a/packages/ui/src/stores/delta-buffer.test.ts b/packages/ui/src/stores/delta-buffer.test.ts new file mode 100644 index 000000000..09072783e --- /dev/null +++ b/packages/ui/src/stores/delta-buffer.test.ts @@ -0,0 +1,102 @@ +import assert from "node:assert/strict" +import { afterEach, beforeEach, describe, it } from "node:test" +import { setTimeout as delay } from "node:timers/promises" + +import { + clearPendingDeltasForPart, + enqueueDelta, + flushPendingDeltasForMessage, + resetDeltaBufferForTests, + setFlushCallback, +} from "./delta-buffer.ts" + +type DeltaBatch = Array<{ instanceId: string; messageId: string; partId: string; field: string; delta: string }> + +describe("delta buffer", () => { + beforeEach(() => { + resetDeltaBufferForTests() + }) + + afterEach(() => { + resetDeltaBufferForTests() + }) + + it("concatenates matching deltas and flushes them once", async () => { + const flushed: DeltaBatch[] = [] + setFlushCallback((batch) => flushed.push(batch)) + + enqueueDelta("instance-1", "message-1", "part-1", "text", "hello") + enqueueDelta("instance-1", "message-1", "part-1", "text", " world") + + await delay(75) + + assert.equal(flushed.length, 1) + assert.deepEqual(flushed[0], [ + { instanceId: "instance-1", messageId: "message-1", partId: "part-1", field: "text", delta: "hello world" }, + ]) + }) + + it("clears pending deltas for a full part update before a stale timer flush", async () => { + const flushed: DeltaBatch[] = [] + setFlushCallback((batch) => flushed.push(batch)) + + enqueueDelta("instance-1", "message-1", "part-1", "text", "stale") + clearPendingDeltasForPart("instance-1", "message-1", "part-1") + + await delay(75) + + assert.deepEqual(flushed, []) + }) + + it("flushes pending message deltas before applying message.updated", async () => { + const timerFlushes: DeltaBatch[] = [] + const applied: Array<{ instanceId: string; delta: { messageId: string; partId: string; field: string; delta: string } }> = [] + setFlushCallback((batch) => timerFlushes.push(batch)) + + enqueueDelta("instance-1", "message-1", "part-1", "text", "before update") + flushPendingDeltasForMessage("instance-1", "message-1", (instanceId, delta) => { + applied.push({ instanceId, delta }) + }) + + await delay(75) + + assert.deepEqual(applied, [ + { + instanceId: "instance-1", + delta: { messageId: "message-1", partId: "part-1", field: "text", delta: "before update" }, + }, + ]) + assert.deepEqual(timerFlushes, []) + }) + + it("keeps clear and flush operations isolated by instance, message, and part", async () => { + const timerFlushes: DeltaBatch[] = [] + const applied: Array<{ instanceId: string; delta: { messageId: string; partId: string; field: string; delta: string } }> = [] + setFlushCallback((batch) => timerFlushes.push(batch)) + + enqueueDelta("instance-1", "message-1", "part-1", "text", "drop") + enqueueDelta("instance-1", "message-1", "part-2", "text", "same message") + enqueueDelta("instance-1", "message-2", "part-1", "text", "other message") + enqueueDelta("instance-2", "message-1", "part-1", "text", "other instance") + + clearPendingDeltasForPart("instance-1", "message-1", "part-1") + flushPendingDeltasForMessage("instance-1", "message-1", (instanceId, delta) => { + applied.push({ instanceId, delta }) + }) + + await delay(75) + + assert.deepEqual(applied, [ + { + instanceId: "instance-1", + delta: { messageId: "message-1", partId: "part-2", field: "text", delta: "same message" }, + }, + ]) + assert.deepEqual(timerFlushes, [ + [ + { instanceId: "instance-1", messageId: "message-2", partId: "part-1", field: "text", delta: "other message" }, + { instanceId: "instance-2", messageId: "message-1", partId: "part-1", field: "text", delta: "other instance" }, + ], + ]) + }) +}) diff --git a/packages/ui/src/stores/delta-buffer.ts b/packages/ui/src/stores/delta-buffer.ts new file mode 100644 index 000000000..3307257a4 --- /dev/null +++ b/packages/ui/src/stores/delta-buffer.ts @@ -0,0 +1,89 @@ +/** + * Delta buffer for throttling SSE message.part.delta events. + * + * Accumulates text deltas in a 50ms window to reduce UI churn from + * high-frequency streaming chunks. Provides targeted flush/clear paths + * so full part-update or message-complete events always win over stale + * buffered deltas. + */ + +const DELTA_FLUSH_INTERVAL = 50 + +const pendingDeltas = new Map() +let deltaFlushTimer: ReturnType | null = null + +export function enqueueDelta(instanceId: string, messageId: string, partId: string, field: string, delta: string) { + const key = `${instanceId}:${messageId}:${partId}:${field}` + const existing = pendingDeltas.get(key) + const accumulated = existing ? existing.delta + delta : delta + pendingDeltas.set(key, { instanceId, messageId, partId, field, delta: accumulated }) + if (deltaFlushTimer === null) { + deltaFlushTimer = setTimeout(flushDeltas, DELTA_FLUSH_INTERVAL) + } +} + +export function clearPendingDeltasForPart(instanceId: string, messageId: string, partId: string) { + const keysToDelete: string[] = [] + for (const key of pendingDeltas.keys()) { + if (key.startsWith(`${instanceId}:${messageId}:${partId}:`)) { + keysToDelete.push(key) + } + } + for (const key of keysToDelete) { + pendingDeltas.delete(key) + } +} + +export function flushPendingDeltasForMessage( + instanceId: string, + messageId: string, + applyDelta: (instanceId: string, delta: { messageId: string; partId: string; field: string; delta: string }) => void +): void { + const prefix = `${instanceId}:${messageId}:` + const keysToFlush: string[] = [] + for (const key of pendingDeltas.keys()) { + if (key.startsWith(prefix)) { + keysToFlush.push(key) + } + } + for (const key of keysToFlush) { + const pending = pendingDeltas.get(key) + if (pending) { + pendingDeltas.delete(key) + applyDelta(instanceId, { + messageId: pending.messageId, + partId: pending.partId, + field: pending.field, + delta: pending.delta, + }) + } + } +} + +export function setFlushCallback( + callback: (batch: Array<{ instanceId: string; messageId: string; partId: string; field: string; delta: string }>) => void +) { + // Store callback for flushDeltas to use + flushCallback = callback +} + +export function resetDeltaBufferForTests() { + pendingDeltas.clear() + if (deltaFlushTimer !== null) { + clearTimeout(deltaFlushTimer) + deltaFlushTimer = null + } + flushCallback = null +} + +let flushCallback: ((batch: Array<{ instanceId: string; messageId: string; partId: string; field: string; delta: string }>) => void) | null = null + +function flushDeltas() { + deltaFlushTimer = null + if (pendingDeltas.size === 0) return + const batch = Array.from(pendingDeltas.values()) + pendingDeltas.clear() + if (flushCallback) { + flushCallback(batch) + } +} diff --git a/packages/ui/src/stores/session-events.ts b/packages/ui/src/stores/session-events.ts index 90430b1f5..ce3632d0b 100644 --- a/packages/ui/src/stores/session-events.ts +++ b/packages/ui/src/stores/session-events.ts @@ -17,6 +17,12 @@ import type { MessageStatus } from "./message-v2/types" import { getLogger } from "../lib/logger" import { requestData } from "../lib/opencode-api" +import { + enqueueDelta, + clearPendingDeltasForPart, + flushPendingDeltasForMessage, + setFlushCallback, +} from "./delta-buffer" import { getPermissionId, getPermissionKind, @@ -384,6 +390,12 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes upsertMessageInfoV2(instanceId, messageInfo, { status: "streaming" }) } + // Clear any pending deltas for this part before applying the full part update. + // The part update contains the complete state from the server, so accumulated + // deltas would be stale and cause duplication if flushed later. + if (part.id) { + clearPendingDeltasForPart(instanceId, messageId, part.id) + } applyPartUpdateV2(instanceId, { ...part, sessionID: sessionId, messageID: messageId }) handleConversationAssistantPartUpdated(instanceId, { ...part, sessionID: sessionId, messageID: messageId }, messageInfo) @@ -402,6 +414,14 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes const messageId = typeof info.id === "string" ? info.id : undefined if (!sessionId || !messageId) return + // Flush any pending deltas for this message before applying the update. + // Deltas are buffered for up to 50ms; if message.updated arrives before + // the buffer flushes, the message could be marked complete/error with + // stale text mutations still pending. Flushing first preserves the + // server's event ordering: all delta content is applied, then the + // message status/metadata update runs on the complete content. + flushPendingDeltasForMessage(instanceId, messageId, applyPartDeltaV2) + const timeInfo = (info.time ?? {}) as { created?: number; updated?: number; end?: number } const nextUpdated = typeof timeInfo.end === "number" && timeInfo.end > 0 @@ -453,12 +473,19 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes } } +// Delta buffer callback setup +setFlushCallback((batch) => { + for (const { instanceId, messageId, partId, field, delta } of batch) { + applyPartDeltaV2(instanceId, { messageId, partId, field, delta }) + } +}) + function handleMessagePartDelta(instanceId: string, event: MessagePartDeltaEvent): void { const props = event.properties if (!props) return const { messageID, partID, field, delta } = props if (!messageID || !partID || !field || typeof delta !== "string") return - applyPartDeltaV2(instanceId, { messageId: messageID, partId: partID, field, delta }) + enqueueDelta(instanceId, messageID, partID, field, delta) } function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): void {