Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ail-core/src/config/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ pub const MAX_SUB_PIPELINE_DEPTH: usize = 16;
/// Prevents runaway resource consumption from deeply nested loops (SPEC §27).
pub const MAX_LOOP_DEPTH: usize = 8;

/// Maximum number of `action: reload_self` invocations per pipeline run (SPEC §21).
/// Prevents infinite self-rewrite loops — the LLM can edit `.ail.yaml` and reload,
/// but only a bounded number of times within a single `ail --once` invocation.
pub const MAX_RELOADS_PER_RUN: usize = 16;

/// Provider and model configuration resolved from pipeline defaults, per-step overrides,
/// or CLI flags. All fields are optional — unset fields fall back to runner/environment defaults.
#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -377,6 +382,11 @@ pub enum ActionKind {
/// Error handling mode — controls behavior when a dependency fails (SPEC §29.7).
on_error_mode: JoinErrorMode,
},
/// Hot-reload the active pipeline from its source file on disk (SPEC §21).
/// Re-parses `session.pipeline.source`, validates, swaps `session.pipeline`
/// in place, and re-anchors the top-level sequential loop by matching the
/// reload step's own id in the new step list.
ReloadSelf,
}

/// Error handling mode for `action: join` steps (SPEC §29.7).
Expand Down
1 change: 0 additions & 1 deletion ail-core/src/config/validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,6 @@ mod tests {
id: Some("ctx".to_string()),
context: Some(ContextDto {
shell: Some("git status".to_string()),
..Default::default()
}),
..Default::default()
}]);
Expand Down
1 change: 1 addition & 0 deletions ail-core/src/config/validation/step_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub(in crate::config) fn parse_step_body(
on_error_mode: JoinErrorMode::FailFast,
}))
}
"reload_self" => Ok(StepBody::Action(ActionKind::ReloadSelf)),
other => Err(cfg_err!(
"Step '{id_str}' specifies unknown action '{other}'"
)),
Expand Down
28 changes: 25 additions & 3 deletions ail-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ pub enum AilError {
detail: String,
context: Option<ErrorContext>,
},

#[error("[ail:pipeline/reload-failed] {detail}")]
PipelineReloadFailed {
detail: String,
context: Option<ErrorContext>,
},
}

impl AilError {
Expand Down Expand Up @@ -228,6 +234,7 @@ impl AilError {
Self::InputSchemaValidationFailed { .. } => error_types::INPUT_SCHEMA_VALIDATION_FAILED,
Self::SchemaCompatibilityFailed { .. } => error_types::SCHEMA_COMPATIBILITY_FAILED,
Self::ForEachSourceInvalid { .. } => error_types::FOR_EACH_SOURCE_INVALID,
Self::PipelineReloadFailed { .. } => error_types::PIPELINE_RELOAD_FAILED,
}
}

Expand Down Expand Up @@ -260,7 +267,8 @@ impl AilError {
| Self::OutputSchemaValidationFailed { detail, .. }
| Self::InputSchemaValidationFailed { detail, .. }
| Self::SchemaCompatibilityFailed { detail, .. }
| Self::ForEachSourceInvalid { detail, .. } => detail,
| Self::ForEachSourceInvalid { detail, .. }
| Self::PipelineReloadFailed { detail, .. } => detail,
}
}

Expand Down Expand Up @@ -291,7 +299,8 @@ impl AilError {
| Self::OutputSchemaValidationFailed { detail, .. }
| Self::InputSchemaValidationFailed { detail, .. }
| Self::SchemaCompatibilityFailed { detail, .. }
| Self::ForEachSourceInvalid { detail, .. } => detail,
| Self::ForEachSourceInvalid { detail, .. }
| Self::PipelineReloadFailed { detail, .. } => detail,
}
}

Expand Down Expand Up @@ -324,7 +333,8 @@ impl AilError {
| Self::OutputSchemaValidationFailed { context, .. }
| Self::InputSchemaValidationFailed { context, .. }
| Self::SchemaCompatibilityFailed { context, .. }
| Self::ForEachSourceInvalid { context, .. } => context.as_ref(),
| Self::ForEachSourceInvalid { context, .. }
| Self::PipelineReloadFailed { context, .. } => context.as_ref(),
}
}

Expand Down Expand Up @@ -436,6 +446,10 @@ impl AilError {
detail,
context: ctx,
},
Self::PipelineReloadFailed { detail, .. } => Self::PipelineReloadFailed {
detail,
context: ctx,
},
}
}

Expand Down Expand Up @@ -631,6 +645,13 @@ impl AilError {
context: None,
}
}

pub fn pipeline_reload_failed(detail: impl Into<String>) -> Self {
Self::PipelineReloadFailed {
detail: detail.into(),
context: None,
}
}
}

pub mod error_types {
Expand Down Expand Up @@ -659,6 +680,7 @@ pub mod error_types {
pub const INPUT_SCHEMA_VALIDATION_FAILED: &str = "ail:schema/input-validation-failed";
pub const SCHEMA_COMPATIBILITY_FAILED: &str = "ail:schema/compatibility-failed";
pub const FOR_EACH_SOURCE_INVALID: &str = "ail:for-each/source-invalid";
pub const PIPELINE_RELOAD_FAILED: &str = "ail:pipeline/reload-failed";
}

#[cfg(test)]
Expand Down
176 changes: 167 additions & 9 deletions ail-core/src/executor/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use crate::config::domain::{
ActionKind, Condition, ConditionExpr, ContextSource, JoinErrorMode, OnError, OnMaxItems,
ResultAction, Step, StepBody, StepId, MAX_LOOP_DEPTH,
ResultAction, Step, StepBody, StepId, MAX_LOOP_DEPTH, MAX_RELOADS_PER_RUN,
};
use crate::error::AilError;
use crate::runner::{CancelToken, InvokeOptions, RunResult, Runner};
Expand Down Expand Up @@ -260,6 +260,13 @@ pub(super) fn execute_single_step<O: StepObserver>(
return Ok(None);
}

// action: reload_self — hot-reload the active pipeline from its source file
// on disk (SPEC §21). Handled inline: re-parse, swap session.pipeline, set
// reload_requested so the top-level sequential loop re-anchors after this step.
if let StepBody::Action(ActionKind::ReloadSelf) = &step.body {
return handle_reload_self(&step_id, session).map(|_| None);
}

// action: join — synchronization barrier. Handled in execute_core_with_parallel
// before this function is reached. If a join step reaches here, it means there
// were no async steps to coordinate; produce an empty join entry as a no-op.
Expand Down Expand Up @@ -386,6 +393,10 @@ pub(super) fn execute_single_step<O: StepObserver>(
unreachable!("Join handled above")
}

StepBody::Action(ActionKind::ReloadSelf) => {
unreachable!("ReloadSelf handled above")
}

StepBody::SubPipeline {
path: path_template,
prompt,
Expand Down Expand Up @@ -1381,30 +1392,73 @@ pub(super) fn execute_core<O: StepObserver>(
execute_core_with_parallel(session, runner, observer, depth, &steps, total_steps)
}

/// Sequential execution path (no async steps). Preserves the pre-§29 behavior.
/// Sequential execution path (no async steps). Preserves the pre-§29 behavior
/// and additionally honours `session.reload_requested` (SPEC §21): after each
/// step, if the flag is set, re-clone `session.pipeline.steps` and re-anchor
/// the cursor by matching the current step's id in the reloaded list.
fn execute_core_sequential<O: StepObserver>(
session: &mut Session,
runner: &(dyn Runner + Sync),
observer: &mut O,
depth: usize,
steps: &[Step],
total_steps: usize,
initial_steps: &[Step],
initial_total_steps: usize,
) -> Result<ExecuteOutcome, AilError> {
for (step_index, step) in steps.iter().enumerate() {
match dispatch_top_level_step(
step,
// Local mutable step list: starts as the caller's snapshot but may be
// re-cloned from session.pipeline.steps when a reload fires.
let mut steps: Vec<Step> = initial_steps.to_vec();
let mut total_steps = initial_total_steps;
let mut step_index: usize = 0;

while step_index < steps.len() {
let current_id = steps[step_index].id.as_str().to_string();

let control = dispatch_top_level_step(
&steps[step_index],
step_index,
session,
runner,
observer,
depth,
total_steps,
)? {
)?;

match control {
LoopControl::Continue => {}
LoopControl::Skip => continue,
LoopControl::Skip => {
step_index += 1;
continue;
}
LoopControl::Break => break,
LoopControl::Return(outcome) => return Ok(outcome),
}

// SPEC §21 reload seam: after a successful step, if the step (or any
// nested chain step) requested a hot reload, re-clone the step list
// and re-anchor by matching the current step id. The reload step
// itself (action: reload_self) sets this flag.
if session.reload_requested {
session.reload_requested = false;
steps = session.pipeline.steps.clone();
total_steps = steps.len();
let anchor = steps
.iter()
.position(|s| s.id.as_str() == current_id)
.ok_or_else(|| AilError::PipelineReloadFailed {
detail: format!(
"After reload, the executor could not find the anchor step \
'{current_id}' in the reloaded pipeline — unable to resume"
),
context: Some(crate::error::ErrorContext::for_step(
&session.run_id,
&current_id,
)),
})?;
step_index = anchor + 1;
continue;
}

step_index += 1;
}

let outcome = ExecuteOutcome::Completed;
Expand Down Expand Up @@ -1836,6 +1890,110 @@ fn handle_on_result_action<O: StepObserver>(
}
}

/// Hot-reload the active pipeline from its source file on disk (SPEC §21).
///
/// Re-parses `session.pipeline.source`, validates, swaps `session.pipeline` in
/// place, appends a `TurnEntry` recording the before/after step count, and
/// sets `session.reload_requested` so the top-level sequential executor loop
/// re-clones its steps vec and re-anchors by matching the reload step's id.
///
/// Errors on: passthrough (no source), exhausted `MAX_RELOADS_PER_RUN` cap,
/// config load/validation failure, or missing anchor id in the reloaded pipeline.
fn handle_reload_self(step_id: &str, session: &mut Session) -> Result<(), AilError> {
let source_path =
session
.pipeline
.source
.clone()
.ok_or_else(|| AilError::PipelineReloadFailed {
detail: format!(
"Step '{step_id}' declares action: reload_self but this run has no \
pipeline source on disk (passthrough mode); reload is only supported \
for pipelines loaded from a file"
),
context: Some(crate::error::ErrorContext::for_step(
&session.run_id,
step_id,
)),
})?;

if session.reload_count >= MAX_RELOADS_PER_RUN {
return Err(AilError::PipelineReloadFailed {
detail: format!(
"Step '{step_id}' exceeded the per-run reload cap ({MAX_RELOADS_PER_RUN}); \
the pipeline has already hot-reloaded {} times — aborting to prevent \
an infinite self-rewrite loop",
session.reload_count
),
context: Some(crate::error::ErrorContext::for_step(
&session.run_id,
step_id,
)),
});
}

let before_len = session.pipeline.steps.len();

let new_pipeline =
crate::config::load(&source_path).map_err(|e| AilError::PipelineReloadFailed {
detail: format!(
"Step '{step_id}' failed to reload pipeline from {}: {}",
source_path.display(),
e.detail()
),
context: Some(crate::error::ErrorContext::for_step(
&session.run_id,
step_id,
)),
})?;

// Anchor-by-id: the reload step's own id must survive in the new pipeline so
// the executor knows where to resume. Reject up front so we don't swap and
// then discover the problem mid-loop.
if !new_pipeline.steps.iter().any(|s| s.id.as_str() == step_id) {
return Err(AilError::PipelineReloadFailed {
detail: format!(
"Step '{step_id}' reloaded pipeline from {} but the reloaded step list \
no longer contains a step with id '{step_id}'; the executor cannot \
determine where to resume",
source_path.display()
),
context: Some(crate::error::ErrorContext::for_step(
&session.run_id,
step_id,
)),
});
}

let after_len = new_pipeline.steps.len();
session.pipeline = new_pipeline;
session.reload_count += 1;
session.reload_requested = true;

tracing::info!(
run_id = %session.run_id,
step_id = %step_id,
before_len,
after_len,
reload_count = session.reload_count,
source = %source_path.display(),
"reload_self swapped pipeline in place"
);

let entry = crate::session::turn_log::TurnEntry {
step_id: step_id.to_string(),
prompt: "reload_self".to_string(),
response: Some(format!(
"reloaded pipeline ({before_len} -> {after_len} steps) from {}",
source_path.display()
)),
..Default::default()
};
session.turn_log.append(entry);

Ok(())
}

/// Validate a join step's merged output against its `output_schema`.
fn validate_join_output_schema(
response: &str,
Expand Down
1 change: 0 additions & 1 deletion ail-core/src/executor/helpers/condition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ mod tests {
use crate::config::domain::{ConditionExpr, ConditionOp, Step, StepBody, StepId};
use crate::session::TurnEntry;
use crate::test_helpers::make_session;
use std::time::SystemTime;

fn session_with_shell_entry(step_id: &str, exit_code: i32, stdout: &str) -> Session {
let mut session = make_session(vec![prompt_step("dummy", "dummy")]);
Expand Down
7 changes: 7 additions & 0 deletions ail-core/src/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ fn chain_step_summary(body: &StepBody) -> String {
StepBody::Action(ActionKind::PauseForHuman) => "action: pause_for_human".to_string(),
StepBody::Action(ActionKind::ModifyOutput { .. }) => "action: modify_output".to_string(),
StepBody::Action(ActionKind::Join { .. }) => "action: join".to_string(),
StepBody::Action(ActionKind::ReloadSelf) => "action: reload_self".to_string(),
StepBody::Context(ContextSource::Shell(cmd)) => {
format!("context: shell: \"{}\"", yaml_quote(cmd))
}
Expand Down Expand Up @@ -221,6 +222,9 @@ pub fn materialize(pipeline: &Pipeline) -> String {
out.push_str(" on_error: wait_for_all\n");
}
}
StepBody::Action(ActionKind::ReloadSelf) => {
out.push_str(" action: reload_self\n");
}
StepBody::Action(ActionKind::ModifyOutput {
ref headless_behavior,
ref default_value,
Expand Down Expand Up @@ -433,6 +437,9 @@ fn serialize_step(out: &mut String, step: &Step, indent: &str, origin_comment: O
out.push_str(&format!("{field_indent}on_error: wait_for_all\n"));
}
}
StepBody::Action(ActionKind::ReloadSelf) => {
out.push_str(&format!("{field_indent}action: reload_self\n"));
}
StepBody::Context(ContextSource::Shell(cmd)) => {
out.push_str(&format!(
"{field_indent}context:\n{field_indent} shell: \"{}\"\n",
Expand Down
2 changes: 1 addition & 1 deletion ail-core/src/runner/dry_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ mod tests {

#[test]
fn dry_run_runner_default_trait() {
let runner = DryRunRunner::default();
let runner = DryRunRunner;
let result = runner.invoke("test", InvokeOptions::default()).unwrap();
assert!(result.response.contains("[DRY RUN]"));
}
Expand Down
Loading
Loading