From 6650410626110f5e29c32269bec0156b15d6ed7c Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:55:28 +0000 Subject: [PATCH 1/3] Preserve sub-orchestration parent link across ContinueAsNew --- src/runtime/dispatchers/orchestration.rs | 4 +- src/runtime/state_helpers.rs | 43 +++++++++++++++++++- tests/e2e_samples.rs | 50 ++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 3 deletions(-) diff --git a/src/runtime/dispatchers/orchestration.rs b/src/runtime/dispatchers/orchestration.rs index 99ef7fe..22840c1 100644 --- a/src/runtime/dispatchers/orchestration.rs +++ b/src/runtime/dispatchers/orchestration.rs @@ -1313,8 +1313,8 @@ impl Runtime { } => Some(( orchestration.clone(), input.clone(), - None, - None, + history_mgr.parent_instance.clone(), + history_mgr.parent_id, Some(carry_forward_events.clone()), )), _ => None, diff --git a/src/runtime/state_helpers.rs b/src/runtime/state_helpers.rs index 5c4c45c..4ea0324 100644 --- a/src/runtime/state_helpers.rs +++ b/src/runtime/state_helpers.rs @@ -470,7 +470,14 @@ impl WorkItemReader { carried.append(&mut completion_messages); completion_messages = carried; - (orchestration.clone(), input.clone(), version.clone(), None, None, true) + ( + orchestration.clone(), + input.clone(), + version.clone(), + history_mgr.parent_instance.clone(), + history_mgr.parent_id, + true, + ) } _ => unreachable!(), } @@ -756,6 +763,40 @@ mod tests { assert_eq!(reader.parent_id, None); } + #[test] + fn test_workitem_reader_can_preserves_parent_link_from_history() { + let messages = vec![WorkItem::ContinueAsNew { + instance: "child-inst".to_string(), + orchestration: "test-orch".to_string(), + input: "new-input".to_string(), + version: Some("2.0.0".to_string()), + carry_forward_events: vec![], + initial_custom_status: None, + }]; + let history = vec![Event::with_event_id( + 1, + "child-inst", + 1, + None, + EventKind::OrchestrationStarted { + name: "test-orch".to_string(), + version: "1.0.0".to_string(), + input: "old-input".to_string(), + parent_instance: Some("parent-inst".to_string()), + parent_id: Some(42), + carry_forward_events: None, + initial_custom_status: None, + }, + )]; + + let history_mgr = HistoryManager::from_history(&history); + let reader = WorkItemReader::from_messages(&messages, &history_mgr, "child-inst"); + + assert!(reader.is_continue_as_new); + assert_eq!(reader.parent_instance, Some("parent-inst".to_string())); + assert_eq!(reader.parent_id, Some(42)); + } + #[test] fn test_workitem_reader_completion_only() { let messages = vec![ diff --git a/tests/e2e_samples.rs b/tests/e2e_samples.rs index d206e24..7d62698 100644 --- a/tests/e2e_samples.rs +++ b/tests/e2e_samples.rs @@ -583,6 +583,56 @@ async fn sample_sub_orchestration_basic_fs() { rt.shutdown(None).await; } +/// Sub-orchestrations: child uses ContinueAsNew before completing. +/// +/// Highlights: +/// - Parent awaits a child orchestration across multiple child executions +/// - Child preserves its parent link when rolling over with ContinueAsNew +#[tokio::test] +async fn sample_sub_orchestration_continue_as_new_fs() { + let (store, _temp_dir) = common::create_sqlite_store_disk().await; + + let activity_registry = ActivityRegistry::builder().build(); + + let loop_child = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + if n < 3 { + return ctx.continue_as_new((n + 1).to_string()).await; + } + + Ok(format!("child-final:{n}")) + }; + let parent = |ctx: OrchestrationContext, input: String| async move { + let r = ctx.schedule_sub_orchestration("LoopChild", input).await.unwrap(); + Ok(format!("parent:{r}")) + }; + + let orchestration_registry = OrchestrationRegistry::builder() + .register("LoopChild", loop_child) + .register("ParentCan", parent) + .build(); + + let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await; + let client = Client::new(store.clone()); + client + .start_orchestration("inst-sub-can", "ParentCan", "0") + .await + .unwrap(); + + match client + .wait_for_orchestration("inst-sub-can", std::time::Duration::from_secs(5)) + .await + .unwrap() + { + runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "parent:child-final:3"), + runtime::OrchestrationStatus::Failed { details, .. } => { + panic!("orchestration failed: {}", details.display_message()) + } + _ => panic!("unexpected orchestration status"), + } + rt.shutdown(None).await; +} + /// Sub-orchestrations: fan-out to multiple children and join. /// /// Highlights: From 4f9d29cdc806e9886e3a01d64a471e1e5b62abac Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Mon, 22 Jun 2026 19:34:44 +0000 Subject: [PATCH 2/3] Stamp parent link onto ContinueAsNew work item with history fallback Address review feedback: wire parent_instance/parent_id directly into WorkItem::ContinueAsNew instead of deriving solely from prior execution history. Fields are optional (#[serde(default)]) for backward compatibility; consumers fall back to history for messages enqueued by older runtimes. --- pr33-review.md | 33 ++++++++++++++++ .../capability_filtering.rs | 2 + src/provider_validation/kv_store.rs | 2 + src/provider_validation/multi_execution.rs | 2 + src/provider_validation/prune.rs | 4 ++ src/providers/mod.rs | 16 ++++++++ src/runtime/dispatchers/orchestration.rs | 8 +++- src/runtime/execution.rs | 2 + src/runtime/state_helpers.rs | 39 ++++++++++++++++++- 9 files changed, 104 insertions(+), 4 deletions(-) create mode 100644 pr33-review.md diff --git a/pr33-review.md b/pr33-review.md new file mode 100644 index 0000000..9a6fb69 --- /dev/null +++ b/pr33-review.md @@ -0,0 +1,33 @@ +# Review of PR #33 — Qualify auto-generated sub-orchestration IDs across `continue_as_new` generations + +I've read the full diff, traced the surrounding replay logic, checked all derivation/parsing call sites, and run the affected test suite (22/22 pass, including the new test). + +## Verdict: Correct, minimal, and rolling-upgrade safe. Approve with one recommended follow-up (an end-to-end regression test). + +## Why the fix is correct +The bug premise holds: `continue_as_new` resets `event_id` to `INITIAL_EVENT_ID` for the new generation, so auto IDs `sub::{event_id}` in generation N collide with generation N-1 (the parent instance is unchanged across generations, only `execution_id` increments). The provider then can't create a fresh child instance and the parent stalls waiting on an already-terminal child. Qualifying with `execution_id` for N>1 (`sub::{execution_id}.{event_id}`) removes the collision while leaving execution 1 byte-for-byte unchanged. + +## Rolling-upgrade safety (verified — this is the riskiest aspect) +This is safe for mixed-version clusters and in-flight instances: +- During replay, recorded sub-orchestrations are matched against history in `src/runtime/replay_engine.rs` (~L826-845) and bound to the **historical** instance ID. `action_matches_event_kind` for `SubOrch` compares only `name` and `input` (not instance) — `replay_engine.rs` (~L1880-1885) — so existing children keep their old `sub::{event_id}` IDs. +- The new derivation in `update_action_event_id` only applies to **new** actions beyond history. So an execution-2 instance already in flight under the old binary keeps its recorded children and only adopts the qualified format for newly scheduled ones. No determinism break. + +## No collateral concerns +- The auto ID is derived in exactly one place (`replay_engine.rs` ~L1793-1801); there's no duplicate path that was missed. +- The new `.` separator is safe: sub-orch IDs are never parsed numerically. `is_auto_generated_sub_orch_id` and `build_child_instance_id` (`src/lib.rs` ~L877-892) key only on the `sub::` prefix. +- Keeping execution 1 unqualified is a deliberate, well-documented compatibility choice. + +## Recommended follow-up (non-blocking) +The added test (`tests/replay_engine/sub_orchestration.rs` ~L47-70) only asserts the ID **format** at the replay-engine unit level. There's no end-to-end test reproducing the actual stall it fixes. Given the repo's scenario-test conventions, I'd recommend an integration test that: +1. Runs a parent that schedules a sub-orchestration, then `continue_as_new`s, +2. Schedules another sub-orchestration in the new generation whose `event_id` matches the prior generation's child, +3. Asserts the second child actually runs to completion and the parent completes (would have stalled/deadlocked before this fix). + +The existing `tests/replay_engine/fresh_execution.rs` `sub_orch_then_continue_as_new` test only covers execution 1 and doesn't exercise the cross-generation reuse. + +## Minor nits +- The PR checklist leaves "Regular tests pass locally" and "Doc tests pass locally" unchecked — worth confirming `cargo test --doc --all-features` before merge since `cargo nt` doesn't run doctests. +- The docs change also drops a trailing blank line in `docs/sub-orchestrations.md` — harmless. + +## Summary +Overall this is a clean, well-scoped fix. The only thing I'd ask for before merge is the end-to-end regression test to lock in the actual behavior, not just the ID string format. diff --git a/src/provider_validation/capability_filtering.rs b/src/provider_validation/capability_filtering.rs index b304a37..1004749 100644 --- a/src/provider_validation/capability_filtering.rs +++ b/src/provider_validation/capability_filtering.rs @@ -436,6 +436,8 @@ pub async fn test_continue_as_new_execution_gets_own_pinned_version(fact orchestration: "TestOrch".to_string(), input: "new-input".to_string(), version: Some("1.0.0".to_string()), + parent_instance: None, + parent_id: None, carry_forward_events: vec![], initial_custom_status: None, }, diff --git a/src/provider_validation/prune.rs b/src/provider_validation/prune.rs index a8280a5..cffe2e4 100644 --- a/src/provider_validation/prune.rs +++ b/src/provider_validation/prune.rs @@ -303,6 +303,8 @@ async fn create_running_multi_execution_instance( orchestration: "LongRunning".to_string(), input: "{}".to_string(), version: Some("1.0.0".to_string()), + parent_instance: None, + parent_id: None, carry_forward_events: vec![], initial_custom_status: None, } @@ -368,6 +370,8 @@ async fn create_multi_execution_instance( orchestration: "TestOrch".to_string(), input: "{}".to_string(), version: Some("1.0.0".to_string()), + parent_instance: None, + parent_id: None, carry_forward_events: vec![], initial_custom_status: None, } diff --git a/src/providers/mod.rs b/src/providers/mod.rs index 6ec684c..4bc29ac 100644 --- a/src/providers/mod.rs +++ b/src/providers/mod.rs @@ -539,6 +539,22 @@ pub enum WorkItem { orchestration: String, input: String, version: Option, + /// Parent instance ID, preserved across continue-as-new so a logical + /// sub-orchestration can still notify its parent when a later execution + /// completes or fails. + /// + /// Optional for backward compatibility: messages enqueued by older + /// runtimes won't carry this, in which case the runtime falls back to + /// deriving the parent link from the previous execution's history. + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + parent_instance: Option, + /// Parent's `SubOrchestrationScheduled` event id, preserved across + /// continue-as-new alongside `parent_instance`. Optional for the same + /// backward-compatibility reason described above. + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + parent_id: Option, /// Persistent events carried forward from the previous execution. /// These are seeded into the new execution's history before any new /// externally-raised events, preserving FIFO order across CAN boundaries. diff --git a/src/runtime/dispatchers/orchestration.rs b/src/runtime/dispatchers/orchestration.rs index 22840c1..fd8f6e5 100644 --- a/src/runtime/dispatchers/orchestration.rs +++ b/src/runtime/dispatchers/orchestration.rs @@ -1308,13 +1308,17 @@ impl Runtime { WorkItem::ContinueAsNew { orchestration, input, + parent_instance, + parent_id, carry_forward_events, .. } => Some(( orchestration.clone(), input.clone(), - history_mgr.parent_instance.clone(), - history_mgr.parent_id, + // Prefer the parent link carried on the work item; fall back to the + // previous execution's history for messages enqueued by older runtimes. + parent_instance.clone().or_else(|| history_mgr.parent_instance.clone()), + parent_id.or(history_mgr.parent_id), Some(carry_forward_events.clone()), )), _ => None, diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs index c5801c5..1ccbe9e 100644 --- a/src/runtime/execution.rs +++ b/src/runtime/execution.rs @@ -336,6 +336,8 @@ impl Runtime { orchestration: orchestration_name.to_string(), input: input.clone(), version: version.clone(), + parent_instance: history_mgr.parent_instance.clone(), + parent_id: history_mgr.parent_id, carry_forward_events: unmatched, initial_custom_status, }); diff --git a/src/runtime/state_helpers.rs b/src/runtime/state_helpers.rs index 4ea0324..e96842b 100644 --- a/src/runtime/state_helpers.rs +++ b/src/runtime/state_helpers.rs @@ -453,6 +453,8 @@ impl WorkItemReader { orchestration, input, version, + parent_instance, + parent_id, carry_forward_events, .. } => { @@ -470,12 +472,15 @@ impl WorkItemReader { carried.append(&mut completion_messages); completion_messages = carried; + // Prefer the parent link carried on the work item. Fall back to + // the previous execution's history for messages enqueued by older + // runtimes that didn't stamp these fields (rolling-upgrade compat). ( orchestration.clone(), input.clone(), version.clone(), - history_mgr.parent_instance.clone(), - history_mgr.parent_id, + parent_instance.clone().or_else(|| history_mgr.parent_instance.clone()), + parent_id.or(history_mgr.parent_id), true, ) } @@ -748,6 +753,8 @@ mod tests { orchestration: "test-orch".to_string(), input: "new-input".to_string(), version: Some("2.0.0".to_string()), + parent_instance: None, + parent_id: None, carry_forward_events: vec![], initial_custom_status: None, }]; @@ -763,13 +770,39 @@ mod tests { assert_eq!(reader.parent_id, None); } + #[test] + fn test_workitem_reader_can_preserves_parent_link_from_work_item() { + // New runtimes stamp the parent link directly onto the ContinueAsNew work item. + let messages = vec![WorkItem::ContinueAsNew { + instance: "child-inst".to_string(), + orchestration: "test-orch".to_string(), + input: "new-input".to_string(), + version: Some("2.0.0".to_string()), + parent_instance: Some("parent-inst".to_string()), + parent_id: Some(42), + carry_forward_events: vec![], + initial_custom_status: None, + }]; + + let history_mgr = HistoryManager::from_history(&[]); + let reader = WorkItemReader::from_messages(&messages, &history_mgr, "child-inst"); + + assert!(reader.is_continue_as_new); + assert_eq!(reader.parent_instance, Some("parent-inst".to_string())); + assert_eq!(reader.parent_id, Some(42)); + } + #[test] fn test_workitem_reader_can_preserves_parent_link_from_history() { + // Backcompat: a ContinueAsNew work item enqueued by an older runtime carries no + // parent fields, so the link must be recovered from the previous execution's history. let messages = vec![WorkItem::ContinueAsNew { instance: "child-inst".to_string(), orchestration: "test-orch".to_string(), input: "new-input".to_string(), version: Some("2.0.0".to_string()), + parent_instance: None, + parent_id: None, carry_forward_events: vec![], initial_custom_status: None, }]; @@ -1021,6 +1054,8 @@ mod tests { orchestration: "orch".to_string(), input: "new".to_string(), version: None, + parent_instance: None, + parent_id: None, carry_forward_events: vec![ ("X".to_string(), "old-x".to_string()), ("Y".to_string(), "old-y".to_string()), From 78d3716821f0c061ae98b2577a2a407a9cfbd1a0 Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Mon, 22 Jun 2026 19:38:16 +0000 Subject: [PATCH 3/3] Remove stray pr33-review.md from previous commit --- pr33-review.md | 33 --------------------------------- 1 file changed, 33 deletions(-) delete mode 100644 pr33-review.md diff --git a/pr33-review.md b/pr33-review.md deleted file mode 100644 index 9a6fb69..0000000 --- a/pr33-review.md +++ /dev/null @@ -1,33 +0,0 @@ -# Review of PR #33 — Qualify auto-generated sub-orchestration IDs across `continue_as_new` generations - -I've read the full diff, traced the surrounding replay logic, checked all derivation/parsing call sites, and run the affected test suite (22/22 pass, including the new test). - -## Verdict: Correct, minimal, and rolling-upgrade safe. Approve with one recommended follow-up (an end-to-end regression test). - -## Why the fix is correct -The bug premise holds: `continue_as_new` resets `event_id` to `INITIAL_EVENT_ID` for the new generation, so auto IDs `sub::{event_id}` in generation N collide with generation N-1 (the parent instance is unchanged across generations, only `execution_id` increments). The provider then can't create a fresh child instance and the parent stalls waiting on an already-terminal child. Qualifying with `execution_id` for N>1 (`sub::{execution_id}.{event_id}`) removes the collision while leaving execution 1 byte-for-byte unchanged. - -## Rolling-upgrade safety (verified — this is the riskiest aspect) -This is safe for mixed-version clusters and in-flight instances: -- During replay, recorded sub-orchestrations are matched against history in `src/runtime/replay_engine.rs` (~L826-845) and bound to the **historical** instance ID. `action_matches_event_kind` for `SubOrch` compares only `name` and `input` (not instance) — `replay_engine.rs` (~L1880-1885) — so existing children keep their old `sub::{event_id}` IDs. -- The new derivation in `update_action_event_id` only applies to **new** actions beyond history. So an execution-2 instance already in flight under the old binary keeps its recorded children and only adopts the qualified format for newly scheduled ones. No determinism break. - -## No collateral concerns -- The auto ID is derived in exactly one place (`replay_engine.rs` ~L1793-1801); there's no duplicate path that was missed. -- The new `.` separator is safe: sub-orch IDs are never parsed numerically. `is_auto_generated_sub_orch_id` and `build_child_instance_id` (`src/lib.rs` ~L877-892) key only on the `sub::` prefix. -- Keeping execution 1 unqualified is a deliberate, well-documented compatibility choice. - -## Recommended follow-up (non-blocking) -The added test (`tests/replay_engine/sub_orchestration.rs` ~L47-70) only asserts the ID **format** at the replay-engine unit level. There's no end-to-end test reproducing the actual stall it fixes. Given the repo's scenario-test conventions, I'd recommend an integration test that: -1. Runs a parent that schedules a sub-orchestration, then `continue_as_new`s, -2. Schedules another sub-orchestration in the new generation whose `event_id` matches the prior generation's child, -3. Asserts the second child actually runs to completion and the parent completes (would have stalled/deadlocked before this fix). - -The existing `tests/replay_engine/fresh_execution.rs` `sub_orch_then_continue_as_new` test only covers execution 1 and doesn't exercise the cross-generation reuse. - -## Minor nits -- The PR checklist leaves "Regular tests pass locally" and "Doc tests pass locally" unchecked — worth confirming `cargo test --doc --all-features` before merge since `cargo nt` doesn't run doctests. -- The docs change also drops a trailing blank line in `docs/sub-orchestrations.md` — harmless. - -## Summary -Overall this is a clean, well-scoped fix. The only thing I'd ask for before merge is the end-to-end regression test to lock in the actual behavior, not just the ID string format.