Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 71 additions & 2 deletions packages/ui/src/stores/session-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ function findPendingSyntheticMessageId(
if (!record) continue
if (record.sessionId !== sessionId) continue
if (record.role !== role) continue
if (record.status !== "sending") continue
if (record.status !== "sending" && record.status !== "sent") continue
if (!record.isEphemeral) continue
return record.id
}
Expand Down Expand Up @@ -384,6 +384,12 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes
upsertMessageInfoV2(instanceId, messageInfo, { status: "streaming" })
}

// Clear any pending deltas for this part before applying the full part update.
// The part update contains the complete state from the server, so accumulated
// deltas would be stale and cause duplication if flushed later.
if (part.id) {
clearPendingDeltasForPart(instanceId, messageId, part.id)
}
applyPartUpdateV2(instanceId, { ...part, sessionID: sessionId, messageID: messageId })
handleConversationAssistantPartUpdated(instanceId, { ...part, sessionID: sessionId, messageID: messageId }, messageInfo)

Expand All @@ -402,6 +408,14 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes
const messageId = typeof info.id === "string" ? info.id : undefined
if (!sessionId || !messageId) return

// Flush any pending deltas for this message before applying the update.
// Deltas are buffered for up to 50ms; if message.updated arrives before
// the buffer flushes, the message could be marked complete/error with
// stale text mutations still pending. Flushing first preserves the
// server's event ordering: all delta content is applied, then the
// message status/metadata update runs on the complete content.
flushPendingDeltasForMessage(instanceId, messageId)

const timeInfo = (info.time ?? {}) as { created?: number; updated?: number; end?: number }
const nextUpdated =
typeof timeInfo.end === "number" && timeInfo.end > 0
Expand Down Expand Up @@ -453,12 +467,67 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes
}
}

const DELTA_FLUSH_INTERVAL = 50

const pendingDeltas = new Map<string, { instanceId: string; messageId: string; partId: string; field: string; delta: string }>()
let deltaFlushTimer: ReturnType<typeof setTimeout> | null = null
Comment thread
pascalandr marked this conversation as resolved.

function enqueueDelta(instanceId: string, messageId: string, partId: string, field: string, delta: string) {
const key = `${instanceId}:${messageId}:${partId}:${field}`
const existing = pendingDeltas.get(key)
const accumulated = existing ? existing.delta + delta : delta
pendingDeltas.set(key, { instanceId, messageId, partId, field, delta: accumulated })
if (deltaFlushTimer === null) {
deltaFlushTimer = setTimeout(flushDeltas, DELTA_FLUSH_INTERVAL)
}
}

function clearPendingDeltasForPart(instanceId: string, messageId: string, partId: string) {
const keysToDelete: string[] = []
for (const key of pendingDeltas.keys()) {
if (key.startsWith(`${instanceId}:${messageId}:${partId}:`)) {
keysToDelete.push(key)
}
}
for (const key of keysToDelete) {
pendingDeltas.delete(key)
}
}

function flushPendingDeltasForMessage(instanceId: string, messageId: string): void {
const prefix = `${instanceId}:${messageId}:`
for (const key of pendingDeltas.keys()) {
if (key.startsWith(prefix)) {
const pending = pendingDeltas.get(key)
if (pending) {
applyPartDeltaV2(instanceId, {
messageId: pending.messageId,
partId: pending.partId,
field: pending.field,
delta: pending.delta,
})
pendingDeltas.delete(key)
}
}
}
}
Comment thread
pascalandr marked this conversation as resolved.

function flushDeltas() {
deltaFlushTimer = null
if (pendingDeltas.size === 0) return
const batch = Array.from(pendingDeltas.values())
pendingDeltas.clear()
for (const { instanceId, messageId, partId, field, delta } of batch) {
applyPartDeltaV2(instanceId, { messageId, partId, field, delta })
}
}

function handleMessagePartDelta(instanceId: string, event: MessagePartDeltaEvent): void {
const props = event.properties
if (!props) return
const { messageID, partID, field, delta } = props
if (!messageID || !partID || !field || typeof delta !== "string") return
applyPartDeltaV2(instanceId, { messageId: messageID, partId: partID, field, delta })
enqueueDelta(instanceId, messageID, partID, field, delta)
}

function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): void {
Expand Down
Loading