Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions packages/ui/src/stores/delta-buffer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import assert from "node:assert/strict"
import { afterEach, beforeEach, describe, it } from "node:test"
import { setTimeout as delay } from "node:timers/promises"

import {
clearPendingDeltasForPart,
enqueueDelta,
flushPendingDeltasForMessage,
resetDeltaBufferForTests,
setFlushCallback,
} from "./delta-buffer.ts"

type DeltaBatch = Array<{ instanceId: string; messageId: string; partId: string; field: string; delta: string }>

describe("delta buffer", () => {
beforeEach(() => {
resetDeltaBufferForTests()
})

afterEach(() => {
resetDeltaBufferForTests()
})

it("concatenates matching deltas and flushes them once", async () => {
const flushed: DeltaBatch[] = []
setFlushCallback((batch) => flushed.push(batch))

enqueueDelta("instance-1", "message-1", "part-1", "text", "hello")
enqueueDelta("instance-1", "message-1", "part-1", "text", " world")

await delay(75)

assert.equal(flushed.length, 1)
assert.deepEqual(flushed[0], [
{ instanceId: "instance-1", messageId: "message-1", partId: "part-1", field: "text", delta: "hello world" },
])
})

it("clears pending deltas for a full part update before a stale timer flush", async () => {
const flushed: DeltaBatch[] = []
setFlushCallback((batch) => flushed.push(batch))

enqueueDelta("instance-1", "message-1", "part-1", "text", "stale")
clearPendingDeltasForPart("instance-1", "message-1", "part-1")

await delay(75)

assert.deepEqual(flushed, [])
})

it("flushes pending message deltas before applying message.updated", async () => {
const timerFlushes: DeltaBatch[] = []
const applied: Array<{ instanceId: string; delta: { messageId: string; partId: string; field: string; delta: string } }> = []
setFlushCallback((batch) => timerFlushes.push(batch))

enqueueDelta("instance-1", "message-1", "part-1", "text", "before update")
flushPendingDeltasForMessage("instance-1", "message-1", (instanceId, delta) => {
applied.push({ instanceId, delta })
})

await delay(75)

assert.deepEqual(applied, [
{
instanceId: "instance-1",
delta: { messageId: "message-1", partId: "part-1", field: "text", delta: "before update" },
},
])
assert.deepEqual(timerFlushes, [])
})

it("keeps clear and flush operations isolated by instance, message, and part", async () => {
const timerFlushes: DeltaBatch[] = []
const applied: Array<{ instanceId: string; delta: { messageId: string; partId: string; field: string; delta: string } }> = []
setFlushCallback((batch) => timerFlushes.push(batch))

enqueueDelta("instance-1", "message-1", "part-1", "text", "drop")
enqueueDelta("instance-1", "message-1", "part-2", "text", "same message")
enqueueDelta("instance-1", "message-2", "part-1", "text", "other message")
enqueueDelta("instance-2", "message-1", "part-1", "text", "other instance")

clearPendingDeltasForPart("instance-1", "message-1", "part-1")
flushPendingDeltasForMessage("instance-1", "message-1", (instanceId, delta) => {
applied.push({ instanceId, delta })
})

await delay(75)

assert.deepEqual(applied, [
{
instanceId: "instance-1",
delta: { messageId: "message-1", partId: "part-2", field: "text", delta: "same message" },
},
])
assert.deepEqual(timerFlushes, [
[
{ instanceId: "instance-1", messageId: "message-2", partId: "part-1", field: "text", delta: "other message" },
{ instanceId: "instance-2", messageId: "message-1", partId: "part-1", field: "text", delta: "other instance" },
],
])
})
})
89 changes: 89 additions & 0 deletions packages/ui/src/stores/delta-buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Delta buffer for throttling SSE message.part.delta events.
*
* Accumulates text deltas in a 50ms window to reduce UI churn from
* high-frequency streaming chunks. Provides targeted flush/clear paths
* so full part-update or message-complete events always win over stale
* buffered deltas.
*/

const DELTA_FLUSH_INTERVAL = 50

const pendingDeltas = new Map<string, { instanceId: string; messageId: string; partId: string; field: string; delta: string }>()
let deltaFlushTimer: ReturnType<typeof setTimeout> | null = null

export function enqueueDelta(instanceId: string, messageId: string, partId: string, field: string, delta: string) {
const key = `${instanceId}:${messageId}:${partId}:${field}`
const existing = pendingDeltas.get(key)
const accumulated = existing ? existing.delta + delta : delta
pendingDeltas.set(key, { instanceId, messageId, partId, field, delta: accumulated })
if (deltaFlushTimer === null) {
deltaFlushTimer = setTimeout(flushDeltas, DELTA_FLUSH_INTERVAL)
}
}

export function clearPendingDeltasForPart(instanceId: string, messageId: string, partId: string) {
const keysToDelete: string[] = []
for (const key of pendingDeltas.keys()) {
if (key.startsWith(`${instanceId}:${messageId}:${partId}:`)) {
keysToDelete.push(key)
}
}
for (const key of keysToDelete) {
pendingDeltas.delete(key)
}
}

export function flushPendingDeltasForMessage(
instanceId: string,
messageId: string,
applyDelta: (instanceId: string, delta: { messageId: string; partId: string; field: string; delta: string }) => void
): void {
const prefix = `${instanceId}:${messageId}:`
const keysToFlush: string[] = []
for (const key of pendingDeltas.keys()) {
if (key.startsWith(prefix)) {
keysToFlush.push(key)
}
}
for (const key of keysToFlush) {
const pending = pendingDeltas.get(key)
if (pending) {
pendingDeltas.delete(key)
applyDelta(instanceId, {
messageId: pending.messageId,
partId: pending.partId,
field: pending.field,
delta: pending.delta,
})
}
}
}

export function setFlushCallback(
callback: (batch: Array<{ instanceId: string; messageId: string; partId: string; field: string; delta: string }>) => void
) {
// Store callback for flushDeltas to use
flushCallback = callback
}

export function resetDeltaBufferForTests() {
pendingDeltas.clear()
if (deltaFlushTimer !== null) {
clearTimeout(deltaFlushTimer)
deltaFlushTimer = null
}
flushCallback = null
}

let flushCallback: ((batch: Array<{ instanceId: string; messageId: string; partId: string; field: string; delta: string }>) => void) | null = null

function flushDeltas() {
deltaFlushTimer = null
if (pendingDeltas.size === 0) return
const batch = Array.from(pendingDeltas.values())
pendingDeltas.clear()
if (flushCallback) {
flushCallback(batch)
}
}
29 changes: 28 additions & 1 deletion packages/ui/src/stores/session-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ import type { MessageStatus } from "./message-v2/types"

import { getLogger } from "../lib/logger"
import { requestData } from "../lib/opencode-api"
import {
enqueueDelta,
clearPendingDeltasForPart,
flushPendingDeltasForMessage,
setFlushCallback,
} from "./delta-buffer"
import {
getPermissionId,
getPermissionKind,
Expand Down Expand Up @@ -384,6 +390,12 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes
upsertMessageInfoV2(instanceId, messageInfo, { status: "streaming" })
}

// Clear any pending deltas for this part before applying the full part update.
// The part update contains the complete state from the server, so accumulated
// deltas would be stale and cause duplication if flushed later.
if (part.id) {
clearPendingDeltasForPart(instanceId, messageId, part.id)
}
applyPartUpdateV2(instanceId, { ...part, sessionID: sessionId, messageID: messageId })
handleConversationAssistantPartUpdated(instanceId, { ...part, sessionID: sessionId, messageID: messageId }, messageInfo)

Expand All @@ -402,6 +414,14 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes
const messageId = typeof info.id === "string" ? info.id : undefined
if (!sessionId || !messageId) return

// Flush any pending deltas for this message before applying the update.
// Deltas are buffered for up to 50ms; if message.updated arrives before
// the buffer flushes, the message could be marked complete/error with
// stale text mutations still pending. Flushing first preserves the
// server's event ordering: all delta content is applied, then the
// message status/metadata update runs on the complete content.
flushPendingDeltasForMessage(instanceId, messageId, applyPartDeltaV2)

const timeInfo = (info.time ?? {}) as { created?: number; updated?: number; end?: number }
const nextUpdated =
typeof timeInfo.end === "number" && timeInfo.end > 0
Expand Down Expand Up @@ -453,12 +473,19 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes
}
}

// Delta buffer callback setup
setFlushCallback((batch) => {
for (const { instanceId, messageId, partId, field, delta } of batch) {
applyPartDeltaV2(instanceId, { messageId, partId, field, delta })
}
})

function handleMessagePartDelta(instanceId: string, event: MessagePartDeltaEvent): void {
const props = event.properties
if (!props) return
const { messageID, partID, field, delta } = props
if (!messageID || !partID || !field || typeof delta !== "string") return
applyPartDeltaV2(instanceId, { messageId: messageID, partId: partID, field, delta })
enqueueDelta(instanceId, messageID, partID, field, delta)
}

function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): void {
Expand Down
Loading