diff --git a/packages/ui/src/stores/session-events.ts b/packages/ui/src/stores/session-events.ts index 90430b1f5..0df24cc6e 100644 --- a/packages/ui/src/stores/session-events.ts +++ b/packages/ui/src/stores/session-events.ts @@ -327,7 +327,7 @@ function findPendingSyntheticMessageId( if (!record) continue if (record.sessionId !== sessionId) continue if (record.role !== role) continue - if (record.status !== "sending") continue + if (record.status !== "sending" && record.status !== "sent") continue if (!record.isEphemeral) continue return record.id } @@ -384,6 +384,12 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes upsertMessageInfoV2(instanceId, messageInfo, { status: "streaming" }) } + // Clear any pending deltas for this part before applying the full part update. + // The part update contains the complete state from the server, so accumulated + // deltas would be stale and cause duplication if flushed later. + if (part.id) { + clearPendingDeltasForPart(instanceId, messageId, part.id) + } applyPartUpdateV2(instanceId, { ...part, sessionID: sessionId, messageID: messageId }) handleConversationAssistantPartUpdated(instanceId, { ...part, sessionID: sessionId, messageID: messageId }, messageInfo) @@ -402,6 +408,14 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes const messageId = typeof info.id === "string" ? info.id : undefined if (!sessionId || !messageId) return + // Flush any pending deltas for this message before applying the update. + // Deltas are buffered for up to 50ms; if message.updated arrives before + // the buffer flushes, the message could be marked complete/error with + // stale text mutations still pending. Flushing first preserves the + // server's event ordering: all delta content is applied, then the + // message status/metadata update runs on the complete content. + flushPendingDeltasForMessage(instanceId, messageId) + const timeInfo = (info.time ?? {}) as { created?: number; updated?: number; end?: number } const nextUpdated = typeof timeInfo.end === "number" && timeInfo.end > 0 @@ -453,12 +467,67 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes } } +const DELTA_FLUSH_INTERVAL = 50 + +const pendingDeltas = new Map() +let deltaFlushTimer: ReturnType | null = null + +function enqueueDelta(instanceId: string, messageId: string, partId: string, field: string, delta: string) { + const key = `${instanceId}:${messageId}:${partId}:${field}` + const existing = pendingDeltas.get(key) + const accumulated = existing ? existing.delta + delta : delta + pendingDeltas.set(key, { instanceId, messageId, partId, field, delta: accumulated }) + if (deltaFlushTimer === null) { + deltaFlushTimer = setTimeout(flushDeltas, DELTA_FLUSH_INTERVAL) + } +} + +function clearPendingDeltasForPart(instanceId: string, messageId: string, partId: string) { + const keysToDelete: string[] = [] + for (const key of pendingDeltas.keys()) { + if (key.startsWith(`${instanceId}:${messageId}:${partId}:`)) { + keysToDelete.push(key) + } + } + for (const key of keysToDelete) { + pendingDeltas.delete(key) + } +} + +function flushPendingDeltasForMessage(instanceId: string, messageId: string): void { + const prefix = `${instanceId}:${messageId}:` + for (const key of pendingDeltas.keys()) { + if (key.startsWith(prefix)) { + const pending = pendingDeltas.get(key) + if (pending) { + applyPartDeltaV2(instanceId, { + messageId: pending.messageId, + partId: pending.partId, + field: pending.field, + delta: pending.delta, + }) + pendingDeltas.delete(key) + } + } + } +} + +function flushDeltas() { + deltaFlushTimer = null + if (pendingDeltas.size === 0) return + const batch = Array.from(pendingDeltas.values()) + pendingDeltas.clear() + for (const { instanceId, messageId, partId, field, delta } of batch) { + applyPartDeltaV2(instanceId, { messageId, partId, field, delta }) + } +} + function handleMessagePartDelta(instanceId: string, event: MessagePartDeltaEvent): void { const props = event.properties if (!props) return const { messageID, partID, field, delta } = props if (!messageID || !partID || !field || typeof delta !== "string") return - applyPartDeltaV2(instanceId, { messageId: messageID, partId: partID, field, delta }) + enqueueDelta(instanceId, messageID, partID, field, delta) } function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): void {