diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts
index c8a62e56e..643f9f58f 100644
--- a/apps/code/src/main/services/cloud-task/service.test.ts
+++ b/apps/code/src/main/services/cloud-task/service.test.ts
@@ -672,6 +672,693 @@ describe("CloudTaskService", () => {
     });
   });
 
+  it("clears the backend-error budget after a healthy long-lived cut", async () => {
+    vi.useFakeTimers();
+
+    const updates: unknown[] = [];
+    service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));
+
+    const makeInProgressRun = () =>
+      createJsonResponse({
+        id: "run-1",
+        status: "in_progress",
+        stage: null,
+        output: null,
+        error_message: null,
+        branch: "main",
+        updated_at: "2026-01-01T00:00:00Z",
+      });
+
+    mockNetFetch
+      .mockResolvedValueOnce(makeInProgressRun()) // bootstrap: fetchTaskRun
+      .mockResolvedValueOnce(
+        createJsonResponse([], 200, { "X-Has-More": "false" }),
+      ) // bootstrap: fetchSessionLogs
+      .mockImplementation(() => Promise.resolve(makeInProgressRun()));
+
+    // First connection delivers an explicit backend error frame (accruing the
+    // backend-error budget). Subsequent connections are healthy long-lived cuts
+    // (>= SSE_HEALTHY_CONNECTION_MS): each proves the stream recovered and must
+    // clear the backend-error budget, so it never accumulates for the run's life.
+    let streamCall = 0;
+    mockStreamFetch.mockImplementation(() => {
+      streamCall += 1;
+      if (streamCall === 1) {
+        return Promise.resolve(
+          createSseResponse('event: error\ndata: {"error":"boom"}\n\n'),
+        );
+      }
+      const encoder = new TextEncoder();
+      const stream = new ReadableStream({
+        start(controller) {
+          controller.enqueue(
+            encoder.encode('event: keepalive\ndata: {"type":"keepalive"}\n\n'),
+          );
+          setTimeout(() => controller.error(new Error("terminated")), 65_000);
+        },
+      });
+      return Promise.resolve(
+        new Response(stream, {
+          status: 200,
+          headers: { "Content-Type": "text/event-stream" },
+        }),
+      );
+    });
+
+    service.watch({
+      taskId: "task-1",
+      runId: "run-1",
+      apiHost: "https://app.example.com",
+      teamId: 2,
+    });
+
+    const getWatcher = () =>
+      (
+        service as unknown as {
+          watchers: Map<
+            string,
+            {
+              reconnectAttempts: number;
+              streamErrorAttempts: number;
+              failed: boolean;
+            }
+          >;
+        }
+      ).watchers.get("task-1:run-1");
+
+    // The backend error must have accrued the backend-error budget first...
+    await waitFor(() => (getWatcher()?.streamErrorAttempts ?? 0) >= 1, 20_000);
+    // ...then the healthy long-lived cut on the next connection clears it.
+    await vi.advanceTimersByTimeAsync(67_000 * 2);
+    await waitFor(() => getWatcher()?.streamErrorAttempts === 0, 20_000);
+
+    const watcher = getWatcher();
+    expect(watcher?.failed).toBe(false);
+    expect(watcher?.streamErrorAttempts).toBe(0);
+    expect(watcher?.reconnectAttempts).toBe(0);
+    expect(
+      updates.some(
+        (u) =>
+          typeof u === "object" &&
+          u !== null &&
+          (u as { kind?: string }).kind === "error",
+      ),
+    ).toBe(false);
+  });
+
+  it("counts quick stream failures and surfaces a retryable error", async () => {
+    vi.useFakeTimers();
+
+    const updates: unknown[] = [];
+    service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));
+
+    const makeInProgressRun = () =>
+      createJsonResponse({
+        id: "run-1",
+        status: "in_progress",
+        stage: null,
+        output: null,
+        error_message: null,
+        branch: "main",
+        updated_at: "2026-01-01T00:00:00Z",
+      });
+
+    mockNetFetch
+      .mockResolvedValueOnce(makeInProgressRun())
+      .mockResolvedValueOnce(
+        createJsonResponse([], 200, { "X-Has-More": "false" }),
+      )
+      .mockImplementation(() => Promise.resolve(makeInProgressRun()));
+
+    // Connections that fail immediately (under SSE_HEALTHY_CONNECTION_MS) are
+    // genuine churn and must keep counting toward the retry budget.
+    mockStreamFetch.mockImplementation(() =>
+      Promise.resolve(
+        createSseResponse('event: error\ndata: {"error":"boom"}\n\n'),
+      ),
+    );
+
+    service.watch({
+      taskId: "task-1",
+      runId: "run-1",
+      apiHost: "https://app.example.com",
+      teamId: 2,
+    });
+
+    await waitFor(() => mockStreamFetch.mock.calls.length === 1);
+    await vi.advanceTimersByTimeAsync(70_000);
+    await waitFor(
+      () =>
+        updates.some(
+          (u) =>
+            typeof u === "object" &&
+            u !== null &&
+            (u as { kind?: string }).kind === "error",
+        ),
+      10_000,
+    );
+
+    expect(updates).toContainEqual({
+      taskId: "task-1",
+      runId: "run-1",
+      kind: "error",
+      errorTitle: "Cloud stream disconnected",
+      errorMessage:
+        "Lost connection to the cloud run stream. Retry to reconnect.",
+      retryable: true,
+    });
+  });
+
+  it("stops the watcher without reconnecting once the run is terminal", async () => {
+    vi.useFakeTimers();
+
+    const updates: unknown[] = [];
+    service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));
+
+    let statusFetchCount = 0;
+    mockNetFetch.mockImplementation((input: string | Request) => {
+      const url = typeof input === "string" ? input : input.url;
+      if (url.includes("/session_logs/")) {
+        return Promise.resolve(
+          createJsonResponse([], 200, { "X-Has-More": "false" }),
+        );
+      }
+      statusFetchCount += 1;
+      // Bootstrap sees an active run; the post-stream status check sees terminal.
+      return Promise.resolve(
+        createJsonResponse({
+          id: "run-1",
+          status: statusFetchCount === 1 ? "in_progress" : "completed",
+          stage: null,
+          output: null,
+          error_message: null,
+          branch: "main",
+          updated_at:
+            statusFetchCount === 1
+              ? "2026-01-01T00:00:00Z"
+              : "2026-01-01T00:00:01Z",
+        }),
+      );
+    });
+
+    mockStreamFetch.mockImplementation(() =>
+      Promise.resolve(createSseResponse("")),
+    );
+
+    service.watch({
+      taskId: "task-1",
+      runId: "run-1",
+      apiHost: "https://app.example.com",
+      teamId: 2,
+    });
+
+    await waitFor(() => mockStreamFetch.mock.calls.length === 1);
+    await vi.advanceTimersByTimeAsync(10_000);
+
+    expect(updates).toContainEqual(
+      expect.objectContaining({
+        taskId: "task-1",
+        runId: "run-1",
+        kind: "status",
+        status: "completed",
+      }),
+    );
+    expect(mockStreamFetch.mock.calls.length).toBe(1);
+    expect(
+      (service as unknown as { watchers: Map<string, unknown> }).watchers.has(
+        "task-1:run-1",
+      ),
+    ).toBe(false);
+  });
+
+  it("surfaces a retryable error when the backend errors even on a long-lived stream", async () => {
+    vi.useFakeTimers();
+
+    const updates: unknown[] = [];
+    service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));
+
+    const makeInProgressRun = () =>
+      createJsonResponse({
+        id: "run-1",
+        status: "in_progress",
+        stage: null,
+        output: null,
+        error_message: null,
+        branch: "main",
+        updated_at: "2026-01-01T00:00:00Z",
+      });
+
+    mockNetFetch
+      .mockResolvedValueOnce(makeInProgressRun())
+      .mockResolvedValueOnce(
+        createJsonResponse([], 200, { "X-Has-More": "false" }),
+      )
+      .mockImplementation(() => Promise.resolve(makeInProgressRun()));
+
+    // Each connection stays open with a keepalive for 65s (> the healthy
+    // threshold) and only THEN emits an explicit backend `event: error` frame.
+    // An explicit backend error must always count toward the budget, so even a
+    // long-lived stream eventually surfaces the retryable disconnect error.
+    mockStreamFetch.mockImplementation(() => {
+      const encoder = new TextEncoder();
+      const stream = new ReadableStream({
+        start(controller) {
+          controller.enqueue(
+            encoder.encode('event: keepalive\ndata: {"type":"keepalive"}\n\n'),
+          );
+          setTimeout(() => {
+            controller.enqueue(
+              encoder.encode('event: error\ndata: {"error":"boom"}\n\n'),
+            );
+            controller.close();
+          }, 65_000);
+        },
+      });
+      return Promise.resolve(
+        new Response(stream, {
+          status: 200,
+          headers: { "Content-Type": "text/event-stream" },
+        }),
+      );
+    });
+
+    service.watch({
+      taskId: "task-1",
+      runId: "run-1",
+      apiHost: "https://app.example.com",
+      teamId: 2,
+    });
+
+    await waitFor(() => mockStreamFetch.mock.calls.length === 1);
+    // Drive >= 6 long-lived-then-backend-error cycles (65s open + backoff each).
+    await vi.advanceTimersByTimeAsync(65_000 * 7 + 70_000);
+    await waitFor(
+      () =>
+        updates.some(
+          (u) =>
+            typeof u === "object" &&
+            u !== null &&
+            (u as { kind?: string }).kind === "error",
+        ),
+      10_000,
+    );
+
+    expect(updates).toContainEqual({
+      taskId: "task-1",
+      runId: "run-1",
+      kind: "error",
+      errorTitle: "Cloud stream disconnected",
+      errorMessage:
+        "Lost connection to the cloud run stream. Retry to reconnect.",
+      retryable: true,
+    });
+  });
+
+  it("treats a long-lived transport cut as healthy even with no frames received", async () => {
+    vi.useFakeTimers();
+
+    const updates: unknown[] = [];
+    service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));
+
+    const makeInProgressRun = () =>
+      createJsonResponse({
+        id: "run-1",
+        status: "in_progress",
+        stage: null,
+        output: null,
+        error_message: null,
+        branch: "main",
+        updated_at: "2026-01-01T00:00:00Z",
+      });
+
+    mockNetFetch
+      .mockResolvedValueOnce(makeInProgressRun())
+      .mockResolvedValueOnce(
+        createJsonResponse([], 200, { "X-Has-More": "false" }),
+      )
+      .mockImplementation(() => Promise.resolve(makeInProgressRun()));
+
+    // Each connection opens but delivers NOTHING, then is transport-cut at 65s.
+    // Healthiness is duration-only on purpose — it must NOT depend on keepalive
+    // frames surviving the proxy — so even a frame-less long-lived cut is healthy
+    // and never exhausts the budget.
+    mockStreamFetch.mockImplementation(() => {
+      const stream = new ReadableStream({
+        start(controller) {
+          setTimeout(() => controller.error(new Error("terminated")), 65_000);
+        },
+      });
+      return Promise.resolve(
+        new Response(stream, {
+          status: 200,
+          headers: { "Content-Type": "text/event-stream" },
+        }),
+      );
+    });
+
+    service.watch({
+      taskId: "task-1",
+      runId: "run-1",
+      apiHost: "https://app.example.com",
+      teamId: 2,
+    });
+
+    await waitFor(() => mockStreamFetch.mock.calls.length === 1);
+    await vi.advanceTimersByTimeAsync(67_000 * 8);
+    await waitFor(() => mockStreamFetch.mock.calls.length >= 6, 20_000);
+
+    expect(
+      updates.some(
+        (u) =>
+          typeof u === "object" &&
+          u !== null &&
+          (u as { kind?: string }).kind === "error",
+      ),
+    ).toBe(false);
+
+    const watcher = (
+      service as unknown as {
+        watchers: Map<string, { reconnectAttempts: number; failed: boolean }>;
+      }
+    ).watchers.get("task-1:run-1");
+    expect(watcher?.failed).toBe(false);
+    expect(watcher?.reconnectAttempts).toBe(0);
+  });
+
+  it("resets the transport reconnect budget once a keepalive proves recovery", async () => {
+    vi.useFakeTimers();
+
+    const updates: unknown[] = [];
+    service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));
+
+    const makeInProgressRun = () =>
+      createJsonResponse({
+        id: "run-1",
+        status: "in_progress",
+        stage: null,
+        output: null,
+        error_message: null,
+        branch: "main",
+        updated_at: "2026-01-01T00:00:00Z",
+      });
+
+    mockNetFetch
+      .mockResolvedValueOnce(makeInProgressRun())
+      .mockResolvedValueOnce(
+        createJsonResponse([], 200, { "X-Has-More": "false" }),
+      )
+      .mockImplementation(() => Promise.resolve(makeInProgressRun()));
+
+    // First 3 connections fail fast at the transport level (established, then
+    // errored immediately, no frame) and accrue reconnect attempts. The 4th
+    // delivers a keepalive and stays open — proving the transport recovered, so
+    // the accrued attempts must reset rather than carry forward into the budget.
+    let streamCall = 0;
+    const keepaliveControllerRef: {
+      current: ReadableStreamDefaultController | null;
+    } = { current: null };
+    const encoder = new TextEncoder();
+    mockStreamFetch.mockImplementation(() => {
+      streamCall += 1;
+      if (streamCall <= 3) {
+        const stream = new ReadableStream({
+          start(controller) {
+            controller.error(new Error("terminated"));
+          },
+        });
+        return Promise.resolve(
+          new Response(stream, {
+            status: 200,
+            headers: { "Content-Type": "text/event-stream" },
+          }),
+        );
+      }
+      // 4th connection stays open with no frame; the test injects the keepalive
+      // below so it can observe the accrued budget BEFORE the reset.
+      const stream = new ReadableStream({
+        start(controller) {
+          keepaliveControllerRef.current = controller;
+        },
+      });
+      return Promise.resolve(
+        new Response(stream, {
+          status: 200,
+          headers: { "Content-Type": "text/event-stream" },
+        }),
+      );
+    });
+
+    service.watch({
+      taskId: "task-1",
+      runId: "run-1",
+      apiHost: "https://app.example.com",
+      teamId: 2,
+    });
+
+    const getWatcher = () =>
+      (
+        service as unknown as {
+          watchers: Map<string, { reconnectAttempts: number; failed: boolean }>;
+        }
+      ).watchers.get("task-1:run-1");
+
+    await waitFor(() => mockStreamFetch.mock.calls.length === 1);
+    // Drive the 3 fast transport failures and open the held 4th connection.
+    await vi.advanceTimersByTimeAsync(30_000);
+    await waitFor(
+      () => streamCall >= 4 && !!keepaliveControllerRef.current,
+      20_000,
+    );
+
+    // Non-vacuous precondition: the fast failures actually accrued the budget.
+    expect(getWatcher()?.reconnectAttempts ?? 0).toBeGreaterThan(0);
+
+    // A keepalive on the recovered connection must reset the transport budget.
+    keepaliveControllerRef.current?.enqueue(
+      encoder.encode('event: keepalive\ndata: {"type":"keepalive"}\n\n'),
+    );
+    await waitFor(() => getWatcher()?.reconnectAttempts === 0, 20_000);
+
+    const watcher = getWatcher();
+    expect(watcher?.failed).toBe(false);
+    expect(watcher?.reconnectAttempts).toBe(0);
+    expect(
+      updates.some(
+        (u) =>
+          typeof u === "object" &&
+          u !== null &&
+          (u as { kind?: string }).kind === "error",
+      ),
+    ).toBe(false);
+  });
+
+  it("does not let a stale backend-error count inflate a transport reconnect delay", async () => {
+    vi.useFakeTimers();
+
+    const updates: unknown[] = [];
+    service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));
+
+    const makeInProgressRun = () =>
+      createJsonResponse({
+        id: "run-1",
+        status: "in_progress",
+        stage: null,
+        output: null,
+        error_message: null,
+        branch: "main",
+        updated_at: "2026-01-01T00:00:00Z",
+      });
+
+    mockNetFetch
+      .mockResolvedValueOnce(makeInProgressRun()) // bootstrap: fetchTaskRun
+      .mockResolvedValueOnce(
+        createJsonResponse([], 200, { "X-Has-More": "false" }),
+      ) // bootstrap: fetchSessionLogs
+      .mockImplementation(() => Promise.resolve(makeInProgressRun()));
+
+    // Connections 1-4 each emit a backend `event: error` frame, building the
+    // backend-error budget to 4 — those reconnects correctly pace on
+    // streamErrorAttempts. Connection 5 is held open until the test injects a
+    // quick TRANSPORT cut, which must pace its reconnect on the just-incremented
+    // transport budget (1 -> ~2s), NOT on the stale backend-error budget
+    // (4 -> ~16s). Math.max(both) for the delay would wrongly use the latter.
+    let streamCall = 0;
+    const transportControllerRef: {
+      current: ReadableStreamDefaultController | null;
+    } = { current: null };
+    mockStreamFetch.mockImplementation(() => {
+      streamCall += 1;
+      if (streamCall <= 4) {
+        return Promise.resolve(
+          createSseResponse('event: error\ndata: {"error":"boom"}\n\n'),
+        );
+      }
+      const stream = new ReadableStream({
+        start(controller) {
+          if (streamCall === 5) {
+            transportControllerRef.current = controller;
+          }
+        },
+      });
+      return Promise.resolve(
+        new Response(stream, {
+          status: 200,
+          headers: { "Content-Type": "text/event-stream" },
+        }),
+      );
+    });
+
+    service.watch({
+      taskId: "task-1",
+      runId: "run-1",
+      apiHost: "https://app.example.com",
+      teamId: 2,
+    });
+
+    const getWatcher = () =>
+      (
+        service as unknown as {
+          watchers: Map<
+            string,
+            {
+              reconnectAttempts: number;
+              streamErrorAttempts: number;
+              failed: boolean;
+            }
+          >;
+        }
+      ).watchers.get("task-1:run-1");
+
+    await waitFor(() => mockStreamFetch.mock.calls.length === 1);
+    // Drive the four backend-error reconnects (2s + 4s + 8s + 16s of backoff)
+    // and open the held fifth connection.
+    await vi.advanceTimersByTimeAsync(35_000);
+    await waitFor(
+      () => streamCall >= 5 && !!transportControllerRef.current,
+      20_000,
+    );
+
+    // Non-vacuous precondition: the backend-error budget is stale-high while the
+    // transport budget is still zero.
+    expect(getWatcher()?.streamErrorAttempts).toBe(4);
+    expect(getWatcher()?.reconnectAttempts).toBe(0);
+    expect(getWatcher()?.failed).toBe(false);
+
+    // A quick transport cut on the open fifth connection charges ONE transport
+    // attempt; its reconnect must wait ~2s (transport budget), not ~16s.
+    transportControllerRef.current?.error(new Error("terminated"));
+    await waitFor(() => getWatcher()?.reconnectAttempts === 1, 20_000);
+    expect(getWatcher()?.streamErrorAttempts).toBe(4);
+
+    const callsBeforeProbe = mockStreamFetch.mock.calls.length;
+    // 5s is past the fixed ~2s transport backoff but well short of the buggy
+    // ~16s backend-error backoff, so the sixth connection only opens if the
+    // delay was paced on the transport budget.
+    await vi.advanceTimersByTimeAsync(5_000);
+    expect(mockStreamFetch.mock.calls.length).toBe(callsBeforeProbe + 1);
+    expect(getWatcher()?.failed).toBe(false);
+  });
+
+  it("surfaces an error instead of retrying forever when run-state fetch keeps failing after a clean stream end", async () => {
+    vi.useFakeTimers();
+
+    const updates: unknown[] = [];
+    service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));
+
+    // Bootstrap succeeds (run + empty backlog); every subsequent run-state
+    // fetch returns 500 (a non-fatal status -> fetchTaskRun resolves null).
+    mockNetFetch
+      .mockResolvedValueOnce(
+        createJsonResponse({
+          id: "run-1",
+          status: "in_progress",
+          stage: null,
+          output: null,
+          error_message: null,
+          branch: "main",
+          updated_at: "2026-01-01T00:00:00Z",
+        }),
+      ) // bootstrap: fetchTaskRun
+      .mockResolvedValueOnce(
+        createJsonResponse([], 200, { "X-Has-More": "false" }),
+      ) // bootstrap: fetchSessionLogs
+      .mockImplementation(() =>
+        Promise.resolve(createJsonResponse({ detail: "boom" }, 500)),
+      );
+
+    // First connection is held open so bootstrap can finish; the test then
+    // closes it cleanly. Every later connection ends cleanly on its own, so the
+    // only thing that can fail is the post-stream run-state fetch (500).
+    let streamCall = 0;
+    const firstControllerRef: {
+      current: ReadableStreamDefaultController | null;
+    } = { current: null };
+    mockStreamFetch.mockImplementation(() => {
+      streamCall += 1;
+      const stream = new ReadableStream({
+        start(controller) {
+          if (streamCall === 1) {
+            firstControllerRef.current = controller;
+          } else {
+            controller.close();
+          }
+        },
+      });
+      return Promise.resolve(
+        new Response(stream, {
+          status: 200,
+          headers: { "Content-Type": "text/event-stream" },
+        }),
+      );
+    });
+
+    service.watch({
+      taskId: "task-1",
+      runId: "run-1",
+      apiHost: "https://app.example.com",
+      teamId: 2,
+    });
+
+    // Wait for bootstrap to emit its snapshot and hold the live connection open.
+    await waitFor(
+      () =>
+        !!firstControllerRef.current &&
+        updates.some(
+          (u) =>
+            typeof u === "object" &&
+            u !== null &&
+            (u as { kind?: string }).kind === "snapshot",
+        ),
+    );
+
+    // Close the live stream cleanly: each clean end now fetches run state, which
+    // 500s. The reconnect must charge the budget so it eventually gives up.
+    firstControllerRef.current?.close();
+
+    // Budget is 5 attempts (2s + 4s + 8s + 16s + 30s of backoff); the 6th fails.
+    await vi.advanceTimersByTimeAsync(120_000);
+    await waitFor(
+      () =>
+        updates.some(
+          (u) =>
+            typeof u === "object" &&
+            u !== null &&
+            (u as { kind?: string }).kind === "error",
+        ),
+      20_000,
+    );
+
+    expect(updates).toContainEqual({
+      taskId: "task-1",
+      runId: "run-1",
+      kind: "error",
+      errorTitle: "Cloud run state unavailable",
+      errorMessage:
+        "Could not fetch the latest cloud run state after the stream ended. Retry to reconnect.",
+      retryable: true,
+    });
+  });
+
   const guardedFetchStatusExpectations = [
     [
       401,
diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts
index 1fc19b730..4bafa6776 100644
--- a/apps/code/src/main/services/cloud-task/service.ts
+++ b/apps/code/src/main/services/cloud-task/service.ts
@@ -21,6 +21,7 @@ const log = logger.scope("cloud-task");
 const MAX_SSE_RECONNECT_ATTEMPTS = 5;
 const SSE_RECONNECT_BASE_DELAY_MS = 2_000;
 const SSE_RECONNECT_MAX_DELAY_MS = 30_000;
+const SSE_HEALTHY_CONNECTION_MS = 60_000;
 const EVENT_BATCH_FLUSH_MS = 16;
 const EVENT_BATCH_MAX_SIZE = 50;
 const SESSION_LOG_PAGE_LIMIT = 5_000;
@@ -48,6 +49,13 @@ class CloudTaskStreamError extends Error {
   }
 }
 
+class BackendStreamError extends Error {
+  constructor(message: string) {
+    super(message);
+    this.name = "BackendStreamError";
+  }
+}
+
 interface TaskRunResponse {
   id: string;
   status: TaskRunStatus;
@@ -82,6 +90,7 @@ interface WatcherState {
   pendingLogEntries: StoredLogEntry[];
   totalEntryCount: number;
   reconnectAttempts: number;
+  streamErrorAttempts: number;
   lastEventId: string | null;
   lastStatus: TaskRunStatus | null;
   lastStage: string | null;
@@ -273,6 +282,7 @@ export class CloudTaskService extends TypedEventEmitter {
     }
 
     watcher.reconnectAttempts = 0;
+    watcher.streamErrorAttempts = 0;
     watcher.failed = false;
     watcher.pendingLogEntries = [];
     watcher.bufferedLogBatches = [];
@@ -395,6 +405,7 @@ export class CloudTaskService extends TypedEventEmitter {
       pendingLogEntries: [],
       totalEntryCount: 0,
       reconnectAttempts: 0,
+      streamErrorAttempts: 0,
       lastEventId: null,
       lastStatus: null,
       lastStage: null,
@@ -606,6 +617,12 @@ export class CloudTaskService extends TypedEventEmitter {
     const parser = new SseEventParser();
     const decoder = new TextDecoder();
 
+    // Tracks whether the response body was opened and how long it stayed open,
+    // so a long-lived connection cut by transport churn isn't penalized as a
+    // failed reconnect attempt (see SSE_HEALTHY_CONNECTION_MS).
+    let connectedAt = 0;
+    let streamWasEstablished = false;
+
     try {
       const response = await this.authService.authenticatedFetch(
         fetch,
@@ -625,6 +642,9 @@ export class CloudTaskService extends TypedEventEmitter {
         throw new Error("Stream response did not include a body");
       }
 
+      connectedAt = Date.now();
+      streamWasEstablished = true;
+
       const reader = response.body.getReader();
 
       while (true) {
@@ -673,14 +693,32 @@ export class CloudTaskService extends TypedEventEmitter {
 
       const errorMessage =
         error instanceof Error ? error.message : "Unknown stream error";
+
+      const isBackendError = error instanceof BackendStreamError;
+      const wasHealthyStream =
+        !isBackendError &&
+        streamWasEstablished &&
+        Date.now() - connectedAt >= SSE_HEALTHY_CONNECTION_MS;
+
+      const watcher = this.watchers.get(key);
+      if (watcher) {
+        if (isBackendError) {
+          watcher.streamErrorAttempts += 1;
+        } else if (wasHealthyStream) {
+          watcher.streamErrorAttempts = 0;
+        }
+      }
+
       log.warn("Cloud task stream error", {
         key,
         error: errorMessage,
+        wasHealthyStream,
+        isBackendError,
       });
       await this.handleStreamCompletion(key, {
         reconnectIfNonTerminal: true,
         reconnectError: error,
-        countReconnectAttempt: true,
+        countReconnectAttempt: !isBackendError && !wasHealthyStream,
       });
     } finally {
       const currentWatcher = this.watchers.get(key);
@@ -702,9 +740,15 @@ export class CloudTaskService extends TypedEventEmitter {
       const message = isSseErrorEvent(event.data)
         ? event.data.error
         : "Unknown stream error";
-      throw new Error(message);
+      throw new BackendStreamError(message);
     }
 
+    // A keepalive or real event proves the transport recovered, so clear the
+    // transport reconnect budget. A keepalive stops here: it does NOT clear the
+    // backend-error budget, since it doesn't prove the stream itself produced
+    // data.
+    watcher.reconnectAttempts = 0;
+
     if (
       event.event === "keepalive" ||
       (typeof event.data === "object" &&
@@ -715,7 +759,9 @@ export class CloudTaskService extends TypedEventEmitter {
       return;
     }
 
-    watcher.reconnectAttempts = 0;
+    // A real data event proves the stream materialized; clear the backend-error
+    // budget too.
+    watcher.streamErrorAttempts = 0;
 
     if (isTaskRunStateEvent(event.data)) {
       if (this.applyTaskRunState(watcher, event.data)) {
@@ -976,7 +1022,13 @@ export class CloudTaskService extends TypedEventEmitter {
     } else {
       watcher.reconnectAttempts = 0;
     }
-    if (watcher.reconnectAttempts > MAX_SSE_RECONNECT_ATTEMPTS) {
+    // The watcher fails once either budget is exhausted: transport reconnect
+    // failures or backend stream-error frames.
+    const attemptCount = Math.max(
+      watcher.reconnectAttempts,
+      watcher.streamErrorAttempts,
+    );
+    if (attemptCount > MAX_SSE_RECONNECT_ATTEMPTS) {
       const details =
         error instanceof CloudTaskStreamError
           ? error.details
@@ -990,9 +1042,12 @@ export class CloudTaskService extends TypedEventEmitter {
       return;
     }
 
+    const backoffAttempts =
+      error instanceof BackendStreamError
+        ? watcher.streamErrorAttempts
+        : watcher.reconnectAttempts;
     const delay = Math.min(
-      SSE_RECONNECT_BASE_DELAY_MS *
-        2 ** Math.max(watcher.reconnectAttempts - 1, 0),
+      SSE_RECONNECT_BASE_DELAY_MS * 2 ** Math.max(backoffAttempts - 1, 0),
       SSE_RECONNECT_MAX_DELAY_MS,
     );
 
@@ -1048,6 +1103,7 @@ export class CloudTaskService extends TypedEventEmitter {
             "Could not fetch the latest cloud run state after the stream ended. Retry to reconnect.",
           retryable: true,
         }),
+        { countAttempt: options.countReconnectAttempt ?? true },
       );
       return;
     }