Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 104 additions & 18 deletions CodeGenesis.Engine/Cli/RunPipelineCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ namespace CodeGenesis.Engine.Cli;
public sealed class RunPipelineCommand(
IClaudeRunner claude,
PipelineExecutor executor,
PipelineRenderer renderer) : AsyncCommand<RunPipelineCommandSettings>
PipelineRenderer renderer,
CheckpointManager checkpointManager) : AsyncCommand<RunPipelineCommandSettings>
{
public override async Task<int> ExecuteAsync(CommandContext commandContext, RunPipelineCommandSettings settings)
{
Expand All @@ -28,6 +29,54 @@ public override async Task<int> ExecuteAsync(CommandContext commandContext, RunP
return 1;
}

// Compute YAML hash for checkpoint comparison
var yamlHash = checkpointManager.ComputeFileHash(settings.File);

// Handle --resume / --from-step
HashSet<string>? completedSteps = null;
string? resumeFromStep = null;
PipelineCheckpoint? checkpoint = null;

if (settings.Resume || settings.FromStep is not null)
{
checkpoint = checkpointManager.Load(settings.File);

if (checkpoint is null)
{
renderer.RenderError("No checkpoint found. Run the pipeline without --resume first.");
return 1;
}

// Warn if YAML changed since checkpoint
if (checkpoint.YamlHash != yamlHash)
{
renderer.RenderInfo("Warning: Pipeline YAML has changed since the last checkpoint. Resuming with current config.");
}

completedSteps = new HashSet<string>(checkpoint.CompletedSteps);

if (settings.Resume)
{
// --resume: determine resume point from checkpoint
resumeFromStep = checkpoint.FailedStepName;
// If no failed step recorded, all completed steps will be skipped
// and execution continues from the first non-completed step

if (completedSteps.Count == 0 && resumeFromStep is null)
{
renderer.RenderInfo("Nothing to resume — no steps were completed.");
return 0;
}
}
else if (settings.FromStep is not null)
{
// --from-step: validate step exists in pipeline
resumeFromStep = settings.FromStep;
}

// Restore cached step outputs into context
}

// Build template variables from inputs (defaults + overrides)
var variables = new Dictionary<string, string>();
foreach (var (key, input) in config.Inputs)
Expand Down Expand Up @@ -71,6 +120,16 @@ public override async Task<int> ExecuteAsync(CommandContext commandContext, RunP
WorkingDirectory = workingDir
};

// Restore cached step outputs from checkpoint
if (checkpoint is not null && completedSteps is not null)
{
foreach (var (key, value) in checkpoint.StepOutputs)
{
if (completedSteps.Contains(key))
context.StepOutputs[key] = value;
}
}

// Build step tree using StepBuilder
List<IPipelineStep> steps;
try
Expand All @@ -84,6 +143,21 @@ public override async Task<int> ExecuteAsync(CommandContext commandContext, RunP
return 1;
}

// Validate --from-step target exists
if (settings.FromStep is not null && !steps.Any(s => s.Name == settings.FromStep))
{
renderer.RenderError($"Step '{settings.FromStep}' not found in pipeline.");
return 1;
}

// Check if all steps are already completed (--resume with nothing left)
if (completedSteps is not null && resumeFromStep is null && steps.All(s => completedSteps.Contains(s.Name)))
{
renderer.RenderInfo("All steps already completed. Nothing to resume.");
checkpointManager.Delete(settings.File);
return 0;
}

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
Expand All @@ -93,27 +167,39 @@ public override async Task<int> ExecuteAsync(CommandContext commandContext, RunP
};

// Execute pipeline — resolve templates for each step just before it runs
var success = await executor.RunAsync(steps, context, cts.Token, onBeforeStep: step =>
{
if (step is DynamicStep dynamicStep)
var success = await executor.RunAsync(steps, context, cts.Token,
onBeforeStep: step =>
{
// Re-resolve the prompt and system prompt with latest step outputs
var allVars = new Dictionary<string, string>(variables);
foreach (var (key, value) in context.StepOutputs)
allVars[$"steps.{key}"] = value;

dynamicStep.UpdateResolvedPrompt(
PipelineConfigLoader.ResolveTemplate(
dynamicStep.OriginalPromptTemplate, allVars));

if (dynamicStep.OriginalSystemPromptTemplate is not null)
if (step is DynamicStep dynamicStep)
{
dynamicStep.UpdateResolvedSystemPrompt(
// Re-resolve the prompt and system prompt with latest step outputs
var allVars = new Dictionary<string, string>(variables);
foreach (var (key, value) in context.StepOutputs)
allVars[$"steps.{key}"] = value;

dynamicStep.UpdateResolvedPrompt(
PipelineConfigLoader.ResolveTemplate(
dynamicStep.OriginalSystemPromptTemplate, allVars));
dynamicStep.OriginalPromptTemplate, allVars));

if (dynamicStep.OriginalSystemPromptTemplate is not null)
{
dynamicStep.UpdateResolvedSystemPrompt(
PipelineConfigLoader.ResolveTemplate(
dynamicStep.OriginalSystemPromptTemplate, allVars));
}
}
}
});
},
completedSteps: completedSteps,
resumeFromStep: resumeFromStep,
checkpointManager: checkpointManager,
pipelineFile: settings.File,
yamlHash: yamlHash,
pipelineName: config.Pipeline.Name);

if (!success)
{
renderer.RenderResumeHint(settings.File);
}

return success ? 0 : 1;
}
Expand Down
17 changes: 17 additions & 0 deletions CodeGenesis.Engine/Cli/RunPipelineCommandSettings.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.ComponentModel;
using Spectre.Console;
using Spectre.Console.Cli;

namespace CodeGenesis.Engine.Cli;
Expand All @@ -20,4 +21,20 @@ public sealed class RunPipelineCommandSettings : CommandSettings
[CommandOption("-m|--model")]
[Description("Claude model override (e.g. claude-sonnet-4-6)")]
public string? Model { get; set; }

[CommandOption("--resume")]
[Description("Resume from the last checkpoint (skips completed steps)")]
public bool Resume { get; set; }

[CommandOption("--from-step <STEP>")]
[Description("Resume from a specific step name (re-executes that step and all following)")]
public string? FromStep { get; set; }

public override ValidationResult Validate()
{
if (Resume && FromStep is not null)
return ValidationResult.Error("--resume and --from-step are mutually exclusive.");

return ValidationResult.Success();
}
}
88 changes: 88 additions & 0 deletions CodeGenesis.Engine/Pipeline/CheckpointManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System.Security.Cryptography;
using System.Text.Json;
using Microsoft.Extensions.Logging;

namespace CodeGenesis.Engine.Pipeline;

public sealed class CheckpointManager(ILogger<CheckpointManager> logger)
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
WriteIndented = true,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};

public string GetCheckpointPath(string pipelineFilePath)
{
var fullPath = Path.GetFullPath(pipelineFilePath);
var dir = Path.GetDirectoryName(fullPath)!;
var stem = Path.GetFileNameWithoutExtension(fullPath);
var checkpointDir = Path.Combine(dir, ".codegenesis");
return Path.Combine(checkpointDir, $"{stem}.checkpoint.json");
}

public PipelineCheckpoint? Load(string pipelineFilePath)
{
var path = GetCheckpointPath(pipelineFilePath);
if (!File.Exists(path))
return null;

try
{
var json = File.ReadAllText(path);
return JsonSerializer.Deserialize<PipelineCheckpoint>(json, JsonOptions);
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to load checkpoint from {Path}, ignoring", path);
return null;
}
}

public void Save(PipelineCheckpoint checkpoint, string pipelineFilePath)
{
var path = GetCheckpointPath(pipelineFilePath);
var dir = Path.GetDirectoryName(path)!;
Directory.CreateDirectory(dir);

var json = JsonSerializer.Serialize(checkpoint, JsonOptions);
var tmpPath = path + ".tmp";

try
{
File.WriteAllText(tmpPath, json);
File.Move(tmpPath, path, overwrite: true);
logger.LogDebug("Checkpoint saved to {Path}", path);
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to save checkpoint to {Path}", path);
// Clean up temp file if it exists
try { File.Delete(tmpPath); } catch { /* best effort */ }
}
}

public void Delete(string pipelineFilePath)
{
var path = GetCheckpointPath(pipelineFilePath);
try
{
if (File.Exists(path))
{
File.Delete(path);
logger.LogDebug("Checkpoint deleted: {Path}", path);
}
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to delete checkpoint at {Path}", path);
}
}

public string ComputeFileHash(string filePath)
{
var bytes = File.ReadAllBytes(filePath);
var hash = SHA256.HashData(bytes);
return Convert.ToHexStringLower(hash);
}
}
22 changes: 22 additions & 0 deletions CodeGenesis.Engine/Pipeline/PipelineCheckpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace CodeGenesis.Engine.Pipeline;

public sealed class PipelineCheckpoint
{
public int Version { get; init; } = 1;
public required string PipelineFile { get; init; }
public required string PipelineName { get; init; }
public required string YamlHash { get; init; }
public required DateTime LastUpdatedUtc { get; init; }
public required List<string> CompletedSteps { get; init; }
public required Dictionary<string, string> StepOutputs { get; init; }
public string? FailedStepName { get; init; }
public required CheckpointMetrics Metrics { get; init; }
}

public sealed class CheckpointMetrics
{
public int TotalInputTokens { get; init; }
public int TotalOutputTokens { get; init; }
public double TotalCostUsd { get; init; }
public int StepsCompleted { get; init; }
}
Loading