From d1aa9672b8b2c40aa2447c6913b09340df98640d Mon Sep 17 00:00:00 2001 From: viamu Date: Sat, 28 Feb 2026 22:21:47 -0300 Subject: [PATCH] Add pipeline resume with checkpoint persistence Save a checkpoint after each successful step so failed pipelines can be resumed without re-running completed steps. Introduces --resume (picks up from the last failure) and --from-step (re-executes from a specific step). Checkpoints are stored in .codegenesis/ next to the YAML file and are automatically deleted on successful completion. Co-Authored-By: Claude Opus 4.6 --- CodeGenesis.Engine/Cli/RunPipelineCommand.cs | 122 +++++++++++-- .../Cli/RunPipelineCommandSettings.cs | 17 ++ .../Pipeline/CheckpointManager.cs | 88 ++++++++++ .../Pipeline/PipelineCheckpoint.cs | 22 +++ .../Pipeline/PipelineExecutor.cs | 162 +++++++++++++++++- CodeGenesis.Engine/Program.cs | 5 +- CodeGenesis.Engine/UI/PipelineRenderer.cs | 19 ++ 7 files changed, 410 insertions(+), 25 deletions(-) create mode 100644 CodeGenesis.Engine/Pipeline/CheckpointManager.cs create mode 100644 CodeGenesis.Engine/Pipeline/PipelineCheckpoint.cs diff --git a/CodeGenesis.Engine/Cli/RunPipelineCommand.cs b/CodeGenesis.Engine/Cli/RunPipelineCommand.cs index 44e0a83..05fbf6f 100644 --- a/CodeGenesis.Engine/Cli/RunPipelineCommand.cs +++ b/CodeGenesis.Engine/Cli/RunPipelineCommand.cs @@ -10,7 +10,8 @@ namespace CodeGenesis.Engine.Cli; public sealed class RunPipelineCommand( IClaudeRunner claude, PipelineExecutor executor, - PipelineRenderer renderer) : AsyncCommand + PipelineRenderer renderer, + CheckpointManager checkpointManager) : AsyncCommand { public override async Task ExecuteAsync(CommandContext commandContext, RunPipelineCommandSettings settings) { @@ -28,6 +29,54 @@ public override async Task ExecuteAsync(CommandContext commandContext, RunP return 1; } + // Compute YAML hash for checkpoint comparison + var yamlHash = checkpointManager.ComputeFileHash(settings.File); + + // Handle --resume / --from-step + HashSet? completedSteps = null; + string? resumeFromStep = null; + PipelineCheckpoint? checkpoint = null; + + if (settings.Resume || settings.FromStep is not null) + { + checkpoint = checkpointManager.Load(settings.File); + + if (checkpoint is null) + { + renderer.RenderError("No checkpoint found. Run the pipeline without --resume first."); + return 1; + } + + // Warn if YAML changed since checkpoint + if (checkpoint.YamlHash != yamlHash) + { + renderer.RenderInfo("Warning: Pipeline YAML has changed since the last checkpoint. Resuming with current config."); + } + + completedSteps = new HashSet(checkpoint.CompletedSteps); + + if (settings.Resume) + { + // --resume: determine resume point from checkpoint + resumeFromStep = checkpoint.FailedStepName; + // If no failed step recorded, all completed steps will be skipped + // and execution continues from the first non-completed step + + if (completedSteps.Count == 0 && resumeFromStep is null) + { + renderer.RenderInfo("Nothing to resume — no steps were completed."); + return 0; + } + } + else if (settings.FromStep is not null) + { + // --from-step: validate step exists in pipeline + resumeFromStep = settings.FromStep; + } + + // Restore cached step outputs into context + } + // Build template variables from inputs (defaults + overrides) var variables = new Dictionary(); foreach (var (key, input) in config.Inputs) @@ -71,6 +120,16 @@ public override async Task ExecuteAsync(CommandContext commandContext, RunP WorkingDirectory = workingDir }; + // Restore cached step outputs from checkpoint + if (checkpoint is not null && completedSteps is not null) + { + foreach (var (key, value) in checkpoint.StepOutputs) + { + if (completedSteps.Contains(key)) + context.StepOutputs[key] = value; + } + } + // Build step tree using StepBuilder List steps; try @@ -84,6 +143,21 @@ public override async Task ExecuteAsync(CommandContext commandContext, RunP return 1; } + // Validate --from-step target exists + if (settings.FromStep is not null && !steps.Any(s => s.Name == settings.FromStep)) + { + renderer.RenderError($"Step '{settings.FromStep}' not found in pipeline."); + return 1; + } + + // Check if all steps are already completed (--resume with nothing left) + if (completedSteps is not null && resumeFromStep is null && steps.All(s => completedSteps.Contains(s.Name))) + { + renderer.RenderInfo("All steps already completed. Nothing to resume."); + checkpointManager.Delete(settings.File); + return 0; + } + using var cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { @@ -93,27 +167,39 @@ public override async Task ExecuteAsync(CommandContext commandContext, RunP }; // Execute pipeline — resolve templates for each step just before it runs - var success = await executor.RunAsync(steps, context, cts.Token, onBeforeStep: step => - { - if (step is DynamicStep dynamicStep) + var success = await executor.RunAsync(steps, context, cts.Token, + onBeforeStep: step => { - // Re-resolve the prompt and system prompt with latest step outputs - var allVars = new Dictionary(variables); - foreach (var (key, value) in context.StepOutputs) - allVars[$"steps.{key}"] = value; - - dynamicStep.UpdateResolvedPrompt( - PipelineConfigLoader.ResolveTemplate( - dynamicStep.OriginalPromptTemplate, allVars)); - - if (dynamicStep.OriginalSystemPromptTemplate is not null) + if (step is DynamicStep dynamicStep) { - dynamicStep.UpdateResolvedSystemPrompt( + // Re-resolve the prompt and system prompt with latest step outputs + var allVars = new Dictionary(variables); + foreach (var (key, value) in context.StepOutputs) + allVars[$"steps.{key}"] = value; + + dynamicStep.UpdateResolvedPrompt( PipelineConfigLoader.ResolveTemplate( - dynamicStep.OriginalSystemPromptTemplate, allVars)); + dynamicStep.OriginalPromptTemplate, allVars)); + + if (dynamicStep.OriginalSystemPromptTemplate is not null) + { + dynamicStep.UpdateResolvedSystemPrompt( + PipelineConfigLoader.ResolveTemplate( + dynamicStep.OriginalSystemPromptTemplate, allVars)); + } } - } - }); + }, + completedSteps: completedSteps, + resumeFromStep: resumeFromStep, + checkpointManager: checkpointManager, + pipelineFile: settings.File, + yamlHash: yamlHash, + pipelineName: config.Pipeline.Name); + + if (!success) + { + renderer.RenderResumeHint(settings.File); + } return success ? 0 : 1; } diff --git a/CodeGenesis.Engine/Cli/RunPipelineCommandSettings.cs b/CodeGenesis.Engine/Cli/RunPipelineCommandSettings.cs index 8515cff..ca6cd9f 100644 --- a/CodeGenesis.Engine/Cli/RunPipelineCommandSettings.cs +++ b/CodeGenesis.Engine/Cli/RunPipelineCommandSettings.cs @@ -1,4 +1,5 @@ using System.ComponentModel; +using Spectre.Console; using Spectre.Console.Cli; namespace CodeGenesis.Engine.Cli; @@ -20,4 +21,20 @@ public sealed class RunPipelineCommandSettings : CommandSettings [CommandOption("-m|--model")] [Description("Claude model override (e.g. claude-sonnet-4-6)")] public string? Model { get; set; } + + [CommandOption("--resume")] + [Description("Resume from the last checkpoint (skips completed steps)")] + public bool Resume { get; set; } + + [CommandOption("--from-step ")] + [Description("Resume from a specific step name (re-executes that step and all following)")] + public string? FromStep { get; set; } + + public override ValidationResult Validate() + { + if (Resume && FromStep is not null) + return ValidationResult.Error("--resume and --from-step are mutually exclusive."); + + return ValidationResult.Success(); + } } diff --git a/CodeGenesis.Engine/Pipeline/CheckpointManager.cs b/CodeGenesis.Engine/Pipeline/CheckpointManager.cs new file mode 100644 index 0000000..fdd8b40 --- /dev/null +++ b/CodeGenesis.Engine/Pipeline/CheckpointManager.cs @@ -0,0 +1,88 @@ +using System.Security.Cryptography; +using System.Text.Json; +using Microsoft.Extensions.Logging; + +namespace CodeGenesis.Engine.Pipeline; + +public sealed class CheckpointManager(ILogger logger) +{ + private static readonly JsonSerializerOptions JsonOptions = new() + { + WriteIndented = true, + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + public string GetCheckpointPath(string pipelineFilePath) + { + var fullPath = Path.GetFullPath(pipelineFilePath); + var dir = Path.GetDirectoryName(fullPath)!; + var stem = Path.GetFileNameWithoutExtension(fullPath); + var checkpointDir = Path.Combine(dir, ".codegenesis"); + return Path.Combine(checkpointDir, $"{stem}.checkpoint.json"); + } + + public PipelineCheckpoint? Load(string pipelineFilePath) + { + var path = GetCheckpointPath(pipelineFilePath); + if (!File.Exists(path)) + return null; + + try + { + var json = File.ReadAllText(path); + return JsonSerializer.Deserialize(json, JsonOptions); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to load checkpoint from {Path}, ignoring", path); + return null; + } + } + + public void Save(PipelineCheckpoint checkpoint, string pipelineFilePath) + { + var path = GetCheckpointPath(pipelineFilePath); + var dir = Path.GetDirectoryName(path)!; + Directory.CreateDirectory(dir); + + var json = JsonSerializer.Serialize(checkpoint, JsonOptions); + var tmpPath = path + ".tmp"; + + try + { + File.WriteAllText(tmpPath, json); + File.Move(tmpPath, path, overwrite: true); + logger.LogDebug("Checkpoint saved to {Path}", path); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to save checkpoint to {Path}", path); + // Clean up temp file if it exists + try { File.Delete(tmpPath); } catch { /* best effort */ } + } + } + + public void Delete(string pipelineFilePath) + { + var path = GetCheckpointPath(pipelineFilePath); + try + { + if (File.Exists(path)) + { + File.Delete(path); + logger.LogDebug("Checkpoint deleted: {Path}", path); + } + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to delete checkpoint at {Path}", path); + } + } + + public string ComputeFileHash(string filePath) + { + var bytes = File.ReadAllBytes(filePath); + var hash = SHA256.HashData(bytes); + return Convert.ToHexStringLower(hash); + } +} diff --git a/CodeGenesis.Engine/Pipeline/PipelineCheckpoint.cs b/CodeGenesis.Engine/Pipeline/PipelineCheckpoint.cs new file mode 100644 index 0000000..745d166 --- /dev/null +++ b/CodeGenesis.Engine/Pipeline/PipelineCheckpoint.cs @@ -0,0 +1,22 @@ +namespace CodeGenesis.Engine.Pipeline; + +public sealed class PipelineCheckpoint +{ + public int Version { get; init; } = 1; + public required string PipelineFile { get; init; } + public required string PipelineName { get; init; } + public required string YamlHash { get; init; } + public required DateTime LastUpdatedUtc { get; init; } + public required List CompletedSteps { get; init; } + public required Dictionary StepOutputs { get; init; } + public string? FailedStepName { get; init; } + public required CheckpointMetrics Metrics { get; init; } +} + +public sealed class CheckpointMetrics +{ + public int TotalInputTokens { get; init; } + public int TotalOutputTokens { get; init; } + public double TotalCostUsd { get; init; } + public int StepsCompleted { get; init; } +} diff --git a/CodeGenesis.Engine/Pipeline/PipelineExecutor.cs b/CodeGenesis.Engine/Pipeline/PipelineExecutor.cs index b4e410f..6f54a73 100644 --- a/CodeGenesis.Engine/Pipeline/PipelineExecutor.cs +++ b/CodeGenesis.Engine/Pipeline/PipelineExecutor.cs @@ -9,29 +9,91 @@ public sealed class PipelineExecutor( PipelineRenderer renderer, ILogger logger) : IStepExecutor { + /// + /// IStepExecutor implementation — used by composite steps (foreach, parallel) internally. + /// No resume logic; executes all steps unconditionally. + /// + async Task IStepExecutor.RunAsync( + IReadOnlyList steps, + PipelineContext context, + CancellationToken ct, + Action? onBeforeStep) + { + return await RunCoreAsync(steps, context, ct, onBeforeStep, + completedSteps: null, resumeFromStep: null, + checkpointManager: null, pipelineFile: null, yamlHash: null, pipelineName: null); + } + + /// + /// Public entry point called by RunPipelineCommand — supports resume parameters. + /// public async Task RunAsync( IReadOnlyList steps, PipelineContext context, CancellationToken ct, - Action? onBeforeStep = null) + Action? onBeforeStep = null, + HashSet? completedSteps = null, + string? resumeFromStep = null, + CheckpointManager? checkpointManager = null, + string? pipelineFile = null, + string? yamlHash = null, + string? pipelineName = null) + { + return await RunCoreAsync(steps, context, ct, onBeforeStep, + completedSteps, resumeFromStep, + checkpointManager, pipelineFile, yamlHash, pipelineName); + } + + private async Task RunCoreAsync( + IReadOnlyList steps, + PipelineContext context, + CancellationToken ct, + Action? onBeforeStep, + HashSet? completedSteps, + string? resumeFromStep, + CheckpointManager? checkpointManager, + string? pipelineFile, + string? yamlHash, + string? pipelineName) { + var isResuming = completedSteps is not null; + var doneSkipping = !isResuming; + renderer.RenderPipelineStart(context, steps.Count); var pipelineSw = Stopwatch.StartNew(); for (var i = 0; i < steps.Count; i++) { var step = steps[i]; + + // Resume logic: skip completed steps until we reach the resume point + if (!doneSkipping) + { + if (resumeFromStep is not null && step.Name == resumeFromStep) + { + // Found the explicit --from-step target: stop skipping, execute this step + doneSkipping = true; + } + else if (resumeFromStep is null && !completedSteps!.Contains(step.Name)) + { + // --resume mode: stop skipping at the first non-completed step + doneSkipping = true; + } + else + { + // This step was completed before — skip it + renderer.RenderStepSkipped(step, i + 1, steps.Count); + context.StepsCompleted++; + continue; + } + } + onBeforeStep?.Invoke(step); renderer.RenderStepStart(step, i + 1, steps.Count); StepResult result; try { - // Foreach renders its own sequential progress interleaved with sub-steps. - // Parallel/ParallelForeach use their own Live table display. - // ApprovalStep requires interactive Console input. - // None of these can be wrapped in a spinner (Spectre.Console - // does not allow concurrent interactive displays). if (step is ForeachStep or ParallelStep or ParallelForeachStep or ApprovalStep) result = await step.ExecuteAsync(context, ct); else @@ -48,6 +110,7 @@ public async Task RunAsync( context.FailureReason = "Pipeline was cancelled."; pipelineSw.Stop(); context.TotalDuration = pipelineSw.Elapsed; + SaveCheckpointOnFailure(checkpointManager, pipelineFile, yamlHash, pipelineName, context, steps, i, step.Name); renderer.RenderPipelineFailed(context); return false; } @@ -60,6 +123,7 @@ public async Task RunAsync( context.FailureReason = ex.Message; pipelineSw.Stop(); context.TotalDuration = pipelineSw.Elapsed; + SaveCheckpointOnFailure(checkpointManager, pipelineFile, yamlHash, pipelineName, context, steps, i, step.Name); renderer.RenderPipelineFailed(context); return false; } @@ -74,16 +138,102 @@ public async Task RunAsync( context.FailureReason = result.Error; pipelineSw.Stop(); context.TotalDuration = pipelineSw.Elapsed; + SaveCheckpointOnFailure(checkpointManager, pipelineFile, yamlHash, pipelineName, context, steps, i, step.Name); renderer.RenderPipelineFailed(context); return false; } context.StepsCompleted++; + + // Save checkpoint after each successful step + SaveCheckpointAfterStep(checkpointManager, pipelineFile, yamlHash, pipelineName, context, steps, i); } pipelineSw.Stop(); context.TotalDuration = pipelineSw.Elapsed; renderer.RenderPipelineSummary(context); + + // Pipeline completed successfully — delete checkpoint + if (checkpointManager is not null && pipelineFile is not null) + checkpointManager.Delete(pipelineFile); + return true; } + + private static void SaveCheckpointAfterStep( + CheckpointManager? checkpointManager, + string? pipelineFile, + string? yamlHash, + string? pipelineName, + PipelineContext context, + IReadOnlyList steps, + int currentIndex) + { + if (checkpointManager is null || pipelineFile is null || yamlHash is null || pipelineName is null) + return; + + // All steps up to and including currentIndex have completed + var completed = steps.Take(currentIndex + 1).Select(s => s.Name).ToList(); + + var checkpoint = new PipelineCheckpoint + { + PipelineFile = Path.GetFileName(pipelineFile), + PipelineName = pipelineName, + YamlHash = yamlHash, + LastUpdatedUtc = DateTime.UtcNow, + CompletedSteps = completed, + StepOutputs = new Dictionary(context.StepOutputs), + FailedStepName = null, + Metrics = new CheckpointMetrics + { + TotalInputTokens = context.TotalInputTokens, + TotalOutputTokens = context.TotalOutputTokens, + TotalCostUsd = context.TotalCostUsd, + StepsCompleted = context.StepsCompleted + } + }; + + checkpointManager.Save(checkpoint, pipelineFile); + } + + private static void SaveCheckpointOnFailure( + CheckpointManager? checkpointManager, + string? pipelineFile, + string? yamlHash, + string? pipelineName, + PipelineContext context, + IReadOnlyList steps, + int failedIndex, + string failedStepName) + { + if (checkpointManager is null || pipelineFile is null || yamlHash is null || pipelineName is null) + return; + + var completed = new List(); + for (var j = 0; j < failedIndex; j++) + { + if (context.StepOutputs.ContainsKey(steps[j].Name)) + completed.Add(steps[j].Name); + } + + var checkpoint = new PipelineCheckpoint + { + PipelineFile = Path.GetFileName(pipelineFile), + PipelineName = pipelineName, + YamlHash = yamlHash, + LastUpdatedUtc = DateTime.UtcNow, + CompletedSteps = completed, + StepOutputs = new Dictionary(context.StepOutputs), + FailedStepName = failedStepName, + Metrics = new CheckpointMetrics + { + TotalInputTokens = context.TotalInputTokens, + TotalOutputTokens = context.TotalOutputTokens, + TotalCostUsd = context.TotalCostUsd, + StepsCompleted = context.StepsCompleted + } + }; + + checkpointManager.Save(checkpoint, pipelineFile); + } } diff --git a/CodeGenesis.Engine/Program.cs b/CodeGenesis.Engine/Program.cs index 3c0cd1c..263fab7 100644 --- a/CodeGenesis.Engine/Program.cs +++ b/CodeGenesis.Engine/Program.cs @@ -43,6 +43,7 @@ services.AddSingleton(); services.AddSingleton(); services.AddSingleton(sp => sp.GetRequiredService()); + services.AddSingleton(); services.AddSingleton(); // CLI commands @@ -67,7 +68,9 @@ config.AddCommand("run-pipeline") .WithDescription("Run a pipeline from a YAML configuration file") .WithExample("run-pipeline", "examples/hello-world.yml") - .WithExample("run-pipeline", "examples/hello-world.yml", "--input", "task=\"Create a Python script\""); + .WithExample("run-pipeline", "examples/hello-world.yml", "--input", "task=\"Create a Python script\"") + .WithExample("run-pipeline", "examples/hello-world.yml", "--resume") + .WithExample("run-pipeline", "examples/hello-world.yml", "--from-step", "generate"); }); return await app.RunAsync(args); diff --git a/CodeGenesis.Engine/UI/PipelineRenderer.cs b/CodeGenesis.Engine/UI/PipelineRenderer.cs index 9ec196d..0fec4a8 100644 --- a/CodeGenesis.Engine/UI/PipelineRenderer.cs +++ b/CodeGenesis.Engine/UI/PipelineRenderer.cs @@ -206,6 +206,25 @@ public void RenderThinking(string label, string message) $"{Indent} [{ConsoleTheme.SubtleTag}]\U0001F4AD {label.EscapeMarkup()}: {message.EscapeMarkup()}[/]"); } + public void RenderStepSkipped(IPipelineStep step, int index, int total) + { + if (IsSuppressed) return; + AnsiConsole.MarkupLine( + $"{Indent}[{ConsoleTheme.SubtleTag}]{ConsoleTheme.Dash} Step {index}/{total}[/] " + + $"[{ConsoleTheme.SubtleTag}]{step.Name.EscapeMarkup()}[/] " + + $"[{ConsoleTheme.MutedTag}](cached)[/]"); + } + + public void RenderResumeHint(string pipelineFile) + { + if (IsSuppressed) return; + AnsiConsole.WriteLine(); + AnsiConsole.MarkupLine( + $" [{ConsoleTheme.SecondaryTag}]Tip:[/] Fix the issue, then resume with:"); + AnsiConsole.MarkupLine( + $" [{ConsoleTheme.PrimaryTag}]codegenesis run-pipeline {pipelineFile.EscapeMarkup()} --resume[/]"); + } + public void RenderStepComplete(IPipelineStep step, StepResult result) { if (IsSuppressed) return;