diff --git a/src/lib/onboard/machine/core-flow-phases.ts b/src/lib/onboard/machine/core-flow-phases.ts index a5cafdeceb..d86848663f 100644 --- a/src/lib/onboard/machine/core-flow-phases.ts +++ b/src/lib/onboard/machine/core-flow-phases.ts @@ -9,12 +9,9 @@ import { type ProviderInferenceStateOptions, } from "./handlers/provider-inference"; import { handleSandboxState, type SandboxStateOptions } from "./handlers/sandbox"; +import { runLiveOnboardFlowSlice } from "./live-flow-slice"; import type { OnboardStateResult } from "./result"; -import type { - OnboardMachineRunnerResult, - OnboardMachineRunnerRuntime, - OnboardStateHandlerResult, -} from "./runner"; +import type { OnboardMachineRunnerResult, OnboardMachineRunnerRuntime } from "./runner"; import type { OnboardSequencePhase } from "./sequence-runner"; export interface CoreOnboardFlowPhaseOptions< @@ -142,11 +139,6 @@ export function createCoreOnboardFlowPhases< return [providerInferencePhase, sandboxPhase]; } -function stateResults(result: OnboardStateHandlerResult): readonly OnboardStateResult[] { - if (Array.isArray(result)) return result as readonly OnboardStateResult[]; - return [result as OnboardStateResult]; -} - export async function runCoreOnboardFlowSlice(options: { context: Context; runtime: OnboardMachineRunnerRuntime; @@ -154,7 +146,6 @@ export async function runCoreOnboardFlowSlice; }): Promise> { - const coreRuntimeSession = await options.runtime.session(); // Compatibility bridge for live host glue while legacy step helpers remain a // second machine snapshot writer. OnboardRuntimeBoundary records skipped // stale/already-reached transition results from handlers whose source state @@ -162,21 +153,14 @@ export async function runCoreOnboardFlowSlice { - const finalRuntimeSession = await options.runtime.session(); // Keep resume and ahead-state sessions on the compatibility path for now. // The persisted invalid states for this slice are "policies", "finalizing", // and "post_verify": a previous run may have advanced `session.machine` @@ -185,27 +180,17 @@ export async function runFinalOnboardFlowSlice { await options.recordStateResult(stateResult); if (isPoliciesAppliedResult(stateResult)) options.afterPoliciesResultApplied?.(); - } - context = phaseResult.context; - options.onContextUpdated?.(context); - } + }, + }); } diff --git a/src/lib/onboard/machine/initial-flow-phases.ts b/src/lib/onboard/machine/initial-flow-phases.ts index 30bc1f827c..051ced46fc 100644 --- a/src/lib/onboard/machine/initial-flow-phases.ts +++ b/src/lib/onboard/machine/initial-flow-phases.ts @@ -13,12 +13,9 @@ import { type PreflightSandboxGpuFlag, type PreflightStateOptions, } from "./handlers/preflight"; +import { runLiveOnboardFlowSlice } from "./live-flow-slice"; import type { OnboardStateResult } from "./result"; -import type { - OnboardMachineRunnerResult, - OnboardMachineRunnerRuntime, - OnboardStateHandlerResult, -} from "./runner"; +import type { OnboardMachineRunnerResult, OnboardMachineRunnerRuntime } from "./runner"; import type { OnboardSequencePhase } from "./sequence-runner"; export type InitialOnboardFlowContext< @@ -57,11 +54,6 @@ export interface InitialOnboardFlowPhaseOptions< spawnSync?: SpawnSync; } -function stateResults(result: OnboardStateHandlerResult): readonly OnboardStateResult[] { - if (Array.isArray(result)) return result as readonly OnboardStateResult[]; - return [result as OnboardStateResult]; -} - function emitPreflightGpuNote(options: { gpu: Gpu | null; sandboxGpuConfig: Config; @@ -198,30 +190,18 @@ export async function runInitialOnboardFlowSlice; }): Promise> { - const initialRuntimeSession = await options.runtime.session(); // Keep resume on the compatibility path for now: resume intentionally re-runs // preflight/gateway backstops even when the saved machine is already ahead. // Remove this fallback only after resume repairs are modeled as strict FSM // transitions that preserve these safety checks before later phases run. - if ( - !options.resume && - (initialRuntimeSession.machine.state === "init" || - initialRuntimeSession.machine.state === "preflight") - ) { - return runInitialOnboardFlowSequence({ - context: options.context, - runtime: options.runtime, - phases: options.phases, - }); - } - - let context = options.context; - for (const phase of options.phases) { - const phaseResult = await phase.run(context); - for (const stateResult of stateResults(phaseResult.result)) { - await options.recordStateResult(stateResult); - } - context = phaseResult.context; - } - return { context, session: await options.runtime.session() }; + return runLiveOnboardFlowSlice({ + context: options.context, + runtime: options.runtime, + phases: options.phases, + resume: options.resume, + runWhenState: ["init", "preflight"], + compatibilityWhenState: ["gateway", "provider_selection"], + runSlice: runInitialOnboardFlowSequence, + applyCompatibleResult: options.recordStateResult, + }); } diff --git a/src/lib/onboard/machine/live-flow-slice.test.ts b/src/lib/onboard/machine/live-flow-slice.test.ts new file mode 100644 index 0000000000..95f71ec153 --- /dev/null +++ b/src/lib/onboard/machine/live-flow-slice.test.ts @@ -0,0 +1,263 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { describe, expect, it, vi } from "vitest"; + +import { createSession, type Session } from "../../state/onboard-session"; +import { + EmptyLiveOnboardFlowSliceResultError, + runLiveOnboardFlowSlice, + UnexpectedLiveOnboardFlowSliceStateError, +} from "./live-flow-slice"; +import { advanceTo, type OnboardStateResult } from "./result"; +import type { OnboardMachineRunnerRuntime } from "./runner"; +import { DuplicateOnboardSequencePhaseError, type OnboardSequencePhase } from "./sequence-runner"; + +interface Context { + value: number; +} + +function runtime(state: Session["machine"]["state"]): { + runtime: OnboardMachineRunnerRuntime; + applyResult(result: OnboardStateResult): Promise; + session(): Session; +} { + let session = createSession({ + machine: { version: 1, state, stateEnteredAt: null, revision: 1 }, + }); + const runtimeApi: OnboardMachineRunnerRuntime = { + async session() { + return session; + }, + async applyResult(result) { + if (result.type === "transition") { + session = { + ...session, + machine: { + ...session.machine, + state: result.next, + revision: session.machine.revision + 1, + }, + }; + } + return session; + }, + }; + return { + runtime: runtimeApi, + applyResult(result) { + return runtimeApi.applyResult(result); + }, + session() { + return session; + }, + }; +} + +function phase( + state: OnboardSequencePhase["state"], + next: number, + result: OnboardStateResult | readonly OnboardStateResult[] = advanceTo("gateway"), +): OnboardSequencePhase { + return { + state, + run: vi.fn((context) => ({ + context: { value: next }, + result, + })), + }; +} + +describe("runLiveOnboardFlowSlice", () => { + it("uses the strict slice runner for fresh matching entry states", async () => { + const runSlice = vi.fn(async ({ context }) => ({ + context: { value: context.value + 1 }, + session: createSession(), + })); + const applyCompatibleResult = vi.fn(async () => undefined); + + const result = await runLiveOnboardFlowSlice({ + context: { value: 1 }, + runtime: runtime("preflight").runtime, + phases: [phase("preflight", 2)], + resume: false, + runWhenState: ["preflight"], + compatibilityWhenState: ["provider_selection"], + runSlice, + applyCompatibleResult, + }); + + expect(result.context).toEqual({ value: 2 }); + expect(runSlice).toHaveBeenCalledOnce(); + expect(applyCompatibleResult).not.toHaveBeenCalled(); + }); + + it("applies compatibility results in exact phase order and returns the updated session", async () => { + const liveRuntime = runtime("provider_selection"); + const runSlice = vi.fn(async ({ context }) => ({ context, session: createSession() })); + const results = [ + advanceTo("gateway"), + advanceTo("provider_selection", { metadata: { state: "gateway" } }), + advanceTo("inference", { metadata: { state: "provider_selection" } }), + ]; + const applyCompatibleResult = vi.fn(async (result: OnboardStateResult) => + liveRuntime.applyResult(result), + ); + + const result = await runLiveOnboardFlowSlice({ + context: { value: 1 }, + runtime: liveRuntime.runtime, + phases: [ + phase("preflight", 2, results[0]), + { + state: "gateway", + run: vi.fn((context) => ({ + context: { value: context.value + 1 }, + result: [results[1], results[2]], + })), + }, + ], + resume: true, + runWhenState: ["preflight"], + runSlice, + applyCompatibleResult, + }); + + expect(result.context).toEqual({ value: 3 }); + expect(result.session.machine.state).toBe("inference"); + expect(runSlice).not.toHaveBeenCalled(); + expect(applyCompatibleResult.mock.calls.map(([result]) => result)).toEqual(results); + }); + + it("keeps resume-at-entry flows on compatibility execution", async () => { + const liveRuntime = runtime("preflight"); + const runSlice = vi.fn(async ({ context }) => ({ context, session: createSession() })); + const applyCompatibleResult = vi.fn(async (result: OnboardStateResult) => + liveRuntime.applyResult(result), + ); + + await runLiveOnboardFlowSlice({ + context: { value: 1 }, + runtime: liveRuntime.runtime, + phases: [phase("preflight", 2)], + resume: true, + runWhenState: ["preflight"], + runSlice, + applyCompatibleResult, + }); + + expect(runSlice).not.toHaveBeenCalled(); + expect(applyCompatibleResult).toHaveBeenCalledOnce(); + }); + + it("keeps non-resume ahead-state flows on compatibility execution", async () => { + const liveRuntime = runtime("provider_selection"); + const runSlice = vi.fn(async ({ context }) => ({ context, session: createSession() })); + const applyCompatibleResult = vi.fn(async (result: OnboardStateResult) => + liveRuntime.applyResult(result), + ); + + await runLiveOnboardFlowSlice({ + context: { value: 1 }, + runtime: liveRuntime.runtime, + phases: [phase("preflight", 2)], + resume: false, + runWhenState: ["preflight"], + compatibilityWhenState: ["provider_selection"], + runSlice, + applyCompatibleResult, + }); + + expect(runSlice).not.toHaveBeenCalled(); + expect(applyCompatibleResult).toHaveBeenCalledOnce(); + }); + + it("rejects non-resume states before the slice entry before running side effects", async () => { + const liveRuntime = runtime("init"); + const blocked = phase("provider_selection", 2); + const runSlice = vi.fn(async ({ context }) => ({ context, session: createSession() })); + const applyCompatibleResult = vi.fn(async () => undefined); + + await expect( + runLiveOnboardFlowSlice({ + context: { value: 1 }, + runtime: liveRuntime.runtime, + phases: [blocked], + resume: false, + runWhenState: ["provider_selection"], + compatibilityWhenState: ["inference", "sandbox"], + runSlice, + applyCompatibleResult, + }), + ).rejects.toBeInstanceOf(UnexpectedLiveOnboardFlowSliceStateError); + + expect(runSlice).not.toHaveBeenCalled(); + expect(blocked.run).not.toHaveBeenCalled(); + expect(applyCompatibleResult).not.toHaveBeenCalled(); + }); + + it("rejects duplicate compatibility phases before running side effects", async () => { + const liveRuntime = runtime("provider_selection"); + const first = phase("preflight", 2); + const second = phase("preflight", 3); + const applyCompatibleResult = vi.fn(async () => undefined); + + await expect( + runLiveOnboardFlowSlice({ + context: { value: 1 }, + runtime: liveRuntime.runtime, + phases: [first, second], + resume: true, + runWhenState: ["preflight"], + runSlice: vi.fn(), + applyCompatibleResult, + }), + ).rejects.toBeInstanceOf(DuplicateOnboardSequencePhaseError); + + expect(first.run).not.toHaveBeenCalled(); + expect(second.run).not.toHaveBeenCalled(); + expect(applyCompatibleResult).not.toHaveBeenCalled(); + }); + + it("rejects empty compatibility phase results", async () => { + const liveRuntime = runtime("provider_selection"); + const applyCompatibleResult = vi.fn(async () => undefined); + + await expect( + runLiveOnboardFlowSlice({ + context: { value: 1 }, + runtime: liveRuntime.runtime, + phases: [phase("preflight", 2, [])], + resume: true, + runWhenState: ["preflight"], + runSlice: vi.fn(), + applyCompatibleResult, + }), + ).rejects.toBeInstanceOf(EmptyLiveOnboardFlowSliceResultError); + + expect(applyCompatibleResult).not.toHaveBeenCalled(); + }); + + it("propagates compatibility application failures without running later phases", async () => { + const liveRuntime = runtime("provider_selection"); + const later = phase("gateway", 3); + const applyCompatibleResult = vi.fn(async () => { + throw new Error("compatibility failed"); + }); + + await expect( + runLiveOnboardFlowSlice({ + context: { value: 1 }, + runtime: liveRuntime.runtime, + phases: [phase("preflight", 2), later], + resume: true, + runWhenState: ["preflight"], + runSlice: vi.fn(), + applyCompatibleResult, + }), + ).rejects.toThrow("compatibility failed"); + + expect(applyCompatibleResult).toHaveBeenCalledOnce(); + expect(later.run).not.toHaveBeenCalled(); + }); +}); diff --git a/src/lib/onboard/machine/live-flow-slice.ts b/src/lib/onboard/machine/live-flow-slice.ts new file mode 100644 index 0000000000..78eedd17c9 --- /dev/null +++ b/src/lib/onboard/machine/live-flow-slice.ts @@ -0,0 +1,107 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +import type { OnboardStateResult } from "./result"; +import type { + OnboardMachineRunnerResult, + OnboardMachineRunnerRuntime, + OnboardStateHandlerResult, +} from "./runner"; +import { DuplicateOnboardSequencePhaseError, type OnboardSequencePhase } from "./sequence-runner"; +import type { OnboardMachineState } from "./types"; + +export interface LiveOnboardFlowSliceOptions { + context: Context; + runtime: OnboardMachineRunnerRuntime; + phases: readonly OnboardSequencePhase[]; + resume: boolean; + runWhenState: readonly OnboardMachineState[]; + compatibilityWhenState?: readonly OnboardMachineState[]; + runSlice(options: { + context: Context; + runtime: OnboardMachineRunnerRuntime; + phases: readonly OnboardSequencePhase[]; + }): Promise>; + applyCompatibleResult(result: OnboardStateResult): Promise; +} + +export class EmptyLiveOnboardFlowSliceResultError extends Error { + constructor(readonly state: OnboardSequencePhase["state"]) { + super(`Onboarding live flow phase '${state}' returned no results`); + this.name = "EmptyLiveOnboardFlowSliceResultError"; + } +} + +export class UnexpectedLiveOnboardFlowSliceStateError extends Error { + constructor( + readonly state: OnboardMachineState, + readonly runWhenState: readonly OnboardMachineState[], + readonly compatibilityWhenState: readonly OnboardMachineState[], + ) { + super(`Unexpected onboarding live flow state before slice entry: ${state}`); + this.name = "UnexpectedLiveOnboardFlowSliceStateError"; + } +} + +function assertUniquePhases(phases: readonly OnboardSequencePhase[]): void { + const states = new Set["state"]>(); + for (const phase of phases) { + if (states.has(phase.state)) throw new DuplicateOnboardSequencePhaseError(phase.state); + states.add(phase.state); + } +} + +function asResultArray( + result: OnboardStateHandlerResult, + state: OnboardSequencePhase["state"], +): readonly OnboardStateResult[] { + const results = Array.isArray(result) + ? (result as readonly OnboardStateResult[]) + : [result as OnboardStateResult]; + if (results.length === 0) throw new EmptyLiveOnboardFlowSliceResultError(state); + return results; +} + +/** + * Run a live onboard flow slice through the strict runner when the current + * machine state is exactly at the slice entry point. Resume/ahead-state flows + * use the compatibility path so repair/backstop phase bodies still execute even + * when a saved session has already advanced beyond the slice. Non-resume + * compatibility is limited to caller-declared ahead states so earlier machine + * states fail before running slice side effects out of order. Callers supply the + * compatibility recorder so each live slice keeps using the runtime boundary + * that validates or intentionally skips stale legacy step results. + */ +export async function runLiveOnboardFlowSlice({ + context, + runtime, + phases, + resume, + runWhenState, + compatibilityWhenState = [], + runSlice, + applyCompatibleResult, +}: LiveOnboardFlowSliceOptions): Promise> { + const current = await runtime.session(); + if (!resume && runWhenState.includes(current.machine.state)) { + return runSlice({ context, runtime, phases }); + } + if (!resume && !compatibilityWhenState.includes(current.machine.state)) { + throw new UnexpectedLiveOnboardFlowSliceStateError( + current.machine.state, + runWhenState, + compatibilityWhenState, + ); + } + + assertUniquePhases(phases); + let nextContext = context; + for (const phase of phases) { + const phaseResult = await phase.run(nextContext); + for (const result of asResultArray(phaseResult.result, phase.state)) { + await applyCompatibleResult(result); + } + nextContext = phaseResult.context; + } + return { context: nextContext, session: await runtime.session() }; +}