From 27356d79576894d594b1bb943fb1b42e252714e2 Mon Sep 17 00:00:00 2001 From: codex-mohan Date: Mon, 22 Jun 2026 19:21:12 +0530 Subject: [PATCH] feat(agent): add runSubagent primitive for subagent spawning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a reusable runSubagent() function to the agent SDK that creates a fresh-context child Agent with model override, optional Budget (maxTurns/maxTokens/timeoutMs — all uncapped by default), signal forwarding, and a separate onEvent event stream. Returns SubagentResult with final text (for parent), full transcript messages (for caller persistence), and accumulated usage. The parent agent only sees the final text — the full transcript is isolated and available for the caller to persist to a child session. Background mode and task resumption are deferred to the app layer; permissions stay in the code layer. --- packages/agent/src/__tests__/subagent.test.ts | 421 ++++++++++++++++++ packages/agent/src/index.ts | 2 + packages/agent/src/subagent.ts | 164 +++++++ 3 files changed, 587 insertions(+) create mode 100644 packages/agent/src/__tests__/subagent.test.ts create mode 100644 packages/agent/src/subagent.ts diff --git a/packages/agent/src/__tests__/subagent.test.ts b/packages/agent/src/__tests__/subagent.test.ts new file mode 100644 index 0000000..cafbbab --- /dev/null +++ b/packages/agent/src/__tests__/subagent.test.ts @@ -0,0 +1,421 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { runSubagent } from '../subagent.js'; +import { defineTool } from '../define-tool.js'; +import { z } from 'zod'; +import type { Model, AssistantMessage, AssistantMessageEvent } from '@mohanscodex/spectra-ai'; +import { AssistantMessageEventStream, registerProvider } from '@mohanscodex/spectra-ai'; +import type { AgentEvent } from '../types.js'; + +const parentModel: Model = { + id: 'claude-sonnet-4-20250514', + name: 'Claude Sonnet 4', + provider: 'test-provider', + api: 
'test', +}; + +const cheapModel: Model = { + id: 'deepseek-v4-flash', + name: 'DeepSeek V4 Flash', + provider: 'test-provider', + api: 'test', +}; + +function createMockProvider(name: string, responseSequence: AssistantMessage[][]) { + let callIndex = 0; + + return { + name, + stream(model: Model, context: any) { + const stream = new AssistantMessageEventStream(); + const responses = responseSequence[callIndex] || []; + callIndex++; + + setTimeout(() => { + const partial: AssistantMessage = { + role: 'assistant', + content: [], + provider: model.provider, + model: model.id, + usage: { input: 10, output: 20, cacheRead: 0, cacheWrite: 0, totalTokens: 30 }, + stopReason: 'stop', + timestamp: Date.now(), + }; + + stream.push({ type: 'start', partial }); + + for (let i = 0; i < responses.length; i++) { + const msg = responses[i]; + for (const block of msg.content) { + if (block.type === 'text') { + stream.push({ + type: 'text_delta', + contentIndex: i, + delta: block.text, + partial: { ...partial, content: [block] }, + }); + } else if (block.type === 'toolCall') { + stream.push({ + type: 'toolcall_start', + contentIndex: i, + partial: { ...partial, content: [block] }, + }); + stream.push({ + type: 'toolcall_end', + contentIndex: i, + toolCall: block, + partial: { ...partial, content: [block] }, + }); + } + } + } + + const lastResponse = responses[responses.length - 1] || partial; + stream.push({ + type: 'done', + reason: lastResponse.stopReason, + message: lastResponse, + }); + stream.end(); + }, 10); + + return stream; + }, + }; +} + +function createTextMessage(text: string, stopReason: 'stop' | 'toolUse' = 'stop'): AssistantMessage { + return { + role: 'assistant', + content: [{ type: 'text', text }], + provider: 'test-provider', + model: 'test-model', + usage: { input: 10, output: 20, cacheRead: 0, cacheWrite: 0, totalTokens: 30 }, + stopReason, + timestamp: Date.now(), + }; +} + +function createToolCallMessage(toolCalls: any[]): AssistantMessage { + return { + role: 
'assistant', + content: toolCalls, + provider: 'test-provider', + model: 'test-model', + usage: { input: 10, output: 20, cacheRead: 0, cacheWrite: 0, totalTokens: 30 }, + stopReason: 'toolUse', + timestamp: Date.now(), + }; +} + +describe('runSubagent', () => { + beforeEach(() => { + registerProvider(createMockProvider('test-provider', [[]])); + }); + + it('should run a subagent and return the final text', async () => { + registerProvider(createMockProvider('test-provider', [[createTextMessage('Found 12 files in src/auth')]])); + + const result = await runSubagent( + { model: parentModel, systemPrompt: 'You are an explorer.' }, + 'find all auth files', + ); + + expect(result.text).toBe('Found 12 files in src/auth'); + expect(result.aborted).toBe(false); + expect(result.error).toBeUndefined(); + }); + + it('should start with fresh context — no parent history', async () => { + const provider = createMockProvider('test-provider', [[createTextMessage('done')]]); + registerProvider(provider); + + await runSubagent({ model: parentModel }, 'do something'); + + const result = await runSubagent({ model: parentModel }, 'do something else'); + expect(result.messages.length).toBeLessThanOrEqual(3); + }); + + it('should use modelOverride when provided', async () => { + let capturedModel: Model | undefined; + const provider = { + name: 'test-provider', + stream(model: Model, context: any) { + capturedModel = model; + const stream = new AssistantMessageEventStream(); + const msg = createTextMessage('ok'); + setTimeout(() => { + stream.push({ type: 'start', partial: msg }); + stream.push({ type: 'done', reason: 'stop', message: msg }); + stream.end(); + }, 10); + return stream; + }, + }; + registerProvider(provider); + + await runSubagent( + { model: parentModel, modelOverride: cheapModel }, + 'test', + ); + + expect(capturedModel?.id).toBe('deepseek-v4-flash'); + }); + + it('should fall back to parent model when no override', async () => { + let capturedModel: Model | undefined; 
+ const provider = { + name: 'test-provider', + stream(model: Model, context: any) { + capturedModel = model; + const stream = new AssistantMessageEventStream(); + const msg = createTextMessage('ok'); + setTimeout(() => { + stream.push({ type: 'start', partial: msg }); + stream.push({ type: 'done', reason: 'stop', message: msg }); + stream.end(); + }, 10); + return stream; + }, + }; + registerProvider(provider); + + await runSubagent({ model: parentModel }, 'test'); + + expect(capturedModel?.id).toBe('claude-sonnet-4-20250514'); + }); + + it('should forward events via onEvent callback', async () => { + registerProvider(createMockProvider('test-provider', [[createTextMessage('hello')]])); + + const events: AgentEvent[] = []; + await runSubagent( + { model: parentModel, onEvent: (e) => events.push(e) }, + 'test', + ); + + expect(events.length).toBeGreaterThan(0); + expect(events.some((e) => e.type === 'agent_start')).toBe(true); + expect(events.some((e) => e.type === 'agent_end')).toBe(true); + }); + + it('should accumulate usage across turns', async () => { + const echoTool = defineTool({ + name: 'echo', + description: 'Echo back', + parameters: z.object({ text: z.string() }), + execute: async ({ text }) => ({ + content: [{ type: 'text', text: `Echo: ${text}` }], + }), + }); + + const responses = [ + [createToolCallMessage([{ type: 'toolCall', id: 'tc1', name: 'echo', arguments: { text: 'hi' } }])], + [createTextMessage('final answer')], + ]; + registerProvider(createMockProvider('test-provider', responses)); + + const result = await runSubagent( + { model: parentModel, tools: [echoTool] }, + 'echo hi then answer', + ); + + expect(result.usage.input).toBe(20); + expect(result.usage.output).toBe(40); + expect(result.usage.totalTokens).toBe(60); + expect(result.text).toBe('final answer'); + }); + + it('should return full transcript in messages', async () => { + const echoTool = defineTool({ + name: 'echo', + description: 'Echo', + parameters: z.object({ text: 
z.string() }), + execute: async ({ text }) => ({ + content: [{ type: 'text', text: `Echo: ${text}` }], + }), + }); + + const responses = [ + [createToolCallMessage([{ type: 'toolCall', id: 'tc1', name: 'echo', arguments: { text: 'hi' } }])], + [createTextMessage('done')], + ]; + registerProvider(createMockProvider('test-provider', responses)); + + const result = await runSubagent( + { model: parentModel, tools: [echoTool] }, + 'test', + ); + + expect(result.messages.length).toBeGreaterThan(2); + expect(result.text).toBe('done'); + }); + + it('should propagate abort signal to child', async () => { + const controller = new AbortController(); + const provider = { + name: 'test-provider', + stream(model: Model, context: any) { + const stream = new AssistantMessageEventStream(); + const msg = createTextMessage('long response'); + setTimeout(() => { + stream.push({ type: 'start', partial: msg }); + stream.push({ type: 'text_delta', contentIndex: 0, delta: 'long', partial: msg }); + stream.push({ type: 'done', reason: 'aborted', message: msg }); + stream.end(); + }, 50); + return stream; + }, + }; + registerProvider(provider); + + setTimeout(() => controller.abort(), 20); + + const result = await runSubagent( + { model: parentModel, signal: controller.signal }, + 'test', + ); + + expect(result.aborted).toBe(true); + }); + + it('should enforce budget.maxTurns when set', async () => { + const responses = [ + [createTextMessage('turn 1', 'toolUse')], + [createTextMessage('turn 2', 'toolUse')], + [createTextMessage('turn 3', 'toolUse')], + [createTextMessage('turn 4')], + ]; + registerProvider(createMockProvider('test-provider', responses)); + + const noopTool = defineTool({ + name: 'noop', + description: 'No-op', + parameters: z.object({}), + execute: async () => ({ content: [{ type: 'text', text: 'ok' }] }), + }); + + const result = await runSubagent( + { model: parentModel, tools: [noopTool], budget: { maxTurns: 2 } }, + 'test', + ); + + expect(result.aborted).toBe(false); 
+ expect(result.messages.filter((m) => m.role === 'assistant').length).toBeLessThanOrEqual(2); + }); + + it('should be uncapped when budget is undefined', async () => { + const responses = [ + [createTextMessage('turn 1')], + ]; + registerProvider(createMockProvider('test-provider', responses)); + + const result = await runSubagent( + { model: parentModel }, + 'test', + ); + + expect(result.aborted).toBe(false); + expect(result.error).toBeUndefined(); + }); + + it('should enforce budget.maxTokens when exceeded', async () => { + const responses = [ + [createToolCallMessage([{ type: 'toolCall', id: 'tc1', name: 'noop', arguments: {} }])], + [createTextMessage('turn 2')], + ]; + registerProvider(createMockProvider('test-provider', responses)); + + const noopTool = defineTool({ + name: 'noop', + description: 'No-op', + parameters: z.object({}), + execute: async () => ({ content: [{ type: 'text', text: 'ok' }] }), + }); + + const result = await runSubagent( + { model: parentModel, tools: [noopTool], budget: { maxTokens: 40 } }, + 'test', + ); + + expect(result.aborted).toBe(true); + expect(result.error).toContain('Budget exceeded'); + }); + + it('should enforce budget.timeoutMs when exceeded', async () => { + const provider = { + name: 'test-provider', + stream(model: Model, context: any) { + const stream = new AssistantMessageEventStream(); + const msg = createTextMessage('slow'); + setTimeout(() => { + stream.push({ type: 'start', partial: msg }); + stream.push({ type: 'done', reason: 'stop', message: msg }); + stream.end(); + }, 200); + return stream; + }, + }; + registerProvider(provider); + + const result = await runSubagent( + { model: parentModel, budget: { timeoutMs: 50 } }, + 'test', + ); + + expect(result.aborted).toBe(true); + }); + + it('should return error when the agent fails', async () => { + const provider = { + name: 'test-provider', + stream() { + throw new Error('Provider down'); + }, + }; + registerProvider(provider); + + const result = await 
runSubagent( + { model: parentModel }, + 'test', + ); + + expect(result.error).toBeDefined(); + expect(result.aborted).toBe(false); + }); + + it('should not call onEvent when not provided', async () => { + registerProvider(createMockProvider('test-provider', [[createTextMessage('ok')]])); + + const result = await runSubagent( + { model: parentModel }, + 'test', + ); + + expect(result.text).toBe('ok'); + }); + + it('should pass tools to child agent', async () => { + const echoTool = defineTool({ + name: 'echo', + description: 'Echo', + parameters: z.object({ text: z.string() }), + execute: async ({ text }) => ({ + content: [{ type: 'text', text: `Echo: ${text}` }], + }), + }); + + const responses = [ + [createToolCallMessage([{ type: 'toolCall', id: 'tc1', name: 'echo', arguments: { text: 'hi' } }])], + [createTextMessage('Echoed: hi')], + ]; + registerProvider(createMockProvider('test-provider', responses)); + + const result = await runSubagent( + { model: parentModel, tools: [echoTool] }, + 'echo hi', + ); + + expect(result.text).toBe('Echoed: hi'); + expect(result.messages.some((m) => m.role === 'toolResult')).toBe(true); + }); +}); diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 7d46282..4b955a2 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -1,5 +1,6 @@ export { Agent } from './agent.js'; export { defineTool } from './define-tool.js'; +export { runSubagent } from './subagent.js'; export type { Skill, SkillMetadata, @@ -48,4 +49,5 @@ export type { RetryContext, RetryDecision, } from './types.js'; +export type { SubagentConfig, SubagentResult, SubagentBudget } from './subagent.js'; export type { AssistantMessageEvent } from '@mohanscodex/spectra-ai'; diff --git a/packages/agent/src/subagent.ts b/packages/agent/src/subagent.ts new file mode 100644 index 0000000..bab1b4a --- /dev/null +++ b/packages/agent/src/subagent.ts @@ -0,0 +1,164 @@ +import { Agent } from './agent.js'; +import type { Model, Message, 
AssistantMessage, Usage, StreamOptions } from '@mohanscodex/spectra-ai'; +import type { AgentConfig, AgentEvent } from './types.js'; + +export interface SubagentBudget { + maxTurns?: number; + maxTokens?: number; + timeoutMs?: number; +} + +export interface SubagentConfig extends AgentConfig { + modelOverride?: Model; + budget?: SubagentBudget; + signal?: AbortSignal; + label?: string; + onEvent?: (event: AgentEvent) => void; + streamOptions?: StreamOptions; + convertToLlm?: (messages: Message[]) => Message[] | Promise<Message[]>; + maxRetryDelayMs?: number; +} + +export interface SubagentResult { + text: string; + usage: Usage; + messages: Message[]; + aborted: boolean; + error?: string; +} + +const ZERO_USAGE: Usage = { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, +}; + +export async function runSubagent(config: SubagentConfig, prompt: string): Promise<SubagentResult> { + const model = config.modelOverride ?? config.model; + const budget = config.budget; + + const child = new Agent({ + model, + systemPrompt: config.systemPrompt, + tools: config.tools, + maxTurns: budget?.maxTurns ?? 
config.maxTurns, + toolExecution: config.toolExecution, + beforeToolCall: config.beforeToolCall, + afterToolCall: config.afterToolCall, + transformContext: config.transformContext, + getApiKey: config.getApiKey, + onRetry: config.onRetry, + streamOptions: config.streamOptions, + convertToLlm: config.convertToLlm, + maxRetryDelayMs: config.maxRetryDelayMs, + }); + + const controller = new AbortController(); + let aborted = false; + + if (config.signal) { + if (config.signal.aborted) { + aborted = true; + controller.abort(); + } else { + config.signal.addEventListener('abort', () => { + aborted = true; + controller.abort(); + }, { once: true }); + } + } + + let timeoutId: ReturnType<typeof setTimeout> | undefined; + if (budget?.timeoutMs) { + timeoutId = setTimeout(() => { + aborted = true; + controller.abort(); + }, budget.timeoutMs); + } + + const usage: Usage = { ...ZERO_USAGE }; + let budgetExceeded = false; + + try { + for await (const event of child.run(prompt, { signal: controller.signal })) { + config.onEvent?.(event); + + if (event.type === 'message_end' && event.message.role === 'assistant') { + const msg = event.message as AssistantMessage; + if (msg.usage) { + usage.input += msg.usage.input || 0; + usage.output += msg.usage.output || 0; + usage.cacheRead += msg.usage.cacheRead || 0; + usage.cacheWrite += msg.usage.cacheWrite || 0; + usage.totalTokens += msg.usage.totalTokens || 0; + if (msg.usage.cost) { + if (!usage.cost) { + usage.cost = { ...msg.usage.cost }; + } else { + usage.cost.input += msg.usage.cost.input || 0; + usage.cost.output += msg.usage.cost.output || 0; + usage.cost.cacheRead += msg.usage.cost.cacheRead || 0; + usage.cost.cacheWrite += msg.usage.cost.cacheWrite || 0; + usage.cost.total += msg.usage.cost.total || 0; + } + } + } + + if (budget?.maxTokens && usage.totalTokens > budget.maxTokens) { + budgetExceeded = true; + controller.abort(); + } + } + } + } catch (err) { + if (!aborted && !budgetExceeded && !controller.signal.aborted) { + return { + text: 
'', + usage, + messages: child.messages, + aborted: false, + error: err instanceof Error ? err.message : String(err), + }; + } + } finally { + if (timeoutId) clearTimeout(timeoutId); + } + + const text = extractFinalText(child.messages); + const childError = detectError(child.messages); + + return { + text, + usage, + messages: child.messages, + aborted: aborted || budgetExceeded || controller.signal.aborted, + error: budgetExceeded + ? `Budget exceeded: maxTokens (${budget?.maxTokens}) reached` + : childError, + }; +} + +function extractFinalText(messages: Message[]): string { + for (let i = messages.length - 1; i >= 0; i--) { + const msg = messages[i]; + if (msg.role === 'assistant') { + const textBlocks = msg.content.filter( + (c): c is { type: 'text'; text: string } => c.type === 'text', + ); + return textBlocks.map((b) => b.text).join(''); + } + } + return ''; +} + +function detectError(messages: Message[]): string | undefined { + for (let i = messages.length - 1; i >= 0; i--) { + const msg = messages[i]; + if (msg.role === 'assistant' && (msg as AssistantMessage).stopReason === 'error') { + return (msg as AssistantMessage).errorMessage ?? 'Unknown error'; + } + } + return undefined; +}