Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## v0.3 — in progress

### What's new

- **Parallel step execution (SPEC §29, #117)** — `async: true` marks non-blocking steps; `depends_on: [...]` declares dependencies; `action: join` synchronizes and merges branch results. String join (default, labelled headers in declaration order) and structured join (JSON merge when all deps declare `output_schema`) with optional `output_schema` validation on the join. Error modes: `fail_fast` (default) and `wait_for_all`. Pipeline-wide concurrency cap via `defaults.max_concurrency`. Uses `std::thread::scope` — no async runtime added. Complete parse-time validation (orphan detection, forward references, cycles, concurrent session conflicts, structured-join compatibility). Turn log entries tagged with `concurrent_group`, `launched_at`, `completed_at`. Template resolution for `{{ step.<join>.<dep>.<field> }}` dotted-path access.
- **Sampling parameter control (SPEC §30, #120)** — three-scope `sampling:` block (pipeline / provider / per-step) with field-level merge; temperature, top_p, top_k, max_tokens, stop_sequences, thinking; runner-specific quantization.

## v0.2 — 2026-04-05

### What works (all v0.1 features plus)
Expand Down
4 changes: 4 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ If nothing found → passthrough mode (safe zero-config default).
| `{{ for_each.<as_name> }}` | Current item value under the declared `as:` name (e.g. `{{ for_each.task }}` when `as: task`) |
| `{{ for_each.index }}` | Current 1-based item index (only inside `for_each:` body) |
| `{{ for_each.total }}` | Total number of items in the collection, after `max_items` cap (only inside `for_each:` body) |
| `{{ step.<join_id>.response }}` | Concatenated string output of an `action: join` step (SPEC §29.4) |
| `{{ step.<join_id>.<dep_id>.response }}` | Full structured output of a named dependency in a structured join (SPEC §29.5) |
| `{{ step.<join_id>.<dep_id>.<field> }}` | Specific field within a namespaced structured dependency output (SPEC §29.5) |

Note: `{{ session.invocation_prompt }}` is a supported alias for `{{ step.invocation.prompt }}` in the implementation but is deprecated — prefer the canonical form.

Expand Down Expand Up @@ -185,6 +188,7 @@ Unresolved variables **abort with a typed error** — never silently empty.
- `pipeline:` step bodies support both file-based sub-pipelines and named pipeline references (SPEC §9, §10)
- `do_while:` fully implemented (§27): parse-time validation, executor loop, template vars, step ID namespacing, break/abort_pipeline, shared depth guard (MAX_LOOP_DEPTH=8). `on_max_iterations` field defaults to `abort_pipeline` (configurable variant not yet implemented). Controlled-mode executor events deferred.
- `for_each:` fully implemented (§28): parse-time validation, runtime array iteration, item scope, template vars, break/abort_pipeline, max_items cap, shared depth guard with do_while. Controlled-mode executor events deferred.
- `async:` / `depends_on:` / `action: join` fully implemented (§29): `std::thread::scope`-based parallel dispatch, session forking (clean HTTP store for `resume: false`), string-join (§29.4) and structured-join (§29.5), `on_error: fail_fast`/`wait_for_all`, `defaults.max_concurrency` semaphore, all parse-time validation rules (orphan detection, forward refs, cycle detection, concurrent resume conflict, structured-join compatibility), turn log `concurrent_group`/`launched_at`/`completed_at`, dotted-path template resolution (`{{ step.<join>.<dep>.<field> }}`). Mid-flight runner-level cancellation for `fail_fast` is best-effort (branches complete; first error propagates). Controlled-mode executor events for async launches are deferred.
- `output_schema` / `input_schema` (§26): JSON Schema validation at parse time and runtime. `schema-as-file-path` variant (§26.1) not yet implemented — schemas must be inline.
- `do_while[N]` indexed iteration access (§27.4) is specified but not implemented — template resolver only exposes the final iteration
- `pipeline:` as alternative to inline `steps:` is supported in both `do_while:` and `for_each:` loop bodies
Expand Down
7 changes: 5 additions & 2 deletions ail-core/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Consumed by `ail` (the binary) and future language-server / SDK targets.
| `executor/headless.rs` | `execute()` — headless mode entry point using `NullObserver` |
| `executor/controlled.rs` | `execute_with_control()` — TUI-controlled mode with `ChannelObserver` |
| `executor/events.rs` | `ExecuteOutcome`, `ExecutionControl`, `ExecutorEvent` |
| `executor/parallel.rs` | `ConcurrencySemaphore`, `BranchResult`, `merge_join_results()`, timestamp helpers — SPEC §29 parallel step dispatch primitives |
| `executor/helpers/mod.rs` | Re-exports all helper functions for the executor |
| `executor/helpers/invocation.rs` | `run_invocation_step()` — host-managed invocation step lifecycle |
| `executor/helpers/runner_resolution.rs` | `resolve_step_provider()`, `resolve_step_sampling()` (SPEC §30.3 three-scope merge), `build_step_runner_box()`, `resolve_effective_runner_name()` |
Expand Down Expand Up @@ -69,7 +70,8 @@ pub struct Pipeline { pub steps: Vec<Step>, pub source: Option<PathBuf>, pub def
// default_tools: pipeline-wide fallback; per-step tools override entirely (SPEC §3.2)
// named_pipelines: named pipeline definitions from the `pipelines:` section (SPEC §10)
// sampling_defaults: pipeline-wide sampling baseline (SPEC §30.2); orthogonal to provider-attached sampling on `defaults: ProviderConfig`
pub struct Step { pub id: StepId, pub body: StepBody, pub message: Option<String>, pub tools: Option<ToolPolicy>, pub on_result: Option<Vec<ResultBranch>>, pub model: Option<String>, pub runner: Option<String>, pub condition: Option<Condition>, pub append_system_prompt: Option<Vec<SystemPromptEntry>>, pub system_prompt: Option<String>, pub resume: bool, pub on_error: Option<OnError>, pub before: Vec<Step>, pub then: Vec<Step>, pub output_schema: Option<serde_json::Value>, pub input_schema: Option<serde_json::Value>, pub sampling: Option<SamplingConfig> }
pub struct Step { pub id: StepId, pub body: StepBody, pub message: Option<String>, pub tools: Option<ToolPolicy>, pub on_result: Option<Vec<ResultBranch>>, pub model: Option<String>, pub runner: Option<String>, pub condition: Option<Condition>, pub append_system_prompt: Option<Vec<SystemPromptEntry>>, pub system_prompt: Option<String>, pub resume: bool, pub async_step: bool, pub depends_on: Vec<StepId>, pub on_error: Option<OnError>, pub before: Vec<Step>, pub then: Vec<Step>, pub output_schema: Option<serde_json::Value>, pub input_schema: Option<serde_json::Value>, pub sampling: Option<SamplingConfig> }
// async_step / depends_on: parallel execution primitives (SPEC §29). async_step=true marks a non-blocking step; depends_on lists step IDs this step waits for.
// before: private pre-processing chain (SPEC §5.10) — runs before the step fires
// then: private post-processing chain (SPEC §5.7) — runs after the step completes
// output_schema: optional JSON Schema for validating step output (SPEC §26.1); validated at parse time, response validated at runtime
Expand All @@ -84,7 +86,8 @@ pub enum StepBody { Prompt(String), Skill { name: String }, SubPipeline { path:
// SubPipeline.path may contain {{ variable }} syntax — resolved at execution time (SPEC §11)
// SubPipeline.prompt: when Some, overrides child session's invocation_prompt instead of using parent's last_response (SPEC §9.3)
pub enum ContextSource { Shell(String) }
pub enum ActionKind { PauseForHuman, ModifyOutput { headless_behavior: HitlHeadlessBehavior, default_value: Option<String> } }
pub enum ActionKind { PauseForHuman, ModifyOutput { headless_behavior: HitlHeadlessBehavior, default_value: Option<String> }, Join { on_error_mode: JoinErrorMode } }
pub enum JoinErrorMode { FailFast, WaitForAll } // SPEC §29.7 — default FailFast
pub enum HitlHeadlessBehavior { Skip, Abort, UseDefault }
pub struct ResultBranch { pub matcher: ResultMatcher, pub action: ResultAction }
pub enum ResultMatcher { Contains(String), ExitCode(ExitCodeMatch), Field { name: String, equals: serde_json::Value }, Always }
Expand Down
12 changes: 8 additions & 4 deletions ail-core/src/config/validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,14 @@ fn validate_parallel_constraints(steps: &[Step]) -> Result<(), AilError> {
// Build a set of step IDs in declaration order for forward-reference detection.
let step_ids: Vec<&str> = steps.iter().map(|s| s.id.as_str()).collect();

// Quick exit: if no steps use async or depends_on, nothing to validate.
let has_parallel = steps
.iter()
.any(|s| s.async_step || !s.depends_on.is_empty());
// Quick exit: if no steps use async, depends_on, or action:join, nothing
// to validate. Checking for `action: join` catches the edge case of a
// malformed `action: join` step without `depends_on` — still a parse error.
let has_parallel = steps.iter().any(|s| {
s.async_step
|| !s.depends_on.is_empty()
|| matches!(s.body, StepBody::Action(ActionKind::Join { .. }))
});
if !has_parallel {
return Ok(());
}
Expand Down
2 changes: 1 addition & 1 deletion ail-core/src/executor/controlled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl<'a> StepObserver for ChannelObserver<'a> {
/// Blocks on `hitl_rx.recv()` when a `pause_for_human` step is reached.
pub fn execute_with_control(
session: &mut Session,
runner: &dyn Runner,
runner: &(dyn Runner + Sync),
control: &ExecutionControl,
disabled_steps: &HashSet<String>,
event_tx: mpsc::Sender<ExecutorEvent>,
Expand Down
Loading
Loading