From aae09101b9a10ae1821bb4b61247acfd0a1c06e5 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 15 Apr 2026 02:14:10 +0000 Subject: [PATCH] =?UTF-8?q?feat(=C2=A721):=20action:=20reload=5Fself=20?= =?UTF-8?q?=E2=80=94=20in-run=20pipeline=20hot=20reload?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ship the minimum primitive for self-modifying pipelines (#132): a step type that re-reads the active .ail.yaml, swaps session.pipeline in place, and re-anchors the top-level sequential executor by step-id so newly-added tail steps execute in the same run. Previously the only way for an LLM-edited pipeline to take effect was on the next ail --once invocation — the executor captured a static Vec at startup. spec/core/s21 framed this as "deferred / between runs only"; that framing was incorrect. A minimum-viable mid-run reload is achievable without the full diff-application / FROM-layering / log-injection machinery. Surface: action: reload_self. Anchor-by-id resumption. Hard cap of MAX_RELOADS_PER_RUN = 16. Typed ail:pipeline/reload-failed error for passthrough (no source), cap exhaustion, validation failure, and missing-anchor. Turn-log entry records the before/after step count. Clean up pre-existing clippy/unused-import warnings across the workspace so the build is warning-free. --- ail-core/src/config/domain.rs | 10 + ail-core/src/config/validation/mod.rs | 1 - ail-core/src/config/validation/step_body.rs | 1 + ail-core/src/error.rs | 28 ++- ail-core/src/executor/core.rs | 176 ++++++++++++- ail-core/src/executor/helpers/condition.rs | 1 - ail-core/src/materialize.rs | 7 + ail-core/src/runner/dry_run.rs | 2 +- ail-core/src/session/state.rs | 14 ++ ail-core/src/session/turn_log.rs | 2 - ail-core/tests/spec/mod.rs | 1 + ail-core/tests/spec/s05_7_then_chains.rs | 2 +- ail-core/tests/spec/s16_on_error.rs | 4 +- ail-core/tests/spec/s21_reload_self.rs | 236 ++++++++++++++++++ ail-core/tests/spec/s26_output_schema.rs | 4 +- .../tests/spec/s26_s27_s28_integration.rs | 12 +- ail-core/tests/spec/s27_do_while.rs | 2 +- ail-core/tests/spec/s28_for_each.rs | 2 +- ail/tests/cli_chat.rs | 2 - ail/tests/cli_delete.rs | 4 +- ail/tests/cli_logs_tail.rs | 5 +- ail/tests/cli_materialize.rs | 2 - ail/tests/common/mod.rs | 6 + spec/core/s21-planned-extensions.md | 76 +++++- 24 files changed, 557 insertions(+), 43 deletions(-) create mode 100644 ail-core/tests/spec/s21_reload_self.rs diff --git a/ail-core/src/config/domain.rs b/ail-core/src/config/domain.rs index c8119c87..c9f968c4 100644 --- a/ail-core/src/config/domain.rs +++ b/ail-core/src/config/domain.rs @@ -20,6 +20,11 @@ pub const MAX_SUB_PIPELINE_DEPTH: usize = 16; /// Prevents runaway resource consumption from deeply nested loops (SPEC §27). pub const MAX_LOOP_DEPTH: usize = 8; +/// Maximum number of `action: reload_self` invocations per pipeline run (SPEC §21). +/// Prevents infinite self-rewrite loops — the LLM can edit `.ail.yaml` and reload, +/// but only a bounded number of times within a single `ail --once` invocation. +pub const MAX_RELOADS_PER_RUN: usize = 16; + /// Provider and model configuration resolved from pipeline defaults, per-step overrides, /// or CLI flags. All fields are optional — unset fields fall back to runner/environment defaults. #[derive(Debug, Clone, Default)] @@ -377,6 +382,11 @@ pub enum ActionKind { /// Error handling mode — controls behavior when a dependency fails (SPEC §29.7). on_error_mode: JoinErrorMode, }, + /// Hot-reload the active pipeline from its source file on disk (SPEC §21). + /// Re-parses `session.pipeline.source`, validates, swaps `session.pipeline` + /// in place, and re-anchors the top-level sequential loop by matching the + /// reload step's own id in the new step list. + ReloadSelf, } /// Error handling mode for `action: join` steps (SPEC §29.7). diff --git a/ail-core/src/config/validation/mod.rs b/ail-core/src/config/validation/mod.rs index fc927cbe..27d14d03 100644 --- a/ail-core/src/config/validation/mod.rs +++ b/ail-core/src/config/validation/mod.rs @@ -1181,7 +1181,6 @@ mod tests { id: Some("ctx".to_string()), context: Some(ContextDto { shell: Some("git status".to_string()), - ..Default::default() }), ..Default::default() }]); diff --git a/ail-core/src/config/validation/step_body.rs b/ail-core/src/config/validation/step_body.rs index 1d9f216f..19320aad 100644 --- a/ail-core/src/config/validation/step_body.rs +++ b/ail-core/src/config/validation/step_body.rs @@ -95,6 +95,7 @@ pub(in crate::config) fn parse_step_body( on_error_mode: JoinErrorMode::FailFast, })) } + "reload_self" => Ok(StepBody::Action(ActionKind::ReloadSelf)), other => Err(cfg_err!( "Step '{id_str}' specifies unknown action '{other}'" )), diff --git a/ail-core/src/error.rs b/ail-core/src/error.rs index 06de9f01..4abaa097 100644 --- a/ail-core/src/error.rs +++ b/ail-core/src/error.rs @@ -190,6 +190,12 @@ pub enum AilError { detail: String, context: Option, }, + + #[error("[ail:pipeline/reload-failed] {detail}")] + PipelineReloadFailed { + detail: String, + context: Option, + }, } impl AilError { @@ -228,6 +234,7 @@ impl AilError { Self::InputSchemaValidationFailed { .. } => error_types::INPUT_SCHEMA_VALIDATION_FAILED, Self::SchemaCompatibilityFailed { .. } => error_types::SCHEMA_COMPATIBILITY_FAILED, Self::ForEachSourceInvalid { .. } => error_types::FOR_EACH_SOURCE_INVALID, + Self::PipelineReloadFailed { .. } => error_types::PIPELINE_RELOAD_FAILED, } } @@ -260,7 +267,8 @@ impl AilError { | Self::OutputSchemaValidationFailed { detail, .. } | Self::InputSchemaValidationFailed { detail, .. } | Self::SchemaCompatibilityFailed { detail, .. } - | Self::ForEachSourceInvalid { detail, .. } => detail, + | Self::ForEachSourceInvalid { detail, .. } + | Self::PipelineReloadFailed { detail, .. } => detail, } } @@ -291,7 +299,8 @@ impl AilError { | Self::OutputSchemaValidationFailed { detail, .. } | Self::InputSchemaValidationFailed { detail, .. } | Self::SchemaCompatibilityFailed { detail, .. } - | Self::ForEachSourceInvalid { detail, .. } => detail, + | Self::ForEachSourceInvalid { detail, .. } + | Self::PipelineReloadFailed { detail, .. } => detail, } } @@ -324,7 +333,8 @@ impl AilError { | Self::OutputSchemaValidationFailed { context, .. } | Self::InputSchemaValidationFailed { context, .. } | Self::SchemaCompatibilityFailed { context, .. } - | Self::ForEachSourceInvalid { context, .. } => context.as_ref(), + | Self::ForEachSourceInvalid { context, .. } + | Self::PipelineReloadFailed { context, .. } => context.as_ref(), } } @@ -436,6 +446,10 @@ impl AilError { detail, context: ctx, }, + Self::PipelineReloadFailed { detail, .. } => Self::PipelineReloadFailed { + detail, + context: ctx, + }, } } @@ -631,6 +645,13 @@ impl AilError { context: None, } } + + pub fn pipeline_reload_failed(detail: impl Into) -> Self { + Self::PipelineReloadFailed { + detail: detail.into(), + context: None, + } + } } pub mod error_types { @@ -659,6 +680,7 @@ pub mod error_types { pub const INPUT_SCHEMA_VALIDATION_FAILED: &str = "ail:schema/input-validation-failed"; pub const SCHEMA_COMPATIBILITY_FAILED: &str = "ail:schema/compatibility-failed"; pub const FOR_EACH_SOURCE_INVALID: &str = "ail:for-each/source-invalid"; + pub const PIPELINE_RELOAD_FAILED: &str = "ail:pipeline/reload-failed"; } #[cfg(test)] diff --git a/ail-core/src/executor/core.rs b/ail-core/src/executor/core.rs index 930a2c33..942c5bef 100644 --- a/ail-core/src/executor/core.rs +++ b/ail-core/src/executor/core.rs @@ -9,7 +9,7 @@ use crate::config::domain::{ ActionKind, Condition, ConditionExpr, ContextSource, JoinErrorMode, OnError, OnMaxItems, - ResultAction, Step, StepBody, StepId, MAX_LOOP_DEPTH, + ResultAction, Step, StepBody, StepId, MAX_LOOP_DEPTH, MAX_RELOADS_PER_RUN, }; use crate::error::AilError; use crate::runner::{CancelToken, InvokeOptions, RunResult, Runner}; @@ -260,6 +260,13 @@ pub(super) fn execute_single_step( return Ok(None); } + // action: reload_self — hot-reload the active pipeline from its source file + // on disk (SPEC §21). Handled inline: re-parse, swap session.pipeline, set + // reload_requested so the top-level sequential loop re-anchors after this step. + if let StepBody::Action(ActionKind::ReloadSelf) = &step.body { + return handle_reload_self(&step_id, session).map(|_| None); + } + // action: join — synchronization barrier. Handled in execute_core_with_parallel // before this function is reached. If a join step reaches here, it means there // were no async steps to coordinate; produce an empty join entry as a no-op. @@ -386,6 +393,10 @@ pub(super) fn execute_single_step( unreachable!("Join handled above") } + StepBody::Action(ActionKind::ReloadSelf) => { + unreachable!("ReloadSelf handled above") + } + StepBody::SubPipeline { path: path_template, prompt, @@ -1381,30 +1392,73 @@ pub(super) fn execute_core( execute_core_with_parallel(session, runner, observer, depth, &steps, total_steps) } -/// Sequential execution path (no async steps). Preserves the pre-§29 behavior. +/// Sequential execution path (no async steps). Preserves the pre-§29 behavior +/// and additionally honours `session.reload_requested` (SPEC §21): after each +/// step, if the flag is set, re-clone `session.pipeline.steps` and re-anchor +/// the cursor by matching the current step's id in the reloaded list. fn execute_core_sequential( session: &mut Session, runner: &(dyn Runner + Sync), observer: &mut O, depth: usize, - steps: &[Step], - total_steps: usize, + initial_steps: &[Step], + initial_total_steps: usize, ) -> Result { - for (step_index, step) in steps.iter().enumerate() { - match dispatch_top_level_step( - step, + // Local mutable step list: starts as the caller's snapshot but may be + // re-cloned from session.pipeline.steps when a reload fires. + let mut steps: Vec = initial_steps.to_vec(); + let mut total_steps = initial_total_steps; + let mut step_index: usize = 0; + + while step_index < steps.len() { + let current_id = steps[step_index].id.as_str().to_string(); + + let control = dispatch_top_level_step( + &steps[step_index], step_index, session, runner, observer, depth, total_steps, - )? { + )?; + + match control { LoopControl::Continue => {} - LoopControl::Skip => continue, + LoopControl::Skip => { + step_index += 1; + continue; + } LoopControl::Break => break, LoopControl::Return(outcome) => return Ok(outcome), } + + // SPEC §21 reload seam: after a successful step, if the step (or any + // nested chain step) requested a hot reload, re-clone the step list + // and re-anchor by matching the current step id. The reload step + // itself (action: reload_self) sets this flag. + if session.reload_requested { + session.reload_requested = false; + steps = session.pipeline.steps.clone(); + total_steps = steps.len(); + let anchor = steps + .iter() + .position(|s| s.id.as_str() == current_id) + .ok_or_else(|| AilError::PipelineReloadFailed { + detail: format!( + "After reload, the executor could not find the anchor step \ + '{current_id}' in the reloaded pipeline — unable to resume" + ), + context: Some(crate::error::ErrorContext::for_step( + &session.run_id, + ¤t_id, + )), + })?; + step_index = anchor + 1; + continue; + } + + step_index += 1; } let outcome = ExecuteOutcome::Completed; @@ -1836,6 +1890,110 @@ fn handle_on_result_action( } } +/// Hot-reload the active pipeline from its source file on disk (SPEC §21). +/// +/// Re-parses `session.pipeline.source`, validates, swaps `session.pipeline` in +/// place, appends a `TurnEntry` recording the before/after step count, and +/// sets `session.reload_requested` so the top-level sequential executor loop +/// re-clones its steps vec and re-anchors by matching the reload step's id. +/// +/// Errors on: passthrough (no source), exhausted `MAX_RELOADS_PER_RUN` cap, +/// config load/validation failure, or missing anchor id in the reloaded pipeline. +fn handle_reload_self(step_id: &str, session: &mut Session) -> Result<(), AilError> { + let source_path = + session + .pipeline + .source + .clone() + .ok_or_else(|| AilError::PipelineReloadFailed { + detail: format!( + "Step '{step_id}' declares action: reload_self but this run has no \ + pipeline source on disk (passthrough mode); reload is only supported \ + for pipelines loaded from a file" + ), + context: Some(crate::error::ErrorContext::for_step( + &session.run_id, + step_id, + )), + })?; + + if session.reload_count >= MAX_RELOADS_PER_RUN { + return Err(AilError::PipelineReloadFailed { + detail: format!( + "Step '{step_id}' exceeded the per-run reload cap ({MAX_RELOADS_PER_RUN}); \ + the pipeline has already hot-reloaded {} times — aborting to prevent \ + an infinite self-rewrite loop", + session.reload_count + ), + context: Some(crate::error::ErrorContext::for_step( + &session.run_id, + step_id, + )), + }); + } + + let before_len = session.pipeline.steps.len(); + + let new_pipeline = + crate::config::load(&source_path).map_err(|e| AilError::PipelineReloadFailed { + detail: format!( + "Step '{step_id}' failed to reload pipeline from {}: {}", + source_path.display(), + e.detail() + ), + context: Some(crate::error::ErrorContext::for_step( + &session.run_id, + step_id, + )), + })?; + + // Anchor-by-id: the reload step's own id must survive in the new pipeline so + // the executor knows where to resume. Reject up front so we don't swap and + // then discover the problem mid-loop. + if !new_pipeline.steps.iter().any(|s| s.id.as_str() == step_id) { + return Err(AilError::PipelineReloadFailed { + detail: format!( + "Step '{step_id}' reloaded pipeline from {} but the reloaded step list \ + no longer contains a step with id '{step_id}'; the executor cannot \ + determine where to resume", + source_path.display() + ), + context: Some(crate::error::ErrorContext::for_step( + &session.run_id, + step_id, + )), + }); + } + + let after_len = new_pipeline.steps.len(); + session.pipeline = new_pipeline; + session.reload_count += 1; + session.reload_requested = true; + + tracing::info!( + run_id = %session.run_id, + step_id = %step_id, + before_len, + after_len, + reload_count = session.reload_count, + source = %source_path.display(), + "reload_self swapped pipeline in place" + ); + + let entry = crate::session::turn_log::TurnEntry { + step_id: step_id.to_string(), + prompt: "reload_self".to_string(), + response: Some(format!( + "reloaded pipeline ({before_len} -> {after_len} steps) from {}", + source_path.display() + )), + ..Default::default() + }; + session.turn_log.append(entry); + + Ok(()) +} + /// Validate a join step's merged output against its `output_schema`. fn validate_join_output_schema( response: &str, diff --git a/ail-core/src/executor/helpers/condition.rs b/ail-core/src/executor/helpers/condition.rs index b0d06714..31b0854b 100644 --- a/ail-core/src/executor/helpers/condition.rs +++ b/ail-core/src/executor/helpers/condition.rs @@ -85,7 +85,6 @@ mod tests { use crate::config::domain::{ConditionExpr, ConditionOp, Step, StepBody, StepId}; use crate::session::TurnEntry; use crate::test_helpers::make_session; - use std::time::SystemTime; fn session_with_shell_entry(step_id: &str, exit_code: i32, stdout: &str) -> Session { let mut session = make_session(vec![prompt_step("dummy", "dummy")]); diff --git a/ail-core/src/materialize.rs b/ail-core/src/materialize.rs index 266bcd94..a758d4c9 100644 --- a/ail-core/src/materialize.rs +++ b/ail-core/src/materialize.rs @@ -37,6 +37,7 @@ fn chain_step_summary(body: &StepBody) -> String { StepBody::Action(ActionKind::PauseForHuman) => "action: pause_for_human".to_string(), StepBody::Action(ActionKind::ModifyOutput { .. }) => "action: modify_output".to_string(), StepBody::Action(ActionKind::Join { .. }) => "action: join".to_string(), + StepBody::Action(ActionKind::ReloadSelf) => "action: reload_self".to_string(), StepBody::Context(ContextSource::Shell(cmd)) => { format!("context: shell: \"{}\"", yaml_quote(cmd)) } @@ -221,6 +222,9 @@ pub fn materialize(pipeline: &Pipeline) -> String { out.push_str(" on_error: wait_for_all\n"); } } + StepBody::Action(ActionKind::ReloadSelf) => { + out.push_str(" action: reload_self\n"); + } StepBody::Action(ActionKind::ModifyOutput { ref headless_behavior, ref default_value, @@ -433,6 +437,9 @@ fn serialize_step(out: &mut String, step: &Step, indent: &str, origin_comment: O out.push_str(&format!("{field_indent}on_error: wait_for_all\n")); } } + StepBody::Action(ActionKind::ReloadSelf) => { + out.push_str(&format!("{field_indent}action: reload_self\n")); + } StepBody::Context(ContextSource::Shell(cmd)) => { out.push_str(&format!( "{field_indent}context:\n{field_indent} shell: \"{}\"\n", diff --git a/ail-core/src/runner/dry_run.rs b/ail-core/src/runner/dry_run.rs index 93052539..ebd6b088 100644 --- a/ail-core/src/runner/dry_run.rs +++ b/ail-core/src/runner/dry_run.rs @@ -76,7 +76,7 @@ mod tests { #[test] fn dry_run_runner_default_trait() { - let runner = DryRunRunner::default(); + let runner = DryRunRunner; let result = runner.invoke("test", InvokeOptions::default()).unwrap(); assert!(result.response.contains("[DRY RUN]")); } diff --git a/ail-core/src/session/state.rs b/ail-core/src/session/state.rs index 3fe2631c..23d516c1 100644 --- a/ail-core/src/session/state.rs +++ b/ail-core/src/session/state.rs @@ -71,6 +71,13 @@ pub struct Session { /// Current nesting depth of loop constructs (do_while, for_each). Checked against /// `MAX_LOOP_DEPTH` to prevent runaway resource consumption from deeply nested loops. pub loop_depth: usize, + /// Number of `action: reload_self` invocations that have fired in this run (SPEC §21). + /// Checked against `MAX_RELOADS_PER_RUN` to prevent infinite self-rewrite loops. + pub reload_count: usize, + /// Signal set by `action: reload_self` after a successful in-place pipeline swap. + /// The top-level sequential executor loop observes this flag after each step, + /// re-clones `session.pipeline.steps`, and re-anchors by step-id. + pub reload_requested: bool, } impl Session { @@ -116,6 +123,8 @@ impl Session { do_while_context: None, for_each_context: None, loop_depth: 0, + reload_count: 0, + reload_requested: false, } } @@ -191,6 +200,11 @@ impl Session { do_while_context: self.do_while_context.clone(), for_each_context: self.for_each_context.clone(), loop_depth: self.loop_depth, + // Reload state is not inherited by async branches — branches run against + // the forked pipeline snapshot and must not trigger reloads themselves + // (see §21: reload is sequential-top-level only). + reload_count: 0, + reload_requested: false, } } } diff --git a/ail-core/src/session/turn_log.rs b/ail-core/src/session/turn_log.rs index 7dd032ca..dbdfeec0 100644 --- a/ail-core/src/session/turn_log.rs +++ b/ail-core/src/session/turn_log.rs @@ -526,9 +526,7 @@ impl TurnLog { #[cfg(test)] mod tests { use super::*; - use crate::runner::ToolEvent; use crate::session::log_provider::NullProvider; - use std::time::SystemTime; fn make_entry(step_id: &str, response: Option<&str>) -> TurnEntry { TurnEntry { diff --git a/ail-core/tests/spec/mod.rs b/ail-core/tests/spec/mod.rs index 2fe95cfe..1b444ea3 100644 --- a/ail-core/tests/spec/mod.rs +++ b/ail-core/tests/spec/mod.rs @@ -27,6 +27,7 @@ mod s19_plugin_discovery; mod s19_plugin_protocol; mod s21_dry_run; mod s21_mvp; +mod s21_reload_self; mod s23_structured_output; mod s26_output_schema; mod s26_s27_s28_integration; diff --git a/ail-core/tests/spec/s05_7_then_chains.rs b/ail-core/tests/spec/s05_7_then_chains.rs index 206b7b84..e677878c 100644 --- a/ail-core/tests/spec/s05_7_then_chains.rs +++ b/ail-core/tests/spec/s05_7_then_chains.rs @@ -294,7 +294,7 @@ fn nested_chains_execute_in_correct_order() { let mut parent = prompt_step("parent", "main"); // before chain step has its own before chain - let mut before_step = Step { + let before_step = Step { id: StepId("parent::before::0".to_string()), body: StepBody::Prompt("before-main".to_string()), message: None, diff --git a/ail-core/tests/spec/s16_on_error.rs b/ail-core/tests/spec/s16_on_error.rs index 70bcc0a8..c7509bf0 100644 --- a/ail-core/tests/spec/s16_on_error.rs +++ b/ail-core/tests/spec/s16_on_error.rs @@ -1,11 +1,9 @@ //! SPEC §16 — `on_error` error handling with retry, continue, and abort_pipeline. -use ail_core::config::domain::{OnError, Pipeline, Step, StepBody, StepId}; +use ail_core::config::domain::{OnError, Step, StepBody, StepId}; use ail_core::error::{error_types, AilError}; use ail_core::executor::{execute, ExecuteOutcome}; use ail_core::runner::{InvokeOptions, RunResult, Runner}; -use ail_core::session::log_provider::NullProvider; -use ail_core::session::Session; use ail_core::test_helpers::{make_session, prompt_step}; use std::sync::atomic::{AtomicU32, Ordering}; diff --git a/ail-core/tests/spec/s21_reload_self.rs b/ail-core/tests/spec/s21_reload_self.rs new file mode 100644 index 00000000..36153792 --- /dev/null +++ b/ail-core/tests/spec/s21_reload_self.rs @@ -0,0 +1,236 @@ +//! SPEC §21 — `action: reload_self` pipeline hot-reload primitive. +//! +//! Covers the minimum self-modification primitive: a running pipeline re-reads +//! its own `.ail.yaml` on disk, swaps the pipeline in place, and re-anchors the +//! top-level executor loop by step-id. + +use ail_core::config::domain::{ActionKind, Step, StepBody, StepId, MAX_RELOADS_PER_RUN}; +use ail_core::error::error_types; +use ail_core::executor::execute; +use ail_core::runner::stub::StubRunner; +use ail_core::session::log_provider::NullProvider; +use ail_core::session::Session; + +fn write_yaml(dir: &std::path::Path, filename: &str, contents: &str) -> std::path::PathBuf { + let path = dir.join(filename); + std::fs::write(&path, contents).expect("write pipeline fixture"); + path +} + +fn load_session(path: &std::path::Path) -> Session { + let pipeline = ail_core::config::load(path).expect("load pipeline fixture"); + Session::new(pipeline, "invocation prompt".to_string()) + .with_log_provider(Box::new(NullProvider)) +} + +/// When the on-disk pipeline is unchanged, `reload_self` is a no-op aside from +/// its own turn-log entry; subsequent steps still execute. +#[test] +fn reload_self_noop_continues() { + let tmp = tempfile::tempdir().unwrap(); + let path = write_yaml( + tmp.path(), + ".ail.yaml", + r#"version: "1" +pipeline: + - id: before + prompt: "step before reload" + - id: reload + action: reload_self + - id: after + prompt: "step after reload" +"#, + ); + + let mut session = load_session(&path); + let runner = StubRunner::new("stub"); + + execute(&mut session, &runner).expect("pipeline ok"); + + let ids: Vec<&str> = session + .turn_log + .entries() + .iter() + .map(|e| e.step_id.as_str()) + .collect(); + assert_eq!(ids, vec!["before", "reload", "after"]); + + let reload_entry = &session.turn_log.entries()[1]; + assert_eq!(reload_entry.prompt, "reload_self"); + let resp = reload_entry.response.as_deref().unwrap_or_default(); + assert!( + resp.starts_with("reloaded pipeline (3 -> 3 steps)"), + "unexpected reload response: {resp}" + ); + assert_eq!(session.reload_count, 1); + assert!(!session.reload_requested); +} + +/// When the YAML is rewritten before reload_self fires, the new tail steps run +/// in the current invocation — the whole point of the primitive. +#[test] +fn reload_self_picks_up_new_step_appended_to_disk() { + let tmp = tempfile::tempdir().unwrap(); + let path = write_yaml( + tmp.path(), + ".ail.yaml", + r#"version: "1" +pipeline: + - id: rewrite + prompt: "placeholder — in a real run this step's LLM would edit the file" + - id: reload + action: reload_self +"#, + ); + + let mut session = load_session(&path); + + // Simulate the effect of the `rewrite` step: overwrite the YAML on disk to + // append a tail step. In a real run the LLM's Write/Edit tool would do this. + std::fs::write( + &path, + r#"version: "1" +pipeline: + - id: rewrite + prompt: "placeholder" + - id: reload + action: reload_self + - id: newly_added + prompt: "added by rewrite step" +"#, + ) + .unwrap(); + + let runner = StubRunner::new("stub"); + execute(&mut session, &runner).expect("pipeline ok"); + + let ids: Vec<&str> = session + .turn_log + .entries() + .iter() + .map(|e| e.step_id.as_str()) + .collect(); + assert_eq!( + ids, + vec!["rewrite", "reload", "newly_added"], + "the reloaded tail step must execute in the same run" + ); + + let reload_entry = &session.turn_log.entries()[1]; + let resp = reload_entry.response.as_deref().unwrap_or_default(); + assert!( + resp.contains("2 -> 3 steps"), + "reload entry should report step-count delta, got: {resp}" + ); +} + +/// Passthrough pipelines have no `source` — reload_self must abort cleanly +/// with a typed error rather than panic. +#[test] +fn reload_self_in_passthrough_errors() { + let reload_step = Step { + id: StepId("reload".to_string()), + body: StepBody::Action(ActionKind::ReloadSelf), + ..Default::default() + }; + let mut session = Session::new( + ail_core::config::domain::Pipeline { + steps: vec![reload_step], + ..Default::default() + }, + "prompt".to_string(), + ) + .with_log_provider(Box::new(NullProvider)); + + let runner = StubRunner::new("unused"); + let err = execute(&mut session, &runner).expect_err("reload without source must fail"); + assert_eq!(err.error_type(), error_types::PIPELINE_RELOAD_FAILED); + assert!(err.detail().contains("passthrough")); +} + +/// Once the per-run reload cap is hit, further reloads fail fast — guard +/// against an LLM driving an infinite self-rewrite loop. +#[test] +fn reload_self_cap_aborts() { + let tmp = tempfile::tempdir().unwrap(); + let path = write_yaml( + tmp.path(), + ".ail.yaml", + r#"version: "1" +pipeline: + - id: reload + action: reload_self +"#, + ); + + let mut session = load_session(&path); + // Simulate a run that has already hit the cap. + session.reload_count = MAX_RELOADS_PER_RUN; + + let runner = StubRunner::new("unused"); + let err = execute(&mut session, &runner).expect_err("reload beyond cap must fail"); + assert_eq!(err.error_type(), error_types::PIPELINE_RELOAD_FAILED); + assert!(err.detail().contains("reload cap")); +} + +/// If the reloaded pipeline no longer contains the reload step's own id, the +/// executor cannot pick a safe resume point and must abort. +#[test] +fn reload_self_missing_anchor_aborts() { + let tmp = tempfile::tempdir().unwrap(); + let path = write_yaml( + tmp.path(), + ".ail.yaml", + r#"version: "1" +pipeline: + - id: reload + action: reload_self +"#, + ); + + let mut session = load_session(&path); + + // Rewrite the file so the anchor id "reload" disappears. + std::fs::write( + &path, + r#"version: "1" +pipeline: + - id: something_else + prompt: "no reload step here" +"#, + ) + .unwrap(); + + let runner = StubRunner::new("unused"); + let err = execute(&mut session, &runner).expect_err("missing anchor must fail"); + assert_eq!(err.error_type(), error_types::PIPELINE_RELOAD_FAILED); + assert!( + err.detail().contains("reload"), + "error should reference the reload step: {}", + err.detail() + ); +} + +/// If the rewritten YAML is syntactically invalid, reload fails via the typed +/// reload error (not a panic, not a silent skip). +#[test] +fn reload_self_invalid_yaml_aborts() { + let tmp = tempfile::tempdir().unwrap(); + let path = write_yaml( + tmp.path(), + ".ail.yaml", + r#"version: "1" +pipeline: + - id: reload + action: reload_self +"#, + ); + + let mut session = load_session(&path); + + std::fs::write(&path, "{ broken yaml: : : }").unwrap(); + + let runner = StubRunner::new("unused"); + let err = execute(&mut session, &runner).expect_err("invalid reload must fail"); + assert_eq!(err.error_type(), error_types::PIPELINE_RELOAD_FAILED); +} diff --git a/ail-core/tests/spec/s26_output_schema.rs b/ail-core/tests/spec/s26_output_schema.rs index 4565a89c..f7117031 100644 --- a/ail-core/tests/spec/s26_output_schema.rs +++ b/ail-core/tests/spec/s26_output_schema.rs @@ -1,5 +1,5 @@ -/// SPEC s26 -- Structured step I/O schemas: output_schema, input_schema, -/// field:equals: operator, and parse-time compatibility checks. +//! SPEC s26 -- Structured step I/O schemas: output_schema, input_schema, +//! field:equals: operator, and parse-time compatibility checks. mod parse_valid { use ail_core::config; diff --git a/ail-core/tests/spec/s26_s27_s28_integration.rs b/ail-core/tests/spec/s26_s27_s28_integration.rs index 6892d62e..80d7e668 100644 --- a/ail-core/tests/spec/s26_s27_s28_integration.rs +++ b/ail-core/tests/spec/s26_s27_s28_integration.rs @@ -1,9 +1,9 @@ -/// Cross-feature integration tests for §26 (output_schema/input_schema), -/// §27 (do_while), and §28 (for_each). -/// -/// These tests verify that the three feature areas compose correctly when -/// used together in a single pipeline — the scenarios that individual -/// per-section test files cannot cover. +//! Cross-feature integration tests for §26 (output_schema/input_schema), +//! §27 (do_while), and §28 (for_each). +//! +//! These tests verify that the three feature areas compose correctly when +//! used together in a single pipeline — the scenarios that individual +//! per-section test files cannot cover. // ── §26 + §28: for_each consuming schema-validated arrays ────────────────── diff --git a/ail-core/tests/spec/s27_do_while.rs b/ail-core/tests/spec/s27_do_while.rs index 0adbd619..0b894f44 100644 --- a/ail-core/tests/spec/s27_do_while.rs +++ b/ail-core/tests/spec/s27_do_while.rs @@ -1,4 +1,4 @@ -/// SPEC §27 — do_while: bounded repeat-until loop validation tests. +//! SPEC §27 — do_while: bounded repeat-until loop validation tests. mod parse_valid { use ail_core::config; diff --git a/ail-core/tests/spec/s28_for_each.rs b/ail-core/tests/spec/s28_for_each.rs index ec446eb5..37d981a9 100644 --- a/ail-core/tests/spec/s28_for_each.rs +++ b/ail-core/tests/spec/s28_for_each.rs @@ -1,4 +1,4 @@ -/// SPEC §28 — for_each collection iteration. +//! SPEC §28 — for_each collection iteration. // ── Parse-time validation (valid configs) ─────────────────────────────────── diff --git a/ail/tests/cli_chat.rs b/ail/tests/cli_chat.rs index 6704c653..f6743e92 100644 --- a/ail/tests/cli_chat.rs +++ b/ail/tests/cli_chat.rs @@ -5,8 +5,6 @@ mod common; -use predicates::prelude::*; - #[test] fn chat_message_flag_text_mode_one_shot() { let (mut cmd, _home) = common::ail_cmd_isolated(); diff --git a/ail/tests/cli_delete.rs b/ail/tests/cli_delete.rs index e3f6f2cd..39af3400 100644 --- a/ail/tests/cli_delete.rs +++ b/ail/tests/cli_delete.rs @@ -5,8 +5,6 @@ mod common; -use predicates::prelude::*; - /// Helper: run `ail --once "hello" --output-format json` and extract the run_id from /// the `run_started` event. fn create_run(home: &std::path::Path) -> String { @@ -43,7 +41,7 @@ fn delete_existing_run_text() { cmd.assert() .success() - .stdout(predicates::str::contains(&format!("Deleted run {run_id}"))); + .stdout(predicates::str::contains(format!("Deleted run {run_id}"))); } #[test] diff --git a/ail/tests/cli_logs_tail.rs b/ail/tests/cli_logs_tail.rs index 45e9e257..1faa8900 100644 --- a/ail/tests/cli_logs_tail.rs +++ b/ail/tests/cli_logs_tail.rs @@ -5,8 +5,6 @@ mod common; -use predicates::prelude::*; - /// Helper: run `ail --once "hello" --output-format json` and extract the run_id. fn create_run(home: &std::path::Path) -> String { let mut cmd = common::ail_cmd(home); @@ -91,9 +89,8 @@ fn logs_json_format_produces_valid_ndjson() { #[test] fn logs_tail_emits_existing_then_can_be_killed() { - use std::io::Read; use std::process::{Command, Stdio}; - use std::time::{Duration, Instant}; + use std::time::Duration; let home = common::isolated_home(); let run_id = create_run(home.path()); diff --git a/ail/tests/cli_materialize.rs b/ail/tests/cli_materialize.rs index 30e3c902..1446a413 100644 --- a/ail/tests/cli_materialize.rs +++ b/ail/tests/cli_materialize.rs @@ -2,8 +2,6 @@ mod common; -use predicates::prelude::*; - #[test] fn materialize_prints_to_stdout() { let (mut cmd, _home) = common::ail_cmd_isolated(); diff --git a/ail/tests/common/mod.rs b/ail/tests/common/mod.rs index a4b7b5db..8447382f 100644 --- a/ail/tests/common/mod.rs +++ b/ail/tests/common/mod.rs @@ -1,4 +1,10 @@ //! Shared helpers for ail binary integration tests. +//! +//! Each integration test file is compiled as its own crate — `#[allow(dead_code)]` +//! silences the "never used" warnings that appear in targets that don't happen +//! to use every helper here. + +#![allow(dead_code)] use assert_cmd::Command; use std::path::PathBuf; diff --git a/spec/core/s21-planned-extensions.md b/spec/core/s21-planned-extensions.md index 9b08bdd2..74e06281 100644 --- a/spec/core/s21-planned-extensions.md +++ b/spec/core/s21-planned-extensions.md @@ -58,9 +58,11 @@ This creates a systematic feedback loop: the pipeline's prompts improve over tim ### Self-Modifying Pipelines -> **Status: Deferred — post-POC. Significant design work required.** +> **Status: Partially implemented — the minimum primitive (`action: reload_self`) ships in v0.3. Diff application, log injection, and FROM-layered modifications remain deferred.** > > Dependencies: stable pipeline execution (v0.0.1), parallel step execution (above), structured step I/O schemas (above), HITL approval flow (§13), hot reload mechanism (§22). +> +> **Spec correction (v0.3):** Earlier revisions framed hot reload as strictly "between runs" — i.e. a modified `.ail.yaml` would only take effect on the next `ail --once` invocation, never the current one. That framing was wrong. A minimum-viable mid-run reload is achievable with a single action and an anchor-by-ID resumption model; the full diff/FROM/log-injection machinery is not a prerequisite. Hot reload is now implemented (see below); the higher-level machinery remains deferred. #### The Core Vision @@ -105,6 +107,78 @@ Either outcome is informative. A confirmed improvement validates the architectur > **Near-term path:** The SWE-bench experiment does not require the self-modifying pipeline or the plugin system below. It requires only v0.1 primitives: `context: shell:` steps, `on_result` branching on exit codes, and the `--headless` flag. An external driver script iterates over benchmark tasks, calling `ail --headless --once "" --pipeline swe-bench.yaml` per task. See §20 (v0.1 scope) for the target pipeline. +#### Pipeline Hot Reload — `action: reload_self` (Implemented, v0.3) + +The minimum primitive that enables within-run self-modification. A pipeline step edits its own `.ail.yaml` on disk (via the runner's `Write`/`Edit` tools, a `context: shell:` step, or any other mechanism), and the subsequent `reload_self` step re-reads that file and swaps the running pipeline's step list in place. + +```yaml +version: "1" +pipeline: + - id: propose_step + prompt: | + Edit .ail.yaml to append a new step called `cleanup` that runs + `cargo clippy -- -D warnings` via context: shell:. Use the Edit tool. + + - id: reload + action: reload_self + + - id: cleanup_placeholder # may not exist on first run — added by propose_step + condition: "{{ env.CLEANUP_ENABLED }} == 1" + context: + shell: echo "placeholder" +``` + +**Semantics** + +1. The `reload_self` step reads `session.pipeline.source` (the resolved absolute path of the active pipeline file). +2. It calls the same `config::load()` used at startup — full DTO → validation → domain conversion, including `FROM` inheritance. +3. On success, `session.pipeline` is atomically replaced with the reloaded value. +4. The executor re-anchors by matching the reload step's own `id` against the new step list, and continues from the position immediately after that match. +5. A `TurnEntry` is appended for the reload step with `prompt: "reload_self"` and `response` containing the before/after step count (audit trail). +6. If the reloaded pipeline is invalid, the reload fails; the pipeline aborts via the step's declared `on_error` (default `abort_pipeline`). + +**Anchor-by-ID resumption** + +The reload step's ID must still exist in the reloaded pipeline — that is the anchor. Rationale: + +- The old step index is meaningless after reload (steps may have been inserted, removed, or reordered). +- Matching by ID is the only stable identity the runtime has. +- If the anchor ID is missing, the executor cannot safely determine where to resume and aborts with `ail:pipeline/reload-failed`. + +**Guardrails** + +| Guardrail | Behaviour | +|---|---| +| Source required | `reload_self` in passthrough mode (no `.ail.yaml` discovered) aborts with `ail:pipeline/reload-failed`. | +| Reload cap | `MAX_RELOADS_PER_RUN = 16`. Once exceeded, further reloads abort. Prevents infinite self-rewrite loops. | +| Anchor survival | The reload step's own `id` must exist in the reloaded pipeline; otherwise abort. | +| Validation fidelity | Reload uses the same validator as startup. A reloaded pipeline that fails validation aborts the run with the validator's typed error. | +| Top-level only | The reload signal is honoured by the top-level sequential loop. Inside `do_while:` / `for_each:` bodies and `before:`/`then:` chains, the reload is recorded and the pipeline reference is swapped, but the enclosing iteration continues against its frozen inner step list. The new pipeline is observed after control returns to the top level. | +| Sequential dispatch only | Reload does not trigger inside the §29 parallel dispatch path — async branches work against a forked session snapshot and are unaffected. Declaring `async: true` on a `reload_self` step is not supported. | + +**Turn log** + +```json +{ + "step_id": "reload", + "prompt": "reload_self", + "response": "reloaded pipeline (3 -> 5 steps) from /abs/path/.ail.yaml" +} +``` + +This makes self-modifications visible to downstream steps (they can reference `{{ step.reload.response }}`) and to post-hoc audit tooling. + +**What is still deferred** + +`reload_self` is the minimum — it trusts the step that edited the YAML. The following higher-level machinery layers on top and remains unimplemented: + +- `action: apply_pipeline_diff` — structured diff application with validation before write. +- `context: run_log:` — log injection for reflection-style prompt steps. +- `FROM`-layered modification — writing modifications as an inheriting layer rather than overwriting the base. +- Rollback — reverting to the pre-reload pipeline if a subsequent step fails. + +These are additive: a future `apply_pipeline_diff` action would write a validated diff to disk and then internally invoke the same reload mechanism. + #### Required Primitives (not yet specced) **Log injection (`context: run_log:`)**