Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use types::Durofut;
/// by the background worker after successful initialization. Increment whenever
/// a new binary introduces new duroxide-pg migration scripts or any other
/// BGW-applied duroxide schema change.
pub const WORKER_SCHEMA_VERSION: i32 = 1;
pub const WORKER_SCHEMA_VERSION: i32 = 2;

::pgrx::pg_module_magic!(name, version);

Expand Down
126 changes: 126 additions & 0 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,15 @@ async fn initialize_duroxide_runtime(
}
};

if let Err(e) = install_canceled_status_normalization(mgmt_pool).await {
log!(
"pg_durable: failed to install canceled status normalization (will retry): {}",
e
);
tokio::time::sleep(retry_interval).await;
continue;
}

// Reuse the management pool for activities (graph loading, status updates).
// The former dedicated activity pool with its df.in_workflow hook is no
// longer needed — connect_as_user() sets that flag independently.
Expand All @@ -485,6 +494,123 @@ async fn initialize_duroxide_runtime(
}
}

/// Ensure canceled workflows remain distinguishable from failures in duroxide.executions.
///
/// Duroxide may record a cancellation terminal state as `Failed` while pg_durable
/// records `df.instances.status = 'cancelled'`. This hook normalizes that mismatch
/// at the storage boundary by writing the durable-store terminal value `Canceled`.
///
/// The df.instances trigger updates both `Running` and `Failed` execution rows:
/// - `Running` handles cancellation while the execution row is still in-flight
/// - `Failed` handles races where the runtime marked failure before normalization
async fn install_canceled_status_normalization(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
CREATE OR REPLACE FUNCTION df._pg_durable_mark_execution_canceled_from_instance()
RETURNS trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, df, duroxide
AS $$
BEGIN
IF NEW.status = 'cancelled' THEN
UPDATE duroxide.executions
SET status = 'Canceled'
WHERE instance_id = NEW.id
AND status IN ('Running', 'Failed');
END IF;
RETURN NEW;
END;
$$;
"#,
)
.execute(pool)
.await?;

sqlx::query(
r#"
DROP TRIGGER IF EXISTS trg_pg_durable_mark_execution_canceled_from_instance
ON df.instances
"#,
)
.execute(pool)
.await?;

sqlx::query(
r#"
CREATE TRIGGER trg_pg_durable_mark_execution_canceled_from_instance
AFTER UPDATE OF status ON df.instances
FOR EACH ROW
WHEN (NEW.status = 'cancelled' AND OLD.status IS DISTINCT FROM NEW.status)
EXECUTE FUNCTION df._pg_durable_mark_execution_canceled_from_instance()
"#,
)
.execute(pool)
.await?;

sqlx::query(
r#"
CREATE OR REPLACE FUNCTION duroxide._pg_durable_normalize_execution_status()
RETURNS trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, df, duroxide
AS $$
BEGIN
IF NEW.status = 'Failed'
AND EXISTS (
SELECT 1
FROM df.instances i
WHERE i.id = NEW.instance_id
AND i.status = 'cancelled'
) THEN
NEW.status := 'Canceled';
END IF;
RETURN NEW;
END;
$$;
"#,
)
.execute(pool)
.await?;

sqlx::query(
r#"
DROP TRIGGER IF EXISTS trg_pg_durable_normalize_execution_status
ON duroxide.executions
"#,
)
.execute(pool)
.await?;

sqlx::query(
r#"
CREATE TRIGGER trg_pg_durable_normalize_execution_status
BEFORE INSERT OR UPDATE OF status ON duroxide.executions
FOR EACH ROW
EXECUTE FUNCTION duroxide._pg_durable_normalize_execution_status()
"#,
)
.execute(pool)
.await?;

// Backfill rows from before the trigger existed (or from a previous binary).
sqlx::query(
r#"
UPDATE duroxide.executions e
SET status = 'Canceled'
FROM df.instances i
WHERE i.id = e.instance_id
AND i.status = 'cancelled'
AND e.status = 'Failed'
"#,
)
.execute(pool)
.await?;

Ok(())
}

/// Write the epoch sentinel after a successful runtime init.
/// Returns the generated epoch_id on success.
async fn write_epoch_sentinel(pool: &sqlx::PgPool) -> Result<String, sqlx::Error> {
Expand Down
33 changes: 33 additions & 0 deletions tests/e2e/sql/22_cancel_status_consistency.sql
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,39 @@ BEGIN
PERFORM df_e2e_assert_status_consistent(inst_id, 'cancelled', 'cancel_running_instance');
END $$;

-- Verify duroxide execution status is also cancellation (not Failed).
RESET SESSION AUTHORIZATION;
-- Poll the newest duroxide.executions row of the instance recorded in _t_cancel
-- until its status reads as canceled (either spelling, case-insensitive) or the
-- attempt budget is used up, then raise on mismatch or report the observed status.
DO $$
DECLARE
    target_instance TEXT;
    latest_status   TEXT;
    polls           INT := 0;
    is_canceled     BOOLEAN;
BEGIN
    SELECT instance_id INTO target_instance FROM _t_cancel;

    LOOP
        -- Highest execution_id is taken as the most recent execution of the instance.
        SELECT e.status INTO latest_status
        FROM duroxide.executions e
        WHERE e.instance_id = target_instance
        ORDER BY e.execution_id DESC
        LIMIT 1;

        -- A missing row leaves latest_status NULL; coalesce makes that a non-match.
        is_canceled := lower(coalesce(latest_status, '')) IN ('canceled', 'cancelled');

        IF is_canceled OR polls > 300 THEN
            EXIT;
        END IF;

        PERFORM pg_sleep(0.1);
        polls := polls + 1;
    END LOOP;

    IF lower(coalesce(latest_status, '')) NOT IN ('canceled', 'cancelled') THEN
        RAISE EXCEPTION 'FAILED [cancel_running_instance_duroxide]: executions.status returned %, expected canceled/cancelled',
            latest_status;
    END IF;

    RAISE NOTICE 'PASSED [cancel_running_instance_duroxide]: executions.status = %', latest_status;
END $$;
SET SESSION AUTHORIZATION df_e2e_user;

DROP TABLE _t_cancel;
DROP TABLE _cancel_log;

Expand Down
Loading