Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions packages/opencode/src/provider/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { iife } from "@/util/iife"
import { Global } from "../global"
import path from "path"
import { pathToFileURL } from "url"
import { Effect, Layer, Context, Schema, Types } from "effect"
import { Effect, Exit, Layer, Context, Schema, Types } from "effect"
import { EffectBridge } from "@/effect"
import { InstanceState } from "@/effect"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
Expand Down Expand Up @@ -933,6 +933,9 @@ export interface Interface {
) => Effect.Effect<{ providerID: ProviderID; modelID: string } | undefined>
readonly getSmallModel: (providerID: ProviderID) => Effect.Effect<Model | undefined>
readonly defaultModel: () => Effect.Effect<{ providerID: ProviderID; modelID: ModelID }>
readonly resolveFallbackChain: (
chain: Array<{ providerID: ProviderID; modelID: ModelID }>,
) => Effect.Effect<{ model: Model; remaining: Array<{ providerID: ProviderID; modelID: ModelID }> } | undefined>
}

interface State {
Expand Down Expand Up @@ -1680,7 +1683,20 @@ const layer: Layer.Layer<
}
})

return Service.of({ list, getProvider, getModel, getLanguage, closest, getSmallModel, defaultModel })
const resolveFallbackChain = Effect.fn("Provider.resolveFallbackChain")(function* (
chain: Array<{ providerID: ProviderID; modelID: ModelID }>,
) {
for (let i = 0; i < chain.length; i++) {
const { providerID, modelID } = chain[i]
const exit = yield* getModel(providerID, modelID).pipe(Effect.exit)
if (Exit.isSuccess(exit)) {
return { model: exit.value, remaining: chain.slice(i + 1) }
}
}
return undefined
})

return Service.of({ list, getProvider, getModel, getLanguage, closest, getSmallModel, defaultModel, resolveFallbackChain })
}),
)

Expand Down
8 changes: 8 additions & 0 deletions packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,14 @@ export const User = Schema.Struct({
}),
system: Schema.optional(Schema.String),
tools: Schema.optional(Schema.Record(Schema.String, Schema.Boolean)),
fallbackModels: Schema.optional(
Schema.Array(
Schema.Struct({
providerID: ProviderID,
modelID: ModelID,
}),
),
),
})
.annotate({ identifier: "UserMessage" })
.pipe(withStatics((s) => ({ zod: zod(s) })))
Expand Down
169 changes: 126 additions & 43 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Cause, Deferred, Effect, Layer, Context, Scope } from "effect"
import { Cause, Deferred, Effect, Exit, Layer, Context, Scope } from "effect"
import * as Stream from "effect/Stream"
import { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
Expand All @@ -15,7 +15,8 @@ import type { SessionID } from "./schema"
import { SessionRetry } from "./retry"
import { SessionStatus } from "./status"
import { SessionSummary } from "./summary"
import type { Provider } from "@/provider"
import { Provider } from "@/provider"
import { ModelID, ProviderID } from "@/provider/schema"
import { Question } from "@/question"
import { errorMessage } from "@/util/error"
import { Log } from "@/util"
Expand Down Expand Up @@ -71,6 +72,8 @@ interface ProcessorContext extends Input {
needsCompaction: boolean
currentText: MessageV2.TextPart | undefined
reasoningMap: Record<string, MessageV2.ReasoningPart>
shouldFallback: boolean
fallbackChain: Array<{ providerID: ProviderID; modelID: ModelID }>
}

type StreamEvent = Event
Expand All @@ -90,6 +93,7 @@ export const layer: Layer.Layer<
| Plugin.Service
| SessionSummary.Service
| SessionStatus.Service
| Provider.Service
> = Layer.effect(
Service,
Effect.gen(function* () {
Expand All @@ -104,6 +108,7 @@ export const layer: Layer.Layer<
const summary = yield* SessionSummary.Service
const scope = yield* Scope.Scope
const status = yield* SessionStatus.Service
const provider = yield* Provider.Service

const create = Effect.fn("SessionProcessor.create")(function* (input: Input) {
// Pre-capture snapshot before the LLM stream starts. The AI SDK
Expand All @@ -121,6 +126,8 @@ export const layer: Layer.Layer<
needsCompaction: false,
currentText: undefined,
reasoningMap: {},
shouldFallback: false,
fallbackChain: [],
}
let aborted = false
const slog = log.clone().tag("session.id", input.sessionID).tag("messageID", input.assistantMessage.id)
Expand Down Expand Up @@ -520,70 +527,146 @@ export const layer: Layer.Layer<
yield* session.updateMessage(ctx.assistantMessage)
})

const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) {
const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown, streamInput: LLM.StreamInput) {
  slog.error("process", { error: errorMessage(e), stack: e instanceof Error ? e.stack : undefined })
  const error = parse(e)

  // Shared terminal handling: record the error on the assistant message,
  // broadcast it on the bus, and park the session as idle. Previously this
  // exact sequence was duplicated in four branches below.
  const settle = Effect.gen(function* () {
    ctx.assistantMessage.error = error
    yield* bus.publish(Session.Event.Error, {
      sessionID: ctx.assistantMessage.sessionID,
      error: ctx.assistantMessage.error,
    })
    yield* status.set(ctx.sessionID, { type: "idle" })
  })

  // Context overflow triggers compaction, not fallback
  if (MessageV2.ContextOverflowError.isInstance(error)) {
    ctx.needsCompaction = true
    yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error })
    return
  }

  // Errors that must NOT trigger a model fallback:
  // - AuthError: bad credentials will fail again on any model
  // - AbortedError: the user cancelled on purpose
  // - APIError with isRetryable=false: permanent failure
  if (
    MessageV2.AuthError.isInstance(error) ||
    MessageV2.AbortedError.isInstance(error) ||
    (MessageV2.APIError.isInstance(error) && !error.data.isRetryable)
  ) {
    yield* settle
    return
  }

  // Any other error: settle as usual, then signal that a fallback model
  // should be attempted if the request carried a fallback chain.
  yield* settle

  const fallbackChain = streamInput.user.fallbackModels
  if (fallbackChain && fallbackChain.length > 0) {
    ctx.shouldFallback = true
    ctx.fallbackChain = [...fallbackChain]
  }
})

const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
slog.info("process")
ctx.needsCompaction = false
ctx.shouldFallback = false
ctx.fallbackChain = []
ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true

return yield* Effect.gen(function* () {
yield* Effect.gen(function* () {
ctx.currentText = undefined
ctx.reasoningMap = {}
const stream = llm.stream(streamInput)
// Fallback loop - iterate through models when retries are exhausted
let currentStreamInput = streamInput

yield* stream.pipe(
Stream.tap((event) => handleEvent(event)),
Stream.takeUntil(() => ctx.needsCompaction),
Stream.runDrain,
return yield* Effect.gen(function* () {
while (true) {
yield* Effect.gen(function* () {
ctx.currentText = undefined
ctx.reasoningMap = {}
const stream = llm.stream(currentStreamInput)

yield* stream.pipe(
Stream.tap((event) => handleEvent(event)),
Stream.takeUntil(() => ctx.needsCompaction),
Stream.runDrain,
)
}).pipe(
Effect.onInterrupt(() =>
Effect.gen(function* () {
aborted = true
if (!ctx.assistantMessage.error) {
yield* halt(new DOMException("Aborted", "AbortError"), currentStreamInput)
}
}),
),
Effect.catchCauseIf(
(cause) => !Cause.hasInterruptsOnly(cause),
(cause) => Effect.fail(Cause.squash(cause)),
),
Effect.retry(
SessionRetry.policy({
parse,
set: (info) =>
status.set(ctx.sessionID, {
type: "retry",
attempt: info.attempt,
message: info.message,
next: info.next,
}),
}),
),
Effect.catch((e: unknown) => halt(e, currentStreamInput)),
Effect.ensuring(cleanup()),
)
}).pipe(
Effect.onInterrupt(() =>
Effect.gen(function* () {
aborted = true
if (!ctx.assistantMessage.error) {
yield* halt(new DOMException("Aborted", "AbortError"))

// Check if we need to try a fallback model
if (ctx.shouldFallback && ctx.fallbackChain.length > 0) {
const resolved = yield* provider.resolveFallbackChain(ctx.fallbackChain)

if (resolved) {
// Successfully resolved a fallback model, update stream input and continue
currentStreamInput = {
...currentStreamInput,
model: resolved.model,
user: {
...currentStreamInput.user,
fallbackModels: resolved.remaining,
},
}
}),
),
Effect.catchCauseIf(
(cause) => !Cause.hasInterruptsOnly(cause),
(cause) => Effect.fail(Cause.squash(cause)),
),
Effect.retry(
SessionRetry.policy({
parse,
set: (info) =>
status.set(ctx.sessionID, {
type: "retry",
attempt: info.attempt,
message: info.message,
next: info.next,
}),
}),
),
Effect.catch(halt),
Effect.ensuring(cleanup()),
)

if (ctx.needsCompaction) return "compact"
if (ctx.blocked || ctx.assistantMessage.error) return "stop"
return "continue"
// Reset fallback state for next attempt
ctx.shouldFallback = false
ctx.fallbackChain = []
ctx.assistantMessage.error = undefined
continue
}

// No more fallbacks available or all failed to resolve
return "stop" as const
}

// Normal flow - check result
if (ctx.needsCompaction) return "compact" as const
if (ctx.blocked || ctx.assistantMessage.error) return "stop" as const
return "continue" as const
}
})
})

Expand Down
15 changes: 13 additions & 2 deletions packages/opencode/test/fake/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,19 @@ export namespace ProviderTest {
getSmallModel: Effect.fn("TestProvider.getSmallModel")((providerID) =>
Effect.succeed(providerID === row.id ? mdl : undefined),
),
defaultModel: Effect.fn("TestProvider.defaultModel")(() =>
Effect.succeed({ providerID: row.id, modelID: mdl.id }),
resolveFallbackChain: Effect.fn("TestProvider.resolveFallbackChain")((chain) =>
Effect.gen(function* () {
for (let i = 0; i < chain.length; i++) {
const { providerID, modelID } = chain[i]
if (providerID === row.id && modelID === mdl.id) {
return { model: mdl, remaining: chain.slice(i + 1) }
}
}
return undefined
}),
),
resolveFallbackChain: Effect.fn("TestProvider.resolveFallbackChain")((chain) =>
Effect.succeed(chain[0] ? { model: mdl, remaining: chain.slice(1) } : undefined),
),
...override,
}),
Expand Down
Loading
Loading