Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 129 additions & 140 deletions internal/agent/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,38 +92,103 @@ func (a *CustomAgent) Check(_ context.Context, _ Runtime) error { return nil }
// explicitly via ${api_key}, so skill-up does not pre-validate them.
func (a *CustomAgent) CheckCredentials(_ context.Context) error { return nil }

// Run executes the custom engine for a single case.
// customTransport executes a prepared custom-engine run and returns a uniform
// transportOutcome that the shared assembler (CustomAgent.assembleResult) maps
// to a *SessionResult. The local transport reads result files and the process
// exit; a future http transport will map an HTTP response body and status onto
// the same shape.
//
// run's returned error is a SETUP error raised before the engine executes (e.g.
// the command failed to render, or the input file could not be written): the
// run is abandoned and CustomAgent.Run reports an errorResult. An
// execution-phase failure — the engine ran but exited non-zero, timed out, or
// kept stdout open past WaitDelay — is carried in transportOutcome.execErr
// instead, so a partial result can still be assembled.
type customTransport interface {
run(ctx context.Context, rt Runtime, opts ExecOptions, prep *customRunPrep) (*transportOutcome, error)
}

// customRunPrep is the transport-agnostic state CustomAgent.Run builds once,
// before dispatching to a transport. Every transport consumes the same prepared
// session input and template variable set. start is captured at preparation
// time so the reported duration spans preparation through result assembly.
type customRunPrep struct {
custom *config.CustomEngineConfig
vars map[string]string
sessJSON []byte
timeoutSec int
messages []transcript.Message
start time.Time
}

// transportOutcome is what a transport hands back to the shared assembler.
// exitCode and stderr are the process exit code and stderr for the local
// transport (a future http transport maps HTTP status and error body onto
// them). raw is the result payload graded for session_result responses; stdout
// is the text-format final-message source (the local transport keeps them
// distinct so a bookkeeping output_file is never graded as a text answer).
// frameworkFiles are the framework-written paths (input/output files) to record
// in GeneratedFiles so the workspace-diff collector excludes them.
type transportOutcome struct {
raw string
stdout string
exitCode int
stderr string
execErr error
frameworkFiles []string
}

// Run executes the custom engine for a single case: it builds the shared
// session input, selects the transport, runs it, and assembles the result.
func (a *CustomAgent) Run(ctx context.Context, rt Runtime, opts ExecOptions, messages []transcript.Message) (*SessionResult, error) {
custom := a.Cfg.Custom
if custom == nil {
return a.errorResult(0), errors.New("custom engine config is missing")
}
transport, err := a.selectTransport(custom)
if err != nil {
return a.errorResult(0), err
}
prep, err := a.prepareRun(rt, opts, messages, custom)
if err != nil {
return a.errorResult(0), err
}
outcome, err := transport.run(ctx, rt, opts, prep)
if err != nil {
return a.errorResult(0), err
}
return a.assembleResult(ctx, rt, opts, prep, outcome)
}

// selectTransport maps engine.custom.transport to its implementation. The http
// transport is designed but not yet implemented; selecting it returns the
// explicit error rather than a transport. When http lands, this case returns an
// &httpTransport{} and CustomAgent.Run is otherwise unchanged.
func (a *CustomAgent) selectTransport(custom *config.CustomEngineConfig) (customTransport, error) {
switch custom.Transport {
case customTransportLocal:
return a.runLocal(ctx, rt, opts, messages, custom)
return &localTransport{a: a}, nil
case customTransportHTTP:
return a.errorResult(0), errors.New("custom engine http transport is not yet implemented")
return nil, errors.New("custom engine http transport is not yet implemented")
default:
return a.errorResult(0), fmt.Errorf("custom engine transport %q is not supported", custom.Transport)
return nil, fmt.Errorf("custom engine transport %q is not supported", custom.Transport)
}
}

func (a *CustomAgent) runLocal(ctx context.Context, rt Runtime, opts ExecOptions, messages []transcript.Message, custom *config.CustomEngineConfig) (*SessionResult, error) {
// Guard against a nil local block: validation skips engine.custom for
// built-in engines, so a `--engine` override can reach here unvalidated.
if custom.Local == nil {
return a.errorResult(0), errors.New("engine.custom.local.command is required when transport is local")
}

// prepareRun builds the transport-agnostic run state: the effective timeout, the
// full template variable set, and the marshaled SessionInput. start is captured
// here so the reported duration spans preparation through result assembly,
// matching the pre-refactor behavior.
func (a *CustomAgent) prepareRun(rt Runtime, opts ExecOptions, messages []transcript.Message, custom *config.CustomEngineConfig) (*customRunPrep, error) {
start := time.Now()
// engine.custom.timeout_seconds is the per-engine deadline; opts.TimeoutSec
// is the outer case deadline (the evaluator already wraps ctx with it).
// When both are positive, the effective deadline is the smaller of the
// two — exceeding the case deadline would have the case context kill the
// process while ${timeout_seconds} and SessionInput still advertised a
// longer timeout to the agent, producing premature termination with
// inconsistent metadata. Take the min so the value handed to the agent
// reflects the real wall-clock budget.
// When both are positive, the effective deadline is the smaller of the two
// — exceeding the case deadline would have the case context kill the process
// while ${timeout_seconds} and SessionInput still advertised a longer
// timeout to the agent, producing premature termination with inconsistent
// metadata. Take the min so the value handed to the agent reflects the real
// wall-clock budget.
timeoutSec := custom.TimeoutSeconds
switch {
case timeoutSec <= 0:
Expand All @@ -132,87 +197,37 @@ func (a *CustomAgent) runLocal(ctx context.Context, rt Runtime, opts ExecOptions
timeoutSec = opts.TimeoutSec
}

// Build the full template variable set first: kwargs may reference
// built-in variables (e.g. ${case_id}), and the I/O path templates may in
// turn reference ${kwargs.<key>}, so kwargs must be resolved before paths.
// Build the full template variable set first: kwargs may reference built-in
// variables (e.g. ${case_id}), and the I/O path templates may in turn
// reference ${kwargs.<key>}, so kwargs must be resolved before paths.
baseVars := a.buildBaseVars(rt, opts, messages, timeoutSec)

renderedKwargs, err := renderTemplateMap(custom.Kwargs, baseVars)
if err != nil {
return a.errorResult(0), fmt.Errorf("render custom.kwargs: %w", err)
return nil, fmt.Errorf("render custom.kwargs: %w", err)
}

sess := a.buildSessionInput(rt, opts, messages, renderedKwargs, timeoutSec)
// Marshal the session input once and share the bytes between the template
// expansion (${session_input}) and the on-disk input file, avoiding a
// second copy of what for long transcripts can be tens of MB per case.
// expansion (${session_input}) and the on-disk input file, avoiding a second
// copy of what for long transcripts can be tens of MB per case.
sessJSON, err := json.Marshal(sess)
if err != nil {
return a.errorResult(0), fmt.Errorf("marshal session input: %w", err)
return nil, fmt.Errorf("marshal session input: %w", err)
}
vars, err := a.completeTemplateVars(baseVars, renderedKwargs, sess, sessJSON)
if err != nil {
return a.errorResult(0), err
return nil, err
}

// Resolve the I/O paths with the full variable set (so they may use
// ${kwargs.<key>}), then expose the resolved paths to command rendering.
inputFile, outputFile, err := resolveCustomIOFiles(rt, custom, vars)
if err != nil {
return a.errorResult(0), err
}
vars["input_file"], vars["output_file"] = inputFile, outputFile

if err := persistRuntimeArtifact(ctx, rt, inputFile, string(sessJSON)); err != nil {
return a.errorResult(0), fmt.Errorf("write session input: %w", err)
}

// Remove any stale output file so a result left by a fixture or a previous
// run is never mistaken for this invocation's output. Only an explicitly
// configured output_file is cleared — the default ${output_file} path is
// left untouched, as it may be ordinary fixture input the agent reads.
// Also skip the clear when output_file resolves to the same path as
// input_file, so the SessionInput just written is not self-deleted.
clearedStaleOutput := false
if custom.Local.OutputFile != "" && filepath.Clean(outputFile) != filepath.Clean(inputFile) {
clearedStaleOutput = a.clearStaleOutputFile(ctx, rt, outputFile)
}

cmd, execOpts, err := a.buildLocalExec(ctx, rt, opts, custom, vars, timeoutSec)
if err != nil {
return a.errorResult(0), err
}

// Enforce the custom timeout via a context deadline: some runtimes
// (e.g. NoneRuntime) do not honor ExecOptions.TimeoutSec directly.
execCtx := ctx
if timeoutSec > 0 {
var cancel context.CancelFunc
execCtx, cancel = context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second)
defer cancel()
}

result, execErr := rt.Exec(execCtx, cmd, execOpts)
return a.finishLocal(ctx, rt, opts, custom, result, execErr, localRunContext{
InputFile: inputFile,
OutputFile: outputFile,
ClearedStaleOutput: clearedStaleOutput,
Start: start,
Messages: messages,
})
}

// localRunContext bundles the local-run bookkeeping that finishLocal needs:
// the resolved input/output paths, whether the pre-run cleanup actually
// removed a stale output file, the run start time (for duration), and the
// caller's transcript messages. Carrying these as one struct keeps
// finishLocal's signature manageable as more bookkeeping accrues.
type localRunContext struct {
InputFile string
OutputFile string
ClearedStaleOutput bool
Start time.Time
Messages []transcript.Message
return &customRunPrep{
custom: custom,
vars: vars,
sessJSON: sessJSON,
timeoutSec: timeoutSec,
messages: messages,
start: start,
}, nil
}

// clearStaleOutputFile removes a pre-existing output file before the run and
Expand Down Expand Up @@ -291,72 +306,53 @@ func (a *CustomAgent) buildLocalExec(ctx context.Context, rt Runtime, opts ExecO
return strings.Join(parts, " "), execOpts, nil
}

// finishLocal turns a runtime exec result into a SessionResult.
func (a *CustomAgent) finishLocal(ctx context.Context, rt Runtime, opts ExecOptions, custom *config.CustomEngineConfig, result ExecResult, execErr error, rc localRunContext) (*SessionResult, error) {
durationMs := time.Since(rc.Start).Milliseconds()

// The command may already have emitted a valid result even when execErr
// is non-nil — a timeout, exec.ErrWaitDelay (a backgrounded child kept
// stdout open past WaitDelay), or a non-zero exit after printing output.
// Always attempt the read so judges and expect checks can inspect the
// partial answer; if nothing was produced, raw stays empty and the
// parse-error path below takes over.
var (
raw string
outputFileProduced bool
)
readCtx := ctx
if execErr != nil {
// ctx may have been canceled (timeout/cancel) or unrelated to the
// failure; use a fresh context so a partial result is still
// recoverable on remote runtimes whose DownloadFile honors ctx.
var cancel context.CancelFunc
readCtx, cancel = context.WithTimeout(context.WithoutCancel(ctx), customArtifactReadTimeout)
defer cancel()
}
raw, outputFileProduced = a.readRawResult(readCtx, rt, custom, result, rc.OutputFile)

res, parseErr := a.buildResult(ctx, rt, opts, custom, raw, result, durationMs, rc.Messages)
// assembleResult maps a transport's outcome onto a *SessionResult, applying the
// shared exit-code and error semantics. It is transport-agnostic: the local and
// (future) http transports differ only in how they produce the
// transportOutcome. The raw-result read happens inside the transport, so this
// shared step never touches the output file or the process directly.
func (a *CustomAgent) assembleResult(ctx context.Context, rt Runtime, opts ExecOptions, prep *customRunPrep, o *transportOutcome) (*SessionResult, error) {
durationMs := time.Since(prep.start).Milliseconds()

usedOutputFile := outputFileProduced || rc.ClearedStaleOutput
res, parseErr := a.buildResult(ctx, rt, opts, prep.custom, o.raw, o.stdout, o.exitCode, o.stderr, durationMs, prep.messages)

if execErr != nil {
if o.execErr != nil {
switch {
case parseErr != nil:
// Nothing usable was produced before the failure.
res = a.errorResult(result.ExitCode)
res = a.errorResult(o.exitCode)
res.DurationMs = durationMs
res.Transcript = minimalCustomTranscript(rc.Messages, "")
res.Transcript = minimalCustomTranscript(prep.messages, "")
case res.ExitCode == 0:
// The run was interrupted; a parsed exit_code 0 must not let the
// evaluator treat an interrupted run as a success.
res.ExitCode = result.ExitCode
res.ExitCode = o.exitCode
if res.ExitCode == 0 {
res.ExitCode = 1
}
}
if res.Stderr == "" {
res.Stderr = a.maskAPIKey(result.Stderr)
res.Stderr = a.maskAPIKey(o.stderr)
} else {
res.Stderr = a.maskAPIKey(res.Stderr)
}
a.registerFrameworkIO(res, rc.InputFile, rc.OutputFile, usedOutputFile)
return res, fmt.Errorf("custom engine run failed: %w", execErr)
a.appendFrameworkFiles(res, o.frameworkFiles)
return res, fmt.Errorf("custom engine run failed: %w", o.execErr)
}

a.registerFrameworkIO(res, rc.InputFile, rc.OutputFile, usedOutputFile)
a.appendFrameworkFiles(res, o.frameworkFiles)
// A non-zero process exit is a failed run even when the engine's JSON
// reports exit_code 0 (e.g. a wrapper that crashed after printing output)
// or never emitted parseable JSON at all. Reflect the real process
// exit/stderr so reports surface the actual command failure.
if result.ExitCode != 0 {
res.ExitCode = result.ExitCode
if o.exitCode != 0 {
res.ExitCode = o.exitCode
if res.Stderr == "" {
res.Stderr = a.maskAPIKey(result.Stderr)
res.Stderr = a.maskAPIKey(o.stderr)
} else {
res.Stderr = a.maskAPIKey(res.Stderr)
}
return res, fmt.Errorf("custom engine command exited %d: %s", result.ExitCode, a.maskAPIKey(result.Stderr))
return res, fmt.Errorf("custom engine command exited %d: %s", o.exitCode, a.maskAPIKey(o.stderr))
}
if parseErr != nil {
res.Stderr = a.maskAPIKey(res.Stderr)
Expand All @@ -374,43 +370,36 @@ func (a *CustomAgent) finishLocal(ctx context.Context, rt Runtime, opts ExecOpti
// configured response_format. The text format never errors and always grades
// stdout (never the output file); session_result returns a parse error when
// the payload is missing or malformed.
func (a *CustomAgent) buildResult(ctx context.Context, rt Runtime, opts ExecOptions, custom *config.CustomEngineConfig, raw string, result ExecResult, durationMs int64, messages []transcript.Message) (*SessionResult, error) {
func (a *CustomAgent) buildResult(ctx context.Context, rt Runtime, opts ExecOptions, custom *config.CustomEngineConfig, raw, stdout string, exitCode int, stderr string, durationMs int64, messages []transcript.Message) (*SessionResult, error) {
if customResponseFormat(custom) == customResponseText {
// Per the contract, text responses come from stdout; an output file
// produced for bookkeeping must not be graded as the final answer.
// Mask before constructing the transcript so the assistant-reply
// message embedded in it never carries an unmasked value.
finalMsg := a.maskAPIKey(strings.TrimSpace(result.Stdout))
finalMsg := a.maskAPIKey(strings.TrimSpace(stdout))
return &SessionResult{
Engine: a.Name(),
ExitCode: result.ExitCode,
ExitCode: exitCode,
DurationMs: durationMs,
FinalMessage: finalMsg,
Stderr: a.maskAPIKey(result.Stderr),
Stderr: a.maskAPIKey(stderr),
Transcript: minimalCustomTranscript(messages, finalMsg),
Artifacts: &SessionArtifacts{},
}, nil
}
return a.parseSessionResult(ctx, rt, opts, raw, durationMs, messages)
}

// registerFrameworkIO records the framework-written input/output files in
// GeneratedFiles so the workspace-diff collector excludes them (they would
// otherwise show up as user changes) and they are archived for debugging.
// The bool gates registration of outputFile to match the "produced or cleared"
// rule the caller computes.
//
//revive:disable-next-line:flag-parameter
func (a *CustomAgent) registerFrameworkIO(res *SessionResult, inputFile, outputFile string, usedOutputFile bool) {
// appendFrameworkFiles records the framework-written files in GeneratedFiles so
// the workspace-diff collector excludes them (they would otherwise show up as
// user changes) and they are archived for debugging. The transport decides
// which files belong in the slice — for the local transport, the input file
// always, and the output file only when it was produced or cleared.
func (a *CustomAgent) appendFrameworkFiles(res *SessionResult, files []string) {
if res.Artifacts == nil {
res.Artifacts = &SessionArtifacts{}
}
if inputFile != "" {
res.Artifacts.GeneratedFiles = append(res.Artifacts.GeneratedFiles, inputFile)
}
if usedOutputFile && outputFile != "" {
res.Artifacts.GeneratedFiles = append(res.Artifacts.GeneratedFiles, outputFile)
}
res.Artifacts.GeneratedFiles = append(res.Artifacts.GeneratedFiles, files...)
}

// readRawResult reads the result payload from the output file, or from stdout.
Expand Down
Loading
Loading