Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/provider_validation/capability_filtering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ pub async fn test_continue_as_new_execution_gets_own_pinned_version<F: ProviderF
orchestration: "TestOrch".to_string(),
input: "{}".to_string(),
version: Some("1.0.0".to_string()),
parent_instance: None,
parent_id: None,
carry_forward_events: vec![],
initial_custom_status: None,
}],
Expand Down
2 changes: 2 additions & 0 deletions src/provider_validation/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2381,6 +2381,8 @@ async fn continue_as_new(provider: &dyn crate::providers::Provider, instance: &s
orchestration: "TestOrch".to_string(),
input: "{}".to_string(),
version: Some("1.0.0".to_string()),
parent_instance: None,
parent_id: None,
carry_forward_events: vec![],
initial_custom_status: None,
};
Expand Down
2 changes: 2 additions & 0 deletions src/provider_validation/multi_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ pub async fn test_continue_as_new_creates_new_execution<F: ProviderFactory>(fact
orchestration: "TestOrch".to_string(),
input: "new-input".to_string(),
version: Some("1.0.0".to_string()),
parent_instance: None,
parent_id: None,
carry_forward_events: vec![],
initial_custom_status: None,
},
Expand Down
4 changes: 4 additions & 0 deletions src/provider_validation/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ async fn create_running_multi_execution_instance(
orchestration: "LongRunning".to_string(),
input: "{}".to_string(),
version: Some("1.0.0".to_string()),
parent_instance: None,
parent_id: None,
carry_forward_events: vec![],
initial_custom_status: None,
}
Expand Down Expand Up @@ -368,6 +370,8 @@ async fn create_multi_execution_instance(
orchestration: "TestOrch".to_string(),
input: "{}".to_string(),
version: Some("1.0.0".to_string()),
parent_instance: None,
parent_id: None,
carry_forward_events: vec![],
initial_custom_status: None,
}
Expand Down
16 changes: 16 additions & 0 deletions src/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,22 @@ pub enum WorkItem {
orchestration: String,
input: String,
version: Option<String>,
/// Parent instance ID, preserved across continue-as-new so a logical
/// sub-orchestration can still notify its parent when a later execution
/// completes or fails.
///
/// Optional for backward compatibility: messages enqueued by older
/// runtimes won't carry this, in which case the runtime falls back to
/// deriving the parent link from the previous execution's history.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
parent_instance: Option<String>,
/// Parent's `SubOrchestrationScheduled` event id, preserved across
/// continue-as-new alongside `parent_instance`. Optional for the same
/// backward-compatibility reason described above.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
parent_id: Option<u64>,
/// Persistent events carried forward from the previous execution.
/// These are seeded into the new execution's history before any new
/// externally-raised events, preserving FIFO order across CAN boundaries.
Expand Down
8 changes: 6 additions & 2 deletions src/runtime/dispatchers/orchestration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1308,13 +1308,17 @@ impl Runtime {
WorkItem::ContinueAsNew {
orchestration,
input,
parent_instance,
parent_id,
carry_forward_events,
..
} => Some((
orchestration.clone(),
input.clone(),
None,
None,
// Prefer the parent link carried on the work item; fall back to the
// previous execution's history for messages enqueued by older runtimes.
parent_instance.clone().or_else(|| history_mgr.parent_instance.clone()),
parent_id.or(history_mgr.parent_id),
Some(carry_forward_events.clone()),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we wire this into WorkItem::ContinueAsNew event directly instead of pulling from the history?
Needs to be backcompat though.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 4f9d29c. The parent link (parent_instance/parent_id) is now stamped directly onto WorkItem::ContinueAsNew by the CAN producer in execution.rs, and the consumers read it from the work item.

For backcompat, both new fields are #[serde(default)] + skip_serializing_if, so old enqueued messages deserialize cleanly and the wire format is unchanged. To stay correct during rolling upgrades (a CAN enqueued by an older node that didn't stamp the fields), the consumers fall back to deriving the link from the previous execution's history when the fields are absent. Added test_workitem_reader_can_preserves_parent_link_from_work_item for the direct path and kept ..._from_history for the fallback path.

)),
_ => None,
Expand Down
2 changes: 2 additions & 0 deletions src/runtime/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ impl Runtime {
orchestration: orchestration_name.to_string(),
input: input.clone(),
version: version.clone(),
parent_instance: history_mgr.parent_instance.clone(),
parent_id: history_mgr.parent_id,
carry_forward_events: unmatched,
initial_custom_status,
});
Expand Down
78 changes: 77 additions & 1 deletion src/runtime/state_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ impl WorkItemReader {
orchestration,
input,
version,
parent_instance,
parent_id,
carry_forward_events,
..
} => {
Expand All @@ -470,7 +472,17 @@ impl WorkItemReader {
carried.append(&mut completion_messages);
completion_messages = carried;

(orchestration.clone(), input.clone(), version.clone(), None, None, true)
// Prefer the parent link carried on the work item. Fall back to
// the previous execution's history for messages enqueued by older
// runtimes that didn't stamp these fields (rolling-upgrade compat).
(
orchestration.clone(),
input.clone(),
version.clone(),
parent_instance.clone().or_else(|| history_mgr.parent_instance.clone()),
parent_id.or(history_mgr.parent_id),
true,
)
}
_ => unreachable!(),
}
Expand Down Expand Up @@ -741,6 +753,8 @@ mod tests {
orchestration: "test-orch".to_string(),
input: "new-input".to_string(),
version: Some("2.0.0".to_string()),
parent_instance: None,
parent_id: None,
carry_forward_events: vec![],
initial_custom_status: None,
}];
Expand All @@ -756,6 +770,66 @@ mod tests {
assert_eq!(reader.parent_id, None);
}

#[test]
fn test_workitem_reader_can_preserves_parent_link_from_work_item() {
// New runtimes stamp the parent link directly onto the ContinueAsNew work item.
let messages = vec![WorkItem::ContinueAsNew {
instance: "child-inst".to_string(),
orchestration: "test-orch".to_string(),
input: "new-input".to_string(),
version: Some("2.0.0".to_string()),
parent_instance: Some("parent-inst".to_string()),
parent_id: Some(42),
carry_forward_events: vec![],
initial_custom_status: None,
}];

let history_mgr = HistoryManager::from_history(&[]);
let reader = WorkItemReader::from_messages(&messages, &history_mgr, "child-inst");

assert!(reader.is_continue_as_new);
assert_eq!(reader.parent_instance, Some("parent-inst".to_string()));
assert_eq!(reader.parent_id, Some(42));
}

#[test]
fn test_workitem_reader_can_preserves_parent_link_from_history() {
// Backcompat: a ContinueAsNew work item enqueued by an older runtime carries no
// parent fields, so the link must be recovered from the previous execution's history.
let messages = vec![WorkItem::ContinueAsNew {
instance: "child-inst".to_string(),
orchestration: "test-orch".to_string(),
input: "new-input".to_string(),
version: Some("2.0.0".to_string()),
parent_instance: None,
parent_id: None,
carry_forward_events: vec![],
initial_custom_status: None,
}];
let history = vec![Event::with_event_id(
1,
"child-inst",
1,
None,
EventKind::OrchestrationStarted {
name: "test-orch".to_string(),
version: "1.0.0".to_string(),
input: "old-input".to_string(),
parent_instance: Some("parent-inst".to_string()),
parent_id: Some(42),
carry_forward_events: None,
initial_custom_status: None,
},
)];

let history_mgr = HistoryManager::from_history(&history);
let reader = WorkItemReader::from_messages(&messages, &history_mgr, "child-inst");

assert!(reader.is_continue_as_new);
assert_eq!(reader.parent_instance, Some("parent-inst".to_string()));
assert_eq!(reader.parent_id, Some(42));
}

#[test]
fn test_workitem_reader_completion_only() {
let messages = vec![
Expand Down Expand Up @@ -980,6 +1054,8 @@ mod tests {
orchestration: "orch".to_string(),
input: "new".to_string(),
version: None,
parent_instance: None,
parent_id: None,
carry_forward_events: vec![
("X".to_string(), "old-x".to_string()),
("Y".to_string(), "old-y".to_string()),
Expand Down
50 changes: 50 additions & 0 deletions tests/e2e_samples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,56 @@ async fn sample_sub_orchestration_basic_fs() {
rt.shutdown(None).await;
}

/// Sub-orchestrations: child uses ContinueAsNew before completing.
///
/// Highlights:
/// - Parent awaits a child orchestration across multiple child executions
/// - Child preserves its parent link when rolling over with ContinueAsNew
#[tokio::test]
async fn sample_sub_orchestration_continue_as_new_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;

let activity_registry = ActivityRegistry::builder().build();

let loop_child = |ctx: OrchestrationContext, input: String| async move {
let n: u32 = input.parse().unwrap_or(0);
if n < 3 {
return ctx.continue_as_new((n + 1).to_string()).await;
}

Ok(format!("child-final:{n}"))
};
let parent = |ctx: OrchestrationContext, input: String| async move {
let r = ctx.schedule_sub_orchestration("LoopChild", input).await.unwrap();
Ok(format!("parent:{r}"))
};

let orchestration_registry = OrchestrationRegistry::builder()
.register("LoopChild", loop_child)
.register("ParentCan", parent)
.build();

let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-sub-can", "ParentCan", "0")
.await
.unwrap();

match client
.wait_for_orchestration("inst-sub-can", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "parent:child-final:3"),
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}

/// Sub-orchestrations: fan-out to multiple children and join.
///
/// Highlights:
Expand Down
Loading