diff --git a/src/stores/useSessionStore.ts b/src/stores/useSessionStore.ts index 60ace484f..c2c170250 100644 --- a/src/stores/useSessionStore.ts +++ b/src/stores/useSessionStore.ts @@ -103,19 +103,85 @@ function createEmptySlot(): SessionSlot { } /** - * Compute merged messages: server + realtime, deduped by id. + * Compute merged messages: server + realtime, deduped by stable message identity. * Server messages take priority (they're the persisted source of truth). * Realtime messages that aren't yet in server stay (in-flight streaming). */ function computeMerged(server: NormalizedMessage[], realtime: NormalizedMessage[]): NormalizedMessage[] { if (realtime.length === 0) return server; if (server.length === 0) return realtime; - const serverIds = new Set(server.map(m => m.id)); - const extra = realtime.filter(m => !serverIds.has(m.id)); + const extra = realtime.filter(m => !server.some(existing => isEquivalentNormalizedMessage(existing, m))); if (extra.length === 0) return server; return [...server, ...extra]; } +function isEquivalentNormalizedMessage(a: NormalizedMessage, b: NormalizedMessage): boolean { + if (a.id === b.id) return true; + + if ( + a.rowid !== undefined && + b.rowid !== undefined && + a.rowid === b.rowid && + a.kind === b.kind + ) { + return a.sequence === undefined || b.sequence === undefined || a.sequence === b.sequence; + } + + if ( + a.sequence !== undefined && + b.sequence !== undefined && + a.sequence === b.sequence && + a.kind === b.kind + ) { + return true; + } + + if (a.requestId && b.requestId && a.requestId === b.requestId && a.kind === b.kind) { + return true; + } + + if (a.toolId && b.toolId && a.toolId === b.toolId && a.kind === b.kind) { + return true; + } + + if ( + a.kind === 'session_created' && + b.kind === 'session_created' && + a.newSessionId && + a.newSessionId === b.newSessionId + ) { + return true; + } + + if (a.role === 'user' || b.role === 'user') { + return false; + } + + return ( + a.kind === b.kind && + a.timestamp === b.timestamp && + a.role === b.role && + a.content === b.content && + a.text === b.text && + a.toolName === b.toolName + ); +} + +function upsertRealtimeMessage(messages: NormalizedMessage[], msg: NormalizedMessage): NormalizedMessage[] { + const existingIndex = messages.findIndex(existing => isEquivalentNormalizedMessage(existing, msg)); + + if (existingIndex === -1) { + return [...messages, msg]; + } + + const next = [...messages]; + next[existingIndex] = { + ...messages[existingIndex], + ...msg, + }; + return next; +} + /** * Recompute slot.merged only when the input arrays have actually changed * (by reference). Returns true if merged was recomputed. @@ -273,7 +339,7 @@ export function useSessionStore() { */ const appendRealtime = useCallback((sessionId: string, msg: NormalizedMessage) => { const slot = getSlot(sessionId); - let updated = [...slot.realtimeMessages, msg]; + let updated = upsertRealtimeMessage(slot.realtimeMessages, msg); if (updated.length > MAX_REALTIME_MESSAGES) { updated = updated.slice(-MAX_REALTIME_MESSAGES); } @@ -288,7 +354,10 @@ export function useSessionStore() { const appendRealtimeBatch = useCallback((sessionId: string, msgs: NormalizedMessage[]) => { if (msgs.length === 0) return; const slot = getSlot(sessionId); - let updated = [...slot.realtimeMessages, ...msgs]; + let updated = slot.realtimeMessages; + for (const msg of msgs) { + updated = upsertRealtimeMessage(updated, msg); + } if (updated.length > MAX_REALTIME_MESSAGES) { updated = updated.slice(-MAX_REALTIME_MESSAGES); }