From 87ea3667e4e61a4c7043171f203b59ae2347ef96 Mon Sep 17 00:00:00 2001 From: Sisyphus-AI Date: Sat, 25 Apr 2026 21:19:58 +0200 Subject: [PATCH] feat(processor): add model fallback chain when retries are exhausted When the primary model fails after all retries, automatically try the next model from the fallback chain. This adds: - fallbackModels field to MessageV2.User schema - resolveFallbackChain helper to Provider service - outer fallback loop in SessionProcessor.process() - error classification: only retryable errors trigger fallback - No fallback for: AuthError, AbortedError, non-retryable APIError - Fallback for: rate limits, 5xx, other transient errors --- packages/opencode/src/provider/provider.ts | 20 +- packages/opencode/src/session/message-v2.ts | 8 + packages/opencode/src/session/processor.ts | 169 +++++++++---- packages/opencode/test/fake/provider.ts | 15 +- .../opencode/test/session/fallback.test.ts | 222 ++++++++++++++++++ 5 files changed, 387 insertions(+), 47 deletions(-) create mode 100644 packages/opencode/test/session/fallback.test.ts diff --git a/packages/opencode/src/provider/provider.ts b/packages/opencode/src/provider/provider.ts index d826f6b35050..7f22a7a23a96 100644 --- a/packages/opencode/src/provider/provider.ts +++ b/packages/opencode/src/provider/provider.ts @@ -19,7 +19,7 @@ import { iife } from "@/util/iife" import { Global } from "../global" import path from "path" import { pathToFileURL } from "url" -import { Effect, Layer, Context, Schema, Types } from "effect" +import { Effect, Exit, Layer, Context, Schema, Types } from "effect" import { EffectBridge } from "@/effect" import { InstanceState } from "@/effect" import { AppFileSystem } from "@opencode-ai/shared/filesystem" @@ -933,6 +933,9 @@ export interface Interface { ) => Effect.Effect<{ providerID: ProviderID; modelID: string } | undefined> readonly getSmallModel: (providerID: ProviderID) => Effect.Effect readonly defaultModel: () => Effect.Effect<{ providerID: ProviderID; 
modelID: ModelID }> + readonly resolveFallbackChain: ( + chain: Array<{ providerID: ProviderID; modelID: ModelID }>, + ) => Effect.Effect<{ model: Model; remaining: Array<{ providerID: ProviderID; modelID: ModelID }> } | undefined> } interface State { @@ -1680,7 +1683,20 @@ const layer: Layer.Layer< } }) - return Service.of({ list, getProvider, getModel, getLanguage, closest, getSmallModel, defaultModel }) + const resolveFallbackChain = Effect.fn("Provider.resolveFallbackChain")(function* ( + chain: Array<{ providerID: ProviderID; modelID: ModelID }>, + ) { + for (let i = 0; i < chain.length; i++) { + const { providerID, modelID } = chain[i] + const exit = yield* getModel(providerID, modelID).pipe(Effect.exit) + if (Exit.isSuccess(exit)) { + return { model: exit.value, remaining: chain.slice(i + 1) } + } + } + return undefined + }) + + return Service.of({ list, getProvider, getModel, getLanguage, closest, getSmallModel, defaultModel, resolveFallbackChain }) }), ) diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index d04645b7360c..2f734a7905c0 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -391,6 +391,14 @@ export const User = Schema.Struct({ }), system: Schema.optional(Schema.String), tools: Schema.optional(Schema.Record(Schema.String, Schema.Boolean)), + fallbackModels: Schema.optional( + Schema.Array( + Schema.Struct({ + providerID: ProviderID, + modelID: ModelID, + }), + ), + ), }) .annotate({ identifier: "UserMessage" }) .pipe(withStatics((s) => ({ zod: zod(s) }))) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 21f9329c6fce..5e39e23595d4 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -1,4 +1,4 @@ -import { Cause, Deferred, Effect, Layer, Context, Scope } from "effect" +import { Cause, Deferred, Effect, Exit, Layer, Context, 
Scope } from "effect" import * as Stream from "effect/Stream" import { Agent } from "@/agent/agent" import { Bus } from "@/bus" @@ -15,7 +15,8 @@ import type { SessionID } from "./schema" import { SessionRetry } from "./retry" import { SessionStatus } from "./status" import { SessionSummary } from "./summary" -import type { Provider } from "@/provider" +import { Provider } from "@/provider" +import { ModelID, ProviderID } from "@/provider/schema" import { Question } from "@/question" import { errorMessage } from "@/util/error" import { Log } from "@/util" @@ -71,6 +72,8 @@ interface ProcessorContext extends Input { needsCompaction: boolean currentText: MessageV2.TextPart | undefined reasoningMap: Record + shouldFallback: boolean + fallbackChain: Array<{ providerID: ProviderID; modelID: ModelID }> } type StreamEvent = Event @@ -90,6 +93,7 @@ export const layer: Layer.Layer< | Plugin.Service | SessionSummary.Service | SessionStatus.Service + | Provider.Service > = Layer.effect( Service, Effect.gen(function* () { @@ -104,6 +108,7 @@ export const layer: Layer.Layer< const summary = yield* SessionSummary.Service const scope = yield* Scope.Scope const status = yield* SessionStatus.Service + const provider = yield* Provider.Service const create = Effect.fn("SessionProcessor.create")(function* (input: Input) { // Pre-capture snapshot before the LLM stream starts. 
The AI SDK @@ -121,6 +126,8 @@ export const layer: Layer.Layer< needsCompaction: false, currentText: undefined, reasoningMap: {}, + shouldFallback: false, + fallbackChain: [], } let aborted = false const slog = log.clone().tag("session.id", input.sessionID).tag("messageID", input.assistantMessage.id) @@ -520,70 +527,146 @@ export const layer: Layer.Layer< yield* session.updateMessage(ctx.assistantMessage) }) - const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) { + const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown, streamInput: LLM.StreamInput) { slog.error("process", { error: errorMessage(e), stack: e instanceof Error ? e.stack : undefined }) const error = parse(e) + + // Context overflow triggers compaction, not fallback if (MessageV2.ContextOverflowError.isInstance(error)) { ctx.needsCompaction = true yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error }) return } + + // Auth errors (bad credentials) should NOT fallback - they'll fail again + if (MessageV2.AuthError.isInstance(error)) { + ctx.assistantMessage.error = error + yield* bus.publish(Session.Event.Error, { + sessionID: ctx.assistantMessage.sessionID, + error: ctx.assistantMessage.error, + }) + yield* status.set(ctx.sessionID, { type: "idle" }) + return + } + + // AbortedError (user cancelled) should NOT fallback + if (MessageV2.AbortedError.isInstance(error)) { + ctx.assistantMessage.error = error + yield* bus.publish(Session.Event.Error, { + sessionID: ctx.assistantMessage.sessionID, + error: ctx.assistantMessage.error, + }) + yield* status.set(ctx.sessionID, { type: "idle" }) + return + } + + // APIError with isRetryable=false should NOT fallback (permanent failures) + if (MessageV2.APIError.isInstance(error) && !error.data.isRetryable) { + ctx.assistantMessage.error = error + yield* bus.publish(Session.Event.Error, { + sessionID: ctx.assistantMessage.sessionID, + error: ctx.assistantMessage.error, + }) + yield* status.set(ctx.sessionID, { 
type: "idle" }) + return + } + ctx.assistantMessage.error = error yield* bus.publish(Session.Event.Error, { sessionID: ctx.assistantMessage.sessionID, error: ctx.assistantMessage.error, }) yield* status.set(ctx.sessionID, { type: "idle" }) + + // Signal that we should try a fallback model if one is available + const fallbackChain = streamInput.user.fallbackModels + if (fallbackChain && fallbackChain.length > 0) { + ctx.shouldFallback = true + ctx.fallbackChain = [...fallbackChain] + } }) const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) { slog.info("process") ctx.needsCompaction = false + ctx.shouldFallback = false + ctx.fallbackChain = [] ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true - return yield* Effect.gen(function* () { - yield* Effect.gen(function* () { - ctx.currentText = undefined - ctx.reasoningMap = {} - const stream = llm.stream(streamInput) + // Fallback loop - iterate through models when retries are exhausted + let currentStreamInput = streamInput - yield* stream.pipe( - Stream.tap((event) => handleEvent(event)), - Stream.takeUntil(() => ctx.needsCompaction), - Stream.runDrain, + return yield* Effect.gen(function* () { + while (true) { + yield* Effect.gen(function* () { + ctx.currentText = undefined + ctx.reasoningMap = {} + const stream = llm.stream(currentStreamInput) + + yield* stream.pipe( + Stream.tap((event) => handleEvent(event)), + Stream.takeUntil(() => ctx.needsCompaction), + Stream.runDrain, + ) + }).pipe( + Effect.onInterrupt(() => + Effect.gen(function* () { + aborted = true + if (!ctx.assistantMessage.error) { + yield* halt(new DOMException("Aborted", "AbortError"), currentStreamInput) + } + }), + ), + Effect.catchCauseIf( + (cause) => !Cause.hasInterruptsOnly(cause), + (cause) => Effect.fail(Cause.squash(cause)), + ), + Effect.retry( + SessionRetry.policy({ + parse, + set: (info) => + status.set(ctx.sessionID, { + type: "retry", + attempt: info.attempt, 
+ message: info.message, + next: info.next, + }), + }), + ), + Effect.catch((e: unknown) => halt(e, currentStreamInput)), + Effect.ensuring(cleanup()), ) - }).pipe( - Effect.onInterrupt(() => - Effect.gen(function* () { - aborted = true - if (!ctx.assistantMessage.error) { - yield* halt(new DOMException("Aborted", "AbortError")) + + // Check if we need to try a fallback model + if (ctx.shouldFallback && ctx.fallbackChain.length > 0) { + const resolved = yield* provider.resolveFallbackChain(ctx.fallbackChain) + + if (resolved) { + // Successfully resolved a fallback model, update stream input and continue + currentStreamInput = { + ...currentStreamInput, + model: resolved.model, + user: { + ...currentStreamInput.user, + fallbackModels: resolved.remaining, + }, } - }), - ), - Effect.catchCauseIf( - (cause) => !Cause.hasInterruptsOnly(cause), - (cause) => Effect.fail(Cause.squash(cause)), - ), - Effect.retry( - SessionRetry.policy({ - parse, - set: (info) => - status.set(ctx.sessionID, { - type: "retry", - attempt: info.attempt, - message: info.message, - next: info.next, - }), - }), - ), - Effect.catch(halt), - Effect.ensuring(cleanup()), - ) - - if (ctx.needsCompaction) return "compact" - if (ctx.blocked || ctx.assistantMessage.error) return "stop" - return "continue" + // Reset fallback state for next attempt + ctx.shouldFallback = false + ctx.fallbackChain = [] + ctx.assistantMessage.error = undefined + continue + } + + // No more fallbacks available or all failed to resolve + return "stop" as const + } + + // Normal flow - check result + if (ctx.needsCompaction) return "compact" as const + if (ctx.blocked || ctx.assistantMessage.error) return "stop" as const + return "continue" as const + } }) }) diff --git a/packages/opencode/test/fake/provider.ts b/packages/opencode/test/fake/provider.ts index bfb185a4b1bf..a9eb2c0de58a 100644 --- a/packages/opencode/test/fake/provider.ts +++ b/packages/opencode/test/fake/provider.ts @@ -70,8 +70,19 @@ export namespace 
ProviderTest {
         getSmallModel: Effect.fn("TestProvider.getSmallModel")((providerID) =>
           Effect.succeed(providerID === row.id ? mdl : undefined),
         ),
         defaultModel: Effect.fn("TestProvider.defaultModel")(() =>
           Effect.succeed({ providerID: row.id, modelID: mdl.id }),
         ),
+        resolveFallbackChain: Effect.fn("TestProvider.resolveFallbackChain")((chain) =>
+          Effect.gen(function* () {
+            for (let i = 0; i < chain.length; i++) {
+              const { providerID, modelID } = chain[i]
+              if (providerID === row.id && modelID === mdl.id) {
+                return { model: mdl, remaining: chain.slice(i + 1) }
+              }
+            }
+            return undefined
+          }),
+        ),
         ...override,
       }),
diff --git a/packages/opencode/test/session/fallback.test.ts b/packages/opencode/test/session/fallback.test.ts
new file mode 100644
index 000000000000..7e0ab044a693
--- /dev/null
+++ b/packages/opencode/test/session/fallback.test.ts
@@ -0,0 +1,222 @@
+import { expect } from "bun:test"
+import { Effect, Layer } from "effect"
+import type { Agent } from "../../src/agent/agent"
+import { Agent as AgentSvc } from "../../src/agent/agent"
+import { Bus } from "../../src/bus"
+import { Config } from "../../src/config"
+import { Permission } from "../../src/permission"
+import { Plugin } from "../../src/plugin"
+import { Provider } from "../../src/provider"
+import { ModelID, ProviderID } from "../../src/provider/schema"
+import { Session } from "../../src/session"
+import { LLM } from "../../src/session/llm"
+import { MessageV2 } from "../../src/session/message-v2"
+import { SessionProcessor } from "../../src/session/processor"
+import { MessageID, PartID, SessionID } from "../../src/session/schema"
+import { SessionStatus } from "../../src/session/status"
+import { SessionSummary } from "../../src/session/summary"
+import { Snapshot } from "../../src/snapshot"
+import { Log } from 
"../../src/util" +import { NodeFileSystem } from "@effect/platform-node" +import path from "path" +import { provideTmpdirServer } from "../fixture/fixture" +import { testEffect } from "../lib/effect" +import { TestLLMServer } from "../lib/llm-server" +import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" + +void Log.init({ print: false }) + +const summary = Layer.succeed( + SessionSummary.Service, + SessionSummary.Service.of({ + summarize: () => Effect.void, + diff: () => Effect.succeed([]), + computeDiff: () => Effect.succeed([]), + }), +) + +const ref = { + providerID: ProviderID.make("test"), + modelID: ModelID.make("test-model"), +} + +const fallbackRef = { + providerID: ProviderID.make("test"), + modelID: ModelID.make("fallback-model"), +} + +function agent(): Agent.Info { + return { + name: "build", + mode: "primary", + options: {}, + permission: [{ permission: "*", pattern: "*", action: "allow" }], + } +} + +const user = Effect.fn("TestSession.user")(function* (sessionID: SessionID, text: string) { + const session = yield* Session.Service + const msg = yield* session.updateMessage({ + id: MessageID.ascending(), + role: "user", + sessionID, + agent: "build", + model: ref, + time: { created: Date.now() }, + }) + yield* session.updatePart({ + id: PartID.ascending(), + messageID: msg.id, + sessionID, + type: "text", + text, + }) + return msg +}) + +const assistant = Effect.fn("TestSession.assistant")(function* ( + sessionID: SessionID, + parentID: MessageID, + root: string, +) { + const session = yield* Session.Service + const msg: MessageV2.Assistant = { + id: MessageID.ascending(), + role: "assistant", + sessionID, + mode: "build", + agent: "build", + path: { cwd: root, root }, + cost: 0, + tokens: { + total: 0, + input: 0, + output: 0, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + modelID: ref.modelID, + providerID: ref.providerID, + parentID, + time: { created: Date.now() }, + finish: "end_turn", + } + yield* 
session.updateMessage(msg) + return msg +}) + +const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer)) +const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer) +const deps = Layer.mergeAll( + Session.defaultLayer, + Snapshot.defaultLayer, + AgentSvc.defaultLayer, + Permission.defaultLayer, + Plugin.defaultLayer, + Config.defaultLayer, + LLM.defaultLayer, + Provider.defaultLayer, + status, +).pipe(Layer.provideMerge(infra)) +const env = Layer.mergeAll( + TestLLMServer.layer, + SessionProcessor.layer.pipe(Layer.provide(summary), Layer.provideMerge(deps)), +) + +const it = testEffect(env) + +const boot = Effect.fn("test.boot")(function* () { + const processors = yield* SessionProcessor.Service + const session = yield* Session.Service + const provider = yield* Provider.Service + return { processors, session, provider } +}) + +// Test: Auth error should NOT trigger fallback +it.live("session.processor fallback: auth error does not fallback", () => + provideTmpdirServer( + ({ dir, llm }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + + yield* llm.error(401, { error: { message: "Unauthorized", type: "authentication_error" } }) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "hi") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const input = { + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + fallbackModels: [{ providerID: fallbackRef.providerID, modelID: fallbackRef.modelID }], + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "hi" }], + tools: {}, 
+ } satisfies LLM.StreamInput + + const value = yield* handle.process(input) + expect(value).toBe("stop") + expect(handle.message.error?.name).toBe("ProviderAuthError") + }), + { git: true, config: () => ({}) }, + ), +) + +it.live("session.processor fallback: non-retryable error does not fallback", () => + provideTmpdirServer( + ({ dir, llm }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + + yield* llm.error(400, { error: { message: "bad request", code: "invalid_request" } }) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "non-retryable") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const input = { + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + fallbackModels: [{ providerID: fallbackRef.providerID, modelID: fallbackRef.modelID }], + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "non-retryable" }], + tools: {}, + } satisfies LLM.StreamInput + + const value = yield* handle.process(input) + expect(value).toBe("stop") + expect(handle.message.error).toBeDefined() + }), + { git: true, config: () => ({}) }, + ), +)