diff --git a/CHANGELOG.md b/CHANGELOG.md index efe9995..56af077 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Reserved the `sub::` marker for runtime-generated sub-orchestration instance ids. + `Client::start_orchestration` and `Client::start_orchestration_versioned` now + return `ClientError::InvalidInput` for root instance ids that start with `sub::` + or contain `::sub::`; other uses of `::` remain supported. Applications that used + the reserved marker in root instance ids must rename those ids before upgrading. + See [docs/migration-guide.md](docs/migration-guide.md) for guidance. - **`ctx.new_guid()` now returns a standard UUID v4.** The previous implementation derived the value from `SystemTime::now()` nanoseconds plus a thread-local counter, which produced low-entropy, structured values (the @@ -19,6 +25,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `nanos + process id`, removing a predictable-token pattern in work-item ownership checks. +### Fixed + +- **Parent hang on sub-orchestration instance-id collision** — When an auto-generated + child instance id already named a terminal instance, the scheduling parent could await + a completion that never arrived. The runtime now notifies the parent with a + sub-orchestration failure so it fails fast. The parent execution that scheduled the child + is stamped onto the child start at schedule time and persisted in the child's + `OrchestrationStarted` event, so the failure (and all sub-orchestration completion/failure + notifications) is routed to exactly that parent execution. This is correct across runtime + restarts and multiple dispatcher nodes, and avoids a TOCTOU window where the parent's + *current* execution at completion time could differ from the execution that scheduled the + child. 
When the stamp is absent (children started by an older runtime, or work items from + before this change), routing falls back to a durable provider read, keeping mixed-version + clusters correct during rolling upgrades. +- **Sub-orchestration id reuse across continue-as-new** — Child instance ids generated + after a parent `continue_as_new` now include the parent execution id + (`{parent}::sub::{execution_id}_{event_id}`), preventing collisions with the terminal + child of a previous iteration that schedules at the same position. + ## [0.1.29] - 2026-05-08 **Release:** diff --git a/docs/migration-guide.md b/docs/migration-guide.md index c73e826..94dff6b 100644 --- a/docs/migration-guide.md +++ b/docs/migration-guide.md @@ -2,6 +2,58 @@ This guide helps you migrate between Duroxide versions and handle orchestration versioning. +## Reserved `sub::` instance-id marker (Unreleased) + +The `sub::` marker is now reserved for runtime-generated sub-orchestration instance ids. +`Client::start_orchestration` and `Client::start_orchestration_versioned` reject root +instance ids that: + +- start with `sub::`, or +- contain the `::sub::` infix. + +Such ids return `ClientError::InvalidInput`. Ordinary uses of `::` in instance ids remain +valid (e.g. `tenant-7::order-42`); only the `sub::` marker is reserved. + +This prevents a root instance id from pre-occupying an auto-generated child id. Child +sub-orchestration ids take the form `{parent}::sub::{event_id}` on the first parent +execution and `{parent}::sub::{execution_id}_{event_id}` after `continue_as_new`. + +Before upgrading client code, audit your root instance-id scheme for the reserved marker: + +```text +# Reject — start with `sub::` or contain `::sub::` +sub::job-1 +tenant-7::sub::order-42 + +# Accept — ordinary `::` is fine +tenant-7::order-42 +order-2026-06-09 +``` + +Rename any root instance ids that use the reserved marker before upgrading. 
+ +## Durable sub-orchestration routing (`parent_execution_id`) + +Sub-orchestration completion and failure notifications are now routed to the exact parent +execution that scheduled the child. To do this, the scheduling parent's execution id is +stamped onto the child's start and persisted in the child's history: + +- `WorkItem::StartOrchestration` gains an optional `parent_execution_id` field. +- `EventKind::OrchestrationStarted` gains an optional `parent_execution_id` field. + +Both fields are `Option`, serialized with `#[serde(default, skip_serializing_if = "Option::is_none")]`, +so the wire and history formats remain backward compatible: + +- **Old → new:** A new runtime reading an old child history (or an old work item) sees + `parent_execution_id = None` and falls back to a durable provider read of the parent's + current execution — the previous behavior. +- **New → old:** An old runtime ignores the extra field (it is skipped when absent and not + required when deserializing). + +No action is required to upgrade. Mixed-version clusters route correctly during a rolling +upgrade. The provider-read fallback is retained only for histories/work items created before +this change. + ## Orchestration Versioning Duroxide supports versioning to handle code evolution while maintaining compatibility with running instances. diff --git a/src/client/mod.rs b/src/client/mod.rs index d24ebce..0edcb87 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -168,6 +168,25 @@ pub struct Client { store: Arc, } +/// Reject instance ids that collide with the reserved sub-orchestration markers. +/// +/// Child sub-orchestration instance ids reserve the `sub::` marker (see +/// [`crate::auto_sub_orch_suffix`], the canonical formatter). The first parent +/// execution uses `{parent}::sub::{event_id}`; executions after continue-as-new use +/// `{parent}::sub::{execution_id}_{event_id}`. 
A user-supplied id matching either form +/// could pre-occupy a future child id, so the `sub::` prefix and `::sub::` infix are +/// reserved. Other uses of `::` remain valid. +fn validate_instance_id(instance: &str) -> Result<(), ClientError> { + if instance.starts_with(crate::SUB_ORCH_AUTO_PREFIX) || instance.contains("::sub::") { + return Err(ClientError::InvalidInput { + message: format!( + "instance id '{instance}' uses the reserved sub-orchestration marker 'sub::'" + ), + }); + } + Ok(()) +} + impl Client { /// Create a client bound to a Provider instance. /// @@ -211,6 +230,9 @@ impl Client { /// - Must be unique across all orchestrations /// - Can be any string (alphanumeric + hyphens recommended) /// - Reusing an instance ID that already exists will fail + /// - Must not use the reserved sub-orchestration marker `sub::` (as a prefix + /// or in the `::sub::` form); these are reserved for auto-generated child + /// instance ids. Such ids are rejected with [`ClientError::InvalidInput`]. /// /// # Example /// @@ -230,6 +252,8 @@ impl Client { /// /// # Errors /// + /// Returns `ClientError::InvalidInput` if the instance id uses the reserved + /// `sub::` marker. /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration. pub async fn start_orchestration( &self, @@ -237,13 +261,16 @@ impl Client { orchestration: impl Into, input: impl Into, ) -> Result<(), ClientError> { + let instance = instance.into(); + validate_instance_id(&instance)?; let item = WorkItem::StartOrchestration { - instance: instance.into(), + instance, orchestration: orchestration.into(), input: input.into(), version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; self.store @@ -256,6 +283,8 @@ impl Client { /// /// # Errors /// + /// Returns `ClientError::InvalidInput` if the instance id uses the reserved + /// `sub::` marker. 
/// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration. pub async fn start_orchestration_versioned( &self, @@ -264,13 +293,16 @@ impl Client { version: impl Into, input: impl Into, ) -> Result<(), ClientError> { + let instance = instance.into(); + validate_instance_id(&instance)?; let item = WorkItem::StartOrchestration { - instance: instance.into(), + instance, orchestration: orchestration.into(), input: input.into(), version: Some(version.into()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; self.store diff --git a/src/lib.rs b/src/lib.rs index 2c39f77..20ffbb6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -873,6 +873,24 @@ pub fn is_auto_generated_sub_orch_id(instance: &str) -> bool { instance.starts_with(SUB_ORCH_AUTO_PREFIX) } +/// Build the auto-generated sub-orchestration suffix for a given parent execution +/// and scheduling event id. +/// +/// The first execution uses `sub::{event_id}` for backward compatibility. Later +/// executions (after `continue_as_new`) include the execution id as +/// `sub::{execution_id}_{event_id}`: event ids reset on continue-as-new, so a parent +/// that schedules a sub-orchestration at the same position on each iteration would +/// otherwise regenerate an identical child id and collide with the now-terminal +/// child from the previous iteration. +#[inline] +pub(crate) fn auto_sub_orch_suffix(execution_id: u64, event_id: u64) -> String { + if execution_id == INITIAL_EXECUTION_ID { + format!("{SUB_ORCH_AUTO_PREFIX}{event_id}") + } else { + format!("{SUB_ORCH_AUTO_PREFIX}{execution_id}_{event_id}") + } +} + /// Build the full child instance ID, adding parent prefix only for auto-generated IDs. 
/// /// - Auto-generated IDs (starting with "sub::"): `{parent}::{child}` (e.g., `parent-1::sub::5`) @@ -1099,6 +1117,13 @@ pub enum EventKind { input: String, parent_instance: Option, parent_id: Option, + /// Execution id of the parent that scheduled this sub-orchestration, persisted at + /// child-start time. Used to route this child's completion/failure back to the + /// exact parent execution that awaited it. `None` for root orchestrations and for + /// children started by older runtimes (routing then falls back to a provider read). + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + parent_execution_id: Option, /// Persistent events carried forward from the previous execution during continue-as-new. /// Present only on CAN-initiated executions for audit trail. Each tuple is (event_name, data). #[serde(skip_serializing_if = "Option::is_none")] @@ -3732,7 +3757,22 @@ impl OrchestrationContext { /// without any parent prefix. Use this when you need to control the exact /// instance ID for the sub-orchestration. /// - /// For auto-generated instance IDs, use [`schedule_sub_orchestration`] instead. + /// For auto-generated instance IDs, use [`schedule_sub_orchestration`](Self::schedule_sub_orchestration) + /// instead. + /// + /// # Reserved marker (advanced escape hatch) + /// + /// Unlike [`crate::Client::start_orchestration`], explicit child ids are **not** + /// validated against the reserved `sub::` marker — they are an advanced escape hatch + /// where the caller owns the full id space. Two consequences to be aware of: + /// + /// - An explicit id of the runtime-generated shape (e.g. `parent::sub::2`) is allowed + /// and may therefore collide with an auto-generated child id. The runtime defends + /// against the resulting collision: if the id already names a terminal instance the + /// scheduling parent receives a sub-orchestration failure instead of hanging. 
+ /// - An explicit id that *starts with* `sub::` is treated as auto-generated by + /// [`crate::build_child_instance_id`] and therefore gets the parent prefix added, + /// so it is **not** used verbatim. Avoid leading `sub::` in explicit ids. pub fn schedule_sub_orchestration_with_id( &self, name: impl Into, @@ -3760,6 +3800,10 @@ impl OrchestrationContext { /// The provided `instance` value is used exactly as the child instance ID, /// without any parent prefix. /// + /// Like [`schedule_sub_orchestration_with_id`](Self::schedule_sub_orchestration_with_id), + /// explicit child ids are an advanced escape hatch and are **not** validated against the + /// reserved `sub::` marker; see that method for the collision and leading-`sub::` caveats. + /// /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future /// is dropped without completing, a `CancelInstance` work item will be enqueued /// for the child orchestration. diff --git a/src/provider_validation/atomicity.rs b/src/provider_validation/atomicity.rs index 794e0ad..da335ee 100644 --- a/src/provider_validation/atomicity.rs +++ b/src/provider_validation/atomicity.rs @@ -37,6 +37,7 @@ pub async fn test_atomicity_failure_rollback(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -80,6 +81,7 @@ pub async fn test_atomicity_failure_rollback(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -142,6 +144,7 @@ pub async fn test_multi_operation_atomic_ack(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -327,6 +330,7 @@ pub async fn test_lock_released_only_on_successful_ack(facto input: "{}".to_string(), parent_instance: None, parent_id: 
None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -402,6 +406,7 @@ pub async fn test_concurrent_ack_prevention(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -430,6 +435,7 @@ pub async fn test_concurrent_ack_prevention(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/bulk_deletion.rs b/src/provider_validation/bulk_deletion.rs index e092ee2..d36789d 100644 --- a/src/provider_validation/bulk_deletion.rs +++ b/src/provider_validation/bulk_deletion.rs @@ -351,6 +351,7 @@ async fn create_completed_instance_with_parent( version: Some("1.0.0".to_string()), parent_instance: Some(parent_id.to_string()), parent_id: Some(1), + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, Some(parent_id.to_string()), @@ -378,6 +379,7 @@ async fn create_completed_instance_with_parent( input: "{}".to_string(), parent_instance: parent_instance_id.clone(), parent_id: if parent_instance_id.is_some() { Some(1) } else { None }, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/cancellation.rs b/src/provider_validation/cancellation.rs index 86a9866..ad502b3 100644 --- a/src/provider_validation/cancellation.rs +++ b/src/provider_validation/cancellation.rs @@ -54,6 +54,7 @@ pub async fn test_fetch_returns_running_state_for_active_orchestration WorkItem { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, } } @@ -39,6 +40,7 @@ fn orchestration_started_event(instance: &str, duroxide_version: &str) -> Event input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, 
carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/deletion.rs b/src/provider_validation/deletion.rs index 312edc1..9ee5bc5 100644 --- a/src/provider_validation/deletion.rs +++ b/src/provider_validation/deletion.rs @@ -194,6 +194,7 @@ pub async fn test_delete_cleans_queues_and_locks(factory: &F input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -228,6 +229,7 @@ pub async fn test_delete_cleans_queues_and_locks(factory: &F input: new_input.to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }, None, @@ -262,6 +264,7 @@ pub async fn test_delete_cleans_queues_and_locks(factory: &F input: new_input.to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -421,6 +424,7 @@ pub async fn test_force_delete_prevents_ack_recreation(facto input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -519,6 +523,7 @@ async fn create_completed_instance_with_parent( version: Some("1.0.0".to_string()), parent_instance: Some(parent.to_string()), parent_id: Some(1), + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, } } else { @@ -546,6 +551,7 @@ async fn create_completed_instance_with_parent( input: "{}".to_string(), parent_instance: parent_id.map(|s| s.to_string()), parent_id: parent_id.map(|_| 1), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -592,6 +598,7 @@ async fn create_failed_instance(provider: &dyn crate::providers::Provider, insta input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -636,6 +643,7 @@ async fn 
create_cancelled_instance(provider: &dyn crate::providers::Provider, in input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -1002,6 +1010,7 @@ pub async fn test_stale_activity_after_delete_recreate(facto input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -1084,6 +1093,7 @@ pub async fn test_stale_activity_after_delete_recreate(facto input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -1219,6 +1229,7 @@ async fn create_child_instance(provider: &dyn crate::providers::Provider, instan input: "{}".to_string(), parent_instance: Some(parent_id.to_string()), parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/error_handling.rs b/src/provider_validation/error_handling.rs index c14623f..00b7aa1 100644 --- a/src/provider_validation/error_handling.rs +++ b/src/provider_validation/error_handling.rs @@ -61,6 +61,7 @@ pub async fn test_duplicate_event_id_rejection(factory: &F) input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -183,6 +184,7 @@ pub async fn test_lock_expiration_during_ack(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/instance_creation.rs b/src/provider_validation/instance_creation.rs index 367b447..b9a6e8f 100644 --- a/src/provider_validation/instance_creation.rs +++ b/src/provider_validation/instance_creation.rs @@ -49,6 +49,7 @@ pub async fn test_instance_creation_via_metadata(factory: &F input: 
"{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -139,6 +140,7 @@ pub async fn test_no_instance_creation_on_enqueue(factory: & input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -172,6 +174,7 @@ pub async fn test_null_version_handling(factory: &F) { version: None, // No version provided parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }; @@ -207,6 +210,7 @@ pub async fn test_null_version_handling(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -279,6 +283,7 @@ pub async fn test_sub_orchestration_instance_creation(factor input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -291,6 +296,7 @@ pub async fn test_sub_orchestration_instance_creation(factor version: None, parent_instance: Some("parent-instance".to_string()), parent_id: Some(1), + parent_execution_id: None, execution_id: 1, }], ExecutionMetadata { @@ -329,6 +335,7 @@ pub async fn test_sub_orchestration_instance_creation(factor input: "{}".to_string(), parent_instance: Some("parent-instance".to_string()), parent_id: Some(1), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -355,6 +362,7 @@ pub async fn test_sub_orchestration_instance_creation(factor version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }, None, diff --git a/src/provider_validation/kv_store.rs b/src/provider_validation/kv_store.rs index 7fec4cc..21497fc 100644 --- a/src/provider_validation/kv_store.rs +++ b/src/provider_validation/kv_store.rs @@ -1468,6 +1468,7 @@ pub async fn 
test_kv_delete_instance_with_children(factory: input: "{}".to_string(), parent_instance: Some("kv-parent".to_string()), parent_id: Some(1), + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; provider.enqueue_for_orchestrator(child_start, None).await.unwrap(); @@ -1493,6 +1494,7 @@ pub async fn test_kv_delete_instance_with_children(factory: input: "{}".to_string(), parent_instance: Some("kv-parent".to_string()), parent_id: Some(1), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -2405,6 +2407,7 @@ async fn continue_as_new(provider: &dyn crate::providers::Provider, instance: &s input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/lock_expiration.rs b/src/provider_validation/lock_expiration.rs index f3c6ec3..0c7d37e 100644 --- a/src/provider_validation/lock_expiration.rs +++ b/src/provider_validation/lock_expiration.rs @@ -164,6 +164,7 @@ pub async fn test_lock_renewal_on_ack(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -404,6 +405,7 @@ pub async fn test_worker_lock_renewal_extends_timeout(factor input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/management.rs b/src/provider_validation/management.rs index 9a42b62..9c7abf4 100644 --- a/src/provider_validation/management.rs +++ b/src/provider_validation/management.rs @@ -63,6 +63,7 @@ pub async fn test_list_instances_by_status(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -121,6 +122,7 @@ pub async fn test_list_executions(factory: 
&F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -170,6 +172,7 @@ pub async fn test_get_instance_info(factory: &F) { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -226,6 +229,7 @@ pub async fn test_get_execution_info(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -285,6 +289,7 @@ pub async fn test_get_system_metrics(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -497,6 +502,7 @@ pub async fn test_get_instance_stats_carry_forward(factory: input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: Some(vec![ ("raised-1".to_string(), r#"{"data":"a"}"#.to_string()), ("raised-2".to_string(), r#"{"data":"b"}"#.to_string()), diff --git a/src/provider_validation/mod.rs b/src/provider_validation/mod.rs index 72a1182..5cee692 100644 --- a/src/provider_validation/mod.rs +++ b/src/provider_validation/mod.rs @@ -65,6 +65,7 @@ pub(crate) fn start_item(instance: &str) -> WorkItem { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, } } @@ -98,6 +99,7 @@ pub(crate) async fn create_instance(provider: &dyn crate::providers::Provider, i input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -136,6 +138,7 @@ pub(crate) async fn create_instance_with_parent( input: "{}".to_string(), parent_instance: parent_instance_id, parent_id: None, + parent_execution_id: None, 
execution_id: crate::INITIAL_EXECUTION_ID, }; diff --git a/src/provider_validation/multi_execution.rs b/src/provider_validation/multi_execution.rs index b3bdc4e..db82c71 100644 --- a/src/provider_validation/multi_execution.rs +++ b/src/provider_validation/multi_execution.rs @@ -12,6 +12,7 @@ fn start_item_with_execution(instance: &str, execution_id: u64) -> WorkItem { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id, } } @@ -48,6 +49,7 @@ pub async fn test_execution_isolation(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -108,6 +110,7 @@ pub async fn test_execution_isolation(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -279,6 +282,7 @@ pub async fn test_execution_id_sequencing(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -322,6 +326,7 @@ pub async fn test_execution_id_sequencing(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -383,6 +388,7 @@ pub async fn test_continue_as_new_creates_new_execution(fact input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -433,6 +439,7 @@ pub async fn test_continue_as_new_creates_new_execution(fact input: "new-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/poison_message.rs b/src/provider_validation/poison_message.rs index 
1a1d0e7..a85232a 100644 --- a/src/provider_validation/poison_message.rs +++ b/src/provider_validation/poison_message.rs @@ -26,6 +26,7 @@ pub async fn orchestration_attempt_count_starts_at_one(factory: &dyn ProviderFac version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, @@ -75,6 +76,7 @@ pub async fn orchestration_attempt_count_increments_on_refetch(factory: &dyn Pro version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, @@ -420,6 +422,7 @@ pub async fn abandon_orchestration_item_ignore_attempt_decrements(factory: &dyn version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, @@ -579,6 +582,7 @@ pub async fn max_attempt_count_across_message_batch(factory: &dyn ProviderFactor version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, diff --git a/src/provider_validation/prune.rs b/src/provider_validation/prune.rs index af4e8bd..cf8349f 100644 --- a/src/provider_validation/prune.rs +++ b/src/provider_validation/prune.rs @@ -328,6 +328,7 @@ async fn create_running_multi_execution_instance( input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -393,6 +394,7 @@ async fn create_multi_execution_instance( input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/queue_semantics.rs b/src/provider_validation/queue_semantics.rs index c69acbc..e6aaef8 100644 --- a/src/provider_validation/queue_semantics.rs +++ b/src/provider_validation/queue_semantics.rs @@ -145,6 +145,7 @@ 
pub async fn test_worker_ack_atomicity(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -248,6 +249,7 @@ pub async fn test_timer_delayed_visibility(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/tag_filtering.rs b/src/provider_validation/tag_filtering.rs index a72d695..14696b1 100644 --- a/src/provider_validation/tag_filtering.rs +++ b/src/provider_validation/tag_filtering.rs @@ -581,6 +581,7 @@ pub async fn test_tag_preserved_through_ack_orchestration_item(factory: &dyn Pro input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/providers/mod.rs b/src/providers/mod.rs index f1315cd..c7f5bd4 100644 --- a/src/providers/mod.rs +++ b/src/providers/mod.rs @@ -440,6 +440,13 @@ pub enum WorkItem { version: Option, parent_instance: Option, parent_id: Option, + /// Execution id of the parent that scheduled this sub-orchestration, stamped at + /// schedule time so completion/failure notifications route back to the exact + /// parent execution that awaited the child (rather than the parent's current + /// execution at completion time). `None` for top-level starts and for work items + /// produced by older runtimes; routing then falls back to a durable provider read. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + parent_execution_id: Option, execution_id: u64, }, diff --git a/src/providers/sqlite.rs b/src/providers/sqlite.rs index eda3c3a..fbaa2d8 100644 --- a/src/providers/sqlite.rs +++ b/src/providers/sqlite.rs @@ -3437,6 +3437,7 @@ mod tests { input: input.to_string(), parent_instance: parent_instance.map(|s| s.to_string()), parent_id, + parent_execution_id: None, execution_id: next_execution_id, }, None, @@ -3463,6 +3464,7 @@ mod tests { input: input.to_string(), parent_instance: parent_instance.map(|s| s.to_string()), parent_id, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -3499,6 +3501,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -3529,6 +3532,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -3574,6 +3578,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -3598,6 +3603,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -3731,6 +3737,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -3887,6 +3894,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; store.enqueue_for_orchestrator(item, None).await.unwrap(); @@ -4017,6 +4025,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -4073,6 
+4082,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -4152,6 +4162,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -4205,6 +4216,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }; store.enqueue_for_orchestrator(start_item, None).await.unwrap(); @@ -4228,6 +4240,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -4414,6 +4427,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; store.enqueue_for_orchestrator(item, None).await.unwrap(); @@ -4516,6 +4530,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; store.enqueue_for_orchestrator(item, None).await.unwrap(); diff --git a/src/runtime/dispatchers/orchestration.rs b/src/runtime/dispatchers/orchestration.rs index 72488ab..e0ef360 100644 --- a/src/runtime/dispatchers/orchestration.rs +++ b/src/runtime/dispatchers/orchestration.rs @@ -593,13 +593,18 @@ impl Runtime { || (temp_history_mgr.is_continued_as_new && !workitem_reader.is_continue_as_new) { warn!(instance = %instance, "Instance is terminal (completed/failed or CAN without start), acking batch without processing"); + // If a StartOrchestration in this discarded batch is a sub-orchestration whose + // parent differs from this instance's own recorded parent, the instance id was + // reused for an unrelated child. Notify that parent with a SubOrchFailed so it + // fails fast instead of awaiting a completion that will never arrive. 
+ let orchestrator_items = self.terminal_collision_notifications(&item).await; let _ = self .ack_orchestration_with_changes( lock_token, item.execution_id, vec![], vec![], - vec![], + orchestrator_items, ExecutionMetadata::default(), vec![], // cancelled_activities - none for terminal instances ) @@ -1086,6 +1091,7 @@ impl Runtime { input: workitem_reader.input.clone(), parent_instance: workitem_reader.parent_instance.clone(), parent_id: workitem_reader.parent_id, + parent_execution_id: workitem_reader.parent_execution_id, carry_forward_events, initial_custom_status, }, @@ -1280,47 +1286,50 @@ impl Runtime { let mut history_mgr = HistoryManager::from_history(&item.history); // Track parent info for sub-orchestration failure notification - let mut parent_link: Option<(String, u64)> = None; + let mut parent_link: Option<(String, Option<u64>, u64)> = None; // If history is empty, we need to create an OrchestrationStarted event first if history_mgr.is_empty() { // Try to extract orchestration name from work items - let (orchestration_name, input, parent_instance, parent_id, carry_forward_events) = item - .messages - .iter() - .find_map(|msg| match msg { - WorkItem::StartOrchestration { - orchestration, - input, - parent_instance, - parent_id, - .. - } => Some(( - orchestration.clone(), - input.clone(), - parent_instance.clone(), - *parent_id, - None, - )), - WorkItem::ContinueAsNew { - orchestration, - input, - carry_forward_events, - .. - } => Some(( - orchestration.clone(), - input.clone(), - None, - None, - Some(carry_forward_events.clone()), - )), - _ => None, - }) - .unwrap_or_else(|| (item.orchestration_name.clone(), String::new(), None, None, None)); + let (orchestration_name, input, parent_instance, parent_id, parent_execution_id, carry_forward_events) = + item.messages + .iter() + .find_map(|msg| match msg { + WorkItem::StartOrchestration { + orchestration, + input, + parent_instance, + parent_id, + parent_execution_id, + .. 
+ } => Some(( + orchestration.clone(), + input.clone(), + parent_instance.clone(), + *parent_id, + *parent_execution_id, + None, + )), + WorkItem::ContinueAsNew { + orchestration, + input, + carry_forward_events, + .. + } => Some(( + orchestration.clone(), + input.clone(), + None, + None, + None, + Some(carry_forward_events.clone()), + )), + _ => None, + }) + .unwrap_or_else(|| (item.orchestration_name.clone(), String::new(), None, None, None, None)); // Save parent link for notification if let (Some(pi), Some(pid)) = (&parent_instance, parent_id) { - parent_link = Some((pi.clone(), pid)); + parent_link = Some((pi.clone(), parent_execution_id, pid)); } history_mgr.append(Event::with_event_id( @@ -1334,6 +1343,7 @@ impl Runtime { input, parent_instance, parent_id, + parent_execution_id, carry_forward_events, initial_custom_status: None, }, @@ -1344,10 +1354,11 @@ impl Runtime { if let EventKind::OrchestrationStarted { parent_instance: Some(pi), parent_id: Some(pid), + parent_execution_id, .. 
} = &event.kind { - parent_link = Some((pi.clone(), *pid)); + parent_link = Some((pi.clone(), *parent_execution_id, *pid)); break; } } @@ -1360,12 +1371,12 @@ impl Runtime { output: Some(error.display_message()), orchestration_name: Some(item.orchestration_name.clone()), orchestration_version: Some(item.version.clone()), - parent_instance_id: parent_link.as_ref().map(|(pi, _)| pi.clone()), + parent_instance_id: parent_link.as_ref().map(|(pi, _, _)| pi.clone()), pinned_duroxide_version: None, // Poison path — version already set at creation }; // If this is a sub-orchestration, notify parent of failure - let orchestrator_items = if let Some((parent_instance, parent_id)) = parent_link { + let orchestrator_items = if let Some((parent_instance, parent_execution_id, parent_id)) = parent_link { tracing::debug!( target = "duroxide::runtime::execution", instance = %item.instance, @@ -1373,7 +1384,7 @@ impl Runtime { parent_id = %parent_id, "Enqueue SubOrchFailed to parent (poison)" ); - let parent_execution_id = self.get_execution_id_for_instance(&parent_instance, None).await; + let parent_execution_id = self.resolve_parent_execution_id(&parent_instance, parent_execution_id).await; vec![WorkItem::SubOrchFailed { parent_instance, parent_execution_id, @@ -1399,4 +1410,69 @@ impl Runtime { // Record metrics for poison detection self.record_orchestration_poison(); } + + /// Build `SubOrchFailed` notifications for a terminal instance that received a + /// `StartOrchestration` belonging to a different parent. + /// + /// Sub-orchestration child instance ids reserve the `sub::` marker (see + /// [`crate::auto_sub_orch_suffix`]): the first parent execution uses + /// `{parent}::sub::{event_id}` and executions after continue-as-new use + /// `{parent}::sub::{execution_id}_{event_id}`. If such an id already names a terminal + /// instance, the incoming `StartOrchestration` is discarded by the terminal fast-ack + /// path. 
Without this notification the scheduling parent would await a completion + /// forever. We only notify when the incoming work item's parent differs from the + /// terminal instance's own recorded parent, so genuine redelivery of a completed + /// child's start (parent already notified) does not spuriously fail the parent again. + async fn terminal_collision_notifications(&self, item: &crate::providers::OrchestrationItem) -> Vec<WorkItem> { + // The terminal instance's own parent, as recorded in its history. + let own_parent = item.history.iter().find_map(|e| match &e.kind { + EventKind::OrchestrationStarted { + parent_instance: Some(pi), + parent_id: Some(pid), + .. + } => Some((pi.clone(), *pid)), + _ => None, + }); + + let mut notifications = Vec::new(); + for msg in &item.messages { + if let WorkItem::StartOrchestration { + parent_instance: Some(parent_instance), + parent_id: Some(parent_id), + parent_execution_id, + .. + } = msg + { + // Skip genuine redelivery: same parent that already owns this instance. + if own_parent.as_ref() == Some(&(parent_instance.clone(), *parent_id)) { + continue; + } + warn!( + instance = %item.instance, + parent_instance = %parent_instance, + parent_id = %parent_id, + "Sub-orchestration target instance id already exists and is terminal; notifying parent of failure" + ); + // Prefer the execution id stamped on the colliding start; fall back to a + // durable provider read for work items produced by older runtimes. 
+ let parent_execution_id = self + .resolve_parent_execution_id(parent_instance, *parent_execution_id) + .await; + notifications.push(WorkItem::SubOrchFailed { + parent_instance: parent_instance.clone(), + parent_execution_id, + parent_id: *parent_id, + details: crate::ErrorDetails::Application { + kind: crate::AppErrorKind::OrchestrationFailed, + message: format!( + "sub-orchestration instance id '{}' already exists and is terminal", + item.instance + ), + retryable: false, + }, + }); + } + } + notifications + } } diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs index 3d70804..38f0abf 100644 --- a/src/runtime/execution.rs +++ b/src/runtime/execution.rs @@ -63,6 +63,7 @@ impl Runtime { version: version.clone(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }); } @@ -76,7 +77,7 @@ impl Runtime { // (works for both existing history and newly appended OrchestrationStarted in delta) let (input, parent_link) = history_mgr.extract_context(); - if let Some((ref pinst, pid)) = parent_link { + if let Some((ref pinst, _pexec, pid)) = parent_link { tracing::debug!(target = "duroxide::runtime::execution", instance=%instance, parent_instance=%pinst, parent_id=%pid, "Detected parent link for orchestration"); } else { tracing::debug!(target = "duroxide::runtime::execution", instance=%instance, "No parent link for orchestration"); @@ -151,7 +152,6 @@ impl Runtime { session_id, tag, } => { - let execution_id = self.get_execution_id_for_instance(instance, Some(execution_id)).await; worker_items.push(WorkItem::ActivityExecute { instance: instance.to_string(), execution_id, @@ -166,8 +166,6 @@ impl Runtime { scheduling_event_id, fire_at_ms, } => { - let execution_id = self.get_execution_id_for_instance(instance, Some(execution_id)).await; - // Enqueue TimerFired to orchestrator queue with delayed visibility // Provider will use fire_at_ms for the visible_at timestamp // Note: fire_at_ms is computed at 
scheduling time (wall-clock), @@ -194,6 +192,10 @@ impl Runtime { version: version.clone(), parent_instance: Some(instance.to_string()), parent_id: Some(*scheduling_event_id), + // Stamp the scheduling parent execution so the child's + // completion/failure routes back to this exact execution, + // even across continue-as-new, restarts, or other nodes. + parent_execution_id: Some(execution_id), execution_id: crate::INITIAL_EXECUTION_ID, }); } @@ -211,6 +213,7 @@ impl Runtime { version: version.clone(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }); } @@ -240,11 +243,13 @@ impl Runtime { )); // Notify parent if this is a sub-orchestration - if let Some((parent_instance, parent_id)) = parent_link { + if let Some((parent_instance, parent_execution_id, parent_id)) = parent_link { tracing::debug!(target = "duroxide::runtime::execution", instance=%instance, parent_instance=%parent_instance, parent_id=%parent_id, "Enqueue SubOrchCompleted to parent"); + let parent_execution_id = + self.resolve_parent_execution_id(&parent_instance, parent_execution_id).await; orchestrator_items.push(WorkItem::SubOrchCompleted { parent_instance: parent_instance.clone(), - parent_execution_id: self.get_execution_id_for_instance(&parent_instance, None).await, + parent_execution_id, parent_id, result: output.clone(), }); @@ -273,11 +278,13 @@ impl Runtime { history_mgr.append_failed(details.clone()); // Notify parent if this is a sub-orchestration - if let Some((parent_instance, parent_id)) = parent_link { + if let Some((parent_instance, parent_execution_id, parent_id)) = parent_link { tracing::debug!(target = "duroxide::runtime::execution", instance=%instance, parent_instance=%parent_instance, parent_id=%parent_id, "Enqueue SubOrchFailed to parent"); + let parent_execution_id = + self.resolve_parent_execution_id(&parent_instance, parent_execution_id).await; orchestrator_items.push(WorkItem::SubOrchFailed { parent_instance: 
parent_instance.clone(), - parent_execution_id: self.get_execution_id_for_instance(&parent_instance, None).await, + parent_execution_id, parent_id, details: details.clone(), }); @@ -361,10 +368,12 @@ impl Runtime { } // Notify parent if this is a sub-orchestration - if let Some((parent_instance, parent_id)) = parent_link { + if let Some((parent_instance, parent_execution_id, parent_id)) = parent_link { + let parent_execution_id = + self.resolve_parent_execution_id(&parent_instance, parent_execution_id).await; orchestrator_items.push(WorkItem::SubOrchFailed { parent_instance: parent_instance.clone(), - parent_execution_id: self.get_execution_id_for_instance(&parent_instance, None).await, + parent_execution_id, parent_id, details: details.clone(), }); diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index fef0b09..9f4a2d6 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -6,7 +6,6 @@ // use crate::providers::{ExecutionMetadata, Provider, WorkItem}; use crate::{Event, EventKind, OrchestrationContext}; -use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; @@ -492,8 +491,6 @@ pub struct Runtime { joins: Mutex>>, history_store: Arc, orchestration_registry: OrchestrationRegistry, - /// Track the current execution ID for each active instance - current_execution_ids: Mutex>, /// Shutdown flag checked by dispatchers shutdown_flag: Arc, /// Runtime configuration options @@ -842,28 +839,48 @@ impl Runtime { None } - /// Get the current execution ID for an instance, or fetch from store if not tracked + /// Resolve the parent execution id for routing a sub-orchestration completion or + /// failure back to its parent. /// - /// If `current_execution_id` is provided and the instance matches, use it directly. - /// Otherwise, check in-memory tracking, then fall back to INITIAL_EXECUTION_ID. 
- async fn get_execution_id_for_instance(&self, instance: &str, current_execution_id: Option<u64>) -> u64 { - // If this is the current instance being processed, use the provided execution_id - if let Some(exec_id) = current_execution_id { - // Update in-memory tracking for future calls - self.current_execution_ids - .lock() - .await - .insert(instance.to_string(), exec_id); - return exec_id; + /// Prefers the `stamped` value carried durably from schedule time (the exact parent + /// execution that scheduled this child). When absent — children started by an older + /// runtime, or work items produced before this field existed — falls back to a durable + /// provider read via [`Self::parent_execution_id_for_routing`]. This keeps mixed-version + /// clusters correct during rolling upgrades. + async fn resolve_parent_execution_id(&self, parent_instance: &str, stamped: Option<u64>) -> u64 { + match stamped { + Some(execution_id) => execution_id, + None => self.parent_execution_id_for_routing(parent_instance).await, } + } - // First check in-memory tracking - if let Some(&exec_id) = self.current_execution_ids.lock().await.get(instance) { - return exec_id; + /// Resolve a parent instance's current execution id for routing a sub-orchestration + /// completion or failure back to it. + /// + /// This is the legacy fallback used only when the durable `parent_execution_id` stamp + /// is missing (old child histories / old work items). It reads the execution id from the + /// provider rather than process-local memory, so routing is correct when a restarted + /// runtime or a different dispatcher node emits the notification. A misrouted notification + /// (e.g. defaulting to execution 1 while the parent is on execution 2+) is filtered out + /// during replay and would leave the parent awaiting forever. `Provider::read` returns the + /// parent's current-execution history, so any event's `execution_id` is the current one. 
+ /// On a read error (or no history yet) we fall back to `INITIAL_EXECUTION_ID`. + async fn parent_execution_id_for_routing(&self, parent_instance: &str) -> u64 { + match self.history_store.read(parent_instance).await { + Ok(events) => events + .iter() + .map(|e| e.execution_id) + .max() + .unwrap_or(crate::INITIAL_EXECUTION_ID), + Err(e) => { + tracing::warn!( + parent_instance = %parent_instance, + error = %e, + "failed to read parent history for sub-orchestration routing; defaulting to INITIAL_EXECUTION_ID" + ); + crate::INITIAL_EXECUTION_ID + } } - - // Fall back to INITIAL_EXECUTION_ID (no longer querying Provider::latest_execution_id) - crate::INITIAL_EXECUTION_ID } /// Start a new runtime using the in-memory SQLite provider. @@ -971,7 +988,6 @@ impl Runtime { joins: Mutex::new(joins), history_store, orchestration_registry, - current_execution_ids: Mutex::new(HashMap::new()), shutdown_flag: Arc::new(AtomicBool::new(false)), options, diff --git a/src/runtime/replay_engine.rs b/src/runtime/replay_engine.rs index 7dc67e9..aad01d7 100644 --- a/src/runtime/replay_engine.rs +++ b/src/runtime/replay_engine.rs @@ -1079,7 +1079,7 @@ impl ReplayEngine { ctx.bind_token(token, event_id); - let updated_action = update_action_event_id(action, event_id); + let updated_action = update_action_event_id(action, self.execution_id, event_id); if let crate::Action::StartSubOrchestration { instance, .. } = &updated_action { ctx.bind_sub_orchestration_instance(token, instance.clone()); @@ -1747,7 +1747,7 @@ fn action_to_event(action: &Action, instance: &str, execution_id: u64, event_id: /// Update an action's scheduling_event_id to the correct event_id. /// Also generates the actual sub-orchestration instance ID from the event_id /// (unless an explicit instance ID was provided, indicated by not starting with SUB_ORCH_PENDING_PREFIX). 
-fn update_action_event_id(action: Action, event_id: u64) -> Action { +fn update_action_event_id(action: Action, execution_id: u64, event_id: u64) -> Action { match action { Action::CallActivity { name, @@ -1790,7 +1790,7 @@ fn update_action_event_id(action: Action, event_id: u64) -> Action { // If instance starts with the pending prefix, it's a placeholder that needs to be replaced. // Otherwise, it's an explicit instance ID provided by the user. let final_instance = if instance.starts_with(crate::SUB_ORCH_PENDING_PREFIX) { - format!("{}{event_id}", crate::SUB_ORCH_AUTO_PREFIX) + crate::auto_sub_orch_suffix(execution_id, event_id) } else { instance }; @@ -1921,6 +1921,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/runtime/replay_engine_tests.rs b/src/runtime/replay_engine_tests.rs index dec034d..139c099 100644 --- a/src/runtime/replay_engine_tests.rs +++ b/src/runtime/replay_engine_tests.rs @@ -31,6 +31,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -117,6 +118,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -213,6 +215,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -257,6 +260,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -308,6 +312,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, 
initial_custom_status: None, }, @@ -376,6 +381,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/runtime/state_helpers.rs b/src/runtime/state_helpers.rs index 0d2e210..2675f88 100644 --- a/src/runtime/state_helpers.rs +++ b/src/runtime/state_helpers.rs @@ -25,6 +25,10 @@ pub struct HistoryManager { /// Parent event ID if this is a sub-orchestration pub parent_id: Option, + /// Parent execution id if this is a sub-orchestration (from OrchestrationStarted). + /// Used to route this child's completion/failure back to the exact parent execution. + pub parent_execution_id: Option, + /// Whether the orchestration has completed successfully pub is_completed: bool, @@ -56,6 +60,7 @@ impl HistoryManager { orchestration_input: None, parent_instance: None, parent_id: None, + parent_execution_id: None, is_completed: false, is_failed: false, is_continued_as_new: false, @@ -76,6 +81,7 @@ impl HistoryManager { input, parent_instance, parent_id, + parent_execution_id, .. } = &event.kind { @@ -85,6 +91,7 @@ impl HistoryManager { metadata.orchestration_input = Some(input.clone()); metadata.parent_instance = parent_instance.clone(); metadata.parent_id = *parent_id; + metadata.parent_execution_id = *parent_execution_id; metadata.current_execution_id = Some(execution_id_counter); last_started_index = Some(idx); // Don't break - we want the LAST (most recent) OrchestrationStarted @@ -250,12 +257,16 @@ impl HistoryManager { } /// Extract input and parent linkage from history for orchestration context - /// This looks at the full history including any newly appended events in the delta - pub fn extract_context(&self) -> (String, Option<(String, u64)>) { + /// This looks at the full history including any newly appended events in the delta. 
+ /// + /// The returned parent link is `(parent_instance, parent_execution_id, parent_id)`, + /// where `parent_execution_id` is the durable execution id stamped at child-start time + /// (`None` for children started by older runtimes; callers fall back to a provider read). + pub fn extract_context(&self) -> (String, Option<(String, Option<u64>, u64)>) { // First check if we have metadata from the initial scan if let Some(ref input) = self.orchestration_input { let parent_link = if let (Some(parent_inst), Some(parent_id)) = (&self.parent_instance, self.parent_id) { - Some((parent_inst.clone(), parent_id)) + Some((parent_inst.clone(), self.parent_execution_id, parent_id)) } else { None }; @@ -268,11 +279,12 @@ impl HistoryManager { input, parent_instance, parent_id, + parent_execution_id, .. } = &e.kind { let parent_link = if let (Some(pinst), Some(pid)) = (parent_instance.clone(), *parent_id) { - Some((pinst, pid)) + Some((pinst, *parent_execution_id, pid)) } else { None }; @@ -384,6 +396,10 @@ pub struct WorkItemReader { /// Parent event ID (from start item or None) pub parent_id: Option<u64>, + /// Parent execution id (from start item or None) — used to route this child's + /// completion/failure back to the exact parent execution that scheduled it. + pub parent_execution_id: Option<u64>, + /// Whether this is a ContinueAsNew pub is_continue_as_new: bool, } @@ -428,7 +444,7 @@ impl WorkItemReader { } // Extract parameters from start item or use defaults - let (orchestration_name, input, version, parent_instance, parent_id, is_continue_as_new) = + let (orchestration_name, input, version, parent_instance, parent_id, parent_execution_id, is_continue_as_new) = if let Some(ref item) = start_item { match item { WorkItem::StartOrchestration { @@ -437,6 +453,7 @@ impl WorkItemReader { version, parent_instance, parent_id, + parent_execution_id, .. 
} => ( orchestration.clone(), @@ -444,6 +461,7 @@ impl WorkItemReader { version.clone(), parent_instance.clone(), *parent_id, + *parent_execution_id, false, ), WorkItem::ContinueAsNew { @@ -467,7 +485,7 @@ impl WorkItemReader { carried.append(&mut completion_messages); completion_messages = carried; - (orchestration.clone(), input.clone(), version.clone(), None, None, true) + (orchestration.clone(), input.clone(), version.clone(), None, None, None, true) } _ => unreachable!(), } @@ -485,7 +503,8 @@ impl WorkItemReader { let version = history_mgr.version(); let parent_instance = history_mgr.parent_instance.clone(); let parent_id = history_mgr.parent_id; - (orchestration_name, input, version, parent_instance, parent_id, false) + let parent_execution_id = history_mgr.parent_execution_id; + (orchestration_name, input, version, parent_instance, parent_id, parent_execution_id, false) }; Self { @@ -496,6 +515,7 @@ impl WorkItemReader { version, parent_instance, parent_id, + parent_execution_id, is_continue_as_new, } } @@ -536,6 +556,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -564,6 +585,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -599,6 +621,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -638,6 +661,7 @@ mod tests { input: "input1".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -662,6 +686,7 @@ mod tests { input: "input2".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -688,6 
+713,7 @@ mod tests { input: "test".to_string(), parent_instance: Some("parent-instance".to_string()), parent_id: Some(42), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -708,6 +734,7 @@ mod tests { version: Some("1.0.0".to_string()), parent_instance: Some("parent".to_string()), parent_id: Some(42), + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }, WorkItem::ActivityCompleted { @@ -781,6 +808,7 @@ mod tests { input: "test-input".to_string(), parent_instance: Some("parent-inst".to_string()), parent_id: Some(42), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -822,6 +850,7 @@ mod tests { version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }, WorkItem::StartOrchestration { @@ -831,6 +860,7 @@ mod tests { version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }, ]; @@ -861,6 +891,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -901,6 +932,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -921,6 +953,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -941,6 +974,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/tests/cancellation_tests.rs b/tests/cancellation_tests.rs index 6753924..1012c74 100644 --- a/tests/cancellation_tests.rs +++ b/tests/cancellation_tests.rs @@ -1236,6 +1236,7 
@@ async fn cancel_before_orchestration_starts() { input: "input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }, None, diff --git a/tests/capability_filtering_tests.rs b/tests/capability_filtering_tests.rs index 4b5a4ae..2d64525 100644 --- a/tests/capability_filtering_tests.rs +++ b/tests/capability_filtering_tests.rs @@ -314,6 +314,7 @@ async fn provider_seed_without_pinned_version(provider: &dyn Provider, instance: version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 112c381..393ff15 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -119,6 +119,7 @@ pub async fn test_create_execution( input: input.to_string(), parent_instance: parent_instance.map(|s| s.to_string()), parent_id, + parent_execution_id: None, execution_id: next_execution_id, }, None, @@ -152,6 +153,7 @@ pub async fn test_create_execution( input: input.to_string(), parent_instance: parent_instance.map(|s| s.to_string()), parent_id, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -255,6 +257,7 @@ pub async fn seed_instance_with_pinned_version( input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -270,6 +273,7 @@ pub async fn seed_instance_with_pinned_version( version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, INITIAL_EXECUTION_ID, diff --git a/tests/instance_id_validation_tests.rs b/tests/instance_id_validation_tests.rs new file mode 100644 index 0000000..5a1e3af --- /dev/null +++ b/tests/instance_id_validation_tests.rs @@ -0,0 +1,74 @@ +//! Instance id validation: reserved sub-orchestration markers are rejected. +//! +//! 
Child sub-orchestration instance ids are auto-generated as +//! `{parent}::sub::{event_id}`. User-supplied instance ids must not collide with +//! that reserved form, otherwise they can squat a future child id. Other uses of +//! `::` (e.g. `i4::child-1`) remain valid. + +#![allow(clippy::unwrap_used)] +#![allow(clippy::expect_used)] + +use duroxide::Client; +use duroxide::providers::Provider; +use duroxide::providers::sqlite::SqliteProvider; +use std::sync::Arc; + +async fn client() -> Client { + let store: Arc<dyn Provider> = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + Client::new(store) +} + +#[tokio::test] +async fn start_orchestration_rejects_reserved_infix() { + let err = client() + .await + .start_orchestration("victim::sub::2", "AnyOrch", "") + .await + .expect_err("instance id containing the reserved '::sub::' marker must be rejected"); + + assert!( + matches!(err, duroxide::ClientError::InvalidInput { .. }), + "expected InvalidInput, got {err:?}" + ); +} + +#[tokio::test] +async fn start_orchestration_rejects_reserved_prefix() { + let err = client() + .await + .start_orchestration("sub::pending_1", "AnyOrch", "") + .await + .expect_err("instance id starting with the reserved 'sub::' marker must be rejected"); + + assert!(matches!(err, duroxide::ClientError::InvalidInput { .. }), "got {err:?}"); +} + +#[tokio::test] +async fn start_orchestration_versioned_rejects_reserved_infix() { + let err = client() + .await + .start_orchestration_versioned("a::sub::3", "AnyOrch", "1.0.0", "") + .await + .expect_err("instance id containing the reserved '::sub::' marker must be rejected"); + + assert!(matches!(err, duroxide::ClientError::InvalidInput { .. 
}), "got {err:?}"); +} + +#[tokio::test] +async fn start_orchestration_accepts_normal_id() { + client() + .await + .start_orchestration("order-123", "AnyOrch", "") + .await + .expect("normal instance id must be accepted"); +} + +#[tokio::test] +async fn start_orchestration_accepts_non_reserved_double_colon() { + // `::` is only reserved in the `sub::` form; other uses remain valid. + client() + .await + .start_orchestration("i4::child-1", "AnyOrch", "") + .await + .expect("non-reserved '::' instance id must be accepted"); +} diff --git a/tests/observability_tests.rs b/tests/observability_tests.rs index 6b55279..ef4f23e 100644 --- a/tests/observability_tests.rs +++ b/tests/observability_tests.rs @@ -366,6 +366,7 @@ async fn test_fetch_orchestration_item_fault_injection() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -1198,6 +1199,7 @@ async fn test_queue_depth_gauges_initialization() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -1214,6 +1216,7 @@ async fn test_queue_depth_gauges_initialization() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, diff --git a/tests/poison_message_tests.rs b/tests/poison_message_tests.rs index bbde348..03645db 100644 --- a/tests/poison_message_tests.rs +++ b/tests/poison_message_tests.rs @@ -40,6 +40,7 @@ async fn orchestration_attempt_count_increments_on_abandon() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, diff --git a/tests/provider_atomic_tests.rs b/tests/provider_atomic_tests.rs index 5073d4c..fb25dee 100644 --- a/tests/provider_atomic_tests.rs +++ b/tests/provider_atomic_tests.rs 
@@ -116,6 +116,7 @@ async fn test_fetch_orchestration_item_new_instance() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -249,6 +250,7 @@ async fn test_ack_orchestration_item_atomic() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -276,6 +278,7 @@ async fn test_ack_orchestration_item_atomic() { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -385,6 +388,7 @@ async fn test_abandon_orchestration_item() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -436,6 +440,7 @@ async fn test_abandon_orchestration_item_with_delay() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -503,6 +508,7 @@ async fn test_in_memory_provider_atomic_operations() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -531,6 +537,7 @@ async fn test_in_memory_provider_atomic_operations() { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/tests/replay_engine/helpers.rs b/tests/replay_engine/helpers.rs index 7d64148..dad3ca8 100644 --- a/tests/replay_engine/helpers.rs +++ b/tests/replay_engine/helpers.rs @@ -38,6 +38,7 @@ pub fn started_event(event_id: u64) -> Event { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: 
None, initial_custom_status: None, }, diff --git a/tests/scenarios.rs b/tests/scenarios.rs index 86437ce..f3da9a9 100644 --- a/tests/scenarios.rs +++ b/tests/scenarios.rs @@ -39,3 +39,6 @@ mod replay_versioning; #[path = "scenarios/copilot_chat.rs"] mod copilot_chat; + +#[path = "scenarios/suborch_id_collision.rs"] +mod suborch_id_collision; diff --git a/tests/scenarios/replay_versioning.rs b/tests/scenarios/replay_versioning.rs index ca9c67e..74bf339 100644 --- a/tests/scenarios/replay_versioning.rs +++ b/tests/scenarios/replay_versioning.rs @@ -344,6 +344,7 @@ async fn e2e_upgrade_with_preexisting_v1_history() { input: "seed".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id, }, execution_id, @@ -356,6 +357,7 @@ async fn e2e_upgrade_with_preexisting_v1_history() { input: "seed".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/tests/scenarios/suborch_id_collision.rs b/tests/scenarios/suborch_id_collision.rs new file mode 100644 index 0000000..7d5978c --- /dev/null +++ b/tests/scenarios/suborch_id_collision.rs @@ -0,0 +1,855 @@ +//! Sub-orchestration instance-id collision scenarios. +//! +//! Child sub-orchestration instance ids reserve the `sub::` marker. The suffix is produced +//! by [`duroxide::auto_sub_orch_suffix`]: the first parent execution uses +//! `{parent}::sub::{event_id}`, and executions after `continue_as_new` use +//! `{parent}::sub::{execution_id}_{event_id}`. +//! +//! ## The real collision class: same parent across continue-as-new +//! +//! Because the full child id embeds the parent instance id, children of *different* parents +//! never collide as long as root instance ids are unique across the provider. The genuine +//! collision the execution-scoped suffix defends against is a single parent that schedules a +//! 
sub-orchestration at the same event position on each `continue_as_new` generation: event +//! ids reset on continue-as-new, so without the execution-scoped suffix execution 2 would +//! regenerate execution 1's (now terminal) child id and the parent would hang forever. The +//! `continue_as_new_*` tests below are the primary regressions for this. +//! +//! ## Legacy / provider-bypass defense +//! +//! If an id with the reserved marker somehow already names a terminal instance when a parent +//! schedules its child — e.g. an older, non-validating client enqueued it directly through +//! the provider during a rolling upgrade — the parent must still not hang: the dispatcher +//! observes the terminal collision and fails the parent fast. The tests that *inject* a +//! foreign terminal "squatter" via the provider exercise that defense and the durable +//! routing of the failure notification; they are not the normal auto-generated collision +//! case and are labeled accordingly. + +#![allow(clippy::unwrap_used)] +#![allow(clippy::expect_used)] + +use duroxide::providers::Provider; +use duroxide::providers::WorkItem; +use duroxide::providers::sqlite::SqliteProvider; +use duroxide::providers::ExecutionMetadata; +use duroxide::runtime::registry::ActivityRegistry; +use duroxide::runtime::{self}; +use duroxide::{Client, Event, EventKind, OrchestrationContext, OrchestrationRegistry, OrchestrationStatus}; +use std::sync::Arc; +use std::time::Duration; + +#[path = "../common/mod.rs"] +mod common; + +/// Legacy / provider-bypass defense (not the normal auto-generated collision case). +/// +/// A pre-existing terminal instance occupying the parent's auto-generated child id must not +/// cause the parent to hang. The parent should reach a terminal state. +/// +/// Under unique root ids this collision cannot arise from auto-generated ids alone, so the +/// colliding instance is enqueued directly through the provider to model a client that does +/// not validate instance ids (e.g. 
an older node during a rolling upgrade). This exercises +/// the dispatcher's terminal-collision defense independently of client-side validation. +#[tokio::test] +async fn legacy_provider_bypass_terminal_collision_does_not_hang_parent() { + let store: Arc<dyn Provider> = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + // Parent's first action is a sub-orchestration call. Event 1 = OrchestrationStarted, + // Event 2 = SubOrchestrationScheduled, so the auto-generated child id is + // "{parent}::sub::2". + let parent = |ctx: OrchestrationContext, _input: String| async move { + match ctx.schedule_sub_orchestration("Child", "child-input").await { + Ok(r) => Ok(format!("parent-got:{r}")), + Err(e) => Err(format!("child-failed:{e}")), + } + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + // Unrelated orchestration that completes immediately, used to occupy the child id. + let squatter = |_ctx: OrchestrationContext, _input: String| async move { Ok("squatted".to_string()) }; + + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .register("Squatter", squatter) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client = Client::new(store.clone()); + + // Occupy the predicted child id with an unrelated, already-completed instance. + // Enqueued directly (bypassing client-side validation) to model a non-validating client. 
+ let squat_id = "job-1::sub::2"; + store + .enqueue_for_orchestrator( + WorkItem::StartOrchestration { + instance: squat_id.to_string(), + orchestration: "Squatter".to_string(), + input: String::new(), + version: None, + parent_instance: None, + parent_id: None, + parent_execution_id: None, + execution_id: 1, + }, + None, + ) + .await + .unwrap(); + let squat_status = client + .wait_for_orchestration(squat_id, Duration::from_secs(5)) + .await + .unwrap(); + assert!( + matches!(squat_status, OrchestrationStatus::Completed { .. }), + "squatter must complete first; got {squat_status:?}" + ); + + // Start the parent. Its child id collides with the terminal squatter instance. + client.start_orchestration("job-1", "Parent", "").await.unwrap(); + + let status = client + .wait_for_orchestration("job-1", Duration::from_secs(10)) + .await + .expect("parent must reach a terminal state, not hang"); + + match status { + OrchestrationStatus::Failed { details, .. } => { + let msg = details.display_message(); + assert!( + msg.contains("already exists"), + "failure should reflect the child-id collision; got {msg:?}" + ); + } + other => panic!("parent should fail fast due to the child-id collision; got {other:?}"), + } + + rt.shutdown(None).await; +} + +/// Genuine at-least-once redelivery of a completed child's own `StartOrchestration` +/// must not spuriously fail the parent. The child id already names a terminal instance, +/// but the incoming work item's parent matches that instance's recorded parent, so the +/// dispatcher must skip the collision notification and leave the parent completed. 
+#[tokio::test] +async fn redelivered_child_start_does_not_fail_parent() { + let store: Arc<dyn Provider> = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + let parent = |ctx: OrchestrationContext, _input: String| async move { + let r = ctx.schedule_sub_orchestration("Child", "child-input").await?; + Ok(format!("parent-got:{r}")) + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client = Client::new(store.clone()); + + client.start_orchestration("job-2", "Parent", "").await.unwrap(); + let status = client + .wait_for_orchestration("job-2", Duration::from_secs(10)) + .await + .unwrap(); + assert!( + matches!(&status, OrchestrationStatus::Completed { output, .. } if output == "parent-got:child-done:child-input"), + "parent should complete normally first; got {status:?}" + ); + + // Snapshot the parent's history before redelivery so we can prove nothing was appended. + let parent_history_before = store.read("job-2").await.unwrap(); + + // Redeliver the completed child's own StartOrchestration (same parent linkage). + // The child id "job-2::sub::2" is now terminal; the dispatcher must treat this as + // redelivery and not enqueue a SubOrchFailed for the parent. 
+ store + .enqueue_for_orchestrator( + WorkItem::StartOrchestration { + instance: "job-2::sub::2".to_string(), + orchestration: "Child".to_string(), + input: "child-input".to_string(), + version: None, + parent_instance: Some("job-2".to_string()), + parent_id: Some(2), + parent_execution_id: None, + execution_id: 1, + }, + None, + ) + .await + .unwrap(); + + // Wait deterministically until the redelivered child start has drained from the + // orchestrator queue (and the queue has settled), rather than sleeping a fixed time. + wait_for_orchestrator_queue_drained(&store, Duration::from_secs(10)).await; + + // A spurious notification would be a parent-targeted SubOrchFailed. Assert none is + // queued and none was appended to the parent's history. (Checking only that the parent + // still reports Completed is insufficient: a SubOrchFailed delivered to an already + // terminal parent is discarded by the terminal fast-ack path without a trace.) + let depths = store + .as_management_capability() + .unwrap() + .get_queue_depths() + .await + .unwrap(); + assert_eq!( + depths.orchestrator_queue, 0, + "no parent-targeted SubOrchFailed should remain queued after redelivery" + ); + + let parent_history_after = store.read("job-2").await.unwrap(); + assert_eq!( + parent_history_after.len(), + parent_history_before.len(), + "redelivery must not append any event (e.g. SubOrchestrationFailed) to the parent" + ); + assert!( + !parent_history_after + .iter() + .any(|e| matches!(e.kind, duroxide::EventKind::OrchestrationFailed { .. })), + "parent history must not contain an OrchestrationFailed event after redelivery" + ); + + let after = client.get_orchestration_status("job-2").await.unwrap(); + assert!( + matches!(after, OrchestrationStatus::Completed { .. 
}), + "redelivery must not fail the parent; got {after:?}" + ); + + rt.shutdown(None).await; +} + +/// PRIMARY regression for the real collision class: a parent that schedules a +/// sub-orchestration on every continue-as-new iteration must not collide with itself. +/// +/// Event ids reset on continue-as-new, so without execution-scoped child ids the second +/// iteration would regenerate the same child id as the first (now terminal) and hang. The +/// auto-generated suffix includes the parent execution after the first execution, keeping +/// each iteration's child unique. This asserts both that the parent completes and that the +/// per-execution child suffixes are exactly `sub::2`, `sub::2_2`, `sub::3_2`, `sub::4_2`. +#[tokio::test] +async fn parent_with_suborch_survives_continue_as_new() { + let store: Arc<dyn Provider> = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + // Each execution's first action is a sub-orchestration call, then it continues as + // new with an incremented counter until it reaches the limit. 
+ let parent = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + let r = ctx.schedule_sub_orchestration("Child", n.to_string()).await?; + if n < 3 { + return ctx.continue_as_new((n + 1).to_string()).await; + } + Ok(format!("done:{n}:{r}")) + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client = Client::new(store.clone()); + + client.start_orchestration("can-job", "Parent", "0").await.unwrap(); + + let status = client + .wait_for_orchestration("can-job", Duration::from_secs(10)) + .await + .expect("parent must run through all continue-as-new iterations, not hang"); + + assert!( + matches!(&status, OrchestrationStatus::Completed { output, .. } if output == "done:3:child-done:3"), + "parent should complete after looping with sub-orchestrations; got {status:?}" + ); + + // Each execution must schedule a distinctly-suffixed child: the first keeps the legacy + // `sub::{event_id}` form; later executions include the execution id, so no two iterations + // ever regenerate the same (now terminal) child id. 
+ let mut suffixes = Vec::new(); + for execution_id in 1..=4 { + suffixes.push(scheduled_child_suffix(&store, "can-job", execution_id).await); + } + assert_eq!( + suffixes, + vec![ + "sub::2".to_string(), + "sub::2_2".to_string(), + "sub::3_2".to_string(), + "sub::4_2".to_string(), + ], + "each continue-as-new execution must generate a unique, execution-scoped child id" + ); + + rt.shutdown(None).await; +} + +/// Focused regression (affandar): after the first continue-as-new generation the child id +/// must include the execution id, so a parent that schedules a sub-orchestration at the same +/// event position on each generation never reuses a now-terminal child id. +/// +/// With the old id generation this hangs: execution 2 tries to start `P::sub::2`, finds +/// execution 1's child already terminal, and the parent never receives a completion. The +/// assertion pins the exact generated suffixes (`sub::2`, `sub::2_2`, `sub::3_2`). +#[tokio::test] +async fn continue_as_new_suborch_child_ids_include_execution_after_first() { + let store: Arc<dyn Provider> = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + let parent_id = "can-child-id-job"; + + let parent = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + let result = ctx.schedule_sub_orchestration("Child", n.to_string()).await?; + if n < 2 { + return ctx.continue_as_new((n + 1).to_string()).await; + } + Ok(format!("done:{n}:{result}")) + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + + let orchestrations = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let activities = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await; + let client = Client::new(store.clone()); + + client.start_orchestration(parent_id, "Parent", "0").await.unwrap(); + + let status = client + 
.wait_for_orchestration(parent_id, Duration::from_secs(5)) + .await + .expect("parent must not hang from reusing the same child id after continue-as-new"); + + assert!( + matches!(&status, OrchestrationStatus::Completed { output, .. } if output == "done:2:child-done:2"), + "parent should complete after three executions; got {status:?}" + ); + + let mut scheduled_child_suffixes = Vec::new(); + for execution_id in 1..=3 { + scheduled_child_suffixes.push(scheduled_child_suffix(&store, parent_id, execution_id).await); + } + assert_eq!( + scheduled_child_suffixes, + vec!["sub::2".to_string(), "sub::2_2".to_string(), "sub::3_2".to_string()], + ); + + rt.shutdown(None).await; +} + +/// Read the auto-generated child suffix recorded by the `SubOrchestrationScheduled` event in +/// the given parent execution. The event stores the suffix (e.g. `sub::2`), not the full +/// `{parent}::sub::...` id. +async fn scheduled_child_suffix(store: &Arc<dyn Provider>, parent_instance: &str, execution_id: u64) -> String { + let history = store.read_with_execution(parent_instance, execution_id).await.unwrap(); + history + .iter() + .find_map(|event| match &event.kind { + EventKind::SubOrchestrationScheduled { instance, .. } => Some(instance.clone()), + _ => None, + }) + .unwrap_or_else(|| panic!("execution {execution_id} should schedule a sub-orchestration")) +} + +/// Routing regression (affandar): a sub-orchestration's completion/failure must be addressed +/// to the parent execution that *scheduled* the child, not to whatever execution is current +/// when the child finishes. +/// +/// This is the scenario the durable `parent_execution_id` stamp defends against, distinct from +/// the awaited case (where the parent is blocked and the two coincide). Here the child outlives +/// the execution that scheduled it: +/// +/// 1. Execution 1 schedules a child, lets it start, then continues-as-new **without awaiting** +/// it. 
The child keeps running (continue-as-new does not cancel outstanding children) and +/// finishes only when the test releases it. +/// 2. Execution 2 schedules **no** sub-orchestration and parks on an external event, so it is +/// still alive when the child's late completion notification arrives. +/// 3. The child's `SubOrchCompleted` is emitted while the parent's current execution is 2. +/// +/// With the stamp, the notification is addressed to execution 1 (terminal) and the replay +/// execution filter discards it, so execution 2 is untouched. Without the stamp (routing via +/// the parent's *current* execution at completion time), the notification is addressed to +/// execution 2, where event id 2 is not a sub-orchestration schedule, so it is applied as a +/// nondeterministic completion and poisons the parent. The assertion that execution 2 is still +/// running (then completes cleanly once released) fails under that buggy routing. +#[tokio::test] +async fn suborch_completion_routes_to_scheduling_execution_not_current() { + let store: Arc<dyn Provider> = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + let parent_id = "stale-suborch-route-job"; + + let parent = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + if n == 0 { + // Execution 1: schedule the child (event id 2), give it a turn to start, then + // continue-as-new without ever awaiting it. The child outlives this parent + // execution and finishes only once the test releases it, by which time the + // parent's current execution is already 2. + let _child = ctx.schedule_sub_orchestration("Child", "x"); + ctx.schedule_timer(Duration::from_millis(50)).await; + return ctx.continue_as_new("1").await; + } + // Execution 2: schedule no sub-orchestration. Park on an external event so this + // execution stays alive while the child's stale notification is processed. 
+ ctx.schedule_wait("Release").await; + Ok("done".to_string()) + }; + // Child parks on an external event so the test controls exactly when it completes — + // after execution 2 has been established. + let child = |ctx: OrchestrationContext, _input: String| async move { + ctx.schedule_wait("ChildGo").await; + Ok("child-late".to_string()) + }; + + let orchestrations = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let activities = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await; + let client = Client::new(store.clone()); + + client.start_orchestration(parent_id, "Parent", "0").await.unwrap(); + + // Wait until (a) the parent has advanced to execution 2 and (b) the child (scheduled in + // execution 1 as `{parent}::sub::2`) is running and subscribed to its release event. + let child_instance = format!("{parent_id}::sub::2"); + let deadline = std::time::Instant::now() + Duration::from_secs(5); + loop { + let parent_on_exec_2 = store.read_with_execution(parent_id, 2).await.is_ok_and(|h| { + h.iter() + .any(|e| matches!(&e.kind, EventKind::OrchestrationStarted { .. })) + }); + let child_running = matches!( + client.get_orchestration_status(&child_instance).await.unwrap(), + OrchestrationStatus::Running { .. } + ); + if parent_on_exec_2 && child_running { + break; + } + assert!( + std::time::Instant::now() < deadline, + "parent never reached execution 2 with a running child" + ); + tokio::time::sleep(Duration::from_millis(25)).await; + } + + // Release the child *after* execution 2 is established, so its completion notification is + // emitted while the parent's current execution is 2 (the divergence the stamp guards). 
+ client.raise_event(&child_instance, "ChildGo", "go").await.unwrap(); + + // Let the child's stale notification drain through the orchestrator queue and be processed + // by the parent *before* we release execution 2. + wait_for_orchestrator_queue_drained(&store, Duration::from_secs(5)).await; + + // Correct routing addresses the stale notification to execution 1 (terminal), so the + // replay filter discards it and execution 2 is still running, waiting for Release. + // Buggy current-execution routing applies it to execution 2 and poisons the parent. + let status = client.get_orchestration_status(parent_id).await.unwrap(); + assert!( + matches!(status, OrchestrationStatus::Running { .. }), + "execution 2 must still be running (stale child notification must not poison it); got {status:?}" + ); + + // Release execution 2 and confirm it completes cleanly. + client.raise_event(parent_id, "Release", "go").await.unwrap(); + let final_status = client + .wait_for_orchestration(parent_id, Duration::from_secs(5)) + .await + .expect("parent should complete after release"); + assert!( + matches!(&final_status, OrchestrationStatus::Completed { output, .. } if output == "done"), + "parent should complete with \"done\"; got {final_status:?}" + ); + + rt.shutdown(None).await; +} + +/// Legacy / provider-bypass defense: execution-scoped routing of a terminal-collision +/// failure within a single end-to-end run. +/// +/// A parent continues as new and, on execution 2, schedules a sub-orchestration whose +/// auto-generated child id (`{parent}::sub::{execution_id}_{event_id}`) already names a +/// terminal instance injected via the provider. The collision failure must be recorded in +/// execution 2, not misrouted to execution 1. Here the failure is produced while the parent's +/// own turn is running, so the `parent_execution_id` is stamped onto the colliding start and +/// used directly. 
This drives the full flow through one runtime; the +/// `legacy_provider_bypass_terminal_collision_routes_via_fallback_on_fresh_runtime` test below +/// is the stronger cross-runtime guard for the provider-read fallback. +#[tokio::test] +async fn legacy_provider_bypass_terminal_collision_fails_fast_on_execution_two() { + let store: Arc<dyn Provider> = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + // On execution 1 the parent immediately continues as new; on execution 2 its first + // action is a sub-orchestration call. Event 1 = OrchestrationStarted, event 2 = + // SubOrchestrationScheduled, so the execution-2 child id is "coll::sub::2_2". + let parent = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + if n == 0 { + return ctx.continue_as_new("1").await; + } + match ctx.schedule_sub_orchestration("Child", "x").await { + Ok(r) => Ok(format!("parent-got:{r}")), + Err(e) => Err(format!("child-failed:{e}")), + } + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + let squatter = |_ctx: OrchestrationContext, _input: String| async move { Ok("squatted".to_string()) }; + + let squat_id = "coll::sub::2_2"; + + // Runtime A occupies the predicted execution-2 child id with an unrelated terminal + // instance, then shuts down so it holds no in-memory routing state for the parent. 
+ { + let orchs = OrchestrationRegistry::builder().register("Squatter", squatter).build(); + let acts = ActivityRegistry::builder().build(); + let rt_a = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client_a = Client::new(store.clone()); + store + .enqueue_for_orchestrator( + WorkItem::StartOrchestration { + instance: squat_id.to_string(), + orchestration: "Squatter".to_string(), + input: String::new(), + version: None, + parent_instance: None, + parent_id: None, + parent_execution_id: None, + execution_id: 1, + }, + None, + ) + .await + .unwrap(); + let squat_status = client_a + .wait_for_orchestration(squat_id, Duration::from_secs(5)) + .await + .unwrap(); + assert!( + matches!(squat_status, OrchestrationStatus::Completed { .. }), + "squatter must complete first; got {squat_status:?}" + ); + rt_a.shutdown(None).await; + } + + // Runtime B is a fresh runtime (no cached execution ids). It drives the parent to + // execution 2, where the child id collides with the terminal squatter. + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt_b = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client_b = Client::new(store.clone()); + + client_b.start_orchestration("coll", "Parent", "0").await.unwrap(); + + let status = client_b + .wait_for_orchestration("coll", Duration::from_secs(10)) + .await + .expect("parent on execution 2 must fail fast, not hang"); + + match status { + OrchestrationStatus::Failed { details, .. 
} => { + let msg = details.display_message(); + assert!( + msg.contains("already exists"), + "failure should reflect the child-id collision; got {msg:?}" + ); + } + other => panic!("parent should fail fast due to the child-id collision; got {other:?}"), + } + + // The failure must be recorded in execution 2 (proves the notification routed to the + // parent's current execution, not execution 1). + let exec2 = store.read_with_execution("coll", 2).await.unwrap(); + assert!( + exec2 + .iter() + .any(|e| matches!(e.kind, duroxide::EventKind::OrchestrationFailed { .. })), + "execution 2 history must contain the OrchestrationFailed event" + ); + + rt_b.shutdown(None).await; +} + +/// Legacy / provider-bypass defense: stronger cross-runtime regression for the +/// terminal-collision routing *fallback*. +/// +/// Here the runtime that processes the colliding child start has *never* run the parent, and +/// the colliding start carries no stamped `parent_execution_id` (it is seeded directly into +/// the provider, modeling an old work item). The parent's execution-2 state (parked awaiting a +/// sub-orchestration whose id collides with a foreign terminal instance) is seeded directly +/// into the provider. With no stamp to use, the dispatcher must fall back to reading the +/// parent's current execution (2) from durable provider state when routing the `SubOrchFailed`, +/// so the failure lands in execution 2 and the parent fails fast. If the failure were routed to +/// execution 1 (as a process-local cache miss would default to), the parent's replay would +/// filter it out and the parent would hang. +#[tokio::test] +async fn legacy_provider_bypass_terminal_collision_routes_via_fallback_on_fresh_runtime() { + let store: Arc<dyn Provider> = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + let parent_id = "seeded-parent"; + let child_id = "seeded-parent::sub::2_2"; + + // 1. Seed a foreign terminal instance occupying the parent's execution-2 child id. 
+ common::seed_history_turn( + store.as_ref(), + WorkItem::StartOrchestration { + instance: child_id.to_string(), + orchestration: "Squatter".to_string(), + input: String::new(), + version: Some("1.0.0".to_string()), + parent_instance: None, + parent_id: None, + parent_execution_id: None, + execution_id: 1, + }, + 1, + vec![ + Event::with_event_id( + 1, + child_id, + 1, + None, + EventKind::OrchestrationStarted { + name: "Squatter".to_string(), + version: "1.0.0".to_string(), + input: String::new(), + parent_instance: None, + parent_id: None, + parent_execution_id: None, + carry_forward_events: None, + initial_custom_status: None, + }, + ), + Event::with_event_id( + 2, + child_id, + 1, + None, + EventKind::OrchestrationCompleted { + output: "squatted".to_string(), + }, + ), + ], + vec![], + ExecutionMetadata { + orchestration_name: Some("Squatter".to_string()), + orchestration_version: Some("1.0.0".to_string()), + ..Default::default() + }, + ) + .await; + + // 2. Seed the parent directly on execution 2, parked awaiting the colliding child. + // On execution 2 its first action is the sub-orchestration call: event 1 = + // OrchestrationStarted, event 2 = SubOrchestrationScheduled, id "...::sub::2_2". 
+ common::seed_history_turn( + store.as_ref(), + WorkItem::StartOrchestration { + instance: parent_id.to_string(), + orchestration: "Parent".to_string(), + input: "1".to_string(), + version: Some("1.0.0".to_string()), + parent_instance: None, + parent_id: None, + parent_execution_id: None, + execution_id: 2, + }, + 2, + vec![ + Event::with_event_id( + 1, + parent_id, + 2, + None, + EventKind::OrchestrationStarted { + name: "Parent".to_string(), + version: "1.0.0".to_string(), + input: "1".to_string(), + parent_instance: None, + parent_id: None, + parent_execution_id: None, + carry_forward_events: None, + initial_custom_status: None, + }, + ), + Event::with_event_id( + 2, + parent_id, + 2, + None, + EventKind::SubOrchestrationScheduled { + name: "Child".to_string(), + instance: child_id.to_string(), + input: "x".to_string(), + }, + ), + ], + vec![], + ExecutionMetadata { + orchestration_name: Some("Parent".to_string()), + orchestration_version: Some("1.0.0".to_string()), + ..Default::default() + }, + ) + .await; + + // Sanity: durable state reports the parent on execution 2. + assert_eq!( + store.read(parent_id).await.unwrap().iter().map(|e| e.execution_id).max(), + Some(2), + "seeded parent must be on execution 2" + ); + + // 3. Enqueue the colliding child start with NO stamped parent_execution_id, modeling a + // work item produced before this field existed (rolling upgrade). Its target id is + // already terminal (the foreign squatter), and its parent differs, so this is a + // genuine collision. With no stamp, routing must fall back to a durable provider read. + store + .enqueue_for_orchestrator( + WorkItem::StartOrchestration { + instance: child_id.to_string(), + orchestration: "Child".to_string(), + input: "x".to_string(), + version: Some("1.0.0".to_string()), + parent_instance: Some(parent_id.to_string()), + parent_id: Some(2), + parent_execution_id: None, + execution_id: 1, + }, + None, + ) + .await + .unwrap(); + + // 4. 
A fresh runtime that never ran the parent processes the collision and must route + // the failure to the parent's current execution (2), read from durable state. + let parent = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + if n == 0 { + return ctx.continue_as_new("1").await; + } + match ctx.schedule_sub_orchestration("Child", "x").await { + Ok(r) => Ok(format!("parent-got:{r}")), + Err(e) => Err(format!("child-failed:{e}")), + } + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client = Client::new(store.clone()); + + let status = client + .wait_for_orchestration(parent_id, Duration::from_secs(10)) + .await + .expect("fresh runtime must route the failure to execution 2, not hang"); + + match status { + OrchestrationStatus::Failed { details, .. } => { + let msg = details.display_message(); + assert!( + msg.contains("already exists"), + "failure should reflect the child-id collision; got {msg:?}" + ); + } + other => panic!("parent should fail fast due to the child-id collision; got {other:?}"), + } + + let exec2 = store.read_with_execution(parent_id, 2).await.unwrap(); + assert!( + exec2 + .iter() + .any(|e| matches!(e.kind, EventKind::OrchestrationFailed { .. })), + "execution 2 history must contain the OrchestrationFailed event" + ); + + rt.shutdown(None).await; +} +/// `sub::` marker validation that `Client::start_orchestration` enforces. An orchestration +/// may therefore use an id that a top-level client start would reject, and it is used as +/// the exact child instance id. 
+#[tokio::test] +async fn explicit_sub_orchestration_id_bypasses_reserved_marker_validation() { + let store: Arc<dyn Provider> = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + // An id a top-level client start would reject (contains the reserved `::sub::` infix), + // used verbatim as an explicit child id. + let explicit_id = "tenant::sub::99"; + + let parent = |ctx: OrchestrationContext, _input: String| async move { + let r = ctx + .schedule_sub_orchestration_with_id("Child", "tenant::sub::99", "child-input") + .await?; + Ok(format!("parent-got:{r}")) + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client = Client::new(store.clone()); + + // The same id is rejected for a top-level start. + let rejected = client.start_orchestration(explicit_id, "Parent", "").await; + assert!( + matches!(rejected, Err(duroxide::ClientError::InvalidInput { .. })), + "top-level start with the reserved marker must be rejected; got {rejected:?}" + ); + + // But the explicit sub-orchestration escape hatch allows it. + client.start_orchestration("tenant-parent", "Parent", "").await.unwrap(); + let status = client + .wait_for_orchestration("tenant-parent", Duration::from_secs(10)) + .await + .expect("parent using an explicit reserved-shaped child id should complete"); + assert!( + matches!(&status, OrchestrationStatus::Completed { output, .. } if output == "parent-got:child-done:child-input"), + "parent should complete with the explicit child id; got {status:?}" + ); + + // The child ran under the exact explicit id (no parent prefix).
+ let child_status = client.get_orchestration_status(explicit_id).await.unwrap(); + assert!( + matches!(child_status, OrchestrationStatus::Completed { .. }), + "explicit child id must be used verbatim; got {child_status:?}" + ); + + rt.shutdown(None).await; +} + +/// Poll until the orchestrator queue has drained and stayed empty across several reads, +/// so a transient parent-targeted `SubOrchFailed` (if one were wrongly enqueued) is given +/// a chance to appear before we assert its absence. +async fn wait_for_orchestrator_queue_drained(store: &Arc<dyn Provider>, timeout: Duration) { + let mgmt = store.as_management_capability().expect("management capability"); + let deadline = std::time::Instant::now() + timeout; + let mut consecutive_empty = 0; + loop { + let depth = mgmt.get_queue_depths().await.unwrap().orchestrator_queue; + if depth == 0 { + consecutive_empty += 1; + if consecutive_empty >= 5 { + return; + } + } else { + consecutive_empty = 0; + } + if std::time::Instant::now() >= deadline { + panic!("orchestrator queue did not drain within {timeout:?}"); + } + tokio::time::sleep(Duration::from_millis(50)).await; + } +} diff --git a/tests/scenarios/version_replay_bug.rs b/tests/scenarios/version_replay_bug.rs index 7236dad..485cdfc 100644 --- a/tests/scenarios/version_replay_bug.rs +++ b/tests/scenarios/version_replay_bug.rs @@ -326,6 +326,7 @@ fn unit_workitem_reader_completion_only_must_preserve_version() { input: "original-input".to_string(), parent_instance: Some("parent-inst".to_string()), parent_id: Some(42), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -393,6 +394,7 @@ fn unit_workitem_reader_nth_execution_must_preserve_version() { input: "can-input".to_string(), parent_instance: Some("parent-for-v2".to_string()), parent_id: Some(99), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -479,6 +481,7 @@ fn unit_workitem_reader_completion_only_tuple_field_analysis() { input: 
"original-input".to_string(), parent_instance: Some("parent-inst".to_string()), parent_id: Some(42), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -550,6 +553,7 @@ fn unit_input_comes_from_extract_context_not_workitem_reader() { input: "ORIGINAL-INPUT-FROM-FIRST-TURN".to_string(), // The actual input parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -615,6 +619,7 @@ fn unit_nth_execution_history_starts_with_orchestration_started() { input: "can-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -825,6 +830,7 @@ fn unit_completion_only_replay_uses_nth_execution_input() { input: "CAN-INPUT-FOR-EXECUTION-2".to_string(), // Different from execution 1! parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/tests/sqlite_tests.rs b/tests/sqlite_tests.rs index b9ef01a..838141e 100644 --- a/tests/sqlite_tests.rs +++ b/tests/sqlite_tests.rs @@ -53,6 +53,7 @@ async fn test_sqlite_provider_basic() { input: r#"{"test": true}"#.to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }; @@ -88,6 +89,7 @@ async fn test_sqlite_provider_basic() { input: r#"{"test": true}"#.to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -140,6 +142,7 @@ async fn test_execution_status_completed() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -209,6 +212,7 @@ async fn test_execution_status_failed() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 
duroxide::INITIAL_EXECUTION_ID, }, None, @@ -420,6 +424,7 @@ async fn test_sqlite_file_concurrent_access() { input: format!("{{\"id\": {i}}}"), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }; @@ -457,6 +462,7 @@ async fn test_sqlite_file_concurrent_access() { input: format!("{{\"id\": {acked_count}}}"), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -586,6 +592,7 @@ async fn test_sqlite_provider_transactional() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }; @@ -613,6 +620,7 @@ async fn test_sqlite_provider_transactional() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -748,6 +756,7 @@ async fn test_sqlite_provider_timer_queue() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -776,6 +785,7 @@ async fn test_sqlite_provider_timer_queue() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -806,6 +816,7 @@ async fn test_execution_status_running() { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }; @@ -831,6 +842,7 @@ async fn test_execution_status_running() { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -883,6 +895,7 @@ async fn test_execution_output_captured_on_continue_as_new() { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 
duroxide::INITIAL_EXECUTION_ID, }; @@ -908,6 +921,7 @@ async fn test_execution_output_captured_on_continue_as_new() { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -972,6 +986,7 @@ async fn test_instrumented_provider_semantic_equivalence() { version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }; @@ -1004,6 +1019,7 @@ async fn test_instrumented_provider_semantic_equivalence() { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, },