diff --git a/src/worker.rs b/src/worker.rs index 3a434c0b..f67ff55c 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -470,6 +470,15 @@ async fn initialize_duroxide_runtime( } }; + if let Err(e) = ensure_history_event_payload_identifiers(mgmt_pool).await { + log!( + "pg_durable: failed to install history payload identifier guard (will retry): {}", + e + ); + tokio::time::sleep(retry_interval).await; + continue; + } + // Reuse the management pool for activities (graph loading, status updates). // The former dedicated activity pool with its df.in_workflow hook is no // longer needed — connect_as_user() sets that flag independently. @@ -485,6 +494,73 @@ async fn initialize_duroxide_runtime( } } +/// Ensures cancellation/failure history payloads always carry the row identifiers. +/// +/// Some upstream runtime events can contain an empty `instance_id` and/or +/// `execution_id = 0` in `event_data`. This trigger normalizes those fields on +/// insert so payload consumers can always correlate the event back to the +/// owning history row. +async fn ensure_history_event_payload_identifiers(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + CREATE OR REPLACE FUNCTION duroxide._normalize_history_event_payload_identifiers() + RETURNS trigger + LANGUAGE plpgsql + AS $$ + DECLARE + v_data JSONB; + v_execution_text TEXT; + BEGIN + IF NEW.event_data IS NULL OR NEW.event_data = '' THEN + RETURN NEW; + END IF; + + BEGIN + v_data := NEW.event_data::JSONB; + EXCEPTION WHEN OTHERS THEN + RETURN NEW; + END; + + IF JSONB_TYPEOF(v_data) != 'object' THEN + RETURN NEW; + END IF; + + IF COALESCE(v_data->>'instance_id', '') = '' THEN + v_data := JSONB_SET(v_data, '{instance_id}', TO_JSONB(NEW.instance_id), true); + END IF; + + v_execution_text := v_data->>'execution_id'; + IF v_execution_text IS NULL OR v_execution_text !~ '^[0-9]+$' THEN + v_data := JSONB_SET(v_data, '{execution_id}', TO_JSONB(NEW.execution_id), true); + ELSIF v_execution_text::BIGINT = 0 THEN + v_data := JSONB_SET(v_data, '{execution_id}', TO_JSONB(NEW.execution_id), true); + END IF; + + NEW.event_data := v_data::TEXT; + RETURN NEW; + END; + $$; + "#, + ) + .execute(pool) + .await?; + + sqlx::query("DROP TRIGGER IF EXISTS trg_normalize_history_event_payload_identifiers ON duroxide.history") + .execute(pool) + .await?; + + sqlx::query( + "CREATE TRIGGER trg_normalize_history_event_payload_identifiers + BEFORE INSERT ON duroxide.history + FOR EACH ROW + EXECUTE FUNCTION duroxide._normalize_history_event_payload_identifiers()", + ) + .execute(pool) + .await?; + + Ok(()) +} + /// Write the epoch sentinel after a successful runtime init. /// Returns the generated epoch_id on success. async fn write_epoch_sentinel(pool: &sqlx::PgPool) -> Result { diff --git a/tests/e2e/sql/23_cancel_history_payload_ids.sql b/tests/e2e/sql/23_cancel_history_payload_ids.sql new file mode 100644 index 00000000..bc33f352 --- /dev/null +++ b/tests/e2e/sql/23_cancel_history_payload_ids.sql @@ -0,0 +1,76 @@ +-- Regression test: cancellation terminal history payload must preserve identifiers + +SET SESSION AUTHORIZATION df_e2e_user; + +CREATE TEMP TABLE _payload_cancel_instance (instance_id TEXT); + +INSERT INTO _payload_cancel_instance +SELECT df.start( + -- Long sleep keeps the instance running long enough for deterministic cancellation. + df.sleep(300), + 'cancel-history-payload-ids' +); + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; + attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _payload_cancel_instance; + + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'running' OR attempts > 200; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + + IF lower(status) <> 'running' THEN + RAISE EXCEPTION 'TEST FAILED: instance did not reach running state before cancellation (status=%)', status; + END IF; + + PERFORM df.cancel(inst_id, 'payload-id-regression-check'); + + attempts := 0; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) IN ('cancelled', 'failed') OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; +END $$; + +RESET SESSION AUTHORIZATION; + +DO $$ +DECLARE + inst_id TEXT; + bad_rows INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _payload_cancel_instance; + + SELECT COUNT(*) + INTO bad_rows + FROM duroxide.history h + WHERE h.instance_id = inst_id + AND (h.event_data::JSONB ? 'instance_id' OR h.event_data::JSONB ? 'execution_id') + AND ( + (h.event_data::JSONB ? 'instance_id' + AND COALESCE(h.event_data::JSONB->>'instance_id', '') <> h.instance_id) + OR (h.event_data::JSONB ? 'execution_id' + AND ( + COALESCE(h.event_data::JSONB->>'execution_id', '') !~ '^[0-9]+$' + OR (h.event_data::JSONB->>'execution_id')::BIGINT <> h.execution_id + )) + ); + + IF bad_rows > 0 THEN + RAISE EXCEPTION 'TEST FAILED: found % history events with payload identifiers that do not match row identifiers for instance %', + bad_rows, inst_id; + END IF; + + RAISE NOTICE 'TEST PASSED: cancellation history payload identifiers match row identifiers'; +END $$; + +SELECT 'TEST PASSED: cancel history payload identifiers' AS result;