diff --git a/v3/@claude-flow/codex/README.md b/v3/@claude-flow/codex/README.md index e848a5309c..51d2700080 100644 --- a/v3/@claude-flow/codex/README.md +++ b/v3/@claude-flow/codex/README.md @@ -497,14 +497,6 @@ $performance-optimization | Post-Task | `$hooks:post-task` | Record task completion | | Session End | `$hooks:session-end` | End session and persist | -#### Dual-Mode Skills (NEW) - -| Skill | Syntax | Description | -|-------|--------|-------------| -| Dual Spawn | `$dual-spawn` | Spawn parallel Codex workers from Claude Code | -| Dual Coordinate | `$dual-coordinate` | Coordinate Claude Code + Codex execution | -| Dual Collect | `$dual-collect` | Collect results from parallel Codex instances | - ### Custom Skills Create custom skills in `.agents/skills/`: @@ -559,56 +551,32 @@ npx claude-flow@alpha init --dual ### Spawning Parallel Codex Workers -From Claude Code, spawn headless Codex instances: +From Claude Code, spawn headless Codex instances with `codex exec`. +Use `claude -p` only for Claude workers. ```bash -# Spawn workers in parallel (each runs independently) -claude -p "Analyze src/auth/ for security issues" --session-id "task-1" & -claude -p "Write unit tests for src/api/" --session-id "task-2" & -claude -p "Optimize database queries in src/db/" --session-id "task-3" & +# Spawn Codex workers in parallel (each runs independently) +codex exec --skip-git-repo-check "Analyze src/auth/ for security issues" & +codex exec --skip-git-repo-check "Write unit tests for src/api/" & +codex exec --skip-git-repo-check "Optimize database queries in src/db/" & wait # Wait for all to complete ``` -### Dual-Mode Skills - -| Skill | Platform | Description | -|-------|----------|-------------| -| `$dual-spawn` | Codex | Spawn parallel workers from orchestrator | -| `$dual-coordinate` | Both | Coordinate cross-platform execution | -| `$dual-collect` | Claude Code | Collect results from Codex workers | - -### Dual-Mode Agents +### What Dual-Mode Currently Provides -| Agent | Type | Execution | -|-------|------|-----------| -| `codex-worker` | Worker | Headless background execution | -| `codex-coordinator` | Coordinator | Manage parallel worker pool | -| `dual-orchestrator` | Orchestrator | Route tasks to appropriate platform | +The current implementation provides: -### Task Routing Rules +- Static collaboration templates for `feature`, `security`, and `refactor` +- Fixed platform assignment per template worker +- Shared memory initialization and result collection +- Best-effort parallel execution within each dependency level, capped by `maxConcurrent` -| Task Complexity | Platform | Reason | -|----------------|----------|--------| -| Simple (1-2 files) | Codex Headless | Fast, parallel | -| Medium (3-5 files) | Claude Code | Needs context | -| Complex (architecture) | Claude Code | Reasoning required | -| Bulk operations | Codex Workers | Parallelize | -| Final review | Claude Code | Integration | +It does not currently provide: -### Example Workflow - -``` -1. Claude Code receives complex feature request -2. Designs architecture and creates plan -3. Spawns 4 Codex workers: - - Worker 1: Implement data models - - Worker 2: Create API endpoints - - Worker 3: Write unit tests - - Worker 4: Generate documentation -4. Workers execute in parallel (headless) -5. Claude Code collects and integrates results -6. Final review and refinement in Claude Code -``` +- Dynamic task routing by complexity +- Built-in dual-mode skills such as `$dual-spawn` +- Dedicated coordinator agents beyond the configured worker list +- Automatic Claude-side collection workflows outside the orchestrator itself ### Memory Sharing @@ -674,6 +642,8 @@ const orchestrator = new DualModeOrchestrator({ timeout: 300000, }); +// maxConcurrent limits workers within the same dependency level. + // Listen to events orchestrator.on('worker:started', ({ id, role }) => console.log(`Started: ${role}`)); orchestrator.on('worker:completed', ({ id }) => console.log(`Completed: ${id}`)); @@ -1022,3 +992,4 @@ MIT - Documentation: https://github.com/ruvnet/claude-flow - Issues: https://github.com/ruvnet/claude-flow/issues + diff --git a/v3/@claude-flow/codex/src/dual-mode/orchestrator.ts b/v3/@claude-flow/codex/src/dual-mode/orchestrator.ts index f6504032a4..5e483f054e 100644 --- a/v3/@claude-flow/codex/src/dual-mode/orchestrator.ts +++ b/v3/@claude-flow/codex/src/dual-mode/orchestrator.ts @@ -5,8 +5,9 @@ import { spawn, ChildProcess } from 'child_process'; import { EventEmitter } from 'events'; -import * as path from 'path'; import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; export interface WorkerConfig { id: string; @@ -48,6 +49,68 @@ export interface CollaborationResult { errors: string[]; } +export interface HeadlessCommandSpec { + command: string; + args: string[]; + outputFile?: string; + unsupportedOptions?: string[]; +} + +interface HeadlessExecutionPlan extends HeadlessCommandSpec { + cleanup: () => void; +} + +export function buildHeadlessCommandSpec( + worker: Pick, + prompt: string, + commands: Pick, 'claudeCommand' | 'codexCommand'>, + outputFile?: string, +): HeadlessCommandSpec { + if (worker.platform === 'claude') { + const args = ['-p', prompt, '--output-format', 'text']; + + if (worker.maxTurns) { + args.push('--max-turns', String(worker.maxTurns)); + } + + if (worker.model) { + args.push('--model', worker.model); + } + + return { + command: commands.claudeCommand, + args, + }; + } + + const args = ['exec', '--skip-git-repo-check', '--color', 'never']; + + if (worker.maxTurns !== undefined) { + throw new Error('Codex workers do not support maxTurns in dual-mode yet'); + } + + if (worker.model) { + args.push('--model', worker.model); + } + + if (outputFile) { + args.push('--output-last-message', outputFile); + } + + args.push(prompt); + + return outputFile + ? { + command: commands.codexCommand, + args, + outputFile, + } + : { + command: commands.codexCommand, + args, + }; +} + /** * Orchestrates parallel execution of Claude Code and Codex workers */ @@ -62,9 +125,9 @@ export class DualModeOrchestrator extends EventEmitter { projectPath: config.projectPath, maxConcurrent: config.maxConcurrent ?? 4, sharedNamespace: config.sharedNamespace ?? 'collaboration', - timeout: config.timeout ?? 300000, // 5 minutes + timeout: config.timeout ?? 300000, claudeCommand: config.claudeCommand ?? 'claude', - codexCommand: config.codexCommand ?? 'claude', // Both use claude CLI + codexCommand: config.codexCommand ?? 'codex', }; } @@ -74,23 +137,21 @@ export class DualModeOrchestrator extends EventEmitter { async initializeSharedMemory(taskContext: string): Promise { const { projectPath, sharedNamespace } = this.config; - // Initialize memory database await this.runCommand( 'npx', ['claude-flow@alpha', 'memory', 'init', '--force'], - projectPath + projectPath, ); - // Store task context await this.runCommand( 'npx', [ 'claude-flow@alpha', 'memory', 'store', '--key', 'task-context', '--value', taskContext, - '--namespace', sharedNamespace + '--namespace', sharedNamespace, ], - projectPath + projectPath, ); this.emit('memory:initialized', { namespace: sharedNamespace, taskContext }); @@ -109,7 +170,6 @@ export class DualModeOrchestrator extends EventEmitter { }; this.workers.set(config.id, result); - // Wait for dependencies if (config.dependsOn?.length) { await this.waitForDependencies(config.dependsOn); } @@ -129,6 +189,7 @@ export class DualModeOrchestrator extends EventEmitter { result.error = error instanceof Error ? error.message : String(error); result.completedAt = new Date(); this.emit('worker:failed', { id: config.id, error: result.error }); + throw error; } } @@ -136,34 +197,16 @@ export class DualModeOrchestrator extends EventEmitter { * Execute a headless Claude/Codex instance */ private async executeHeadless(config: WorkerConfig): Promise { - const { projectPath, timeout } = this.config; - const command = config.platform === 'claude' ? this.config.claudeCommand : this.config.codexCommand; - - // Build the prompt with memory integration + const { projectPath } = this.config; + const timeout = config.timeout ?? this.config.timeout; const enhancedPrompt = this.buildCollaborativePrompt(config); - - const args = [ - '-p', enhancedPrompt, - '--output-format', 'text', - ]; - - if (config.maxTurns) { - args.push('--max-turns', String(config.maxTurns)); - } - - if (config.model) { - args.push('--model', config.model); - } + const execution = this.createHeadlessExecutionPlan(config, enhancedPrompt); return new Promise((resolve, reject) => { let output = ''; let errorOutput = ''; - const proc = spawn(command, args, { - cwd: projectPath, - env: { ...process.env, FORCE_COLOR: '0' }, - stdio: ['pipe', 'pipe', 'pipe'], - }); + const proc = this.spawnProcess(execution.command, execution.args, projectPath); this.processes.set(config.id, proc); @@ -177,6 +220,7 @@ export class DualModeOrchestrator extends EventEmitter { const timer = setTimeout(() => { proc.kill('SIGTERM'); + execution.cleanup(); reject(new Error(`Worker ${config.id} timed out after ${timeout}ms`)); }, timeout); @@ -184,21 +228,96 @@ export class DualModeOrchestrator extends EventEmitter { clearTimeout(timer); this.processes.delete(config.id); - if (code === 0 || output.length > 0) { - resolve(output || errorOutput); - } else { - reject(new Error(`Worker ${config.id} exited with code ${code}: ${errorOutput}`)); + try { + const finalOutput = this.readHeadlessOutput(execution, output, errorOutput); + + if (code === 0) { + resolve(finalOutput); + return; + } + + const diagnostic = finalOutput || 'Worker exited without output'; + reject(new Error(`Worker ${config.id} exited with code ${code}: ${diagnostic}`)); + } finally { + execution.cleanup(); } }); proc.on('error', (err) => { clearTimeout(timer); this.processes.delete(config.id); + execution.cleanup(); reject(err); }); }); } + protected spawnProcess(command: string, args: string[], cwd: string): ChildProcess { + return spawn(command, args, { + cwd, + env: { ...process.env, FORCE_COLOR: '0' }, + stdio: ['pipe', 'pipe', 'pipe'], + }); + } + + private createHeadlessExecutionPlan(config: WorkerConfig, prompt: string): HeadlessExecutionPlan { + if (config.platform === 'claude') { + return { + ...buildHeadlessCommandSpec(config, prompt, this.config), + cleanup: () => {}, + }; + } + + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'claude-flow-codex-')); + const safeId = this.sanitizeWorkerId(config.id); + const outputFile = path.join(tempDir, `${safeId}-last-message.txt`); + + return { + ...buildHeadlessCommandSpec(config, prompt, this.config, outputFile), + cleanup: () => { + try { + fs.rmSync(tempDir, { recursive: true, force: true }); + } catch { + // Ignore cleanup failures for temporary files. + } + }, + }; + } + + private readHeadlessOutput( + execution: HeadlessCommandSpec, + output: string, + errorOutput: string, + ): string { + const parts: string[] = []; + + if (execution.outputFile && fs.existsSync(execution.outputFile)) { + const finalMessage = fs.readFileSync(execution.outputFile, 'utf8').trim(); + if (finalMessage.length > 0) { + parts.push(finalMessage); + } + } + + if (output.trim().length > 0) { + parts.push(output.trim()); + } + + if (errorOutput.trim().length > 0) { + parts.push(errorOutput.trim()); + } + + return parts.join("`n`n").trim(); + } + + private sanitizeWorkerId(id: string): string { + const sanitized = path.basename(id) + .replace(/[^a-zA-Z0-9_-]/g, '-') + .replace(/-+/g, '-') + .replace(/^-|-$/g, ''); + + return sanitized.length > 0 ? sanitized : 'worker'; + } + /** * Build a prompt that includes memory coordination instructions */ @@ -230,14 +349,18 @@ Remember: Other agents depend on your results in shared memory. Be concise and s let waited = 0; while (waited < maxWait) { - const allComplete = deps.every(depId => { - const worker = this.workers.get(depId); - return worker && (worker.status === 'completed' || worker.status === 'failed'); - }); + const dependencyStates = deps.map((depId) => this.workers.get(depId)); + const failedDependency = dependencyStates.find((worker) => worker?.status === 'failed'); + + if (failedDependency) { + throw new Error(`Dependency ${failedDependency.id} failed`); + } + + const allComplete = dependencyStates.every((worker) => worker?.status === 'completed'); if (allComplete) return; - await new Promise(resolve => setTimeout(resolve, checkInterval)); + await new Promise((resolve) => setTimeout(resolve, checkInterval)); waited += checkInterval; } @@ -251,23 +374,14 @@ Remember: Other agents depend on your results in shared memory. Be concise and s const startTime = Date.now(); const errors: string[] = []; - // Initialize shared memory await this.initializeSharedMemory(taskContext); - // Group workers by dependency level const levels = this.buildDependencyLevels(workers); - // Execute each level in parallel for (const level of levels) { - const promises = level.map(worker => - this.spawnWorker(worker).catch(err => { - errors.push(`${worker.id}: ${err.message}`); - }) - ); - await Promise.all(promises); + await this.runWorkerLevel(level, errors); } - // Collect shared memory results const sharedMemory = await this.collectSharedMemory(); return { @@ -279,6 +393,20 @@ Remember: Other agents depend on your results in shared memory. Be concise and s }; } + private async runWorkerLevel(level: WorkerConfig[], errors: string[]): Promise { + const concurrency = Math.max(1, this.config.maxConcurrent); + + for (let index = 0; index < level.length; index += concurrency) { + const batch = level.slice(index, index + concurrency); + const promises = batch.map((worker) => + this.spawnWorker(worker).catch((err: Error) => { + errors.push(`${worker.id}: ${err.message}`); + }), + ); + await Promise.all(promises); + } + } + /** * Build dependency levels for parallel execution */ @@ -293,7 +421,7 @@ Remember: Other agents depend on your results in shared memory. Be concise and s if (placed.has(worker.id)) continue; const depsReady = !worker.dependsOn || - worker.dependsOn.every(dep => placed.has(dep)); + worker.dependsOn.every((dep) => placed.has(dep)); if (depsReady) { level.push(worker); @@ -301,7 +429,6 @@ Remember: Other agents depend on your results in shared memory. Be concise and s } if (level.length === 0 && placed.size < workers.length) { - // Circular dependency detected, add remaining for (const worker of workers) { if (!placed.has(worker.id)) { level.push(worker); @@ -331,7 +458,7 @@ Remember: Other agents depend on your results in shared memory. Be concise and s const output = await this.runCommand( 'npx', ['claude-flow@alpha', 'memory', 'list', '--namespace', sharedNamespace, '--format', 'json'], - projectPath + projectPath, ); return JSON.parse(output); } catch { @@ -394,23 +521,21 @@ export const CollaborationTemplates = { id: 'coder', platform: 'codex', role: 'coder', - prompt: `Implement the feature based on the architecture. Write clean, typed code.`, + prompt: 'Implement the feature based on the architecture. Write clean, typed code.', dependsOn: ['architect'], - maxTurns: 15, }, { id: 'tester', platform: 'codex', role: 'tester', - prompt: `Write comprehensive tests for the implementation. Target 80% coverage.`, + prompt: 'Write comprehensive tests for the implementation. Target 80% coverage.', dependsOn: ['coder'], - maxTurns: 10, }, { id: 'reviewer', platform: 'claude', role: 'reviewer', - prompt: `Review the code and tests for quality, security, and best practices.`, + prompt: 'Review the code and tests for quality, security, and best practices.', dependsOn: ['coder', 'tester'], maxTurns: 8, }, @@ -425,13 +550,12 @@ export const CollaborationTemplates = { platform: 'codex', role: 'security-scanner', prompt: `Scan ${target} for security vulnerabilities. Check OWASP Top 10.`, - maxTurns: 10, }, { id: 'analyzer', platform: 'claude', role: 'security-analyst', - prompt: `Analyze scan results and identify critical vulnerabilities.`, + prompt: 'Analyze scan results and identify critical vulnerabilities.', dependsOn: ['scanner'], maxTurns: 8, }, @@ -439,9 +563,8 @@ export const CollaborationTemplates = { id: 'fixer', platform: 'codex', role: 'security-fixer', - prompt: `Generate fixes for identified vulnerabilities.`, + prompt: 'Generate fixes for identified vulnerabilities.', dependsOn: ['analyzer'], - maxTurns: 12, }, ], @@ -460,7 +583,7 @@ export const CollaborationTemplates = { id: 'planner', platform: 'claude', role: 'refactor-planner', - prompt: `Create a refactoring plan based on the analysis.`, + prompt: 'Create a refactoring plan based on the analysis.', dependsOn: ['analyzer'], maxTurns: 6, }, @@ -468,17 +591,19 @@ export const CollaborationTemplates = { id: 'refactorer', platform: 'codex', role: 'refactorer', - prompt: `Execute the refactoring plan. Maintain all existing functionality.`, + prompt: 'Execute the refactoring plan. Maintain all existing functionality.', dependsOn: ['planner'], - maxTurns: 15, }, { id: 'validator', platform: 'codex', role: 'validator', - prompt: `Run tests and validate the refactoring didn't break anything.`, + prompt: 'Run tests and validate the refactoring didn\'t break anything.', dependsOn: ['refactorer'], - maxTurns: 5, }, ], }; + + + + diff --git a/v3/@claude-flow/codex/tests/dual-mode/orchestrator.test.ts b/v3/@claude-flow/codex/tests/dual-mode/orchestrator.test.ts new file mode 100644 index 0000000000..6db0de5124 --- /dev/null +++ b/v3/@claude-flow/codex/tests/dual-mode/orchestrator.test.ts @@ -0,0 +1,216 @@ +/** + * @claude-flow/codex - Dual-Mode Orchestrator Tests + */ + +import { PassThrough } from 'stream'; +import { describe, expect, it } from 'vitest'; +import { + buildHeadlessCommandSpec, + DualModeOrchestrator, +} from '../../src/dual-mode/orchestrator.js'; + +type SpawnRecord = { command: string; args: string[]; cwd: string }; +type SpawnMode = 'success' | 'failure'; + +class TestOrchestrator extends DualModeOrchestrator { + public spawned: SpawnRecord[] = []; + public mode: SpawnMode = 'success'; + + protected override spawnProcess(command: string, args: string[], cwd: string) { + this.spawned.push({ command, args, cwd }); + + const stdout = new PassThrough(); + const stderr = new PassThrough(); + const handlers = new Map void>>(); + + const proc = { + stdout, + stderr, + kill: () => true, + on: (event: string, handler: (...args: any[]) => void) => { + const list = handlers.get(event) ?? []; + list.push(handler); + handlers.set(event, list); + return proc; + }, + }; + + queueMicrotask(() => { + if (this.mode === 'failure') { + stderr.write('boom'); + handlers.get('close')?.forEach((handler) => handler(1)); + return; + } + + stdout.write('ok'); + handlers.get('close')?.forEach((handler) => handler(0)); + }); + + return proc as any; + } +} + +describe('DualModeOrchestrator', () => { + it('defaults to separate claude and codex executors', () => { + const orchestrator = new DualModeOrchestrator({ projectPath: '/tmp/project' }); + const internalConfig = (orchestrator as unknown as { + config: { claudeCommand: string; codexCommand: string }; + }).config; + + expect(internalConfig.claudeCommand).toBe('claude'); + expect(internalConfig.codexCommand).toBe('codex'); + }); + + it('preserves explicit executor overrides', () => { + const orchestrator = new DualModeOrchestrator({ + projectPath: '/tmp/project', + claudeCommand: 'claude-custom', + codexCommand: 'codex-custom', + }); + const internalConfig = (orchestrator as unknown as { + config: { claudeCommand: string; codexCommand: string }; + }).config; + + expect(internalConfig.claudeCommand).toBe('claude-custom'); + expect(internalConfig.codexCommand).toBe('codex-custom'); + }); + + it('uses codex exec in the actual worker execution path', async () => { + const orchestrator = new TestOrchestrator({ + projectPath: '/tmp/project', + codexCommand: 'codex-bin', + timeout: 1000, + }); + + await orchestrator.spawnWorker({ + id: 'worker-1', + platform: 'codex', + role: 'coder', + prompt: 'Implement the feature', + }); + + expect(orchestrator.spawned).toHaveLength(1); + expect(orchestrator.spawned[0]).toMatchObject({ + command: 'codex-bin', + cwd: '/tmp/project', + }); + expect(orchestrator.spawned[0]?.args[0]).toBe('exec'); + expect(orchestrator.spawned[0]?.args).not.toContain('-p'); + expect(orchestrator.spawned[0]?.args).not.toContain('--output-format'); + }); + + it('propagates worker execution failures', async () => { + const orchestrator = new TestOrchestrator({ + projectPath: '/tmp/project', + codexCommand: 'codex-bin', + timeout: 1000, + }); + orchestrator.mode = 'failure'; + + await expect( + orchestrator.spawnWorker({ + id: 'worker-2', + platform: 'codex', + role: 'coder', + prompt: 'Implement the feature', + }), + ).rejects.toThrow('Worker worker-2 exited with code 1'); + }); +}); + +describe('buildHeadlessCommandSpec', () => { + it('builds claude headless arguments', () => { + const spec = buildHeadlessCommandSpec( + { + platform: 'claude', + model: 'claude-sonnet', + maxTurns: 9, + }, + 'Analyze the repository', + { + claudeCommand: 'claude', + codexCommand: 'codex', + }, + ); + + expect(spec).toEqual({ + command: 'claude', + args: [ + '-p', + 'Analyze the repository', + '--output-format', + 'text', + '--max-turns', + '9', + '--model', + 'claude-sonnet', + ], + }); + }); + + it('builds codex exec arguments', () => { + const outputFile = 'C:/temp/codex-last-message.txt'; + const spec = buildHeadlessCommandSpec( + { + platform: 'codex', + model: 'gpt-5.3-codex', + }, + 'Implement the feature', + { + claudeCommand: 'claude', + codexCommand: 'codex', + }, + outputFile, + ); + + expect(spec).toEqual({ + command: 'codex', + args: [ + 'exec', + '--skip-git-repo-check', + '--color', + 'never', + '--model', + 'gpt-5.3-codex', + '--output-last-message', + outputFile, + 'Implement the feature', + ], + outputFile, + }); + }); + + it('throws when codex worker is configured with maxTurns', () => { + expect(() => + buildHeadlessCommandSpec( + { + platform: 'codex', + maxTurns: 12, + }, + 'Implement the feature', + { + claudeCommand: 'claude', + codexCommand: 'codex', + }, + ), + ).toThrow('Codex workers do not support maxTurns in dual-mode yet'); + }); + + it('omits optional codex properties when not provided', () => { + const spec = buildHeadlessCommandSpec( + { + platform: 'codex', + }, + 'Review the implementation', + { + claudeCommand: 'claude', + codexCommand: 'codex', + }, + ); + + expect(spec).toEqual({ + command: 'codex', + args: ['exec', '--skip-git-repo-check', '--color', 'never', 'Review the implementation'], + }); + }); +});