diff --git a/.changeset/ai-autopilot-quality-pass.md b/.changeset/ai-autopilot-quality-pass.md new file mode 100644 index 0000000..ea0bc3a --- /dev/null +++ b/.changeset/ai-autopilot-quality-pass.md @@ -0,0 +1,10 @@ +--- +"@gemstack/ai-autopilot": patch +--- + +Quality + docs pass for ai-autopilot: + +- `Supervisor` now validates its options at construction (`plan` must be a function, `workers` is required, `concurrency`/`maxSubtasks` must be positive integers) and `run()` rejects an empty task, so misconfiguration fails fast with a clear message instead of deep in a planner call. +- An `onEvent` callback that throws is now isolated (logged and swallowed) so an observer bug can no longer abort a supervised run. +- Corrected the `SupervisorRun.usage` docs: it aggregates dispatched-subtask usage only (the `Planner`/`Synthesizer` contracts return data, not usage, so planning/synthesis spend isn't observable). +- Clarified that `maxSubtasks` and `budget` are optional, marked the internal `runPool` helper `@internal`, and added JSDoc examples. diff --git a/packages/ai-autopilot/README.md b/packages/ai-autopilot/README.md index bd75e2a..7bc193e 100644 --- a/packages/ai-autopilot/README.md +++ b/packages/ai-autopilot/README.md @@ -28,7 +28,7 @@ const supervisor = new Supervisor({ const run = await supervisor.run('Draft a launch brief for product X') console.log(run.text) // synthesized answer console.log(run.results) // per-subtask outcomes (ok / error / usage) -console.log(run.usage) // aggregate token usage +console.log(run.usage) // aggregate token usage across dispatched subtasks console.log(run.stoppedEarly) // true if a guardrail trimmed or halted work ``` @@ -42,10 +42,13 @@ Each stage is a plain function, so you mix LLM and deterministic logic freely: ## Guardrails -- **`concurrency`** (default 4) — max workers in flight. -- **`maxSubtasks`** — hard cap; a longer plan is trimmed and `stoppedEarly` is set. -- **`budget.maxTotalTokens`** — stop dispatching once aggregate usage crosses the limit (in-flight workers finish; remaining subtasks are skipped). +- **`concurrency`** (optional, default 4) — max workers in flight; positive integer. +- **`maxSubtasks`** (optional) — hard cap; a longer plan is trimmed and `stoppedEarly` is set. Omit for no cap. +- **`budget.maxTotalTokens`** (optional) — stop dispatching once aggregate dispatch usage crosses the limit (in-flight workers finish; remaining subtasks are skipped). Omit for no limit. - **Error isolation** — a worker that throws becomes an `ok: false` result; siblings continue. +- **Observer safety** — an `onEvent` callback that throws is logged and swallowed; it never aborts the run. + +`Supervisor` validates its options at construction (`plan`, `workers`, positive `concurrency` / `maxSubtasks`), and `run()` rejects an empty task, so misconfiguration fails fast with a clear message. ## Scope (what's deferred) diff --git a/packages/ai-autopilot/src/pool.ts b/packages/ai-autopilot/src/pool.ts index 8a69b20..f5014df 100644 --- a/packages/ai-autopilot/src/pool.ts +++ b/packages/ai-autopilot/src/pool.ts @@ -3,9 +3,18 @@ * * Before claiming each item a worker checks `shouldStop`; once it returns true, * no further items start (in-flight ones finish) and `stopped` is reported. The - * returned `results` are sparse-free and index-aligned to the items that - * actually ran — trailing items skipped by `shouldStop` are simply absent, so - * `results.length < items.length` signals truncation. + * returned `results` are sparse-free and ordered by item index — trailing items + * skipped by `shouldStop` are simply absent, so `results.length < items.length` + * signals truncation. + * + * @internal Not part of the public API; used by {@link Supervisor}. Import is + * not re-exported from the package entry and may change without a major bump. + * + * @param items The work items to process. + * @param limit Max concurrent workers (clamped to `[1, items.length]`). + * @param run Async processor for one item; receives the item and its index. + * @param shouldStop Optional predicate checked before each claim; `true` stops new work. + * @returns `results` (index-ordered, only items that ran) and `stopped` (whether `shouldStop` fired). */ export async function runPool( items: readonly T[], diff --git a/packages/ai-autopilot/src/supervisor.test.ts b/packages/ai-autopilot/src/supervisor.test.ts index 5e0a9dd..6381c0e 100644 --- a/packages/ai-autopilot/src/supervisor.test.ts +++ b/packages/ai-autopilot/src/supervisor.test.ts @@ -128,4 +128,36 @@ describe('Supervisor — plan → dispatch → synthesize', () => { assert.ok(types.includes(expected), `missing event ${expected}`) } }) + + it('validates options at construction', () => { + const a = new StubAgent(() => ({ text: 'x', usage: usage(1) })) + assert.throws(() => new Supervisor({ workers: a } as never), /requires a `plan` function/) + assert.throws(() => new Supervisor({ plan: () => [] } as never), /requires `workers`/) + assert.throws(() => new Supervisor({ plan: () => [], workers: a, concurrency: 0 }), /concurrency must be a positive integer/) + assert.throws(() => new Supervisor({ plan: () => [], workers: a, maxSubtasks: -1 }), /maxSubtasks must be a positive integer/) + }) + + it('rejects an empty task', async () => { + const a = new StubAgent(() => ({ text: 'x', usage: usage(1) })) + const sup = new Supervisor({ plan: () => [{ description: 'a' }], workers: a }) + await assert.rejects(() => sup.run(' '), /non-empty task/) + }) + + it('isolates a throwing onEvent callback — the run still completes', async () => { + const a = new StubAgent(() => ({ text: 'x', usage: usage(1) })) + const sup = new Supervisor({ + plan: () => [{ description: 'a' }], + workers: a, + onEvent: () => { throw new Error('observer boom') }, + }) + const originalError = console.error + console.error = () => {} // silence the expected logged warning + try { + const run = await sup.run('t') + assert.equal(run.text, 'x') + assert.equal(run.results[0]?.ok, true) + } finally { + console.error = originalError + } + }) }) diff --git a/packages/ai-autopilot/src/supervisor.ts b/packages/ai-autopilot/src/supervisor.ts index 9d0ea71..a5d07fa 100644 --- a/packages/ai-autopilot/src/supervisor.ts +++ b/packages/ai-autopilot/src/supervisor.ts @@ -5,6 +5,7 @@ import { defaultSynthesize } from './synthesizer.js' import type { PlannedSubtask, SubtaskResult, + SupervisorEvent, SupervisorOptions, SupervisorRun, WorkerRouter, @@ -33,15 +34,35 @@ const ZERO_USAGE: TokenUsage = { promptTokens: 0, completionTokens: 0, totalToke * }) * const { text } = await supervisor.run('Draft a launch brief for product X') * ``` + * + * Options are validated at construction. An `onEvent` callback that throws is + * isolated (logged-and-swallowed) so an observer bug cannot abort a run. */ export class Supervisor { - constructor(private readonly opts: SupervisorOptions) {} + constructor(private readonly opts: SupervisorOptions) { + if (typeof opts?.plan !== 'function') { + throw new TypeError('[ai-autopilot] Supervisor requires a `plan` function') + } + if (opts.workers == null) { + throw new TypeError('[ai-autopilot] Supervisor requires `workers` (an Agent, a Record, or a WorkerRouter)') + } + if (opts.concurrency !== undefined && (!Number.isInteger(opts.concurrency) || opts.concurrency < 1)) { + throw new RangeError(`[ai-autopilot] concurrency must be a positive integer, got ${opts.concurrency}`) + } + if (opts.maxSubtasks !== undefined && (!Number.isInteger(opts.maxSubtasks) || opts.maxSubtasks < 1)) { + throw new RangeError(`[ai-autopilot] maxSubtasks must be a positive integer, got ${opts.maxSubtasks}`) + } + } async run(task: string): Promise { + if (!task?.trim()) { + throw new Error('[ai-autopilot] run() requires a non-empty task') + } + const concurrency = this.opts.concurrency ?? 4 const route = resolveRouter(this.opts.workers) const synthesize = this.opts.synthesize ?? defaultSynthesize - const emit = this.opts.onEvent ?? (() => {}) + const emit = makeEmitter(this.opts.onEvent) let usage: TokenUsage = ZERO_USAGE let stoppedEarly = false @@ -89,6 +110,21 @@ export class Supervisor { // ─── Internals ─────────────────────────────────────────────────── +/** + * Wrap the user's `onEvent` so a throwing callback can never abort a run. + * Observer errors are reported to the console but otherwise swallowed. + */ +function makeEmitter(onEvent: SupervisorOptions['onEvent']): (event: SupervisorEvent) => void { + if (!onEvent) return () => {} + return (event) => { + try { + onEvent(event) + } catch (err) { + console.error('[ai-autopilot] onEvent callback threw; ignoring:', err) + } + } +} + async function runSubtask(route: WorkerRouter, subtask: PlannedSubtask): Promise { try { const agent = route(subtask) diff --git a/packages/ai-autopilot/src/synthesizer.ts b/packages/ai-autopilot/src/synthesizer.ts index 61f440c..d9b6639 100644 --- a/packages/ai-autopilot/src/synthesizer.ts +++ b/packages/ai-autopilot/src/synthesizer.ts @@ -3,7 +3,16 @@ import type { Synthesizer, SubtaskResult } from './types.js' /** * The default synthesizer: concatenate the successful results, in plan order, - * separated by blank lines. No LLM call — deterministic and free. + * separated by blank lines. No LLM call — deterministic and free. Failed + * subtasks are omitted; if every subtask failed, the result is an empty string. + * + * @example + * defaultSynthesize('task', [ + * { ok: true, text: 'Alpha', ... }, + * { ok: false, text: '', ... }, + * { ok: true, text: 'Gamma', ... }, + * ]) + * // => "Alpha\n\nGamma" */ export function defaultSynthesize(_task: string, results: SubtaskResult[]): string { return results diff --git a/packages/ai-autopilot/src/types.ts b/packages/ai-autopilot/src/types.ts index e660611..0bddca9 100644 --- a/packages/ai-autopilot/src/types.ts +++ b/packages/ai-autopilot/src/types.ts @@ -34,7 +34,11 @@ export interface SupervisorRun { plan: PlannedSubtask[] /** One result per dispatched subtask, in plan order. */ results: SubtaskResult[] - /** Aggregate token usage across planning, dispatch, and synthesis. */ + /** + * Aggregate token usage across the dispatched subtasks. Planning and + * synthesis usage is not included: the `Planner` / `Synthesizer` contracts + * return data, not usage, so the supervisor cannot observe their token spend. + */ usage: TokenUsage /** True when a guardrail (subtask cap or token budget) stopped work early. */ stoppedEarly: boolean @@ -82,12 +86,22 @@ export interface SupervisorOptions { * for an LLM synthesis. */ synthesize?: Synthesizer - /** Max subtasks dispatched at once. Default 4. */ + /** Max subtasks dispatched at once. Positive integer; default 4. */ concurrency?: number - /** Hard cap on subtasks; a longer plan is trimmed (and an event emitted). */ + /** + * Optional hard cap on subtasks. A longer plan is trimmed to this many and a + * `plan-trimmed` event is emitted; omit for no cap. Positive integer. + */ maxSubtasks?: number - /** Stop dispatching once aggregate token usage exceeds `maxTotalTokens`. */ + /** + * Optional token guardrail. When `maxTotalTokens` is set, the supervisor stops + * dispatching new subtasks once aggregate dispatch usage crosses it (in-flight + * workers still finish, so usage can overshoot slightly). Omit for no limit. + */ budget?: { maxTotalTokens?: number } - /** Observe progress. */ + /** + * Observe progress events. The callback is isolated: if it throws, the error + * is logged and the run continues, so an observer bug cannot abort the run. + */ onEvent?: (event: SupervisorEvent) => void }