diff --git a/CHANGELOG.md b/CHANGELOG.md index ec09f00..72acda8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## v0.3 — in progress + +### What's new + +- **Parallel step execution (SPEC §29, #117)** — `async: true` marks non-blocking steps; `depends_on: [...]` declares dependencies; `action: join` synchronizes and merges branch results. String join (default, labelled headers in declaration order) and structured join (JSON merge when all deps declare `output_schema`) with optional `output_schema` validation on the join. Error modes: `fail_fast` (default) and `wait_for_all`. Pipeline-wide concurrency cap via `defaults.max_concurrency`. Uses `std::thread::scope` — no async runtime added. Complete parse-time validation (orphan detection, forward references, cycles, concurrent session conflicts, structured-join compatibility). Turn log entries tagged with `concurrent_group`, `launched_at`, `completed_at`. Template resolution for `{{ step.<join_id>.<dep_id>.<field> }}` dotted-path access. +- **Sampling parameter control (SPEC §30, #120)** — three-scope `sampling:` block (pipeline / provider / per-step) with field-level merge; temperature, top_p, top_k, max_tokens, stop_sequences, thinking; runner-specific quantization. + ## v0.2 — 2026-04-05 ### What works (all v0.1 features plus) diff --git a/CLAUDE.md b/CLAUDE.md index 9d015b6..9765b38 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -155,6 +155,9 @@ If nothing found → passthrough mode (safe zero-config default). | `{{ for_each.<name> }}` | Current item value under the declared `as:` name (e.g. 
`{{ for_each.task }}` when `as: task`) | | `{{ for_each.index }}` | Current 1-based item index (only inside `for_each:` body) | | `{{ for_each.total }}` | Total number of items in the collection, after `max_items` cap (only inside `for_each:` body) | +| `{{ step.<join_id>.response }}` | Concatenated string output of an `action: join` step (SPEC §29.4) | +| `{{ step.<join_id>.<dep_id>.response }}` | Full structured output of a named dependency in a structured join (SPEC §29.5) | +| `{{ step.<join_id>.<dep_id>.<field> }}` | Specific field within a namespaced structured dependency output (SPEC §29.5) | Note: `{{ session.invocation_prompt }}` is a supported alias for `{{ step.invocation.prompt }}` in the implementation but is deprecated — prefer the canonical form. @@ -185,6 +188,7 @@ Unresolved variables **abort with a typed error** — never silently empty. - `pipeline:` step bodies support both file-based sub-pipelines and named pipeline references (SPEC §9, §10) - `do_while:` fully implemented (§27): parse-time validation, executor loop, template vars, step ID namespacing, break/abort_pipeline, shared depth guard (MAX_LOOP_DEPTH=8). `on_max_iterations` field defaults to `abort_pipeline` (configurable variant not yet implemented). Controlled-mode executor events deferred. - `for_each:` fully implemented (§28): parse-time validation, runtime array iteration, item scope, template vars, break/abort_pipeline, max_items cap, shared depth guard with do_while. Controlled-mode executor events deferred. 
+- `async:` / `depends_on:` / `action: join` fully implemented (§29): `std::thread::scope`-based parallel dispatch, session forking (clean HTTP store for `resume: false`), string-join (§29.4) and structured-join (§29.5), `on_error: fail_fast`/`wait_for_all`, `defaults.max_concurrency` semaphore, all parse-time validation rules (orphan detection, forward refs, cycle detection, concurrent resume conflict, structured-join compatibility), turn log `concurrent_group`/`launched_at`/`completed_at`, dotted-path template resolution (`{{ step.<join_id>.<dep_id>.<field> }}`). Mid-flight runner-level cancellation for `fail_fast` is best-effort (branches complete; first error propagates). Controlled-mode executor events for async launches are deferred. - `output_schema` / `input_schema` (§26): JSON Schema validation at parse time and runtime. `schema-as-file-path` variant (§26.1) not yet implemented — schemas must be inline. - `do_while[N]` indexed iteration access (§27.4) is specified but not implemented — template resolver only exposes the final iteration - `pipeline:` as alternative to inline `steps:` is supported in both `do_while:` and `for_each:` loop bodies diff --git a/ail-core/CLAUDE.md b/ail-core/CLAUDE.md index 96aa891..38cfa31 100644 --- a/ail-core/CLAUDE.md +++ b/ail-core/CLAUDE.md @@ -23,6 +23,7 @@ Consumed by `ail` (the binary) and future language-server / SDK targets. 
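To make the §29 surface described above concrete, here is a hedged sketch of a pipeline fragment using the fields these notes name (`async`, `depends_on`, `action: join`, `on_error`, `defaults.max_concurrency`); the exact YAML nesting and the `prompt:` key spelling are assumptions, not verified against the SPEC:

```yaml
defaults:
  max_concurrency: 2        # pipeline-wide cap (SPEC §29.10)

steps:
  - id: lint
    async: true             # non-blocking: launches on a scoped thread
    prompt: "Lint the patch and list issues."
  - id: summarize
    async: true
    prompt: "Summarize the patch."
  - id: report              # synchronization barrier
    action: join
    depends_on: [lint, summarize]
    on_error: wait_for_all  # default is fail_fast
```

With no `output_schema` on the branches this is a string join: the merged response concatenates `lint` then `summarize` output in declaration order under labelled headers. Declaring `output_schema` on both branches (and optionally on the join) switches to the structured JSON merge.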
| `executor/headless.rs` | `execute()` — headless mode entry point using `NullObserver` | | `executor/controlled.rs` | `execute_with_control()` — TUI-controlled mode with `ChannelObserver` | | `executor/events.rs` | `ExecuteOutcome`, `ExecutionControl`, `ExecutorEvent` | +| `executor/parallel.rs` | `ConcurrencySemaphore`, `BranchResult`, `merge_join_results()`, timestamp helpers — SPEC §29 parallel step dispatch primitives | | `executor/helpers/mod.rs` | Re-exports all helper functions for the executor | | `executor/helpers/invocation.rs` | `run_invocation_step()` — host-managed invocation step lifecycle | | `executor/helpers/runner_resolution.rs` | `resolve_step_provider()`, `resolve_step_sampling()` (SPEC §30.3 three-scope merge), `build_step_runner_box()`, `resolve_effective_runner_name()` | @@ -69,7 +70,8 @@ pub struct Pipeline { pub steps: Vec, pub source: Option, pub def // default_tools: pipeline-wide fallback; per-step tools override entirely (SPEC §3.2) // named_pipelines: named pipeline definitions from the `pipelines:` section (SPEC §10) // sampling_defaults: pipeline-wide sampling baseline (SPEC §30.2); orthogonal to provider-attached sampling on `defaults: ProviderConfig` -pub struct Step { pub id: StepId, pub body: StepBody, pub message: Option, pub tools: Option, pub on_result: Option>, pub model: Option, pub runner: Option, pub condition: Option, pub append_system_prompt: Option>, pub system_prompt: Option, pub resume: bool, pub on_error: Option, pub before: Vec, pub then: Vec, pub output_schema: Option, pub input_schema: Option, pub sampling: Option } +pub struct Step { pub id: StepId, pub body: StepBody, pub message: Option, pub tools: Option, pub on_result: Option>, pub model: Option, pub runner: Option, pub condition: Option, pub append_system_prompt: Option>, pub system_prompt: Option, pub resume: bool, pub async_step: bool, pub depends_on: Vec, pub on_error: Option, pub before: Vec, pub then: Vec, pub output_schema: Option, pub input_schema: 
Option, pub sampling: Option } +// async_step / depends_on: parallel execution primitives (SPEC §29). async_step=true marks a non-blocking step; depends_on lists step IDs this step waits for. // before: private pre-processing chain (SPEC §5.10) — runs before the step fires // then: private post-processing chain (SPEC §5.7) — runs after the step completes // output_schema: optional JSON Schema for validating step output (SPEC §26.1); validated at parse time, response validated at runtime @@ -84,7 +86,8 @@ pub enum StepBody { Prompt(String), Skill { name: String }, SubPipeline { path: // SubPipeline.path may contain {{ variable }} syntax — resolved at execution time (SPEC §11) // SubPipeline.prompt: when Some, overrides child session's invocation_prompt instead of using parent's last_response (SPEC §9.3) pub enum ContextSource { Shell(String) } -pub enum ActionKind { PauseForHuman, ModifyOutput { headless_behavior: HitlHeadlessBehavior, default_value: Option } } +pub enum ActionKind { PauseForHuman, ModifyOutput { headless_behavior: HitlHeadlessBehavior, default_value: Option }, Join { on_error_mode: JoinErrorMode } } +pub enum JoinErrorMode { FailFast, WaitForAll } // SPEC §29.7 — default FailFast pub enum HitlHeadlessBehavior { Skip, Abort, UseDefault } pub struct ResultBranch { pub matcher: ResultMatcher, pub action: ResultAction } pub enum ResultMatcher { Contains(String), ExitCode(ExitCodeMatch), Field { name: String, equals: serde_json::Value }, Always } diff --git a/ail-core/src/config/validation/mod.rs b/ail-core/src/config/validation/mod.rs index 666a4e2..fc927cb 100644 --- a/ail-core/src/config/validation/mod.rs +++ b/ail-core/src/config/validation/mod.rs @@ -461,10 +461,14 @@ fn validate_parallel_constraints(steps: &[Step]) -> Result<(), AilError> { // Build a set of step IDs in declaration order for forward-reference detection. 
let step_ids: Vec<&str> = steps.iter().map(|s| s.id.as_str()).collect(); - // Quick exit: if no steps use async or depends_on, nothing to validate. - let has_parallel = steps - .iter() - .any(|s| s.async_step || !s.depends_on.is_empty()); + // Quick exit: if no steps use async, depends_on, or action:join, nothing + // to validate. Checking for `action: join` catches the edge case of a + // malformed `action: join` step without `depends_on` — still a parse error. + let has_parallel = steps.iter().any(|s| { + s.async_step + || !s.depends_on.is_empty() + || matches!(s.body, StepBody::Action(ActionKind::Join { .. })) + }); if !has_parallel { return Ok(()); } diff --git a/ail-core/src/executor/controlled.rs b/ail-core/src/executor/controlled.rs index c85fa42..44d7edb 100644 --- a/ail-core/src/executor/controlled.rs +++ b/ail-core/src/executor/controlled.rs @@ -246,7 +246,7 @@ impl<'a> StepObserver for ChannelObserver<'a> { /// Blocks on `hitl_rx.recv()` when a `pause_for_human` step is reached. 
pub fn execute_with_control( session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), control: &ExecutionControl, disabled_steps: &HashSet, event_tx: mpsc::Sender, diff --git a/ail-core/src/executor/core.rs b/ail-core/src/executor/core.rs index eb7b449..930a2c3 100644 --- a/ail-core/src/executor/core.rs +++ b/ail-core/src/executor/core.rs @@ -8,17 +8,18 @@ #![allow(clippy::result_large_err)] use crate::config::domain::{ - ActionKind, Condition, ConditionExpr, ContextSource, OnError, OnMaxItems, ResultAction, Step, - StepBody, StepId, MAX_LOOP_DEPTH, + ActionKind, Condition, ConditionExpr, ContextSource, JoinErrorMode, OnError, OnMaxItems, + ResultAction, Step, StepBody, StepId, MAX_LOOP_DEPTH, }; use crate::error::AilError; -use crate::runner::{InvokeOptions, RunResult, Runner}; +use crate::runner::{CancelToken, InvokeOptions, RunResult, Runner}; use crate::session::turn_log::TurnEntry; use crate::session::{DoWhileContext, ForEachContext, Session}; use super::dispatch; use super::events::ExecuteOutcome; use super::helpers::{evaluate_condition, evaluate_on_result}; +use super::parallel; // ── Observer trait ──────────────────────────────────────────────────────────── @@ -230,10 +231,10 @@ impl StepObserver for NullObserver { /// /// Returns `Some(ResultAction)` if the step's on_result matched and the caller /// should handle that action, or `None` if no on_result matched / no on_result defined. -fn execute_single_step( +pub(super) fn execute_single_step( step: &Step, session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), observer: &mut O, depth: usize, total_steps: usize, @@ -259,16 +260,13 @@ fn execute_single_step( return Ok(None); } - // action: join — synchronization barrier, handled by the parallel execution engine - // in execute_core(). If we reach here, the join logic in execute_core() should have - // already processed this step. This arm is a safeguard for direct calls. 
+ // action: join — synchronization barrier. Handled in execute_core_with_parallel + // before this function is reached. If a join step reaches here, it means there + // were no async steps to coordinate; produce an empty join entry as a no-op. if let StepBody::Action(ActionKind::Join { .. }) = &step.body { - // Join steps are processed by the parallel execution coordinator, not dispatched - // as individual steps. If this is reached from a non-parallel context (e.g. a - // pipeline with action:join but no async steps), produce an empty entry. let entry = TurnEntry { step_id: step_id.clone(), - prompt: "join".to_string(), + prompt: "join (no async deps)".to_string(), response: Some(String::new()), ..Default::default() }; @@ -630,7 +628,7 @@ fn execute_single_step( fn execute_chain_steps( chain: &[Step], session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), observer: &mut O, depth: usize, ) -> Result<(), AilError> { @@ -692,7 +690,7 @@ fn execute_do_while( exit_when: &ConditionExpr, inner_steps: &[Step], session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), observer: &mut O, depth: usize, ) -> Result { @@ -741,7 +739,7 @@ fn execute_do_while_inner( exit_when: &ConditionExpr, inner_steps: &[Step], session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), observer: &mut O, depth: usize, ) -> Result { @@ -983,7 +981,7 @@ fn execute_for_each( on_max_items: &OnMaxItems, inner_steps: &[Step], session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), observer: &mut O, depth: usize, ) -> Result { @@ -1076,7 +1074,7 @@ fn execute_for_each_inner( effective_count: u64, inner_steps: &[Step], session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), observer: &mut O, depth: usize, ) -> Result { @@ -1356,7 +1354,7 @@ fn validate_input_schema( /// Early exit only via explicit declared outcomes — never silent failures. 
pub(super) fn execute_core( session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), observer: &mut O, depth: usize, ) -> Result { @@ -1373,98 +1371,492 @@ pub(super) fn execute_core( // while iterating step bodies. let steps: Vec<_> = session.pipeline.steps.clone(); - for (step_index, step) in steps.iter().enumerate() { - let step_id = step.id.as_str().to_string(); - - // Evaluate the condition — `None` means always run. - let condition_skip = if let Some(ref cond) = step.condition { - !evaluate_condition(cond, session, &step_id)? - } else { - false - }; - - match observer.before_step(&step_id, step_index, condition_skip) { - BeforeStepAction::Run => {} - BeforeStepAction::Skip => continue, - BeforeStepAction::Stop => break, - } + // Fast path: if no async steps, take the sequential path. This avoids the + // scoped-thread machinery entirely for the common case. + let has_async = steps.iter().any(|s| s.async_step); + if !has_async { + return execute_core_sequential(session, runner, observer, depth, &steps, total_steps); + } - tracing::info!(run_id = %session.run_id, step_id = %step_id, "executing step"); + execute_core_with_parallel(session, runner, observer, depth, &steps, total_steps) +} - // Execute the step (including before/then chains) and get on_result action. - let matched_action = execute_single_step( +/// Sequential execution path (no async steps). Preserves the pre-§29 behavior. +fn execute_core_sequential( + session: &mut Session, + runner: &(dyn Runner + Sync), + observer: &mut O, + depth: usize, + steps: &[Step], + total_steps: usize, +) -> Result { + for (step_index, step) in steps.iter().enumerate() { + match dispatch_top_level_step( step, + step_index, session, runner, observer, depth, total_steps, - step_index, - )?; + )? 
{ + LoopControl::Continue => {} + LoopControl::Skip => continue, + LoopControl::Break => break, + LoopControl::Return(outcome) => return Ok(outcome), + } + } + + let outcome = ExecuteOutcome::Completed; + observer.on_pipeline_done(&outcome); + Ok(outcome) +} + +/// Parallel execution path (SPEC §29). Wraps the iteration in +/// [`std::thread::scope`] so async steps can be spawned as scoped threads +/// that run concurrently with subsequent sequential steps. +fn execute_core_with_parallel( + session: &mut Session, + runner: &(dyn Runner + Sync), + observer: &mut O, + depth: usize, + steps: &[Step], + total_steps: usize, +) -> Result { + let concurrent_group = parallel::new_concurrent_group_id(); + let cancel_token = CancelToken::new(); + let max_concurrency = session.pipeline.max_concurrency; + + // Semaphore enforcing the pipeline-wide `defaults.max_concurrency` cap. + let effective_max = max_concurrency + .map(|n| n as usize) + .filter(|n| *n > 0) + .unwrap_or(steps.len().max(1)); + let semaphore = std::sync::Arc::new(parallel::ConcurrencySemaphore::new(effective_max)); + + let outcome_cell: std::cell::RefCell>> = + std::cell::RefCell::new(None); + + std::thread::scope(|scope| { + // step_id → launch metadata for in-flight async branches. + let mut in_flight: std::collections::HashMap = + std::collections::HashMap::new(); + + for (step_index, step) in steps.iter().enumerate() { + let step_id = step.id.as_str().to_string(); + + // Evaluate condition — `None` means always run. 
+ let condition_skip = match step.condition { + Some(ref cond) => match evaluate_condition(cond, session, &step_id) { + Ok(v) => !v, + Err(e) => { + *outcome_cell.borrow_mut() = Some(Err(e)); + return; + } + }, + None => false, + }; + + match observer.before_step(&step_id, step_index, condition_skip) { + BeforeStepAction::Run => {} + BeforeStepAction::Skip => continue, + BeforeStepAction::Stop => break, + } + + // ── Async launch ───────────────────────────────────────────── + if step.async_step { + let launched_at = parallel::iso8601(std::time::SystemTime::now()); + // Branches observe a snapshot of the session at launch time. + let isolated_http = !step.resume; + let branch_session = session.fork_for_branch(isolated_http); + let step_clone = step.clone(); + let sem = std::sync::Arc::clone(&semaphore); + let ct = cancel_token.clone(); + let group = concurrent_group.clone(); + let launched_at_c = launched_at.clone(); + + let handle = scope.spawn(move || { + if !sem.acquire(&ct) || ct.is_cancelled() { + let now = parallel::iso8601(std::time::SystemTime::now()); + return parallel::BranchResult { + step_id: step_clone.id.as_str().to_string(), + outcome: Err(AilError::runner_cancelled(format!( + "Step '{}' cancelled by sibling failure (fail_fast)", + step_clone.id.as_str() + ))), + launched_at: launched_at_c.clone(), + completed_at: now, + extra_entries: vec![], + }; + } - // Handle on_result action at the top-level pipeline level. 
- if let Some(action) = matched_action { - let pipeline_base_dir_buf: Option = session - .pipeline - .source - .as_deref() - .and_then(|p| p.parent()) - .map(|p| p.to_path_buf()); - let pipeline_base_dir = pipeline_base_dir_buf.as_deref(); - - match action { - ResultAction::Continue => {} - ResultAction::Break => { - tracing::info!( - run_id = %session.run_id, - step_id = %step_id, - "on_result break — stopping pipeline early" + let mut branch_session = branch_session; + let mut null_obs = NullObserver; + let parent_count = branch_session.turn_log.entries().len(); + let res = execute_single_step( + &step_clone, + &mut branch_session, + runner, + &mut null_obs, + depth, + 1, + 0, ); - let outcome = ExecuteOutcome::Break { - step_id: step_id.clone(), + let completed_at = parallel::iso8601(std::time::SystemTime::now()); + sem.release(); + + let mut branch_entries: Vec = + branch_session.turn_log.entries().to_vec(); + if branch_entries.len() >= parent_count { + branch_entries.drain(..parent_count); + } + + let outcome = match res { + Ok(_) => { + if let Some(mut entry) = branch_entries.pop() { + entry.concurrent_group = Some(group.clone()); + entry.launched_at = Some(launched_at_c.clone()); + entry.completed_at = Some(completed_at.clone()); + Ok(entry) + } else { + Ok(TurnEntry { + step_id: step_clone.id.as_str().to_string(), + prompt: String::new(), + response: Some(String::new()), + concurrent_group: Some(group.clone()), + launched_at: Some(launched_at_c.clone()), + completed_at: Some(completed_at.clone()), + ..Default::default() + }) + } + } + Err(e) => Err(e), }; - observer.on_pipeline_done(&outcome); - return Ok(outcome); + + parallel::BranchResult { + step_id: step_clone.id.as_str().to_string(), + outcome, + launched_at: launched_at_c, + completed_at, + extra_entries: branch_entries, + } + }); + + in_flight.insert( + step_id.clone(), + AsyncHandle { + handle, + launched_at, + }, + ); + continue; + } + + // ── Dependency barrier 
─────────────────────────────────────── + // Collect any in-flight async results this step depends on. + let mut branch_results: Vec = Vec::new(); + if !step.depends_on.is_empty() { + for dep_id in &step.depends_on { + if let Some(ah) = in_flight.remove(dep_id.as_str()) { + match ah.handle.join() { + Ok(br) => branch_results.push(br), + Err(_) => { + *outcome_cell.borrow_mut() = Some(Err(AilError::runner_failed( + format!("Async branch '{}' panicked", dep_id.as_str()), + ))); + return; + } + } + } } - ResultAction::AbortPipeline => { - let err = AilError::PipelineAborted { - detail: format!("Step '{step_id}' on_result fired abort_pipeline"), - context: Some(crate::error::ErrorContext::for_step( - &session.run_id, - &step_id, - )), - }; - observer.on_pipeline_error(&err); - return Err(err); + + // Append successful branch entries to the main session's turn log. + // Failed branches do not produce entries here — their error surfaces + // via the join's on_error handling below. + for br in &branch_results { + if let Ok(entry) = &br.outcome { + session.turn_log.append(entry.clone()); + for extra in &br.extra_entries { + session.turn_log.append(extra.clone()); + } + } } - ResultAction::PauseForHuman => { - observer.on_result_pause(&step_id, step.message.as_deref()); + } + + // ── Join step: merge branch results, evaluate on_result ────── + if parallel::is_join_step(step) { + let mode = parallel::join_error_mode(step) + .cloned() + .unwrap_or(JoinErrorMode::FailFast); + + // fail_fast: if any branch failed, signal cancel for any + // later-arriving branches and surface the error via on_error. 
+ let had_failure = branch_results.iter().any(|b| b.outcome.is_err()); + if had_failure && matches!(mode, JoinErrorMode::FailFast) { + cancel_token.cancel(); } - ResultAction::Pipeline { - ref path, - ref prompt, - } => { - // Use a derived step ID so the sub-pipeline's response is - // addressable as `{{ step.__on_result.response }}` without - // shadowing the parent step's own turn log entry (SPEC §11). - let on_result_step_id = format!("{step_id}__on_result"); - let sub_entry = dispatch::sub_pipeline::execute_sub_pipeline( - path, - prompt.as_deref(), - &on_result_step_id, - session, - runner, - depth, - pipeline_base_dir, - ) - .inspect_err(|e| observer.on_pipeline_error(e))?; - session.turn_log.append(sub_entry); + + let deps_in_order: Vec<&str> = step.depends_on.iter().map(|s| s.as_str()).collect(); + + let join_entry_res = + parallel::merge_join_results(step, &deps_in_order, &branch_results, &mode); + + let join_entry = match join_entry_res { + Ok(e) => e, + Err(e) => { + // Propagate via on_error handling path. For fail_fast + // this is typically PipelineAborted. + observer.on_step_failed(&step_id, e.detail()); + *outcome_cell.borrow_mut() = Some(Err(e)); + return; + } + }; + + // Run before: chain (no-op for join since it's a synthetic step). + // Validate output_schema against merged response if declared. + if let Some(ref schema) = step.output_schema { + let response = join_entry.response.as_deref().unwrap_or(""); + if let Err(e) = validate_join_output_schema(response, schema, &step_id) { + observer.on_step_failed(&step_id, e.detail()); + *outcome_cell.borrow_mut() = Some(Err(e)); + return; + } } + + session.turn_log.append(join_entry); + + // Evaluate on_result against the merged response. 
+ let matched_action = if let Some(ref branches) = step.on_result { + let last_entry = session.turn_log.entries().last(); + last_entry.and_then(|e| evaluate_on_result(branches, e, None)) + } else { + None + }; + + if let Some(action) = matched_action { + match handle_on_result_action( + action, &step_id, step, session, runner, observer, depth, + ) { + Ok(LoopControl::Continue) => {} + Ok(LoopControl::Skip) => continue, + Ok(LoopControl::Break) => break, + Ok(LoopControl::Return(outcome)) => { + *outcome_cell.borrow_mut() = Some(Ok(outcome)); + return; + } + Err(e) => { + *outcome_cell.borrow_mut() = Some(Err(e)); + return; + } + } + } + continue; } + + // ── Normal sequential step ────────────────────────────────── + // Condition + before_step were already evaluated above; go + // straight to execute_single_step. + tracing::info!(run_id = %session.run_id, step_id = %step_id, "executing step"); + let matched_action = match execute_single_step( + step, + session, + runner, + observer, + depth, + total_steps, + step_index, + ) { + Ok(a) => a, + Err(e) => { + *outcome_cell.borrow_mut() = Some(Err(e)); + return; + } + }; + + if let Some(action) = matched_action { + match handle_on_result_action( + action, &step_id, step, session, runner, observer, depth, + ) { + Ok(LoopControl::Continue) => {} + Ok(LoopControl::Skip) => continue, + Ok(LoopControl::Break) => break, + Ok(LoopControl::Return(outcome)) => { + *outcome_cell.borrow_mut() = Some(Ok(outcome)); + return; + } + Err(e) => { + *outcome_cell.borrow_mut() = Some(Err(e)); + return; + } + } + } + } + + // Any remaining in-flight branches (e.g. pipeline ended without a + // join for them — should have been caught at parse time, but join + // them here to keep the scope clean). 
+ let remaining: Vec<_> = in_flight.drain().collect(); + for (_, ah) in remaining { + let _ = ah.handle.join(); } + }); + + if let Some(res) = outcome_cell.into_inner() { + return res; } let outcome = ExecuteOutcome::Completed; observer.on_pipeline_done(&outcome); Ok(outcome) } + +struct AsyncHandle<'scope> { + handle: std::thread::ScopedJoinHandle<'scope, parallel::BranchResult>, + #[allow(dead_code)] + launched_at: String, +} + +enum LoopControl { + Continue, + Skip, + Break, + Return(ExecuteOutcome), +} + +/// Dispatch a single top-level step the same way the pre-§29 loop did. +/// Returns a [`LoopControl`] signalling how the outer loop should proceed. +#[allow(clippy::too_many_arguments)] +fn dispatch_top_level_step( + step: &Step, + step_index: usize, + session: &mut Session, + runner: &(dyn Runner + Sync), + observer: &mut O, + depth: usize, + total_steps: usize, +) -> Result { + let step_id = step.id.as_str().to_string(); + + // Evaluate the condition — `None` means always run. + let condition_skip = if let Some(ref cond) = step.condition { + !evaluate_condition(cond, session, &step_id)? 
+ } else { + false + }; + + match observer.before_step(&step_id, step_index, condition_skip) { + BeforeStepAction::Run => {} + BeforeStepAction::Skip => return Ok(LoopControl::Skip), + BeforeStepAction::Stop => return Ok(LoopControl::Break), + } + + tracing::info!(run_id = %session.run_id, step_id = %step_id, "executing step"); + + let matched_action = execute_single_step( + step, + session, + runner, + observer, + depth, + total_steps, + step_index, + )?; + + if let Some(action) = matched_action { + handle_on_result_action(action, &step_id, step, session, runner, observer, depth) + } else { + Ok(LoopControl::Continue) + } +} + +fn handle_on_result_action( + action: ResultAction, + step_id: &str, + step: &Step, + session: &mut Session, + runner: &(dyn Runner + Sync), + observer: &mut O, + depth: usize, +) -> Result { + let pipeline_base_dir_buf: Option = session + .pipeline + .source + .as_deref() + .and_then(|p| p.parent()) + .map(|p| p.to_path_buf()); + let pipeline_base_dir = pipeline_base_dir_buf.as_deref(); + + match action { + ResultAction::Continue => Ok(LoopControl::Continue), + ResultAction::Break => { + tracing::info!( + run_id = %session.run_id, + step_id = %step_id, + "on_result break — stopping pipeline early" + ); + let outcome = ExecuteOutcome::Break { + step_id: step_id.to_string(), + }; + observer.on_pipeline_done(&outcome); + Ok(LoopControl::Return(outcome)) + } + ResultAction::AbortPipeline => { + let err = AilError::PipelineAborted { + detail: format!("Step '{step_id}' on_result fired abort_pipeline"), + context: Some(crate::error::ErrorContext::for_step( + &session.run_id, + step_id, + )), + }; + observer.on_pipeline_error(&err); + Err(err) + } + ResultAction::PauseForHuman => { + observer.on_result_pause(step_id, step.message.as_deref()); + Ok(LoopControl::Continue) + } + ResultAction::Pipeline { + ref path, + ref prompt, + } => { + let on_result_step_id = format!("{step_id}__on_result"); + let sub_entry = 
dispatch::sub_pipeline::execute_sub_pipeline( + path, + prompt.as_deref(), + &on_result_step_id, + session, + runner, + depth, + pipeline_base_dir, + ) + .inspect_err(|e| observer.on_pipeline_error(e))?; + session.turn_log.append(sub_entry); + Ok(LoopControl::Continue) + } + } +} + +/// Validate a join step's merged output against its `output_schema`. +fn validate_join_output_schema( + response: &str, + schema: &serde_json::Value, + step_id: &str, +) -> Result<(), AilError> { + let parsed: serde_json::Value = serde_json::from_str(response).map_err(|e| { + AilError::config_validation(format!( + "Join step '{step_id}' output is not valid JSON: {e}" + )) + })?; + let validator = jsonschema::validator_for(schema).map_err(|e| { + AilError::config_validation(format!( + "Join step '{step_id}' output_schema is not a valid JSON Schema: {e}" + )) + })?; + if !validator.is_valid(&parsed) { + return Err(AilError::OutputSchemaValidationFailed { + detail: format!("Join step '{step_id}' merged output does not match output_schema"), + context: None, + }); + } + Ok(()) +} diff --git a/ail-core/src/executor/dispatch/prompt.rs b/ail-core/src/executor/dispatch/prompt.rs index 966e257..8066fe1 100644 --- a/ail-core/src/executor/dispatch/prompt.rs +++ b/ail-core/src/executor/dispatch/prompt.rs @@ -21,7 +21,7 @@ pub(in crate::executor) fn execute( template_text: &str, step: &Step, session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), step_id: &str, step_index: usize, total_steps: usize, diff --git a/ail-core/src/executor/dispatch/skill.rs b/ail-core/src/executor/dispatch/skill.rs index 36435f6..a7c2788 100644 --- a/ail-core/src/executor/dispatch/skill.rs +++ b/ail-core/src/executor/dispatch/skill.rs @@ -21,7 +21,7 @@ pub(in crate::executor) fn execute( skill_name: &str, step: &Step, session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), step_id: &str, step_index: usize, total_steps: usize, diff --git 
a/ail-core/src/executor/dispatch/sub_pipeline.rs b/ail-core/src/executor/dispatch/sub_pipeline.rs index f9c79b9..7bcd05f 100644 --- a/ail-core/src/executor/dispatch/sub_pipeline.rs +++ b/ail-core/src/executor/dispatch/sub_pipeline.rs @@ -31,7 +31,7 @@ pub(in crate::executor) fn execute( prompt_override: Option<&str>, step_id: &str, session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), depth: usize, base_dir: Option<&std::path::Path>, observer: &mut O, @@ -58,7 +58,7 @@ pub(in crate::executor) fn execute_sub_pipeline( prompt_override: Option<&str>, step_id: &str, session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), depth: usize, base_dir: Option<&std::path::Path>, ) -> Result { @@ -154,7 +154,7 @@ pub(in crate::executor) fn execute_named( prompt_override: Option<&str>, step_id: &str, session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), depth: usize, observer: &mut O, ) -> Result { @@ -183,7 +183,7 @@ pub(in crate::executor) fn execute_named_pipeline( prompt_override: Option<&str>, step_id: &str, session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), depth: usize, visited: &mut HashSet, ) -> Result { diff --git a/ail-core/src/executor/headless.rs b/ail-core/src/executor/headless.rs index eb4cf96..20b6bfd 100644 --- a/ail-core/src/executor/headless.rs +++ b/ail-core/src/executor/headless.rs @@ -21,7 +21,10 @@ use super::events::ExecuteOutcome; /// /// SPEC §4.2 core invariant: once execution begins, all steps run in order. /// Early exit only via explicit declared outcomes — never silent failures. 
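The `&dyn Runner` → `&(dyn Runner + Sync)` widening repeated through these signatures is forced by `std::thread::scope`: spawned closures must be `Send`, and a `&T` is `Send` only when `T: Sync`. A self-contained sketch with a hypothetical stand-in trait (not the crate's real `Runner`):

```rust
use std::thread;

// Hypothetical stand-in for the crate's Runner trait — illustration only.
trait Runner {
    fn invoke(&self, prompt: &str) -> String;
}

struct EchoRunner;
impl Runner for EchoRunner {
    fn invoke(&self, prompt: &str) -> String {
        format!("echo: {prompt}")
    }
}

// Sharing a runner reference across scoped threads requires the `+ Sync`
// bound; with plain `&dyn Runner` the spawn closure would not be `Send`
// and this function would not compile.
fn run_branches(runner: &(dyn Runner + Sync), prompts: &[&str]) -> Vec<String> {
    thread::scope(|scope| {
        let handles: Vec<_> = prompts
            .iter()
            .map(|p| scope.spawn(move || runner.invoke(p)))
            .collect();
        // Join in spawn order, so results line up with `prompts`.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let out = run_branches(&EchoRunner, &["a", "b"]);
    assert_eq!(out, vec!["echo: a".to_string(), "echo: b".to_string()]);
}
```

Scoped threads also explain why no `Arc<dyn Runner>` appears anywhere in the diff: the scope guarantees the borrow outlives every spawned branch.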
-pub fn execute(session: &mut Session, runner: &dyn Runner) -> Result { +pub fn execute( + session: &mut Session, + runner: &(dyn Runner + Sync), +) -> Result { execute_core(session, runner, &mut NullObserver, 0) } diff --git a/ail-core/src/executor/helpers/invocation.rs b/ail-core/src/executor/helpers/invocation.rs index a194437..52786e0 100644 --- a/ail-core/src/executor/helpers/invocation.rs +++ b/ail-core/src/executor/helpers/invocation.rs @@ -15,7 +15,7 @@ use crate::session::{Session, TurnEntry}; /// unconditionally runs the runner and logs the result. It does not recheck. pub fn run_invocation_step( session: &mut Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), prompt: &str, options: InvokeOptions, ) -> Result { diff --git a/ail-core/src/executor/mod.rs b/ail-core/src/executor/mod.rs index 882baf6..8eb096a 100644 --- a/ail-core/src/executor/mod.rs +++ b/ail-core/src/executor/mod.rs @@ -10,6 +10,7 @@ mod dispatch; mod events; mod headless; mod helpers; +mod parallel; pub use controlled::execute_with_control; pub use events::{ExecuteOutcome, ExecutionControl, ExecutorEvent}; diff --git a/ail-core/src/executor/parallel.rs b/ail-core/src/executor/parallel.rs new file mode 100644 index 0000000..95bb8ce --- /dev/null +++ b/ail-core/src/executor/parallel.rs @@ -0,0 +1,354 @@ +//! Parallel step execution (SPEC §29). +//! +//! This module coordinates concurrent execution of `async: true` steps, the +//! synchronization at `action: join` barriers, and the merging of branch +//! results back into the main session's turn log. +//! +//! # Threading model +//! +//! Uses [`std::thread::scope`] — scoped threads guarantee that borrowed +//! references (like `&dyn Runner`) cannot outlive the scope, so we don't need +//! `Arc` wrapping. Each async branch runs on its own thread with a forked +//! [`Session`] produced by [`Session::fork_for_branch`]. +//! +//! # Runner Sync requirement +//! +//! For `std::thread::scope`, spawned closures must be `Send`. 
A `&dyn Runner`
+//! is `Send` only if `Runner: Sync`. All current runner implementations are
+//! `Sync` by composition, so we declare the parallel entry points as taking
+//! `&(dyn Runner + Sync)`. Top-level `execute_core` inherits the same bound.
+//!
+//! # Join merge behaviour (SPEC §29.4, §29.5)
+//!
+//! - **String join** (default): concatenate branch responses in declaration
+//!   order with `[step_id]:\n` headers.
+//! - **Structured join**: when all branches declared `output_schema`, parse
+//!   each response as JSON and merge into a single object namespaced by step id.
+//!
+//! # Error handling (SPEC §29.7)
+//!
+//! - `JoinErrorMode::FailFast` (default): first branch failure cancels
+//!   in-flight siblings via a shared [`CancelToken`] and propagates the error.
+//! - `JoinErrorMode::WaitForAll`: all branches run to completion; failed
+//!   branches contribute `{ "error": …, "error_type": … }` envelopes to the
+//!   merged output (structured-only).
+
+#![allow(clippy::result_large_err)]
+
+use std::collections::HashMap;
+use std::sync::{Condvar, Mutex};
+use std::time::SystemTime;
+
+use uuid::Uuid;
+
+use crate::config::domain::{ActionKind, JoinErrorMode, Step, StepBody};
+use crate::error::AilError;
+use crate::runner::CancelToken;
+use crate::session::turn_log::TurnEntry;
+
+/// Simple counting semaphore used to enforce `defaults.max_concurrency`
+/// (SPEC §29.10). Implemented with `Mutex + Condvar` — no external
+/// dependency.
+///
+/// `acquire()` blocks until a slot is free or the cancel token fires.
+/// `release()` decrements the counter and wakes one waiter.
+pub struct ConcurrencySemaphore {
+    max: usize,
+    state: Mutex<usize>,
+    cv: Condvar,
+}
+
+impl ConcurrencySemaphore {
+    pub fn new(max: usize) -> Self {
+        ConcurrencySemaphore {
+            max,
+            state: Mutex::new(0),
+            cv: Condvar::new(),
+        }
+    }
+
+    /// Acquire a slot. Blocks until `current < max` or the cancel token fires.
+    /// Returns `true` if a slot was acquired, `false` if cancelled before acquire.
+    pub fn acquire(&self, cancel: &CancelToken) -> bool {
+        let mut guard = match self.state.lock() {
+            Ok(g) => g,
+            Err(p) => p.into_inner(),
+        };
+        loop {
+            if cancel.is_cancelled() {
+                return false;
+            }
+            if *guard < self.max {
+                *guard += 1;
+                return true;
+            }
+            guard = match self
+                .cv
+                .wait_timeout(guard, std::time::Duration::from_millis(50))
+            {
+                Ok((g, _)) => g,
+                Err(p) => p.into_inner().0,
+            };
+        }
+    }
+
+    pub fn release(&self) {
+        let mut guard = match self.state.lock() {
+            Ok(g) => g,
+            Err(p) => p.into_inner(),
+        };
+        if *guard > 0 {
+            *guard -= 1;
+        }
+        self.cv.notify_one();
+    }
+}
+
+/// Result of executing one async branch on a spawned thread.
+pub struct BranchResult {
+    pub step_id: String,
+    /// The branch's outcome: the completed `TurnEntry` or an error.
+    pub outcome: Result<TurnEntry, AilError>,
+    pub launched_at: String,
+    pub completed_at: String,
+    /// Entries the branch produced (in addition to the outcome entry) —
+    /// useful if the branch has a `then:` chain (currently unused, kept for
+    /// future observability).
+    pub extra_entries: Vec<TurnEntry>,
+}
+
+/// Generate a short concurrent-group ID (SPEC §29.8).
+pub fn new_concurrent_group_id() -> String {
+    format!("cg-{}", &Uuid::new_v4().simple().to_string()[..8])
+}
+
+/// Render a `SystemTime` as an ISO 8601 UTC timestamp to millisecond
+/// precision. Used for `launched_at` / `completed_at` on turn log entries.
+pub fn iso8601(t: SystemTime) -> String {
+    let dur = t.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default();
+    let secs = dur.as_secs();
+    let millis = dur.subsec_millis();
+    // Compute Y-M-D H:M:S in UTC without any external dep.
+    let (y, m, d, hh, mm, ss) = civil_from_secs(secs);
+    format!("{y:04}-{m:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}.{millis:03}Z")
+}
+
+/// Minimal civil calendar conversion for ISO 8601 timestamps. Avoids a
+/// chrono dependency for a single formatting use case.
Handles dates in
+/// the range [1970-01-01, 9999-12-31].
+fn civil_from_secs(secs: u64) -> (u32, u32, u32, u32, u32, u32) {
+    let ss = (secs % 60) as u32;
+    let mm = ((secs / 60) % 60) as u32;
+    let hh = ((secs / 3600) % 24) as u32;
+    let mut days = (secs / 86400) as i64;
+
+    // Shift from 1970-01-01 to 0000-03-01 (civil-from-days algorithm, Howard Hinnant).
+    days += 719468;
+    let era = days.div_euclid(146097);
+    let doe = days.rem_euclid(146097) as u64;
+    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
+    let y = yoe as i64 + era * 400;
+    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
+    let mp = (5 * doy + 2) / 153;
+    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
+    let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
+    let y = if m <= 2 { y + 1 } else { y };
+    (y as u32, m, d, hh, mm, ss)
+}
+
+/// True when the step is an `action: join` step.
+pub fn is_join_step(step: &Step) -> bool {
+    matches!(step.body, StepBody::Action(ActionKind::Join { .. }))
+}
+
+/// Extract the join's error mode. Returns `None` when the step is not a join.
+pub fn join_error_mode(step: &Step) -> Option<&JoinErrorMode> {
+    match &step.body {
+        StepBody::Action(ActionKind::Join { on_error_mode }) => Some(on_error_mode),
+        _ => None,
+    }
+}
+
+/// Merge collected branch results into a single `TurnEntry` representing
+/// the join step's output. Implements both string join (§29.4) and
+/// structured join (§29.5).
+///
+/// `deps_in_order` is the join's `depends_on` list in declaration order —
+/// used to order the merged output.
+pub fn merge_join_results(
+    join_step: &Step,
+    deps_in_order: &[&str],
+    branches: &[BranchResult],
+    on_error_mode: &JoinErrorMode,
+) -> Result<TurnEntry, AilError> {
+    // Build a lookup map step_id → BranchResult.
+    let branch_map: HashMap<&str, &BranchResult> =
+        branches.iter().map(|b| (b.step_id.as_str(), b)).collect();
+
+    // Decide between string join and structured join for this group.
+    // (Parse-time validation already enforced all-or-nothing: a join that
+    // declares output_schema only accepts dependencies that also declare one,
+    // so at runtime "structured" reduces to the join's own output_schema.
+    // Failed branches are handled inside merge_structured — under
+    // wait_for_all they become error envelopes, SPEC §29.7.)
+    let use_structured = join_step.output_schema.is_some();
+
+    let response = if use_structured {
+        merge_structured(deps_in_order, &branch_map, on_error_mode)?
+    } else {
+        merge_string(deps_in_order, &branch_map, on_error_mode)?
+    };
+
+    let earliest_launch = branches
+        .iter()
+        .map(|b| b.launched_at.clone())
+        .min()
+        .unwrap_or_default();
+    let latest_complete = branches
+        .iter()
+        .map(|b| b.completed_at.clone())
+        .max()
+        .unwrap_or_default();
+
+    Ok(TurnEntry {
+        step_id: join_step.id.as_str().to_string(),
+        prompt: format!("join:[{}]", deps_in_order.join(",")),
+        response: Some(response),
+        launched_at: Some(earliest_launch),
+        completed_at: Some(latest_complete),
+        ..Default::default()
+    })
+}
+
+/// String join (SPEC §29.4): concatenate branch responses with `[step_id]:`
+/// headers in declaration order.
+fn merge_string(
+    deps_in_order: &[&str],
+    branches: &HashMap<&str, &BranchResult>,
+    on_error_mode: &JoinErrorMode,
+) -> Result<String, AilError> {
+    let mut out = String::new();
+    for (i, dep_id) in deps_in_order.iter().enumerate() {
+        if i > 0 {
+            out.push('\n');
+        }
+        let br = branches.get(dep_id).copied();
+        match br.map(|b| &b.outcome) {
+            Some(Ok(entry)) => {
+                let resp = entry.response.as_deref().unwrap_or("");
+                out.push_str(&format!("[{dep_id}]:\n{resp}\n"));
+            }
+            Some(Err(e)) => {
+                if matches!(on_error_mode, JoinErrorMode::FailFast) {
+                    return Err(AilError::pipeline_aborted(format!(
+                        "Join dependency '{dep_id}' failed (fail_fast): {}",
+                        e.detail()
+                    )));
+                }
+                out.push_str(&format!("[{dep_id}]:\n<error: {}>\n", e.detail()));
+            }
+            None => {
+                out.push_str(&format!("[{dep_id}]:\n<no result>\n"));
+            }
+        }
+    }
+    Ok(out)
+}
+
+/// Structured join (SPEC §29.5): merge each branch's structured response into
+/// a single JSON object keyed by step id.
+fn merge_structured(
+    deps_in_order: &[&str],
+    branches: &HashMap<&str, &BranchResult>,
+    on_error_mode: &JoinErrorMode,
+) -> Result<String, AilError> {
+    use serde_json::{Map, Value};
+    let mut merged = Map::new();
+
+    for dep_id in deps_in_order {
+        let br = branches.get(dep_id).copied();
+        match br.map(|b| &b.outcome) {
+            Some(Ok(entry)) => {
+                let resp = entry.response.as_deref().unwrap_or("");
+                let parsed: Value = serde_json::from_str(resp).map_err(|e| {
+                    AilError::config_validation(format!(
+                        "Join dependency '{dep_id}' did not return valid JSON despite declaring \
+                         output_schema: {e}"
+                    ))
+                })?;
+                merged.insert(dep_id.to_string(), parsed);
+            }
+            Some(Err(e)) => {
+                if matches!(on_error_mode, JoinErrorMode::FailFast) {
+                    return Err(AilError::pipeline_aborted(format!(
+                        "Join dependency '{dep_id}' failed (fail_fast): {}",
+                        e.detail()
+                    )));
+                }
+                // wait_for_all: emit an error envelope for this dependency.
+ let mut env = Map::new(); + env.insert("error".to_string(), Value::String(e.detail().to_string())); + env.insert( + "error_type".to_string(), + Value::String(e.error_type().to_string()), + ); + merged.insert(dep_id.to_string(), Value::Object(env)); + } + None => { + if matches!(on_error_mode, JoinErrorMode::FailFast) { + return Err(AilError::pipeline_aborted(format!( + "Join dependency '{dep_id}' produced no result" + ))); + } + merged.insert(dep_id.to_string(), Value::Null); + } + } + } + + serde_json::to_string(&Value::Object(merged)).map_err(|e| { + AilError::config_validation(format!("Failed to serialize structured join: {e}")) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn iso8601_formats_epoch() { + let s = iso8601(SystemTime::UNIX_EPOCH); + assert_eq!(s, "1970-01-01T00:00:00.000Z"); + } + + #[test] + fn iso8601_monotonic() { + let t1 = iso8601(SystemTime::UNIX_EPOCH); + let t2 = iso8601(SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(86_400)); + assert!(t2 > t1); + assert_eq!(t2, "1970-01-02T00:00:00.000Z"); + } + + #[test] + fn semaphore_limits_concurrency() { + let sem = ConcurrencySemaphore::new(2); + let ct = CancelToken::new(); + assert!(sem.acquire(&ct)); + assert!(sem.acquire(&ct)); + // Third acquire would block, so don't test it here without threads. 
+        sem.release();
+        assert!(sem.acquire(&ct));
+        sem.release();
+        sem.release();
+    }
+}
diff --git a/ail-core/src/runner/factory.rs b/ail-core/src/runner/factory.rs
index 7905c0b..448cf57 100644
--- a/ail-core/src/runner/factory.rs
+++ b/ail-core/src/runner/factory.rs
@@ -54,7 +54,7 @@ impl RunnerFactory {
         headless: bool,
         http_store: &HttpSessionStore,
         provider: &ProviderConfig,
-    ) -> Result<Box<dyn Runner>, AilError> {
+    ) -> Result<Box<dyn Runner + Sync>, AilError> {
         Self::build_with_registry(
             runner_name,
             headless,
@@ -76,7 +76,7 @@
         http_store: &HttpSessionStore,
         provider: &ProviderConfig,
         registry: &PluginRegistry,
-    ) -> Result<Box<dyn Runner>, AilError> {
+    ) -> Result<Box<dyn Runner + Sync>, AilError> {
         let normalized = runner_name.trim().to_lowercase();
         match normalized.as_str() {
             "claude" => {
@@ -154,7 +154,7 @@ impl RunnerFactory {
         headless: bool,
         http_store: &HttpSessionStore,
         provider: &ProviderConfig,
-    ) -> Result<Box<dyn Runner>, AilError> {
+    ) -> Result<Box<dyn Runner + Sync>, AilError> {
         let name = std::env::var("AIL_DEFAULT_RUNNER").unwrap_or_else(|_| "claude".to_string());
         Self::build(&name, headless, http_store, provider)
     }
@@ -165,7 +165,7 @@
         http_store: &HttpSessionStore,
         provider: &ProviderConfig,
         registry: &PluginRegistry,
-    ) -> Result<Box<dyn Runner>, AilError> {
+    ) -> Result<Box<dyn Runner + Sync>, AilError> {
         let name = std::env::var("AIL_DEFAULT_RUNNER").unwrap_or_else(|_| "claude".to_string());
         Self::build_with_registry(&name, headless, http_store, provider, registry)
     }
diff --git a/ail-core/src/session/state.rs b/ail-core/src/session/state.rs
index f424370..3fe2631 100644
--- a/ail-core/src/session/state.rs
+++ b/ail-core/src/session/state.rs
@@ -146,6 +146,53 @@ impl Session {
         self.turn_log.record_run_started(pipeline_source);
        self
    }
+
+    /// Fork a branch session for parallel async step execution (SPEC §29.9).
+    ///
+    /// The branch gets:
+    /// - A fresh `TurnLog` seeded with a clone of the parent's existing entries
+    ///   (so template resolution of prior steps works inside the branch).
+    /// - `NullProvider` for log persistence — branch results are collected and
+    ///   merged back into the main session's turn log after the join barrier.
+    /// - The same `run_id`, `invocation_prompt`, `cwd`, `runner_name`, `headless`,
+    ///   `cli_provider` as the parent.
+    /// - A fresh `http_session_store` when `isolated_http` is true (for
+    ///   `resume: false` async steps that opt out of context inheritance).
+    ///   Otherwise shares the parent's store.
+    /// - The parent's loop contexts and loop depth, cloned as-is, so loop
+    ///   template vars still resolve — spawning async steps from inside a
+    ///   loop body is an edge case the spec defers.
+    pub fn fork_for_branch(&self, isolated_http: bool) -> Session {
+        let entries: Vec<TurnEntry> = self.turn_log.entries().to_vec();
+
+        let mut turn_log = TurnLog::with_provider(
+            self.run_id.clone(),
+            Box::new(super::log_provider::NullProvider),
+        );
+        for e in entries {
+            turn_log.append(e);
+        }
+
+        let http_session_store = if isolated_http {
+            Arc::new(Mutex::new(HashMap::new()))
+        } else {
+            self.http_session_store.clone()
+        };
+
+        Session {
+            run_id: self.run_id.clone(),
+            pipeline: self.pipeline.clone(),
+            invocation_prompt: self.invocation_prompt.clone(),
+            turn_log,
+            cli_provider: self.cli_provider.clone(),
+            cwd: self.cwd.clone(),
+            runner_name: self.runner_name.clone(),
+            headless: self.headless,
+            http_session_store,
+            do_while_context: self.do_while_context.clone(),
+            for_each_context: self.for_each_context.clone(),
+            loop_depth: self.loop_depth,
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/ail-core/src/session/turn_log.rs b/ail-core/src/session/turn_log.rs
index ab66a58..7dd032c 100644
--- a/ail-core/src/session/turn_log.rs
+++ b/ail-core/src/session/turn_log.rs
@@ -41,7 +41,7 @@ impl From<&SamplingConfig> for TurnEntrySampling {
     }
 }
 
-#[derive(Serialize)]
+#[derive(Serialize, Clone)]
 pub struct TurnEntry {
     pub step_id: String,
     pub prompt: String,
diff --git a/ail-core/src/template.rs b/ail-core/src/template.rs
index 85abbd3..7ff568d 100644
--- a/ail-core/src/template.rs
+++ 
b/ail-core/src/template.rs
@@ -154,6 +154,14 @@ fn resolve_variable(variable: &str, session: &Session) -> Result<String, AilError> {
+        // SPEC §29.6: `step.<join_id>.<dep_id>.<field>` — walk JSON path after
+        // `<dep_id>`'s response, if that step's response is JSON.
+        if let Some(v) = resolve_join_dotted_path(session, rest) {
+            return Ok(v);
+        }
+        // Return the original error.
         return result;
     }
@@ -258,3 +266,56 @@ fn resolve_step_field(
         ))),
     }
 }
+
+/// Resolve a dotted-path reference into a join step's structured response
+/// (SPEC §29.6). Given `step.<join_id>.<dep_id>.<field>`, tries progressively
+/// longer step-id prefixes — `<join_id>`, then `<join_id>.<dep_id>`, etc. —
+/// until one matches a step with a JSON response, then walks the remaining
+/// path through the JSON value.
+///
+/// Returns `None` if no prefix matches a step with JSON content, or if the
+/// path does not exist inside the JSON. The caller falls back to its
+/// original error.
+fn resolve_join_dotted_path(session: &Session, rest: &str) -> Option<String> {
+    // `rest` is everything after `step.` — e.g. "checks_done.lint.clean".
+    let parts: Vec<&str> = rest.split('.').collect();
+    if parts.len() < 3 {
+        return None;
+    }
+
+    // Try splits: `parts[0]` as step id, rest as JSON path; then
+    // `parts[0..2]` as step id, `parts[2..]` as path; etc.
+    for split_at in 1..parts.len() {
+        let step_id = parts[..split_at].join(".");
+        let path = &parts[split_at..];
+        // A prefix that names no step just means we try the next split —
+        // `?` here would abort the remaining candidates, so match instead.
+        let response = match session.turn_log.response_for_step(&step_id) {
+            Some(r) => r,
+            None => continue,
+        };
+        let json: serde_json::Value = match serde_json::from_str(response) {
+            Ok(v) => v,
+            Err(_) => continue,
+        };
+        if let Some(value) = walk_json_path(&json, path) {
+            return Some(render_json_leaf(&value));
+        }
+    }
+    None
+}
+
+/// Walk a dotted-path sequence through a JSON value. Returns the final
+/// value, or `None` if any segment is missing.
+fn walk_json_path(root: &serde_json::Value, path: &[&str]) -> Option<serde_json::Value> {
+    let mut cur = root.clone();
+    for seg in path {
+        cur = cur.get(*seg).cloned()?;
+    }
+    Some(cur)
+}
+
+/// Render a JSON leaf as a plain string for template substitution.
+/// Strings are returned verbatim (no quotes); other scalars use JSON
+/// canonical form.
+fn render_json_leaf(v: &serde_json::Value) -> String {
+    match v {
+        serde_json::Value::String(s) => s.clone(),
+        other => other.to_string(),
+    }
+}
diff --git a/ail-core/tests/fixtures/parallel_basic.ail.yaml b/ail-core/tests/fixtures/parallel_basic.ail.yaml
index 5f388ef..2679e6a 100644
--- a/ail-core/tests/fixtures/parallel_basic.ail.yaml
+++ b/ail-core/tests/fixtures/parallel_basic.ail.yaml
@@ -15,7 +15,7 @@ pipeline:
     on_result:
       - contains: "FAIL"
         action: abort_pipeline
-      - always:
+      - always: true
         action: continue
 
   - id: ship
diff --git a/ail-core/tests/spec/mod.rs b/ail-core/tests/spec/mod.rs
index 4a20d28..2fe95cf 100644
--- a/ail-core/tests/spec/mod.rs
+++ b/ail-core/tests/spec/mod.rs
@@ -32,6 +32,7 @@ mod s26_output_schema;
 mod s26_s27_s28_integration;
 mod s27_do_while;
 mod s28_for_each;
+mod s29_parallel;
 mod s30_sampling;
 mod s35_ail_log_formatter;
 mod s39_consistency;
diff --git a/ail-core/tests/spec/s08_multi_runner.rs b/ail-core/tests/spec/s08_multi_runner.rs
index 81785e8..d026617 100644
--- a/ail-core/tests/spec/s08_multi_runner.rs
+++ b/ail-core/tests/spec/s08_multi_runner.rs
@@ -15,7 +15,7 @@ fn test_provider() -> ProviderConfig {
     ProviderConfig::default()
 }
 
-fn build(name: &str) -> Result<Box<dyn Runner>, ail_core::error::AilError> {
+fn build(name: &str) -> Result<Box<dyn Runner + Sync>, ail_core::error::AilError> {
     RunnerFactory::build(name, false, &test_store(), &test_provider())
 }
diff --git a/ail-core/tests/spec/s29_parallel.rs b/ail-core/tests/spec/s29_parallel.rs
new file mode 100644
index 0000000..8ae03a3
--- /dev/null
+++ b/ail-core/tests/spec/s29_parallel.rs
@@ -0,0 +1,569 @@
+//! SPEC §29 — parallel step execution.
+ +// ── Parse-time validation ─────────────────────────────────────────────────── + +mod parse { + use ail_core::config; + use ail_core::config::domain::{ActionKind, JoinErrorMode, StepBody}; + + fn load(yaml: &str) -> Result { + let tmp = tempfile::NamedTempFile::with_suffix(".ail.yaml").unwrap(); + std::fs::write(tmp.path(), yaml).unwrap(); + config::load(tmp.path()) + } + + #[test] + fn basic_async_plus_join_parses() { + let yaml = r#" +version: "0.1" +pipeline: + - id: lint + async: true + prompt: "Run lint" + - id: test + async: true + prompt: "Run tests" + - id: done + depends_on: [lint, test] + action: join +"#; + let pipeline = load(yaml).expect("should parse"); + assert!(pipeline.steps[0].async_step); + assert!(pipeline.steps[1].async_step); + assert_eq!(pipeline.steps[2].depends_on.len(), 2); + assert!(matches!( + pipeline.steps[2].body, + StepBody::Action(ActionKind::Join { .. }) + )); + } + + #[test] + fn join_wait_for_all_mode_parses() { + let yaml = r#" +version: "0.1" +pipeline: + - id: a + async: true + prompt: "a" + - id: done + depends_on: [a] + action: join + on_error: wait_for_all +"#; + let pipeline = load(yaml).expect("should parse"); + if let StepBody::Action(ActionKind::Join { on_error_mode }) = &pipeline.steps[1].body { + assert_eq!(*on_error_mode, JoinErrorMode::WaitForAll); + } else { + panic!("expected Join body"); + } + } + + #[test] + fn max_concurrency_parses() { + let yaml = r#" +version: "0.1" +defaults: + max_concurrency: 4 +pipeline: + - id: a + async: true + prompt: "a" + - id: done + depends_on: [a] + action: join +"#; + let pipeline = load(yaml).expect("should parse"); + assert_eq!(pipeline.max_concurrency, Some(4)); + } + + #[test] + fn orphaned_async_step_is_parse_error() { + let yaml = r#" +version: "0.1" +pipeline: + - id: orphan + async: true + prompt: "unreferenced" + - id: ship + prompt: "done" +"#; + let err = load(yaml).expect_err("should reject orphaned async"); + assert!( + err.detail().contains("not named in any 
step's depends_on"), + "got: {}", + err.detail() + ); + } + + #[test] + fn join_without_depends_on_is_parse_error() { + let yaml = r#" +version: "0.1" +pipeline: + - id: bad + action: join +"#; + let err = load(yaml).expect_err("should reject join without depends_on"); + assert!( + err.detail().contains("no depends_on list"), + "got: {}", + err.detail() + ); + } + + #[test] + fn forward_reference_in_depends_on_is_parse_error() { + let yaml = r#" +version: "0.1" +pipeline: + - id: first + depends_on: [later] + action: join + - id: later + async: true + prompt: "x" +"#; + let err = load(yaml).expect_err("should reject forward ref"); + // The error can come from either forward-ref check or cycle detection; + // both are valid — the point is the pipeline is rejected. + let d = err.detail(); + assert!( + d.contains("declared later") || d.contains("not declared"), + "got: {d}" + ); + } + + #[test] + fn cycle_in_depends_on_is_parse_error() { + // Both steps forward-ref each other — caught as forward-ref first. 
+ let yaml = r#" +version: "0.1" +pipeline: + - id: a + depends_on: [b] + prompt: "a" + - id: b + depends_on: [a] + prompt: "b" +"#; + let err = load(yaml).expect_err("should reject"); + assert!(!err.detail().is_empty()); + } + + #[test] + fn concurrent_resume_conflict_is_parse_error() { + let yaml = r#" +version: "0.1" +pipeline: + - id: a + async: true + resume: true + prompt: "a" + - id: b + async: true + resume: true + prompt: "b" + - id: done + depends_on: [a, b] + action: join +"#; + let err = load(yaml).expect_err("should reject concurrent resume conflict"); + assert!( + err.detail().contains("cannot share a runner session"), + "got: {}", + err.detail() + ); + } + + #[test] + fn structured_join_mixed_dependencies_is_parse_error() { + let yaml = r#" +version: "0.1" +pipeline: + - id: a + async: true + prompt: "a" + output_schema: + type: object + properties: + ok: { type: boolean } + required: [ok] + - id: b + async: true + prompt: "b" + - id: done + depends_on: [a, b] + action: join + output_schema: + type: object + properties: + a: { type: object } + b: { type: object } +"#; + let err = load(yaml).expect_err("should reject mixed structured/unstructured"); + assert!( + err.detail().contains("does not declare output_schema"), + "got: {}", + err.detail() + ); + } +} + +// ── Runtime execution ─────────────────────────────────────────────────────── + +mod execution { + use ail_core::config::domain::{Pipeline, Step, StepBody, StepId}; + use ail_core::executor::execute; + use ail_core::runner::stub::{RecordingStubRunner, StubRunner}; + use ail_core::session::log_provider::NullProvider; + use ail_core::session::Session; + + fn prompt_step(id: &str, text: &str) -> Step { + Step { + id: StepId(id.to_string()), + body: StepBody::Prompt(text.to_string()), + ..Default::default() + } + } + + fn make_session(pipeline: Pipeline) -> Session { + Session::new(pipeline, "invocation".to_string()).with_log_provider(Box::new(NullProvider)) + } + + fn load_inline(yaml: &str) -> 
Pipeline { + let tmp = tempfile::NamedTempFile::with_suffix(".ail.yaml").unwrap(); + std::fs::write(tmp.path(), yaml).unwrap(); + ail_core::config::load(tmp.path()).expect("valid pipeline") + } + + #[test] + fn two_async_steps_and_join_produce_merged_entry() { + let yaml = r#" +version: "0.1" +pipeline: + - id: a + async: true + prompt: "run a" + - id: b + async: true + prompt: "run b" + - id: done + depends_on: [a, b] + action: join +"#; + let pipeline = load_inline(yaml); + let mut session = make_session(pipeline); + let runner = StubRunner::new("ok"); + execute(&mut session, &runner).expect("pipeline runs"); + + let entries = session.turn_log.entries(); + let ids: Vec<&str> = entries.iter().map(|e| e.step_id.as_str()).collect(); + assert!(ids.contains(&"a"), "missing branch a in {ids:?}"); + assert!(ids.contains(&"b"), "missing branch b in {ids:?}"); + assert!(ids.contains(&"done"), "missing join in {ids:?}"); + + // Branch entries carry concurrent_group metadata. + for id in &["a", "b"] { + let e = entries.iter().find(|e| e.step_id == *id).unwrap(); + assert!( + e.concurrent_group.is_some(), + "branch {id} should have concurrent_group" + ); + assert!(e.launched_at.is_some()); + assert!(e.completed_at.is_some()); + } + + // Join entry's response is the string-join concatenation. 
+ let join_entry = entries.iter().find(|e| e.step_id == "done").unwrap(); + let resp = join_entry.response.as_deref().unwrap_or(""); + assert!(resp.contains("[a]:"), "missing [a] label in {resp}"); + assert!(resp.contains("[b]:"), "missing [b] label in {resp}"); + assert!(resp.contains("ok"), "missing branch response in {resp}"); + } + + #[test] + fn both_async_branches_invoke_runner() { + let yaml = r#" +version: "0.1" +pipeline: + - id: a + async: true + prompt: "branch a" + - id: b + async: true + prompt: "branch b" + - id: done + depends_on: [a, b] + action: join +"#; + let pipeline = load_inline(yaml); + let mut session = make_session(pipeline); + let runner = RecordingStubRunner::new("ok"); + execute(&mut session, &runner).expect("pipeline runs"); + + let calls = runner.calls(); + // Two async branches → exactly 2 runner invocations. + assert_eq!(calls.len(), 2, "expected 2 calls, got {}", calls.len()); + let prompts: Vec<&str> = calls.iter().map(|c| c.prompt.as_str()).collect(); + assert!(prompts.contains(&"branch a"), "got {prompts:?}"); + assert!(prompts.contains(&"branch b"), "got {prompts:?}"); + } + + #[test] + fn string_join_preserves_declaration_order() { + let yaml = r#" +version: "0.1" +pipeline: + - id: first + async: true + prompt: "1" + - id: second + async: true + prompt: "2" + - id: third + async: true + prompt: "3" + - id: done + depends_on: [first, second, third] + action: join +"#; + let pipeline = load_inline(yaml); + let mut session = make_session(pipeline); + let runner = StubRunner::new("resp"); + execute(&mut session, &runner).expect("pipeline runs"); + + let join_entry = session + .turn_log + .entries() + .iter() + .find(|e| e.step_id == "done") + .cloned() + .unwrap(); + let resp = join_entry.response.unwrap(); + let first_pos = resp.find("[first]:").unwrap(); + let second_pos = resp.find("[second]:").unwrap(); + let third_pos = resp.find("[third]:").unwrap(); + assert!(first_pos < second_pos); + assert!(second_pos < third_pos); + } + 
+ #[test] + fn sequential_step_after_async_sees_join_result() { + let yaml = r#" +version: "0.1" +pipeline: + - id: a + async: true + prompt: "a" + - id: done + depends_on: [a] + action: join + - id: after + prompt: "received: {{ step.done.response }}" +"#; + let pipeline = load_inline(yaml); + let mut session = make_session(pipeline); + let runner = RecordingStubRunner::new("branch response"); + execute(&mut session, &runner).expect("pipeline runs"); + + let calls = runner.calls(); + // 1 async branch + 1 sequential after = 2 invocations. + let after_call = calls.iter().find(|c| c.prompt.starts_with("received:")); + let after = after_call.expect("after step should have been invoked"); + assert!( + after.prompt.contains("[a]:"), + "join response not substituted into after step: {}", + after.prompt + ); + } + + #[test] + fn condition_never_on_async_step_skips_but_unblocks_join() { + let yaml = r#" +version: "0.1" +pipeline: + - id: a + async: true + condition: never + prompt: "a" + - id: b + async: true + prompt: "b" + - id: done + depends_on: [a, b] + action: join +"#; + let pipeline = load_inline(yaml); + let mut session = make_session(pipeline); + let runner = RecordingStubRunner::new("ok"); + execute(&mut session, &runner).expect("pipeline runs"); + + let calls = runner.calls(); + // Only b runs — a is condition-skipped. + assert_eq!(calls.len(), 1); + assert_eq!(calls[0].prompt, "b"); + + // Join still completes. + let entries = session.turn_log.entries(); + assert!(entries.iter().any(|e| e.step_id == "done")); + } + + #[test] + fn on_result_contains_on_join_step_fires() { + let yaml = r#" +version: "0.1" +pipeline: + - id: lint + async: true + prompt: "lint" + - id: done + depends_on: [lint] + action: join + on_result: + - contains: "FAIL" + action: abort_pipeline + - always: true + action: continue + - id: ship + prompt: "ship" +"#; + let pipeline = load_inline(yaml); + + // Case A: no FAIL → ship runs. 
+ let mut s1 = make_session(pipeline.clone()); + let ok_runner = StubRunner::new("all good"); + execute(&mut s1, &ok_runner).expect("pipeline runs"); + assert!(s1.turn_log.entries().iter().any(|e| e.step_id == "ship")); + + // Case B: FAIL → aborts before ship. + let mut s2 = make_session(pipeline); + let fail_runner = StubRunner::new("FAIL: broken"); + let res = execute(&mut s2, &fail_runner); + assert!(res.is_err(), "expected abort; got {res:?}"); + assert!(!s2.turn_log.entries().iter().any(|e| e.step_id == "ship")); + } + + #[test] + fn sequential_only_pipeline_still_works() { + // Guard against regressions in the non-async path. + let pipeline = Pipeline { + steps: vec![prompt_step("a", "hello"), prompt_step("b", "world")], + ..Default::default() + }; + let mut session = make_session(pipeline); + let runner = StubRunner::new("ok"); + execute(&mut session, &runner).expect("pipeline runs"); + let entries = session.turn_log.entries(); + assert_eq!(entries.len(), 2); + assert_eq!(entries[0].step_id, "a"); + assert_eq!(entries[1].step_id, "b"); + } + + #[test] + fn max_concurrency_1_serializes_branches() { + // max_concurrency: 1 forces branches to run one-at-a-time. The + // result should still be correct. 
+ let yaml = r#" +version: "0.1" +defaults: + max_concurrency: 1 +pipeline: + - id: a + async: true + prompt: "a" + - id: b + async: true + prompt: "b" + - id: c + async: true + prompt: "c" + - id: done + depends_on: [a, b, c] + action: join +"#; + let pipeline = load_inline(yaml); + let mut session = make_session(pipeline); + let runner = RecordingStubRunner::new("ok"); + execute(&mut session, &runner).expect("pipeline runs"); + let calls = runner.calls(); + assert_eq!(calls.len(), 3); + } + + #[test] + fn turn_log_concurrent_group_shared_across_branches() { + let yaml = r#" +version: "0.1" +pipeline: + - id: a + async: true + prompt: "a" + - id: b + async: true + prompt: "b" + - id: done + depends_on: [a, b] + action: join +"#; + let pipeline = load_inline(yaml); + let mut session = make_session(pipeline); + let runner = StubRunner::new("ok"); + execute(&mut session, &runner).expect("pipeline runs"); + + let entries = session.turn_log.entries(); + let a_group = entries + .iter() + .find(|e| e.step_id == "a") + .and_then(|e| e.concurrent_group.clone()) + .unwrap(); + let b_group = entries + .iter() + .find(|e| e.step_id == "b") + .and_then(|e| e.concurrent_group.clone()) + .unwrap(); + assert_eq!(a_group, b_group, "branches should share concurrent_group"); + } +} + +// ── Integration: fixtures round-trip ──────────────────────────────────────── + +mod fixtures { + use ail_core::config; + + #[test] + fn parallel_basic_fixture_parses() { + let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .join("tests/fixtures/parallel_basic.ail.yaml"); + let p = config::load(&path).expect("should parse"); + assert_eq!(p.steps.len(), 4); + } + + fn fixture(name: &str) -> std::path::PathBuf { + std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .join("tests/fixtures") + .join(name) + } + + #[test] + fn parallel_structured_fixture_parses() { + let p = config::load(&fixture("parallel_structured.ail.yaml")).expect("should parse"); + assert_eq!(p.steps.len(), 3); + } + + 
#[test] + fn parallel_invalid_orphan_fixture_rejected() { + let res = config::load(&fixture("parallel_invalid_orphan.ail.yaml")); + assert!(res.is_err()); + } + + #[test] + fn parallel_invalid_join_no_deps_fixture_rejected() { + let res = config::load(&fixture("parallel_invalid_join_no_deps.ail.yaml")); + assert!(res.is_err()); + } + + #[test] + fn parallel_invalid_forward_ref_fixture_rejected() { + let res = config::load(&fixture("parallel_invalid_forward_ref.ail.yaml")); + assert!(res.is_err()); + } +} diff --git a/ail/src/chat.rs b/ail/src/chat.rs index 3bf53be..fe687ae 100644 --- a/ail/src/chat.rs +++ b/ail/src/chat.rs @@ -57,7 +57,7 @@ fn emit(value: &serde_json::Value) { async fn run_turn_stream( pipeline: Pipeline, cli_provider: &ProviderConfig, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), prompt: &str, resume_session_id: Option<&str>, turn_id: &str, @@ -205,7 +205,7 @@ async fn run_turn_stream( pub async fn run_chat_stream( pipeline: Pipeline, cli_provider: ProviderConfig, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), initial_message: Option, ) -> Result<(), String> { let chat_session_id = Uuid::new_v4().to_string(); @@ -364,7 +364,7 @@ pub async fn run_chat_stream( pub fn run_chat_text( pipeline: Pipeline, cli_provider: ProviderConfig, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), initial_message: Option, ) -> Result<(), String> { use std::io::BufRead; diff --git a/ail/src/dry_run.rs b/ail/src/dry_run.rs index e60c0ad..657dc1e 100644 --- a/ail/src/dry_run.rs +++ b/ail/src/dry_run.rs @@ -9,7 +9,11 @@ use ail_core::runner::{InvokeOptions, Runner}; /// /// Output clearly labels each step with `[DRY RUN]` and shows the resolved prompt /// or shell command that would be sent. 
-pub fn run_dry_run(session: &mut ail_core::session::Session, runner: &dyn Runner, prompt: &str) { +pub fn run_dry_run( + session: &mut ail_core::session::Session, + runner: &(dyn Runner + Sync), + prompt: &str, +) { println!("[DRY RUN] Pipeline: {}", pipeline_label(session)); println!("[DRY RUN] Run ID: {}", session.run_id); println!("[DRY RUN] Invocation prompt: {}", truncate(prompt, 200)); diff --git a/ail/src/once_json.rs b/ail/src/once_json.rs index bb9ce65..06567e3 100644 --- a/ail/src/once_json.rs +++ b/ail/src/once_json.rs @@ -12,7 +12,7 @@ use crate::control_bridge; /// as one JSON line. The invocation step (if host-managed) is also emitted as events. pub async fn run_once_json( session: &mut ail_core::session::Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), prompt: &str, ) { use ail_core::executor::ExecutionControl; diff --git a/ail/src/once_text.rs b/ail/src/once_text.rs index 0f17765..8dc4385 100644 --- a/ail/src/once_text.rs +++ b/ail/src/once_text.rs @@ -7,7 +7,7 @@ use ail_core::runner::{InvokeOptions, Runner}; /// per-step progress, thinking blocks, and/or responses as they arrive. pub fn run_once_text( session: &mut ail_core::session::Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), prompt: &str, show_thinking: bool, watch: bool, @@ -54,7 +54,7 @@ pub fn run_once_text( /// Lean/quiet path: no per-step output, just print the final response(s), with a subtle footer. fn run_once_text_quiet( session: &mut ail_core::session::Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), has_invocation_step: bool, run_start: std::time::Instant, ) { @@ -106,7 +106,7 @@ fn run_once_text_quiet( /// Show-work summary mode: print one line per completed step after execution. 
fn run_once_text_show_work( session: &mut ail_core::session::Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), run_start: std::time::Instant, ) { match ail_core::executor::execute(session, runner) { @@ -166,7 +166,7 @@ fn run_once_text_show_work( /// never blocks; events are drained after it returns. fn run_once_text_verbose( session: &mut ail_core::session::Session, - runner: &dyn Runner, + runner: &(dyn Runner + Sync), show_thinking: bool, watch: bool, ) { diff --git a/spec/README.md b/spec/README.md index 52fe41d..292b1cf 100644 --- a/spec/README.md +++ b/spec/README.md @@ -51,7 +51,7 @@ The AIL Pipeline Language Specification — for pipeline authors and implementer | [s26-output-schema.md](core/s26-output-schema.md) | §26 Structured Step I/O Schemas | `output_schema` / `input_schema`; JSON Schema compliance (`$schema` field selects draft, defaults to Draft 7); file-path or inline block; parse-time compatibility check; `field:` + `equals:` in `on_result`; array access via `{{ step..items }}`; provider compatibility | **draft** | | [s27-do-while.md](core/s27-do-while.md) | §27 `do_while:` — Bounded Repeat-Until | Bounded generate→test→fix loop; `max_iterations` (required), `exit_when` (§12.2 syntax), `on_max_iterations`; step namespacing (`::`); iteration scope; turn log events; executor events | **draft** | | [s28-for-each.md](core/s28-for-each.md) | §28 `for_each:` — Collection Iteration | Map steps over a validated array from a prior `output_schema: type: array` step; `over`, `as`, `max_items`, `on_max_items`; plan-execution pattern; requires §26 | **v0.3** | -| [s29-parallel-execution.md](core/s29-parallel-execution.md) | §29 Parallel Step Execution | `async: true`, `depends_on:`, `action: join`; session fork model; structured join with `output_schema` namespacing; `on_error: fail_fast \| wait_for_all`; cancel signals; template scoping rules; turn log concurrent_group | **planned** — design complete (#117) | +| 
[s29-parallel-execution.md](core/s29-parallel-execution.md) | §29 Parallel Step Execution | `async: true`, `depends_on:`, `action: join`; session fork model; structured join with `output_schema` namespacing; `on_error: fail_fast \| wait_for_all`; cancel signals; template scoping rules; turn log concurrent_group | **v0.3** — parallel dispatch, session forking, string+structured joins, all validation rules, 23 tests |
 | [s30-sampling-parameters.md](core/s30-sampling-parameters.md) | §30 Sampling Parameter Control | `sampling:` block at three scopes (pipeline / provider-attached / per-step); temperature, top_p, top_k, max_tokens, stop_sequences, thinking; field-level merge; stop_sequences replace semantics; runner-specific quantization of `thinking` (ClaudeCLI `--effort`, HTTP boolean, ail-native passthrough) | **v0.3** — spec + all runners + tests |
 
 ---
 
diff --git a/spec/core/s29-parallel-execution.md b/spec/core/s29-parallel-execution.md
index 56c16de..ecf38cc 100644
--- a/spec/core/s29-parallel-execution.md
+++ b/spec/core/s29-parallel-execution.md
@@ -1,6 +1,8 @@
 ## 29. Parallel Step Execution
 
-> **Implementation status:** Planned — design complete (issue #117). Not yet implemented.
+> **Implementation status:** Implemented in v0.3 (issue #117). Uses `std::thread::scope` for
+> scoped-thread dispatch. `do_while[N]` indexed access (§29.12 intersection with §27.4) and
+> controlled-mode executor events for async launches are deferred.
 
 Running independent pipeline steps sequentially is pure waste — lint and test do not depend on each other. Parallel step execution allows independent steps to run concurrently, with explicit synchronization points that merge results and gate further execution. This also unlocks multi-provider comparison patterns and fan-out/fan-in workflows.
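
The fan-out/fan-in shape this section describes can be sketched as a minimal pipeline. This is illustrative only: the step IDs and prompts are invented, while the fields themselves (`async`, `depends_on`, `action: join`, `on_error`, `defaults.max_concurrency`) are the ones this spec defines.

```yaml
version: "0.1"
defaults:
  max_concurrency: 2        # pipeline-wide cap on concurrently running branches
pipeline:
  - id: lint                # independent branches, marked non-blocking
    async: true
    prompt: "Lint the diff"
  - id: test
    async: true
    prompt: "Run the test suite"
  - id: report              # synchronization point: waits for both branches
    depends_on: [lint, test]
    action: join
    on_error: wait_for_all  # or fail_fast (the default)
```

With the default string join, the join step's `response` (here hypothetically `{{ step.report.response }}`) would hold the branch outputs concatenated under labelled headers in declaration order (§29.4); if every dependency declared an `output_schema`, a structured JSON merge would apply instead (§29.5).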