diff --git a/packages/sdk/src/workflows/__tests__/budget-enforcement.test.ts b/packages/sdk/src/workflows/__tests__/budget-enforcement.test.ts new file mode 100644 index 000000000..a164b74e0 --- /dev/null +++ b/packages/sdk/src/workflows/__tests__/budget-enforcement.test.ts @@ -0,0 +1,170 @@ +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import path from 'node:path'; + +import { afterEach, describe, expect, it, vi } from 'vitest'; +import type { CliSessionReport } from '../cli-session-collector.js'; +import type { RelayYamlConfig } from '../types.js'; + +const mockedReports = vi.hoisted(() => ({ + queue: [] as Array, +})); + +vi.mock('@relaycast/sdk', () => ({ + RelayCast: vi.fn(), + RelayError: class RelayError extends Error {}, +})); + +vi.mock('../../relay.js', () => ({ + AgentRelay: vi.fn(), +})); + +vi.mock('../cli-session-collector.js', () => ({ + collectCliSession: vi.fn(async () => mockedReports.queue.shift() ?? null), +})); + +const { WorkflowRunner } = await import('../runner.js'); +const { InMemoryWorkflowDb } = await import('../memory-db.js'); + +const tempDirs: string[] = []; + +function createWorkspace(): string { + const dir = mkdtempSync(path.join(tmpdir(), 'workflow-budget-')); + tempDirs.push(dir); + return dir; +} + +function makeReport( + input: number, + output: number, + finalStatus: 'completed' | 'failed' = 'completed' +): CliSessionReport { + return { + cli: 'codex' as const, + sessionId: `session-${input}-${output}`, + model: 'gpt-5', + provider: 'openai', + durationMs: 1_000, + cost: null, + tokens: { input, output, cacheRead: 0 }, + turns: 1, + toolCalls: [], + errors: [], + finalStatus, + summary: `used ${input + output} tokens`, + }; +} + +function makeConfig(overrides?: { + tokenBudget?: number; + maxTokens?: number; + retries?: number; +}): RelayYamlConfig { + return { + version: '1', + name: 'budget-enforcement', + swarm: { + pattern: 'dag', + tokenBudget: overrides?.tokenBudget, + }, + agents: [ + { + name: 'worker', + cli: 'codex', + interactive: false, + constraints: overrides?.maxTokens ? { maxTokens: overrides.maxTokens } : undefined, + }, + ], + workflows: [ + { + name: 'default', + steps: [ + { + name: 'step-1', + agent: 'worker', + task: 'Do the first task', + retries: overrides?.retries, + }, + { + name: 'step-2', + agent: 'worker', + task: 'Do the second task', + dependsOn: ['step-1'], + }, + ], + }, + ], + trajectories: false, + }; +} + +afterEach(() => { + mockedReports.queue = []; + vi.clearAllMocks(); + while (tempDirs.length > 0) { + rmSync(tempDirs.pop()!, { recursive: true, force: true }); + } +}); + +describe('WorkflowRunner token budget enforcement', () => { + it('blocks later steps once the workflow budget is exhausted', async () => { + mockedReports.queue = [makeReport(60, 50)]; + + const db = new InMemoryWorkflowDb(); + const executor = { + executeAgentStep: vi.fn(async (step) => `completed ${step.name}`), + }; + const runner = new WorkflowRunner({ + cwd: createWorkspace(), + db, + executor, + }); + + const run = await runner.execute(makeConfig({ tokenBudget: 100, maxTokens: 80 }), 'default'); + const steps = await db.getStepsByRunId(run.id); + + expect(run.status).toBe('failed'); + expect(executor.executeAgentStep).toHaveBeenCalledTimes(1); + expect(steps.find((step) => step.stepName === 'step-1')?.status).toBe('completed'); + expect(steps.find((step) => step.stepName === 'step-2')?.status).toBe('failed'); + expect(steps.find((step) => step.stepName === 'step-2')?.completionReason).toBe( + 'failed_budget_exceeded' + ); + expect(steps.find((step) => step.stepName === 'step-2')?.error).toContain( + 'Workflow exceeded workflow budget' + ); + }); + + it('counts failed attempts against the same workflow budget before retrying later steps', async () => { + mockedReports.queue = [makeReport(40, 20, 'failed'), makeReport(30, 20)]; + + const db = new InMemoryWorkflowDb(); + const executor = { + executeAgentStep: vi + .fn() + .mockRejectedValueOnce(new Error('first attempt failed')) + .mockResolvedValueOnce('step-1 recovered'), + }; + const runner = new WorkflowRunner({ + cwd: createWorkspace(), + db, + executor, + }); + + const run = await runner.execute( + makeConfig({ tokenBudget: 100, maxTokens: 80, retries: 1 }), + 'default' + ); + const steps = await db.getStepsByRunId(run.id); + + expect(run.status).toBe('failed'); + expect(executor.executeAgentStep).toHaveBeenCalledTimes(2); + expect(steps.find((step) => step.stepName === 'step-1')?.status).toBe('completed'); + expect(steps.find((step) => step.stepName === 'step-2')?.completionReason).toBe( + 'failed_budget_exceeded' + ); + expect(steps.find((step) => step.stepName === 'step-2')?.error).toContain( + 'Workflow exceeded workflow budget' + ); + }); +}); diff --git a/packages/sdk/src/workflows/__tests__/budget-tracker.test.ts b/packages/sdk/src/workflows/__tests__/budget-tracker.test.ts new file mode 100644 index 000000000..49e4356b8 --- /dev/null +++ b/packages/sdk/src/workflows/__tests__/budget-tracker.test.ts @@ -0,0 +1,114 @@ +import { describe, expect, it } from 'vitest'; + +import { BudgetExceededError, BudgetTracker } from '../budget-tracker.js'; + +describe('BudgetTracker', () => { + it('tracks usage across multiple steps', () => { + const tracker = new BudgetTracker({ perAgent: 100, perWorkflow: 500 }); + + tracker.recordUsage('plan', { input: 10, output: 5, cacheRead: 3 }); + tracker.recordUsage('plan', { input: 1, output: 2 }); + tracker.recordUsage('implement', { input: 20, output: 8, cacheRead: 4 }); + + expect(tracker.getStepUsage('plan')).toEqual({ + input: 11, + output: 7, + cacheRead: 3, + total: 18, + }); + expect(tracker.getStepUsage('implement')).toEqual({ + input: 20, + output: 8, + cacheRead: 4, + total: 28, + }); + expect(tracker.getTotalUsage()).toEqual({ + input: 31, + output: 15, + cacheRead: 7, + total: 46, + }); + expect(tracker.getRemainingBudget()).toEqual({ + agent: 72, + workflow: 454, + }); + }); + + it('detects per-agent budget overruns without counting cache reads', () => { + const tracker = new BudgetTracker({ perAgent: 30 }); + + tracker.recordUsage('worker-a', { input: 20, output: 15, cacheRead: 999 }); + + expect(tracker.isOverBudget('worker-a')).toEqual({ + over: true, + reason: 'Step "worker-a" exceeded per-agent budget (35/30 tokens used)', + }); + }); + + it('detects per-workflow budget overruns', () => { + const tracker = new BudgetTracker({ perWorkflow: 50 }); + + tracker.recordUsage('step-1', { input: 20, output: 10 }); + tracker.recordUsage('step-2', { input: 5, output: 20 }); + + expect(tracker.isOverBudget('step-2')).toEqual({ + over: true, + reason: 'Workflow budget exceeded after step "step-2" (55/50 tokens used)', + }); + expect(tracker.getRemainingBudget()).toEqual({ + agent: null, + workflow: -5, + }); + }); + + it('prevents spawning when workflow budget is nearly exhausted', () => { + const tracker = new BudgetTracker({ perAgent: 100, perWorkflow: 500 }); + + tracker.recordUsage('planner', { input: 250, output: 241 }); + + expect(tracker.checkCanSpawn('implementer')).toEqual({ + allowed: false, + reason: 'Cannot spawn "implementer": remaining workflow budget 9 is below 10% of per-agent budget 10', + }); + }); + + it('maintains correct totals when async callers record usage in parallel', async () => { + const tracker = new BudgetTracker({ perAgent: 1_000, perWorkflow: 1_000 }); + + await Promise.all( + Array.from({ length: 100 }, async (_, index) => { + await Promise.resolve(); + tracker.recordUsage(`step-${index % 4}`, { + input: 2, + output: 3, + cacheRead: index % 2, + }); + }), + ); + + expect(tracker.getTotalUsage()).toEqual({ + input: 200, + output: 300, + cacheRead: 50, + total: 500, + }); + expect(tracker.getStepUsage('step-0').total).toBe(125); + expect(tracker.getStepUsage('step-1').total).toBe(125); + expect(tracker.getStepUsage('step-2').total).toBe(125); + expect(tracker.getStepUsage('step-3').total).toBe(125); + }); +}); + +describe('BudgetExceededError', () => { + it('exposes structured budget overrun details', () => { + const error = new BudgetExceededError('review', 'workflow', 500, 550); + + expect(error).toBeInstanceOf(Error); + expect(error.name).toBe('BudgetExceededError'); + expect(error.message).toBe('Workflow exceeded workflow budget: 550 tokens used of 500'); + expect(error.stepName).toBe('review'); + expect(error.budgetType).toBe('workflow'); + expect(error.limit).toBe(500); + expect(error.actual).toBe(550); + }); +}); diff --git a/packages/sdk/src/workflows/__tests__/run-summary-table.test.ts b/packages/sdk/src/workflows/__tests__/run-summary-table.test.ts index 313781c6f..7afcdba13 100644 --- a/packages/sdk/src/workflows/__tests__/run-summary-table.test.ts +++ b/packages/sdk/src/workflows/__tests__/run-summary-table.test.ts @@ -95,6 +95,61 @@ describe('formatRunSummaryTable', () => { expect(output).toContain(' └─ Error [turn 1] Error: database locked'); }); + it('renders budget usage and over-budget markers when summaries are provided', () => { + const output = formatRunSummaryTable( + [ + { name: 'plan', agent: 'lead', status: 'completed', attempts: 1, durationMs: 1_000 }, + { name: 'implement', agent: 'worker', status: 'completed', attempts: 2, durationMs: 2_000 }, + ], + new Map([ + [ + 'plan', + { + cli: 'claude', + sessionId: 's1', + model: 'claude-sonnet-4', + provider: 'anthropic', + durationMs: 1_200, + cost: 0.75, + tokens: { input: 100, output: 50, cacheRead: 10 }, + turns: 2, + toolCalls: [], + errors: [], + finalStatus: 'completed', + summary: 'planned', + }, + ], + [ + 'implement', + { + cli: 'codex', + sessionId: 's2', + model: 'gpt-5', + provider: 'openai', + durationMs: 3_400, + cost: 1.25, + tokens: { input: 300, output: 90, cacheRead: 20 }, + turns: 4, + toolCalls: [], + errors: [{ turn: 2, text: 'Error: recovered after retry' }], + finalStatus: 'completed', + summary: 'implemented', + }, + ], + ]), + new Map([ + ['plan', { used: 150, limit: 200, over: false }], + ['implement', { used: 390, limit: 350, over: true }], + ]), + { used: 540, limit: 500, over: true } + ); + + expect(output).toContain('Budget'); + expect(output).toContain('150/200'); + expect(output).toContain('390/350 [OVER]'); + expect(output).toContain('540/500 [OVER]'); + }); + it('renders deterministic steps without reports using placeholder columns', () => { const output = formatRunSummaryTable( [{ name: 'lint', agent: 'shell', status: 'completed', attempts: 1, durationMs: 900 }], diff --git a/packages/sdk/src/workflows/budget-tracker.ts b/packages/sdk/src/workflows/budget-tracker.ts new file mode 100644 index 000000000..66f04963f --- /dev/null +++ b/packages/sdk/src/workflows/budget-tracker.ts @@ -0,0 +1,207 @@ +export interface TokenUsage { + input: number; + output: number; + cacheRead: number; + total: number; +} + +export interface BudgetTrackerConfig { + perAgent?: number; + perWorkflow?: number; +} + +interface UsageDelta { + input: number; + output: number; + cacheRead?: number; +} + +function emptyUsage(): TokenUsage { + return { + input: 0, + output: 0, + cacheRead: 0, + total: 0, + }; +} + +function cloneUsage(usage: TokenUsage): TokenUsage { + return { + input: usage.input, + output: usage.output, + cacheRead: usage.cacheRead, + total: usage.total, + }; +} + +function validateBudget(name: string, value: number | undefined): void { + if (value === undefined) { + return; + } + + if (!Number.isFinite(value) || value < 0) { + throw new RangeError(`${name} must be a finite number greater than or equal to 0`); + } +} + +function normalizeCount(name: string, value: number | undefined): number { + const normalized = value ?? 0; + + if (!Number.isFinite(normalized) || normalized < 0) { + throw new RangeError(`${name} must be a finite number greater than or equal to 0`); + } + + return normalized; +} + +function normalizeUsage(tokens: UsageDelta): TokenUsage { + const input = normalizeCount('input tokens', tokens.input); + const output = normalizeCount('output tokens', tokens.output); + const cacheRead = normalizeCount('cache read tokens', tokens.cacheRead); + + return { + input, + output, + cacheRead, + total: input + output, + }; +} + +function addUsage(current: TokenUsage, delta: TokenUsage): TokenUsage { + const input = current.input + delta.input; + const output = current.output + delta.output; + const cacheRead = current.cacheRead + delta.cacheRead; + + return { + input, + output, + cacheRead, + total: input + output, + }; +} + +export class BudgetExceededError extends Error { + readonly stepName: string; + readonly budgetType: 'agent' | 'workflow'; + readonly limit: number; + readonly actual: number; + + constructor(stepName: string, budgetType: 'agent' | 'workflow', limit: number, actual: number) { + const scope = budgetType === 'agent' ? `Step "${stepName}"` : `Workflow`; + super(`${scope} exceeded ${budgetType} budget: ${actual} tokens used of ${limit}`); + this.name = 'BudgetExceededError'; + this.stepName = stepName; + this.budgetType = budgetType; + this.limit = limit; + this.actual = actual; + } +} + +export class BudgetTracker { + private readonly perAgent?: number; + private readonly perWorkflow?: number; + private readonly stepBudgets = new Map(); + private readonly usageByStep = new Map(); + private totalUsage: TokenUsage = emptyUsage(); + + constructor(config: BudgetTrackerConfig) { + validateBudget('perAgent', config.perAgent); + validateBudget('perWorkflow', config.perWorkflow); + + this.perAgent = config.perAgent; + this.perWorkflow = config.perWorkflow; + } + + recordUsage(stepName: string, tokens: { input: number; output: number; cacheRead?: number }): void { + const delta = normalizeUsage(tokens); + const currentStepUsage = this.usageByStep.get(stepName) ?? emptyUsage(); + + // Keep the full mutation synchronous so async callers only interleave at + // event-loop boundaries, not inside a partial update. + this.usageByStep.set(stepName, addUsage(currentStepUsage, delta)); + this.totalUsage = addUsage(this.totalUsage, delta); + } + + getStepUsage(stepName: string): TokenUsage { + return cloneUsage(this.usageByStep.get(stepName) ?? emptyUsage()); + } + + getTotalUsage(): TokenUsage { + return cloneUsage(this.totalUsage); + } + + setStepBudget(stepName: string, limit: number | undefined): void { + validateBudget(`budget for step "${stepName}"`, limit); + + if (limit === undefined) { + this.stepBudgets.delete(stepName); + return; + } + + this.stepBudgets.set(stepName, limit); + } + + getStepBudget(stepName: string): number | null { + return this.stepBudgets.get(stepName) ?? null; + } + + getRemainingBudget(): { agent: number | null; workflow: number | null } { + const largestStepTotal = Array.from(this.usageByStep.values()).reduce( + (largest, usage) => Math.max(largest, usage.total), + 0, + ); + + return { + agent: this.perAgent === undefined ? null : this.perAgent - largestStepTotal, + workflow: this.perWorkflow === undefined ? null : this.perWorkflow - this.totalUsage.total, + }; + } + + checkCanSpawn(agentName: string): { allowed: boolean; reason?: string } { + if (this.perWorkflow === undefined) { + return { allowed: true }; + } + + if (this.totalUsage.total >= this.perWorkflow) { + return { + allowed: false, + reason: `Cannot spawn "${agentName}": workflow budget exhausted (${this.totalUsage.total}/${this.perWorkflow} tokens used)`, + }; + } + + if (this.perAgent !== undefined) { + const remainingWorkflow = this.perWorkflow - this.totalUsage.total; + const minimumHeadroom = this.perAgent * 0.1; + + if (remainingWorkflow < minimumHeadroom) { + return { + allowed: false, + reason: `Cannot spawn "${agentName}": remaining workflow budget ${remainingWorkflow} is below 10% of per-agent budget ${minimumHeadroom}`, + }; + } + } + + return { allowed: true }; + } + + isOverBudget(stepName: string): { over: boolean; reason?: string } { + const stepUsage = this.getStepUsage(stepName); + const stepBudget = this.stepBudgets.get(stepName) ?? this.perAgent; + + if (stepBudget !== undefined && stepUsage.total > stepBudget) { + return { + over: true, + reason: `Step "${stepName}" exceeded per-agent budget (${stepUsage.total}/${stepBudget} tokens used)`, + }; + } + + if (this.perWorkflow !== undefined && this.totalUsage.total > this.perWorkflow) { + return { + over: true, + reason: `Workflow budget exceeded after step "${stepName}" (${this.totalUsage.total}/${this.perWorkflow} tokens used)`, + }; + } + + return { over: false }; + } +} diff --git a/packages/sdk/src/workflows/index.ts b/packages/sdk/src/workflows/index.ts index 1680687a3..4b9e2c552 100644 --- a/packages/sdk/src/workflows/index.ts +++ b/packages/sdk/src/workflows/index.ts @@ -2,6 +2,7 @@ export * from './types.js'; export * from './runner.js'; export * from './custom-steps.js'; export * from './cli-session-collector.js'; +export * from './budget-tracker.js'; export * from './channel-messenger.js'; export * from './process-spawner.js'; export * from './run-summary-table.js'; diff --git a/packages/sdk/src/workflows/run-summary-table.ts b/packages/sdk/src/workflows/run-summary-table.ts index fac3a2a4a..0769be95c 100644 --- a/packages/sdk/src/workflows/run-summary-table.ts +++ b/packages/sdk/src/workflows/run-summary-table.ts @@ -1,6 +1,18 @@ import type { CliSessionReport } from './cli-session-collector.js'; import type { StepOutcome } from './trajectory.js'; +export interface StepBudgetSummary { + used: number; + limit: number | null; + over: boolean; +} + +export interface WorkflowBudgetSummary { + used: number; + limit: number | null; + over: boolean; +} + function formatCurrency(value: number | null | undefined): string { return typeof value === 'number' ? `$${value.toFixed(2)}` : '--'; } @@ -11,6 +23,19 @@ function formatTokens(report: CliSessionReport | undefined): string { return total.toLocaleString('en-US'); } +function formatBudget(summary: StepBudgetSummary | undefined): string { + if (!summary?.limit) return '--'; + + const base = `${summary.used.toLocaleString('en-US')}/${summary.limit.toLocaleString('en-US')}`; + return summary.over ? `${base} [OVER]` : base; +} + +function formatWorkflowBudget(summary: WorkflowBudgetSummary | undefined): string { + if (!summary?.limit) return '--'; + const base = `${summary.used.toLocaleString('en-US')}/${summary.limit.toLocaleString('en-US')}`; + return summary.over ? `${base} [OVER]` : base; +} + function formatDuration(durationMs: number | null | undefined): string { if (typeof durationMs !== 'number' || !Number.isFinite(durationMs)) return '--'; if (durationMs < 1000) return `${durationMs}ms`; @@ -40,18 +65,29 @@ function formatErrors(outcome: StepOutcome, report: CliSessionReport | undefined export function formatRunSummaryTable( outcomes: StepOutcome[], - reports: Map + reports: Map, + stepBudgets: Map = new Map(), + workflowBudget?: WorkflowBudgetSummary ): string { // Only show the Cost column when at least one report has reliable cost data // (currently only OpenCode populates cost; Claude and Codex return null) const hasCost = Array.from(reports.values()).some((r) => typeof r.cost === 'number' && r.cost > 0); + const hasBudget = workflowBudget?.limit !== undefined || stepBudgets.size > 0; const headers = hasCost - ? ['Step', 'Status', 'Model', 'Cost', 'Tokens', 'Duration', 'Errors'] - : ['Step', 'Status', 'Model', 'Tokens', 'Duration', 'Errors']; + ? hasBudget + ? ['Step', 'Status', 'Model', 'Cost', 'Tokens', 'Budget', 'Duration', 'Errors'] + : ['Step', 'Status', 'Model', 'Cost', 'Tokens', 'Duration', 'Errors'] + : hasBudget + ? ['Step', 'Status', 'Model', 'Tokens', 'Budget', 'Duration', 'Errors'] + : ['Step', 'Status', 'Model', 'Tokens', 'Duration', 'Errors']; const widths = hasCost - ? [20, 6, 16, 8, 10, 10, 10] - : [20, 6, 16, 10, 10, 10]; + ? hasBudget + ? [20, 6, 16, 8, 10, 18, 10, 10] + : [20, 6, 16, 8, 10, 10, 10] + : hasBudget + ? [20, 6, 16, 10, 18, 10, 10] + : [20, 6, 16, 10, 10, 10]; const lines: string[] = []; lines.push(headers.map((h, i) => { @@ -82,8 +118,12 @@ export function formatRunSummaryTable( if (hasCost) cols.push(pad(formatCurrency(report?.cost), widths[3], 'right')); const tokenIdx = hasCost ? 4 : 3; cols.push(pad(formatTokens(report), widths[tokenIdx], 'right')); - cols.push(pad(formatDuration(reportDuration), widths[tokenIdx + 1], 'right')); - cols.push(pad(formatErrors(outcome, report), widths[tokenIdx + 2], 'right')); + if (hasBudget) { + cols.push(pad(formatBudget(stepBudgets.get(outcome.name)), widths[tokenIdx + 1], 'right')); + } + const durationIdx = hasBudget ? tokenIdx + 2 : tokenIdx + 1; + cols.push(pad(formatDuration(reportDuration), widths[durationIdx], 'right')); + cols.push(pad(formatErrors(outcome, report), widths[durationIdx + 1], 'right')); lines.push(cols.join(' ')); @@ -102,8 +142,12 @@ export function formatRunSummaryTable( if (hasCost) totalCols.push(pad(formatCurrency(totalCost), widths[3], 'right')); const tokenIdx = hasCost ? 4 : 3; totalCols.push(pad(totalTokens > 0 ? totalTokens.toLocaleString('en-US') : '--', widths[tokenIdx], 'right')); - totalCols.push(pad(formatDuration(totalDurationMs), widths[tokenIdx + 1], 'right')); - totalCols.push(pad('', widths[tokenIdx + 2], 'right')); + if (hasBudget) { + totalCols.push(pad(formatWorkflowBudget(workflowBudget), widths[tokenIdx + 1], 'right')); + } + const durationIdx = hasBudget ? tokenIdx + 2 : tokenIdx + 1; + totalCols.push(pad(formatDuration(totalDurationMs), widths[durationIdx], 'right')); + totalCols.push(pad('', widths[durationIdx + 1], 'right')); lines.push(totalCols.join(' ')); return lines.map((line) => ` ${line}`).join('\n'); diff --git a/packages/sdk/src/workflows/runner.ts b/packages/sdk/src/workflows/runner.ts index dc55a823f..43b251847 100644 --- a/packages/sdk/src/workflows/runner.ts +++ b/packages/sdk/src/workflows/runner.ts @@ -45,7 +45,12 @@ import { executeApiStep } from './api-executor.js'; import { ChannelMessenger } from './channel-messenger.js'; import { InMemoryWorkflowDb } from './memory-db.js'; import { buildCommand as buildProcessCommand, spawnProcess } from './process-spawner.js'; -import { formatRunSummaryTable } from './run-summary-table.js'; +import { BudgetExceededError, BudgetTracker } from './budget-tracker.js'; +import { + formatRunSummaryTable, + type StepBudgetSummary, + type WorkflowBudgetSummary, +} from './run-summary-table.js'; import { StepExecutor as WorkflowStepLifecycleExecutor, type StepExecutorDeps as WorkflowStepLifecycleExecutorDeps, @@ -455,6 +460,12 @@ export class WorkflowRunner { private readonly activeReviewers = new Map(); /** Structured CLI session reports captured during the current run, keyed by step name. */ private readonly agentReports = new Map(); + /** Shared workflow token budget tracker for the current run, when enabled. */ + private budgetTracker?: BudgetTracker; + /** Per-step enforced token budgets resolved from agent constraints. */ + private readonly stepBudgetLimits = new Map(); + /** Steps whose post-execution token usage exceeded an enforced budget. */ + private readonly overBudgetSteps = new Set(); private static readonly PTY_TASK_ARG_SIZE_LIMIT = 2 * 1024 * 1024; // 2 MB constructor(options: WorkflowRunnerOptions = {}) { @@ -1423,6 +1434,118 @@ export class WorkflowRunner { }; } + private createBudgetTracker(config: RelayYamlConfig): BudgetTracker | undefined { + const stepLimits = config.agents + .map((agent) => agent.constraints?.maxTokens) + .filter((value): value is number => typeof value === 'number'); + const largestStepBudget = stepLimits.length > 0 ? Math.max(...stepLimits) : undefined; + + if (config.swarm.tokenBudget === undefined && largestStepBudget === undefined) { + return undefined; + } + + return new BudgetTracker({ + perAgent: largestStepBudget, + perWorkflow: config.swarm.tokenBudget, + }); + } + + private resolveStepTokenBudget( + ownerDef: AgentDefinition, + specialistDef: AgentDefinition + ): number | undefined { + const limits = [specialistDef.constraints?.maxTokens, ownerDef.constraints?.maxTokens].filter( + (value): value is number => typeof value === 'number' + ); + if (limits.length === 0) return undefined; + return Math.min(...limits); + } + + private applyBudgetReport( + stepName: string, + stepBudget: number | undefined, + report: CliSessionReport | null + ): { + stepUsed: number; + stepLimit: number | null; + stepOver: boolean; + workflowUsed: number; + workflowLimit: number | null; + workflowOver: boolean; + } | null { + if (!this.budgetTracker || !report?.tokens) { + return null; + } + + this.budgetTracker.recordUsage(stepName, { + input: report.tokens.input, + output: report.tokens.output, + cacheRead: report.tokens.cacheRead, + }); + + if (stepBudget !== undefined) { + this.stepBudgetLimits.set(stepName, stepBudget); + this.budgetTracker.setStepBudget(stepName, stepBudget); + } + + const stepUsed = this.budgetTracker.getStepUsage(stepName).total; + const workflowUsed = this.budgetTracker.getTotalUsage().total; + const stepLimit = this.stepBudgetLimits.get(stepName) ?? null; + const workflowLimit = this.currentConfig?.swarm.tokenBudget ?? null; + const stepOver = stepLimit !== null && stepUsed > stepLimit; + const workflowOver = workflowLimit !== null && workflowUsed > workflowLimit; + + if (stepOver) { + this.overBudgetSteps.add(stepName); + this.log(`[budget] Step "${stepName}" exceeded token budget (${stepUsed}/${stepLimit})`); + } + + if (workflowOver) { + this.log( + `[budget] Workflow token budget exceeded after step "${stepName}" (${workflowUsed}/${workflowLimit})` + ); + } + + return { + stepUsed, + stepLimit, + stepOver, + workflowUsed, + workflowLimit, + workflowOver, + }; + } + + private buildStepBudgetSummary(outcomes: StepOutcome[]): Map { + const summaries = new Map(); + for (const outcome of outcomes) { + const limit = this.stepBudgetLimits.get(outcome.name) ?? null; + const used = this.budgetTracker?.getStepUsage(outcome.name).total ?? 0; + if (limit === null && used === 0) continue; + + summaries.set(outcome.name, { + used, + limit, + over: limit !== null && used > limit, + }); + } + return summaries; + } + + private buildWorkflowBudgetSummary(): WorkflowBudgetSummary | undefined { + const limit = this.currentConfig?.swarm.tokenBudget; + if (limit === undefined && !this.budgetTracker) { + return undefined; + } + + const used = this.budgetTracker?.getTotalUsage().total ?? 0; + return { + used, + limit: limit ?? null, + over: limit !== undefined && used > limit, + }; + } + private buildTrajectoryCompletionEvidence( stepName: string ): StepCompletionDecision['evidence'] | undefined { @@ -2668,6 +2791,9 @@ export class WorkflowRunner { this.runtimeStepAgents.clear(); this.stepCompletionEvidence.clear(); this.agentReports.clear(); + this.stepBudgetLimits.clear(); + this.overBudgetSteps.clear(); + this.budgetTracker = this.createBudgetTracker(config); this.log(`Starting workflow "${workflow.name}" (${workflow.steps.length} steps)`); @@ -3864,6 +3990,12 @@ export class WorkflowRunner { reviewer: reviewDef, }; const usesDedicatedOwner = usesOwnerFlow && ownerDef.name !== specialistDef.name; + const stepBudget = this.resolveStepTokenBudget(ownerDef, specialistDef); + + if (this.budgetTracker && stepBudget !== undefined) { + this.stepBudgetLimits.set(step.name, stepBudget); + this.budgetTracker.setStepBudget(step.name, stepBudget); + } const maxRetries = step.retries ?? @@ -3953,6 +4085,18 @@ export class WorkflowRunner { const stepOutputContext = this.buildStepOutputContext(stepStates, runId); let resolvedTask = this.interpolateStepTask(step.task ?? '', stepOutputContext); + if (this.budgetTracker) { + const spawnCheck = this.budgetTracker.checkCanSpawn(agentName); + if (!spawnCheck.allowed) { + const workflowLimit = this.currentConfig?.swarm.tokenBudget ?? 0; + const workflowUsed = this.budgetTracker.getTotalUsage().total; + this.log( + `[budget] Skipping step "${step.name}" — ${spawnCheck.reason ?? `workflow budget exhausted (${workflowUsed}/${workflowLimit})`}` + ); + throw new BudgetExceededError(step.name, 'workflow', workflowLimit, workflowUsed); + } + } + // On retry attempts, prepend failure context so the agent knows what went wrong if (attempt > 0 && lastError) { const priorOutput = (this.lastFailedStepOutput.get(step.name) ?? '').slice(-2000); @@ -4178,7 +4322,7 @@ export class WorkflowRunner { } } - await this.captureAgentReport( + const report = await this.captureAgentReport( runId, step.name, lastEffectiveAgentDef, @@ -4186,6 +4330,7 @@ export class WorkflowRunner { lastAttemptStartedAt, Date.now() ); + this.applyBudgetReport(step.name, stepBudget, report); // Mark completed state.row.status = 'completed'; @@ -4216,7 +4361,12 @@ export class WorkflowRunner { return; } catch (err) { lastError = err instanceof Error ? err.message : String(err); - lastCompletionReason = err instanceof WorkflowCompletionError ? err.completionReason : undefined; + lastCompletionReason = + err instanceof BudgetExceededError + ? 'failed_budget_exceeded' + : err instanceof WorkflowCompletionError + ? err.completionReason + : undefined; if (lastCompletionReason === 'retry_requested_by_owner' && attempt >= maxRetries) { lastError = this.buildOwnerRetryBudgetExceededMessage(step.name, maxRetries, lastError); } @@ -4224,12 +4374,31 @@ export class WorkflowRunner { lastExitCode = err.exitCode; lastExitSignal = err.exitSignal; } + const report = await this.captureAgentReport( + runId, + step.name, + lastEffectiveAgentDef, + lastEffectiveCwd, + lastAttemptStartedAt, + Date.now() + ); + const budgetOutcome = this.applyBudgetReport(step.name, stepBudget, report); + if (budgetOutcome?.stepOver || budgetOutcome?.workflowOver) { + lastCompletionReason = 'failed_budget_exceeded'; + lastError = + budgetOutcome.stepOver && budgetOutcome.stepLimit !== null + ? `Step "${step.name}" exceeded token budget (${budgetOutcome.stepUsed}/${budgetOutcome.stepLimit} tokens used)` + : `Workflow budget exceeded after step "${step.name}" (${budgetOutcome?.workflowUsed ?? 0}/${budgetOutcome?.workflowLimit ?? 0} tokens used)`; + } const ownerTimedOut = usesDedicatedOwner ? /\bowner timed out\b/i.test(lastError) : /\btimed out\b/i.test(lastError) && !lastError.includes(`${step.name}-review`); if (ownerTimedOut) { this.emit({ type: 'step:owner-timeout', runId, stepName: step.name, ownerName: ownerDef.name }); } + if (err instanceof BudgetExceededError || budgetOutcome?.stepOver || budgetOutcome?.workflowOver) { + break; + } } } @@ -4240,14 +4409,17 @@ export class WorkflowRunner { typeof step.verification === 'object' && 'value' in step.verification ? String(step.verification.value) : undefined; - await this.captureAgentReport( - runId, - step.name, - lastEffectiveAgentDef, - lastEffectiveCwd, - lastAttemptStartedAt, - Date.now() - ); + if (!this.agentReports.has(step.name)) { + const report = await this.captureAgentReport( + runId, + step.name, + lastEffectiveAgentDef, + lastEffectiveCwd, + lastAttemptStartedAt, + Date.now() + ); + this.applyBudgetReport(step.name, stepBudget, report); + } await this.trajectory?.stepFailed(step, lastError ?? 'Unknown error', maxRetries + 1, maxRetries, { agent: agentName, nonInteractive, @@ -6440,8 +6612,8 @@ export class WorkflowRunner { cwd: string | undefined, startedAt: number | undefined, completedAt: number - ): Promise { - if (!agentDef || !cwd || !startedAt) return; + ): Promise { + if (!agentDef || !cwd || !startedAt) return null; try { const report = await collectCliSession({ @@ -6450,15 +6622,17 @@ export class WorkflowRunner { startedAt, completedAt, }); - if (!report) return; + if (!report) return null; this.agentReports.set(stepName, report); this.emit({ type: 'step:agent-report', runId, stepName, report }); await this.persistAgentReport(runId, stepName, report); + return report; } catch (error) { this.log( `[${stepName}] CLI session collection failed: ${error instanceof Error ? error.message : String(error)}` ); + return null; } } @@ -6643,7 +6817,14 @@ export class WorkflowRunner { // Always show the summary table — with agent reports when available, // with just step/status/duration when not (non-interactive agents). - console.log(formatRunSummaryTable(outcomes, this.agentReports)); + console.log( + formatRunSummaryTable( + outcomes, + this.agentReports, + this.buildStepBudgetSummary(outcomes), + this.buildWorkflowBudgetSummary() + ) + ); // Show errors and output excerpts for failed steps below the table for (const outcome of outcomes) { diff --git a/packages/sdk/src/workflows/types.ts b/packages/sdk/src/workflows/types.ts index ff41b9628..aaae9b1f1 100644 --- a/packages/sdk/src/workflows/types.ts +++ b/packages/sdk/src/workflows/types.ts @@ -75,6 +75,8 @@ export interface SwarmConfig { pattern: SwarmPattern; maxConcurrency?: number; timeoutMs?: number; + /** Max total enforced tokens (input + output, excluding cache reads) across the whole workflow. */ + tokenBudget?: number; channel?: string; /** Idle agent detection and nudging configuration for interactive agents. */ idleNudge?: IdleNudgeConfig; @@ -836,6 +838,7 @@ export type WorkflowStepCompletionReason = | 'completed_by_process_exit' | 'retry_requested_by_owner' | 'failed_verification' + | 'failed_budget_exceeded' | 'failed_owner_decision' | 'failed_no_evidence';