diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx index 809c190ffe..79fc042f5d 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx @@ -41,9 +41,12 @@ interface MothershipChatProps { ) => void onStopGeneration: () => void messageQueue: QueuedMessage[] + editingQueuedId: string | null + dispatchingHeadId: string | null onRemoveQueuedMessage: (id: string) => void onSendQueuedMessage: (id: string) => Promise onEditQueuedMessage: (id: string) => QueuedMessage | undefined + onCancelQueueEdit: () => void userId?: string chatId?: string onContextAdd?: (context: ChatContext) => void @@ -183,9 +186,12 @@ export function MothershipChat({ onSubmit, onStopGeneration, messageQueue, + editingQueuedId, + dispatchingHeadId, onRemoveQueuedMessage, onSendQueuedMessage, onEditQueuedMessage, + onCancelQueueEdit, userId, chatId, onContextAdd, @@ -313,9 +319,12 @@ export function MothershipChat({
void onSendNow: (id: string) => Promise onEdit: (id: string) => void + onCancelEdit: () => void } -export function QueuedMessages({ messageQueue, onRemove, onSendNow, onEdit }: QueuedMessagesProps) { +export function QueuedMessages({ + messageQueue, + editingQueuedId, + dispatchingHeadId, + onRemove, + onSendNow, + onEdit, + onCancelEdit, +}: QueuedMessagesProps) { const [isExpanded, setIsExpanded] = useState(true) const [isNarrow, setIsNarrow] = useState(false) const roRef = useRef(null) @@ -57,101 +69,138 @@ export function QueuedMessages({ messageQueue, onRemove, onSendNow, onEdit }: Qu {isExpanded && (
- {messageQueue.map((msg) => ( -
-
-
-
+ {messageQueue.map((msg) => { + const isEditing = msg.id === editingQueuedId + const isDispatching = msg.id === dispatchingHeadId + return ( +
+
+
+
-
- -
+
+ +
- {msg.fileAttachments && msg.fileAttachments.length > 0 && ( - - - {isNarrow ? ( - - {msg.fileAttachments.length} - + {msg.fileAttachments && msg.fileAttachments.length > 0 && ( + + + {isNarrow ? ( + + {msg.fileAttachments.length} + + ) : ( + <> + {msg.fileAttachments[0].filename} + {msg.fileAttachments.length > 1 && ( + + +{msg.fileAttachments.length - 1} + + )} + + )} + + )} + +
+ {isEditing ? ( + + + + + + Cancel edit + + ) : ( <> - {msg.fileAttachments[0].filename} - {msg.fileAttachments.length > 1 && ( - - +{msg.fileAttachments.length - 1} - - )} - - )} - - )} - -
- - - - - - Edit queued message - - + + + + + + {isDispatching ? 'Sending now' : 'Edit queued message'} + + - - - - - - Send now - - + + + + + + Send now + + - - - - - - Remove from queue - - + + + + + + Remove from queue + + + + )} +
-
- ))} + ) + })}
)}
diff --git a/apps/sim/app/workspace/[workspaceId]/home/home.tsx b/apps/sim/app/workspace/[workspaceId]/home/home.tsx index 52a3bf4442..2b89156f90 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/home.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/home.tsx @@ -141,6 +141,9 @@ export function Home({ chatId }: HomeProps = {}) { removeFromQueue, sendNow, editQueuedMessage, + cancelQueueEdit, + editingQueuedId, + dispatchingHeadId, previewSession, genericResourceData, getCurrentRequestId, @@ -349,9 +352,12 @@ export function Home({ chatId }: HomeProps = {}) { onSubmit={handleSubmit} onStopGeneration={handleStopGeneration} messageQueue={messageQueue} + editingQueuedId={editingQueuedId} + dispatchingHeadId={dispatchingHeadId} onRemoveQueuedMessage={removeFromQueue} onSendQueuedMessage={sendNow} onEditQueuedMessage={editQueuedMessage} + onCancelQueueEdit={cancelQueueEdit} userId={session?.user?.id} chatId={resolvedChatId} onContextAdd={handleContextAdd} diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 3ec0168918..5ecc05fdc9 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -2,7 +2,7 @@ import { useCallback, useEffect, useMemo, useRef, useState } from 'react' import { createLogger } from '@sim/logger' import { getErrorMessage, toError } from '@sim/utils/errors' import { sleep } from '@sim/utils/helpers' -import { generateId } from '@sim/utils/id' +import { generateId, generateShortId } from '@sim/utils/id' import { useQueryClient } from '@tanstack/react-query' import { usePathname, useRouter } from 'next/navigation' import { requestJson } from '@/lib/api/client/request' @@ -133,6 +133,11 @@ import { workflowKeys } from '@/hooks/queries/workflows' import { workspaceFilesKeys } from '@/hooks/queries/workspace-files' import { useExecutionStream } from '@/hooks/use-execution-stream' import { useExecutionStore } from '@/stores/execution/store' +import { useMothershipQueueStore } from '@/stores/mothership-queue/store' +import type { + QueuedMothershipMessage, + QueuedSendHandoffSeed, +} from '@/stores/mothership-queue/types' import type { ChatContext } from '@/stores/panel' import { useTerminalConsoleStore } from '@/stores/terminal' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' @@ -174,6 +179,9 @@ export interface UseChatReturn { removeFromQueue: (id: string) => void sendNow: (id: string) => Promise editQueuedMessage: (id: string) => QueuedMessage | undefined + cancelQueueEdit: () => void + editingQueuedId: string | null + dispatchingHeadId: string | null previewSession: FilePreviewSession | null genericResourceData: GenericResourceData | null getCurrentRequestId: () => string | undefined @@ -209,6 +217,10 @@ const QUEUED_SEND_HANDOFF_CLAIM_TTL_MS = 30_000 const QUEUED_SEND_HANDOFF_RETRY_BASE_MS = 1000 const QUEUED_SEND_HANDOFF_RETRY_MAX_MS = 30_000 +// Stable empty array — sharing one reference keeps the selector from +// re-rendering on unrelated store writes. +const EMPTY_MESSAGE_QUEUE: QueuedMothershipMessage[] = [] + const logger = createLogger('useChat') type StreamPayload = Record @@ -237,17 +249,6 @@ interface QueuedSendHandoffState { resolveAttempts?: number } -interface QueuedSendHandoffSeed { - id: string - chatId?: string - supersededStreamId: string | null - userMessageId?: string -} - -type QueuedChatMessage = QueuedMessage & { - queuedSendHandoff?: QueuedSendHandoffSeed -} - interface QueuedSendHandoffClaim { id: string ownerId: string @@ -1539,7 +1540,9 @@ export function useChat( queueDispatchEpochRef.current++ queueDispatchActionsRef.current = [] queuedMessageDispatchIdsRef.current.clear() + userRemovedDuringDispatchRef.current.clear() queueDispatchTaskRef.current = null + setDispatchingHeadId(null) }, []) const resourcesRef = useRef(resources) resourcesRef.current = resources @@ -1708,10 +1711,23 @@ export function useChat( [removePreviewSession, syncPreviewSessionRefs] ) - const [messageQueue, setMessageQueue] = useState([]) - const messageQueueRef = useRef([]) - messageQueueRef.current = messageQueue + // Sentinel used while no `chatId` is resolved; `adoptResolvedChatId` + // migrates this bucket onto the real chatId on first send. Rotated on + // home reset so a new pending chat starts with an empty bucket. + const pendingChatKeyRef = useRef(`pending::${generateShortId()}`) + const [chatKey, setChatKey] = useState(initialChatId ?? pendingChatKeyRef.current) + const chatKeyRef = useRef(chatKey) + chatKeyRef.current = chatKey + const messageQueue = useMothershipQueueStore( + (state) => state.queues[chatKey] ?? EMPTY_MESSAGE_QUEUE + ) + const editingQueuedId = useMothershipQueueStore((state) => state.editing[chatKey] ?? null) + const [dispatchingHeadId, setDispatchingHeadId] = useState(null) const queuedMessageDispatchIdsRef = useRef>(new Set()) + // Ids the user explicitly removed while a dispatch was in flight — used to + // suppress the dispatch's failure-restore path, which would otherwise undo + // the user's removal silently. + const userRemovedDuringDispatchRef = useRef>(new Set()) const queueDispatchActionsRef = useRef([]) const queueDispatchTaskRef = useRef | null>(null) const queueDispatchEpochRef = useRef(0) @@ -1976,7 +1992,11 @@ export function useChat( inFlightResourceAddsRef.current.clear() reorderNeededAfterFlushRef.current = false resetEphemeralPreviewState() - setMessageQueue([]) + // Editing binds to this hook's composer — release it before rotating chatKey. + useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) + pendingChatKeyRef.current = `pending::${generateShortId()}` + chatKeyRef.current = pendingChatKeyRef.current + setChatKey(pendingChatKeyRef.current) clearQueueDispatchState() }, [ cancelActiveStreamRecovery, @@ -2029,6 +2049,17 @@ export function useChat( (chatId: string, options?: { replaceHomeHistory?: boolean; invalidateList?: boolean }) => { const selectedChatId = selectedChatIdRef.current chatIdRef.current = chatId + // Migrate from the pending sentinel (not chatKeyRef — user may have + // navigated to a different chat mid-stream, and we mustn't steal it). + if (pendingChatKeyRef.current !== chatId) { + useMothershipQueueStore.getState().migrate(pendingChatKeyRef.current, chatId) + } + // Only rebind chatKey if the user is still viewing the resolved chat. + const stillViewingResolvedChat = !selectedChatId || selectedChatId === chatId + if (stillViewingResolvedChat && chatKeyRef.current !== chatId) { + chatKeyRef.current = chatId + setChatKey(chatId) + } if (!selectedChatId || selectedChatId === chatId) { setResolvedChatId(chatId) } @@ -2287,7 +2318,21 @@ export function useChat( inFlightResourceAddsRef.current.clear() reorderNeededAfterFlushRef.current = false resetEphemeralPreviewState() - setMessageQueue([]) + // Rotate the bucket key; the previous chat's queue stays in the store. + // Release editing on the chat we're leaving (composer-scoped). + if (chatKeyRef.current !== (initialChatId ?? '')) { + useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) + } + if (initialChatId) { + if (chatKeyRef.current !== initialChatId) { + chatKeyRef.current = initialChatId + setChatKey(initialChatId) + } + } else { + pendingChatKeyRef.current = `pending::${generateShortId()}` + chatKeyRef.current = pendingChatKeyRef.current + setChatKey(pendingChatKeyRef.current) + } clearQueueDispatchState() }, [ initialChatId, @@ -4409,7 +4454,8 @@ export function useChat( */ const notifyTurnEnded = useCallback( (options: { error: boolean; skipQueueDispatch?: boolean }) => { - const hasQueuedFollowUp = !options.error && messageQueueRef.current.length > 0 + const queue = useMothershipQueueStore.getState().queues[chatKeyRef.current] + const hasQueuedFollowUp = !options.error && (queue?.length ?? 0) > 0 if (!options.error) { const cid = chatIdRef.current if (cid && onStreamEndRef.current) { @@ -4429,7 +4475,7 @@ export function useChat( message: string, fileAttachments?: FileAttachmentForApi[], contexts?: ChatContext[] - ): QueuedChatMessage => { + ): QueuedMothershipMessage => { const id = generateId() const handoffChatId = selectedChatIdRef.current ?? chatIdRef.current const cachedActiveStreamId = handoffChatId @@ -4464,7 +4510,8 @@ export function useChat( const finalize = useCallback( (options?: { error?: boolean; targetChatId?: string }) => { const isError = !!options?.error - const hasQueuedFollowUp = !isError && messageQueueRef.current.length > 0 + const queue = useMothershipQueueStore.getState().queues[chatKeyRef.current] + const hasQueuedFollowUp = !isError && (queue?.length ?? 0) > 0 reconcileTerminalPreviewSessions() locallyTerminalStreamIdRef.current = streamIdRef.current ?? activeTurnRef.current?.userMessageId ?? undefined @@ -4893,19 +4940,37 @@ export function useChat( async (message: string, fileAttachments?: FileAttachmentForApi[], contexts?: ChatContext[]) => { if (!message.trim() || !workspaceId) return + const queueStore = useMothershipQueueStore.getState() + const activeChatKey = chatKeyRef.current + const editingId = queueStore.editing[activeChatKey] ?? null + + // Edit-in-place: replace at the original index. If the slot was already + // dispatched mid-edit (UI-guard race), fall through to a tail-append. + if (editingId) { + const existing = queueStore.queues[activeChatKey] ?? [] + if (existing.some((m) => m.id === editingId)) { + queueStore.replaceAt(activeChatKey, editingId, { + content: message, + fileAttachments, + contexts, + }) + queueStore.setEditing(activeChatKey, null) + // Resume dispatch if it paused on this slot. + if (!sendingRef.current && !pendingStopPromiseRef.current) { + void enqueueQueueDispatchRef.current({ type: 'send_head' }) + } + return + } + queueStore.setEditing(activeChatKey, null) + } + if (sendingRef.current) { - setMessageQueue((prev) => [ - ...prev, - createQueuedMessage(message, fileAttachments, contexts), - ]) + queueStore.enqueue(activeChatKey, createQueuedMessage(message, fileAttachments, contexts)) return } if (pendingStopPromiseRef.current) { - setMessageQueue((prev) => [ - ...prev, - createQueuedMessage(message, fileAttachments, contexts), - ]) + queueStore.enqueue(activeChatKey, createQueuedMessage(message, fileAttachments, contexts)) void enqueueQueueDispatchRef.current({ type: 'send_head' }) return } @@ -5429,7 +5494,7 @@ export function useChat( const dispatchQueuedMessage = useCallback( async ( - msg: QueuedChatMessage, + msg: QueuedMothershipMessage, options: { epoch: number pendingStop?: Promise | null @@ -5441,19 +5506,24 @@ export function useChat( } queuedMessageDispatchIdsRef.current.add(msg.id) - let originalIndex = messageQueueRef.current.findIndex((queued) => queued.id === msg.id) + const dispatchChatKey = chatKeyRef.current + const queueAtStart = + useMothershipQueueStore.getState().queues[dispatchChatKey] ?? EMPTY_MESSAGE_QUEUE + let originalIndex = queueAtStart.findIndex((queued) => queued.id === msg.id) if (originalIndex === -1) { queuedMessageDispatchIdsRef.current.delete(msg.id) return } + setDispatchingHeadId(msg.id) + let removedFromQueue = false const removeQueuedMessage = () => { if (removedFromQueue || options.epoch !== queueDispatchEpochRef.current) { return } removedFromQueue = true - setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id)) + useMothershipQueueStore.getState().remove(dispatchChatKey, msg.id) } const restoreQueuedMessage = (handoff?: QueuedSendHandoffSeed) => { @@ -5464,26 +5534,33 @@ export function useChat( if (!removedFromQueue || options.epoch !== queueDispatchEpochRef.current) { return } - setMessageQueue((prev) => { - if (prev.some((queued) => queued.id === msg.id)) return prev - const next = [...prev] - next.splice(Math.min(originalIndex, next.length), 0, msg) - return next - }) + // If the user explicitly removed this message during dispatch, honor + // that and don't re-insert on failure. + if (userRemovedDuringDispatchRef.current.delete(msg.id)) { + return + } + useMothershipQueueStore.getState().insertAt(dispatchChatKey, originalIndex, msg) } - const activeQueuedSendHandoff = options.queuedSendHandoff ?? msg.queuedSendHandoff + let activeQueuedSendHandoff: QueuedSendHandoffSeed | undefined = + options.queuedSendHandoff ?? msg.queuedSendHandoff try { - const currentIndex = messageQueueRef.current.findIndex((queued) => queued.id === msg.id) + const queueAtSend = + useMothershipQueueStore.getState().queues[dispatchChatKey] ?? EMPTY_MESSAGE_QUEUE + const currentIndex = queueAtSend.findIndex((queued) => queued.id === msg.id) if (currentIndex === -1) { return } originalIndex = currentIndex + // Re-read live: the user may have applied an in-place edit (`replaceAt`) + // between dispatch scheduling and this send. + const liveMsg = queueAtSend[currentIndex] + activeQueuedSendHandoff = options.queuedSendHandoff ?? liveMsg.queuedSendHandoff const consumed = await startSendMessage( - msg.content, - msg.fileAttachments, - msg.contexts, + liveMsg.content, + liveMsg.fileAttachments, + liveMsg.contexts, options.pendingStop, removeQueuedMessage, activeQueuedSendHandoff @@ -5495,7 +5572,9 @@ export function useChat( } catch { restoreQueuedMessage(activeQueuedSendHandoff) } finally { + setDispatchingHeadId((current) => (current === msg.id ? null : current)) queuedMessageDispatchIdsRef.current.delete(msg.id) + userRemovedDuringDispatchRef.current.delete(msg.id) } }, [startSendMessage] @@ -5515,8 +5594,13 @@ export function useChat( continue } - const msg = messageQueueRef.current[0] + const queueState = useMothershipQueueStore.getState() + const activeChatKey = chatKeyRef.current + const msg = queueState.queues[activeChatKey]?.[0] if (!msg) continue + // Pause draining if the head is bound to the composer; dispatching now + // would race the eventual submit. The next kick on edit-resolve resumes us. + if (queueState.editing[activeChatKey] === msg.id) continue await dispatchQueuedMessage(msg, { epoch: action.epoch }) } @@ -5543,14 +5627,20 @@ export function useChat( enqueueQueueDispatchRef.current = enqueueQueueDispatch const removeFromQueue = useCallback((id: string) => { + // If the message is mid-dispatch, mark it so the dispatch's failure-restore + // path won't silently undo the user's removal. + if (queuedMessageDispatchIdsRef.current.has(id)) { + userRemovedDuringDispatchRef.current.add(id) + } clearQueuedSendHandoffState(id) clearQueuedSendHandoffClaim(id) - setMessageQueue((prev) => prev.filter((m) => m.id !== id)) + useMothershipQueueStore.getState().remove(chatKeyRef.current, id) }, []) const sendQueuedMessageImmediately = useCallback( async (id: string) => { - const msg = messageQueueRef.current.find((queued) => queued.id === id) + const queue = useMothershipQueueStore.getState().queues[chatKeyRef.current] + const msg = queue?.find((queued) => queued.id === id) if (!msg) return if (queuedMessageDispatchIdsRef.current.has(msg.id)) return @@ -5603,14 +5693,45 @@ export function useChat( ) const editQueuedMessage = useCallback((id: string): QueuedMessage | undefined => { - const msg = messageQueueRef.current.find((m) => m.id === id) + // Reject edits on a message already mid-dispatch; the slot is about to be + // dropped. UI also disables this via `dispatchingHeadId`. + if (queuedMessageDispatchIdsRef.current.has(id)) return undefined + const activeChatKey = chatKeyRef.current + const queue = useMothershipQueueStore.getState().queues[activeChatKey] ?? EMPTY_MESSAGE_QUEUE + const msg = queue.find((m) => m.id === id) if (!msg) return undefined + // Evict any sessionStorage handoff — a failed prior dispatch may have left + // a pre-edit content snapshot that the recovery effect would otherwise replay. clearQueuedSendHandoffState(id) clearQueuedSendHandoffClaim(id) - setMessageQueue((prev) => prev.filter((m) => m.id !== id)) + useMothershipQueueStore.getState().setEditing(activeChatKey, id) return msg }, []) + const cancelQueueEdit = useCallback(() => { + useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) + // Resume dispatch if it paused on this slot. + if (!sendingRef.current && !pendingStopPromiseRef.current) { + void enqueueQueueDispatchRef.current({ type: 'send_head' }) + } + }, []) + + // Resume draining when a non-empty queue rehydrates with no active stream + // (e.g. nav-back). Wait for chat history to confirm no `activeStreamId` to + // avoid racing the reconnect path; mid-stream completions go through + // `notifyTurnEnded`. Idempotent — the dispatch loop dedupes. + const chatHistoryReady = chatHistory !== undefined + const remoteActiveStreamId = chatHistory?.activeStreamId ?? null + useEffect(() => { + if (!workspaceId) return + if (messageQueue.length === 0) return + if (sendingRef.current || pendingStopPromiseRef.current) return + if (queueDispatchTaskRef.current) return + if (resolvedChatId && !chatHistoryReady) return + if (remoteActiveStreamId) return + void enqueueQueueDispatchRef.current({ type: 'send_head' }) + }, [workspaceId, messageQueue.length, resolvedChatId, chatHistoryReady, remoteActiveStreamId]) + useEffect(() => { return () => { cancelActiveStreamRecovery() @@ -5621,6 +5742,8 @@ export function useChat( abortControllerRef.current = null clearActiveTurn() sendingRef.current = false + // Release the editing slot — the composer it binds to is unmounting. + useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) } }, [ cancelActiveStreamRecovery, @@ -5647,6 +5770,9 @@ export function useChat( removeFromQueue, sendNow, editQueuedMessage, + cancelQueueEdit, + editingQueuedId, + dispatchingHeadId, previewSession, genericResourceData, getCurrentRequestId, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx index 059db45a53..251bb733fc 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx @@ -347,6 +347,9 @@ export const Panel = memo(function Panel({ workspaceId: propWorkspaceId }: Panel removeFromQueue: copilotRemoveFromQueue, sendNow: copilotSendNow, editQueuedMessage: copilotEditQueuedMessage, + cancelQueueEdit: copilotCancelQueueEdit, + editingQueuedId: copilotEditingQueuedId, + dispatchingHeadId: copilotDispatchingHeadId, getCurrentRequestId: getCopilotCurrentRequestId, } = useChat( workspaceId, @@ -885,9 +888,12 @@ export const Panel = memo(function Panel({ workspaceId: propWorkspaceId }: Panel onSubmit={handleCopilotSubmit} onStopGeneration={handleCopilotStopGeneration} messageQueue={copilotMessageQueue} + editingQueuedId={copilotEditingQueuedId} + dispatchingHeadId={copilotDispatchingHeadId} onRemoveQueuedMessage={copilotRemoveFromQueue} onSendQueuedMessage={copilotSendNow} onEditQueuedMessage={copilotEditQueuedMessage} + onCancelQueueEdit={copilotCancelQueueEdit} userId={session?.user?.id} chatId={copilotResolvedChatId} layout='copilot-view' diff --git a/apps/sim/hooks/queries/tasks.ts b/apps/sim/hooks/queries/tasks.ts index 0aff534a77..6a35bfbf36 100644 --- a/apps/sim/hooks/queries/tasks.ts +++ b/apps/sim/hooks/queries/tasks.ts @@ -27,6 +27,7 @@ import { } from '@/lib/copilot/request/session/file-preview-session-contract' import { isStreamBatchEvent, type StreamBatchEvent } from '@/lib/copilot/request/session/types' import { type MothershipResource, MothershipResourceType } from '@/lib/copilot/resources/types' +import { useMothershipQueueStore } from '@/stores/mothership-queue/store' export interface TaskMetadata { id: string @@ -281,6 +282,7 @@ export function useDeleteTask(workspaceId?: string) { onSettled: (_data, _error, chatId) => { queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) queryClient.removeQueries({ queryKey: taskKeys.detail(chatId) }) + useMothershipQueueStore.getState().clearChat(chatId) }, }) } @@ -296,8 +298,10 @@ export function useDeleteTasks(workspaceId?: string) { }, onSettled: (_data, _error, chatIds) => { queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) + const queueStore = useMothershipQueueStore.getState() for (const chatId of chatIds) { queryClient.removeQueries({ queryKey: taskKeys.detail(chatId) }) + queueStore.clearChat(chatId) } }, }) diff --git a/apps/sim/stores/mothership-queue/store.test.ts b/apps/sim/stores/mothership-queue/store.test.ts new file mode 100644 index 0000000000..c309192730 --- /dev/null +++ b/apps/sim/stores/mothership-queue/store.test.ts @@ -0,0 +1,180 @@ +/** + * @vitest-environment node + */ + +import { beforeEach, describe, expect, it } from 'vitest' +import { useMothershipQueueStore } from '@/stores/mothership-queue/store' +import type { QueuedMothershipMessage } from '@/stores/mothership-queue/types' + +const message = (id: string, content = `content-${id}`): QueuedMothershipMessage => ({ + id, + content, +}) + +describe('useMothershipQueueStore', () => { + beforeEach(() => { + useMothershipQueueStore.getState().reset() + }) + + describe('enqueue / remove', () => { + it('appends to the chat bucket', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().enqueue('chat-A', message('m2')) + expect(useMothershipQueueStore.getState().queues['chat-A']?.map((m) => m.id)).toEqual([ + 'm1', + 'm2', + ]) + }) + + it('keeps buckets isolated per chat', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().enqueue('chat-B', message('n1')) + const state = useMothershipQueueStore.getState() + expect(state.queues['chat-A']?.map((m) => m.id)).toEqual(['m1']) + expect(state.queues['chat-B']?.map((m) => m.id)).toEqual(['n1']) + }) + + it('removes the chat bucket entirely when the last message is removed', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().remove('chat-A', 'm1') + expect(useMothershipQueueStore.getState().queues['chat-A']).toBeUndefined() + }) + + it('clears editing when the editing message is removed', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().setEditing('chat-A', 'm1') + useMothershipQueueStore.getState().remove('chat-A', 'm1') + expect(useMothershipQueueStore.getState().editing['chat-A']).toBeUndefined() + }) + + it('preserves editing when a different message is removed', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().enqueue('chat-A', message('m2')) + useMothershipQueueStore.getState().setEditing('chat-A', 'm1') + useMothershipQueueStore.getState().remove('chat-A', 'm2') + expect(useMothershipQueueStore.getState().editing['chat-A']).toBe('m1') + }) + }) + + describe('insertAt', () => { + it('inserts at the requested index', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().enqueue('chat-A', message('m3')) + useMothershipQueueStore.getState().insertAt('chat-A', 1, message('m2')) + expect(useMothershipQueueStore.getState().queues['chat-A']?.map((m) => m.id)).toEqual([ + 'm1', + 'm2', + 'm3', + ]) + }) + + it('clamps an out-of-range index to the end', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().insertAt('chat-A', 99, message('m2')) + expect(useMothershipQueueStore.getState().queues['chat-A']?.map((m) => m.id)).toEqual([ + 'm1', + 'm2', + ]) + }) + + it('ignores duplicate ids', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().insertAt('chat-A', 0, message('m1')) + expect(useMothershipQueueStore.getState().queues['chat-A']?.length).toBe(1) + }) + }) + + describe('replaceAt', () => { + it('overwrites content while preserving id and index', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1', 'orig-1')) + useMothershipQueueStore.getState().enqueue('chat-A', message('m2', 'orig-2')) + useMothershipQueueStore.getState().enqueue('chat-A', message('m3', 'orig-3')) + + useMothershipQueueStore.getState().replaceAt('chat-A', 'm2', { content: 'edited-2' }) + + const queue = useMothershipQueueStore.getState().queues['chat-A'] + expect(queue?.map((m) => m.id)).toEqual(['m1', 'm2', 'm3']) + expect(queue?.[1]?.content).toBe('edited-2') + }) + + it('is a no-op when the id is no longer in the queue', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + const before = useMothershipQueueStore.getState().queues['chat-A'] + useMothershipQueueStore.getState().replaceAt('chat-A', 'missing', { content: 'x' }) + expect(useMothershipQueueStore.getState().queues['chat-A']).toBe(before) + }) + + it('strips queuedSendHandoff on edit so a fresh handoff is minted at send time', () => { + const original: QueuedMothershipMessage = { + id: 'm1', + content: 'orig', + queuedSendHandoff: { id: 'm1', supersededStreamId: 'stream-x' }, + } + useMothershipQueueStore.getState().enqueue('chat-A', original) + useMothershipQueueStore.getState().replaceAt('chat-A', 'm1', { content: 'edited' }) + const replaced = useMothershipQueueStore.getState().queues['chat-A']?.[0] + expect(replaced?.queuedSendHandoff).toBeUndefined() + expect(replaced?.content).toBe('edited') + }) + }) + + describe('migrate', () => { + it('moves both queue and editing from sentinel to resolved chatId', () => { + const pendingKey = 'pending::abc' + useMothershipQueueStore.getState().enqueue(pendingKey, message('m1')) + useMothershipQueueStore.getState().setEditing(pendingKey, 'm1') + useMothershipQueueStore.getState().migrate(pendingKey, 'chat-X') + const state = useMothershipQueueStore.getState() + expect(state.queues[pendingKey]).toBeUndefined() + expect(state.editing[pendingKey]).toBeUndefined() + expect(state.queues['chat-X']?.map((m) => m.id)).toEqual(['m1']) + expect(state.editing['chat-X']).toBe('m1') + }) + + it('is a no-op when source and target are the same', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + const before = useMothershipQueueStore.getState().queues['chat-A'] + useMothershipQueueStore.getState().migrate('chat-A', 'chat-A') + expect(useMothershipQueueStore.getState().queues['chat-A']).toBe(before) + }) + + it('is a no-op when the source bucket is empty', () => { + const before = useMothershipQueueStore.getState().queues + useMothershipQueueStore.getState().migrate('nope', 'chat-X') + expect(useMothershipQueueStore.getState().queues).toBe(before) + }) + + it('merges into an existing destination bucket instead of overwriting', () => { + useMothershipQueueStore.getState().enqueue('chat-X', message('existing-1')) + useMothershipQueueStore.getState().enqueue('chat-X', message('existing-2')) + useMothershipQueueStore.getState().enqueue('pending::abc', message('pending-1')) + useMothershipQueueStore.getState().migrate('pending::abc', 'chat-X') + expect(useMothershipQueueStore.getState().queues['chat-X']?.map((m) => m.id)).toEqual([ + 'existing-1', + 'existing-2', + 'pending-1', + ]) + expect(useMothershipQueueStore.getState().queues['pending::abc']).toBeUndefined() + }) + }) + + describe('clearChat', () => { + it('drops queue and editing for the chat', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().setEditing('chat-A', 'm1') + useMothershipQueueStore.getState().clearChat('chat-A') + const state = useMothershipQueueStore.getState() + expect(state.queues['chat-A']).toBeUndefined() + expect(state.editing['chat-A']).toBeUndefined() + }) + }) + + describe('setEditing', () => { + it('stores and clears the editing id', () => { + useMothershipQueueStore.getState().setEditing('chat-A', 'm1') + expect(useMothershipQueueStore.getState().editing['chat-A']).toBe('m1') + useMothershipQueueStore.getState().setEditing('chat-A', null) + expect(useMothershipQueueStore.getState().editing['chat-A']).toBeUndefined() + }) + }) +}) diff --git a/apps/sim/stores/mothership-queue/store.ts b/apps/sim/stores/mothership-queue/store.ts new file mode 100644 index 0000000000..5697d9ac47 --- /dev/null +++ b/apps/sim/stores/mothership-queue/store.ts @@ -0,0 +1,179 @@ +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { create } from 'zustand' +import { createJSONStorage, devtools, persist } from 'zustand/middleware' +import type { MothershipQueueState, QueuedMothershipMessage } from '@/stores/mothership-queue/types' + +const logger = createLogger('MothershipQueueStore') + +/** + * Per-tab sessionStorage adapter — no-ops on SSR and tolerates quota errors. + * + * We persist to sessionStorage (not localStorage like `mothership-drafts`) + * because the queue auto-drains on rehydrate: tab close should not fire those + * sends days later. + */ +const sessionStorageAdapter = { + getItem: (name: string): string | null => { + if (typeof sessionStorage === 'undefined') return null + try { + return sessionStorage.getItem(name) + } catch (error) { + logger.warn('Failed to read mothership queue from sessionStorage', toError(error)) + return null + } + }, + setItem: (name: string, value: string): void => { + if (typeof sessionStorage === 'undefined') return + try { + sessionStorage.setItem(name, value) + } catch (error) { + logger.warn('Failed to persist mothership queue to sessionStorage', toError(error)) + } + }, + removeItem: (name: string): void => { + if (typeof sessionStorage === 'undefined') return + try { + sessionStorage.removeItem(name) + } catch (error) { + logger.warn('Failed to remove mothership queue from sessionStorage', toError(error)) + } + }, +} + +const initialState = { + queues: {} as Record, + editing: {} as Record, +} + +const omitKey = (record: Record, key: string): Record => { + if (!(key in record)) return record + const { [key]: _removed, ...rest } = record + return rest +} + +const setQueueForChat = ( + queues: Record, + chatKey: string, + next: QueuedMothershipMessage[] +): Record => + next.length === 0 ? omitKey(queues, chatKey) : { ...queues, [chatKey]: next } + +// Drop the volatile `queuedSendHandoff` from the persisted snapshot — its +// stream reference is meaningless after reload; the dispatcher mints a fresh +// one at send time if needed. +const stripVolatile = (message: QueuedMothershipMessage): QueuedMothershipMessage => { + if (!message.queuedSendHandoff) return message + const { queuedSendHandoff: _drop, ...rest } = message + return rest +} + +export const useMothershipQueueStore = create()( + devtools( + persist( + (set) => ({ + ...initialState, + + enqueue: (chatKey, message) => + set((state) => ({ + queues: setQueueForChat(state.queues, chatKey, [ + ...(state.queues[chatKey] ?? []), + message, + ]), + })), + + insertAt: (chatKey, index, message) => + set((state) => { + const current = state.queues[chatKey] ?? [] + if (current.some((m) => m.id === message.id)) return state + const next = [...current] + next.splice(Math.max(0, Math.min(index, next.length)), 0, message) + return { queues: setQueueForChat(state.queues, chatKey, next) } + }), + + replaceAt: (chatKey, id, patch) => + set((state) => { + const current = state.queues[chatKey] ?? [] + const index = current.findIndex((m) => m.id === id) + if (index === -1) return state + const next = [...current] + // Strip `queuedSendHandoff` — references the stream active at + // original enqueue time; the dispatcher mints a fresh one at send. + const { queuedSendHandoff: _stale, ...rest } = next[index] + next[index] = { + ...rest, + content: patch.content, + fileAttachments: patch.fileAttachments, + contexts: patch.contexts, + } + return { queues: setQueueForChat(state.queues, chatKey, next) } + }), + + remove: (chatKey, id) => + set((state) => { + const current = state.queues[chatKey] ?? [] + const next = current.filter((m) => m.id !== id) + const wasEditingThis = state.editing[chatKey] === id + if (next.length === current.length) { + return wasEditingThis ? { editing: omitKey(state.editing, chatKey) } : state + } + return { + queues: setQueueForChat(state.queues, chatKey, next), + ...(wasEditingThis ? { editing: omitKey(state.editing, chatKey) } : {}), + } + }), + + setEditing: (chatKey, id) => + set((state) => ({ + editing: + id === null ? omitKey(state.editing, chatKey) : { ...state.editing, [chatKey]: id }, + })), + + migrate: (fromKey, toKey) => + set((state) => { + if (fromKey === toKey) return state + const fromQueue = state.queues[fromKey] + const fromEditing = state.editing[fromKey] + if (!fromQueue && fromEditing === undefined) return state + + const queues = omitKey(state.queues, fromKey) + if (fromQueue && fromQueue.length > 0) { + // Merge defensively in case a stale bucket survived in + // sessionStorage. FIFO: existing first, then the resolved stream. + const existing = state.queues[toKey] ?? [] + queues[toKey] = [...existing, ...fromQueue] + } + const editing = omitKey(state.editing, fromKey) + if (fromEditing !== undefined) { + editing[toKey] = fromEditing + } + return { queues, editing } + }), + + clearChat: (chatKey) => + set((state) => ({ + queues: omitKey(state.queues, chatKey), + editing: omitKey(state.editing, chatKey), + })), + + reset: () => set(initialState), + }), + { + name: 'mothership-queue', + storage: createJSONStorage(() => sessionStorageAdapter), + // `editing` is intentionally omitted — the composer that holds the + // edit text is component-local and empty after reload, so a persisted + // editing flag would render an in-edit row with nothing bound. + partialize: (state) => ({ + queues: Object.fromEntries( + Object.entries(state.queues).map(([key, messages]) => [ + key, + messages.map(stripVolatile), + ]) + ), + }), + } + ), + { name: 'mothership-queue-store' } + ) +) diff --git a/apps/sim/stores/mothership-queue/types.ts b/apps/sim/stores/mothership-queue/types.ts new file mode 100644 index 0000000000..b39dff9b8e --- /dev/null +++ b/apps/sim/stores/mothership-queue/types.ts @@ -0,0 +1,30 @@ +import type { QueuedMessage } from '@/app/workspace/[workspaceId]/home/types' + +// Volatile — lets the dispatcher claim an in-flight stream's slot. Not persisted. +export interface QueuedSendHandoffSeed { + id: string + chatId?: string + supersededStreamId: string | null + userMessageId?: string +} + +export type QueuedMothershipMessage = QueuedMessage & { + queuedSendHandoff?: QueuedSendHandoffSeed +} + +// Mutable fields an in-place edit overwrites; id and index are preserved by `replaceAt`. +export type QueuedMessageEditPatch = Pick + +export interface MothershipQueueState { + queues: Record + editing: Record + + enqueue: (chatKey: string, message: QueuedMothershipMessage) => void + insertAt: (chatKey: string, index: number, message: QueuedMothershipMessage) => void + replaceAt: (chatKey: string, id: string, patch: QueuedMessageEditPatch) => void + remove: (chatKey: string, id: string) => void + setEditing: (chatKey: string, id: string | null) => void + migrate: (fromKey: string, toKey: string) => void + clearChat: (chatKey: string) => void + reset: () => void +}