From f5dc38217f2cd32377ed0751f9abd11f9645ff0d Mon Sep 17 00:00:00 2001 From: KM Date: Mon, 13 Apr 2026 00:12:07 +0900 Subject: [PATCH] fix: improve SSE heartbeat and session retry logic in TUI - Implement 20s heartbeat timeout in TUI SDKProvider - Add 250ms delay between SSE reconnection attempts to avoid tight loops - Add retry limit (10 attempts) to SessionProcessor - Handle retry exhaustion by halting session with a resumeable AbortedError - Trigger re-sync in SyncProvider on server.connected or reconnectToken change --- .../opencode/src/cli/cmd/tui/context/sdk.tsx | 61 +++++++++++++++++-- .../opencode/src/cli/cmd/tui/context/sync.tsx | 12 ++++ packages/opencode/src/session/processor.ts | 36 +++++++---- packages/opencode/src/session/retry.ts | 1 + 4 files changed, 94 insertions(+), 16 deletions(-) diff --git a/packages/opencode/src/cli/cmd/tui/context/sdk.tsx b/packages/opencode/src/cli/cmd/tui/context/sdk.tsx index ad35aa45c28c..2dafb9176111 100644 --- a/packages/opencode/src/cli/cmd/tui/context/sdk.tsx +++ b/packages/opencode/src/cli/cmd/tui/context/sdk.tsx @@ -2,7 +2,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk/v2" import type { GlobalEvent, Event } from "@opencode-ai/sdk/v2" import { createSimpleContext } from "./helper" import { createGlobalEmitter } from "@solid-primitives/event-bus" -import { batch, onCleanup, onMount } from "solid-js" +import { batch, createSignal, onCleanup, onMount } from "solid-js" export type EventSource = { subscribe: (handler: (event: GlobalEvent) => void) => Promise<() => void> @@ -31,6 +31,9 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ } let sdk = createSDK() + const [connectionState, setConnectionState] = createSignal<"connected" | "reconnecting">("connected") + const [reconnectToken, setReconnectToken] = createSignal(0) + let disconnected = false const emitter = createGlobalEmitter<{ event: GlobalEvent @@ -68,6 +71,12 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ flush() } + const markDisconnected = () => { + if (abort.signal.aborted || sse?.signal.aborted) return + disconnected = true + setConnectionState("reconnecting") + } + function startSSE() { sse?.abort() const ctrl = new AbortController() @@ -75,15 +84,49 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ ;(async () => { while (true) { if (abort.signal.aborted || ctrl.signal.aborted) break - const events = await sdk.global.event({ signal: ctrl.signal }) - for await (const event of events.stream) { - if (ctrl.signal.aborted) break - handleEvent(event) + let heartbeatTimer: ReturnType | undefined + const resetHeartbeat = () => { + if (heartbeatTimer) clearTimeout(heartbeatTimer) + heartbeatTimer = setTimeout(() => { + markDisconnected() + }, 20000) + } + + try { + const events = await sdk.global.event({ + signal: ctrl.signal, + onSseError() { + markDisconnected() + }, + }) + + resetHeartbeat() + for await (const event of events.stream) { + resetHeartbeat() + if (ctrl.signal.aborted) break + if ((event.payload.type as string) === "server.heartbeat") continue + + if (connectionState() !== "connected") { + setConnectionState("connected") + if (disconnected) { + disconnected = false + setReconnectToken((value) => value + 1) + } + } + handleEvent(event) + } + } catch { + markDisconnected() + } finally { + if (heartbeatTimer) clearTimeout(heartbeatTimer) } if (timer) clearTimeout(timer) if (queue.length > 0) flush() + + // Small delay before reconnecting to avoid tight loops + await new Promise((resolve) => setTimeout(resolve, 250)) } })().catch(() => {}) } @@ -110,6 +153,14 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ directory: props.directory, event: emitter, fetch: props.fetch ?? fetch, + connection: { + get state() { + return connectionState() + }, + get reconnectToken() { + return reconnectToken() + }, + }, url: props.url, } }, diff --git a/packages/opencode/src/cli/cmd/tui/context/sync.tsx b/packages/opencode/src/cli/cmd/tui/context/sync.tsx index 498db99a1b78..d0489e584628 100644 --- a/packages/opencode/src/cli/cmd/tui/context/sync.tsx +++ b/packages/opencode/src/cli/cmd/tui/context/sync.tsx @@ -110,6 +110,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ event.subscribe((event) => { switch (event.type) { + case "server.connected": case "server.instance.disposed": bootstrap() break @@ -445,6 +446,17 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ }) } + createEffect( + on( + () => sdk.connection.reconnectToken, + (token) => { + if (token > 0) { + void bootstrap() + } + }, + ), + ) + const fullSyncedSessions = new Set() createEffect( on( diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index be0977c1ddd2..608cfe3b571e 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -1,4 +1,4 @@ -import { Cause, Deferred, Effect, Layer, Context } from "effect" +import { Cause, Deferred, Effect, Layer, Context, Schedule } from "effect" import * as Stream from "effect/Stream" import { Agent } from "@/agent/agent" import { Bus } from "@/bus" @@ -530,12 +530,12 @@ export namespace SessionProcessor { yield* status.set(ctx.sessionID, { type: "idle" }) }) - const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) { - yield* slog.info("process") - ctx.needsCompaction = false - ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true + const process = (streamInput: LLM.StreamInput): Effect.Effect => { + return Effect.gen(function* () { + yield* slog.info("process") + ctx.needsCompaction = false + ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true - return yield* Effect.gen(function* () { yield* Effect.gen(function* () { ctx.currentText = undefined ctx.reasoningMap = {} @@ -569,9 +569,21 @@ export namespace SessionProcessor { message: info.message, next: info.next, }), - }), + }).pipe(Schedule.recurs(SessionRetry.RETRY_MAX_ATTEMPTS)), ), - Effect.catch(halt), + Effect.catchAll((e) => { + const error = parse(e) + if (MessageV2.APIError.isInstance(error) && error.data.isRetryable) { + return halt( + new MessageV2.AbortedError( + { message: "Retry limit reached. Please check your network and continue." }, + { cause: e }, + ), + ) + } + return Effect.fail(e) + }), + Effect.catchAll((e) => halt(e)), Effect.ensuring(cleanup()), ) @@ -579,7 +591,7 @@ export namespace SessionProcessor { if (ctx.blocked || ctx.assistantMessage.error) return "stop" return "continue" }) - }) + } return { get message() { @@ -589,9 +601,11 @@ export namespace SessionProcessor { completeToolCall, process, } satisfies Handle - }) + }) - return Service.of({ create }) + return Service.of({ + create: (input: Input) => create(input) as Effect.Effect, + }) }), ) diff --git a/packages/opencode/src/session/retry.ts b/packages/opencode/src/session/retry.ts index 5ec9a585b021..f61070fa5737 100644 --- a/packages/opencode/src/session/retry.ts +++ b/packages/opencode/src/session/retry.ts @@ -14,6 +14,7 @@ export namespace SessionRetry { export const RETRY_BACKOFF_FACTOR = 2 export const RETRY_MAX_DELAY_NO_HEADERS = 30_000 // 30 seconds export const RETRY_MAX_DELAY = 2_147_483_647 // max 32-bit signed integer for setTimeout + export const RETRY_MAX_ATTEMPTS = 10 // maximum number of retries before giving up function cap(ms: number) { return Math.min(ms, RETRY_MAX_DELAY)