Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions CodeGenesis.Engine/Pipeline/PipelineExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ public async Task<bool> RunAsync(
StepResult result;
try
{
// Foreach renders its own sequential progress interleaved with sub-steps;
// wrapping it in a spinner would swallow that output.
// Parallel/ParallelForeach suppress sub-step rendering, so the spinner
// runs at the bottom while branch/item completions appear above it.
// ApprovalStep requires interactive Console input — no spinner.
if (step is ForeachStep or ApprovalStep)
// Foreach renders its own sequential progress interleaved with sub-steps.
// Parallel/ParallelForeach use their own Live table display.
// ApprovalStep requires interactive Console input.
// None of these can be wrapped in a spinner (Spectre.Console
// does not allow concurrent interactive displays).
if (step is ForeachStep or ParallelStep or ParallelForeachStep or ApprovalStep)
result = await step.ExecuteAsync(context, ct);
else
result = await renderer.RunWithSpinner(
Expand Down
153 changes: 77 additions & 76 deletions CodeGenesis.Engine/Steps/ParallelForeachStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ public async Task<StepResult> ExecuteAsync(PipelineContext context, Cancellation
var collectionRaw = resolveTemplate(config.Collection, allVars);
var items = CollectionParser.Parse(collectionRaw);

renderer.RenderParallelForeachStart(config.ItemVar, items.Count, config.MaxConcurrency);

if (items.Count == 0)
{
sw.Stop();
Expand All @@ -46,104 +44,107 @@ public async Task<StepResult> ExecuteAsync(PipelineContext context, Cancellation
};
}

var concurrencyInfo = config.MaxConcurrency.HasValue
? $"max {config.MaxConcurrency}"
: "unlimited";
var detail = $"{config.ItemVar} {items.Count} item(s) concurrency: {concurrencyInfo}";

var maxConcurrency = config.MaxConcurrency ?? int.MaxValue;
using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct);

var iterationResults = new (bool Success, PipelineContext Context, string Item, int Index)[items.Count];
var tasks = new Task[items.Count];

for (var i = 0; i < items.Count; i++)
await renderer.RunParallelWithLiveTable(items, "parallel_foreach", detail, async liveTable =>
{
var index = i;
var item = items[i];
var tasks = new Task[items.Count];

tasks[i] = Task.Run(async () =>
for (var i = 0; i < items.Count; i++)
{
await semaphore.WaitAsync(linkedCts.Token);
try
{
// Suppress all sub-step rendering; only item-level messages show
renderer.PushScope();
renderer.SuppressRendering();
var index = i;
var item = items[i];

// Skip null or empty items
if (string.IsNullOrWhiteSpace(item))
tasks[i] = Task.Run(async () =>
{
await semaphore.WaitAsync(linkedCts.Token);
try
{
renderer.ResumeRendering();
renderer.RenderParallelForeachItemStart("(empty)", index, items.Count);
// Suppress all sub-step rendering inside parallel
renderer.PushScope();
renderer.SuppressRendering();
var emptyCtx = new PipelineContext

// Skip null or empty items
if (string.IsNullOrWhiteSpace(item))
{
var emptyCtx = new PipelineContext
{
TaskDescription = context.TaskDescription,
WorkingDirectory = context.WorkingDirectory
};
liveTable.MarkStarted(index);
liveTable.MarkComplete(index, true, TimeSpan.Zero, 0, 0);
iterationResults[index] = (true, emptyCtx, item, index);
return;
}

liveTable.MarkStarted(index);

// Create isolated context for this iteration
var iterationContext = new PipelineContext
{
TaskDescription = context.TaskDescription,
WorkingDirectory = context.WorkingDirectory
WorkingDirectory = context.WorkingDirectory,
StatusUpdate = msg => liveTable.UpdateActivity(index, msg)
};
iterationResults[index] = (true, emptyCtx, item, index);
return;
}

// Temporarily resume to render our own item-level message
renderer.ResumeRendering();
renderer.RenderParallelForeachItemStart(item, index, items.Count);
renderer.SuppressRendering();
// Copy parent step outputs so sub-steps can read them
foreach (var (key, value) in context.StepOutputs)
iterationContext.StepOutputs[key] = value;

// Create isolated context for this iteration
var iterationContext = new PipelineContext
{
TaskDescription = context.TaskDescription,
WorkingDirectory = context.WorkingDirectory,
StatusUpdate = msg => renderer.RenderThinking(item, msg)
};

// Copy parent step outputs so sub-steps can read them
foreach (var (key, value) in context.StepOutputs)
iterationContext.StepOutputs[key] = value;

// Build loop variables for this iteration
var iterVars = new Dictionary<string, string>(allVars)
{
["loop.item"] = item,
["loop.index"] = index.ToString(),
[config.ItemVar] = item
};
// Build loop variables for this iteration
var iterVars = new Dictionary<string, string>(allVars)
{
["loop.item"] = item,
["loop.index"] = index.ToString(),
[config.ItemVar] = item
};

// Clone sub-steps for this iteration so parallel threads don't share mutable state
var clonedSubSteps = CloneSubSteps(subSteps, iterVars);
// Clone sub-steps for this iteration so parallel threads don't share mutable state
var clonedSubSteps = CloneSubSteps(subSteps, iterVars);

var iterSw = Stopwatch.StartNew();
var iterSw = Stopwatch.StartNew();

var success = await executor.RunAsync(clonedSubSteps, iterationContext, linkedCts.Token,
onBeforeStep: step => ResolveBeforeStep(step, iterVars, iterationContext));
var success = await executor.RunAsync(clonedSubSteps, iterationContext, linkedCts.Token,
onBeforeStep: step => ResolveBeforeStep(step, iterVars, iterationContext));

iterSw.Stop();
iterSw.Stop();

iterationResults[index] = (success, iterationContext, item, index);
iterationResults[index] = (success, iterationContext, item, index);

if (!success && config.FailFast)
await linkedCts.CancelAsync();
if (!success && config.FailFast)
await linkedCts.CancelAsync();

// Resume rendering for our own completion message
renderer.ResumeRendering();
var iterTokens = iterationContext.TotalInputTokens + iterationContext.TotalOutputTokens;
renderer.RenderParallelForeachItemComplete(item, index, items.Count, success, iterSw.Elapsed, iterTokens, iterationContext.TotalCostUsd);
}
finally
{
renderer.ResumeRendering();
renderer.PopScope();
semaphore.Release();
}
}, linkedCts.Token);
}
var iterTokens = iterationContext.TotalInputTokens + iterationContext.TotalOutputTokens;
liveTable.MarkComplete(index, success, iterSw.Elapsed, iterTokens, iterationContext.TotalCostUsd);
}
finally
{
renderer.ResumeRendering();
renderer.PopScope();
semaphore.Release();
}
}, linkedCts.Token);
}

try
{
await Task.WhenAll(tasks);
}
catch (OperationCanceledException) when (config.FailFast)
{
// Expected when fail_fast cancels siblings
}
try
{
await Task.WhenAll(tasks);
}
catch (OperationCanceledException) when (config.FailFast)
{
// Expected when fail_fast cancels siblings
}
});

sw.Stop();

Expand Down Expand Up @@ -191,7 +192,7 @@ public async Task<StepResult> ExecuteAsync(PipelineContext context, Cancellation

var succeeded = iterationResults.Count(r => r.Success);
var failed = items.Count - succeeded;
renderer.RenderParallelForeachEnd(items.Count, succeeded, failed);
renderer.RenderParallelSummary(items.Count, succeeded, failed);

return new StepResult
{
Expand Down
Loading