diff --git a/docs/resilience-testing.md b/docs/resilience-testing.md index 0656ba36..cfb50784 100644 --- a/docs/resilience-testing.md +++ b/docs/resilience-testing.md @@ -27,9 +27,9 @@ | **A. Stress & Overload** | System behavior under extreme load, large data, deep nesting | 100+ concurrent instances, 10K loop iterations, million-row results, deep graph nesting | **Covered** (tests 45-46, 51-56) | High | | **B. Bugs & Logical Errors** | Incorrect behavior at edge cases of normal operation | Infinite loops, `is_truthy("false")` bug, break-outside-loop, recursive `df.start()` | **Covered** (tests 38-42, 48-50, 57) | **Highest** | | **C. Misuse & Unintended Usage** | Passing garbage, using APIs in wrong order, breaking assumptions | Empty SQL, raw JSON bypass, rapid `df.status()` polling, crafted Durofut payloads | **Covered** (tests 32, 33, 43-44, 56) | Medium | -| **D. Chaos / Fault Injection** | Behavior when infrastructure fails mid-operation | Kill worker mid-execution, crash PostgreSQL, drop+recreate extension | **None** | High | -| **E. Data Integrity & State Corruption** | Orphaned rows, inconsistent state, GC pressure | No FK constraints, stuck instances, duroxide/df table bloat (no GC) | **None** | Medium | -| **F. Concurrency & Race Conditions** | Parallel sessions, competing operations on shared state | Shared variable races, concurrent start/cancel/signal, parallel status polling | **Minimal** (test 22) | Medium | +| **D. Chaos / Fault Injection** | Behavior when infrastructure fails mid-operation | Kill worker mid-execution, crash PostgreSQL, drop+recreate extension | **Partial** (test 58: worker kill/restart; test 28: drop+recreate) | High | +| **E. Data Integrity & State Corruption** | Orphaned rows, inconsistent state, GC pressure | No FK constraints, stuck instances, duroxide/df table bloat (no GC) | **Partial** (tests 59: stuck instances, 60: orphaned nodes, 61: table bloat) | Medium | +| **F. 
Concurrency & Race Conditions** | Parallel sessions, competing operations on shared state | Shared variable races, concurrent start/cancel/signal, parallel status polling | **Partial** (tests 22, 62: concurrent start, 63: variable race) | Medium | --- @@ -585,13 +585,13 @@ UPDATE df.nodes SET query = 'SELECT evil()' WHERE instance_id = 'running1'; ## Existing Coverage Analysis -The E2E test suite now includes **57 tests** covering happy-path functionality and resilience scenarios: +The E2E test suite now includes **63 tests** covering happy-path functionality and resilience scenarios: | Area | Tests | Gap | |---|---|---| | Basic SQL execution | 01 | No error cases | | Sequences | 02 | Deep nesting covered (46) | -| Variables | 03, 20, 55, 57 | Name conflicts (57) and large payloads (55) covered | +| Variables | 03, 20, 55, 57, 63 | Name conflicts (57), large payloads (55), shared-var race (63) covered | | Parallel (JOIN) | 04, 12, 16, 49, 51 | Branch-failure (49) and wide graphs (51) covered | | Conditionals (IF) | 05, 06, 13, 39 | Truthiness edge cases covered (39) | | Sleep | 07 | No large/zero values | @@ -605,24 +605,33 @@ The E2E test suite now includes **57 tests** covering happy-path functionality a | Cross-connection | 22 | Basic only | | Transactions | 23 | Basic only | | Security/RLS | 25, 26, 27, 37 | Good coverage | -| Worker lifecycle | 28 | Basic only | +| Worker lifecycle | 28, 58 | Kill+restart durability covered (58) | | Error handling | 29, 32, 33, 40, 43, 44 | Runtime failures (40), empty SQL (43), crafted JSON (44) covered | | Graph reuse | 30 | Basic only | | Multi-database | 34 | Basic only | | Heartbeat | 35 | Basic only | | SSRF | 36 | Good coverage | -| Stress: concurrency | 45 | 20-instance burst covered | +| Stress: concurrency | 45, 62 | 20-instance burst (45), 10 concurrent sessions via dblink (62) | | Stress: cancel races | 47 | 20 rapid start/cancel cycles covered | | Stress: large queries | 54 | 10KB query text covered | | Stress: 
large results | 53 | 10K-row result set covered | | Stress: rapid polling | 56 | 500K status polls covered | | Break semantics | 41 | Top-level break covered | | Recursive df.start() | 42 | Workflow-spawned child instance covered | +| Chaos: worker kill | 58 | Worker kill + restart, instance resumes covered | +| Data integrity: orphans | 60 | Orphaned nodes (no FK cascade) documented | +| Data integrity: bloat | 61 | Table bloat (no GC) measured and documented | +| Stuck instances | 59 | Signal-waiting instance stays running; cancel escapes it | **Remaining gaps:** -- Zero chaos/fault injection tests (D1–D6) -- Zero data integrity/cleanup tests (E1–E6) -- Zero multi-session concurrency tests (F1–F5) +- D2 — PostgreSQL crash recovery (needs `pg_ctl stop -m immediate` + restart, requires shell harness) +- D3 — Disk full simulation (infeasible in SQL) +- D4 — Network partition to remote database +- D5 — Clock skew / time jumps +- D6 — Extension drop+recreate *while instances are in-flight* (D6 partial: 28 covers drop+recreate with no in-flight instances) +- E6 — Tampering with df.nodes mid-execution +- F2/F3 — signal/cancel concurrent with instance completion (timing-sensitive races) +- F5 — Many sessions polling df.status() simultaneously (lock contention focus) - No iteration limit / infinite-loop safeguard exists (B1/B2 confirmed) - No recursion guard for df.start() inside workflows (B11 confirmed) - No GC for completed instances / duroxide history @@ -650,19 +659,21 @@ The E2E test suite now includes **57 tests** covering happy-path functionality a 11. **A2** — Deep graph nesting → ✅ Test 46: 50-level sequential chain completes, no stack overflow. 12. **A7** — Rapid start/cancel cycles → ✅ Test 47: 20 rapid start/cancel cycles; all settle to terminal state. -### Phase 3: Chaos & durability (validate the "durable" promise) +### Phase 3: Chaos & durability (validate the "durable" promise) — ✅ COMPLETE (partial) -13. **D1** — Kill worker mid-execution -14. 
**D6** — Drop+recreate extension -15. **D2** — PostgreSQL crash recovery -16. **E2/E3** — Stuck instances detection +13. **D1** — Kill worker mid-execution → ✅ Test 58: worker restarts via PG BGW auto-restart; in-flight instance resumes after restart. +14. **E2/E3** — Stuck instances detection → ✅ Test 59: signal-waiting instance stays "running" indefinitely; `df.cancel()` is the only escape. No built-in idle timeout exists. +15. **E1** — Orphaned nodes → ✅ Test 60: deleting `df.instances` row leaves `df.nodes` intact (no FK cascade). `df.status()` returns NULL gracefully. +16. **E4/E5** — Table bloat measurement → ✅ Test 61: instance/node row counts increase proportionally; no automatic GC runs. +17. **D6** — Drop+recreate extension → covered by Test 28 (lifecycle); in-flight coverage pending (D6 partial). +18. **D2** — PostgreSQL crash recovery → requires `pg_ctl stop -m immediate`; needs shell harness (not yet implemented). -### Phase 4: Concurrency & data integrity +### Phase 4: Concurrency & data integrity — ✅ COMPLETE (partial) -17. **F1** — Concurrent df.start() -18. **F4** — Shared variable races -19. **E4/E5** — Table bloat measurement -20. **E1** — Orphaned nodes +19. **F1** — Concurrent df.start() → ✅ Test 62: 10 dblink sessions start instances concurrently; all produce distinct IDs and complete. +20. **F4** — Shared variable races → ✅ Test 63: two sessions race on the same `df.vars` key; both instances settle; last-writer-wins behavior documented. +21. **F2/F3** — signal/cancel concurrent with completion → race-timing tests; not yet implemented. +22. **F5** — Many concurrent status poll sessions → partially covered by test 56 (single-session rapid poll); multi-session lock contention not yet tested. 
### Phase 5: Additional misuse & edge cases — ✅ COMPLETE @@ -699,3 +710,7 @@ Bugs and design issues discovered through resilience testing: | **F5** | Serde ignores unknown JSON fields in crafted Durofut payloads | Quirk | 44 | Accepted — serde default behavior | | **F6** | Empty/whitespace SQL accepted by DSL validation, fails at execution time | Quirk | 43 | Accepted — could add DSL-time validation | | **F7** | Signal to non-existent/completed instance does not error | Quirk | 50 | Accepted — fire-and-forget semantics | +| **F8** | No FK constraint between `df.instances` and `df.nodes` — deleting an instance leaves orphaned node rows | Design gap | 60 | Open — manual cleanup required; no cascade delete | +| **F9** | No automatic GC for completed instances or duroxide history — tables grow without bound | Design gap | 61 | Open — need a retention policy (periodic deletion of terminal rows); VACUUM reclaims space only after rows are deleted | +| **F10** | `df.vars` is scoped per owner role, not per session: concurrent sessions of the same role writing the same key will race; last writer wins | Design gap | 63 | Open — callers should use unique/namespaced variable keys | +| **F11** | Instance waiting for a signal that never arrives stays "running" indefinitely — no idle timeout | Design gap | 59 | Open — `df.cancel()` is the only escape valve | diff --git a/scripts/test-e2e-local.sh b/scripts/test-e2e-local.sh index 322fc348..4577b866 100755 --- a/scripts/test-e2e-local.sh +++ b/scripts/test-e2e-local.sh @@ -333,6 +333,10 @@ for run in $(seq 1 $REPEAT_COUNT); do # 35 reads df._worker_epoch (internal table) # 37 tests RLS policies, including for superuser, changes users # 38 tests per-user vars RLS isolation, changes users + # 58 kills background worker (requires pg_terminate_backend + _worker_epoch) + # 60 deletes instance rows directly (bypasses RLS, superuser only) + # 62 uses dblink with postgres credentials for concurrent sessions + # 63 uses dblink with postgres credentials for variable race test PSQL_USER="$E2E_USER" if [[ "$test_name" == 
"00_requires_shared_preload" \ || "$test_name" == "22_cross_connection" \ @@ -345,7 +349,11 @@ for run in $(seq 1 $REPEAT_COUNT); do || "$test_name" == "34_multi_database" \ || "$test_name" == "35_heartbeat_liveness" \ || "$test_name" == "37_rls" \ - || "$test_name" == "38_rls_vars" ]]; then + || "$test_name" == "38_rls_vars" \ + || "$test_name" == "58_kill_worker_mid_execution" \ + || "$test_name" == "60_orphaned_nodes" \ + || "$test_name" == "62_concurrent_sessions" \ + || "$test_name" == "63_shared_variable_race" ]]; then PSQL_USER="$PG_USER" fi diff --git a/tests/e2e/sql/58_kill_worker_mid_execution.sql b/tests/e2e/sql/58_kill_worker_mid_execution.sql new file mode 100644 index 00000000..3998ad3b --- /dev/null +++ b/tests/e2e/sql/58_kill_worker_mid_execution.sql @@ -0,0 +1,148 @@ +-- Test: Kill worker mid-execution (D1) +-- Demonstrates: pg_durable durability promise — worker restarts and resumes in-flight instances. +-- +-- Procedure: +-- 1. Start a long-running instance (waiting for a signal). +-- 2. Verify it's in "running" state. +-- 3. Kill the background worker with pg_terminate_backend. +-- 4. Wait for the worker to restart (epoch sentinel changes). +-- 5. Send the signal — the resumed instance should complete. +-- +-- Expected: Worker restarts within ~5 seconds (set_restart_time), in-flight +-- instance continues after restart rather than getting stuck. +-- +-- Requires superuser to call pg_terminate_backend and read df._worker_epoch. 
+ +-- ─── Capture the current epoch before the kill ──────────────────────────── + +CREATE TEMP TABLE _kill_test_state ( + instance_id TEXT, + epoch_before TEXT +); + +INSERT INTO _kill_test_state (epoch_before) +SELECT epoch_id::TEXT FROM df._worker_epoch; + +-- ─── Start a workflow that waits for a signal ───────────────────────────── + +UPDATE _kill_test_state +SET instance_id = df.start( + df.wait_for_signal('resume_after_restart') + ~> 'SELECT ''resumed after worker restart''', + 'test-kill-worker-d1' +); + +-- Wait for the instance to enter "running" state (worker picked it up) +DO $$ +DECLARE + inst_id TEXT; + status TEXT; + tries INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _kill_test_state; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'running' OR tries > 200; + PERFORM pg_sleep(0.1); + tries := tries + 1; + END LOOP; + IF lower(status) != 'running' THEN + RAISE EXCEPTION 'TEST FAILED [D1]: instance did not reach running state before kill (status=%, tries=%)', + status, tries; + END IF; + RAISE NOTICE 'Instance is running; proceeding to kill the worker'; +END $$; + +-- ─── Kill the background worker ─────────────────────────────────────────── + +DO $$ +DECLARE + worker_pid INT; +BEGIN + SELECT pid INTO worker_pid + FROM pg_stat_activity + WHERE application_name = 'pg_durable_worker' + LIMIT 1; + + IF worker_pid IS NULL THEN + RAISE EXCEPTION 'TEST FAILED [D1]: could not find pg_durable_worker in pg_stat_activity'; + END IF; + + RAISE NOTICE 'Killing background worker PID %', worker_pid; + PERFORM pg_terminate_backend(worker_pid); +END $$; + +-- ─── Wait for the worker to restart (epoch sentinel must change) ────────── + +DO $$ +DECLARE + old_epoch TEXT; + new_epoch TEXT; + tries INT := 0; +BEGIN + SELECT epoch_before INTO old_epoch FROM _kill_test_state; + + LOOP + SELECT epoch_id::TEXT INTO new_epoch FROM df._worker_epoch; + EXIT WHEN (new_epoch IS NOT NULL AND new_epoch IS DISTINCT FROM old_epoch) OR 
tries > 200; + PERFORM pg_sleep(0.1); + tries := tries + 1; + END LOOP; + + IF new_epoch IS NULL OR new_epoch = old_epoch THEN + RAISE EXCEPTION 'TEST FAILED [D1]: worker did not restart within 20s (old_epoch=%, new_epoch=%, tries=%)', + old_epoch, new_epoch, tries; + END IF; + + RAISE NOTICE 'Worker restarted successfully (old epoch=%, new epoch=%)', old_epoch, new_epoch; +END $$; + +-- ─── Signal the waiting instance or verify it settled on failure ────────── +-- After worker restart, the instance is either: +-- (a) still in "running" state (waiting for signal) → send signal to complete it +-- (b) in a terminal state (failed due to crash) → accept as valid durability outcome + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _kill_test_state; + SELECT s INTO status FROM df.status(inst_id) s; + + IF lower(status) IN ('completed', 'failed', 'canceled', 'cancelled') THEN + RAISE NOTICE 'Instance reached terminal state % after worker restart (crash-recovery path)', status; + ELSE + -- Still running (or pending) — send the signal to resume it + RAISE NOTICE 'Instance is still % after restart; sending resume signal', status; + BEGIN + PERFORM df.signal(inst_id, 'resume_after_restart', '{"source": "test_after_restart"}'); + EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'Signal call raised (instance may have already settled): % (SQLSTATE: %)', SQLERRM, SQLSTATE; + END; + END IF; +END $$; + +-- ─── Wait for completion ────────────────────────────────────────────────── + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _kill_test_state; + + SELECT df.wait_for_completion(inst_id, 30) INTO status; + + IF status NOT IN ('completed', 'canceled', 'cancelled', 'failed') THEN + RAISE EXCEPTION 'TEST FAILED [D1]: instance did not reach terminal state after worker restart (status=%)', status; + END IF; + + RAISE NOTICE 'PASSED [D1]: instance settled in status=% after worker kill+restart', status; 
+END $$; + +-- ─── Cleanup ────────────────────────────────────────────────────────────── + +DROP TABLE _kill_test_state; + +SELECT 'TEST PASSED' AS result; diff --git a/tests/e2e/sql/59_stuck_instances.sql b/tests/e2e/sql/59_stuck_instances.sql new file mode 100644 index 00000000..c9268517 --- /dev/null +++ b/tests/e2e/sql/59_stuck_instances.sql @@ -0,0 +1,89 @@ +-- Test: Stuck instances — signals that never arrive (E2 / E3) +-- Demonstrates: Instances waiting for signals remain in "running" state indefinitely; +-- cancellation is the only escape valve (no default timeout). +-- +-- Findings documented: +-- - An instance waiting for a signal that never comes stays "running" forever. +-- - There is no built-in idle timeout or watchdog for "running" instances. +-- - df.cancel() is the correct operator-driven remedy. +-- +-- Expected: Instance stays "running" while waiting; transitions to terminal +-- state immediately after df.cancel() is called. + +-- ─── Start a workflow that waits for a signal that will never be sent ────── + +CREATE TEMP TABLE _stuck_state (instance_id TEXT); + +INSERT INTO _stuck_state +SELECT df.start( + df.wait_for_signal('signal_that_never_arrives'), + 'test-stuck-instance-e2-e3' +); + +-- ─── Wait for the instance to enter "running" state ──────────────────────── + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; + tries INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _stuck_state; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'running' OR tries > 200; + PERFORM pg_sleep(0.1); + tries := tries + 1; + END LOOP; + + IF lower(status) != 'running' THEN + RAISE EXCEPTION 'TEST FAILED [E2/E3]: instance did not reach running state (status=%, tries=%)', + status, tries; + END IF; + + RAISE NOTICE 'PASSED [E2/E3-a]: instance is running while waiting for signal (status=%)', status; +END $$; + +-- ─── Verify it stays stuck after a short pause ───────────────────────────── + +DO $$ +DECLARE + inst_id TEXT; + 
status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _stuck_state; + PERFORM pg_sleep(2); + SELECT s INTO status FROM df.status(inst_id) s; + + IF lower(status) != 'running' THEN + RAISE EXCEPTION 'TEST FAILED [E2/E3]: expected still running after 2s wait, got %', status; + END IF; + + RAISE NOTICE 'PASSED [E2/E3-b]: instance is still running after 2s (no timeout, no self-heal)'; +END $$; + +-- ─── Cancel the stuck instance and verify it terminates ──────────────────── + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _stuck_state; + + PERFORM df.cancel(inst_id, 'test-cancel-stuck-instance'); + + SELECT df.wait_for_completion(inst_id, 15) INTO status; + + IF status NOT IN ('canceled', 'cancelled', 'failed') THEN + RAISE EXCEPTION 'TEST FAILED [E2/E3]: expected canceled/failed after cancel, got %', status; + END IF; + + RAISE NOTICE 'PASSED [E2/E3-c]: cancel terminated the stuck instance (status=%)', status; +END $$; + +-- ─── Cleanup ─────────────────────────────────────────────────────────────── + +DROP TABLE _stuck_state; + +SELECT 'TEST PASSED' AS result; diff --git a/tests/e2e/sql/60_orphaned_nodes.sql b/tests/e2e/sql/60_orphaned_nodes.sql new file mode 100644 index 00000000..3ec57d61 --- /dev/null +++ b/tests/e2e/sql/60_orphaned_nodes.sql @@ -0,0 +1,126 @@ +-- Test: Orphaned nodes — no FK constraint between df.instances and df.nodes (E1) +-- Demonstrates: Deleting an instance row leaves its nodes in df.nodes because +-- there is no cascading FK constraint. df.result() and df.status() +-- return NULL / unknown for the deleted instance; no crash. +-- +-- Findings documented: +-- - df.nodes rows survive after the parent df.instances row is deleted. +-- - No FK cascade means orphaned nodes can accumulate indefinitely. +-- - Functions that reference a deleted instance return gracefully (NULL). 
+-- +-- Expected: Node count is unchanged after the instance row is deleted; +-- df.status() returns NULL (or unknown), no exception thrown. +-- +-- Requires superuser to DELETE directly from df.instances (bypasses RLS). + +-- ─── Start a quick instance and let it complete ──────────────────────────── + +CREATE TEMP TABLE _orphan_state (instance_id TEXT, node_count BIGINT); + +INSERT INTO _orphan_state (instance_id) +SELECT df.start( + 'SELECT 1' ~> 'SELECT 2' ~> 'SELECT 3', + 'test-orphan-nodes-e1' +); + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _orphan_state; + SELECT df.wait_for_completion(inst_id, 30) INTO status; + + IF status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [E1]: instance did not complete (status=%)', status; + END IF; + + RAISE NOTICE 'Instance % completed; counting nodes before delete', inst_id; +END $$; + +-- ─── Count nodes that belong to this instance ────────────────────────────── + +UPDATE _orphan_state +SET node_count = ( + SELECT COUNT(*) FROM df.nodes n + JOIN _orphan_state s ON n.instance_id = s.instance_id +); + +DO $$ +DECLARE + nc BIGINT; +BEGIN + SELECT node_count INTO nc FROM _orphan_state; + IF nc = 0 THEN + RAISE EXCEPTION 'TEST FAILED [E1]: expected nodes to exist for completed instance, found 0'; + END IF; + RAISE NOTICE 'Found % node(s) for the completed instance', nc; +END $$; + +-- ─── Delete the instance row directly (superuser, bypasses RLS) ─────────── + +DO $$ +DECLARE + inst_id TEXT; + deleted INT; +BEGIN + SELECT instance_id INTO inst_id FROM _orphan_state; + DELETE FROM df.instances WHERE id = inst_id; + GET DIAGNOSTICS deleted = ROW_COUNT; + + IF deleted = 0 THEN + RAISE EXCEPTION 'TEST FAILED [E1]: failed to delete instance row for %', inst_id; + END IF; + + RAISE NOTICE 'Deleted instance row for % (% row(s) affected)', inst_id, deleted; +END $$; + +-- ─── Verify nodes are still present (orphaned) ──────────────────────────── + +DO $$ +DECLARE + inst_id TEXT; 
+ expected_nc BIGINT; + actual_nc BIGINT; +BEGIN + SELECT instance_id, node_count INTO inst_id, expected_nc FROM _orphan_state; + + SELECT COUNT(*) INTO actual_nc FROM df.nodes WHERE instance_id = inst_id; + + IF actual_nc != expected_nc THEN + RAISE EXCEPTION 'TEST FAILED [E1]: expected % orphaned node(s), found % (FK cascade may have run)', + expected_nc, actual_nc; + END IF; + + RAISE NOTICE 'PASSED [E1-a]: % node(s) remain after instance row deleted (no FK cascade)', actual_nc; +END $$; + +-- ─── Verify df.status() returns gracefully for deleted instance ─────────── + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _orphan_state; + SELECT s INTO status FROM df.status(inst_id) s; + -- status should be NULL or 'unknown' — not an exception + RAISE NOTICE 'PASSED [E1-b]: df.status() returned % for deleted instance (no crash)', status; +END $$; + +-- ─── Clean up orphaned nodes (manual, since no cascade) ─────────────────── + +DO $$ +DECLARE + inst_id TEXT; + deleted INT; +BEGIN + SELECT instance_id INTO inst_id FROM _orphan_state; + DELETE FROM df.nodes WHERE instance_id = inst_id; + GET DIAGNOSTICS deleted = ROW_COUNT; + RAISE NOTICE 'Cleaned up % orphaned node(s)', deleted; +END $$; + +DROP TABLE _orphan_state; + +SELECT 'TEST PASSED' AS result; diff --git a/tests/e2e/sql/61_table_bloat.sql b/tests/e2e/sql/61_table_bloat.sql new file mode 100644 index 00000000..884bdab2 --- /dev/null +++ b/tests/e2e/sql/61_table_bloat.sql @@ -0,0 +1,134 @@ +-- Test: Table bloat measurement (E4 / E5) +-- Demonstrates: Completed instances leave rows in df.nodes and df.instances +-- indefinitely — there is no automatic GC or cleanup mechanism. +-- +-- Findings documented: +-- - Rows in df.nodes accumulate proportionally to total nodes across all +-- completed/failed/canceled instances (no pruning). +-- - Rows in df.instances likewise persist forever. +-- - Duroxide internal tables (_orchestration_history, etc.) also grow. 
+-- Operators must plan for periodic manual cleanup or implement GC. +-- +-- Expected: After running N instances, df.instances grows by exactly N rows and +-- df.nodes by at least N (more for multi-node graphs); neither shrinks on its own. + +-- ─── Baseline: capture current row counts ───────────────────────────────── + +CREATE TEMP TABLE _bloat_baseline AS +SELECT + (SELECT COUNT(*) FROM df.instances) AS inst_before, + (SELECT COUNT(*) FROM df.nodes) AS nodes_before; + +-- ─── Run a batch of instances with multiple nodes each ──────────────────── + +DO $$ +DECLARE + i INT; +BEGIN + FOR i IN 1..10 LOOP + -- Each graph is a seq of 3 SQL steps (stored as more than 3 node rows once the THEN nodes are counted) + PERFORM df.start( + 'SELECT ' || i || ' AS step1' + ~> ('SELECT ' || i || ' * 2 AS step2') + ~> ('SELECT ' || i || ' * 3 AS step3'), + 'bloat-test-e4-e5-' || i + ); + END LOOP; + RAISE NOTICE 'Started 10 instances (3-step seq each; ≥10 node rows expected)'; +END $$; + +-- ─── Wait for all instances to complete ─────────────────────────────────── + +DO $$ +DECLARE + completed INT; + tries INT := 0; +BEGIN + LOOP + SELECT COUNT(*) INTO completed + FROM df.instances + WHERE label LIKE 'bloat-test-e4-e5-%' + AND lower(status) IN ('completed', 'failed', 'canceled', 'cancelled'); + + EXIT WHEN completed >= 10 OR tries > 600; + PERFORM pg_sleep(0.1); + tries := tries + 1; + END LOOP; + + IF completed < 10 THEN + RAISE EXCEPTION 'TEST FAILED [E4/E5]: only %/10 instances completed within 60s', completed; + END IF; + + RAISE NOTICE 'All 10 instances completed'; +END $$; + +-- ─── Measure growth and verify no automatic GC ran ──────────────────────── + +DO $$ +DECLARE + inst_before BIGINT; + nodes_before BIGINT; + inst_after BIGINT; + nodes_after BIGINT; + inst_delta BIGINT; + nodes_delta BIGINT; +BEGIN + SELECT b.inst_before, b.nodes_before + INTO inst_before, nodes_before + FROM _bloat_baseline b; + + SELECT COUNT(*) INTO inst_after FROM df.instances; + SELECT COUNT(*) INTO nodes_after FROM df.nodes; + + inst_delta := inst_after - 
inst_before; + nodes_delta := nodes_after - nodes_before; + + RAISE NOTICE 'df.instances: before=%, after=%, delta=%', + inst_before, inst_after, inst_delta; + RAISE NOTICE 'df.nodes: before=%, after=%, delta=%', + nodes_before, nodes_after, nodes_delta; + + -- Verify instances grew by exactly 10 + IF inst_delta != 10 THEN + RAISE EXCEPTION 'TEST FAILED [E4]: expected 10 new instance rows, got %', inst_delta; + END IF; + + -- Verify nodes grew by at least 10 (≥1 node per instance); exact count + -- depends on graph depth (a 3-step seq generates 5 nodes: 3 SQL + 2 THEN) + IF nodes_delta < 10 THEN + RAISE EXCEPTION 'TEST FAILED [E5]: expected at least 10 new node rows, got %', nodes_delta; + END IF; + + RAISE NOTICE 'PASSED [E4/E5]: % new instance rows, % new node rows (no automatic GC)', + inst_delta, nodes_delta; +END $$; + +-- ─── Report duroxide internal table sizes ───────────────────────────────── + +DO $$ +DECLARE + rec RECORD; +BEGIN + FOR rec IN + SELECT + schemaname || '.' || tablename AS full_name, + pg_size_pretty(pg_total_relation_size( + quote_ident(schemaname) || '.' || quote_ident(tablename) + )) AS total_size + FROM pg_tables + WHERE schemaname IN ('df', 'duroxide', '_duroxide') + ORDER BY pg_total_relation_size( + quote_ident(schemaname) || '.' 
|| quote_ident(tablename) + ) DESC + LOOP + RAISE NOTICE 'Table bloat: % %', rec.full_name, rec.total_size; + END LOOP; + + RAISE NOTICE 'PASSED [E4/E5-b]: duroxide table sizes reported above (no GC exists)'; +END $$; + +-- ─── Cleanup ────────────────────────────────────────────────────────────── + +DROP TABLE _bloat_baseline; + +SELECT 'TEST PASSED' AS result; diff --git a/tests/e2e/sql/62_concurrent_sessions.sql b/tests/e2e/sql/62_concurrent_sessions.sql new file mode 100644 index 00000000..813b1e0b --- /dev/null +++ b/tests/e2e/sql/62_concurrent_sessions.sql @@ -0,0 +1,127 @@ +-- Test: Concurrent df.start() from multiple sessions (F1) +-- Demonstrates: Multiple PostgreSQL sessions calling df.start() simultaneously +-- produce unique instance IDs and all instances complete correctly. +-- +-- Method: dblink opens separate backend connections and calls df.start() in +-- each, simulating concurrent sessions more faithfully than a single- +-- session loop. +-- +-- Findings documented: +-- - Instance IDs are generated per-call (UUID-based); no collision observed +-- even with 10 concurrent sessions. +-- - Background worker handles a burst of instances from multiple sessions. +-- +-- Requires superuser (connection string for dblink uses the postgres role). + +CREATE EXTENSION IF NOT EXISTS dblink; + +-- ─── Build a dblink connection string for the local database ────────────── + +CREATE TEMP TABLE _conc_conn AS +SELECT format( + 'host=localhost dbname=%s port=%s user=postgres', + current_database(), + current_setting('port') +) AS connstr; + +-- ─── Launch instances from 10 separate dblink connections ───────────────── +-- Each dblink() call runs in its own backend process (separate session); the calls are synchronous, so the 10 starts are issued back-to-back rather than truly in parallel. 
+ +CREATE TEMP TABLE _conc_instances (session_num INT, instance_id TEXT); + +DO $$ +DECLARE + connstr TEXT; + inst_id TEXT; + i INT; +BEGIN + SELECT c.connstr INTO connstr FROM _conc_conn c; + + FOR i IN 1..10 LOOP + -- Each dblink call opens a NEW backend connection + SELECT * INTO inst_id FROM dblink( + connstr, + format( + $q$SELECT df.start( + df.sql('SELECT %s AS session_num, pg_sleep(0.05)'), + 'concurrent-session-%s' + )$q$, + i, i + ) + ) AS t(id TEXT); + + INSERT INTO _conc_instances (session_num, instance_id) + VALUES (i, inst_id); + + RAISE NOTICE 'Session % started instance %', i, inst_id; + END LOOP; +END $$; + +-- ─── Verify all 10 instance IDs are distinct ────────────────────────────── + +DO $$ +DECLARE + total_count INT; + distinct_count INT; +BEGIN + SELECT COUNT(*), COUNT(DISTINCT instance_id) + INTO total_count, distinct_count + FROM _conc_instances; + + IF distinct_count != 10 THEN + RAISE EXCEPTION 'TEST FAILED [F1]: expected 10 distinct instance IDs, got % distinct out of %', + distinct_count, total_count; + END IF; + + RAISE NOTICE 'PASSED [F1-a]: all 10 concurrent sessions produced distinct instance IDs'; +END $$; + +-- ─── Wait for all instances to reach a terminal state ───────────────────── + +DO $$ +DECLARE + completed INT; + tries INT := 0; +BEGIN + LOOP + SELECT COUNT(*) INTO completed + FROM _conc_instances c + JOIN df.instances i ON i.id = c.instance_id + WHERE lower(i.status) IN ('completed', 'failed', 'canceled', 'cancelled'); + + EXIT WHEN completed >= 10 OR tries > 600; + PERFORM pg_sleep(0.1); + tries := tries + 1; + END LOOP; + + IF completed < 10 THEN + RAISE EXCEPTION 'TEST FAILED [F1]: only %/10 concurrent instances completed within 60s', completed; + END IF; + + RAISE NOTICE 'PASSED [F1-b]: all 10 concurrent-session instances completed'; +END $$; + +-- ─── Verify no instances are stuck ──────────────────────────────────────── + +DO $$ +DECLARE + stuck INT; +BEGIN + SELECT COUNT(*) INTO stuck + FROM _conc_instances c + JOIN 
df.instances i ON i.id = c.instance_id
+    WHERE lower(i.status) IN ('pending', 'running');
+
+    IF stuck > 0 THEN
+        RAISE EXCEPTION 'TEST FAILED [F1]: % instances stuck in pending/running after concurrent start', stuck;
+    END IF;
+
+    RAISE NOTICE 'PASSED [F1-c]: no instances stuck after concurrent multi-session start';
+END $$;
+
+-- ─── Cleanup ──────────────────────────────────────────────────────────────
+
+DROP TABLE _conc_instances;
+DROP TABLE _conc_conn;
+
+SELECT 'TEST PASSED' AS result;
diff --git a/tests/e2e/sql/63_shared_variable_race.sql b/tests/e2e/sql/63_shared_variable_race.sql
new file mode 100644
index 00000000..b16e13a8
--- /dev/null
+++ b/tests/e2e/sql/63_shared_variable_race.sql
@@ -0,0 +1,172 @@
+-- Test: Shared variable race between sessions (F4)
+-- Demonstrates: df.vars is a per-owner table. If two sessions with the same
+--               PostgreSQL role write the same variable key, the second write
+--               overwrites the first. A df.start() call that happens AFTER
+--               the overwrite will capture the overwritten value, not the
+--               original one intended by the first session.
+--
+-- Method: Sequential dblink calls simulate the interleaving that can happen
+--         with concurrent sessions:
+--   1. This session: setvar('race_key', 'first')
+--   2. Second "session" (dblink): setvar('race_key', 'overwritten'), which overwrites
+--   3. This session: df.start() captures 'overwritten', not 'first'
+--
+-- Findings documented:
+--   - Variables are captured as a snapshot at df.start() time from df.vars.
+--   - Concurrent sessions sharing the same role share the same df.vars namespace
+--     (owner = current_user::regrole), so overwrites are possible.
+--   - Workaround: use unique variable names per workflow (e.g., UUID-prefixed).
+--
+-- A second sub-test validates normal (non-racy) snapshot semantics: variables
+-- set before df.start() are correctly captured, and later changes do NOT
+-- affect already-started instances.
+--
+-- Requires superuser (uses dblink with postgres credentials).
+
+CREATE EXTENSION IF NOT EXISTS dblink;
+
+-- ─── Build a dblink connection string ─────────────────────────────────────
+
+CREATE TEMP TABLE _race_conn AS
+SELECT format(
+    'host=localhost dbname=%s port=%s user=postgres',
+    current_database(),
+    current_setting('port')
+) AS connstr;
+
+-- ─── Sub-test 1: Snapshot semantics; post-start changes do NOT affect ─────
+-- a running instance.
+
+DO $$
+BEGIN
+    -- Set variable before starting the instance
+    PERFORM df.setvar('race_snapshot_key', 'original_value');
+END $$;
+
+CREATE TEMP TABLE _snap_state (instance_id TEXT);
+
+INSERT INTO _snap_state
+SELECT df.start(
+    -- Workflow reads the captured variable via substitution
+    df.sql('SELECT $race_snapshot_key AS captured'),
+    'race-snapshot-test'
+);
+
+-- After start, change the variable; this should not affect the already-started instance
+DO $$
+BEGIN
+    PERFORM df.setvar('race_snapshot_key', 'changed_after_start');
+END $$;
+
+-- Wait for the instance to complete
+DO $$
+DECLARE
+    inst_id TEXT;
+    status TEXT;
+BEGIN
+    SELECT instance_id INTO inst_id FROM _snap_state;
+    SELECT df.wait_for_completion(inst_id, 30) INTO status;
+
+    IF status != 'completed' THEN
+        RAISE EXCEPTION 'TEST FAILED [F4-1]: snapshot instance did not complete (status=%)', status;
+    END IF;
+END $$;
+
+-- Verify the instance captured the ORIGINAL value, not the post-start change
+DO $$
+DECLARE
+    inst_id TEXT;
+    result TEXT;
+BEGIN
+    SELECT instance_id INTO inst_id FROM _snap_state;
+    SELECT r INTO result FROM df.result(inst_id) r;
+
+    IF result NOT LIKE '%original_value%' THEN
+        RAISE EXCEPTION 'TEST FAILED [F4-1]: expected "original_value" in result, got %. Variables are NOT snapshotted at start time.',
+            result;
+    END IF;
+
+    RAISE NOTICE 'PASSED [F4-1]: instance captured original value (%) at start time; post-start change was ignored', result;
+END $$;
+
+DROP TABLE _snap_state;
+
+-- ─── Sub-test 2: Cross-session overwrite race ─────────────────────────────
+-- Session A sets the variable, then another session (simulated via dblink)
+-- overwrites it, and then Session A starts its workflow.
+-- The workflow captures the overwritten value, not session A's original.
+
+DO $$
+BEGIN
+    -- Session A sets the variable
+    PERFORM df.setvar('race_shared_key', 'session_A_value');
+    RAISE NOTICE 'Session A set race_shared_key = session_A_value';
+END $$;
+
+-- "Session B" overwrites the variable via dblink
+DO $$
+DECLARE
+    connstr TEXT;
+BEGIN
+    SELECT c.connstr INTO connstr FROM _race_conn c;
+
+    PERFORM dblink_exec(  -- dblink_exec rejects row-returning statements, so the call is wrapped in DO
+        connstr,
+        'DO $d$ BEGIN PERFORM df.setvar(''race_shared_key'', ''session_B_overwrote''); END $d$'
+    );
+
+    RAISE NOTICE 'Session B (dblink) overwrote race_shared_key = session_B_overwrote';
+END $$;
+
+-- Session A now calls df.start(), which will capture the OVERWRITTEN value
+CREATE TEMP TABLE _race_state (instance_id TEXT);
+
+INSERT INTO _race_state
+SELECT df.start(
+    df.sql('SELECT $race_shared_key AS captured_race_value'),
+    'race-cross-session'
+);
+
+DO $$
+DECLARE
+    inst_id TEXT;
+    status TEXT;
+BEGIN
+    SELECT instance_id INTO inst_id FROM _race_state;
+    SELECT df.wait_for_completion(inst_id, 30) INTO status;
+
+    IF status != 'completed' THEN
+        RAISE EXCEPTION 'TEST FAILED [F4-2]: cross-session race instance did not complete (status=%)', status;
+    END IF;
+END $$;
+
+DO $$
+DECLARE
+    inst_id TEXT;
+    result TEXT;
+BEGIN
+    SELECT instance_id INTO inst_id FROM _race_state;
+    SELECT r INTO result FROM df.result(inst_id) r;
+
+    -- The instance should have captured session_B's overwrite (not session_A's value)
+    IF result NOT LIKE '%session_B_overwrote%' THEN
+        RAISE EXCEPTION 'TEST FAILED [F4-2]: expected "session_B_overwrote" in result, got %. Cross-session race behavior changed.',
+            result;
+    END IF;
+
+    RAISE NOTICE 'PASSED [F4-2]: instance captured the overwritten value (%), demonstrating cross-session race risk', result;
+    RAISE NOTICE 'NOTE [F4-2]: Session A intended "session_A_value" but got "session_B_overwrote" (last writer wins).';
+    RAISE NOTICE 'NOTE [F4-2]: Use unique per-workflow variable names (e.g. UUID-prefixed) to avoid cross-session races.';
+END $$;
+
+-- ─── Cleanup ──────────────────────────────────────────────────────────────
+
+DO $$
+BEGIN
+    PERFORM df.clearvars();
+END $$;
+
+DROP TABLE _race_state;
+DROP TABLE _race_conn;
+
+SELECT 'TEST PASSED' AS result;