From 7b703a262f92e2894d783dce6d7d3904887ca247 Mon Sep 17 00:00:00 2001 From: Peter Kirkham Date: Thu, 21 May 2026 13:55:12 +0100 Subject: [PATCH 1/4] fix(code): bound cloud session reconnect attempts with cumulative cap MAX_SSE_RECONNECT_ATTEMPTS=5 was defeated by two loopholes that let unhealthy cloud runs reconnect forever without surfacing an error: - `scheduleReconnect` with `countAttempt: false` actively reset the counter to 0, so the clean-EOF path in `handleStreamCompletion` could loop without ever hitting the cap. - A successful SSE event also resets attempts, so a "dribble" of one event per reconnect cycle would never accumulate either. Two changes: 1. `countAttempt: false` now means "don't increment" rather than "reset to 0". 2. New `cumulativeReconnectAttempts` counter bumps on every schedule regardless of `countAttempt` and only resets on user-initiated retry. When it exceeds MAX_CUMULATIVE_RECONNECT_ ATTEMPTS=30 the watcher fails with a clear unrecoverable message so the UI can surface "this run is broken" instead of looping silently. Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7 --- .../main/services/cloud-task/service.test.ts | 66 +++++++++++++++++++ .../src/main/services/cloud-task/service.ts | 23 ++++++- 2 files changed, 87 insertions(+), 2 deletions(-) diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index 643f9f58f..87f70c23b 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -604,6 +604,72 @@ describe("CloudTaskService", () => { ).toBe(true); }); + it("fails the watcher after exhausting the cumulative reconnect budget on clean-EOF loops", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const makeInProgressRun = () => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve(makeInProgressRun()); + }); + + // Every stream closes cleanly with no events. Under the previous + // implementation reconnectAttempts was reset to 0 on each clean EOF + // and the watcher would loop forever. + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(60 * 60_000); + + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); + + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + errorTitle: "Cloud run unreachable", + errorMessage: + "Could not maintain a connection to the cloud run after many attempts. Click retry once the issue is resolved.", + retryable: true, + }); + }); + it("emits a retryable cloud error after repeated stream failures", async () => { vi.useFakeTimers(); diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index 4bafa6776..cb7d71d51 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -19,6 +19,7 @@ import { type SseEvent, SseEventParser } from "./sse-parser"; const log = logger.scope("cloud-task"); const MAX_SSE_RECONNECT_ATTEMPTS = 5; +const MAX_CUMULATIVE_RECONNECT_ATTEMPTS = 30; const SSE_RECONNECT_BASE_DELAY_MS = 2_000; const SSE_RECONNECT_MAX_DELAY_MS = 30_000; const SSE_HEALTHY_CONNECTION_MS = 60_000; @@ -91,6 +92,7 @@ interface WatcherState { totalEntryCount: number; reconnectAttempts: number; streamErrorAttempts: number; + cumulativeReconnectAttempts: number; lastEventId: string | null; lastStatus: TaskRunStatus | null; lastStage: string | null; @@ -283,6 +285,7 @@ export class CloudTaskService extends TypedEventEmitter { watcher.reconnectAttempts = 0; watcher.streamErrorAttempts = 0; + watcher.cumulativeReconnectAttempts = 0; watcher.failed = false; watcher.pendingLogEntries = []; watcher.bufferedLogBatches = []; @@ -406,6 +409,7 @@ export class CloudTaskService extends TypedEventEmitter { totalEntryCount: 0, reconnectAttempts: 0, streamErrorAttempts: 0, + cumulativeReconnectAttempts: 0, lastEventId: null, lastStatus: null, lastStage: null, @@ -1016,12 +1020,27 @@ export class CloudTaskService extends TypedEventEmitter { clearTimeout(watcher.reconnectTimeoutId); } + // Every scheduled reconnect burns cumulative budget — clean EOF loops + // would otherwise dodge the per-burst counter (`reconnectAttempts`) + // by setting countAttempt=false and silently retrying forever. + watcher.cumulativeReconnectAttempts += 1; const countAttempt = options.countAttempt ?? true; if (countAttempt) { watcher.reconnectAttempts += 1; - } else { - watcher.reconnectAttempts = 0; } + + if ( + watcher.cumulativeReconnectAttempts > MAX_CUMULATIVE_RECONNECT_ATTEMPTS + ) { + this.failWatcher(key, { + title: "Cloud run unreachable", + message: + "Could not maintain a connection to the cloud run after many attempts. Click retry once the issue is resolved.", + retryable: true, + }); + return; + } + // The watcher fails once either budget is exhausted: transport reconnect // failures or backend stream-error frames. const attemptCount = Math.max( From 831d508ad8fadb459a8d605eee5f45af52d49a4d Mon Sep 17 00:00:00 2001 From: Peter Kirkham Date: Thu, 21 May 2026 13:59:27 +0100 Subject: [PATCH 2/4] refactor(code): reset cumulative reconnect budget on observable progress MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cumulative cap as introduced would also trip on healthy-but-quiet cloud runs whose SSE stream cycles cleanly without events (keepalive- driven close, or the agent simply not emitting between actions). Reset the cumulative counter whenever real progress is observed — either a non-keepalive SSE event in `handleSseEvent`, or a state change detected via the post-stream-completion polling fetch — so only stuck loops with no forward motion get capped. Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7 --- apps/code/src/main/services/cloud-task/service.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index cb7d71d51..d4d258758 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -764,8 +764,9 @@ export class CloudTaskService extends TypedEventEmitter { } // A real data event proves the stream materialized; clear the backend-error - // budget too. + // and cumulative budgets too. watcher.streamErrorAttempts = 0; + watcher.cumulativeReconnectAttempts = 0; if (isTaskRunStateEvent(event.data)) { if (this.applyTaskRunState(watcher, event.data)) { @@ -1131,6 +1132,10 @@ export class CloudTaskService extends TypedEventEmitter { if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) { if (stateChanged) { + // Server-side progress observed via polling — the run is alive even + // if the SSE stream keeps closing without events. Don't burn the + // cumulative budget while progress is happening. + watcher.cumulativeReconnectAttempts = 0; this.emit(CloudTaskEvent.Update, { taskId: watcher.taskId, runId: watcher.runId, From 94901a53fe0b43c8365ec8fd1b7808bbfaf50f6c Mon Sep 17 00:00:00 2001 From: Peter Kirkham Date: Thu, 21 May 2026 14:43:17 +0100 Subject: [PATCH 3/4] chore(code): trim verbose comments on cloud reconnect budget Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7 --- apps/code/src/main/services/cloud-task/service.test.ts | 3 --- apps/code/src/main/services/cloud-task/service.ts | 9 +++------ 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index 87f70c23b..ab9000a16 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -631,9 +631,6 @@ describe("CloudTaskService", () => { return Promise.resolve(makeInProgressRun()); }); - // Every stream closes cleanly with no events. Under the previous - // implementation reconnectAttempts was reset to 0 on each clean EOF - // and the watcher would loop forever. mockStreamFetch.mockImplementation(() => Promise.resolve(createSseResponse("")), ); diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index d4d258758..59ccd54b2 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -1021,9 +1021,8 @@ export class CloudTaskService extends TypedEventEmitter { clearTimeout(watcher.reconnectTimeoutId); } - // Every scheduled reconnect burns cumulative budget — clean EOF loops - // would otherwise dodge the per-burst counter (`reconnectAttempts`) - // by setting countAttempt=false and silently retrying forever. + // Cumulative counter bounds runaway loops that clean-EOF (countAttempt=false) + // and would otherwise dodge `reconnectAttempts`. watcher.cumulativeReconnectAttempts += 1; const countAttempt = options.countAttempt ?? true; if (countAttempt) { @@ -1132,9 +1131,7 @@ export class CloudTaskService extends TypedEventEmitter { if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) { if (stateChanged) { - // Server-side progress observed via polling — the run is alive even - // if the SSE stream keeps closing without events. Don't burn the - // cumulative budget while progress is happening. + // Polled progress proves the run is alive — don't burn cumulative budget. watcher.cumulativeReconnectAttempts = 0; this.emit(CloudTaskEvent.Update, { taskId: watcher.taskId, From 70d06a57b6a3c3f80844050c51f6941201d1f849 Mon Sep 17 00:00:00 2001 From: Peter Kirkham Date: Thu, 21 May 2026 15:36:31 +0100 Subject: [PATCH 4/4] fix(code): reset per-burst reconnect counter on observed progress Polled stateChanged means the run is alive; reset both reconnectAttempts and cumulativeReconnectAttempts so a prior error streak doesn't trip MAX_SSE_RECONNECT_ATTEMPTS on the next single error after progress. Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7 --- apps/code/src/main/services/cloud-task/service.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index 59ccd54b2..59716b068 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -1131,7 +1131,8 @@ export class CloudTaskService extends TypedEventEmitter { if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) { if (stateChanged) { - // Polled progress proves the run is alive — don't burn cumulative budget. + // Polled progress proves the run is alive — reset both budgets. + watcher.reconnectAttempts = 0; watcher.cumulativeReconnectAttempts = 0; this.emit(CloudTaskEvent.Update, { taskId: watcher.taskId,