From 4e1a7940c52c49612afad9679a5ce92d18979d23 Mon Sep 17 00:00:00 2001 From: Christopher Tso Date: Mon, 30 Mar 2026 08:21:18 +1100 Subject: [PATCH] fix(pipeline): add progress feedback and concurrent execution to grading (#851) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit pipeline grade and pipeline run grading step now run code-graders concurrently (default: 10 workers, configurable via -j) and print real-time progress to stderr. Deduplicates grading logic into shared runCodeGraders(). Benchmarked: 65s → 10s for 468 graders. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/cli/src/commands/pipeline/grade.ts | 234 ++++++++++++++++-------- apps/cli/src/commands/pipeline/run.ts | 86 +-------- 2 files changed, 163 insertions(+), 157 deletions(-) diff --git a/apps/cli/src/commands/pipeline/grade.ts b/apps/cli/src/commands/pipeline/grade.ts index c491c9e9..89146f21 100644 --- a/apps/cli/src/commands/pipeline/grade.ts +++ b/apps/cli/src/commands/pipeline/grade.ts @@ -6,6 +6,9 @@ * with the response text on stdin (matching CodeEvaluator payload format), * and writes results to code_grader_results/.json. * + * Graders run concurrently (default: 10 workers) for performance. + * Progress is printed to stderr so users see real-time feedback. + * * Export directory additions: * ///code_grader_results/.json */ @@ -13,7 +16,9 @@ import { mkdir, readFile, readdir, writeFile } from 'node:fs/promises'; import { join } from 'node:path'; import { executeScript } from '@agentv/core'; -import { command, positional, string } from 'cmd-ts'; +import { command, number, option, optional, positional, string } from 'cmd-ts'; + +const DEFAULT_CONCURRENCY = 10; /** * Convert a Message[] array to plain text. @@ -26,6 +31,142 @@ function extractInputText(input: Array<{ role: string; content: string }>): stri return input.map((m) => `@[${m.role}]:\n${m.content}`).join('\n\n'); } +/** Describes a single grader to execute. 
*/ +export interface GraderTask { + testId: string; + testDir: string; + resultsDir: string; + graderFile: string; + responseText: string; + inputData: { + input: Array<{ role: string; content: string }>; + input_files?: unknown[]; + metadata?: Record; + }; +} + +/** + * Run code-grader tasks with concurrency and progress feedback. + * Shared by `pipeline grade` and `pipeline run`. + */ +export async function runCodeGraders( + tasks: GraderTask[], + concurrency: number, +): Promise<{ totalGraders: number; totalPassed: number }> { + let totalGraders = 0; + let totalPassed = 0; + let completed = 0; + const total = tasks.length; + + if (total === 0) return { totalGraders: 0, totalPassed: 0 }; + + const writeProgress = () => { + process.stderr.write(`\rGrading: ${completed}/${total} done`); + }; + + writeProgress(); + + const executeGrader = async (task: GraderTask) => { + const { testId, testDir, resultsDir, graderFile, responseText, inputData } = task; + const graderConfig = JSON.parse( + await readFile(join(testDir, 'code_graders', graderFile), 'utf8'), + ); + const graderName = graderConfig.name; + + const inputText = extractInputText(inputData.input); + const payload = JSON.stringify({ + output: [{ role: 'assistant', content: responseText }], + input: inputData.input, + criteria: '', + expected_output: [], + input_files: inputData.input_files ?? [], + trace: null, + token_usage: null, + cost_usd: null, + duration_ms: null, + start_time: null, + end_time: null, + file_changes: null, + workspace_path: null, + config: graderConfig.config ?? null, + metadata: inputData.metadata ?? {}, + input_text: inputText, + output_text: responseText, + expected_output_text: '', + }); + + try { + const stdout = await executeScript( + graderConfig.command, + payload, + undefined, + graderConfig.cwd, + ); + const parsed = JSON.parse(stdout); + const score = typeof parsed.score === 'number' ? parsed.score : 0; + const assertions = Array.isArray(parsed.assertions) ? 
parsed.assertions : []; + + const result = { + name: graderName, + type: 'code-grader', + score, + weight: graderConfig.weight ?? 1.0, + assertions, + details: parsed.details ?? {}, + }; + + await writeFile( + join(resultsDir, `${graderName}.json`), + `${JSON.stringify(result, null, 2)}\n`, + 'utf8', + ); + + totalGraders++; + if (score >= 0.5) totalPassed++; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + process.stderr.write(`\n ${testId}/${graderName}: ERROR — ${message}\n`); + + const errorResult = { + name: graderName, + type: 'code-grader', + score: 0, + weight: graderConfig.weight ?? 1.0, + assertions: [{ text: `Error: ${message}`, passed: false }], + details: { error: message }, + }; + + await writeFile( + join(resultsDir, `${graderName}.json`), + `${JSON.stringify(errorResult, null, 2)}\n`, + 'utf8', + ); + totalGraders++; + } finally { + completed++; + writeProgress(); + } + }; + + // Run with concurrency limit + const pending = new Set>(); + for (const task of tasks) { + const p = executeGrader(task).then(() => { + pending.delete(p); + }); + pending.add(p); + if (pending.size >= concurrency) { + await Promise.race(pending); + } + } + await Promise.all(pending); + + // Clear the progress line and print final summary + process.stderr.write('\n'); + + return { totalGraders, totalPassed }; +} + export const evalGradeCommand = command({ name: 'grade', description: 'Run code-grader assertions on responses in an export directory', @@ -35,16 +176,23 @@ export const evalGradeCommand = command({ displayName: 'export-dir', description: 'Export directory from pipeline input', }), + concurrency: option({ + type: optional(number), + long: 'concurrency', + short: 'j', + description: `Number of graders to run in parallel (default: ${DEFAULT_CONCURRENCY})`, + }), }, - handler: async ({ exportDir }) => { + handler: async ({ exportDir, concurrency }) => { + const maxWorkers = concurrency ?? 
DEFAULT_CONCURRENCY; const manifestPath = join(exportDir, 'manifest.json'); const manifest = JSON.parse(await readFile(manifestPath, 'utf8')); const testIds: string[] = manifest.test_ids; const evalSet: string = manifest.dataset ?? ''; const safeEvalSet = evalSet ? evalSet.replace(/[\/\\:*?"<>|]/g, '_') : ''; - let totalGraders = 0; - let totalPassed = 0; + // Collect all grader tasks upfront so we know the total count + const tasks: GraderTask[] = []; for (const testId of testIds) { const subpath = safeEvalSet ? [safeEvalSet, testId] : [testId]; @@ -62,88 +210,16 @@ export const evalGradeCommand = command({ if (graderFiles.length === 0) continue; await mkdir(resultsDir, { recursive: true }); - // Read response and input for stdin payload + // Read response and input once per test (shared by all graders for this test) const responseText = await readFile(join(testDir, 'response.md'), 'utf8'); const inputData = JSON.parse(await readFile(join(testDir, 'input.json'), 'utf8')); for (const graderFile of graderFiles) { - const graderConfig = JSON.parse(await readFile(join(codeGradersDir, graderFile), 'utf8')); - const graderName = graderConfig.name; - - // Build stdin payload matching CodeEvaluator format (snake_case) - const inputText = extractInputText(inputData.input); - const payload = JSON.stringify({ - output: [{ role: 'assistant', content: responseText }], - input: inputData.input, - criteria: '', - expected_output: [], - input_files: inputData.input_files ?? [], - trace: null, - token_usage: null, - cost_usd: null, - duration_ms: null, - start_time: null, - end_time: null, - file_changes: null, - workspace_path: null, - config: graderConfig.config ?? null, - metadata: inputData.metadata ?? 
{}, - input_text: inputText, - output_text: responseText, - expected_output_text: '', - }); - - try { - const stdout = await executeScript( - graderConfig.command, - payload, - undefined, - graderConfig.cwd, - ); - const parsed = JSON.parse(stdout); - const score = typeof parsed.score === 'number' ? parsed.score : 0; - const assertions = Array.isArray(parsed.assertions) ? parsed.assertions : []; - - const result = { - name: graderName, - type: 'code-grader', - score, - weight: graderConfig.weight ?? 1.0, - assertions, - details: parsed.details ?? {}, - }; - - await writeFile( - join(resultsDir, `${graderName}.json`), - `${JSON.stringify(result, null, 2)}\n`, - 'utf8', - ); - - totalGraders++; - if (score >= 0.5) totalPassed++; - } catch (error) { - const message = error instanceof Error ? error.message : String(error); - console.error(` ${testId}/${graderName}: ERROR — ${message}`); - - const errorResult = { - name: graderName, - type: 'code-grader', - score: 0, - weight: graderConfig.weight ?? 
1.0, - assertions: [{ text: `Error: ${message}`, passed: false }], - details: { error: message }, - }; - - await writeFile( - join(resultsDir, `${graderName}.json`), - `${JSON.stringify(errorResult, null, 2)}\n`, - 'utf8', - ); - totalGraders++; - } + tasks.push({ testId, testDir, resultsDir, graderFile, responseText, inputData }); } } + const { totalGraders, totalPassed } = await runCodeGraders(tasks, maxWorkers); console.log(`Graded ${totalGraders} code-grader(s): ${totalPassed} passed`); }, }); diff --git a/apps/cli/src/commands/pipeline/run.ts b/apps/cli/src/commands/pipeline/run.ts index 7533c389..59c65d93 100644 --- a/apps/cli/src/commands/pipeline/run.ts +++ b/apps/cli/src/commands/pipeline/run.ts @@ -17,13 +17,15 @@ import { mkdir, readFile, readdir, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { dirname, join, relative, resolve } from 'node:path'; -import { deriveCategory, executeScript, loadTestSuite } from '@agentv/core'; +import { deriveCategory, loadTestSuite } from '@agentv/core'; import type { CodeEvaluatorConfig, EvaluatorConfig, LlmGraderEvaluatorConfig } from '@agentv/core'; import { command, number, oneOf, option, optional, positional, string } from 'cmd-ts'; import { buildDefaultRunDir } from '../eval/result-layout.js'; import { findRepoRoot } from '../eval/shared.js'; import { selectTarget } from '../eval/targets.js'; +import type { GraderTask } from './grade.js'; +import { runCodeGraders } from './grade.js'; /** * Convert a Message[] array to plain text. @@ -313,8 +315,8 @@ export const evalRunCommand = command({ return; } - let totalGraders = 0; - let totalPassed = 0; + // Collect grader tasks and run concurrently with progress feedback + const graderTasks: GraderTask[] = []; for (const testId of testIds) { const subpath = safeEvalSet ? 
[safeEvalSet, testId] : [testId]; @@ -335,84 +337,12 @@ export const evalRunCommand = command({ const inputData = JSON.parse(await readFile(join(testDir, 'input.json'), 'utf8')); for (const graderFile of graderFiles) { - const graderConfig = JSON.parse(await readFile(join(codeGradersDir, graderFile), 'utf8')); - const graderName = graderConfig.name; - - const inputText = extractInputText(inputData.input); - const payload = JSON.stringify({ - output: [{ role: 'assistant', content: responseText }], - input: inputData.input, - criteria: '', - expected_output: [], - input_files: inputData.input_files ?? [], - trace: null, - token_usage: null, - cost_usd: null, - duration_ms: null, - start_time: null, - end_time: null, - file_changes: null, - workspace_path: null, - config: graderConfig.config ?? null, - metadata: inputData.metadata ?? {}, - input_text: inputText, - output_text: responseText, - expected_output_text: '', - }); - - try { - const stdout = await executeScript( - graderConfig.command, - payload, - undefined, - graderConfig.cwd, - ); - const parsed = JSON.parse(stdout); - const score = typeof parsed.score === 'number' ? parsed.score : 0; - const assertions = Array.isArray(parsed.assertions) ? parsed.assertions : []; - - await writeFile( - join(resultsDir, `${graderName}.json`), - `${JSON.stringify( - { - name: graderName, - type: 'code-grader', - score, - weight: graderConfig.weight ?? 1.0, - assertions, - details: parsed.details ?? {}, - }, - null, - 2, - )}\n`, - 'utf8', - ); - totalGraders++; - if (score >= 0.5) totalPassed++; - } catch (error) { - const message = error instanceof Error ? error.message : String(error); - console.error(` ${testId}/${graderName}: ERROR — ${message}`); - await writeFile( - join(resultsDir, `${graderName}.json`), - `${JSON.stringify( - { - name: graderName, - type: 'code-grader', - score: 0, - weight: graderConfig.weight ?? 
1.0, - assertions: [{ text: `Error: ${message}`, passed: false }], - details: { error: message }, - }, - null, - 2, - )}\n`, - 'utf8', - ); - totalGraders++; - } + graderTasks.push({ testId, testDir, resultsDir, graderFile, responseText, inputData }); } } + const graderConcurrency = workers ?? 10; + const { totalGraders, totalPassed } = await runCodeGraders(graderTasks, graderConcurrency); console.log(`Graded ${totalGraders} code-grader(s): ${totalPassed} passed`); console.log(`\nDone. Agent can now perform LLM grading on responses in ${outDir}`); },