Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/ai-autopilot-quality-pass.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@gemstack/ai-autopilot": patch
---

Quality + docs pass for ai-autopilot:

- `Supervisor` now validates its options at construction (`plan` must be a function, `workers` is required, `concurrency`/`maxSubtasks` must be positive integers) and `run()` rejects an empty task, so misconfiguration fails fast with a clear message instead of deep in a planner call.
- An `onEvent` callback that throws is now isolated (logged and swallowed) so an observer bug can no longer abort a supervised run.
- Corrected the `SupervisorRun.usage` docs: it aggregates dispatched-subtask usage only (the `Planner`/`Synthesizer` contracts return data, not usage, so planning/synthesis spend isn't observable).
- Clarified that `maxSubtasks` and `budget` are optional, marked the internal `runPool` helper `@internal`, and added JSDoc examples.
11 changes: 7 additions & 4 deletions packages/ai-autopilot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const supervisor = new Supervisor({
const run = await supervisor.run('Draft a launch brief for product X')
console.log(run.text) // synthesized answer
console.log(run.results) // per-subtask outcomes (ok / error / usage)
console.log(run.usage) // aggregate token usage
console.log(run.usage) // aggregate token usage across dispatched subtasks
console.log(run.stoppedEarly) // true if a guardrail trimmed or halted work
```

Expand All @@ -42,10 +42,13 @@ Each stage is a plain function, so you mix LLM and deterministic logic freely:

## Guardrails

- **`concurrency`** (default 4) — max workers in flight.
- **`maxSubtasks`** — hard cap; a longer plan is trimmed and `stoppedEarly` is set.
- **`budget.maxTotalTokens`** — stop dispatching once aggregate usage crosses the limit (in-flight workers finish; remaining subtasks are skipped).
- **`concurrency`** (optional, default 4) — max workers in flight; positive integer.
- **`maxSubtasks`** (optional) — hard cap; a longer plan is trimmed and `stoppedEarly` is set. Omit for no cap.
- **`budget.maxTotalTokens`** (optional) — stop dispatching once aggregate dispatch usage crosses the limit (in-flight workers finish; remaining subtasks are skipped). Omit for no limit.
- **Error isolation** — a worker that throws becomes an `ok: false` result; siblings continue.
- **Observer safety** — an `onEvent` callback that throws is logged and swallowed; it never aborts the run.

`Supervisor` validates its options at construction (`plan`, `workers`, positive `concurrency` / `maxSubtasks`), and `run()` rejects an empty task, so misconfiguration fails fast with a clear message.

## Scope (what's deferred)

Expand Down
15 changes: 12 additions & 3 deletions packages/ai-autopilot/src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,18 @@
*
* Before claiming each item a worker checks `shouldStop`; once it returns true,
* no further items start (in-flight ones finish) and `stopped` is reported. The
* returned `results` are sparse-free and index-aligned to the items that
* actually ran — trailing items skipped by `shouldStop` are simply absent, so
* `results.length < items.length` signals truncation.
* returned `results` are sparse-free and ordered by item index — trailing items
* skipped by `shouldStop` are simply absent, so `results.length < items.length`
* signals truncation.
*
* @internal Not part of the public API; used by {@link Supervisor}. Import is
* not re-exported from the package entry and may change without a major bump.
*
* @param items The work items to process.
* @param limit Max concurrent workers (clamped to `[1, items.length]`).
* @param run Async processor for one item; receives the item and its index.
* @param shouldStop Optional predicate checked before each claim; `true` stops new work.
* @returns `results` (index-ordered, only items that ran) and `stopped` (whether `shouldStop` fired).
*/
export async function runPool<T, R>(
items: readonly T[],
Expand Down
32 changes: 32 additions & 0 deletions packages/ai-autopilot/src/supervisor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,36 @@ describe('Supervisor — plan → dispatch → synthesize', () => {
assert.ok(types.includes(expected), `missing event ${expected}`)
}
})

it('validates options at construction', () => {
const a = new StubAgent(() => ({ text: 'x', usage: usage(1) }))
assert.throws(() => new Supervisor({ workers: a } as never), /requires a `plan` function/)
assert.throws(() => new Supervisor({ plan: () => [] } as never), /requires `workers`/)
assert.throws(() => new Supervisor({ plan: () => [], workers: a, concurrency: 0 }), /concurrency must be a positive integer/)
assert.throws(() => new Supervisor({ plan: () => [], workers: a, maxSubtasks: -1 }), /maxSubtasks must be a positive integer/)
})

it('rejects an empty task', async () => {
const a = new StubAgent(() => ({ text: 'x', usage: usage(1) }))
const sup = new Supervisor({ plan: () => [{ description: 'a' }], workers: a })
await assert.rejects(() => sup.run(' '), /non-empty task/)
})

it('isolates a throwing onEvent callback — the run still completes', async () => {
const a = new StubAgent(() => ({ text: 'x', usage: usage(1) }))
const sup = new Supervisor({
plan: () => [{ description: 'a' }],
workers: a,
onEvent: () => { throw new Error('observer boom') },
})
const originalError = console.error
console.error = () => {} // silence the expected logged warning
try {
const run = await sup.run('t')
assert.equal(run.text, 'x')
assert.equal(run.results[0]?.ok, true)
} finally {
console.error = originalError
}
})
})
40 changes: 38 additions & 2 deletions packages/ai-autopilot/src/supervisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { defaultSynthesize } from './synthesizer.js'
import type {
PlannedSubtask,
SubtaskResult,
SupervisorEvent,
SupervisorOptions,
SupervisorRun,
WorkerRouter,
Expand Down Expand Up @@ -33,15 +34,35 @@ const ZERO_USAGE: TokenUsage = { promptTokens: 0, completionTokens: 0, totalToke
* })
* const { text } = await supervisor.run('Draft a launch brief for product X')
* ```
*
* Options are validated at construction. An `onEvent` callback that throws is
* isolated (logged-and-swallowed) so an observer bug cannot abort a run.
*/
export class Supervisor {
constructor(private readonly opts: SupervisorOptions) {}
constructor(private readonly opts: SupervisorOptions) {
if (typeof opts?.plan !== 'function') {
throw new TypeError('[ai-autopilot] Supervisor requires a `plan` function')
}
if (opts.workers == null) {
throw new TypeError('[ai-autopilot] Supervisor requires `workers` (an Agent, a Record<string, Agent>, or a WorkerRouter)')
}
if (opts.concurrency !== undefined && (!Number.isInteger(opts.concurrency) || opts.concurrency < 1)) {
throw new RangeError(`[ai-autopilot] concurrency must be a positive integer, got ${opts.concurrency}`)
}
if (opts.maxSubtasks !== undefined && (!Number.isInteger(opts.maxSubtasks) || opts.maxSubtasks < 1)) {
throw new RangeError(`[ai-autopilot] maxSubtasks must be a positive integer, got ${opts.maxSubtasks}`)
}
}

async run(task: string): Promise<SupervisorRun> {
if (!task?.trim()) {
throw new Error('[ai-autopilot] run() requires a non-empty task')
}

const concurrency = this.opts.concurrency ?? 4
const route = resolveRouter(this.opts.workers)
const synthesize = this.opts.synthesize ?? defaultSynthesize
const emit = this.opts.onEvent ?? (() => {})
const emit = makeEmitter(this.opts.onEvent)

let usage: TokenUsage = ZERO_USAGE
let stoppedEarly = false
Expand Down Expand Up @@ -89,6 +110,21 @@ export class Supervisor {

// ─── Internals ───────────────────────────────────────────────────

/**
* Wrap the user's `onEvent` so a throwing callback can never abort a run.
* Observer errors are reported to the console but otherwise swallowed.
*/
function makeEmitter(onEvent: SupervisorOptions['onEvent']): (event: SupervisorEvent) => void {
if (!onEvent) return () => {}
return (event) => {
try {
onEvent(event)
} catch (err) {
console.error('[ai-autopilot] onEvent callback threw; ignoring:', err)
}
}
}

async function runSubtask(route: WorkerRouter, subtask: PlannedSubtask): Promise<SubtaskResult> {
try {
const agent = route(subtask)
Expand Down
11 changes: 10 additions & 1 deletion packages/ai-autopilot/src/synthesizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@ import type { Synthesizer, SubtaskResult } from './types.js'

/**
* The default synthesizer: concatenate the successful results, in plan order,
* separated by blank lines. No LLM call — deterministic and free.
* separated by blank lines. No LLM call — deterministic and free. Failed
* subtasks are omitted; if every subtask failed, the result is an empty string.
*
* @example
* defaultSynthesize('task', [
* { ok: true, text: 'Alpha', ... },
* { ok: false, text: '', ... },
* { ok: true, text: 'Gamma', ... },
* ])
* // => "Alpha\n\nGamma"
*/
export function defaultSynthesize(_task: string, results: SubtaskResult[]): string {
return results
Expand Down
24 changes: 19 additions & 5 deletions packages/ai-autopilot/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ export interface SupervisorRun {
plan: PlannedSubtask[]
/** One result per dispatched subtask, in plan order. */
results: SubtaskResult[]
/** Aggregate token usage across planning, dispatch, and synthesis. */
/**
* Aggregate token usage across the dispatched subtasks. Planning and
* synthesis usage is not included: the `Planner` / `Synthesizer` contracts
* return data, not usage, so the supervisor cannot observe their token spend.
*/
usage: TokenUsage
/** True when a guardrail (subtask cap or token budget) stopped work early. */
stoppedEarly: boolean
Expand Down Expand Up @@ -82,12 +86,22 @@ export interface SupervisorOptions {
* for an LLM synthesis.
*/
synthesize?: Synthesizer
/** Max subtasks dispatched at once. Default 4. */
/** Max subtasks dispatched at once. Positive integer; default 4. */
concurrency?: number
/** Hard cap on subtasks; a longer plan is trimmed (and an event emitted). */
/**
* Optional hard cap on subtasks. A longer plan is trimmed to this many and a
* `plan-trimmed` event is emitted; omit for no cap. Positive integer.
*/
maxSubtasks?: number
/** Stop dispatching once aggregate token usage exceeds `maxTotalTokens`. */
/**
* Optional token guardrail. When `maxTotalTokens` is set, the supervisor stops
* dispatching new subtasks once aggregate dispatch usage crosses it (in-flight
* workers still finish, so usage can overshoot slightly). Omit for no limit.
*/
budget?: { maxTotalTokens?: number }
/** Observe progress. */
/**
* Observe progress events. The callback is isolated: if it throws, the error
* is logged and the run continues, so an observer bug cannot abort the run.
*/
onEvent?: (event: SupervisorEvent) => void
}
Loading