From cf5a26a816a4be26eda5a04374a2c3aa91e82cf8 Mon Sep 17 00:00:00 2001 From: Rach Pradhan <54503978+justrach@users.noreply.github.com> Date: Fri, 15 May 2026 15:49:06 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(sdk):=20DevSwarm-style=20swarm=20examp?= =?UTF-8?q?le=20=E2=80=94=20orchestrator/workers/synthesizer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thin slice of #55 ([DSP-1] Agents). Single-file TS demo at sdk/typescript/examples/swarm.ts wired up as `npm run swarm` that implements the three-stage pattern from justrach/devswarm using only existing @codegraff/sdk primitives: 1. Orchestrator: decompose task into N independent JSON-array subtasks 2. Workers: each spawns a fresh Graff.init() and runs its subtask in parallel (Promise.all). Separate Graff instances are the cheap stand-in for the worktree isolation devswarm uses (their ADR-001 / issue #213). Worker prompt does NOT include the orchestrator preamble (lesson from devswarm #389). 3. Synthesizer: merges worker outputs into a single response. Configurable via env: GRAFF_CWD, SWARM_N (clamped [1,8]), SWARM_MODEL. Default task picks a small read-only ask against this repo so the demo is cheap to smoke. Deliberately out of scope for this PR (future #55 PRs): - Role+mode routing (8 roles × 4 modes per devswarm) - Role-specific model tiers (Opus orchestrator / Sonnet workers / Haiku monitor) - Preset task chains (finder_fixer, reviewer_fixer, etc.) - NO_ISSUES_FOUND review-fix termination - Real worktree isolation Type-checks under the same tsc/tsx config the existing examples use. Manual smoke run is gated to the user since it makes real LLM calls. Refs: #53 (epic), #55 (DSP-1). Co-Authored-By: Claude Opus 4.7 (1M context) --- sdk/typescript/examples/swarm.ts | 206 +++++++++++++++++++++++++++++++ sdk/typescript/package.json | 1 + 2 files changed, 207 insertions(+) create mode 100644 sdk/typescript/examples/swarm.ts diff --git a/sdk/typescript/examples/swarm.ts b/sdk/typescript/examples/swarm.ts new file mode 100644 index 00000000..bcc96778 --- /dev/null +++ b/sdk/typescript/examples/swarm.ts @@ -0,0 +1,206 @@ +/** + * DevSwarm-style swarm orchestration via @codegraff/sdk. + * + * Implements the orchestrator → workers → synthesizer pattern from + * justrach/devswarm as a single-file SDK example. Three stages: + * + * 1. Orchestrator decomposes the task into N independent subtasks. + * 2. N workers run those subtasks in parallel, each in its own Graff. + * 3. Synthesizer merges the worker outputs into one cohesive response. + * + * Thin slice of codegraff issue #55 ([DSP-1]). Deliberately out of scope: + * role+mode routing, worktree isolation, preset task chains, role-specific + * model tiers (Opus orchestrator / Sonnet workers / Haiku monitor), + * NO_ISSUES_FOUND review-fix termination. Those land in follow-up PRs + * against the same issue. + * + * Run from sdk/typescript/: + * npm run swarm -- "your task here" + * + * Env: + * GRAFF_CWD workspace the workers run inside (defaults to repo root) + * SWARM_N worker count, clamped [1, 8] (default 3) + * SWARM_MODEL model id passed through to every stage (optional) + */ + +import path from "node:path"; +import { fileURLToPath } from "node:url"; + +import { Graff, type AgentEvent } from "../lib.js"; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); +// examples/ → sdk/typescript/ → sdk/ → repo root +const REPO_ROOT = process.env.GRAFF_CWD ?? path.resolve(__dirname, "../../.."); +const N_WORKERS = clampWorkers(process.env.SWARM_N); +const MODEL = process.env.SWARM_MODEL || undefined; + +const DEFAULT_TASK = + "List the .rs files under sdk/typescript/src and describe what each does in ONE sentence."; +const TASK = process.argv.slice(2).join(" ").trim() || DEFAULT_TASK; + +function clampWorkers(raw: string | undefined): number { + const parsed = Number(raw ?? 3); + if (!Number.isFinite(parsed)) return 3; + return Math.max(1, Math.min(8, Math.floor(parsed))); +} + +function ts(): string { + return new Date().toISOString().slice(11, 19); +} + +interface DrainResult { + text: string; + toolCalls: number; + interrupted: boolean; +} + +// Drain a chat stream, accumulating final-Markdown text and counting tool +// calls. Logs lifecycle to stderr so the answer on stdout stays clean. +async function drain(label: string, stream: AsyncIterable): Promise { + let text = ""; + let toolCalls = 0; + let interrupted = false; + for await (const ev of stream) { + switch (ev.type) { + case "ConversationStarted": + process.stderr.write(`[${ts()}] ${label} ⏵ conv=${ev.conversationId.slice(0, 8)}\n`); + break; + case "TaskMessage": + if (ev.content.kind === "Markdown") text += ev.content.text; + else if (ev.content.kind === "ToolInput") + process.stderr.write(`[${ts()}] ${label} ▸ ${ev.content.title}\n`); + break; + case "ToolCallStart": + toolCalls++; + break; + case "Interrupt": + interrupted = true; + process.stderr.write(`[${ts()}] ${label} ⚠ ${ev.reason.kind}\n`); + break; + case "TaskComplete": + process.stderr.write(`[${ts()}] ${label} ✓\n`); + break; + } + } + return { text, toolCalls, interrupted }; +} + +// Extract a JSON array of strings from orchestrator output. Tolerates +// ```json fences and surrounding prose because models add them anyway. +export function extractJsonArray(raw: string): string[] { + const fence = raw.match(/```(?:json)?\s*([\s\S]*?)```/); + const candidate = (fence ? fence[1] : raw).trim(); + const start = candidate.indexOf("["); + const end = candidate.lastIndexOf("]"); + if (start === -1 || end === -1 || end <= start) { + throw new Error(`orchestrator did not return a JSON array:\n${raw.slice(0, 500)}`); + } + const parsed: unknown = JSON.parse(candidate.slice(start, end + 1)); + if (!Array.isArray(parsed) || !parsed.every((s) => typeof s === "string")) { + throw new Error("orchestrator JSON was not an array of strings"); + } + return parsed as string[]; +} + +async function decompose(graff: Graff, task: string, n: number): Promise { + const prompt = + `You are the ORCHESTRATOR in a multi-agent swarm. Break the task below into ` + + `EXACTLY ${n} independent subtasks that can run in parallel without coordination. ` + + `Each subtask must be self-contained: a worker reading only the subtask should ` + + `know what to do. Return STRICTLY a JSON array of ${n} strings, no prose, no ` + + `code fence.\n\nTASK:\n${task}`; + const opts = MODEL ? { prompt, model: MODEL } : { prompt }; + const { text } = await drain("orchestrator", graff.chat(opts)); + return extractJsonArray(text); +} + +interface WorkerOutput { + workerId: number; + subtask: string; + result: string; + toolCalls: number; + durationMs: number; +} + +async function runWorker(workerId: number, subtask: string): Promise { + const label = `worker[${workerId + 1}]`; + const t0 = Date.now(); + // Each worker gets its own Graff so conversation state is fully isolated — + // the cheap stand-in for the worktree isolation devswarm uses for swarm + // workers (their ADR-001 / issue #213). Worker prompt does NOT include + // the orchestrator preamble so we don't leak it into worker context + // (lesson from devswarm #389). + const graff = await Graff.init({ cwd: REPO_ROOT }); + const prompt = + `You are worker ${workerId + 1} in a parallel swarm. Complete ONLY the ` + + `subtask below and return a concise result. Do not speculate about other ` + + `workers or the overall task.\n\nSUBTASK:\n${subtask}`; + const opts = MODEL ? { prompt, model: MODEL } : { prompt }; + const { text, toolCalls } = await drain(label, graff.chat(opts)); + return { + workerId, + subtask, + result: text, + toolCalls, + durationMs: Date.now() - t0, + }; +} + +async function synthesize(graff: Graff, task: string, outputs: WorkerOutput[]): Promise { + const bundle = outputs + .map( + (o) => + `--- WORKER ${o.workerId + 1} ---\n` + + `SUBTASK: ${o.subtask}\n` + + `OUTPUT:\n${o.result.trim()}`, + ) + .join("\n\n"); + const prompt = + `You are the SYNTHESIZER in a multi-agent swarm. Combine the worker outputs ` + + `below into ONE cohesive response answering the original task. Resolve ` + + `disagreements explicitly. Do not invent facts not present in worker ` + + `outputs.\n\nORIGINAL TASK:\n${task}\n\n${bundle}`; + const opts = MODEL ? { prompt, model: MODEL } : { prompt }; + const { text } = await drain("synthesizer", graff.chat(opts)); + return text; +} + +async function main(): Promise { + process.stderr.write( + `swarm n=${N_WORKERS} cwd=${REPO_ROOT}${MODEL ? ` model=${MODEL}` : ""}\n`, + ); + process.stderr.write(`TASK: ${TASK}\n---\n`); + + // Orchestrator and synthesizer share one Graff and run sequentially. + // Workers each get their own Graff and run in parallel. + const driver = await Graff.init({ cwd: REPO_ROOT }); + + const t0 = Date.now(); + const subtasks = await decompose(driver, TASK, N_WORKERS); + process.stderr.write(`\n[${ts()}] decomposed into ${subtasks.length} subtasks:\n`); + subtasks.forEach((s, i) => { + const preview = s.length > 140 ? s.slice(0, 137) + "..." : s; + process.stderr.write(` ${i + 1}. ${preview}\n`); + }); + process.stderr.write(`\n`); + + const workerResults = await Promise.all(subtasks.map((s, i) => runWorker(i, s))); + const totalToolCalls = workerResults.reduce((a, w) => a + w.toolCalls, 0); + + process.stderr.write(`\n[${ts()}] all ${subtasks.length} workers done. synthesizing...\n\n`); + const finalText = await synthesize(driver, TASK, workerResults); + + const ms = Date.now() - t0; + process.stderr.write(`\n=== final ===\n`); + process.stdout.write(finalText.trim() + "\n"); + process.stderr.write( + `\n[swarm] n=${subtasks.length} duration=${(ms / 1000).toFixed(1)}s ` + + `tool_calls=${totalToolCalls}\n`, + ); +} + +main().catch((err) => { + process.stderr.write(`\nFATAL: ${err?.stack ?? err}\n`); + process.exit(1); +}); diff --git a/sdk/typescript/package.json b/sdk/typescript/package.json index 14a07baa..bce3d16e 100644 --- a/sdk/typescript/package.json +++ b/sdk/typescript/package.json @@ -35,6 +35,7 @@ "smoke": "node examples/smoke.mjs", "demo": "tsx examples/agent-demo.ts", "compare": "tsx examples/compare.ts", + "swarm": "tsx examples/swarm.ts", "benchmark": "tsx examples/benchmark.ts" }, "devDependencies": { From b2dd10c80f83e5791fb6680ab15cc0246dbc977b Mon Sep 17 00:00:00 2001 From: Rach Pradhan <54503978+justrach@users.noreply.github.com> Date: Fri, 15 May 2026 16:07:39 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix(sdk):=20swarm=20v2=20=E2=80=94=20semant?= =?UTF-8?q?ic=20decompose,=20worker=20timeout,=20synth=20trust?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three findings from the first smoke run (gpt-5.5, N=3, 447s) addressed: #64 [DSP-1.1] orchestrator over-shards by modulo-N input split Rewrote decompose() prompt to require SEMANTICALLY DISTINCT subtasks attacking different aspects of the task. Added an explicit FORBIDDEN clause against modulo/index/sort-order partitioning. Allow returning fewer subtasks than N when the task is small. Verified: v2 returned 2 distinct subtasks (lib.rs vs wire.rs) instead of 3 identical modulo-shards. #63 [DSP-1.2] workers can stall indefinitely Added runWorkerWithTimeout() wrapping each worker in Promise.race against a setTimeout. WORKER_TIMEOUT_MS env (default 120s, min 10s, no upper clamp). On timeout, the worker yields a [TIMED_OUT after Nms] marker the synthesizer can handle. Cancellation of the underlying chat stream is best-effort — out of scope for this slice. Wired but not exercised this run (no stalls occurred). #62 [DSP-1.3] synthesizer doesn't trust worker outputs Rewrote synthesize() prompt to forbid tool use, require returning INSUFFICIENT_DATA when worker output is too thin, and explicitly handle [TIMED_OUT markers as missing data rather than ignored. Verified: v2 synthesizer ran 0 tool calls (down from Search + 2×Read). Net smoke-test deltas v1 → v2: duration: 447.7s → 139.1s (-69%) total tool calls: 20 → 2 (-90%) subtasks: 3 → 2 (correct semantic decomp) Co-Authored-By: Claude Opus 4.7 (1M context) --- sdk/typescript/examples/swarm.ts | 72 ++++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 9 deletions(-) diff --git a/sdk/typescript/examples/swarm.ts b/sdk/typescript/examples/swarm.ts index bcc96778..5369deed 100644 --- a/sdk/typescript/examples/swarm.ts +++ b/sdk/typescript/examples/swarm.ts @@ -34,6 +34,7 @@ const __dirname = path.dirname(__filename); const REPO_ROOT = process.env.GRAFF_CWD ?? path.resolve(__dirname, "../../.."); const N_WORKERS = clampWorkers(process.env.SWARM_N); const MODEL = process.env.SWARM_MODEL || undefined; +const WORKER_TIMEOUT_MS = Math.max(10_000, Number(process.env.WORKER_TIMEOUT_MS ?? 120_000)); const DEFAULT_TASK = "List the .rs files under sdk/typescript/src and describe what each does in ONE sentence."; @@ -106,10 +107,21 @@ export function extractJsonArray(raw: string): string[] { async function decompose(graff: Graff, task: string, n: number): Promise { const prompt = `You are the ORCHESTRATOR in a multi-agent swarm. Break the task below into ` + - `EXACTLY ${n} independent subtasks that can run in parallel without coordination. ` + - `Each subtask must be self-contained: a worker reading only the subtask should ` + - `know what to do. Return STRICTLY a JSON array of ${n} strings, no prose, no ` + - `code fence.\n\nTASK:\n${task}`; + `AT MOST ${n} SEMANTICALLY DISTINCT subtasks that can run in parallel.\n\n` + + `REQUIREMENTS:\n` + + `- Each subtask must attack a DIFFERENT aspect, angle, or concern of the ` + + `task — e.g. exploration vs analysis vs verification, or one ` + + `file/area/concept per subtask.\n` + + `- Each subtask must be self-contained: a worker reading only the subtask ` + + `should know what to do without seeing the other subtasks or the original ` + + `framing.\n` + + `- FORBIDDEN: do NOT shard inputs by index, modulo, sort order, hash, or any ` + + `other mechanical partition. Subtasks like "handle items whose index mod ${n} ` + + `== 0" are WRONG. Subtasks attacking different facets of the problem are RIGHT.\n` + + `- If the task cannot be meaningfully split into ${n} subtasks, return fewer. ` + + `An array of 1 string is acceptable if the task is small.\n\n` + + `Return STRICTLY a JSON array of strings, no prose, no code fence.\n\n` + + `TASK:\n${task}`; const opts = MODEL ? { prompt, model: MODEL } : { prompt }; const { text } = await drain("orchestrator", graff.chat(opts)); return extractJsonArray(text); @@ -147,6 +159,36 @@ async function runWorker(workerId: number, subtask: string): Promise { + const label = `worker[${workerId + 1}]`; + let timer: NodeJS.Timeout | undefined; + const timeout = new Promise((resolve) => { + timer = setTimeout(() => { + process.stderr.write( + `[${ts()}] ${label} ⌛ timed out after ${WORKER_TIMEOUT_MS}ms\n`, + ); + resolve({ + workerId, + subtask, + result: `[TIMED_OUT after ${WORKER_TIMEOUT_MS}ms]`, + toolCalls: 0, + durationMs: WORKER_TIMEOUT_MS, + }); + }, WORKER_TIMEOUT_MS); + }); + try { + return await Promise.race([runWorker(workerId, subtask), timeout]); + } finally { + if (timer) clearTimeout(timer); + } +} + async function synthesize(graff: Graff, task: string, outputs: WorkerOutput[]): Promise { const bundle = outputs .map( @@ -157,10 +199,20 @@ async function synthesize(graff: Graff, task: string, outputs: WorkerOutput[]): ) .join("\n\n"); const prompt = - `You are the SYNTHESIZER in a multi-agent swarm. Combine the worker outputs ` + - `below into ONE cohesive response answering the original task. Resolve ` + - `disagreements explicitly. Do not invent facts not present in worker ` + - `outputs.\n\nORIGINAL TASK:\n${task}\n\n${bundle}`; + `You are the SYNTHESIZER in a multi-agent swarm. You have read-only access ` + + `to the worker outputs below.\n\n` + + `REQUIREMENTS:\n` + + `- DO NOT call any tools. Do not Read, Search, fs_search, or run any commands. ` + + `Use ONLY the worker outputs as source material.\n` + + `- Combine the worker outputs into ONE cohesive response answering the ` + + `original task.\n` + + `- Resolve disagreements explicitly when workers contradict each other.\n` + + `- If a worker output begins with "[TIMED_OUT", treat it as missing data ` + + `and note the gap rather than ignoring it.\n` + + `- If the combined worker outputs are insufficient to answer the task, ` + + `return the literal string \`INSUFFICIENT_DATA\` followed by an explanation ` + + `of what's missing — do not attempt to fill the gap yourself.\n\n` + + `ORIGINAL TASK:\n${task}\n\n${bundle}`; const opts = MODEL ? { prompt, model: MODEL } : { prompt }; const { text } = await drain("synthesizer", graff.chat(opts)); return text; @@ -185,7 +237,9 @@ async function main(): Promise { }); process.stderr.write(`\n`); - const workerResults = await Promise.all(subtasks.map((s, i) => runWorker(i, s))); + const workerResults = await Promise.all( + subtasks.map((s, i) => runWorkerWithTimeout(i, s)), + ); const totalToolCalls = workerResults.reduce((a, w) => a + w.toolCalls, 0); process.stderr.write(`\n[${ts()}] all ${subtasks.length} workers done. synthesizing...\n\n`);