diff --git a/src/provider_validation/capability_filtering.rs b/src/provider_validation/capability_filtering.rs index b304a37..1004749 100644 --- a/src/provider_validation/capability_filtering.rs +++ b/src/provider_validation/capability_filtering.rs @@ -436,6 +436,8 @@ pub async fn test_continue_as_new_execution_gets_own_pinned_version(fact orchestration: "TestOrch".to_string(), input: "new-input".to_string(), version: Some("1.0.0".to_string()), + parent_instance: None, + parent_id: None, carry_forward_events: vec![], initial_custom_status: None, }, diff --git a/src/provider_validation/prune.rs b/src/provider_validation/prune.rs index a8280a5..cffe2e4 100644 --- a/src/provider_validation/prune.rs +++ b/src/provider_validation/prune.rs @@ -303,6 +303,8 @@ async fn create_running_multi_execution_instance( orchestration: "LongRunning".to_string(), input: "{}".to_string(), version: Some("1.0.0".to_string()), + parent_instance: None, + parent_id: None, carry_forward_events: vec![], initial_custom_status: None, } @@ -368,6 +370,8 @@ async fn create_multi_execution_instance( orchestration: "TestOrch".to_string(), input: "{}".to_string(), version: Some("1.0.0".to_string()), + parent_instance: None, + parent_id: None, carry_forward_events: vec![], initial_custom_status: None, } diff --git a/src/providers/mod.rs b/src/providers/mod.rs index 6ec684c..4bc29ac 100644 --- a/src/providers/mod.rs +++ b/src/providers/mod.rs @@ -539,6 +539,22 @@ pub enum WorkItem { orchestration: String, input: String, version: Option, + /// Parent instance ID, preserved across continue-as-new so a logical + /// sub-orchestration can still notify its parent when a later execution + /// completes or fails. + /// + /// Optional for backward compatibility: messages enqueued by older + /// runtimes won't carry this, in which case the runtime falls back to + /// deriving the parent link from the previous execution's history. + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + parent_instance: Option, + /// Parent's `SubOrchestrationScheduled` event id, preserved across + /// continue-as-new alongside `parent_instance`. Optional for the same + /// backward-compatibility reason described above. + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + parent_id: Option, /// Persistent events carried forward from the previous execution. /// These are seeded into the new execution's history before any new /// externally-raised events, preserving FIFO order across CAN boundaries. diff --git a/src/runtime/dispatchers/orchestration.rs b/src/runtime/dispatchers/orchestration.rs index 99ef7fe..fd8f6e5 100644 --- a/src/runtime/dispatchers/orchestration.rs +++ b/src/runtime/dispatchers/orchestration.rs @@ -1308,13 +1308,17 @@ impl Runtime { WorkItem::ContinueAsNew { orchestration, input, + parent_instance, + parent_id, carry_forward_events, .. } => Some(( orchestration.clone(), input.clone(), - None, - None, + // Prefer the parent link carried on the work item; fall back to the + // previous execution's history for messages enqueued by older runtimes. + parent_instance.clone().or_else(|| history_mgr.parent_instance.clone()), + parent_id.or(history_mgr.parent_id), Some(carry_forward_events.clone()), )), _ => None, diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs index c5801c5..1ccbe9e 100644 --- a/src/runtime/execution.rs +++ b/src/runtime/execution.rs @@ -336,6 +336,8 @@ impl Runtime { orchestration: orchestration_name.to_string(), input: input.clone(), version: version.clone(), + parent_instance: history_mgr.parent_instance.clone(), + parent_id: history_mgr.parent_id, carry_forward_events: unmatched, initial_custom_status, }); diff --git a/src/runtime/state_helpers.rs b/src/runtime/state_helpers.rs index 5c4c45c..e96842b 100644 --- a/src/runtime/state_helpers.rs +++ b/src/runtime/state_helpers.rs @@ -453,6 +453,8 @@ impl WorkItemReader { orchestration, input, version, + parent_instance, + parent_id, carry_forward_events, .. } => { @@ -470,7 +472,17 @@ impl WorkItemReader { carried.append(&mut completion_messages); completion_messages = carried; - (orchestration.clone(), input.clone(), version.clone(), None, None, true) + // Prefer the parent link carried on the work item. Fall back to + // the previous execution's history for messages enqueued by older + // runtimes that didn't stamp these fields (rolling-upgrade compat). + ( + orchestration.clone(), + input.clone(), + version.clone(), + parent_instance.clone().or_else(|| history_mgr.parent_instance.clone()), + parent_id.or(history_mgr.parent_id), + true, + ) } _ => unreachable!(), } @@ -741,6 +753,8 @@ mod tests { orchestration: "test-orch".to_string(), input: "new-input".to_string(), version: Some("2.0.0".to_string()), + parent_instance: None, + parent_id: None, carry_forward_events: vec![], initial_custom_status: None, }]; @@ -756,6 +770,66 @@ mod tests { assert_eq!(reader.parent_id, None); } + #[test] + fn test_workitem_reader_can_preserves_parent_link_from_work_item() { + // New runtimes stamp the parent link directly onto the ContinueAsNew work item. + let messages = vec![WorkItem::ContinueAsNew { + instance: "child-inst".to_string(), + orchestration: "test-orch".to_string(), + input: "new-input".to_string(), + version: Some("2.0.0".to_string()), + parent_instance: Some("parent-inst".to_string()), + parent_id: Some(42), + carry_forward_events: vec![], + initial_custom_status: None, + }]; + + let history_mgr = HistoryManager::from_history(&[]); + let reader = WorkItemReader::from_messages(&messages, &history_mgr, "child-inst"); + + assert!(reader.is_continue_as_new); + assert_eq!(reader.parent_instance, Some("parent-inst".to_string())); + assert_eq!(reader.parent_id, Some(42)); + } + + #[test] + fn test_workitem_reader_can_preserves_parent_link_from_history() { + // Backcompat: a ContinueAsNew work item enqueued by an older runtime carries no + // parent fields, so the link must be recovered from the previous execution's history. + let messages = vec![WorkItem::ContinueAsNew { + instance: "child-inst".to_string(), + orchestration: "test-orch".to_string(), + input: "new-input".to_string(), + version: Some("2.0.0".to_string()), + parent_instance: None, + parent_id: None, + carry_forward_events: vec![], + initial_custom_status: None, + }]; + let history = vec![Event::with_event_id( + 1, + "child-inst", + 1, + None, + EventKind::OrchestrationStarted { + name: "test-orch".to_string(), + version: "1.0.0".to_string(), + input: "old-input".to_string(), + parent_instance: Some("parent-inst".to_string()), + parent_id: Some(42), + carry_forward_events: None, + initial_custom_status: None, + }, + )]; + + let history_mgr = HistoryManager::from_history(&history); + let reader = WorkItemReader::from_messages(&messages, &history_mgr, "child-inst"); + + assert!(reader.is_continue_as_new); + assert_eq!(reader.parent_instance, Some("parent-inst".to_string())); + assert_eq!(reader.parent_id, Some(42)); + } + #[test] fn test_workitem_reader_completion_only() { let messages = vec![ @@ -980,6 +1054,8 @@ mod tests { orchestration: "orch".to_string(), input: "new".to_string(), version: None, + parent_instance: None, + parent_id: None, carry_forward_events: vec![ ("X".to_string(), "old-x".to_string()), ("Y".to_string(), "old-y".to_string()), diff --git a/tests/e2e_samples.rs b/tests/e2e_samples.rs index d206e24..7d62698 100644 --- a/tests/e2e_samples.rs +++ b/tests/e2e_samples.rs @@ -583,6 +583,56 @@ async fn sample_sub_orchestration_basic_fs() { rt.shutdown(None).await; } +/// Sub-orchestrations: child uses ContinueAsNew before completing. +/// +/// Highlights: +/// - Parent awaits a child orchestration across multiple child executions +/// - Child preserves its parent link when rolling over with ContinueAsNew +#[tokio::test] +async fn sample_sub_orchestration_continue_as_new_fs() { + let (store, _temp_dir) = common::create_sqlite_store_disk().await; + + let activity_registry = ActivityRegistry::builder().build(); + + let loop_child = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + if n < 3 { + return ctx.continue_as_new((n + 1).to_string()).await; + } + + Ok(format!("child-final:{n}")) + }; + let parent = |ctx: OrchestrationContext, input: String| async move { + let r = ctx.schedule_sub_orchestration("LoopChild", input).await.unwrap(); + Ok(format!("parent:{r}")) + }; + + let orchestration_registry = OrchestrationRegistry::builder() + .register("LoopChild", loop_child) + .register("ParentCan", parent) + .build(); + + let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await; + let client = Client::new(store.clone()); + client + .start_orchestration("inst-sub-can", "ParentCan", "0") + .await + .unwrap(); + + match client + .wait_for_orchestration("inst-sub-can", std::time::Duration::from_secs(5)) + .await + .unwrap() + { + runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "parent:child-final:3"), + runtime::OrchestrationStatus::Failed { details, .. } => { + panic!("orchestration failed: {}", details.display_message()) + } + _ => panic!("unexpected orchestration status"), + } + rt.shutdown(None).await; +} + /// Sub-orchestrations: fan-out to multiple children and join. /// /// Highlights: