From 861957a5270f4bd2d012939a521ddb238d7b75ed Mon Sep 17 00:00:00 2001 From: Suleiman Shahbari Date: Fri, 26 Jun 2026 02:25:29 +0300 Subject: [PATCH] =?UTF-8?q?feat(ai-autopilot):=20add=20@gemstack/ai-autopi?= =?UTF-8?q?lot=20=E2=80=94=20Supervisor=20orchestration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Third package of the AI family (#9). The orchestration layer: runs many agent runs under a control policy, above ai-sdk's single-agent loop. Seed slice = the supervisor/worker topology (plan -> dispatch -> synthesize), the smallest thing clearly more than the asTool/handoff primitives: - Supervisor: planner decomposes a task into subtasks; dispatch runs each on a worker agent with bounded concurrency, an optional token budget, and per-subtask error isolation; synthesizer combines the results. - agentPlanner(agent): LLM decomposition via ai-sdk Output.array. - agentSynthesizer(agent) / defaultSynthesize: LLM or deterministic combine. - Pluggable stages (plan/workers/synthesize), guardrails (concurrency, maxSubtasks, budget.maxTotalTokens), progress events. Scope boundary kept sharp: if a feature just calls an ai-sdk primitive it stays in ai-sdk; autopilot only owns topology/control/lifecycle. Autonomous workers in v1; durable pause/resume + more topologies deferred behind optional seams. One-directional (ai-autopilot -> ai-sdk). 18 tests across pool/supervisor/ planner/synthesizer. Closes #9 --- .changeset/ai-autopilot-initial.md | 12 ++ packages/ai-autopilot/README.md | 56 +++++++ packages/ai-autopilot/package.json | 57 +++++++ packages/ai-autopilot/src/index.ts | 28 ++++ packages/ai-autopilot/src/planner.test.ts | 46 ++++++ packages/ai-autopilot/src/planner.ts | 39 +++++ packages/ai-autopilot/src/pool.test.ts | 46 ++++++ packages/ai-autopilot/src/pool.ts | 39 +++++ packages/ai-autopilot/src/supervisor.test.ts | 131 +++++++++++++++ packages/ai-autopilot/src/supervisor.ts | 149 ++++++++++++++++++ packages/ai-autopilot/src/synthesizer.test.ts | 52 ++++++ packages/ai-autopilot/src/synthesizer.ts | 34 ++++ packages/ai-autopilot/src/types.ts | 93 +++++++++++ packages/ai-autopilot/tsconfig.build.json | 6 + packages/ai-autopilot/tsconfig.json | 5 + packages/ai-autopilot/tsconfig.test.json | 5 + pnpm-lock.yaml | 16 ++ 17 files changed, 814 insertions(+) create mode 100644 .changeset/ai-autopilot-initial.md create mode 100644 packages/ai-autopilot/README.md create mode 100644 packages/ai-autopilot/package.json create mode 100644 packages/ai-autopilot/src/index.ts create mode 100644 packages/ai-autopilot/src/planner.test.ts create mode 100644 packages/ai-autopilot/src/planner.ts create mode 100644 packages/ai-autopilot/src/pool.test.ts create mode 100644 packages/ai-autopilot/src/pool.ts create mode 100644 packages/ai-autopilot/src/supervisor.test.ts create mode 100644 packages/ai-autopilot/src/supervisor.ts create mode 100644 packages/ai-autopilot/src/synthesizer.test.ts create mode 100644 packages/ai-autopilot/src/synthesizer.ts create mode 100644 packages/ai-autopilot/src/types.ts create mode 100644 packages/ai-autopilot/tsconfig.build.json create mode 100644 packages/ai-autopilot/tsconfig.json create mode 100644 packages/ai-autopilot/tsconfig.test.json diff --git a/.changeset/ai-autopilot-initial.md b/.changeset/ai-autopilot-initial.md new file mode 100644 index 0000000..05fc075 --- /dev/null +++ b/.changeset/ai-autopilot-initial.md @@ -0,0 +1,12 @@ +--- +"@gemstack/ai-autopilot": minor +--- + +Initial release. Orchestration for `@gemstack/ai-sdk` agents — the control-policy layer over many agent runs. Seed slice: the supervisor/worker topology. + +- `Supervisor` — **plan → dispatch → synthesize**: decompose a task into subtasks, dispatch each to a worker agent (bounded concurrency, optional token budget, per-subtask error isolation), and synthesize the results. +- `agentPlanner(agent)` — turn a planning agent into a `Planner` via `ai-sdk`'s `Output.array` (JSON subtask decomposition). +- `agentSynthesizer(agent)` / `defaultSynthesize` — combine subtask results (LLM pass, or deterministic concatenation). +- Pluggable stages (`plan` / `workers` / `synthesize`), guardrails (`concurrency`, `maxSubtasks`, `budget.maxTotalTokens`), and progress events. + +Scope boundary: `ai-sdk` owns the single-agent loop + handoff/subagent primitives; `ai-autopilot` owns orchestrating multiple runs under a policy. The seed runs autonomous workers; durable pause/resume, more topologies, and queue-backed execution are deferred behind optional seams. Depends on `@gemstack/ai-sdk`. diff --git a/packages/ai-autopilot/README.md b/packages/ai-autopilot/README.md new file mode 100644 index 0000000..bd75e2a --- /dev/null +++ b/packages/ai-autopilot/README.md @@ -0,0 +1,56 @@ +# @gemstack/ai-autopilot + +Orchestration for [`@gemstack/ai-sdk`](https://github.com/gemstack-land/gemstack/tree/main/packages/ai-sdk) agents — the "director" layer that runs **many** agent runs under a control policy. + +`ai-sdk` owns the single-agent loop and the handoff / subagent primitives. `ai-autopilot` owns orchestrating multiple runs: which agents run, in what order, how their results combine, and when to stop. If a feature is just calling an `ai-sdk` primitive, it belongs in `ai-sdk` — autopilot earns its keep only as the topology / control-policy layer. + +## The seed: Supervisor (plan → dispatch → synthesize) + +The first slice is the supervisor/worker topology — the smallest thing clearly more than the primitives: + +1. **Plan** — a planner decomposes the task into subtasks. +2. **Dispatch** — each subtask runs on a worker agent, with bounded concurrency, an optional token budget, and per-subtask error isolation. +3. **Synthesize** — a synthesizer combines the results into the final answer. + +```ts +import { Supervisor, agentPlanner, agentSynthesizer } from '@gemstack/ai-autopilot' + +const supervisor = new Supervisor({ + plan: agentPlanner(plannerAgent), // LLM decomposition + workers: { research: researchAgent, write: writerAgent }, // routed by subtask.worker + synthesize: agentSynthesizer(editorAgent), // LLM synthesis + concurrency: 3, + maxSubtasks: 8, + budget: { maxTotalTokens: 200_000 }, + onEvent: (e) => console.log(e.type), +}) + +const run = await supervisor.run('Draft a launch brief for product X') +console.log(run.text) // synthesized answer +console.log(run.results) // per-subtask outcomes (ok / error / usage) +console.log(run.usage) // aggregate token usage +console.log(run.stoppedEarly) // true if a guardrail trimmed or halted work +``` + +## Pieces are pluggable + +Each stage is a plain function, so you mix LLM and deterministic logic freely: + +- **`plan`** — a `Planner`: `(task) => Subtask[]`. Use `agentPlanner(agent)` for LLM decomposition, or return a static list. +- **`workers`** — a single `Agent` (all subtasks), a `Record` (routed by `subtask.worker`), or a `WorkerRouter` function. +- **`synthesize`** — a `Synthesizer`: `(task, results) => string`. Defaults to `defaultSynthesize` (concatenate successes, no LLM call); pass `agentSynthesizer(agent)` for an LLM pass. + +## Guardrails + +- **`concurrency`** (default 4) — max workers in flight. +- **`maxSubtasks`** — hard cap; a longer plan is trimmed and `stoppedEarly` is set. +- **`budget.maxTotalTokens`** — stop dispatching once aggregate usage crosses the limit (in-flight workers finish; remaining subtasks are skipped). +- **Error isolation** — a worker that throws becomes an `ok: false` result; siblings continue. + +## Scope (what's deferred) + +The seed dispatches **autonomous** workers via `agent.prompt()`. A worker that pauses for a client-tool or approval round-trip is reported as a failed subtask — durable pause/resume across a supervised run (building on `ai-sdk`'s `SubAgentRunStore` + resume primitives) is a deferred adapter, as are other topologies (pipelines, debate) and queue-backed long-running execution. Those land on demand, behind optional seams, not in the core. + +## License + +MIT diff --git a/packages/ai-autopilot/package.json b/packages/ai-autopilot/package.json new file mode 100644 index 0000000..40e5d43 --- /dev/null +++ b/packages/ai-autopilot/package.json @@ -0,0 +1,57 @@ +{ + "name": "@gemstack/ai-autopilot", + "version": "0.0.0", + "description": "Orchestration for @gemstack/ai-sdk agents: a Supervisor that plans, dispatches subagents (bounded concurrency + budget guardrails), and synthesizes the result.", + "keywords": [ + "ai", + "agent", + "agents", + "orchestration", + "multi-agent", + "supervisor", + "autonomy", + "planning", + "gemstack" + ], + "license": "MIT", + "homepage": "https://github.com/gemstack-land/gemstack/tree/main/packages/ai-autopilot#readme", + "bugs": { + "url": "https://github.com/gemstack-land/gemstack/issues" + }, + "repository": { + "type": "git", + "url": "https://github.com/gemstack-land/gemstack", + "directory": "packages/ai-autopilot" + }, + "type": "module", + "engines": { + "node": ">=22.12.0" + }, + "files": [ + "dist" + ], + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "import": "./dist/index.js", + "types": "./dist/index.d.ts" + } + }, + "scripts": { + "build": "tsc -p tsconfig.build.json", + "dev": "tsc -p tsconfig.build.json --watch", + "typecheck": "tsc --noEmit", + "test": "tsc -p tsconfig.test.json && cd dist-test && node --test", + "clean": "rm -rf dist" + }, + "dependencies": { + "@gemstack/ai-sdk": "workspace:^", + "zod": "^4.0.0" + }, + "devDependencies": { + "@types/node": "^20.0.0", + "typescript": "^5.4.0" + }, + "author": "Suleiman Shahbari" +} diff --git a/packages/ai-autopilot/src/index.ts b/packages/ai-autopilot/src/index.ts new file mode 100644 index 0000000..5d76f63 --- /dev/null +++ b/packages/ai-autopilot/src/index.ts @@ -0,0 +1,28 @@ +/** + * `@gemstack/ai-autopilot` — orchestration for `@gemstack/ai-sdk` agents. + * + * The seed slice is the supervisor/worker topology: {@link Supervisor} plans a + * task into subtasks, dispatches them to worker agents (bounded concurrency + + * token budget + per-subtask error isolation), and synthesizes the result. + * + * Autopilot owns the *control policy* over many agent runs; `ai-sdk` owns the + * single-agent loop and the handoff / subagent primitives the policy builds on. + * + * - {@link Supervisor} — the plan → dispatch → synthesize orchestrator + * - {@link agentPlanner} — turn a planning agent into a {@link Planner} + * - {@link agentSynthesizer} / {@link defaultSynthesize} — combine results + */ +export { Supervisor } from './supervisor.js' +export { agentPlanner, type AgentPlannerOptions } from './planner.js' +export { agentSynthesizer, defaultSynthesize } from './synthesizer.js' +export type { + Subtask, + PlannedSubtask, + SubtaskResult, + SupervisorRun, + SupervisorOptions, + SupervisorEvent, + Planner, + WorkerRouter, + Synthesizer, +} from './types.js' diff --git a/packages/ai-autopilot/src/planner.test.ts b/packages/ai-autopilot/src/planner.test.ts new file mode 100644 index 0000000..a966857 --- /dev/null +++ b/packages/ai-autopilot/src/planner.test.ts @@ -0,0 +1,46 @@ +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { AiFake, agent, getMessageText } from '@gemstack/ai-sdk' +import { agentPlanner } from './planner.js' + +describe('agentPlanner', () => { + it('prompts the agent and parses a JSON subtask array', async () => { + const fake = AiFake.fake() + try { + fake.respondWith('[{"description":"research the market"},{"description":"draft copy","worker":"writer"}]') + const plan = agentPlanner(agent('You plan.')) + const subtasks = await plan('Launch product X') + assert.deepEqual(subtasks, [ + { description: 'research the market' }, + { description: 'draft copy', worker: 'writer' }, + ]) + } finally { + fake.restore() + } + }) + + it('tolerates a fenced JSON code block', async () => { + const fake = AiFake.fake() + try { + fake.respondWith('```json\n[{"description":"only task"}]\n```') + const subtasks = await agentPlanner(agent('plan'))('do it') + assert.deepEqual(subtasks, [{ description: 'only task' }]) + } finally { + fake.restore() + } + }) + + it('puts the schema instruction and the task into the planning prompt', async () => { + const fake = AiFake.fake() + try { + fake.respondWith('[]') + await agentPlanner(agent('plan'))('summarize the quarterly numbers') + const call = fake.getCalls()[0]! + const text = call.messages.map(m => getMessageText(m.content)).join('\n') + assert.match(text, /JSON array/) + assert.match(text, /summarize the quarterly numbers/) + } finally { + fake.restore() + } + }) +}) diff --git a/packages/ai-autopilot/src/planner.ts b/packages/ai-autopilot/src/planner.ts new file mode 100644 index 0000000..59791ff --- /dev/null +++ b/packages/ai-autopilot/src/planner.ts @@ -0,0 +1,39 @@ +import { Output } from '@gemstack/ai-sdk' +import type { Agent } from '@gemstack/ai-sdk' +import { z } from 'zod' +import type { Planner, Subtask } from './types.js' + +/** Default subtask shape an LLM planner emits. */ +const defaultSubtaskSchema = z.object({ + description: z.string().describe('What this subtask asks a worker agent to do'), + worker: z.string().optional().describe('Worker pool key, when routing to named workers'), +}) + +export interface AgentPlannerOptions { + /** + * Zod schema for one subtask. Must produce at least `{ description: string }` + * (and optionally `worker`). Defaults to that shape. + */ + element?: z.ZodType + /** Override the planning instruction prepended to the task. */ + instructions?: string +} + +/** + * Build a {@link Planner} that asks an agent to decompose the task into a JSON + * array of subtasks, using `@gemstack/ai-sdk`'s `Output.array` for the schema + * instruction + parsing. The agent is your planning policy; autopilot orchestrates + * the subtasks it returns. + */ +export function agentPlanner(agent: Agent, opts: AgentPlannerOptions = {}): Planner { + const element = opts.element ?? defaultSubtaskSchema + const output = Output.array({ element }) + const instructions = opts.instructions + ?? 'Break the task below into the smallest set of independent subtasks that can run in parallel. Each subtask is dispatched to a worker agent.' + + return async (task) => { + const prompt = `${instructions}\n\n# Task\n${task}\n\n${output.toSystemPrompt()}` + const response = await agent.prompt(prompt) + return output.parse(response.text ?? '') as Subtask[] + } +} diff --git a/packages/ai-autopilot/src/pool.test.ts b/packages/ai-autopilot/src/pool.test.ts new file mode 100644 index 0000000..c07bb98 --- /dev/null +++ b/packages/ai-autopilot/src/pool.test.ts @@ -0,0 +1,46 @@ +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { runPool } from './pool.js' + +const tick = () => new Promise(r => setTimeout(r, 1)) + +describe('runPool', () => { + it('runs every item and preserves input order in results', async () => { + const { results, stopped } = await runPool([1, 2, 3, 4], 2, async (n) => { + await tick() + return n * 10 + }) + assert.deepEqual(results, [10, 20, 30, 40]) + assert.equal(stopped, false) + }) + + it('never exceeds the concurrency limit', async () => { + let inFlight = 0 + let peak = 0 + await runPool(Array.from({ length: 10 }, (_, i) => i), 3, async (n) => { + inFlight++ + peak = Math.max(peak, inFlight) + await tick() + inFlight-- + return n + }) + assert.ok(peak <= 3, `peak concurrency ${peak} exceeded 3`) + }) + + it('stops claiming new items once shouldStop flips, and reports stopped', async () => { + let done = 0 + const { results, stopped } = await runPool([1, 2, 3, 4, 5], 1, async (n) => { + done++ + return n + }, () => done >= 2) + assert.equal(stopped, true) + assert.equal(results.length, 2) // only the first two ran + assert.deepEqual(results, [1, 2]) + }) + + it('handles an empty list', async () => { + const { results, stopped } = await runPool([], 4, async () => 1) + assert.deepEqual(results, []) + assert.equal(stopped, false) + }) +}) diff --git a/packages/ai-autopilot/src/pool.ts b/packages/ai-autopilot/src/pool.ts new file mode 100644 index 0000000..8a69b20 --- /dev/null +++ b/packages/ai-autopilot/src/pool.ts @@ -0,0 +1,39 @@ +/** + * Run `items` through `run` with at most `limit` in flight at once. + * + * Before claiming each item a worker checks `shouldStop`; once it returns true, + * no further items start (in-flight ones finish) and `stopped` is reported. The + * returned `results` are sparse-free and index-aligned to the items that + * actually ran — trailing items skipped by `shouldStop` are simply absent, so + * `results.length < items.length` signals truncation. + */ +export async function runPool( + items: readonly T[], + limit: number, + run: (item: T, index: number) => Promise, + shouldStop?: () => boolean, +): Promise<{ results: R[]; stopped: boolean }> { + const out: Array<{ index: number; value: R }> = [] + let next = 0 + let stopped = false + + const workers = Math.max(1, Math.min(limit, items.length)) + + async function worker(): Promise { + while (true) { + if (shouldStop?.()) { + stopped = true + return + } + const index = next++ + if (index >= items.length) return + const value = await run(items[index]!, index) + out.push({ index, value }) + } + } + + await Promise.all(Array.from({ length: workers }, () => worker())) + + out.sort((a, b) => a.index - b.index) + return { results: out.map(o => o.value), stopped } +} diff --git a/packages/ai-autopilot/src/supervisor.test.ts b/packages/ai-autopilot/src/supervisor.test.ts new file mode 100644 index 0000000..5e0a9dd --- /dev/null +++ b/packages/ai-autopilot/src/supervisor.test.ts @@ -0,0 +1,131 @@ +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { Agent } from '@gemstack/ai-sdk' +import type { AgentResponse, TokenUsage } from '@gemstack/ai-sdk' +import { Supervisor } from './supervisor.js' +import type { SupervisorEvent } from './types.js' + +const usage = (t: number): TokenUsage => ({ promptTokens: t, completionTokens: 0, totalTokens: t }) + +/** A worker agent whose `prompt()` is fully scripted — no provider involved. */ +class StubAgent extends Agent { + constructor(private readonly fn: (input: string) => Partial) { + super() + } + instructions(): string { + return 'stub' + } + override async prompt(input: string): Promise { + return { text: '', steps: [], usage: usage(0), finishReason: 'stop', ...this.fn(input) } + } +} + +describe('Supervisor — plan → dispatch → synthesize', () => { + it('runs the happy path with a single worker and the default synthesizer', async () => { + const echo = new StubAgent((input) => ({ text: `did: ${input}`, usage: usage(10) })) + const sup = new Supervisor({ + plan: () => [{ description: 'a' }, { description: 'b' }], + workers: echo, + }) + const run = await sup.run('task') + + assert.deepEqual(run.plan.map(p => p.id), ['subtask-1', 'subtask-2']) + assert.ok(run.results.every(r => r.ok)) + assert.equal(run.text, 'did: a\n\ndid: b') + assert.equal(run.usage.totalTokens, 20) + assert.equal(run.stoppedEarly, false) + }) + + it('routes subtasks to a named worker pool by subtask.worker', async () => { + const research = new StubAgent(() => ({ text: 'researched', usage: usage(5) })) + const writer = new StubAgent(() => ({ text: 'written', usage: usage(5) })) + const sup = new Supervisor({ + plan: () => [{ description: 'x', worker: 'research' }, { description: 'y', worker: 'write' }], + workers: { research, write: writer }, + }) + const run = await sup.run('t') + assert.deepEqual(run.results.map(r => r.text), ['researched', 'written']) + }) + + it('isolates a failing worker — siblings still complete', async () => { + const ok = new StubAgent(() => ({ text: 'ok', usage: usage(1) })) + const boom = new StubAgent(() => { throw new Error('worker exploded') }) + const sup = new Supervisor({ + plan: () => [{ description: 'a' }, { description: 'b' }], + workers: (s) => (s.id === 'subtask-2' ? boom : ok), + }) + const run = await sup.run('t') + assert.equal(run.results[0]?.ok, true) + assert.equal(run.results[1]?.ok, false) + assert.match(String(run.results[1]?.error), /worker exploded/) + assert.equal(run.text, 'ok') // default synth omits the failure + }) + + it('trims a plan over maxSubtasks and flags stoppedEarly', async () => { + const a = new StubAgent(() => ({ text: 'x', usage: usage(1) })) + const events: SupervisorEvent[] = [] + const sup = new Supervisor({ + plan: () => [{ description: '1' }, { description: '2' }, { description: '3' }], + workers: a, + maxSubtasks: 2, + onEvent: (e) => events.push(e), + }) + const run = await sup.run('t') + assert.equal(run.plan.length, 2) + assert.equal(run.results.length, 2) + assert.equal(run.stoppedEarly, true) + assert.ok(events.some(e => e.type === 'plan-trimmed' && e.dropped === 1)) + }) + + it('halts dispatch once the token budget is crossed', async () => { + const a = new StubAgent(() => ({ text: 'x', usage: usage(100) })) + const sup = new Supervisor({ + plan: () => [{ description: '1' }, { description: '2' }, { description: '3' }, { description: '4' }], + workers: a, + concurrency: 1, // deterministic budget accounting + budget: { maxTotalTokens: 250 }, + }) + const run = await sup.run('t') + assert.equal(run.results.length, 3) // 100, 200, 300 — the 4th never starts + assert.equal(run.usage.totalTokens, 300) + assert.equal(run.stoppedEarly, true) + }) + + it('reports a paused worker as a failed subtask (no durable resume yet)', async () => { + const paused = new StubAgent(() => ({ + text: 'partial', + finishReason: 'tool_approval_required', + pendingApprovalToolCall: { isClientTool: false, toolCall: {} } as unknown as NonNullable, + usage: usage(2), + })) + const sup = new Supervisor({ plan: () => [{ description: 'a' }], workers: paused }) + const run = await sup.run('t') + assert.equal(run.results[0]?.ok, false) + assert.match(String(run.results[0]?.error), /paused/) + }) + + it('isolates an unknown worker key as a failed subtask', async () => { + const a = new StubAgent(() => ({ text: 'x', usage: usage(1) })) + const sup = new Supervisor({ + plan: () => [{ description: 'a', worker: 'ghost' }], + workers: { real: a }, + }) + const run = await sup.run('t') + assert.equal(run.results[0]?.ok, false) + assert.match(String(run.results[0]?.error), /no worker named "ghost"/) + }) + + it('emits plan, dispatch, and synthesize events', async () => { + const a = new StubAgent(() => ({ text: 'x', usage: usage(1) })) + const types: string[] = [] + const sup = new Supervisor({ + plan: () => [{ description: 'a' }], + workers: a, + onEvent: (e) => types.push(e.type), + }) + await sup.run('t') + for (const expected of ['plan', 'dispatch-start', 'dispatch-result', 'synthesize']) { + assert.ok(types.includes(expected), `missing event ${expected}`) + } + }) +}) diff --git a/packages/ai-autopilot/src/supervisor.ts b/packages/ai-autopilot/src/supervisor.ts new file mode 100644 index 0000000..9d0ea71 --- /dev/null +++ b/packages/ai-autopilot/src/supervisor.ts @@ -0,0 +1,149 @@ +import { Agent } from '@gemstack/ai-sdk' +import type { TokenUsage } from '@gemstack/ai-sdk' +import { runPool } from './pool.js' +import { defaultSynthesize } from './synthesizer.js' +import type { + PlannedSubtask, + SubtaskResult, + SupervisorOptions, + SupervisorRun, + WorkerRouter, +} from './types.js' + +const ZERO_USAGE: TokenUsage = { promptTokens: 0, completionTokens: 0, totalTokens: 0 } + +/** + * The supervisor/worker topology: **plan → dispatch → synthesize**. + * + * A planner decomposes the task into subtasks; the supervisor dispatches each + * to a worker agent (bounded concurrency, optional token budget, per-subtask + * error isolation); a synthesizer combines the results into the final answer. + * + * This is a control *policy* over `@gemstack/ai-sdk`'s single-agent primitives, + * not a wrapper around any one of them — it owns which agents run, in what + * order, how their results combine, and when to stop. + * + * ```ts + * const supervisor = new Supervisor({ + * plan: agentPlanner(plannerAgent), + * workers: { research: researchAgent, write: writerAgent }, + * synthesize: agentSynthesizer(editorAgent), + * concurrency: 3, + * budget: { maxTotalTokens: 200_000 }, + * }) + * const { text } = await supervisor.run('Draft a launch brief for product X') + * ``` + */ +export class Supervisor { + constructor(private readonly opts: SupervisorOptions) {} + + async run(task: string): Promise { + const concurrency = this.opts.concurrency ?? 4 + const route = resolveRouter(this.opts.workers) + const synthesize = this.opts.synthesize ?? defaultSynthesize + const emit = this.opts.onEvent ?? (() => {}) + + let usage: TokenUsage = ZERO_USAGE + let stoppedEarly = false + + // 1. Plan ──────────────────────────────────────────────── + const drafted = await this.opts.plan(task) + let plan: PlannedSubtask[] = drafted.map((s, i) => ({ ...s, id: s.id ?? `subtask-${i + 1}` })) + + const cap = this.opts.maxSubtasks + if (cap !== undefined && plan.length > cap) { + const dropped = plan.length - cap + plan = plan.slice(0, cap) + stoppedEarly = true + emit({ type: 'plan-trimmed', kept: plan.length, dropped, reason: 'maxSubtasks' }) + } + emit({ type: 'plan', task, subtasks: plan }) + + // 2. Dispatch ───────────────────────────────────────────── + const limit = this.opts.budget?.maxTotalTokens + const { results, stopped } = await runPool( + plan, + concurrency, + async (subtask) => { + emit({ type: 'dispatch-start', subtask }) + const result = await runSubtask(route, subtask) + usage = addUsage(usage, result.usage) + emit({ type: 'dispatch-result', result }) + return result + }, + limit !== undefined ? () => usage.totalTokens >= limit : undefined, + ) + + if (stopped && limit !== undefined) { + stoppedEarly = true + emit({ type: 'budget-exceeded', spentTokens: usage.totalTokens, limitTokens: limit, skipped: plan.length - results.length }) + } + + // 3. Synthesize ─────────────────────────────────────────── + emit({ type: 'synthesize', results }) + const text = await synthesize(task, results) + + return { text, plan, results, usage, stoppedEarly } + } +} + +// ─── Internals ─────────────────────────────────────────────────── + +async function runSubtask(route: WorkerRouter, subtask: PlannedSubtask): Promise { + try { + const agent = route(subtask) + const response = await agent.prompt(subtask.description) + + // The seed dispatches autonomous workers. A worker that pauses for a + // client-tool or approval round-trip can't be carried forward yet (durable + // resume is a deferred adapter), so surface it as a failed subtask rather + // than silently dropping the pause. + if ((response.pendingClientToolCalls?.length ?? 0) > 0 || response.pendingApprovalToolCall) { + return { + subtask, + text: response.text ?? '', + ok: false, + usage: response.usage ?? ZERO_USAGE, + error: new Error( + `[ai-autopilot] worker for "${subtask.id}" paused (${response.finishReason}); ` + + `the Supervisor seed runs autonomous workers and cannot resume a paused run yet`, + ), + } + } + + return { subtask, text: response.text ?? '', ok: true, usage: response.usage ?? ZERO_USAGE } + } catch (error) { + return { subtask, text: '', ok: false, error, usage: ZERO_USAGE } + } +} + +function resolveRouter(workers: SupervisorOptions['workers']): WorkerRouter { + if (typeof workers === 'function') return workers + if (workers instanceof Agent) return () => workers + + const pool = workers as Record + return (subtask) => { + if (subtask.worker === undefined) { + throw new Error( + `[ai-autopilot] subtask "${subtask.id}" has no \`worker\` key, but \`workers\` is a pool. ` + + `Set subtask.worker, or pass a single Agent / a WorkerRouter.`, + ) + } + const agent = pool[subtask.worker] + if (!agent) { + throw new Error( + `[ai-autopilot] no worker named "${subtask.worker}" (subtask "${subtask.id}"). ` + + `Known workers: ${Object.keys(pool).join(', ') || '(none)'}.`, + ) + } + return agent + } +} + +function addUsage(a: TokenUsage, b: TokenUsage): TokenUsage { + return { + promptTokens: a.promptTokens + b.promptTokens, + completionTokens: a.completionTokens + b.completionTokens, + totalTokens: a.totalTokens + b.totalTokens, + } +} diff --git a/packages/ai-autopilot/src/synthesizer.test.ts b/packages/ai-autopilot/src/synthesizer.test.ts new file mode 100644 index 0000000..6d4ecd0 --- /dev/null +++ b/packages/ai-autopilot/src/synthesizer.test.ts @@ -0,0 +1,52 @@ +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { AiFake, agent, getMessageText } from '@gemstack/ai-sdk' +import { defaultSynthesize, agentSynthesizer } from './synthesizer.js' +import type { SubtaskResult } from './types.js' + +function result(id: string, description: string, text: string, ok = true): SubtaskResult { + return { + subtask: { id, description }, + text, + ok, + usage: { promptTokens: 0, completionTokens: 0, totalTokens: 0 }, + } +} + +describe('defaultSynthesize', () => { + it('concatenates successful results and omits failures', () => { + const out = defaultSynthesize('t', [ + result('1', 'a', 'alpha'), + result('2', 'b', '', false), + result('3', 'c', 'gamma'), + ]) + assert.equal(out, 'alpha\n\ngamma') + }) + + it('returns empty string when nothing succeeded', () => { + assert.equal(defaultSynthesize('t', [result('1', 'a', '', false)]), '') + }) +}) + +describe('agentSynthesizer', () => { + it('prompts the agent with the task + successful results and returns its text', async () => { + const fake = AiFake.fake() + try { + fake.respondWith('final synthesized answer') + const synth = agentSynthesizer(agent('You synthesize.')) + const out = await synth('the task', [ + result('1', 'research', 'found X'), + result('2', 'failed', '', false), + ]) + assert.equal(out, 'final synthesized answer') + + const call = fake.getCalls()[0]! + const text = call.messages.map(m => getMessageText(m.content)).join('\n') + assert.match(text, /the task/) + assert.match(text, /found X/) + assert.ok(!text.includes('failed'), 'failed subtasks are not sent to the synthesizer') + } finally { + fake.restore() + } + }) +}) diff --git a/packages/ai-autopilot/src/synthesizer.ts b/packages/ai-autopilot/src/synthesizer.ts new file mode 100644 index 0000000..61f440c --- /dev/null +++ b/packages/ai-autopilot/src/synthesizer.ts @@ -0,0 +1,34 @@ +import type { Agent } from '@gemstack/ai-sdk' +import type { Synthesizer, SubtaskResult } from './types.js' + +/** + * The default synthesizer: concatenate the successful results, in plan order, + * separated by blank lines. No LLM call — deterministic and free. + */ +export function defaultSynthesize(_task: string, results: SubtaskResult[]): string { + return results + .filter(r => r.ok) + .map(r => r.text.trim()) + .filter(s => s.length > 0) + .join('\n\n') +} + +/** + * Build a {@link Synthesizer} that asks an agent to combine the worker results + * into a single coherent answer. Failed subtasks are omitted from the prompt. + */ +export function agentSynthesizer(agent: Agent, opts: { instructions?: string } = {}): Synthesizer { + const instructions = opts.instructions + ?? 'Combine the worker results below into a single, coherent answer to the task. Resolve overlaps and contradictions; do not just concatenate.' + + return async (task, results) => { + const ok = results.filter(r => r.ok) + const body = ok.length > 0 + ? ok.map(r => `## ${r.subtask.description}\n${r.text.trim()}`).join('\n\n') + : '(no successful worker results)' + + const prompt = `${instructions}\n\n# Task\n${task}\n\n# Worker results\n${body}` + const response = await agent.prompt(prompt) + return response.text ?? '' + } +} diff --git a/packages/ai-autopilot/src/types.ts b/packages/ai-autopilot/src/types.ts new file mode 100644 index 0000000..e660611 --- /dev/null +++ b/packages/ai-autopilot/src/types.ts @@ -0,0 +1,93 @@ +import type { Agent, TokenUsage } from '@gemstack/ai-sdk' + +/** A unit of work the supervisor dispatches to a worker agent. */ +export interface Subtask { + /** Stable id; auto-assigned (`subtask-N`) when a planner omits it. */ + id?: string + /** What the subtask asks for — becomes the worker agent's prompt. */ + description: string + /** Worker pool key, when `workers` is a `Record`. */ + worker?: string +} + +/** A subtask with its id resolved — what the supervisor actually runs. */ +export type PlannedSubtask = Subtask & { id: string } + +/** The outcome of dispatching a single subtask to a worker. */ +export interface SubtaskResult { + subtask: PlannedSubtask + /** The worker's final text. Empty string when the worker failed. */ + text: string + /** `false` when the worker threw or paused (client-tool / approval round-trip). */ + ok: boolean + /** The failure, when `ok` is false. */ + error?: unknown + /** Token usage for this worker run (zeroed on failure). */ + usage: TokenUsage +} + +/** The full result of a supervised run. */ +export interface SupervisorRun { + /** The synthesized final answer. */ + text: string + /** The plan that was executed (after any guardrail trimming). */ + plan: PlannedSubtask[] + /** One result per dispatched subtask, in plan order. */ + results: SubtaskResult[] + /** Aggregate token usage across planning, dispatch, and synthesis. */ + usage: TokenUsage + /** True when a guardrail (subtask cap or token budget) stopped work early. */ + stoppedEarly: boolean +} + +/** + * Decomposes a task into subtasks. The mechanism is yours — an LLM planner + * (see `agentPlanner`), a static list, or hand-rolled logic. Autopilot owns + * the control policy around it, not the decomposition. + */ +export type Planner = (task: string) => Subtask[] | Promise + +/** + * Routes a subtask to the worker agent that should run it. Built from the + * `workers` option: a single `Agent`, a `Record` keyed by + * `subtask.worker`, or a function for custom routing. + */ +export type WorkerRouter = (subtask: PlannedSubtask) => Agent + +/** Combines the subtask results into the final answer. */ +export type Synthesizer = (task: string, results: SubtaskResult[]) => string | Promise + +/** Progress events emitted during a run (for logging / UI). */ +export type SupervisorEvent = + | { type: 'plan'; task: string; subtasks: PlannedSubtask[] } + | { type: 'plan-trimmed'; kept: number; dropped: number; reason: 'maxSubtasks' } + | { type: 'dispatch-start'; subtask: PlannedSubtask } + | { type: 'dispatch-result'; result: SubtaskResult } + | { type: 'budget-exceeded'; spentTokens: number; limitTokens: number; skipped: number } + | { type: 'synthesize'; results: SubtaskResult[] } + +export interface SupervisorOptions { + /** How to decompose the task into subtasks. */ + plan: Planner + /** + * The worker agent(s) that run subtasks: + * - a single `Agent` — every subtask runs on it; + * - a `Record` — `subtask.worker` selects the agent; + * - a `WorkerRouter` function — full control. + */ + workers: Agent | Record | WorkerRouter + /** + * How to combine results into the final answer. Defaults to a plain + * concatenation of the successful results; pass `agentSynthesizer(agent)` + * for an LLM synthesis. + */ + synthesize?: Synthesizer + /** Max subtasks dispatched at once. Default 4. */ + concurrency?: number + /** Hard cap on subtasks; a longer plan is trimmed (and an event emitted). */ + maxSubtasks?: number + /** Stop dispatching once aggregate token usage exceeds `maxTotalTokens`. */ + budget?: { maxTotalTokens?: number } + /** Observe progress. */ + onEvent?: (event: SupervisorEvent) => void +} diff --git a/packages/ai-autopilot/tsconfig.build.json b/packages/ai-autopilot/tsconfig.build.json new file mode 100644 index 0000000..e578064 --- /dev/null +++ b/packages/ai-autopilot/tsconfig.build.json @@ -0,0 +1,6 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "outDir": "dist", "rootDir": "src" }, + "include": ["src"], + "exclude": ["src/**/*.test.ts"] +} diff --git a/packages/ai-autopilot/tsconfig.json b/packages/ai-autopilot/tsconfig.json new file mode 100644 index 0000000..404aab4 --- /dev/null +++ b/packages/ai-autopilot/tsconfig.json @@ -0,0 +1,5 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "noEmit": true, "rootDir": "src" }, + "include": ["src"] +} diff --git a/packages/ai-autopilot/tsconfig.test.json b/packages/ai-autopilot/tsconfig.test.json new file mode 100644 index 0000000..eebda2f --- /dev/null +++ b/packages/ai-autopilot/tsconfig.test.json @@ -0,0 +1,5 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "outDir": "dist-test", "rootDir": "src" }, + "include": ["src"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 940de42..1f58558 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -18,6 +18,22 @@ importers: specifier: ^5.4.0 version: 5.9.3 + packages/ai-autopilot: + dependencies: + '@gemstack/ai-sdk': + specifier: workspace:^ + version: link:../ai-sdk + zod: + specifier: ^4.0.0 + version: 4.4.3 + devDependencies: + '@types/node': + specifier: ^20.0.0 + version: 20.19.43 + typescript: + specifier: ^5.4.0 + version: 5.9.3 + packages/ai-mcp: dependencies: '@gemstack/ai-sdk':