From 9aab186ed86f28082ef8242f1fea31b39e1b33e6 Mon Sep 17 00:00:00 2001 From: Alessandro Pogliaghi Date: Wed, 6 May 2026 16:32:47 +0100 Subject: [PATCH 1/3] fix: keep cloud task follow-ups connected --- .../main/services/cloud-task/service.test.ts | 26 ++++---- .../src/main/services/cloud-task/service.ts | 16 ++++- .../components/buildConversationItems.ts | 1 + .../components/mergeConversationItems.test.ts | 13 +++- .../components/mergeConversationItems.ts | 14 ++-- .../features/sessions/service/service.test.ts | 13 +++- .../features/sessions/service/service.ts | 65 +++++++++++++------ .../features/sessions/stores/sessionStore.ts | 12 ++++ 8 files changed, 118 insertions(+), 42 deletions(-) diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index 76229e9d6..892006a93 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -531,7 +531,7 @@ describe("CloudTaskService", () => { ]); }); - it("stops watching after clean stream completion even when the run remains active", async () => { + it("reconnects after clean stream completion when the run remains active", async () => { vi.useFakeTimers(); const updates: unknown[] = []; @@ -564,7 +564,9 @@ describe("CloudTaskService", () => { ); }); - mockStreamFetch.mockResolvedValueOnce(createSseResponse("")); + mockStreamFetch + .mockResolvedValueOnce(createSseResponse("")) + .mockResolvedValueOnce(createOpenSseResponse("")); service.watch({ taskId: "task-1", @@ -574,14 +576,8 @@ describe("CloudTaskService", () => { }); await waitFor(() => mockStreamFetch.mock.calls.length === 1); - await waitFor( - () => - !( - service as unknown as { - watchers: Map; - } - ).watchers.has("task-1:run-1"), - ); + await vi.advanceTimersByTimeAsync(2_000); + await waitFor(() => mockStreamFetch.mock.calls.length === 2); expect(updates).toContainEqual( expect.objectContaining({ @@ -592,9 +588,13 @@ describe("CloudTaskService", () => { }), ); - await vi.advanceTimersByTimeAsync(70_000); - - expect(mockStreamFetch).toHaveBeenCalledTimes(1); + expect( + ( + service as unknown as { + watchers: Map; + } + ).watchers.has("task-1:run-1"), + ).toBe(true); }); it("emits a retryable cloud error after repeated stream failures", async () => { diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index 094bba43d..aefbb4821 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -655,7 +655,7 @@ export class CloudTaskService extends TypedEventEmitter { return; } - await this.handleStreamCompletion(key, { reconnectIfNonTerminal: false }); + await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true }); } catch (error) { this.flushLogBatch(key); @@ -1034,9 +1034,21 @@ export class CloudTaskService extends TypedEventEmitter { return; } - this.applyTaskRunState(watcher, run); + const stateChanged = this.applyTaskRunState(watcher, run); if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) { + if (stateChanged) { + this.emit(CloudTaskEvent.Update, { + taskId: watcher.taskId, + runId: watcher.runId, + kind: "status", + status: watcher.lastStatus ?? undefined, + stage: watcher.lastStage, + output: watcher.lastOutput, + errorMessage: watcher.lastErrorMessage, + branch: watcher.lastBranch, + }); + } log.warn("Cloud task stream ended before terminal status", { key, status: watcher.lastStatus, diff --git a/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts b/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts index 244a1e737..fbd0d1ee4 100644 --- a/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts +++ b/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts @@ -37,6 +37,7 @@ export type ConversationItem = content: string; timestamp: number; attachments?: UserMessageAttachment[]; + pinToTop?: boolean; } | { type: "git_action"; id: string; actionType: GitActionType } | { type: "skill_button_action"; id: string; buttonId: SkillButtonId } diff --git a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts index a8434c718..de558bf23 100644 --- a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts +++ b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts @@ -6,8 +6,9 @@ import { mergeConversationItems } from "./mergeConversationItems"; function userMessage( id: string, content: string, + pinToTop?: boolean, ): Extract { - return { type: "user_message", id, content, timestamp: 0 }; + return { type: "user_message", id, content, timestamp: 0, pinToTop }; } function queuedItem( @@ -99,4 +100,14 @@ describe("mergeConversationItems", () => { }); expect(result.map((i) => i.id)).toEqual(["opt", "setup", "q1"]); }); + + it("cloud: renders follow-up optimistic messages at the tail", () => { + const result = mergeConversationItems({ + conversationItems: [userMessage("setup", "setup")], + optimisticItems: [userMessage("opt", "follow up", false)], + queuedItems: [], + isCloud: true, + }); + expect(result.map((i) => i.id)).toEqual(["setup", "opt"]); + }); }); diff --git a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts index 8aa755cb1..59654aee5 100644 --- a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts +++ b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts @@ -10,9 +10,8 @@ interface MergeConversationItemsArgs { } // Cloud's initial optimistic is pinned to the top so the user's prompt stays -// visible above setup progress. When the agent echoes it back via -// `session/prompt`, the duplicate `user_message` is filtered out by content -// match so the bubble doesn't disappear-then-reappear when the echo lands. +// visible above setup progress. Follow-up optimistics render at the tail until +// the streamed `session/prompt` arrives and replaces them. // // Local sessions keep optimistic at the chronological end — they rely on // `replaceOptimisticWithEvent` to swap optimistic↔real in place. @@ -30,6 +29,12 @@ export function mergeConversationItems({ return queuedItems.length > 0 ? [...result, ...queuedItems] : result; } + const pinnedOptimisticItems = optimisticItems.filter( + (item) => item.type !== "user_message" || item.pinToTop !== false, + ); + const tailOptimisticItems = optimisticItems.filter( + (item) => item.type === "user_message" && item.pinToTop === false, + ); const optimisticUserContents = new Set( optimisticItems .filter( @@ -46,8 +51,9 @@ export function mergeConversationItems({ return !optimisticUserContents.has(item.content); }); const result: ConversationItem[] = [ - ...optimisticItems, + ...pinnedOptimisticItems, ...dedupedConversation, + ...tailOptimisticItems, ]; return queuedItems.length > 0 ? [...result, ...queuedItems] : result; } diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 3337d15c0..c3664e7e8 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -89,6 +89,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({ clearAll: vi.fn(), appendOptimisticItem: vi.fn(), clearOptimisticItems: vi.fn(), + clearTailOptimisticItems: vi.fn(), replaceOptimisticWithEvent: vi.fn(), })); @@ -2376,7 +2377,7 @@ describe("SessionService", () => { ); mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ success: true, - result: { stopReason: "end_turn" }, + result: { queued: true }, }); mockTrpcFs.readFileAsBase64.query.mockResolvedValue("aGVsbG8="); mockAuthenticatedClient.prepareTaskRunArtifactUploads.mockResolvedValue([ @@ -2424,8 +2425,16 @@ describe("SessionService", () => { const result = await service.sendPrompt("task-123", prompt); - expect(result.stopReason).toBe("end_turn"); + expect(result.stopReason).toBe("queued"); expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1); + expect(mockSessionStoreSetters.appendOptimisticItem).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ + type: "user_message", + content: "read this\n\nAttached files: test.txt", + pinToTop: false, + }), + ); expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( expect.objectContaining({ diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 8a42c9ff7..50a37be93 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -142,6 +142,14 @@ function extractLatestConfigOptionsFromEntries( return latest; } +function hasSessionPromptEvent(events: AcpMessage[]): boolean { + return events.some( + (event) => + isJsonRpcRequest(event.message) && + event.message.method === "session/prompt", + ); +} + function buildCloudDefaultConfigOptions( initialMode: string | undefined, adapter: Adapter = "claude", @@ -1729,6 +1737,18 @@ export class SessionService { if (!auth || !cloudCommandAuth) { throw new Error("Authentication required for cloud commands"); } + + this.watchCloudTask( + session.taskId, + session.taskRunId, + cloudCommandAuth.apiHost, + cloudCommandAuth.teamId, + undefined, + session.logUrl, + undefined, + session.adapter ?? "claude", + ); + const artifactIds = await uploadRunAttachments( auth.client, session.taskId, @@ -1745,6 +1765,14 @@ export class SessionService { sessionStoreSetters.updateSession(session.taskRunId, { isPromptPending: true, + promptStartedAt: Date.now(), + pausedDurationMs: 0, + }); + sessionStoreSetters.appendOptimisticItem(session.taskRunId, { + type: "user_message", + content: transport.promptText, + timestamp: Date.now(), + pinToTop: false, }); track(ANALYTICS_EVENTS.PROMPT_SENT, { @@ -1764,36 +1792,24 @@ export class SessionService { params, }); - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: false, - }); - if (!result.success) { throw new Error(result.error ?? "Failed to send cloud command"); } - const stopReason = - (result.result as { stopReason?: string })?.stopReason ?? "end_turn"; - - const freshSession = sessionStoreSetters.getSessionByTaskId( - session.taskId, - ); - if (freshSession && freshSession.messageQueue.length > 0) { - setTimeout(() => { - this.sendQueuedCloudMessages(session.taskId).catch((err) => { - log.error("Failed to send queued cloud messages", { - taskId: session.taskId, - error: err, - }); - }); - }, 0); - } + const commandResult = result.result as + | { queued?: boolean; stopReason?: string } + | undefined; + const stopReason = commandResult?.queued + ? "queued" + : (commandResult?.stopReason ?? "end_turn"); return { stopReason }; } catch (error) { sessionStoreSetters.updateSession(session.taskRunId, { isPromptPending: false, + promptStartedAt: null, }); + sessionStoreSetters.clearTailOptimisticItems(session.taskRunId); throw error; } } @@ -3197,6 +3213,9 @@ export class SessionService { session, newEvents, ); + if (hasSessionPromptEvent(newEvents)) { + sessionStoreSetters.clearTailOptimisticItems(taskRunId); + } sessionStoreSetters.appendEvents(taskRunId, newEvents, expectedCount); this.updatePromptStateFromEvents(taskRunId, newEvents); } else { @@ -3526,6 +3545,9 @@ export class SessionService { if (rawEntries.length >= expectedCount) { const events = convertStoredEntriesToEvents(rawEntries); + if (hasSessionPromptEvent(events)) { + sessionStoreSetters.clearTailOptimisticItems(taskRunId); + } sessionStoreSetters.updateSession(taskRunId, { events, isCloud: true, @@ -3544,6 +3566,9 @@ export class SessionService { }); let newEvents = convertStoredEntriesToEvents(newEntries); newEvents = this.filterSkippedPromptEvents(taskRunId, session, newEvents); + if (hasSessionPromptEvent(newEvents)) { + sessionStoreSetters.clearTailOptimisticItems(taskRunId); + } sessionStoreSetters.appendEvents( taskRunId, newEvents, diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index 5a0cb9ca8..23257d7a7 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -32,6 +32,7 @@ export type OptimisticItem = id: string; content: string; timestamp: number; + pinToTop?: boolean; } | { type: "skill_button_action"; @@ -453,6 +454,17 @@ export const sessionStoreSetters = { }); }, + clearTailOptimisticItems: (taskRunId: string): void => { + useSessionStore.setState((state) => { + const session = state.sessions[taskRunId]; + if (session) { + session.optimisticItems = session.optimisticItems.filter( + (item) => item.type !== "user_message" || item.pinToTop !== false, + ); + } + }); + }, + replaceOptimisticWithEvent: (taskRunId: string, event: AcpMessage): void => { useSessionStore.setState((state) => { const session = state.sessions[taskRunId]; From 4337e973574388677265277d0da7ef4746dddd3d Mon Sep 17 00:00:00 2001 From: Alessandro Pogliaghi Date: Wed, 6 May 2026 17:13:03 +0100 Subject: [PATCH 2/3] fix: stabilize agent server keepalive test --- .../agent/src/server/agent-server.test.ts | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/packages/agent/src/server/agent-server.test.ts b/packages/agent/src/server/agent-server.test.ts index b995884fd..33b5bc805 100644 --- a/packages/agent/src/server/agent-server.test.ts +++ b/packages/agent/src/server/agent-server.test.ts @@ -292,12 +292,41 @@ describe("AgentServer HTTP Mode", () => { let reader: ReadableStreamDefaultReader | null = null; try { - await createServer().start(); + const testServer = createServer() as unknown as { + app: { + fetch: (request: Request) => Promise | Response; + }; + session: unknown; + }; + testServer.session = { + payload: { + run_id: "test-run-id", + task_id: "test-task-id", + team_id: 1, + user_id: 1, + distinct_id: "test-distinct-id", + mode: "interactive", + }, + acpSessionId: "session-1", + acpConnection: { cleanup: vi.fn().mockResolvedValue(undefined) }, + clientConnection: {}, + sseController: null, + deviceInfo: { type: "cloud" }, + logWriter: { + appendRawLine: vi.fn(), + flush: vi.fn().mockResolvedValue(undefined), + }, + permissionMode: "default", + hasDesktopConnected: false, + }; + const token = createToken(); - const response = await fetch(`http://localhost:${port}/events`, { - headers: { Authorization: `Bearer ${token}` }, - }); + const response = await testServer.app.fetch( + new Request(`http://localhost:${port}/events`, { + headers: { Authorization: `Bearer ${token}` }, + }), + ); expect(response.status).toBe(200); expect(response.body).not.toBeNull(); @@ -329,6 +358,7 @@ describe("AgentServer HTTP Mode", () => { expect(streamText).not.toContain('"type":"keepalive"'); } finally { await reader?.cancel(); + server = undefined; setIntervalSpy.mockRestore(); } }, 20000); From 98581d8d34b7109fd155be428a8ef4c51b0e423c Mon Sep 17 00:00:00 2001 From: Alessandro Pogliaghi Date: Wed, 6 May 2026 17:33:54 +0100 Subject: [PATCH 3/3] fix: harden cloud follow-up UI handling --- .../main/services/cloud-task/service.test.ts | 17 ++++++--- .../src/main/services/cloud-task/service.ts | 34 +++++++++++++---- .../components/mergeConversationItems.test.ts | 13 +++++++ .../components/mergeConversationItems.ts | 8 ++-- .../features/sessions/service/service.test.ts | 38 +++++++++++++++++++ .../features/sessions/service/service.ts | 4 +- 6 files changed, 97 insertions(+), 17 deletions(-) diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index 892006a93..c8a62e56e 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -564,9 +564,9 @@ describe("CloudTaskService", () => { ); }); - mockStreamFetch - .mockResolvedValueOnce(createSseResponse("")) - .mockResolvedValueOnce(createOpenSseResponse("")); + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("")), + ); service.watch({ taskId: "task-1", @@ -576,8 +576,7 @@ describe("CloudTaskService", () => { }); await waitFor(() => mockStreamFetch.mock.calls.length === 1); - await vi.advanceTimersByTimeAsync(2_000); - await waitFor(() => mockStreamFetch.mock.calls.length === 2); + await waitFor(() => mockStreamFetch.mock.calls.length >= 7, 20_000); expect(updates).toContainEqual( expect.objectContaining({ @@ -587,6 +586,14 @@ describe("CloudTaskService", () => { output: { pr_url: prUrl }, }), ); + expect( + updates.some( + (update) => + typeof update === "object" && + update !== null && + (update as { kind?: string }).kind === "error", + ), + ).toBe(false); expect( ( diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index aefbb4821..1fc19b730 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -549,7 +549,7 @@ export class CloudTaskService extends TypedEventEmitter { if (watcher.needsPostBootstrapReconnect) { watcher.needsPostBootstrapReconnect = false; - this.scheduleReconnect(key); + this.scheduleReconnect(key, undefined, { countAttempt: false }); } void this.verifyPostBootstrapStatus(key); @@ -677,7 +677,11 @@ export class CloudTaskService extends TypedEventEmitter { key, error: errorMessage, }); - await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true }); + await this.handleStreamCompletion(key, { + reconnectIfNonTerminal: true, + reconnectError: error, + countReconnectAttempt: true, + }); } finally { const currentWatcher = this.watchers.get(key); if (currentWatcher?.sseAbortController === controller) { @@ -952,7 +956,11 @@ export class CloudTaskService extends TypedEventEmitter { }); } - private scheduleReconnect(key: string, error?: unknown): void { + private scheduleReconnect( + key: string, + error?: unknown, + options: { countAttempt?: boolean } = {}, + ): void { const watcher = this.watchers.get(key); if (!watcher || watcher.failed || isTerminalStatus(watcher.lastStatus)) { return; @@ -962,7 +970,12 @@ export class CloudTaskService extends TypedEventEmitter { clearTimeout(watcher.reconnectTimeoutId); } - watcher.reconnectAttempts += 1; + const countAttempt = options.countAttempt ?? true; + if (countAttempt) { + watcher.reconnectAttempts += 1; + } else { + watcher.reconnectAttempts = 0; + } if (watcher.reconnectAttempts > MAX_SSE_RECONNECT_ATTEMPTS) { const details = error instanceof CloudTaskStreamError @@ -978,7 +991,8 @@ export class CloudTaskService extends TypedEventEmitter { } const delay = Math.min( - SSE_RECONNECT_BASE_DELAY_MS * 2 ** (watcher.reconnectAttempts - 1), + SSE_RECONNECT_BASE_DELAY_MS * + 2 ** Math.max(watcher.reconnectAttempts - 1, 0), SSE_RECONNECT_MAX_DELAY_MS, ); @@ -995,7 +1009,11 @@ export class CloudTaskService extends TypedEventEmitter { private async handleStreamCompletion( key: string, - options: { reconnectIfNonTerminal: boolean }, + options: { + reconnectIfNonTerminal: boolean; + reconnectError?: unknown; + countReconnectAttempt?: boolean; + }, ): Promise { const watcher = this.watchers.get(key); if (!watcher) return; @@ -1053,7 +1071,9 @@ export class CloudTaskService extends TypedEventEmitter { key, status: watcher.lastStatus, }); - this.scheduleReconnect(key); + this.scheduleReconnect(key, options.reconnectError, { + countAttempt: options.countReconnectAttempt ?? false, + }); return; } diff --git a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts index de558bf23..fe8f5ebf8 100644 --- a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts +++ b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts @@ -110,4 +110,17 @@ describe("mergeConversationItems", () => { }); expect(result.map((i) => i.id)).toEqual(["setup", "opt"]); }); + + it("cloud: does not dedupe historical messages against tail follow-up optimistics", () => { + const result = mergeConversationItems({ + conversationItems: [ + userMessage("old", "repeat"), + userMessage("setup", "setup"), + ], + optimisticItems: [userMessage("opt", "repeat", false)], + queuedItems: [], + isCloud: true, + }); + expect(result.map((i) => i.id)).toEqual(["old", "setup", "opt"]); + }); }); diff --git a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts index 59654aee5..c3cf30998 100644 --- a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts +++ b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts @@ -35,8 +35,8 @@ export function mergeConversationItems({ const tailOptimisticItems = optimisticItems.filter( (item) => item.type === "user_message" && item.pinToTop === false, ); - const optimisticUserContents = new Set( - optimisticItems + const pinnedOptimisticUserContents = new Set( + pinnedOptimisticItems .filter( (item): item is Extract => item.type === "user_message", @@ -44,11 +44,11 @@ export function mergeConversationItems({ .map((item) => item.content), ); const dedupedConversation = - optimisticUserContents.size === 0 + pinnedOptimisticUserContents.size === 0 ? conversationItems : conversationItems.filter((item) => { if (item.type !== "user_message") return true; - return !optimisticUserContents.has(item.content); + return !pinnedOptimisticUserContents.has(item.content); }); const result: ConversationItem[] = [ ...pinnedOptimisticItems, diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index c3664e7e8..953c1f3df 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -68,6 +68,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({ setSession: vi.fn(), removeSession: vi.fn(), updateSession: vi.fn(), + updateCloudStatus: vi.fn(), appendEvents: vi.fn(), enqueueMessage: vi.fn(), removeQueuedMessage: vi.fn(), @@ -895,6 +896,43 @@ describe("SessionService", () => { expect(unsubscribe).not.toHaveBeenCalled(); }); + it("preserves an existing status callback when reusing a watcher without one", () => { + const service = getSessionService(); + const onStatusChange = vi.fn(); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + onStatusChange, + ); + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: { + kind: "status"; + taskId: string; + runId: string; + status: "in_progress"; + }) => void; + }; + subscribeOptions.onData({ + kind: "status", + taskId: "task-123", + runId: "run-123", + status: "in_progress", + }); + + expect(onStatusChange).toHaveBeenCalledTimes(1); + }); + it("hydrates a fresh cloud session from persisted logs before replay arrives", async () => { const service = getSessionService(); const hydratedSession = createMockSession({ diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 50a37be93..519f444f9 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -2586,7 +2586,9 @@ export class SessionService { existingWatcher.apiHost === apiHost && existingWatcher.teamId === teamId ) { - existingWatcher.onStatusChange = onStatusChange; + if (onStatusChange) { + existingWatcher.onStatusChange = onStatusChange; + } // Ensure configOptions is populated on revisit const existing = sessionStoreSetters.getSessionByTaskId(taskId); if (existing) {