diff --git a/docs/spec-atomic-start.md b/docs/spec-atomic-start.md new file mode 100644 index 0000000..683ecf1 --- /dev/null +++ b/docs/spec-atomic-start.md @@ -0,0 +1,153 @@ +# Keeping the control plane and the runtime consistent + +**Status:** Implemented and validated. Atomic `df.start` / `df.cancel` / +`df.signal`, plus a reconciler that repairs leftover drift. +**Author:** pg_durable team +**Date:** June 2026 + +## Problem + +A durable function has state in two places: + +- **pg_durable control plane** — `df.nodes` and `df.instances`, written on the + caller's PostgreSQL transaction. +- **duroxide runtime** — `_duroxide` queue/history/instance state, previously + written out-of-band through a separate connection. + +Those writes were not atomic. The visible failure was a rolled-back `df.start()`: +`df.*` rows rolled back, but the duroxide enqueue survived. The worker then retried +an orchestration whose graph no longer existed, waited up to 5 seconds for rows that +would never appear, and left an orphan behind. We reproduced this with rolled-back +starts leaving `_duroxide.instances` rows that had no matching `df.instances` row. + +`df.cancel()` and `df.signal()` had the same transaction-boundary problem for their +runtime enqueue: a rollback of the caller's transaction did not roll back the runtime +work. + +## Decision + +Implement **prevent + repair**: + +1. **Prevent:** enqueue `df.start()`, `df.cancel()`, and `df.signal()` runtime work + inside the caller's transaction via SPI. The control-plane writes and runtime + enqueue now commit or roll back together. +2. **Repair:** run a lightweight reconciler that removes leftover runtime orphans + and marks stuck control-plane rows failed. This catches legacy/fallback drift and + crash-time drift that transaction-local enqueue cannot prevent. 
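The difference between the old out-of-band enqueue and the in-transaction enqueue can be illustrated with a toy model. The Rust sketch below is not pg_durable code: `Stores`, `Txn`, `start_out_of_band`, `start_atomic`, and `orphans` are hypothetical names, and two in-memory sets stand in for `df.instances` and the `_duroxide` queue. It assumes only that a rollback discards staged writes while an out-of-band write lands immediately.

```rust
use std::collections::BTreeSet;

/// Toy stand-ins for the two stores (hypothetical, not pg_durable types).
#[derive(Default, Debug)]
struct Stores {
    control_plane: BTreeSet<String>, // models df.instances ids
    runtime_queue: BTreeSet<String>, // models the _duroxide queue
}

/// Models the caller's transaction: staged writes become visible only on commit.
struct Txn<'a> {
    stores: &'a mut Stores,
    staged_control: Vec<String>,
    staged_runtime: Vec<String>,
}

impl<'a> Txn<'a> {
    fn begin(stores: &'a mut Stores) -> Self {
        Txn { stores, staged_control: Vec::new(), staged_runtime: Vec::new() }
    }

    /// Old behaviour: the df.* row is staged, but the runtime enqueue is
    /// written immediately, as if through a separate connection.
    fn start_out_of_band(&mut self, id: &str) {
        self.staged_control.push(id.to_string());
        self.stores.runtime_queue.insert(id.to_string());
    }

    /// New behaviour: both writes are staged on the same transaction.
    fn start_atomic(&mut self, id: &str) {
        self.staged_control.push(id.to_string());
        self.staged_runtime.push(id.to_string());
    }

    /// Apply every staged write to both stores.
    fn commit(self) {
        let Txn { stores, staged_control, staged_runtime } = self;
        stores.control_plane.extend(staged_control);
        stores.runtime_queue.extend(staged_runtime);
    }

    /// Dropping the staged writes models ROLLBACK.
    fn rollback(self) {}
}

/// Runtime ids with no control-plane row: the drift the reconciler repairs.
fn orphans(stores: &Stores) -> Vec<String> {
    stores.runtime_queue.difference(&stores.control_plane).cloned().collect()
}
```

In this model, a rolled-back `start_out_of_band` leaves an id in `orphans`, while a rolled-back `start_atomic` leaves both sets untouched. That is the property the "prevent" half aims for, and the orphan case is what the "repair" half cleans up.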
+ +## Alternatives considered + +| Option | Summary | Decision | +|---|---|---| +| Single source of truth in `_duroxide` | Move all state into the runtime schema and rebuild pg_durable reads/security over it. | Too large and migration-heavy for this fix. | +| Run pg_durable writes inside duroxide's transaction | Let duroxide-pg perform the `df.*` writes on its worker-pool connection. | Atomic, but not with the caller's transaction; breaks identity capture and row security expectations. | +| Run duroxide enqueue inside the caller's transaction | Keep both stores and use SPI to enqueue runtime work next to `df.*` writes. | Chosen primary fix. | +| Async reconciler only | Tolerate drift and repair it later. | Useful backstop, but insufficient alone. | + +## Design overview + +`df.start()` still creates the graph and instance rows in `df.*`. The final enqueue +step now happens through SQL on the same transaction, so a rollback undoes both the +control-plane rows and the runtime queue row. `df.cancel()` and `df.signal()` follow +the same transaction rule. + +The runtime queue is not writable by ordinary users, so the enqueue goes through +private `SECURITY DEFINER` wrappers in the `df` schema. These wrappers are granted +through `df.grant_usage()`, build the runtime work items themselves, and perform +their own authorization checks before writing to `_duroxide`. + +The start wrapper is intentionally **not** a general-purpose privileged runtime +entrypoint. It only starts the root function-graph orchestration and validates that +the input targets the same instance id. Cancel/signal wrappers authorize against the +instance owner. + +### Direct duroxide-pg coupling + +This is the abstraction break in the design. Most pg_durable code talks to the +runtime through duroxide's Rust provider/client API. That API uses a separate +connection pool, so it cannot share the caller's backend transaction. 
+ +To get caller-transaction atomicity, this PR calls the duroxide-pg SQL surface +directly (`enqueue_orchestrator_work`, `delete_instances_atomic`, and selected +runtime tables). That only works with a PostgreSQL-backed provider. In practice, +pg_durable is itself a PostgreSQL extension whose runtime state lives in the same +database, so a non-PG provider is not a meaningful deployment target; still, the +code probes for the SQL surface and falls back to the old out-of-band path when it +is absent. + +### Signals + +`df.signal()` fans out to the root instance and any running sub-orchestrations, +because a child branch may be the one waiting on the signal. + +Duroxide does not buffer external events until an orchestration is ready to receive +them. Therefore a signal sent before the root runtime row exists is rejected instead +of returning `OK` and being silently skipped. Once the runtime row exists, the +signal enqueue is atomic with the caller's transaction. + +### Reconciler + +`df.reconcile()` is an admin-only backstop. It: + +- deletes orphaned runtime **root** instances whose full subtree has no matching + `df.instances` row; and +- marks stuck `df.instances` rows failed when there is no live runtime instance and + no queued start. + +The background worker keeps one reconciler durable loop running per cluster on +`pg_durable.reconciler_cron` (default `*/5 * * * *`; empty disables it), submitted +by the dedicated non-superuser role `df_reconciler`. + +## Behavior changes + +- `df.start()`, `df.cancel()`, and `df.signal()` now participate in the caller's + transaction. For example, `BEGIN; SELECT df.start(...); ROLLBACK;` no longer + starts the workflow on the atomic path. +- If the duroxide-pg SQL surface or the `df` wrappers are missing, pg_durable logs + and falls back to the previous non-atomic client path. The fallback is not emitted + as a client-visible SQL `WARNING`, so scripts that capture `SELECT df.start(...)` + output remain compatible. 
+- `df.wait_for_schedule` now records a `utc_now` event before the timer so repeated + schedule waits compute the next cron tick each generation. This fixes a loop + busy-loop bug, but changes the recorded replay event sequence. + +## Upgrade and compatibility + +The wrappers and `df.reconcile()` are `df`-schema objects, so they are included in +both fresh-install SQL and the `0.2.3 -> 0.2.4` upgrade script. The upgrade script +also updates `df.grant_usage()` / `df.revoke_usage()` and backfills wrapper EXECUTE +privileges to existing roles that already had explicit `USAGE` on schema `df`. + +Binary backward compatibility is preserved for old schemas that have not yet run +`ALTER EXTENSION UPDATE`: the new binary uses the atomic path only when both the +duroxide-pg enqueue function and the `df._enqueue_orchestrator_*` wrappers exist; +otherwise it falls back to the old out-of-band client path. + +There is no table data migration. + +**Upgrade caveat:** drain or restart any in-flight instance already waiting in a +`WAIT_SCHEDULE` node during upgrade. Those histories may replay expecting the old +sequence and fail as nondeterministic after the `utc_now` change. + +## Validation + +Automated coverage added in this PR: + +- `24_atomic_rollback` — rolled-back start/cancel/signal leave no runtime effect; + signaling before runtime materialization is rejected. +- `25_enqueue_wrapper_authz` — wrapper authorization and start-wrapper hardening. +- `26_reconcile_orphan_gc` — orphan root with child sub-orchestrations is collected + as a full subtree; healthy instances remain untouched. +- `scripts/test-upgrade.sh` — schema equivalence, binary compatibility, data + compatibility, and existing-role wrapper-grant backfill. + +Manual/targeted validation included signal fan-out, cancel consistency, reconciler +liveness, cron scheduling, formatting, clippy, pgspot, and upgrade tests. 
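The property that `26_reconcile_orphan_gc` checks (an orphan root is collected together with its sub-orchestrations, and healthy instances are left alone) can be modelled outside the database. The Rust sketch below is an illustration under stated assumptions, not the reconciler's implementation: `RuntimeInstance`, `inst`, and `orphan_subtree_ids` are hypothetical names, the runtime rows are an in-memory slice, and the grace window is deliberately left out.

```rust
use std::collections::{BTreeSet, HashMap};

/// One runtime instance row (hypothetical model of a `_duroxide` instance).
#[derive(Debug, Clone)]
struct RuntimeInstance {
    id: String,
    parent: Option<String>, // None for a root orchestration
}

/// Convenience constructor used only for illustration.
fn inst(id: &str, parent: Option<&str>) -> RuntimeInstance {
    RuntimeInstance { id: id.to_string(), parent: parent.map(str::to_string) }
}

/// Returns the ids of every orphaned root plus all of its descendants.
/// A root is orphaned when it has no matching control-plane id in `df_ids`.
/// Children never seed the search, because sub-orchestrations have no
/// control-plane row of their own and would otherwise look like orphans.
fn orphan_subtree_ids(
    runtime: &[RuntimeInstance],
    df_ids: &BTreeSet<String>,
) -> BTreeSet<String> {
    // Index children by parent id so descendants can be walked quickly.
    let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
    for row in runtime {
        if let Some(parent) = &row.parent {
            children.entry(parent.as_str()).or_default().push(row.id.as_str());
        }
    }

    // Seed with roots only (parent is None) that lack a control-plane row.
    let mut stack: Vec<&str> = runtime
        .iter()
        .filter(|row| row.parent.is_none() && !df_ids.contains(&row.id))
        .map(|row| row.id.as_str())
        .collect();

    // Depth-first walk that gathers the full subtree of each orphan root.
    let mut collected = BTreeSet::new();
    while let Some(id) = stack.pop() {
        if collected.insert(id.to_string()) {
            if let Some(kids) = children.get(id) {
                stack.extend(kids.iter().copied());
            }
        }
    }
    collected
}
```

Given an orphan root with one child and a healthy root with one child, the function returns only the orphan and its child. Seeding from roots alone is the design choice that keeps a healthy parent's sub-orchestrations out of the result.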
+ +## Remaining risks / follow-up + +- The direct duroxide-pg SQL dependency is intentional but should remain small and + well documented. Moving the privileged enqueue surface into duroxide-pg would be a + cleaner long-term boundary. +- Reconciler grace/cadence and role provisioning may need tuning after operational + experience. diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index 586e359..03d5ca1 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -206,14 +206,23 @@ what the upgrade script handles, and any backward compatibility considerations. ### v0.2.3 → v0.2.4 #### Simplify `df.grant_usage()` — drop the explicit function allowlist -- **DDL change (df schema):** `df.grant_usage()` no longer loops over a hard-coded `func_sigs` array issuing `GRANT EXECUTE` per function. Fresh installs (`src/lib.rs`) and the upgrade script (`sql/pg_durable--0.2.3--0.2.4.sql`) both `CREATE OR REPLACE` the function with a body that grants `USAGE ON SCHEMA df` plus the table privileges, and conditionally grants `df.http()` / the admin helpers. The signature `df.grant_usage(text, boolean, boolean)` is unchanged. -- **DDL change (df schema):** `df.revoke_usage()` is made symmetric with the new `grant_usage()`. It no longer loops over every `df.*` function in `pg_proc` issuing `REVOKE EXECUTE` (which, post-simplification, only produced "no privileges could be revoked" warnings since ordinary functions are never granted per-function EXECUTE). The new body revokes only what `grant_usage()` grants: schema `USAGE`, EXECUTE on the sensitive functions (`df.http`, `df.grant_usage`, `df.revoke_usage`), and the table privileges. The signature `df.revoke_usage(text)` is unchanged. +- **DDL change (df schema):** `df.grant_usage()` no longer loops over a hard-coded `func_sigs` array issuing `GRANT EXECUTE` per function. 
Fresh installs (`src/lib.rs`) and the upgrade script (`sql/pg_durable--0.2.3--0.2.4.sql`) both `CREATE OR REPLACE` the function with a body that grants `USAGE ON SCHEMA df` plus the table privileges, conditionally grants `df.http()` / the admin helpers, and explicitly grants the new private enqueue wrappers (`df._enqueue_orchestrator_start`, `df._enqueue_orchestrator_cancel`, `df._enqueue_orchestrator_signal`) to ordinary df users. The signature `df.grant_usage(text, boolean, boolean)` is unchanged. +- **DDL change (df schema):** `df.revoke_usage()` is made symmetric with the new `grant_usage()`. It no longer loops over every `df.*` function in `pg_proc` issuing `REVOKE EXECUTE` (which, post-simplification, only produced "no privileges could be revoked" warnings since ordinary functions are never granted per-function EXECUTE). The new body revokes only what `grant_usage()` grants: schema `USAGE`, EXECUTE on the sensitive/admin functions, EXECUTE on the private enqueue wrappers, and the table privileges. The signature `df.revoke_usage(text)` is unchanged. - **Rationale:** The ordinary `df.*` functions retain PostgreSQL's default PUBLIC `EXECUTE`, so schema `USAGE` is the real access gate; the per-function grants/revokes were redundant. The sensitive functions have PUBLIC `EXECUTE` revoked at install time and were never in the allowlist, so their protection is unchanged. - **Behavioral note:** A newly added `df.*` function is now callable by any role with schema `USAGE` by default. To keep a future function private, `REVOKE EXECUTE ... FROM PUBLIC` at install time and grant it explicitly in `df.grant_usage()`. - **Legacy cleanup caveat:** A role that was granted under the *old* `grant_usage()` (explicit per-function EXECUTE) and is later revoked under the new `revoke_usage()` may retain inert EXECUTE entries on ordinary functions. These are harmless — revoking schema `USAGE` fully locks the role out — and clear on the next drop/regrant cycle. 
- **Scenario A considerations:** Signatures are identical on the fresh-install and upgrade paths (only the bodies differ), so the function-signature equivalence contract passes. - **Scenario B1/B2 considerations:** No schema/data migration and no new objects. The replaced bodies work against the existing schema and change no privileges already granted. +#### Atomic in-transaction enqueue wrappers + `df.reconcile()` +- **DDL change (df schema):** Adds three private `SECURITY DEFINER` wrappers: `df._enqueue_orchestrator_start(text, text, text)`, `df._enqueue_orchestrator_cancel(text, text)`, and `df._enqueue_orchestrator_signal(text, text, text)`. Fresh installs and the upgrade script create the same functions and `REVOKE EXECUTE ... FROM PUBLIC`; `df.grant_usage()` grants them explicitly to df users because `df.start()` / `df.cancel()` / `df.signal()` call them via SPI as the caller. The upgrade script also backfills these wrapper grants to roles that already had explicit `USAGE` on schema `df` before `ALTER EXTENSION UPDATE`, preserving grant option where present. +- **DDL change (df schema):** Adds admin-only `df.reconcile(integer)` (`SECURITY DEFINER`, `REVOKE EXECUTE ... FROM PUBLIC`) to delete orphaned duroxide instance subtrees and mark stuck `df.instances` rows failed. The background worker starts it through a built-in durable loop as the dedicated `df_reconciler` role. +- **Provider coupling:** These wrappers and `df.reconcile()` call the duroxide-pg SQL surface directly (`enqueue_orchestrator_work`, `delete_instances_atomic`, and `_duroxide` tables). This is intentional: only direct SQL via SPI can share the caller's transaction. Non-PG providers (or older schemas without the wrappers) use the legacy out-of-band fallback. +- **Scenario A considerations:** Fresh-install and upgrade schemas must expose the same new `df` functions and grants. 
The upgrade script creates the wrappers/reconciler, updates `grant_usage()`/`revoke_usage()`, and backfills existing df users in the same release, so upgraded users can call `df.start()` / `df.cancel()` / `df.signal()` immediately after `ALTER EXTENSION UPDATE`. +- **Scenario B1 considerations:** The new `.so` remains compatible with pre-upgrade 0.2.3 schemas because `df.start()` / `df.cancel()` / `df.signal()` use the in-transaction path only when both the duroxide-pg provider SQL function and the `df._enqueue_orchestrator_*` wrappers exist; otherwise they log and fall back to the old out-of-band client path. This fallback is not client-visible as a SQL `WARNING`, so it does not contaminate scripts that capture `SELECT df.start(...)` output. +- **Scenario B2 considerations:** No data migration. Existing instances and queued work keep their original behavior; the new atomic semantics apply to calls made after the schema has the wrappers. +- **In-flight schedule caveat:** This release also changes `df.wait_for_schedule` to compute from the orchestration's recorded clock. That fixes a loop busy-loop bug, but changes the recorded replay event sequence (`utc_now` is recorded before the timer). Any in-flight instance already waiting in a `WAIT_SCHEDULE` node when the binary changes should be drained or restarted during upgrade; otherwise it may replay expecting the old sequence and fail as nondeterministic. + #### Rename `df.wait_for_completion()` to `df.await_instance()` - **DDL change (df schema):** Adds `df.await_instance(text, integer)` as the canonical C binding for the helper formerly exposed as `df.wait_for_completion(text, integer)`. The old SQL function remains present and the new `.so` continues exporting `wait_for_completion_wrapper` as a shim, so existing customer scripts keep working. - **Grant behavior:** No explicit grant migration is required. 
PostgreSQL grants `EXECUTE` on newly created functions to `PUBLIC` by default, and `df.await_instance` is not a sensitive helper whose default PUBLIC grant is revoked. diff --git a/scripts/test-upgrade.sh b/scripts/test-upgrade.sh index 1778543..bb0cfeb 100755 --- a/scripts/test-upgrade.sh +++ b/scripts/test-upgrade.sh @@ -914,6 +914,7 @@ echo "" B2_PRE_INSTANCE_ID="" B2_INFLIGHT_INSTANCE_ID="" B2_POST_INSTANCE_ID="" +B2_PRE_GRANTED_ROLE="durable_b2_pre_grant" test_b2_data_survives_upgrade() { # Step 1: Install previous version and create test data @@ -924,6 +925,14 @@ test_b2_data_survives_upgrade() { assert_sql_equals "SELECT df.clearvars();" "OK" || return 1 assert_sql_equals "SELECT df.setvar('b2_key', 'b2_value');" "OK" || return 1 + # Role granted under the previous schema. The upgrade must backfill EXECUTE + # on new private wrappers so this existing df user can keep calling + # df.start()/df.cancel()/df.signal() without re-running df.grant_usage(). + run_sql_capture "DROP OWNED BY ${B2_PRE_GRANTED_ROLE};" >/dev/null 2>&1 || true + run_sql_capture "DROP ROLE IF EXISTS ${B2_PRE_GRANTED_ROLE};" >/dev/null 2>&1 || true + run_sql_capture "CREATE ROLE ${B2_PRE_GRANTED_ROLE} LOGIN;" >/dev/null || return 1 + run_sql_capture "SELECT df.grant_usage('${B2_PRE_GRANTED_ROLE}');" >/dev/null || return 1 + B2_PRE_INSTANCE_ID=$(run_sql_capture "SELECT df.start('INSERT INTO test_upgrade_b2_log (kind, msg) VALUES (''pre'', ''{b2_key}'') RETURNING msg', 'b2-pre-upgrade');") || return 1 B2_INFLIGHT_INSTANCE_ID=$(run_sql_capture "SELECT df.start(df.sleep(2) ~> 'SELECT ''b2-running'' AS value', 'b2-inflight');") || return 1 @@ -968,6 +977,12 @@ test_b2_new_data_after_upgrade() { assert_sql_equals "SELECT msg FROM test_upgrade_b2_log WHERE kind = 'post' ORDER BY id DESC LIMIT 1;" "new_value" } +test_b2_existing_grants_after_upgrade() { + assert_sql_equals "SELECT has_function_privilege('${B2_PRE_GRANTED_ROLE}', 'df._enqueue_orchestrator_start(text, text, text)', 'EXECUTE');" "t" 
&& + assert_sql_equals "SELECT has_function_privilege('${B2_PRE_GRANTED_ROLE}', 'df._enqueue_orchestrator_cancel(text, text)', 'EXECUTE');" "t" && + assert_sql_equals "SELECT has_function_privilege('${B2_PRE_GRANTED_ROLE}', 'df._enqueue_orchestrator_signal(text, text, text)', 'EXECUTE');" "t" +} + test_b2_grant_usage_after_upgrade() { # Regression guard for #110: after ALTER EXTENSION UPDATE, df.debug_connection() # must be gone from the catalog. Scenario A only compares function name/args/ @@ -996,6 +1011,7 @@ test_b2_grant_usage_after_upgrade() { # Clean up the probe role. run_sql_capture "DROP OWNED BY ${probe_role}; DROP ROLE IF EXISTS ${probe_role};" >/dev/null 2>&1 || true + run_sql_capture "DROP OWNED BY ${B2_PRE_GRANTED_ROLE}; DROP ROLE IF EXISTS ${B2_PRE_GRANTED_ROLE};" >/dev/null 2>&1 || true } if [ "$HAS_COMPAT_PREV" = true ]; then @@ -1003,6 +1019,7 @@ if [ "$HAS_COMPAT_PREV" = true ]; then run_test "B2: Pre-upgrade instance remains queryable" test_b2_pre_upgrade_instance_after_upgrade run_test "B2: In-flight work completes after upgrade" test_b2_inflight_work_after_upgrade run_test "B2: New data and execution after upgrade" test_b2_new_data_after_upgrade + run_test "B2: Existing df users retain wrapper privileges after upgrade" test_b2_existing_grants_after_upgrade run_test "B2: df.grant_usage() works and df.debug_connection() is gone after upgrade" test_b2_grant_usage_after_upgrade fi diff --git a/sql/pg_durable--0.2.3--0.2.4.sql b/sql/pg_durable--0.2.3--0.2.4.sql index efaab86..947c9b7 100644 --- a/sql/pg_durable--0.2.3--0.2.4.sql +++ b/sql/pg_durable--0.2.3--0.2.4.sql @@ -40,7 +40,10 @@ DROP FUNCTION IF EXISTS df.debug_connection(); -- -- The sensitive functions (df.http, df.grant_usage, df.revoke_usage) had their -- PUBLIC EXECUTE revoked at install time and were never in the list; they are --- still granted explicitly here, so their protection is unchanged. +-- still granted explicitly here, so their protection is unchanged. 
The new +-- in-transaction enqueue wrappers are also private (REVOKE FROM PUBLIC below) +-- and are granted explicitly to df users because df.start()/df.cancel()/df.signal() +-- call them via SPI as the calling role. -- -- This CREATE OR REPLACE brings pre-existing installs in line with fresh -- 0.2.4 installs (see src/lib.rs). The new body works against the existing @@ -76,6 +79,14 @@ BEGIN EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df.revoke_usage(text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; END IF; + -- In-transaction enqueue wrappers — SECURITY DEFINER, revoked from PUBLIC at + -- install. Granted unconditionally to every df user because df.start() / + -- df.cancel() / df.signal() call them via SPI as the calling role; their own + -- internal authorization checks gate access to other users' instances. + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + -- Table privileges EXECUTE pg_catalog.format('GRANT SELECT ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; @@ -100,10 +111,11 @@ $fn$; -- the role out of every ordinary df.* function. -- -- The new body undoes exactly what grant_usage() grants: schema USAGE, EXECUTE --- on the sensitive functions, and the table privileges. 
Note: a role granted --- under the OLD grant_usage() (explicit per-function EXECUTE) may retain inert --- EXECUTE entries on ordinary functions after this revoke; they are harmless --- because schema USAGE is gone, and clear on the next drop/regrant cycle. +-- on the sensitive/admin functions, EXECUTE on the new enqueue wrappers, and +-- the table privileges. Note: a role granted under the OLD grant_usage() +-- (explicit per-function EXECUTE) may retain inert EXECUTE entries on ordinary +-- functions after this revoke; they are harmless because schema USAGE is gone, +-- and clear on the next drop/regrant cycle. -- ============================================================================ CREATE OR REPLACE FUNCTION df.revoke_usage(p_role TEXT) RETURNS VOID @@ -134,6 +146,24 @@ BEGIN NULL; END; + -- In-transaction enqueue wrappers (granted unconditionally by grant_usage()). + -- A delegated admin may not be the grantor of these; skip if so. + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + -- Table privileges. -- Column-level revokes must match the column-level grants from grant_usage(). 
EXECUTE pg_catalog.format('REVOKE SELECT, INSERT, UPDATE, DELETE ON df.vars FROM %I CASCADE', p_role); @@ -166,3 +196,287 @@ CREATE FUNCTION df."await_instance"( STRICT LANGUAGE c AS 'MODULE_PATHNAME', 'await_instance_wrapper'; + +-- ============================================================================ +-- In-transaction enqueue wrappers + reconciler (atomic df.start/cancel/signal, +-- and df.reconcile()). +-- +-- df.start()/df.cancel()/df.signal() now enqueue the duroxide work item over SPI +-- inside the CALLER'S transaction (so a rollback undoes the enqueue), through +-- these SECURITY DEFINER wrappers. The orchestrator queue is owner-only, so the +-- wrappers perform the privileged INSERT; each builds the work item server-side +-- and authorizes the caller (df.start by brand-new-instance state; df.cancel / +-- df.signal by pg_has_role(session_user, , 'MEMBER')). They are revoked +-- from PUBLIC and granted to df users by df.grant_usage() (above). +-- +-- df.reconcile() is the admin-only backstop that deletes orphaned duroxide +-- instance subtrees with no df.instances row and fails stuck df.instances rows. +-- +-- The wrappers resolve the duroxide schema via df.duroxide_schema() (defined in +-- the 0.2.2→0.2.3 upgrade) and require the duroxide-pg provider; df.start / +-- df.cancel / df.signal fall back to the out-of-band path when it is absent, so +-- this upgrade is safe on a fresh '_duroxide' or a legacy 'duroxide' schema. +-- ============================================================================ +CREATE FUNCTION df._enqueue_orchestrator_start( + p_instance_id text, + p_orchestration text, + p_input text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + work_item text; + v_blocked boolean; +BEGIN + -- This wrapper is not a generic privileged "start any orchestration" entry + -- point. 
df.start() passes the root graph-executor name and FunctionInput + -- JSON; reject anything else so a caller cannot use the SECURITY DEFINER + -- privilege to enqueue an internal sub-orchestration with crafted input. + IF p_orchestration OPERATOR(pg_catalog.<>) 'pg_durable::orchestration::execute-function-graph' THEN + RAISE EXCEPTION 'pg_durable: invalid start orchestration %', p_orchestration + USING ERRCODE = 'invalid_parameter_value'; + END IF; + + IF (p_input::jsonb ->> 'instance_id') IS DISTINCT FROM p_instance_id THEN + RAISE EXCEPTION 'pg_durable: start input instance_id does not match %', p_instance_id + USING ERRCODE = 'invalid_parameter_value'; + END IF; + + -- Authorization. This runs as the (privileged) definer, so it must not + -- trust the caller to only target their own instance. Permit the enqueue + -- only for a brand-new, not-yet-started instance: a 'pending' df.instances + -- row with no orchestrator-queue entry and no duroxide instance. A df user + -- can create such a pending instance directly (not only through df.start), + -- so this is a same-owner/not-yet-started check rather than proof that the + -- row was inserted in this transaction. The wrapper is safe because it also + -- fixes the orchestration to the root graph executor and validates the input + -- instance id, so callers cannot start internal orchestrations or target + -- someone else's already-started instance. 
+ EXECUTE pg_catalog.format( + 'SELECT NOT EXISTS (SELECT 1 FROM df.instances i WHERE i.id = $1 AND i.status = ''pending'') ' + ' OR EXISTS (SELECT 1 FROM %I.orchestrator_queue q WHERE q.instance_id = $1) ' + ' OR EXISTS (SELECT 1 FROM %I.instances d WHERE d.instance_id = $1)', + sch, sch) + INTO v_blocked + USING p_instance_id; + + IF v_blocked THEN + RAISE EXCEPTION 'pg_durable: not authorized to enqueue a start for instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + -- Build the StartOrchestration work item server-side so the caller cannot + -- choose the work-item variant (no CancelInstance/ExternalRaised/etc.) or + -- target a different instance. Mirrors duroxide's WorkItem::StartOrchestration. + work_item := pg_catalog.json_build_object( + 'StartOrchestration', pg_catalog.json_build_object( + 'instance', p_instance_id, + 'orchestration', 'pg_durable::orchestration::execute-function-graph', + 'input', p_input, + 'version', NULL, + 'parent_instance', NULL, + 'parent_id', NULL, + 'execution_id', 1))::text; + + EXECUTE pg_catalog.format('SELECT %I.enqueue_orchestrator_work($1, $2, $3)', sch) + USING p_instance_id, work_item, pg_catalog.now(); +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) FROM PUBLIC; + +CREATE FUNCTION df._enqueue_orchestrator_cancel(p_instance_id text, p_reason text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + owner_oid oid; +BEGIN + SELECT i.submitted_by::oid INTO owner_oid FROM df.instances i WHERE i.id = p_instance_id; + IF owner_oid IS NULL OR NOT pg_catalog.pg_has_role(session_user, owner_oid, 'MEMBER') THEN + RAISE EXCEPTION 'pg_durable: not authorized to cancel instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + EXECUTE pg_catalog.format('SELECT %I.enqueue_orchestrator_work($1, $2, $3)', sch) + USING p_instance_id, + 
pg_catalog.json_build_object('CancelInstance', + pg_catalog.json_build_object('instance', p_instance_id, 'reason', p_reason))::text, + pg_catalog.now(); +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) FROM PUBLIC; + +CREATE FUNCTION df._enqueue_orchestrator_signal(p_instance_id text, p_name text, p_data text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + owner_oid oid; + root_exists boolean; +BEGIN + SELECT i.submitted_by::oid INTO owner_oid FROM df.instances i WHERE i.id = p_instance_id; + IF owner_oid IS NULL OR NOT pg_catalog.pg_has_role(session_user, owner_oid, 'MEMBER') THEN + RAISE EXCEPTION 'pg_durable: not authorized to signal instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + -- Duroxide does not buffer external events until an orchestration has a + -- pending subscription. If the root runtime row is not materialized yet, a + -- signal would be accepted but dropped before the workflow can observe it. + EXECUTE pg_catalog.format( + 'SELECT EXISTS (SELECT 1 FROM %I.instances WHERE instance_id = $1)', sch) + INTO root_exists + USING p_instance_id; + IF NOT root_exists THEN + RAISE EXCEPTION 'pg_durable: instance % is not ready to receive signals', p_instance_id + USING ERRCODE = 'object_not_in_prerequisite_state'; + END IF; + + -- Raise the event for the target instance and every RUNNING descendant + -- (a sub-orchestration — JOIN/RACE branch or loop generation — may be the one + -- waiting on the signal), mirroring the out-of-band fan-out. %1$I = schema. 
+ EXECUTE pg_catalog.format( + 'INSERT INTO %1$I.orchestrator_queue (instance_id, work_item, visible_at, created_at) ' + 'SELECT t.instance_id, ' + ' pg_catalog.json_build_object(''ExternalRaised'', ' + ' pg_catalog.json_build_object(''instance'', t.instance_id, ''name'', $2, ''data'', $3))::text, ' + ' pg_catalog.now(), pg_catalog.now() ' + 'FROM ( ' + ' WITH RECURSIVE tree AS ( ' + ' SELECT i.instance_id, i.current_execution_id, true AS is_root ' + ' FROM %1$I.instances i WHERE i.instance_id = $1 ' + ' UNION ' + ' SELECT c.instance_id, c.current_execution_id, false ' + ' FROM %1$I.instances c JOIN tree p ON c.parent_instance_id = p.instance_id ' + ' ) ' + ' SELECT tr.instance_id ' + ' FROM tree tr ' + ' LEFT JOIN %1$I.executions e ' + ' ON e.instance_id = tr.instance_id AND e.execution_id = tr.current_execution_id ' + ' WHERE tr.is_root OR pg_catalog.lower(COALESCE(e.status, '''')) = ''running'' ' + ') t', + sch) + USING p_instance_id, p_name, p_data; +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) FROM PUBLIC; + +-- Backfill wrapper EXECUTE to roles that already had df usage before ALTER +-- EXTENSION UPDATE. New calls to df.grant_usage() grant these wrappers via the +-- function body above, but existing users would otherwise lose df.start() / +-- df.cancel() / df.signal() when the new .so chooses the atomic path. 
+DO $$ +DECLARE + r RECORD; + grant_opt TEXT; +BEGIN + FOR r IN + SELECT + pg_catalog.quote_ident(pg_catalog.pg_get_userbyid(a.grantee)) AS grantee, + pg_catalog.bool_or(a.is_grantable) AS with_grant_option + FROM pg_catalog.pg_namespace n + CROSS JOIN LATERAL pg_catalog.aclexplode(n.nspacl) AS a + WHERE n.nspname OPERATOR(pg_catalog.=) 'df' + AND a.privilege_type OPERATOR(pg_catalog.=) 'USAGE' + AND a.grantee OPERATOR(pg_catalog.<>) 0 -- skip PUBLIC + GROUP BY a.grantee + LOOP + grant_opt := CASE WHEN r.with_grant_option THEN ' WITH GRANT OPTION' ELSE '' END; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) TO %s', r.grantee) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) TO %s', r.grantee) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) TO %s', r.grantee) OPERATOR(pg_catalog.||) grant_opt; + END LOOP; +END $$; + +CREATE FUNCTION df.reconcile(p_grace_seconds integer DEFAULT 60) +RETURNS TABLE(duroxide_orphans_deleted bigint, stuck_instances_failed bigint) +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + orphan_ids text[]; + deleted bigint := 0; + stuck bigint := 0; +BEGIN + -- 1) Delete orphaned duroxide subtrees: every duroxide instance whose ROOT + -- ancestor has no df.instances row and is older than the grace window. + -- We must gather the FULL subtree (root + all descendants) because + -- delete_instances_atomic refuses (even with force) to delete a parent + -- whose children are not also in the list. Sub-orchestrations (JOIN/RACE + -- branches, loop generations) have no df.instances row and would be + -- mis-detected as roots if we keyed on "no df row" alone, so the orphan + -- seed is restricted to parent_instance_id IS NULL. 
+ -- Wrapped so a GC failure never aborts reconcile or kills the built-in + -- reconciler loop. + BEGIN + EXECUTE pg_catalog.format( + 'WITH RECURSIVE orphan_root AS ( ' + ' SELECT d.instance_id ' + ' FROM %1$I.instances d ' + ' LEFT JOIN df.instances i ON i.id = d.instance_id ' + ' WHERE i.id IS NULL ' + ' AND d.parent_instance_id IS NULL ' + ' AND d.created_at < pg_catalog.now() - pg_catalog.make_interval(secs => $1) ' + '), subtree AS ( ' + ' SELECT instance_id FROM orphan_root ' + ' UNION ' + ' SELECT c.instance_id FROM %1$I.instances c ' + ' JOIN subtree s ON c.parent_instance_id = s.instance_id ' + ') SELECT pg_catalog.array_agg(instance_id) FROM subtree', + sch) + INTO orphan_ids + USING p_grace_seconds; + + IF orphan_ids IS NOT NULL AND pg_catalog.array_length(orphan_ids, 1) > 0 THEN + EXECUTE pg_catalog.format( + 'SELECT instances_deleted FROM %I.delete_instances_atomic($1, $2)', sch) + INTO deleted + USING orphan_ids, true; + END IF; + EXCEPTION WHEN OTHERS THEN + deleted := 0; + RAISE WARNING 'pg_durable: reconcile orphan-GC pass failed: %', SQLERRM; + END; + + -- 2) df.instances stuck non-terminal with no live duroxide instance and no + -- queued start (lost enqueue) → mark failed. The duroxide queue row + -- persists (locked) until ack, and the instance row is created at ack, so + -- a healthy in-flight start always matches one of the NOT EXISTS guards + -- and is never failed here. Best-effort; wrapped like step 1. 
+ BEGIN + EXECUTE pg_catalog.format( + 'UPDATE df.instances i ' + 'SET status = ''failed'', updated_at = pg_catalog.now() ' + 'WHERE i.status IN (''pending'', ''running'') ' + ' AND i.updated_at < pg_catalog.now() - pg_catalog.make_interval(secs => $1) ' + ' AND NOT EXISTS (SELECT 1 FROM %1$I.instances d WHERE d.instance_id = i.id) ' + ' AND NOT EXISTS (SELECT 1 FROM %1$I.orchestrator_queue q WHERE q.instance_id = i.id)', + sch) + USING p_grace_seconds; + GET DIAGNOSTICS stuck = ROW_COUNT; + EXCEPTION WHEN OTHERS THEN + stuck := 0; + RAISE WARNING 'pg_durable: reconcile stuck-failover pass failed: %', SQLERRM; + END; + + duroxide_orphans_deleted := deleted; + stuck_instances_failed := stuck; + RETURN NEXT; +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df.reconcile(integer) FROM PUBLIC; diff --git a/src/client.rs b/src/client.rs index 3d10dfb..443080f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -36,7 +36,7 @@ thread_local! { /// row, or has a `schema_version` below `WORKER_SCHEMA_VERSION`. This is a fast /// SPI read called once per session on the first call to any `df.*` function /// that needs the duroxide client. -fn is_worker_ready() -> bool { +pub(crate) fn is_worker_ready() -> bool { let schema = backend_duroxide_schema(); // First check if the readiness table exists via the catalogue. Querying @@ -194,7 +194,10 @@ async fn list_running_descendants(client: &Client, root_instance_id: &str) -> Ve descendants } -/// Start a durable function via the shared PostgreSQL store. +/// Start a durable function via the duroxide client (out-of-band, on the cached +/// pool). This is the provider-agnostic fallback used by df.start() when the +/// duroxide-pg SQL enqueue surface is not available; it is NOT atomic with the +/// caller's transaction. 
pub fn start_durable_function( function_name: &str, instance_id: &str, diff --git a/src/dsl.rs b/src/dsl.rs index d0d044f..6475420 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -12,7 +12,6 @@ use std::str::FromStr; use std::cell::RefCell; use std::time::Instant; -use crate::client::start_durable_function; use crate::types::{ mark_non_future_helper_call, short_id, validate_result_name, Durofut, FunctionInput, }; @@ -625,9 +624,32 @@ pub fn signal(instance_id: &str, signal_name: &str, signal_data: default!(&str, pgrx::error!("Instance not found or access denied: {}", instance_id); } - match raise_external_event(instance_id, signal_name, &signal_data) { - Ok(_) => "OK".to_string(), - Err(e) => pgrx::error!("Failed to send signal: {}", e), + // Enqueue the ExternalRaised work item. Prefer the in-transaction SPI path + // (atomic with the caller's transaction; fans out to running descendants in + // the wrapper) when the duroxide-pg SQL surface is present; otherwise fall + // back to the out-of-band client path. 
+ let schema = crate::types::backend_duroxide_schema(); + if in_tx_enqueue_supported(schema) { + if let Err(e) = Spi::run_with_args( + "SELECT df._enqueue_orchestrator_signal($1, $2, $3)", + &[ + instance_id.into(), + signal_name.into(), + signal_data.as_str().into(), + ], + ) { + pgrx::error!("Failed to send signal: {:?}", e); + } + "OK".to_string() + } else { + pgrx::log!( + "pg_durable: df.signal() is using the non-atomic fallback enqueue \ + (duroxide-pg SQL surface not detected)" + ); + match raise_external_event(instance_id, signal_name, &signal_data) { + Ok(_) => "OK".to_string(), + Err(e) => pgrx::error!("Failed to send signal: {}", e), + } } } @@ -635,6 +657,41 @@ pub fn signal(instance_id: &str, signal_name: &str, signal_data: default!(&str, // Orchestration Control Functions // ============================================================================ +/// True when both required pieces of the in-transaction enqueue path exist: +/// the duroxide-pg SQL enqueue surface in the provider schema and the df-schema +/// SECURITY DEFINER wrappers created by the extension/upgrade script. When +/// false (a non-pg provider, or a schema that predates the wrappers), callers +/// fall back to the out-of-band client path. The catalog probe never raises, so +/// it is safe in a backend session even when the schema is absent. 
+fn in_tx_enqueue_supported(schema: &str) -> bool { + Spi::get_one_with_args::<bool>( + "SELECT \ + EXISTS(SELECT 1 FROM pg_catalog.pg_proc p \ + JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = $1 \ + AND p.proname = 'enqueue_orchestrator_work') \ + AND EXISTS(SELECT 1 FROM pg_catalog.pg_proc p \ + JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = 'df' \ + AND p.proname = '_enqueue_orchestrator_start' \ + AND p.pronargs = 3) \ + AND EXISTS(SELECT 1 FROM pg_catalog.pg_proc p \ + JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = 'df' \ + AND p.proname = '_enqueue_orchestrator_cancel' \ + AND p.pronargs = 2) \ + AND EXISTS(SELECT 1 FROM pg_catalog.pg_proc p \ + JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = 'df' \ + AND p.proname = '_enqueue_orchestrator_signal' \ + AND p.pronargs = 3)", + &[schema.into()], + ) + .ok() + .flatten() + .unwrap_or(false) +} + /// Starts a durable SQL function. /// The fut argument can be either Durofut JSON or plain SQL string (auto-wrapped). /// Variables from df.vars are captured and passed to the orchestration. @@ -924,24 +981,70 @@ pub fn start( vars }); - // Start the orchestration via duroxide + // Start the orchestration durably. let input = FunctionInput { instance_id: instance_id.clone(), label: label.map(|s| s.to_string()), vars, loop_iteration: 0, }; - let input_json = serde_json::to_string(&input).unwrap_or(instance_id.clone()); + let input_json = serde_json::to_string(&input).unwrap_or_else(|_| instance_id.clone()); + + // Prefer the atomic in-transaction enqueue (Option 3) when the duroxide-pg + // SQL provider is present; otherwise fall back to the provider-agnostic + // out-of-band client path (legacy behavior). 
+ let schema = crate::types::backend_duroxide_schema(); + if in_tx_enqueue_supported(schema) { + // Option 3 (atomic start): enqueue the StartOrchestration work item via + // SPI inside the CALLER'S transaction, so the df.nodes/df.instances + // INSERTs above and this orchestrator-queue INSERT commit or roll back + // together. A rolled-back df.start() leaves no duroxide orphan, and the + // worker only observes the queue row after the df.* rows are visible + // (which also removes the load-function-graph "wait for commit" race). + // + // Fail clearly (and atomically — the df.* INSERTs roll back) if the + // worker has not yet initialised the duroxide schema. + if !crate::client::is_worker_ready() { + pgrx::error!( + "pg_durable background worker not yet initialized — try again in a moment" + ); + } - if let Err(e) = start_durable_function( - crate::orchestrations::execute_function_graph::NAME, - &instance_id, - &input_json, - ) { + // The SECURITY DEFINER wrapper builds the StartOrchestration work item + // server-side from these trusted arguments (the caller cannot choose the + // work-item variant or target a foreign instance) and performs the + // privileged orchestrator-queue INSERT on the caller's transaction. + // Raising on failure aborts the whole df.start atomically. + if let Err(e) = Spi::run_with_args( + "SELECT df._enqueue_orchestrator_start($1, $2, $3)", + &[ + instance_id.as_str().into(), + crate::orchestrations::execute_function_graph::NAME.into(), + input_json.as_str().into(), + ], + ) { + pgrx::error!("Failed to enqueue durable function start: {:?}", e); + } + } else { + // Fallback: no duroxide-pg SQL enqueue surface detected (a non-pg + // provider, or a schema predating the wrapper). Enqueue out-of-band via + // the duroxide client. NOTE: this path is NOT atomic with the caller's + // transaction — a rollback will NOT undo the start. Warn so the + // non-atomic semantics are observable. 
pgrx::log!( - "pg_durable: Warning - failed to start durable function: {}", - e + "pg_durable: df.start() is using the non-atomic fallback enqueue \ + (duroxide-pg SQL surface not detected); a rollback will not undo this start" ); + if let Err(e) = crate::client::start_durable_function( + crate::orchestrations::execute_function_graph::NAME, + &instance_id, + &input_json, + ) { + pgrx::log!( + "pg_durable: Warning - failed to start durable function: {}", + e + ); + } } instance_id @@ -965,8 +1068,26 @@ pub fn cancel(instance_id: &str, reason: default!(&str, "'Cancelled by user'")) pgrx::error!("Instance not found or access denied: {}", instance_id); } - if let Err(e) = cancel_durable_function(instance_id, reason) { - return format!("Failed to cancel: {e}"); + // Enqueue the CancelInstance work item and flip the status mirror together. + // On the in-transaction path the enqueue and the status UPDATE commit + // atomically (no df.* / duroxide divergence on cancel); otherwise fall back + // to the out-of-band client path. + let schema = crate::types::backend_duroxide_schema(); + if in_tx_enqueue_supported(schema) { + if let Err(e) = Spi::run_with_args( + "SELECT df._enqueue_orchestrator_cancel($1, $2)", + &[instance_id.into(), reason.into()], + ) { + pgrx::error!("Failed to cancel: {:?}", e); + } + } else { + pgrx::log!( + "pg_durable: df.cancel() is using the non-atomic fallback enqueue \ + (duroxide-pg SQL surface not detected)" + ); + if let Err(e) = cancel_durable_function(instance_id, reason) { + return format!("Failed to cancel: {e}"); + } } // Update the instance status to 'cancelled' via SPI only when the instance is not diff --git a/src/lib.rs b/src/lib.rs index edb0a65..2debab9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,12 @@ pub static EXECUTION_ACQUIRE_TIMEOUT: GucSetting = GucSetting::::new(3 /// functions are explicitly desired. See docs/superuser_guc.md. 
pub static ENABLE_SUPERUSER_INSTANCES: GucSetting<bool> = GucSetting::<bool>::new(false); +/// Cron schedule for the built-in durable reconciler (Option 4 / df.reconcile()). +/// The background worker ensures one reconciler instance per cluster, running on +/// this schedule. Set to an empty string to disable the built-in reconciler. +pub static RECONCILER_CRON: GucSetting<Option<&'static std::ffi::CStr>> = + GucSetting::<Option<&'static std::ffi::CStr>>::new(Some(c"*/5 * * * *")); + // Module declarations pub mod activities; pub mod client; @@ -135,6 +141,15 @@ pub extern "C-unwind" fn _PG_init() { GucFlags::SUPERUSER_ONLY, ); + GucRegistry::define_string_guc( + c"pg_durable.reconciler_cron", + c"Cron schedule for the built-in durable reconciler (df.reconcile()); empty disables it", + c"The background worker keeps one reconciler instance running on this schedule to repair residual df.* / duroxide divergence. Empty string disables the built-in reconciler. Requires server restart to change.", + &RECONCILER_CRON, + GucContext::Postmaster, + GucFlags::default(), + ); + worker::register_background_worker(); } @@ -373,6 +388,14 @@ BEGIN EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df.revoke_usage(text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; END IF; + -- In-transaction enqueue wrappers — SECURITY DEFINER, revoked from PUBLIC at + -- install. Granted unconditionally to every df user because df.start() / + -- df.cancel() / df.signal() call them via SPI as the calling role; their own + -- internal authorization checks gate access to other users' instances. 
+ EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + -- Table privileges EXECUTE pg_catalog.format('GRANT SELECT ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; @@ -417,6 +440,24 @@ BEGIN NULL; END; + -- In-transaction enqueue wrappers (granted unconditionally by grant_usage()). + -- A delegated admin may not be the grantor of these; skip if so. + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + -- Table privileges. -- Column-level revokes must match the column-level grants from grant_usage(). 
EXECUTE pg_catalog.format('REVOKE SELECT, INSERT, UPDATE, DELETE ON df.vars FROM %I CASCADE', p_role); @@ -466,6 +507,332 @@ REVOKE EXECUTE ON FUNCTION df.revoke_usage(text) FROM PUBLIC; requires = ["create_tables", dsl::http] ); +// ============================================================================ +// Atomic start enqueue (Option 3) — SECURITY DEFINER wrapper +// ============================================================================ +// +// df.start() enqueues the StartOrchestration work item by calling this wrapper +// via SPI, inside the caller's transaction. The duroxide orchestrator queue +// grants INSERT to its owner only, so the privileged INSERT must run as a +// definer that owns the duroxide tables; the call still commits/rolls back as +// part of the caller's transaction, giving an atomic df.start(). +// +// PROVIDER REQUIREMENT: this path only works with the duroxide-pg (SQL-backed) +// provider, because it calls the provider's `enqueue_orchestrator_work` SQL +// function directly. df.start() probes for that function and falls back to the +// provider-agnostic out-of-band client path when it is absent (see dsl::start). +// +// The schema is resolved dynamically via df.duroxide_schema() ('_duroxide' on +// fresh installs, legacy 'duroxide' on upgraded ones); %I quotes the identifier. +// The function is created by the worker's migrations (not this extension); +// plpgsql resolves it at call time. +// +// SECURITY: this wrapper is SECURITY DEFINER and granted to every df user via +// df.grant_usage() (df.start() runs SECURITY INVOKER and calls it via SPI). To +// avoid handing users an arbitrary-work-item enqueue primitive (which would let +// a caller forge CancelInstance/ExternalRaised/etc. 
against another user's +// instance), it (1) constructs the StartOrchestration work item server-side from +// the caller's arguments — the caller cannot choose the variant or a foreign +// target — and (2) authorizes the enqueue only for a brand-new, not-yet-started +// instance (pending df.instances row, no queue entry, no duroxide instance), +// which under atomic-start semantics is reachable only for the row df.start just +// inserted in the current transaction. Hardening direction (move the entrypoint +// into duroxide-pg, owned by the schema owner) is in docs/spec-atomic-start.md. +extension_sql!( + r#" +CREATE FUNCTION df._enqueue_orchestrator_start( + p_instance_id text, + p_orchestration text, + p_input text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + work_item text; + v_blocked boolean; +BEGIN + -- This wrapper is not a generic privileged "start any orchestration" entry + -- point. df.start() passes the root graph-executor name and FunctionInput + -- JSON; reject anything else so a caller cannot use the SECURITY DEFINER + -- privilege to enqueue an internal sub-orchestration with crafted input. + IF p_orchestration OPERATOR(pg_catalog.<>) 'pg_durable::orchestration::execute-function-graph' THEN + RAISE EXCEPTION 'pg_durable: invalid start orchestration %', p_orchestration + USING ERRCODE = 'invalid_parameter_value'; + END IF; + + IF (p_input::jsonb ->> 'instance_id') IS DISTINCT FROM p_instance_id THEN + RAISE EXCEPTION 'pg_durable: start input instance_id does not match %', p_instance_id + USING ERRCODE = 'invalid_parameter_value'; + END IF; + + -- Authorization. This runs as the (privileged) definer, so it must not + -- trust the caller to only target their own instance. Permit the enqueue + -- only for a brand-new, not-yet-started instance: a 'pending' df.instances + -- row with no orchestrator-queue entry and no duroxide instance. 
A df user + -- can create such a pending instance directly (not only through df.start), + -- so this is a same-owner/not-yet-started check rather than proof that the + -- row was inserted in this transaction. The wrapper is safe because it also + -- fixes the orchestration to the root graph executor and validates the input + -- instance id, so callers cannot start internal orchestrations or target + -- someone else's already-started instance. + EXECUTE pg_catalog.format( + 'SELECT NOT EXISTS (SELECT 1 FROM df.instances i WHERE i.id = $1 AND i.status = ''pending'') ' + ' OR EXISTS (SELECT 1 FROM %I.orchestrator_queue q WHERE q.instance_id = $1) ' + ' OR EXISTS (SELECT 1 FROM %I.instances d WHERE d.instance_id = $1)', + sch, sch) + INTO v_blocked + USING p_instance_id; + + IF v_blocked THEN + RAISE EXCEPTION 'pg_durable: not authorized to enqueue a start for instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + -- Build the StartOrchestration work item server-side so the caller cannot + -- choose the work-item variant (no CancelInstance/ExternalRaised/etc.) or + -- target a different instance. Mirrors duroxide's WorkItem::StartOrchestration. 
+ work_item := pg_catalog.json_build_object( + 'StartOrchestration', pg_catalog.json_build_object( + 'instance', p_instance_id, + 'orchestration', 'pg_durable::orchestration::execute-function-graph', + 'input', p_input, + 'version', NULL, + 'parent_instance', NULL, + 'parent_id', NULL, + 'execution_id', 1))::text; + + EXECUTE pg_catalog.format('SELECT %I.enqueue_orchestrator_work($1, $2, $3)', sch) + USING p_instance_id, work_item, pg_catalog.now(); +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) FROM PUBLIC; +"#, + name = "atomic_start_enqueue", + requires = ["create_tables"] +); + +// ============================================================================ +// In-transaction signal / cancel enqueue (Part 1, extended) — SECURITY DEFINER +// ============================================================================ +// +// df.signal()/df.cancel() enqueue their work items (ExternalRaised / +// CancelInstance) via SPI through these wrappers, inside the caller's +// transaction, instead of out-of-band on the duroxide client pool. Both target +// an already-committed instance, so the start wrapper's "brand-new instance" +// guard does not apply; instead they authorize on ownership. +// +// AUTHORIZATION: these are SECURITY DEFINER (the orchestrator queue is +// owner-only) and granted to every df user via df.grant_usage(), so they must +// not let a caller signal/cancel a foreign instance. current_user is the definer +// here, so ownership is checked against session_user (the unforgeable +// authenticated role) plus membership in the instance's submitted_by — i.e. the +// caller's session must be able to act as the instance owner. The work items are +// built server-side from the arguments (no opaque caller-supplied work item). 
+extension_sql!( + r#" +CREATE FUNCTION df._enqueue_orchestrator_cancel(p_instance_id text, p_reason text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + owner_oid oid; +BEGIN + SELECT i.submitted_by::oid INTO owner_oid FROM df.instances i WHERE i.id = p_instance_id; + IF owner_oid IS NULL OR NOT pg_catalog.pg_has_role(session_user, owner_oid, 'MEMBER') THEN + RAISE EXCEPTION 'pg_durable: not authorized to cancel instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + EXECUTE pg_catalog.format('SELECT %I.enqueue_orchestrator_work($1, $2, $3)', sch) + USING p_instance_id, + pg_catalog.json_build_object('CancelInstance', + pg_catalog.json_build_object('instance', p_instance_id, 'reason', p_reason))::text, + pg_catalog.now(); +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) FROM PUBLIC; + +CREATE FUNCTION df._enqueue_orchestrator_signal(p_instance_id text, p_name text, p_data text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + owner_oid oid; + root_exists boolean; +BEGIN + SELECT i.submitted_by::oid INTO owner_oid FROM df.instances i WHERE i.id = p_instance_id; + IF owner_oid IS NULL OR NOT pg_catalog.pg_has_role(session_user, owner_oid, 'MEMBER') THEN + RAISE EXCEPTION 'pg_durable: not authorized to signal instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + -- Duroxide does not buffer external events until an orchestration has a + -- pending subscription. If the root runtime row is not materialized yet, a + -- signal would be accepted but dropped before the workflow can observe it. 
+ EXECUTE pg_catalog.format( + 'SELECT EXISTS (SELECT 1 FROM %I.instances WHERE instance_id = $1)', sch) + INTO root_exists + USING p_instance_id; + IF NOT root_exists THEN + RAISE EXCEPTION 'pg_durable: instance % is not ready to receive signals', p_instance_id + USING ERRCODE = 'object_not_in_prerequisite_state'; + END IF; + + -- Raise the event for the target instance and every RUNNING descendant + -- (a sub-orchestration — JOIN/RACE branch or loop generation — may be the one + -- waiting on the signal), mirroring the out-of-band fan-out. %1$I = schema. + EXECUTE pg_catalog.format( + 'INSERT INTO %1$I.orchestrator_queue (instance_id, work_item, visible_at, created_at) ' + 'SELECT t.instance_id, ' + ' pg_catalog.json_build_object(''ExternalRaised'', ' + ' pg_catalog.json_build_object(''instance'', t.instance_id, ''name'', $2, ''data'', $3))::text, ' + ' pg_catalog.now(), pg_catalog.now() ' + 'FROM ( ' + ' WITH RECURSIVE tree AS ( ' + ' SELECT i.instance_id, i.current_execution_id, true AS is_root ' + ' FROM %1$I.instances i WHERE i.instance_id = $1 ' + ' UNION ' + ' SELECT c.instance_id, c.current_execution_id, false ' + ' FROM %1$I.instances c JOIN tree p ON c.parent_instance_id = p.instance_id ' + ' ) ' + ' SELECT tr.instance_id ' + ' FROM tree tr ' + ' LEFT JOIN %1$I.executions e ' + ' ON e.instance_id = tr.instance_id AND e.execution_id = tr.current_execution_id ' + ' WHERE tr.is_root OR pg_catalog.lower(COALESCE(e.status, '''')) = ''running'' ' + ') t', + sch) + USING p_instance_id, p_name, p_data; +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) FROM PUBLIC; +"#, + name = "atomic_signal_cancel_enqueue", + requires = ["create_tables"] +); + +// ============================================================================ +// Reconciler (Option 4) — repair residual df.* / duroxide divergence +// ============================================================================ +// +// Atomic 
df.start()/df.signal()/df.cancel() commit their duroxide enqueue with +// the caller's transaction, but some divergence can still arise: legacy orphans, +// the non-atomic fallback path, and crashes mid-execution. df.reconcile() is a +// best-effort garbage collector for that residue. It is +// admin-only (EXECUTE revoked from PUBLIC) and SECURITY DEFINER so it can touch +// the duroxide-owned tables and all users' df.instances. +// +// The background worker keeps one reconciler instance running automatically +// (worker::ensure_reconciler), dogfooding pg_durable as a durable cron loop: +// df.start(df.loop(df.seq('SELECT * FROM df.reconcile()', +// df.wait_for_schedule())), +// 'df_reconciler') +// submitted by the dedicated non-superuser role df_reconciler. Set +// pg_durable.reconciler_cron = '' to disable it. +// +// Only ROOT duroxide instances (parent_instance_id IS NULL) are considered — +// sub-orchestrations (JOIN/RACE branches, loop generations) intentionally have +// no df.instances row and must not be collected. A grace window avoids racing +// legitimately in-flight operations. +extension_sql!( + r#" +CREATE FUNCTION df.reconcile(p_grace_seconds integer DEFAULT 60) +RETURNS TABLE(duroxide_orphans_deleted bigint, stuck_instances_failed bigint) +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + orphan_ids text[]; + deleted bigint := 0; + stuck bigint := 0; +BEGIN + -- 1) Delete orphaned duroxide subtrees: every duroxide instance whose ROOT + -- ancestor has no df.instances row and is older than the grace window. + -- We must gather the FULL subtree (root + all descendants) because + -- delete_instances_atomic refuses (even with force) to delete a parent + -- whose children are not also in the list. 
Sub-orchestrations (JOIN/RACE + -- branches, loop generations) have no df.instances row and would be + -- mis-detected as roots if we keyed on "no df row" alone, so the orphan + -- seed is restricted to parent_instance_id IS NULL. + -- Wrapped so a GC failure never aborts reconcile or kills the built-in + -- reconciler loop. + BEGIN + EXECUTE pg_catalog.format( + 'WITH RECURSIVE orphan_root AS ( ' + ' SELECT d.instance_id ' + ' FROM %1$I.instances d ' + ' LEFT JOIN df.instances i ON i.id = d.instance_id ' + ' WHERE i.id IS NULL ' + ' AND d.parent_instance_id IS NULL ' + ' AND d.created_at < pg_catalog.now() - pg_catalog.make_interval(secs => $1) ' + '), subtree AS ( ' + ' SELECT instance_id FROM orphan_root ' + ' UNION ' + ' SELECT c.instance_id FROM %1$I.instances c ' + ' JOIN subtree s ON c.parent_instance_id = s.instance_id ' + ') SELECT pg_catalog.array_agg(instance_id) FROM subtree', + sch) + INTO orphan_ids + USING p_grace_seconds; + + IF orphan_ids IS NOT NULL AND pg_catalog.array_length(orphan_ids, 1) > 0 THEN + EXECUTE pg_catalog.format( + 'SELECT instances_deleted FROM %I.delete_instances_atomic($1, $2)', sch) + INTO deleted + USING orphan_ids, true; + END IF; + EXCEPTION WHEN OTHERS THEN + deleted := 0; + RAISE WARNING 'pg_durable: reconcile orphan-GC pass failed: %', SQLERRM; + END; + + -- 2) df.instances stuck non-terminal with no live duroxide instance and no + -- queued start (lost enqueue) → mark failed. The duroxide queue row + -- persists (locked) until ack, and the instance row is created at ack, so + -- a healthy in-flight start always matches one of the NOT EXISTS guards + -- and is never failed here. Best-effort; wrapped like step 1. 
+ BEGIN + EXECUTE pg_catalog.format( + 'UPDATE df.instances i ' + 'SET status = ''failed'', updated_at = pg_catalog.now() ' + 'WHERE i.status IN (''pending'', ''running'') ' + ' AND i.updated_at < pg_catalog.now() - pg_catalog.make_interval(secs => $1) ' + ' AND NOT EXISTS (SELECT 1 FROM %1$I.instances d WHERE d.instance_id = i.id) ' + ' AND NOT EXISTS (SELECT 1 FROM %1$I.orchestrator_queue q WHERE q.instance_id = i.id)', + sch) + USING p_grace_seconds; + GET DIAGNOSTICS stuck = ROW_COUNT; + EXCEPTION WHEN OTHERS THEN + stuck := 0; + RAISE WARNING 'pg_durable: reconcile stuck-failover pass failed: %', SQLERRM; + END; + + duroxide_orphans_deleted := deleted; + stuck_instances_failed := stuck; + RETURN NEXT; +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df.reconcile(integer) FROM PUBLIC; +"#, + name = "reconcile", + requires = ["create_tables"] +); + // ============================================================================ // Extension Validation (must run before duroxide schema creation) // ============================================================================ diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index e593610..e9bfb99 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -513,16 +513,31 @@ async fn execute_wait_schedule_node( let config: serde_json::Value = serde_json::from_str(config_str) .map_err(|e| format!("Invalid WAIT_SCHEDULE config: {e}"))?; - let wait_seconds = config["wait_seconds"] - .as_u64() - .ok_or_else(|| "WAIT_SCHEDULE missing wait_seconds".to_string())?; - - let cron_expr = config["cron_expr"].as_str().unwrap_or("?"); + let baked_wait_seconds = config["wait_seconds"].as_u64().unwrap_or(0); + let cron_expr = config["cron_expr"].as_str(); + + // Recompute the delay from the orchestration's deterministic clock on each + // execution, so a df.loop around df.wait_for_schedule waits until the NEXT + // cron tick rather 
than replaying the fixed offset baked at DSL-build time + // (which would make every loop generation wait the same — possibly ~0s — + // interval, busy-looping). ctx.utc_now() is recorded in history, so this + // stays deterministic on replay. Falls back to the baked seconds if the + // clock read or cron parse is unavailable. + let wait = match (cron_expr, ctx.utc_now().await) { + (Some(cron), Ok(now_st)) => { + let now_dt: chrono::DateTime<chrono::Utc> = now_st.into(); + crate::types::calculate_cron_wait_from(cron, now_dt) + .unwrap_or_else(|_| Duration::from_secs(baked_wait_seconds)) + } + _ => Duration::from_secs(baked_wait_seconds), + }; ctx.trace_info(format!( - "Waiting {wait_seconds} seconds until schedule: {cron_expr}" + "Waiting {}s until schedule: {}", + wait.as_secs(), + cron_expr.unwrap_or("?") )); - ctx.schedule_timer(Duration::from_secs(wait_seconds)).await; + ctx.schedule_timer(wait).await; Ok(r#"{"scheduled": true}"#.to_string()) } diff --git a/src/types.rs b/src/types.rs index 4fdab4f..c3f750d 100644 --- a/src/types.rs +++ b/src/types.rs @@ -353,25 +353,29 @@ pub fn worker_provider_config( config } -/// Calculate the duration until the next cron schedule match -pub fn calculate_cron_wait(cron_expr: &str) -> Result<Duration, String> { +/// Calculate the duration until the next cron schedule match, relative to a +/// caller-supplied `now`. Used by the WAIT_SCHEDULE node executor with the +/// orchestration's deterministic clock so a looped `df.wait_for_schedule` waits +/// until the *next* tick each generation instead of replaying a fixed offset. 
+pub fn calculate_cron_wait_from(cron_expr: &str, now: DateTime<Utc>) -> Result<Duration, String> { let cron_with_seconds = format!("0 {cron_expr}"); let schedule = CronSchedule::from_str(&cron_with_seconds) .map_err(|e| format!("Invalid cron expression '{cron_expr}': {e}"))?; - let now: DateTime<Utc> = Utc::now(); - let next = schedule - .upcoming(Utc) + .after(&now) .next() .ok_or_else(|| "No upcoming schedule found".to_string())?; - let duration = (next - now) + (next - now) .to_std() - .map_err(|_| "Failed to calculate wait duration".to_string())?; + .map_err(|_| "Failed to calculate wait duration".to_string()) +} - Ok(duration) +/// Calculate the duration until the next cron schedule match +pub fn calculate_cron_wait(cron_expr: &str) -> Result<Duration, String> { + calculate_cron_wait_from(cron_expr, Utc::now()) } /// Evaluate a condition result to determine if it's truthy. diff --git a/src/worker.rs b/src/worker.rs index 976a3fd..b270249 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -226,6 +226,10 @@ async fn run_duroxide_runtime() { } }; + // Ensure the built-in durable reconciler is running for this epoch + // (Option 4). Idempotent: starts one only if none is pending/running. + ensure_reconciler(&mgmt_pool).await; + run_until_extension_dropped_or_shutdown( &poll_pool, duroxide_runtime, @@ -240,6 +244,125 @@ async fn run_duroxide_runtime() { poll_pool.close().await; } +/// Built-in durable reconciler (Option 4). Ensures one reconciler instance is +/// running per cluster: an infinite durable loop that calls `df.reconcile()` on +/// the `pg_durable.reconciler_cron` schedule. +/// +/// Idempotent — does nothing if a reconciler is already pending/running, and +/// (re)starts one if it has died. Called once per epoch and then periodically +/// from the steady-state poll loop, so a reconciler that fails mid-epoch is +/// restarted within the poll interval rather than only at the next epoch. 
The +/// instance is submitted by a dedicated **non-superuser** role +/// (`df_reconciler`) granted only df usage plus EXECUTE on the SECURITY DEFINER +/// `df.reconcile()`. Using a non-superuser identity avoids the +/// enable_superuser_instances guard, and bounds the blast radius to "trigger GC" +/// even if the instance were forged. +async fn ensure_reconciler(pool: &sqlx::PgPool) { + const ROLE: &str = "df_reconciler"; + const LABEL: &str = "df_reconciler"; + + let cron = crate::RECONCILER_CRON + .get() + .map(|c| c.to_string_lossy().into_owned()) + .unwrap_or_default(); + let cron = cron.trim().to_string(); + if cron.is_empty() { + return; // built-in reconciler disabled + } + + // Skip if a reconciler is already pending/running (idempotent; self-heals if + // a previous one failed/was cancelled). Runs as the worker role (bypasses + // RLS). The liveness key is the unforgeable submitted_by = df_reconciler + // role, NOT the user-supplied `label`: any df user can call df.start() with + // label 'df_reconciler', so keying on label would let an unprivileged user + // suppress the GC by parking a look-alike instance. A regular user cannot + // submit as df_reconciler (submitted_by = current_user at start time), so + // submitted_by is a sound singleton key. `::text` avoids a regrole cast + // error before the role exists (it simply matches no rows). + let already_running: i64 = match sqlx::query_scalar( + "SELECT count(*) FROM df.instances \ + WHERE submitted_by::text = $1 AND status IN ('pending', 'running')", + ) + .bind(ROLE) + .fetch_one(pool) + .await + { + Ok(n) => n, + Err(e) => { + log!("pg_durable: reconciler liveness check failed: {}", e); + return; + } + }; + if already_running > 0 { + return; + } + + // Ensure the dedicated role exists with the minimal grants. 
+ if let Err(e) = ensure_reconciler_role(pool, ROLE).await { + log!("pg_durable: failed to provision reconciler role: {}", e); + return; + } + + // Start the durable loop as the dedicated role on a single connection so the + // SET ROLE applies to df.start() (which captures submitted_by = current_user). + let started = async { + let mut conn = pool.acquire().await?; + sqlx::query(&format!("SET ROLE {ROLE}")) + .execute(&mut *conn) + .await?; + let res = sqlx::query( + "SELECT df.start(\ + df.loop(df.seq('SELECT * FROM df.reconcile()', df.wait_for_schedule($1))), \ + $2)", + ) + .bind(&cron) + .bind(LABEL) + .execute(&mut *conn) + .await; + // Always reset the role before the connection returns to the pool. + let _ = sqlx::query("RESET ROLE").execute(&mut *conn).await; + res.map(|_| ()) + } + .await; + + match started { + Ok(()) => log!("pg_durable: started built-in reconciler (cron='{}')", cron), + Err(e) => log!("pg_durable: failed to start built-in reconciler: {}", e), + } +} + +/// Create the dedicated non-superuser reconciler role (if absent) and grant it +/// df usage plus EXECUTE on df.reconcile(). Idempotent. +/// +/// The role is `LOGIN` on purpose: the worker executes the reconcile SQL node by +/// opening a real connection AS submitted_by via `connect_as_user` (see +/// types.rs), exactly as for every other durable-function role, so it must be +/// connectable. It is non-superuser and holds only df usage + EXECUTE on +/// df.reconcile(); password/auth exposure is governed by the deployment's +/// pg_hba.conf, the same as any other df submitted_by role. +async fn ensure_reconciler_role(pool: &sqlx::PgPool, role: &str) -> Result<(), sqlx::Error> { + // role is a fixed compile-time constant, so the format! is injection-safe. 
+ sqlx::query(&format!( + "DO $$ BEGIN \ + IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = '{role}') THEN \ + CREATE ROLE {role} LOGIN; \ + END IF; \ + END $$;" + )) + .execute(pool) + .await?; + sqlx::query("SELECT df.grant_usage($1)") + .bind(role) + .execute(pool) + .await?; + sqlx::query(&format!( + "GRANT EXECUTE ON FUNCTION df.reconcile(integer) TO {role}" + )) + .execute(pool) + .await?; + Ok(()) +} + async fn wait_for_extension_creation(poll_pool: &sqlx::PgPool, poll_interval: Duration) -> bool { log!("pg_durable: waiting for CREATE EXTENSION pg_durable..."); @@ -657,6 +780,10 @@ async fn run_until_extension_dropped_or_shutdown( log!("pg_durable: epoch sentinel gone — extension dropped or recreated"); break; } + // Self-heal: re-assert the built-in reconciler so a loop that + // died mid-epoch is restarted within the poll interval rather + // than only at the next epoch. Idempotent (no-op while alive). + ensure_reconciler(poll_pool).await; } } } diff --git a/tests/e2e/sql/24_atomic_rollback.sql b/tests/e2e/sql/24_atomic_rollback.sql new file mode 100644 index 0000000..d9de3d4 --- /dev/null +++ b/tests/e2e/sql/24_atomic_rollback.sql @@ -0,0 +1,207 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Tests: df.start / df.cancel / df.signal enqueue on the CALLER'S transaction. +-- +-- Regression test for df/_duroxide divergence. Originally the duroxide enqueue +-- happened out-of-band on a separate connection that committed independently, so +-- rolling back the caller's transaction left the runtime store ahead of the +-- control plane: +-- * a rolled-back df.start() left an orphaned orchestration in _duroxide; +-- * a rolled-back df.cancel() still cancelled the (committed) instance; +-- * a rolled-back df.signal() still delivered the signal. +-- +-- With the in-transaction enqueue, a ROLLBACK undoes the runtime work too. Each +-- scenario below FAILS without that change. 
+-- +-- _duroxide is owned by the worker role, so its tables are read as the superuser +-- (postgres); the durable functions themselves run as the non-superuser +-- df_e2e_user. + +-- Helper (superuser): assert no _duroxide residue remains for an instance id. +-- Resolves the runtime schema dynamically (df.duroxide_schema()). +CREATE OR REPLACE FUNCTION pg_temp.assert_no_duroxide_residue(p_id text) +RETURNS void LANGUAGE plpgsql AS $$ +DECLARE + sch text := df.duroxide_schema(); + n int; +BEGIN + EXECUTE format( + 'SELECT (SELECT count(*) FROM %I.orchestrator_queue WHERE instance_id = $1) ' + ' + (SELECT count(*) FROM %I.instances WHERE instance_id = $1)', + sch, sch) + INTO n USING p_id; + + IF n > 0 THEN + RAISE EXCEPTION 'TEST FAILED: rolled-back df.start left % _duroxide row(s) for instance % (non-atomic enqueue leaked)', n, p_id; + END IF; + RAISE NOTICE 'PASSED [start_rollback]: no _duroxide residue for %', p_id; +END $$; + +-- =========================================================================== +-- Scenario 1: a rolled-back df.start() leaves no _duroxide orphan +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; +BEGIN; +SELECT df.start('SELECT 42', 'atomic-rb-start') AS rb_id \gset +ROLLBACK; +RESET SESSION AUTHORIZATION; + +-- Give any (buggy) out-of-band enqueue a moment to surface before asserting. +SELECT pg_sleep(1); +SELECT pg_temp.assert_no_duroxide_residue(:'rb_id'); + +-- =========================================================================== +-- Scenario 2: a rolled-back df.cancel() does NOT cancel the instance +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; + +CREATE TEMP TABLE _t_cancel (instance_id TEXT); +INSERT INTO _t_cancel +SELECT df.start(df.loop(df.seq('SELECT 1', df.sleep(1))), 'atomic-rb-cancel'); + +-- Wait until the instance is genuinely running. 
+DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_cancel; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'running' OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(status) <> 'running' THEN + RAISE EXCEPTION 'Scenario 2 setup: cancel victim never reached running (status=%)', status; + END IF; +END $$; + +-- Issue a cancel, then roll it back. +BEGIN; +SELECT df.cancel((SELECT instance_id FROM _t_cancel), 'rolled-back-cancel'); +ROLLBACK; + +-- The instance must NOT become cancelled: the cancel enqueue was rolled back. +-- (Without the change the out-of-band CancelInstance committed, so the worker +-- cancels the instance within ~1s.) +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_cancel; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + IF lower(status) = 'cancelled' THEN + RAISE EXCEPTION 'TEST FAILED: rolled-back df.cancel still cancelled instance % (non-atomic enqueue leaked)', inst_id; + END IF; + EXIT WHEN attempts > 50; -- watch for ~5s + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + RAISE NOTICE 'PASSED [cancel_rollback]: instance % still % after a rolled-back cancel', inst_id, status; +END $$; + +-- Cleanup: cancel the (infinite) loop for real. 
+SELECT df.cancel((SELECT instance_id FROM _t_cancel), 'cleanup'); +DROP TABLE _t_cancel; + +RESET SESSION AUTHORIZATION; + +-- =========================================================================== +-- Scenario 3: a rolled-back df.signal() does NOT deliver the signal +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; + +CREATE TEMP TABLE _t_signal (instance_id TEXT); +INSERT INTO _t_signal +SELECT df.start('SELECT 1' ~> (df.wait_for_signal('go') |=> 'sig') ~> 'SELECT 1', + 'atomic-rb-signal'); + +-- Wait until the instance is running (blocked on the signal). +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_signal; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'running' OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(status) <> 'running' THEN + RAISE EXCEPTION 'Scenario 3 setup: signal victim never reached running (status=%)', status; + END IF; +END $$; + +-- Send a signal, then roll it back. +BEGIN; +SELECT df.signal((SELECT instance_id FROM _t_signal), 'go', '{}'); +ROLLBACK; + +-- The instance must NOT complete: the signal enqueue was rolled back. +-- (Without the change the out-of-band ExternalRaised committed, so the instance +-- receives the signal and completes within ~1s.) 
+DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_signal; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + IF lower(status) = 'completed' THEN + RAISE EXCEPTION 'TEST FAILED: rolled-back df.signal still delivered to instance % (non-atomic enqueue leaked)', inst_id; + END IF; + EXIT WHEN attempts > 50; -- watch for ~5s + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + RAISE NOTICE 'PASSED [signal_rollback]: instance % still % after a rolled-back signal', inst_id, status; +END $$; + +-- Cleanup: deliver the signal for real and let it finish. +SELECT df.signal((SELECT instance_id FROM _t_signal), 'go', '{}'); +DO $$ +DECLARE inst_id TEXT; status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _t_signal; + SELECT df.wait_for_completion(inst_id, 30) INTO status; +END $$; +DROP TABLE _t_signal; + +RESET SESSION AUTHORIZATION; + +-- =========================================================================== +-- Scenario 4: a signal sent before runtime materialization is rejected +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; + +DO $$ +DECLARE inst_id TEXT; +BEGIN + BEGIN + inst_id := df.start( + 'SELECT 1' ~> (df.wait_for_signal('go') |=> 'sig') ~> 'SELECT 1', + 'atomic-signal-before-runtime-ready' + ); + + -- The worker cannot have materialized the root _duroxide.instances row + -- yet (we are still inside the same backend statement/transaction). A + -- signal here would be accepted by the runtime but skipped before the + -- workflow subscribes, so df.signal must fail instead of returning OK. 
+ PERFORM df.signal(inst_id, 'go', '{}'); + RAISE EXCEPTION 'TEST FAILED: df.signal accepted instance % before runtime materialization', inst_id; + EXCEPTION + WHEN object_not_in_prerequisite_state THEN + IF SQLERRM LIKE '%not ready to receive signals%' THEN + RAISE NOTICE 'PASSED [signal_before_runtime_ready_rejected]: %', SQLERRM; + ELSE + RAISE EXCEPTION 'TEST FAILED: unexpected not-ready signal error: %', SQLERRM; + END IF; + END; +END $$; + +RESET SESSION AUTHORIZATION; + +SELECT 'TEST PASSED: atomic rollback (start/cancel/signal)' AS result; diff --git a/tests/e2e/sql/25_enqueue_wrapper_authz.sql b/tests/e2e/sql/25_enqueue_wrapper_authz.sql new file mode 100644 index 0000000..6944b88 --- /dev/null +++ b/tests/e2e/sql/25_enqueue_wrapper_authz.sql @@ -0,0 +1,200 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Tests: authorization and hardening on the in-transaction enqueue wrappers. +-- +-- The df._enqueue_orchestrator_* functions are SECURITY DEFINER (the runtime +-- queue is writable by its owner only) and granted to every df user via +-- df.grant_usage(). They must therefore refuse to enqueue work against an +-- instance the caller does not own, and the start wrapper must not become a +-- generic "start any internal orchestration with arbitrary input" primitive. +-- Cancel/signal ownership is checked with pg_has_role(session_user, +-- , 'MEMBER'): session_user cannot be spoofed inside a +-- SECURITY DEFINER function, and membership lets a role that owns the instance +-- through SET ROLE still qualify. +-- +-- Without the change these wrappers do not exist, so the forge attempts below +-- raise undefined_function and the test fails. + +-- Fresh, non-superuser roles. (Superusers bypass pg_has_role, so the denial can +-- only be exercised by a non-superuser caller.) +DO $precleanup$ +DECLARE + inst_id TEXT; +BEGIN + -- A failed/interrupted prior run can leave a loop instance submitted by one + -- of these roles. 
Cancel it before DROP ROLE so the worker stops reconnecting + -- as that role while this test rebuilds privileges. + FOR inst_id IN + SELECT i.id + FROM df.instances i + JOIN pg_catalog.pg_roles r ON r.oid = i.submitted_by::oid + WHERE r.rolname IN ('authz_owner', 'authz_other') + AND i.status NOT IN ('completed', 'failed', 'cancelled') + LOOP + PERFORM df.cancel(inst_id, 'authz-test-precleanup'); + END LOOP; +END $precleanup$; + +SELECT pg_sleep(0.5); + +DO $setup$ +BEGIN + BEGIN DROP OWNED BY authz_owner; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP OWNED BY authz_other; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP ROLE authz_owner; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP ROLE authz_other; EXCEPTION WHEN undefined_object THEN NULL; END; +END $setup$; + +CREATE ROLE authz_owner LOGIN; +CREATE ROLE authz_other LOGIN; +SELECT df.grant_usage('authz_owner'); +SELECT df.grant_usage('authz_other'); + +-- The owner starts a long-running instance. +SET SESSION AUTHORIZATION authz_owner; +CREATE TEMP TABLE _t_authz (instance_id TEXT); +INSERT INTO _t_authz +SELECT df.start(df.loop(df.seq('SELECT 1', df.sleep(1))), 'authz-owner-inst'); +RESET SESSION AUTHORIZATION; + +-- Let the non-owner read the instance id (so the forge attempts can target it). +GRANT SELECT ON _t_authz TO authz_other; + +-- Wait until it is running. +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_authz; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'running' OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(status) <> 'running' THEN + RAISE EXCEPTION 'Setup: owner instance never reached running (status=%)', status; + END IF; +END $$; + +-- =========================================================================== +-- A non-owner cannot forge a cancel/signal against the owner's instance. 
+-- =========================================================================== + +SET SESSION AUTHORIZATION authz_other; +DO $$ +DECLARE inst_id TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _t_authz; + + -- Forge a cancel. + BEGIN + PERFORM df._enqueue_orchestrator_cancel(inst_id, 'forged'); + RAISE EXCEPTION 'TEST FAILED: authz_other was allowed to enqueue a cancel for owner instance %', inst_id; + EXCEPTION + WHEN insufficient_privilege THEN + IF SQLERRM LIKE '%not authorized%' THEN + RAISE NOTICE 'PASSED [cancel_forge_denied]: %', SQLERRM; + ELSE + RAISE EXCEPTION 'TEST FAILED: cancel denied, but not by the wrapper authorization check: %', SQLERRM; + END IF; + WHEN undefined_function THEN + RAISE EXCEPTION 'TEST FAILED: df._enqueue_orchestrator_cancel is missing (change not present): %', SQLERRM; + END; + + -- Forge a signal. + BEGIN + PERFORM df._enqueue_orchestrator_signal(inst_id, 'go', '{}'); + RAISE EXCEPTION 'TEST FAILED: authz_other was allowed to enqueue a signal for owner instance %', inst_id; + EXCEPTION + WHEN insufficient_privilege THEN + IF SQLERRM LIKE '%not authorized%' THEN + RAISE NOTICE 'PASSED [signal_forge_denied]: %', SQLERRM; + ELSE + RAISE EXCEPTION 'TEST FAILED: signal denied, but not by the wrapper authorization check: %', SQLERRM; + END IF; + WHEN undefined_function THEN + RAISE EXCEPTION 'TEST FAILED: df._enqueue_orchestrator_signal is missing (change not present): %', SQLERRM; + END; +END $$; +RESET SESSION AUTHORIZATION; + +-- =========================================================================== +-- A caller cannot use the start wrapper as a generic privileged entrypoint. +-- =========================================================================== + +SET SESSION AUTHORIZATION authz_other; +DO $$ +DECLARE + inst_id CONSTANT TEXT := 'feedf00d'; + root_id CONSTANT TEXT := 'deadbeef'; +BEGIN + -- Create a legitimate brand-new, pending instance owned by authz_other. 
The + -- start wrapper's state check should pass for this row; the test verifies + -- the wrapper still rejects a non-root internal orchestration name. + INSERT INTO df.instances (id, label, root_node, submitted_by, database) + VALUES (inst_id, 'authz-start-wrapper-hardening', root_id, current_user::regrole, 'postgres'); + + INSERT INTO df.nodes (id, instance_id, node_type, query, submitted_by, database) + VALUES (root_id, inst_id, 'SQL', 'SELECT 1', current_user::regrole, 'postgres'); + + BEGIN + PERFORM df._enqueue_orchestrator_start( + inst_id, + 'pg_durable::orchestration::execute-subtree', + json_build_object('instance_id', inst_id)::text); + RAISE EXCEPTION 'TEST FAILED: start wrapper accepted a non-root orchestration for instance %', inst_id; + EXCEPTION + WHEN invalid_parameter_value THEN + IF SQLERRM LIKE '%invalid start orchestration%' THEN + RAISE NOTICE 'PASSED [start_wrapper_rejects_internal_orchestration]: %', SQLERRM; + ELSE + RAISE EXCEPTION 'TEST FAILED: start wrapper rejected with an unexpected validation error: %', SQLERRM; + END IF; + WHEN undefined_function THEN + RAISE EXCEPTION 'TEST FAILED: df._enqueue_orchestrator_start is missing (change not present): %', SQLERRM; + END; +END $$; +RESET SESSION AUTHORIZATION; + +BEGIN; +DELETE FROM df.instances WHERE id = 'feedf00d'; +DELETE FROM df.nodes WHERE instance_id = 'feedf00d'; +COMMIT; + +-- =========================================================================== +-- The owner can still cancel its own instance (the authorized path works). 
+-- =========================================================================== + +SET SESSION AUTHORIZATION authz_owner; +SELECT df.cancel((SELECT instance_id FROM _t_authz), 'owner-cancel'); +RESET SESSION AUTHORIZATION; + +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_authz; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'cancelled' OR attempts > 100; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(status) <> 'cancelled' THEN + RAISE EXCEPTION 'TEST FAILED: owner could not cancel its own instance % (status=%)', inst_id, status; + END IF; + RAISE NOTICE 'PASSED [owner_cancel_allowed]: instance % cancelled by its owner', inst_id; +END $$; + +DROP TABLE _t_authz; + +-- Cleanup roles. +DO $cleanup$ +BEGIN + BEGIN DROP OWNED BY authz_owner; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP OWNED BY authz_other; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP ROLE authz_owner; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP ROLE authz_other; EXCEPTION WHEN undefined_object THEN NULL; END; +END $cleanup$; + +SELECT 'TEST PASSED: enqueue wrapper authorization' AS result; diff --git a/tests/e2e/sql/26_reconcile_orphan_gc.sql b/tests/e2e/sql/26_reconcile_orphan_gc.sql new file mode 100644 index 0000000..c59c37b --- /dev/null +++ b/tests/e2e/sql/26_reconcile_orphan_gc.sql @@ -0,0 +1,149 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Tests: df.reconcile() repairs leftover df/_duroxide drift. +-- +-- df.reconcile() deletes orphaned ROOT runtime instances (no df.instances row, +-- older than the grace window), gathering each orphan's full subtree so the +-- delete is accepted, and leaves healthy instances untouched. +-- +-- Without the change df.reconcile() does not exist, so the calls below raise +-- undefined_function and the test fails. 
+-- +-- _duroxide is read as the superuser (postgres); durable functions run as +-- df_e2e_user. + +-- Helper (superuser): does a _duroxide instance row exist for this id? +CREATE OR REPLACE FUNCTION pg_temp.duroxide_instance_exists(p_id text) +RETURNS boolean LANGUAGE plpgsql AS $$ +DECLARE sch text := df.duroxide_schema(); ex boolean; +BEGIN + EXECUTE format('SELECT EXISTS(SELECT 1 FROM %I.instances WHERE instance_id = $1)', sch) + INTO ex USING p_id; + RETURN ex; +END $$; + +-- Helper (superuser): number of direct children (sub-orchestrations) of a root. +CREATE OR REPLACE FUNCTION pg_temp.duroxide_child_count(p_root text) +RETURNS bigint LANGUAGE plpgsql AS $$ +DECLARE sch text := df.duroxide_schema(); n bigint; +BEGIN + EXECUTE format('SELECT count(*) FROM %I.instances WHERE parent_instance_id = $1', sch) + INTO n USING p_root; + RETURN n; +END $$; + +-- Helper (superuser): size of the full instance subtree rooted at p_root. +CREATE OR REPLACE FUNCTION pg_temp.duroxide_subtree_count(p_root text) +RETURNS bigint LANGUAGE plpgsql AS $$ +DECLARE sch text := df.duroxide_schema(); n bigint; +BEGIN + EXECUTE format( + 'WITH RECURSIVE t AS ( ' + ' SELECT instance_id FROM %1$I.instances WHERE instance_id = $1 ' + ' UNION ' + ' SELECT c.instance_id FROM %1$I.instances c JOIN t ON c.parent_instance_id = t.instance_id ' + ') SELECT count(*) FROM t', sch) + INTO n USING p_root; + RETURN n; +END $$; + +-- =========================================================================== +-- Scenario 1: a planted root orphan WITH a running subtree is fully collected +-- =========================================================================== + +-- Start a parallel JOIN: each branch runs as its own sub-orchestration, which +-- has no df.instances row of its own. Orphaning the root then forces reconcile +-- to gather the FULL subtree (root + children) -- deleting only the root would be +-- refused by delete_instances_atomic, leaving the orphan behind. 
Long branches +-- keep the children running for the duration of the test. +SET SESSION AUTHORIZATION df_e2e_user; +CREATE TEMP TABLE _t_orphan (instance_id TEXT); +INSERT INTO _t_orphan +SELECT df.start('SELECT pg_sleep(30)' & 'SELECT pg_sleep(30)', 'reconcile-orphan-victim'); +RESET SESSION AUTHORIZATION; + +-- Wait until the worker has materialized the root AND at least one child, so the +-- subtree genuinely exists when we reconcile. +DO $$ +DECLARE inst_id TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_orphan; + LOOP + EXIT WHEN pg_temp.duroxide_child_count(inst_id) >= 1 OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF pg_temp.duroxide_child_count(inst_id) < 1 THEN + RAISE EXCEPTION 'Setup: orphan victim never spawned a child sub-orchestration'; + END IF; + RAISE NOTICE 'Setup: orphan victim % has % child sub-orchestration(s)', inst_id, pg_temp.duroxide_child_count(inst_id); +END $$; + +-- Orphan it: drop the control-plane rows (superuser bypasses row security). +-- df.instances and df.nodes reference each other, but the FKs are +-- DEFERRABLE INITIALLY DEFERRED, so deleting both in one transaction is fine. +BEGIN; +DELETE FROM df.instances WHERE id = (SELECT instance_id FROM _t_orphan); +DELETE FROM df.nodes WHERE instance_id = (SELECT instance_id FROM _t_orphan); +COMMIT; + +-- Reconcile with a zero grace window: the orphaned root AND its subtree must be +-- collected. 
+SELECT df.reconcile(0); + +DO $$ +DECLARE inst_id TEXT; remaining bigint; +BEGIN + SELECT instance_id INTO inst_id FROM _t_orphan; + remaining := pg_temp.duroxide_subtree_count(inst_id); + IF remaining > 0 THEN + RAISE EXCEPTION 'TEST FAILED: df.reconcile left % subtree row(s) for orphaned root % (subtree not gathered)', remaining, inst_id; + END IF; + RAISE NOTICE 'PASSED [orphan_subtree_collected]: root % and all children removed', inst_id; +END $$; + +DROP TABLE _t_orphan; + +-- =========================================================================== +-- Scenario 2: a healthy, running instance is left untouched +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; +CREATE TEMP TABLE _t_live (instance_id TEXT); +INSERT INTO _t_live SELECT df.start('SELECT pg_sleep(3)', 'reconcile-live'); +RESET SESSION AUTHORIZATION; + +-- Ensure it is running before reconciling. +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_live; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'running' OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(status) <> 'running' THEN + RAISE EXCEPTION 'Setup: live instance never reached running (status=%)', status; + END IF; +END $$; + +-- Reconcile must not disturb a healthy instance (it has a df.instances row). +SELECT df.reconcile(0); + +DO $$ +DECLARE inst_id TEXT; status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _t_live; + SELECT df.wait_for_completion(inst_id, 30) INTO status; + IF lower(status) <> 'completed' THEN + RAISE EXCEPTION 'TEST FAILED: df.reconcile disturbed a healthy instance % (status=%)', inst_id, status; + END IF; + RAISE NOTICE 'PASSED [live_untouched]: % completed normally despite reconcile', inst_id; +END $$; + +DROP TABLE _t_live; + +SELECT 'TEST PASSED: reconcile orphan GC' AS result;