From 6ebb5ea20d56dc6080daa0b59a84c3a8023772e0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 14:02:42 +0000 Subject: [PATCH 1/2] Initial plan From 14cf7fefe7ce67b5b29bfede23d5986d8c028319 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 14:28:17 +0000 Subject: [PATCH 2/2] Fix cancellation status normalization for duroxide executions Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com> --- src/lib.rs | 2 +- src/worker.rs | 126 ++++++++++++++++++ .../e2e/sql/22_cancel_status_consistency.sql | 33 +++++ 3 files changed, 160 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index be84c0d4..ce756746 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,7 +45,7 @@ pub use types::Durofut; /// by the background worker after successful initialization. Increment whenever /// a new binary introduces new duroxide-pg migration scripts or any other /// BGW-applied duroxide schema change. -pub const WORKER_SCHEMA_VERSION: i32 = 1; +pub const WORKER_SCHEMA_VERSION: i32 = 2; ::pgrx::pg_module_magic!(name, version); diff --git a/src/worker.rs b/src/worker.rs index 3a434c0b..f324a9c6 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -470,6 +470,15 @@ async fn initialize_duroxide_runtime( } }; + if let Err(e) = install_canceled_status_normalization(mgmt_pool).await { + log!( + "pg_durable: failed to install canceled status normalization (will retry): {}", + e + ); + tokio::time::sleep(retry_interval).await; + continue; + } + // Reuse the management pool for activities (graph loading, status updates). // The former dedicated activity pool with its df.in_workflow hook is no // longer needed — connect_as_user() sets that flag independently. @@ -485,6 +494,123 @@ async fn initialize_duroxide_runtime( } } +/// Ensure canceled workflows remain distinguishable from failures in duroxide.executions. +/// +/// Duroxide may record a cancellation terminal state as `Failed` while pg_durable +/// records `df.instances.status = 'cancelled'`. This hook normalizes that mismatch +/// at the storage boundary by writing the durable-store terminal value `Canceled`. +/// +/// The df.instances trigger updates both `Running` and `Failed` execution rows: +/// - `Running` handles cancellation while the execution row is still in-flight +/// - `Failed` handles races where the runtime marked failure before normalization +async fn install_canceled_status_normalization(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + CREATE OR REPLACE FUNCTION df._pg_durable_mark_execution_canceled_from_instance() + RETURNS trigger + LANGUAGE plpgsql + SECURITY DEFINER + SET search_path = pg_catalog, df, duroxide + AS $$ + BEGIN + IF NEW.status = 'cancelled' THEN + UPDATE duroxide.executions + SET status = 'Canceled' + WHERE instance_id = NEW.id + AND status IN ('Running', 'Failed'); + END IF; + RETURN NEW; + END; + $$; + "#, + ) + .execute(pool) + .await?; + + sqlx::query( + r#" + DROP TRIGGER IF EXISTS trg_pg_durable_mark_execution_canceled_from_instance + ON df.instances + "#, + ) + .execute(pool) + .await?; + + sqlx::query( + r#" + CREATE TRIGGER trg_pg_durable_mark_execution_canceled_from_instance + AFTER UPDATE OF status ON df.instances + FOR EACH ROW + WHEN (NEW.status = 'cancelled' AND OLD.status IS DISTINCT FROM NEW.status) + EXECUTE FUNCTION df._pg_durable_mark_execution_canceled_from_instance() + "#, + ) + .execute(pool) + .await?; + + sqlx::query( + r#" + CREATE OR REPLACE FUNCTION duroxide._pg_durable_normalize_execution_status() + RETURNS trigger + LANGUAGE plpgsql + SECURITY DEFINER + SET search_path = pg_catalog, df, duroxide + AS $$ + BEGIN + IF NEW.status = 'Failed' + AND EXISTS ( + SELECT 1 + FROM df.instances i + WHERE i.id = NEW.instance_id + AND i.status = 'cancelled' + ) THEN + NEW.status := 'Canceled'; + END IF; + RETURN NEW; + END; + $$; + "#, + ) + .execute(pool) + .await?; + + sqlx::query( + r#" + DROP TRIGGER IF EXISTS trg_pg_durable_normalize_execution_status + ON duroxide.executions + "#, + ) + .execute(pool) + .await?; + + sqlx::query( + r#" + CREATE TRIGGER trg_pg_durable_normalize_execution_status + BEFORE INSERT OR UPDATE OF status ON duroxide.executions + FOR EACH ROW + EXECUTE FUNCTION duroxide._pg_durable_normalize_execution_status() + "#, + ) + .execute(pool) + .await?; + + // Backfill rows from before the trigger existed (or from a previous binary). + sqlx::query( + r#" + UPDATE duroxide.executions e + SET status = 'Canceled' + FROM df.instances i + WHERE i.id = e.instance_id + AND i.status = 'cancelled' + AND e.status = 'Failed' + "#, + ) + .execute(pool) + .await?; + + Ok(()) +} + /// Write the epoch sentinel after a successful runtime init. /// Returns the generated epoch_id on success. async fn write_epoch_sentinel(pool: &sqlx::PgPool) -> Result { diff --git a/tests/e2e/sql/22_cancel_status_consistency.sql b/tests/e2e/sql/22_cancel_status_consistency.sql index 5a189c14..add37c4c 100644 --- a/tests/e2e/sql/22_cancel_status_consistency.sql +++ b/tests/e2e/sql/22_cancel_status_consistency.sql @@ -155,6 +155,39 @@ BEGIN PERFORM df_e2e_assert_status_consistent(inst_id, 'cancelled', 'cancel_running_instance'); END $$; +-- Verify duroxide execution status is also cancellation (not Failed). +RESET SESSION AUTHORIZATION; +DO $$ +DECLARE + inst_id TEXT; + exec_status TEXT; + attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_cancel; + + LOOP + SELECT e.status INTO exec_status + FROM duroxide.executions e + WHERE e.instance_id = inst_id + ORDER BY e.execution_id DESC + LIMIT 1; + + EXIT WHEN lower(coalesce(exec_status, '')) IN ('canceled', 'cancelled') + OR attempts > 300; + + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + + IF lower(coalesce(exec_status, '')) NOT IN ('canceled', 'cancelled') THEN + RAISE EXCEPTION 'FAILED [cancel_running_instance_duroxide]: executions.status returned %, expected canceled/cancelled', + exec_status; + END IF; + + RAISE NOTICE 'PASSED [cancel_running_instance_duroxide]: executions.status = %', exec_status; +END $$; +SET SESSION AUTHORIZATION df_e2e_user; + DROP TABLE _t_cancel; DROP TABLE _cancel_log;