12 changes: 12 additions & 0 deletions .changeset/ai-autopilot-initial.md
@@ -0,0 +1,12 @@
---
"@gemstack/ai-autopilot": minor
---

Initial release. Orchestration for `@gemstack/ai-sdk` agents — the control-policy layer over many agent runs. Seed slice: the supervisor/worker topology.

- `Supervisor` — **plan → dispatch → synthesize**: decompose a task into subtasks, dispatch each to a worker agent (bounded concurrency, optional token budget, per-subtask error isolation), and synthesize the results.
- `agentPlanner(agent)` — turn a planning agent into a `Planner` via `ai-sdk`'s `Output.array` (JSON subtask decomposition).
- `agentSynthesizer(agent)` / `defaultSynthesize` — combine subtask results (LLM pass, or deterministic concatenation).
- Pluggable stages (`plan` / `workers` / `synthesize`), guardrails (`concurrency`, `maxSubtasks`, `budget.maxTotalTokens`), and progress events.

Scope boundary: `ai-sdk` owns the single-agent loop + handoff/subagent primitives; `ai-autopilot` owns orchestrating multiple runs under a policy. The seed runs autonomous workers; durable pause/resume, more topologies, and queue-backed execution are deferred behind optional seams. Depends on `@gemstack/ai-sdk`.
56 changes: 56 additions & 0 deletions packages/ai-autopilot/README.md
@@ -0,0 +1,56 @@
# @gemstack/ai-autopilot

Orchestration for [`@gemstack/ai-sdk`](https://github.com/gemstack-land/gemstack/tree/main/packages/ai-sdk) agents — the "director" layer that runs **many** agent runs under a control policy.

`ai-sdk` owns the single-agent loop and the handoff / subagent primitives. `ai-autopilot` owns orchestrating multiple runs: which agents run, in what order, how their results combine, and when to stop. If a feature is just calling an `ai-sdk` primitive, it belongs in `ai-sdk` — autopilot earns its keep only as the topology / control-policy layer.

## The seed: Supervisor (plan → dispatch → synthesize)

The first slice is the supervisor/worker topology, the smallest topology that clearly adds something beyond the primitives:

1. **Plan** — a planner decomposes the task into subtasks.
2. **Dispatch** — each subtask runs on a worker agent, with bounded concurrency, an optional token budget, and per-subtask error isolation.
3. **Synthesize** — a synthesizer combines the results into the final answer.

```ts
import { Supervisor, agentPlanner, agentSynthesizer } from '@gemstack/ai-autopilot'

const supervisor = new Supervisor({
  plan: agentPlanner(plannerAgent), // LLM decomposition
  workers: { research: researchAgent, write: writerAgent }, // routed by subtask.worker
  synthesize: agentSynthesizer(editorAgent), // LLM synthesis
  concurrency: 3,
  maxSubtasks: 8,
  budget: { maxTotalTokens: 200_000 },
  onEvent: (e) => console.log(e.type),
})

const run = await supervisor.run('Draft a launch brief for product X')
console.log(run.text) // synthesized answer
console.log(run.results) // per-subtask outcomes (ok / error / usage)
console.log(run.usage) // aggregate token usage
console.log(run.stoppedEarly) // true if a guardrail trimmed or halted work
```

## Pieces are pluggable

Each stage is a plain function, so you mix LLM and deterministic logic freely:

- **`plan`** — a `Planner`: `(task) => Subtask[]`. Use `agentPlanner(agent)` for LLM decomposition, or return a static list.
- **`workers`** — a single `Agent` (all subtasks), a `Record<string, Agent>` (routed by `subtask.worker`), or a `WorkerRouter` function.
- **`synthesize`** — a `Synthesizer`: `(task, results) => string`. Defaults to `defaultSynthesize` (concatenate successes, no LLM call); pass `agentSynthesizer(agent)` for an LLM pass.
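
As a rough illustration of how the three accepted `workers` forms could collapse into a single lookup, the sketch below normalizes them into one routing function. `AgentLike`, `SubtaskLike`, `RouterLike`, and `toRouter` are hypothetical stand-ins defined locally for this example. They are not the package's real types or internals, and the router signature `(subtask) => agent` is an assumption.

```ts
// Hypothetical local stand-ins; the real `Agent`, `Subtask`, and `WorkerRouter`
// types live in the packages and may differ.
interface AgentLike {
  prompt(input: string): Promise<{ text: string }>
}
interface SubtaskLike {
  description: string
  worker?: string
}
type RouterLike = (subtask: SubtaskLike) => AgentLike
type WorkersLike = AgentLike | Record<string, AgentLike> | RouterLike

/** Runtime check: an agent is any object exposing a `prompt` function. */
function isAgent(value: unknown): value is AgentLike {
  return typeof value === 'object' && value !== null
    && typeof (value as { prompt?: unknown }).prompt === 'function'
}

/** Normalize any accepted `workers` form into one routing function. */
function toRouter(workers: WorkersLike): RouterLike {
  // Already a router function: use it as-is.
  if (typeof workers === 'function') return workers
  // A single agent handles every subtask.
  if (isAgent(workers)) {
    const single = workers
    return () => single
  }
  // A keyed pool: route by `subtask.worker`, failing loudly on unknown keys.
  const pool = workers
  return (subtask) => {
    const key = subtask.worker
    const agent = key === undefined ? undefined : pool[key]
    if (!agent) throw new Error(`No worker registered for key "${String(key)}"`)
    return agent
  }
}
```

Throwing on an unknown key is one possible policy; combined with per-subtask error isolation it would surface as a single failed subtask rather than a failed run.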

## Guardrails

- **`concurrency`** (default 4) — max workers in flight.
- **`maxSubtasks`** — hard cap; a longer plan is trimmed and `stoppedEarly` is set.
- **`budget.maxTotalTokens`** — stop dispatching once aggregate usage crosses the limit (in-flight workers finish; remaining subtasks are skipped).
- **Error isolation** — a worker that throws becomes an `ok: false` result; siblings continue.
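
The guardrails above can be pictured together in one small dispatch loop. This is a minimal sketch under stated assumptions, not the package's implementation: `dispatchSketch`, `ResultLike`, `RunSubtask`, and `GuardrailOptions` are hypothetical names, subtasks are reduced to plain strings, and "crosses the limit" is read as `totalTokens >= maxTotalTokens`.

```ts
// Hypothetical shapes for illustration; the package's real `SubtaskResult` may differ.
interface ResultLike {
  ok: boolean
  text?: string
  error?: string
  tokens: number
}
type RunSubtask = (description: string) => Promise<{ text: string; tokens: number }>

interface GuardrailOptions {
  concurrency?: number
  maxSubtasks?: number
  maxTotalTokens?: number
}

async function dispatchSketch(
  subtasks: readonly string[],
  run: RunSubtask,
  opts: GuardrailOptions = {},
): Promise<{ results: ResultLike[]; totalTokens: number; stoppedEarly: boolean }> {
  const { concurrency = 4, maxSubtasks = Infinity, maxTotalTokens = Infinity } = opts
  // maxSubtasks: trim a plan that is too long and remember that we did.
  const planned = subtasks.slice(0, maxSubtasks)
  let stoppedEarly = planned.length < subtasks.length
  const results: ResultLike[] = []
  let totalTokens = 0
  let next = 0

  // Each lane claims the next unclaimed subtask until none remain.
  async function lane(): Promise<void> {
    while (next < planned.length) {
      // budget: stop claiming new subtasks once usage has reached the limit;
      // lanes that are already awaiting a worker still finish.
      if (totalTokens >= maxTotalTokens) {
        stoppedEarly = true
        return
      }
      const index = next++
      try {
        const { text, tokens } = await run(planned[index]!)
        totalTokens += tokens
        results[index] = { ok: true, text, tokens }
      } catch (err) {
        // error isolation: a throwing worker becomes an `ok: false` result.
        const error = err instanceof Error ? err.message : String(err)
        results[index] = { ok: false, error, tokens: 0 }
      }
    }
  }

  // concurrency: never start more lanes than the limit (or than there is work).
  const lanes = Math.max(1, Math.min(concurrency, planned.length))
  await Promise.all(Array.from({ length: lanes }, () => lane()))
  return { results, totalTokens, stoppedEarly }
}
```

Because subtasks are claimed in order and every claimed subtask settles, `results` here is always a gap-free prefix of the plan.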

## Scope (what's deferred)

The seed dispatches **autonomous** workers via `agent.prompt()`. A worker that pauses for a client-tool or approval round-trip is reported as a failed subtask. Durable pause/resume across a supervised run (building on `ai-sdk`'s `SubAgentRunStore` + resume primitives) is a deferred adapter, as are other topologies (pipelines, debate) and queue-backed long-running execution. Those land on demand, behind optional seams, not in the core.

## License

MIT
57 changes: 57 additions & 0 deletions packages/ai-autopilot/package.json
@@ -0,0 +1,57 @@
{
  "name": "@gemstack/ai-autopilot",
  "version": "0.0.0",
  "description": "Orchestration for @gemstack/ai-sdk agents: a Supervisor that plans, dispatches subagents (bounded concurrency + budget guardrails), and synthesizes the result.",
  "keywords": [
    "ai",
    "agent",
    "agents",
    "orchestration",
    "multi-agent",
    "supervisor",
    "autonomy",
    "planning",
    "gemstack"
  ],
  "license": "MIT",
  "homepage": "https://github.com/gemstack-land/gemstack/tree/main/packages/ai-autopilot#readme",
  "bugs": {
    "url": "https://github.com/gemstack-land/gemstack/issues"
  },
  "repository": {
    "type": "git",
    "url": "https://github.com/gemstack-land/gemstack",
    "directory": "packages/ai-autopilot"
  },
  "type": "module",
  "engines": {
    "node": ">=22.12.0"
  },
  "files": [
    "dist"
  ],
  "main": "./dist/index.js",
  "types": "./dist/index.d.ts",
  "exports": {
    ".": {
      "types": "./dist/index.d.ts",
      "import": "./dist/index.js"
    }
  },
  "scripts": {
    "build": "tsc -p tsconfig.build.json",
    "dev": "tsc -p tsconfig.build.json --watch",
    "typecheck": "tsc --noEmit",
    "test": "tsc -p tsconfig.test.json && cd dist-test && node --test",
    "clean": "rm -rf dist"
  },
  "dependencies": {
    "@gemstack/ai-sdk": "workspace:^",
    "zod": "^4.0.0"
  },
  "devDependencies": {
    "@types/node": "^20.0.0",
    "typescript": "^5.4.0"
  },
  "author": "Suleiman Shahbari"
}
28 changes: 28 additions & 0 deletions packages/ai-autopilot/src/index.ts
@@ -0,0 +1,28 @@
/**
* `@gemstack/ai-autopilot` — orchestration for `@gemstack/ai-sdk` agents.
*
* The seed slice is the supervisor/worker topology: {@link Supervisor} plans a
* task into subtasks, dispatches them to worker agents (bounded concurrency +
* token budget + per-subtask error isolation), and synthesizes the result.
*
* Autopilot owns the *control policy* over many agent runs; `ai-sdk` owns the
* single-agent loop and the handoff / subagent primitives the policy builds on.
*
* - {@link Supervisor} — the plan → dispatch → synthesize orchestrator
* - {@link agentPlanner} — turn a planning agent into a {@link Planner}
* - {@link agentSynthesizer} / {@link defaultSynthesize} — combine results
*/
export { Supervisor } from './supervisor.js'
export { agentPlanner, type AgentPlannerOptions } from './planner.js'
export { agentSynthesizer, defaultSynthesize } from './synthesizer.js'
export type {
  Subtask,
  PlannedSubtask,
  SubtaskResult,
  SupervisorRun,
  SupervisorOptions,
  SupervisorEvent,
  Planner,
  WorkerRouter,
  Synthesizer,
} from './types.js'
46 changes: 46 additions & 0 deletions packages/ai-autopilot/src/planner.test.ts
@@ -0,0 +1,46 @@
import { describe, it } from 'node:test'
import assert from 'node:assert/strict'
import { AiFake, agent, getMessageText } from '@gemstack/ai-sdk'
import { agentPlanner } from './planner.js'

describe('agentPlanner', () => {
  it('prompts the agent and parses a JSON subtask array', async () => {
    const fake = AiFake.fake()
    try {
      fake.respondWith('[{"description":"research the market"},{"description":"draft copy","worker":"writer"}]')
      const plan = agentPlanner(agent('You plan.'))
      const subtasks = await plan('Launch product X')
      assert.deepEqual(subtasks, [
        { description: 'research the market' },
        { description: 'draft copy', worker: 'writer' },
      ])
    } finally {
      fake.restore()
    }
  })

  it('tolerates a fenced JSON code block', async () => {
    const fake = AiFake.fake()
    try {
      fake.respondWith('```json\n[{"description":"only task"}]\n```')
      const subtasks = await agentPlanner(agent('plan'))('do it')
      assert.deepEqual(subtasks, [{ description: 'only task' }])
    } finally {
      fake.restore()
    }
  })

  it('puts the schema instruction and the task into the planning prompt', async () => {
    const fake = AiFake.fake()
    try {
      fake.respondWith('[]')
      await agentPlanner(agent('plan'))('summarize the quarterly numbers')
      const call = fake.getCalls()[0]!
      const text = call.messages.map(m => getMessageText(m.content)).join('\n')
      assert.match(text, /JSON array/)
      assert.match(text, /summarize the quarterly numbers/)
    } finally {
      fake.restore()
    }
  })
})
39 changes: 39 additions & 0 deletions packages/ai-autopilot/src/planner.ts
@@ -0,0 +1,39 @@
import { Output } from '@gemstack/ai-sdk'
import type { Agent } from '@gemstack/ai-sdk'
import { z } from 'zod'
import type { Planner, Subtask } from './types.js'

/** Default subtask shape an LLM planner emits. */
const defaultSubtaskSchema = z.object({
  description: z.string().describe('What this subtask asks a worker agent to do'),
  worker: z.string().optional().describe('Worker pool key, when routing to named workers'),
})

export interface AgentPlannerOptions {
  /**
   * Zod schema for one subtask. Must produce at least `{ description: string }`
   * (and optionally `worker`). Defaults to that shape.
   */
  element?: z.ZodType
  /** Override the planning instruction prepended to the task. */
  instructions?: string
}

/**
 * Build a {@link Planner} that asks an agent to decompose the task into a JSON
 * array of subtasks, using `@gemstack/ai-sdk`'s `Output.array` for the schema
 * instruction + parsing. The agent is your planning policy; autopilot orchestrates
 * the subtasks it returns.
 */
export function agentPlanner(agent: Agent, opts: AgentPlannerOptions = {}): Planner {
  const element = opts.element ?? defaultSubtaskSchema
  const output = Output.array({ element })
  const instructions = opts.instructions
    ?? 'Break the task below into the smallest set of independent subtasks that can run in parallel. Each subtask is dispatched to a worker agent.'

  return async (task) => {
    const prompt = `${instructions}\n\n# Task\n${task}\n\n${output.toSystemPrompt()}`
    const response = await agent.prompt(prompt)
    return output.parse(response.text ?? '') as Subtask[]
  }
}
46 changes: 46 additions & 0 deletions packages/ai-autopilot/src/pool.test.ts
@@ -0,0 +1,46 @@
import { describe, it } from 'node:test'
import assert from 'node:assert/strict'
import { runPool } from './pool.js'

const tick = () => new Promise<void>(r => setTimeout(r, 1))

describe('runPool', () => {
  it('runs every item and preserves input order in results', async () => {
    const { results, stopped } = await runPool([1, 2, 3, 4], 2, async (n) => {
      await tick()
      return n * 10
    })
    assert.deepEqual(results, [10, 20, 30, 40])
    assert.equal(stopped, false)
  })

  it('never exceeds the concurrency limit', async () => {
    let inFlight = 0
    let peak = 0
    await runPool(Array.from({ length: 10 }, (_, i) => i), 3, async (n) => {
      inFlight++
      peak = Math.max(peak, inFlight)
      await tick()
      inFlight--
      return n
    })
    assert.ok(peak <= 3, `peak concurrency ${peak} exceeded 3`)
  })

  it('stops claiming new items once shouldStop flips, and reports stopped', async () => {
    let done = 0
    const { results, stopped } = await runPool([1, 2, 3, 4, 5], 1, async (n) => {
      done++
      return n
    }, () => done >= 2)
    assert.equal(stopped, true)
    assert.equal(results.length, 2) // only the first two ran
    assert.deepEqual(results, [1, 2])
  })

  it('handles an empty list', async () => {
    const { results, stopped } = await runPool([], 4, async () => 1)
    assert.deepEqual(results, [])
    assert.equal(stopped, false)
  })
})
39 changes: 39 additions & 0 deletions packages/ai-autopilot/src/pool.ts
@@ -0,0 +1,39 @@
/**
* Run `items` through `run` with at most `limit` in flight at once.
*
* Before claiming each item a worker checks `shouldStop`; once it returns true,
* no further items start (in-flight ones finish) and `stopped` is reported. The
* returned `results` are sparse-free and index-aligned to the items that
* actually ran — trailing items skipped by `shouldStop` are simply absent, so
* `results.length < items.length` signals truncation.
*/
export async function runPool<T, R>(
  items: readonly T[],
  limit: number,
  run: (item: T, index: number) => Promise<R>,
  shouldStop?: () => boolean,
): Promise<{ results: R[]; stopped: boolean }> {
  const out: Array<{ index: number; value: R }> = []
  let next = 0
  let stopped = false

  const workers = Math.max(1, Math.min(limit, items.length))

  async function worker(): Promise<void> {
    while (true) {
      if (next >= items.length) return // nothing left to claim, so this is not a stop
      if (shouldStop?.()) {
        stopped = true
        return
      }
      const index = next++
      const value = await run(items[index]!, index)
      out.push({ index, value })
    }
  }

  await Promise.all(Array.from({ length: workers }, () => worker()))

  out.sort((a, b) => a.index - b.index)
  return { results: out.map(o => o.value), stopped }
}