Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/runtime/dispatchers/orchestration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ impl Runtime {

// Create a new history manager to append the failure event
let mut failure_history_mgr = HistoryManager::from_history(&item.history);
failure_history_mgr.append_failed(infra_error.clone());
failure_history_mgr.append_failed(&item.instance, item.execution_id, infra_error.clone());

// Try to commit the failure event
let failure_delta = failure_history_mgr.delta().to_vec();
Expand Down Expand Up @@ -1356,7 +1356,7 @@ impl Runtime {
}
}

history_mgr.append_failed(error.clone());
history_mgr.append_failed(&item.instance, item.execution_id, error.clone());

let metadata = ExecutionMetadata {
status: Some("Failed".to_string()),
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl Runtime {
Self::emit_terminal_cancellation_breadcrumbs(history_mgr, instance, execution_id, &outstanding);

// Add failure event last
history_mgr.append_failed(details.clone());
history_mgr.append_failed(instance, execution_id, details.clone());

// Notify parent if this is a sub-orchestration
if let Some((parent_instance, parent_id)) = parent_link {
Expand Down Expand Up @@ -354,7 +354,7 @@ impl Runtime {
Self::emit_terminal_cancellation_breadcrumbs(history_mgr, instance, execution_id, &outstanding);

// Add failure event last, and propagate cancellation to outstanding sub-orchestrations.
history_mgr.append_failed(details.clone());
history_mgr.append_failed(instance, execution_id, details.clone());

for (_schedule_id, child_suffix) in &outstanding.sub_orchestrations {
orchestrator_items.push(WorkItem::CancelInstance {
Expand Down
8 changes: 3 additions & 5 deletions src/runtime/state_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,12 @@ impl HistoryManager {
}

/// Append an OrchestrationFailed event with the next event_id
pub fn append_failed(&mut self, details: crate::ErrorDetails) {
pub fn append_failed(&mut self, instance_id: &str, execution_id: u64, details: crate::ErrorDetails) {
let next_id = self.next_event_id();
// Note: instance_id and execution_id should be set from context
// For now, use placeholder values as this will be set by the caller
self.append(Event::with_event_id(
next_id,
"", // instance_id should be set by caller
0, // execution_id should be set by caller
instance_id,
execution_id,
None,
EventKind::OrchestrationFailed { details },
));
Expand Down
20 changes: 20 additions & 0 deletions tests/e2e_samples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,26 @@ async fn sample_cancellation_parent_cascades_to_children_fs() {
.await;
assert!(ok, "timeout waiting for parent cancel failure");

// Regression: the terminal OrchestrationFailed event must preserve the
// instance_id/execution_id so telemetry pipelines reading event payloads
// can correlate the cancellation back to the workflow instance.
let parent_hist = store.read("inst-sample-cancel").await.unwrap_or_default();
let failed = parent_hist
.iter()
.rev()
.find(|e| matches!(&e.kind, EventKind::OrchestrationFailed { .. }))
.expect("parent history should contain OrchestrationFailed");
assert_eq!(
failed.instance_id,
"inst-sample-cancel",
"OrchestrationFailed must carry the instance_id"
);
assert_ne!(
failed.execution_id,
0,
"OrchestrationFailed must carry a non-placeholder execution_id"
);

// Find child instance (prefix is parent::sub::<id>) and check it was canceled too
let mgmt = store.as_management_capability().expect("ProviderAdmin required");
let children: Vec<String> = mgmt
Expand Down
Loading