Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,15 @@ async fn initialize_duroxide_runtime(
}
};

if let Err(e) = ensure_history_event_payload_identifiers(mgmt_pool).await {
log!(
"pg_durable: failed to install history payload identifier guard (will retry): {}",
e
);
tokio::time::sleep(retry_interval).await;
continue;
}

// Reuse the management pool for activities (graph loading, status updates).
// The former dedicated activity pool with its df.in_workflow hook is no
// longer needed — connect_as_user() sets that flag independently.
Expand All @@ -485,6 +494,73 @@ async fn initialize_duroxide_runtime(
}
}

/// Ensures cancellation/failure history payloads always carry the row identifiers.
///
/// Some upstream runtime events can contain an empty `instance_id` and/or
/// `execution_id = 0` in `event_data`. This trigger normalizes those fields on
/// insert so payload consumers can always correlate the event back to the
/// owning history row.
async fn ensure_history_event_payload_identifiers(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
CREATE OR REPLACE FUNCTION duroxide._normalize_history_event_payload_identifiers()
RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
v_data JSONB;
v_execution_text TEXT;
BEGIN
IF NEW.event_data IS NULL OR NEW.event_data = '' THEN
RETURN NEW;
END IF;

BEGIN
v_data := NEW.event_data::JSONB;
EXCEPTION WHEN OTHERS THEN
RETURN NEW;
END;

IF JSONB_TYPEOF(v_data) != 'object' THEN
RETURN NEW;
END IF;

IF COALESCE(v_data->>'instance_id', '') = '' THEN
v_data := JSONB_SET(v_data, '{instance_id}', TO_JSONB(NEW.instance_id), true);
END IF;

v_execution_text := v_data->>'execution_id';
IF v_execution_text IS NULL OR v_execution_text !~ '^[0-9]+$' THEN
v_data := JSONB_SET(v_data, '{execution_id}', TO_JSONB(NEW.execution_id), true);
ELSIF v_execution_text::BIGINT = 0 THEN
v_data := JSONB_SET(v_data, '{execution_id}', TO_JSONB(NEW.execution_id), true);
END IF;

NEW.event_data := v_data::TEXT;
RETURN NEW;
END;
$$;
"#,
)
.execute(pool)
.await?;

sqlx::query("DROP TRIGGER IF EXISTS trg_normalize_history_event_payload_identifiers ON duroxide.history")
.execute(pool)
.await?;

sqlx::query(
"CREATE TRIGGER trg_normalize_history_event_payload_identifiers
BEFORE INSERT ON duroxide.history
FOR EACH ROW
EXECUTE FUNCTION duroxide._normalize_history_event_payload_identifiers()",
)
.execute(pool)
.await?;

Ok(())
}

/// Write the epoch sentinel after a successful runtime init.
/// Returns the generated epoch_id on success.
async fn write_epoch_sentinel(pool: &sqlx::PgPool) -> Result<String, sqlx::Error> {
Expand Down
76 changes: 76 additions & 0 deletions tests/e2e/sql/23_cancel_history_payload_ids.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
-- Regression test: cancellation terminal history payload must preserve identifiers

SET SESSION AUTHORIZATION df_e2e_user;

CREATE TEMP TABLE _payload_cancel_instance (instance_id TEXT);

INSERT INTO _payload_cancel_instance
SELECT df.start(
-- Long sleep keeps the instance running long enough for deterministic cancellation.
df.sleep(300),
'cancel-history-payload-ids'
);

DO $$
DECLARE
inst_id TEXT;
status TEXT;
attempts INT := 0;
BEGIN
SELECT instance_id INTO inst_id FROM _payload_cancel_instance;

LOOP
SELECT s INTO status FROM df.status(inst_id) s;
EXIT WHEN lower(status) = 'running' OR attempts > 200;
PERFORM pg_sleep(0.1);
attempts := attempts + 1;
END LOOP;

IF lower(status) <> 'running' THEN
RAISE EXCEPTION 'TEST FAILED: instance did not reach running state before cancellation (status=%)', status;
END IF;

PERFORM df.cancel(inst_id, 'payload-id-regression-check');

attempts := 0;
LOOP
SELECT s INTO status FROM df.status(inst_id) s;
EXIT WHEN lower(status) IN ('cancelled', 'failed') OR attempts > 300;
PERFORM pg_sleep(0.1);
attempts := attempts + 1;
END LOOP;
END $$;

RESET SESSION AUTHORIZATION;

DO $$
DECLARE
inst_id TEXT;
bad_rows INT := 0;
BEGIN
SELECT instance_id INTO inst_id FROM _payload_cancel_instance;

SELECT COUNT(*)
INTO bad_rows
FROM duroxide.history h
WHERE h.instance_id = inst_id
AND (h.event_data::JSONB ? 'instance_id' OR h.event_data::JSONB ? 'execution_id')
AND (
(h.event_data::JSONB ? 'instance_id'
AND COALESCE(h.event_data::JSONB->>'instance_id', '') <> h.instance_id)
OR (h.event_data::JSONB ? 'execution_id'
AND (
COALESCE(h.event_data::JSONB->>'execution_id', '') !~ '^[0-9]+$'
OR (h.event_data::JSONB->>'execution_id')::BIGINT <> h.execution_id
))
);

IF bad_rows > 0 THEN
RAISE EXCEPTION 'TEST FAILED: found % history events with payload identifiers that do not match row identifiers for instance %',
bad_rows, inst_id;
END IF;

RAISE NOTICE 'TEST PASSED: cancellation history payload identifiers match row identifiers';
END $$;

SELECT 'TEST PASSED: cancel history payload identifiers' AS result;
Loading