Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 56 additions & 5 deletions packages/opencode/src/cli/cmd/tui/context/sdk.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk/v2"
import type { GlobalEvent, Event } from "@opencode-ai/sdk/v2"
import { createSimpleContext } from "./helper"
import { createGlobalEmitter } from "@solid-primitives/event-bus"
import { batch, onCleanup, onMount } from "solid-js"
import { batch, createSignal, onCleanup, onMount } from "solid-js"

export type EventSource = {
subscribe: (handler: (event: GlobalEvent) => void) => Promise<() => void>
Expand Down Expand Up @@ -31,6 +31,9 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
}

let sdk = createSDK()
const [connectionState, setConnectionState] = createSignal<"connected" | "reconnecting">("connected")
const [reconnectToken, setReconnectToken] = createSignal(0)
let disconnected = false

const emitter = createGlobalEmitter<{
event: GlobalEvent
Expand Down Expand Up @@ -68,22 +71,62 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
flush()
}

const markDisconnected = () => {
if (abort.signal.aborted || sse?.signal.aborted) return
disconnected = true
setConnectionState("reconnecting")
}

function startSSE() {
sse?.abort()
const ctrl = new AbortController()
sse = ctrl
;(async () => {
while (true) {
if (abort.signal.aborted || ctrl.signal.aborted) break
const events = await sdk.global.event({ signal: ctrl.signal })

for await (const event of events.stream) {
if (ctrl.signal.aborted) break
handleEvent(event)
let heartbeatTimer: ReturnType<typeof setTimeout> | undefined
const resetHeartbeat = () => {
if (heartbeatTimer) clearTimeout(heartbeatTimer)
heartbeatTimer = setTimeout(() => {
markDisconnected()
}, 20000)
}

try {
const events = await sdk.global.event({
signal: ctrl.signal,
onSseError() {
markDisconnected()
},
})

resetHeartbeat()
for await (const event of events.stream) {
resetHeartbeat()
if (ctrl.signal.aborted) break
if ((event.payload.type as string) === "server.heartbeat") continue

if (connectionState() !== "connected") {
setConnectionState("connected")
if (disconnected) {
disconnected = false
setReconnectToken((value) => value + 1)
}
}
handleEvent(event)
}
} catch {
markDisconnected()
} finally {
if (heartbeatTimer) clearTimeout(heartbeatTimer)
}

if (timer) clearTimeout(timer)
if (queue.length > 0) flush()

// Small delay before reconnecting to avoid tight loops
await new Promise((resolve) => setTimeout(resolve, 250))
}
})().catch(() => {})
}
Expand All @@ -110,6 +153,14 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
directory: props.directory,
event: emitter,
fetch: props.fetch ?? fetch,
connection: {
get state() {
return connectionState()
},
get reconnectToken() {
return reconnectToken()
},
},
url: props.url,
}
},
Expand Down
12 changes: 12 additions & 0 deletions packages/opencode/src/cli/cmd/tui/context/sync.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({

event.subscribe((event) => {
switch (event.type) {
case "server.connected":
case "server.instance.disposed":
bootstrap()
break
Expand Down Expand Up @@ -445,6 +446,17 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
})
}

createEffect(
on(
() => sdk.connection.reconnectToken,
(token) => {
if (token > 0) {
void bootstrap()
}
},
),
)

const fullSyncedSessions = new Set<string>()
createEffect(
on(
Expand Down
36 changes: 25 additions & 11 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Cause, Deferred, Effect, Layer, Context } from "effect"
import { Cause, Deferred, Effect, Layer, Context, Schedule } from "effect"
import * as Stream from "effect/Stream"
import { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
Expand Down Expand Up @@ -530,12 +530,12 @@ export namespace SessionProcessor {
yield* status.set(ctx.sessionID, { type: "idle" })
})

const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
yield* slog.info("process")
ctx.needsCompaction = false
ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
const process = (streamInput: LLM.StreamInput): Effect.Effect<Result, never, never> => {
return Effect.gen(function* () {
yield* slog.info("process")
ctx.needsCompaction = false
ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true

return yield* Effect.gen(function* () {
yield* Effect.gen(function* () {
ctx.currentText = undefined
ctx.reasoningMap = {}
Expand Down Expand Up @@ -569,17 +569,29 @@ export namespace SessionProcessor {
message: info.message,
next: info.next,
}),
}),
}).pipe(Schedule.recurs(SessionRetry.RETRY_MAX_ATTEMPTS)),
),
Effect.catch(halt),
Effect.catchAll((e) => {
const error = parse(e)
if (MessageV2.APIError.isInstance(error) && error.data.isRetryable) {
return halt(
new MessageV2.AbortedError(
{ message: "Retry limit reached. Please check your network and continue." },
{ cause: e },
),
)
}
return Effect.fail(e)
}),
Effect.catchAll((e) => halt(e)),
Effect.ensuring(cleanup()),
)

if (ctx.needsCompaction) return "compact"
if (ctx.blocked || ctx.assistantMessage.error) return "stop"
return "continue"
})
})
}

return {
get message() {
Expand All @@ -589,9 +601,11 @@ export namespace SessionProcessor {
completeToolCall,
process,
} satisfies Handle
})
})

return Service.of({ create })
return Service.of({
create: (input: Input) => create(input) as Effect.Effect<Handle, never, never>,
})
}),
)

Expand Down
1 change: 1 addition & 0 deletions packages/opencode/src/session/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export namespace SessionRetry {
export const RETRY_BACKOFF_FACTOR = 2
export const RETRY_MAX_DELAY_NO_HEADERS = 30_000 // 30 seconds
export const RETRY_MAX_DELAY = 2_147_483_647 // max 32-bit signed integer for setTimeout
export const RETRY_MAX_ATTEMPTS = 10 // maximum number of retries before giving up

function cap(ms: number) {
return Math.min(ms, RETRY_MAX_DELAY)
Expand Down
Loading