Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions apps/code/src/main/services/cloud-task/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ describe("CloudTaskService", () => {
]);
});

it("stops watching after clean stream completion even when the run remains active", async () => {
it("reconnects after clean stream completion when the run remains active", async () => {
vi.useFakeTimers();

const updates: unknown[] = [];
Expand Down Expand Up @@ -564,7 +564,9 @@ describe("CloudTaskService", () => {
);
});

mockStreamFetch.mockResolvedValueOnce(createSseResponse(""));
mockStreamFetch.mockImplementation(() =>
Promise.resolve(createSseResponse("")),
);

service.watch({
taskId: "task-1",
Expand All @@ -574,14 +576,7 @@ describe("CloudTaskService", () => {
});

await waitFor(() => mockStreamFetch.mock.calls.length === 1);
await waitFor(
() =>
!(
service as unknown as {
watchers: Map<string, unknown>;
}
).watchers.has("task-1:run-1"),
);
await waitFor(() => mockStreamFetch.mock.calls.length >= 7, 20_000);

expect(updates).toContainEqual(
expect.objectContaining({
Expand All @@ -591,10 +586,22 @@ describe("CloudTaskService", () => {
output: { pr_url: prUrl },
}),
);
expect(
updates.some(
(update) =>
typeof update === "object" &&
update !== null &&
(update as { kind?: string }).kind === "error",
),
).toBe(false);

await vi.advanceTimersByTimeAsync(70_000);

expect(mockStreamFetch).toHaveBeenCalledTimes(1);
expect(
(
service as unknown as {
watchers: Map<string, unknown>;
}
).watchers.has("task-1:run-1"),
).toBe(true);
});

it("emits a retryable cloud error after repeated stream failures", async () => {
Expand Down
50 changes: 41 additions & 9 deletions apps/code/src/main/services/cloud-task/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {

if (watcher.needsPostBootstrapReconnect) {
watcher.needsPostBootstrapReconnect = false;
this.scheduleReconnect(key);
this.scheduleReconnect(key, undefined, { countAttempt: false });
}

void this.verifyPostBootstrapStatus(key);
Expand Down Expand Up @@ -655,7 +655,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
return;
}

await this.handleStreamCompletion(key, { reconnectIfNonTerminal: false });
await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true });
} catch (error) {
this.flushLogBatch(key);

Expand All @@ -677,7 +677,11 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
key,
error: errorMessage,
});
await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true });
await this.handleStreamCompletion(key, {
reconnectIfNonTerminal: true,
reconnectError: error,
countReconnectAttempt: true,
});
} finally {
const currentWatcher = this.watchers.get(key);
if (currentWatcher?.sseAbortController === controller) {
Expand Down Expand Up @@ -952,7 +956,11 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
});
}

private scheduleReconnect(key: string, error?: unknown): void {
private scheduleReconnect(
key: string,
error?: unknown,
options: { countAttempt?: boolean } = {},
): void {
const watcher = this.watchers.get(key);
if (!watcher || watcher.failed || isTerminalStatus(watcher.lastStatus)) {
return;
Expand All @@ -962,7 +970,12 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
clearTimeout(watcher.reconnectTimeoutId);
}

watcher.reconnectAttempts += 1;
const countAttempt = options.countAttempt ?? true;
if (countAttempt) {
watcher.reconnectAttempts += 1;
} else {
watcher.reconnectAttempts = 0;
}
if (watcher.reconnectAttempts > MAX_SSE_RECONNECT_ATTEMPTS) {
const details =
error instanceof CloudTaskStreamError
Expand All @@ -978,7 +991,8 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
}

const delay = Math.min(
SSE_RECONNECT_BASE_DELAY_MS * 2 ** (watcher.reconnectAttempts - 1),
SSE_RECONNECT_BASE_DELAY_MS *
2 ** Math.max(watcher.reconnectAttempts - 1, 0),
SSE_RECONNECT_MAX_DELAY_MS,
);

Expand All @@ -995,7 +1009,11 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {

private async handleStreamCompletion(
key: string,
options: { reconnectIfNonTerminal: boolean },
options: {
reconnectIfNonTerminal: boolean;
reconnectError?: unknown;
countReconnectAttempt?: boolean;
},
): Promise<void> {
const watcher = this.watchers.get(key);
if (!watcher) return;
Expand Down Expand Up @@ -1034,14 +1052,28 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
return;
}

this.applyTaskRunState(watcher, run);
const stateChanged = this.applyTaskRunState(watcher, run);

if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) {
if (stateChanged) {
this.emit(CloudTaskEvent.Update, {
taskId: watcher.taskId,
runId: watcher.runId,
kind: "status",
status: watcher.lastStatus ?? undefined,
stage: watcher.lastStage,
output: watcher.lastOutput,
errorMessage: watcher.lastErrorMessage,
branch: watcher.lastBranch,
});
}
log.warn("Cloud task stream ended before terminal status", {
key,
status: watcher.lastStatus,
});
this.scheduleReconnect(key);
this.scheduleReconnect(key, options.reconnectError, {
countAttempt: options.countReconnectAttempt ?? false,
});
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export type ConversationItem =
content: string;
timestamp: number;
attachments?: UserMessageAttachment[];
pinToTop?: boolean;
}
| { type: "git_action"; id: string; actionType: GitActionType }
| { type: "skill_button_action"; id: string; buttonId: SkillButtonId }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import { mergeConversationItems } from "./mergeConversationItems";
function userMessage(
id: string,
content: string,
pinToTop?: boolean,
): Extract<ConversationItem, { type: "user_message" }> {
return { type: "user_message", id, content, timestamp: 0 };
return { type: "user_message", id, content, timestamp: 0, pinToTop };
}

function queuedItem(
Expand Down Expand Up @@ -99,4 +100,27 @@ describe("mergeConversationItems", () => {
});
expect(result.map((i) => i.id)).toEqual(["opt", "setup", "q1"]);
});

it("cloud: renders follow-up optimistic messages at the tail", () => {
const result = mergeConversationItems({
conversationItems: [userMessage("setup", "setup")],
optimisticItems: [userMessage("opt", "follow up", false)],
queuedItems: [],
isCloud: true,
});
expect(result.map((i) => i.id)).toEqual(["setup", "opt"]);
});

it("cloud: does not dedupe historical messages against tail follow-up optimistics", () => {
const result = mergeConversationItems({
conversationItems: [
userMessage("old", "repeat"),
userMessage("setup", "setup"),
],
optimisticItems: [userMessage("opt", "repeat", false)],
queuedItems: [],
isCloud: true,
});
expect(result.map((i) => i.id)).toEqual(["old", "setup", "opt"]);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ interface MergeConversationItemsArgs {
}

// Cloud's initial optimistic is pinned to the top so the user's prompt stays
// visible above setup progress. When the agent echoes it back via
// `session/prompt`, the duplicate `user_message` is filtered out by content
// match so the bubble doesn't disappear-then-reappear when the echo lands.
// visible above setup progress. Follow-up optimistics render at the tail until
// the streamed `session/prompt` arrives and replaces them.
//
// Local sessions keep optimistic at the chronological end — they rely on
// `replaceOptimisticWithEvent` to swap optimistic↔real in place.
Expand All @@ -30,24 +29,31 @@ export function mergeConversationItems({
return queuedItems.length > 0 ? [...result, ...queuedItems] : result;
}

const optimisticUserContents = new Set(
optimisticItems
const pinnedOptimisticItems = optimisticItems.filter(
(item) => item.type !== "user_message" || item.pinToTop !== false,
);
const tailOptimisticItems = optimisticItems.filter(
(item) => item.type === "user_message" && item.pinToTop === false,
);
const pinnedOptimisticUserContents = new Set(
pinnedOptimisticItems
.filter(
(item): item is Extract<typeof item, { type: "user_message" }> =>
item.type === "user_message",
)
.map((item) => item.content),
);
const dedupedConversation =
optimisticUserContents.size === 0
pinnedOptimisticUserContents.size === 0
? conversationItems
: conversationItems.filter((item) => {
if (item.type !== "user_message") return true;
return !optimisticUserContents.has(item.content);
return !pinnedOptimisticUserContents.has(item.content);
});
const result: ConversationItem[] = [
...optimisticItems,
...pinnedOptimisticItems,
...dedupedConversation,
...tailOptimisticItems,
];
return queuedItems.length > 0 ? [...result, ...queuedItems] : result;
}
51 changes: 49 additions & 2 deletions apps/code/src/renderer/features/sessions/service/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({
setSession: vi.fn(),
removeSession: vi.fn(),
updateSession: vi.fn(),
updateCloudStatus: vi.fn(),
appendEvents: vi.fn(),
enqueueMessage: vi.fn(),
removeQueuedMessage: vi.fn(),
Expand All @@ -89,6 +90,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({
clearAll: vi.fn(),
appendOptimisticItem: vi.fn(),
clearOptimisticItems: vi.fn(),
clearTailOptimisticItems: vi.fn(),
replaceOptimisticWithEvent: vi.fn(),
}));

Expand Down Expand Up @@ -894,6 +896,43 @@ describe("SessionService", () => {
expect(unsubscribe).not.toHaveBeenCalled();
});

it("preserves an existing status callback when reusing a watcher without one", () => {
const service = getSessionService();
const onStatusChange = vi.fn();

service.watchCloudTask(
"task-123",
"run-123",
"https://api.anthropic.com",
123,
onStatusChange,
);
service.watchCloudTask(
"task-123",
"run-123",
"https://api.anthropic.com",
123,
);

const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock
.calls[0][1] as {
onData: (update: {
kind: "status";
taskId: string;
runId: string;
status: "in_progress";
}) => void;
};
subscribeOptions.onData({
kind: "status",
taskId: "task-123",
runId: "run-123",
status: "in_progress",
});

expect(onStatusChange).toHaveBeenCalledTimes(1);
});

it("hydrates a fresh cloud session from persisted logs before replay arrives", async () => {
const service = getSessionService();
const hydratedSession = createMockSession({
Expand Down Expand Up @@ -2376,7 +2415,7 @@ describe("SessionService", () => {
);
mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({
success: true,
result: { stopReason: "end_turn" },
result: { queued: true },
});
mockTrpcFs.readFileAsBase64.query.mockResolvedValue("aGVsbG8=");
mockAuthenticatedClient.prepareTaskRunArtifactUploads.mockResolvedValue([
Expand Down Expand Up @@ -2424,8 +2463,16 @@ describe("SessionService", () => {

const result = await service.sendPrompt("task-123", prompt);

expect(result.stopReason).toBe("end_turn");
expect(result.stopReason).toBe("queued");
expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1);
expect(mockSessionStoreSetters.appendOptimisticItem).toHaveBeenCalledWith(
"run-123",
expect.objectContaining({
type: "user_message",
content: "read this\n\nAttached files: test.txt",
pinToTop: false,
}),
);

expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith(
expect.objectContaining({
Expand Down
Loading
Loading