diff --git a/Cargo.lock b/Cargo.lock index 55e3e3d..500d612 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -689,8 +689,7 @@ checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" [[package]] name = "duroxide" version = "0.1.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9377f5bf81d9a8ce56a13f913f60ca0e0ba8a9464349c407e7d11c326488bf0c" +source = "git+https://github.com/microsoft/duroxide.git?branch=pinodeca%2Fcontinue-parent-link#6650410626110f5e29c32269bec0156b15d6ed7c" dependencies = [ "async-trait", "futures", @@ -702,6 +701,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 45a400a..fffe89c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,14 @@ serde_json = { version = "1.0", features = ["preserve_order"] } uuid = { version = "1.0", features = ["v4", "serde"] } # duroxide integration -duroxide = "=0.1.29" +# Using git dependency on pinodeca/continue-parent-link branch which preserves the +# parent link when a sub-orchestration calls continue_as_new, unblocking the +# sub-orchestration approach for df.loop. +# Compatibility: duroxide-pg 0.1.34 has been verified to compile and run correctly +# against this branch (same public API as 0.1.29 — PR #31 is a runtime-only change). +# Once the branch is merged and a new duroxide release is published, revert both +# entries back to crates.io version pins as a compatible pair. +duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" } duroxide-pg = "=0.1.34" tokio = { version = "1", features = ["rt-multi-thread", "sync", "time"] } @@ -55,6 +62,11 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } [dev-dependencies] pgrx-tests = "=0.16.1" +# Override the crates.io duroxide with the git branch for both the direct dependency +# and duroxide-pg's transitive dependency on duroxide. +[patch.crates-io] +duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" } + [profile.dev] panic = "unwind" diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 12efe7d..2d9e9b6 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -946,6 +946,12 @@ SELECT df.start( Use `@>` operator or `df.loop()` to create functions that run forever. Each iteration creates a new execution with fresh state (via continue-as-new). +> **Note:** Each `df.loop()` runs as its own child orchestration. The loop's iterations +> (the continue-as-new generations) are scoped to that child, so any nodes *before* the +> loop in the graph run exactly once and any nodes *after* the loop run once the loop +> exits. When inspecting `df.instances`, a looping function therefore shows a parent +> instance plus a separate child instance for the loop. + ```sql -- Simple heartbeat every 30 seconds (using @> operator) SELECT df.start( diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index fd17722..ca7c687 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -526,32 +526,109 @@ async fn execute_wait_schedule_node( /// deficit so an empty-bodied loop can't busy-spin via continue_as_new. const LOOP_MIN_ITER_DURATION: Duration = Duration::from_secs(1); -async fn execute_loop_node( - ctx: &OrchestrationContext, - graph: &FunctionGraph, - node: &FunctionNode, - node_id: &str, - results: &mut HashMap, - exec_ctx: &ExecutionContext, -) -> NodeResult { +/// Orchestration name for the loop sub-orchestration. +/// +/// Each `df.loop()` node spawns a child orchestration under this name. The +/// child handles all iterations via `continue_as_new`; when the loop exits it +/// returns a `SubtreeEnvelope` to the parent. The parent link is preserved +/// across `continue_as_new` generations by duroxide (see duroxide PR #31), so +/// the parent orchestration is notified when the loop finally completes. +pub const LOOP_NAME: &str = "pg_durable::orchestration::execute-loop"; + +/// Build the `SubtreeEnvelope` a loop returns to its parent on exit. +/// +/// A loop always exits with a *normal* result: a `df.break()` inside the body is the loop's +/// own terminator (caught here as `NodeError::Break`), not a break that should unwind past +/// the loop, so the envelope is always tagged `Normal`. `execute_loop_node` merges `results` +/// back into the parent map via `parse_subtree_envelope`. +fn loop_exit_envelope(result: String, results: HashMap) -> Result { + let envelope = SubtreeEnvelope { + control: Some(SubtreeControl::Normal), + result, + results, + }; + serde_json::to_string(&envelope).map_err(|e| format!("Failed to serialize loop envelope: {e}")) +} + +/// Sub-orchestration that runs a single loop iteration and either returns or +/// calls `continue_as_new` for the next iteration. +/// +/// Input JSON: +/// ```json +/// { "instance_id": "...", "loop_node_id": "...", +/// "results": "", "vars": "", "label": "..." } +/// ``` +/// +/// `load_function_graph` is called at the start of **every** generation +/// (including after `continue_as_new`) so that cross-iteration security +/// tampering is caught and the instance is failed — the same guarantee the +/// main `execute()` orchestration provides at its generation boundary. +/// +/// On loop exit the function returns a `SubtreeEnvelope` containing the final +/// result and any named results accumulated during the loop. +pub async fn execute_loop(ctx: OrchestrationContext, input_json: String) -> Result { + let input: serde_json::Value = serde_json::from_str(&input_json) + .map_err(|e| format!("Failed to parse ExecuteLoop input: {e}"))?; + + let instance_id = input["instance_id"] + .as_str() + .ok_or("Missing instance_id in ExecuteLoop input")? + .to_string(); + let loop_node_id = input["loop_node_id"] + .as_str() + .ok_or("Missing loop_node_id in ExecuteLoop input")? + .to_string(); + let results_json = input["results"] + .as_str() + .ok_or("Missing results in ExecuteLoop input")?; + + // re-load the graph from the database on every generation — this re-validates + // submitted_by and catches cross-iteration security tampering. + let graph_json = ctx + .schedule_activity(activities::load_function_graph::NAME, instance_id.clone()) + .await?; + let graph: FunctionGraph = serde_json::from_str(&graph_json) + .map_err(|e| format!("Failed to parse graph in ExecuteLoop: {e}"))?; + + let mut results: HashMap = serde_json::from_str(results_json) + .map_err(|e| format!("Failed to parse results in ExecuteLoop: {e}"))?; + + let vars: HashMap = if let Some(vars_str) = input["vars"].as_str() { + serde_json::from_str(vars_str) + .map_err(|e| format!("Failed to parse vars in ExecuteLoop: {e}"))? + } else { + HashMap::new() + }; + let label: Option = input["label"].as_str().map(|s| s.to_string()); + + let exec_ctx = ExecutionContext { vars, label }; + + let node = graph + .nodes + .get(&loop_node_id) + .ok_or_else(|| format!("Loop node not found: {loop_node_id}"))?; + let body_id = node .left_node .as_ref() - .ok_or_else(|| format!("LOOP node {node_id} has no body"))?; + .ok_or_else(|| format!("LOOP node {loop_node_id} has no body"))? + .clone(); - // Capture the iteration start time so we can rate-limit `continue_as_new` - // below. `utc_now()` is duroxide's deterministic clock (recorded in - // history and replayed verbatim), so this remains replay-safe. + // Capture iteration start time for rate-limiting continue_as_new. let iter_started = ctx.utc_now().await.ok(); ctx.trace_info("Executing loop iteration"); - // The loop is the only place that catches `NodeError::Break`: a break unwinds through - // every compound node in the body via `?` and is converted here into a normal loop exit. - // A `Failure` still propagates out of the loop unchanged. - let body_result = match Box::pin(execute_function_node_with_vars( - ctx, graph, body_id, results, exec_ctx, - )) + // The loop is where `NodeError::Break` is caught: a break unwinds through the body via + // `?` and is converted here into the loop's normal exit value. A `Failure` propagates + // out of the sub-orchestration unchanged. + let body_result = match execute_function_node_with_vars( + &ctx, + &graph, + &body_id, + &mut results, + &exec_ctx, + ) .await { Ok(v) => v, @@ -559,27 +636,34 @@ async fn execute_loop_node( ctx.trace_info(format!( "Loop terminated by break with value: {break_value}" )); - store_named_result(ctx, node, &break_value, results, "LOOP"); - return Ok(break_value); + store_named_result(&ctx, node, &break_value, &mut results, "LOOP"); + return loop_exit_envelope(break_value, results); } - Err(e @ NodeError::Failure(_)) => return Err(e), + Err(NodeError::Failure(e)) => return Err(e), }; - // Check while-condition if present + // While-condition: if present and false, exit the loop. if let Some(ref config_str) = node.query { if let Ok(config) = serde_json::from_str::(config_str) { if let Some(condition_node_id) = config["condition_node"].as_str() { ctx.trace_info("Evaluating loop condition"); - let condition_result = Box::pin(execute_function_node_with_vars( - ctx, - graph, + let condition_result = match execute_function_node_with_vars( + &ctx, + &graph, condition_node_id, - results, - exec_ctx, - )) - .await?; + &mut results, + &exec_ctx, + ) + .await + { + Ok(v) => v, + Err(NodeError::Break(break_value)) => { + store_named_result(&ctx, node, &break_value, &mut results, "LOOP"); + return loop_exit_envelope(break_value, results); + } + Err(NodeError::Failure(e)) => return Err(e), + }; - // Parse condition result to check truthiness (uses evaluate_condition to extract boolean from SQL result) let should_continue = evaluate_condition(&condition_result).unwrap_or(false); ctx.trace_info(format!( "Loop condition evaluated to: {condition_result} (continue={should_continue})" @@ -587,20 +671,14 @@ async fn execute_loop_node( if !should_continue { ctx.trace_info("Loop condition false, exiting loop"); - store_named_result(ctx, node, &body_result, results, "LOOP"); - return Ok(body_result); + store_named_result(&ctx, node, &body_result, &mut results, "LOOP"); + return loop_exit_envelope(body_result, results); } } } } - ctx.trace_info("Continuing as new for next loop iteration"); - - // Enforce a minimum per-iteration wall-clock duration to prevent - // busy-looping (e.g. `df.loop(df.sleep(0))`). Compute the elapsed time - // from the deterministic clock; if the iteration finished faster than - // LOOP_MIN_ITER_DURATION, schedule a timer for the deficit so the next - // continue_as_new is gated by at least that much real-clock time. + // Enforce a minimum per-iteration wall-clock duration to prevent busy-looping. if let Some(started) = iter_started { if let Ok(now) = ctx.utc_now().await { let elapsed = now.duration_since(started).unwrap_or(Duration::ZERO); @@ -615,19 +693,83 @@ async fn execute_loop_node( } } - // Preserve vars in continue_as_new input - let new_input = FunctionInput { - instance_id: graph.instance_id.clone(), - label: exec_ctx.label.clone(), - vars: exec_ctx.vars.clone(), - }; + // Another iteration needed: continue_as_new within this sub-orchestration. + // The parent orchestration keeps its awaiting handle because duroxide preserves + // the parent link across continue_as_new (duroxide PR #31). + ctx.trace_info(format!( + "Loop continuing with continue_as_new at node {loop_node_id}" + )); + let new_results_json = serde_json::to_string(&results) + .map_err(|e| format!("Failed to serialize updated results: {e}"))?; + let mut new_input = input.clone(); + new_input["results"] = serde_json::Value::String(new_results_json); + let new_input_json = serde_json::to_string(&new_input) + .map_err(|e| format!("Failed to serialize loop input: {e}"))?; + ctx.continue_as_new(new_input_json) + .await + .map(|_| String::new()) + .map_err(|e| format!("continue_as_new failed: {e:?}")) +} + +/// Build a deterministic, generation-qualified child instance ID for a sub-orchestration. +/// +/// Duroxide's auto-generated child IDs (`{parent}::sub::{event_id}`) reset their event +/// counter across `continue_as_new` generations. When a loop body itself spawns +/// sub-orchestrations (a nested `df.loop`, or a parallel/race branch), each loop +/// generation would otherwise re-derive the *same* child ID, colliding with the previous +/// (now terminal) generation's child and stalling forever. Embedding the current +/// execution (generation) ID plus the spawning node ID makes the child ID unique per +/// generation while staying deterministic across replays of the same generation. +fn child_instance_id(ctx: &OrchestrationContext, tag: &str, node_id: &str) -> String { + format!( + "{}::e{}::{tag}::{node_id}", + ctx.instance_id(), + ctx.execution_id() + ) +} + +async fn execute_loop_node( + ctx: &OrchestrationContext, + graph: &FunctionGraph, + node: &FunctionNode, + node_id: &str, + results: &mut HashMap, + exec_ctx: &ExecutionContext, +) -> NodeResult { + // Validate that the loop has a body before spawning the sub-orchestration. + node.left_node + .as_ref() + .ok_or_else(|| format!("LOOP node {node_id} has no body"))?; + + let results_json = + serde_json::to_string(results).map_err(|e| format!("Failed to serialize results: {e}"))?; + let vars_json = serde_json::to_string(&exec_ctx.vars) + .map_err(|e| format!("Failed to serialize vars: {e}"))?; + + let loop_input = serde_json::json!({ + "instance_id": graph.instance_id, + "loop_node_id": node_id, + "results": results_json, + "vars": vars_json, + "label": exec_ctx.label, + }) + .to_string(); - // duroxide 0.1.1: continue_as_new returns an awaitable future - return it directly - return ctx - .continue_as_new(serde_json::to_string(&new_input).unwrap_or(graph.instance_id.clone())) + ctx.trace_info(format!( + "Spawning loop sub-orchestration for node {node_id}" + )); + + let child_id = child_instance_id(ctx, "loop", node_id); + let raw = ctx + .schedule_sub_orchestration_with_id(LOOP_NAME, child_id, loop_input) .await - .map(|_| body_result) - .map_err(|e| NodeError::Failure(format!("continue_as_new failed: {e:?}"))); + .map_err(|e| format!("Loop sub-orchestration failed: {e}"))?; + + // Merge named results from the loop sub-orchestration back into the parent map and + // return the loop's final result. The loop always returns a `Normal` envelope (a break + // inside the body is the loop's own terminator), so `parse_subtree_envelope` will not + // re-raise a `NodeError::Break` here. + parse_subtree_envelope(&raw, "LOOP", results) } async fn execute_break_node( @@ -853,8 +995,11 @@ async fn execute_join_node( }) .to_string(); - // Build list of branch inputs - let mut branch_inputs = vec![left_input, right_input]; + // Build list of (branch node id, branch input) pairs. + let mut branch_inputs: Vec<(String, String)> = vec![ + (left_id.clone(), left_input), + (right_id.clone(), right_input), + ]; // Check for extra nodes (join3) if let Some(config_str) = &node.query { @@ -870,17 +1015,20 @@ async fn execute_join_node( "label": exec_ctx.label }) .to_string(); - branch_inputs.push(extra_input); + branch_inputs.push((extra_id.to_string(), extra_input)); } } } } } - // Schedule sub-orchestrations and collect DurableFutures + // Schedule sub-orchestrations and collect DurableFutures. Use explicit, + // generation-qualified child IDs so a JOIN inside a loop body does not collide + // across continue_as_new generations. let mut durable_futures = Vec::new(); - for input in branch_inputs { - let fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, input); + for (branch_node_id, input) in branch_inputs { + let child_id = child_instance_id(ctx, "subtree", &branch_node_id); + let fut = ctx.schedule_sub_orchestration_with_id(SUBTREE_NAME, child_id, input); durable_futures.push(fut); } @@ -972,9 +1120,18 @@ async fn execute_race_node( }) .to_string(); - // Schedule sub-orchestrations - let left_fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, left_input); - let right_fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, right_input); + // Schedule sub-orchestrations with explicit, generation-qualified child IDs so a + // RACE inside a loop body does not collide across continue_as_new generations. + let left_fut = ctx.schedule_sub_orchestration_with_id( + SUBTREE_NAME, + child_instance_id(ctx, "subtree", left_id), + left_input, + ); + let right_fut = ctx.schedule_sub_orchestration_with_id( + SUBTREE_NAME, + child_instance_id(ctx, "subtree", right_id), + right_input, + ); // Use ctx.select2() - first to complete wins // select2 now returns Either2 instead of (winner_idx, DurableOutput) diff --git a/src/registry.rs b/src/registry.rs index 03e5c73..f20d128 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -55,5 +55,9 @@ pub fn create_orchestration_registry() -> OrchestrationRegistry { orchestrations::execute_function_graph::SUBTREE_NAME, orchestrations::execute_function_graph::execute_subtree, ) + .register( + orchestrations::execute_function_graph::LOOP_NAME, + orchestrations::execute_function_graph::execute_loop, + ) .build() } diff --git a/tests/e2e/sql/24_nonroot_loop.sql b/tests/e2e/sql/24_nonroot_loop.sql new file mode 100644 index 0000000..4aca198 --- /dev/null +++ b/tests/e2e/sql/24_nonroot_loop.sql @@ -0,0 +1,409 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Regression tests for: df.loop continue_as_new restarts from root when loop is not root +-- (https://github.com/microsoft/pg_durable/issues/227) +-- +-- When a loop is not the root node of a function graph (i.e., there are prefix or suffix +-- nodes), continue_as_new must NOT re-execute the prefix or skip the suffix. Loops are +-- now executed as a scoped sub-orchestration so continue_as_new is scoped to the loop +-- child, leaving the parent graph orchestration parked. + +SET SESSION AUTHORIZATION df_e2e_user; + +-- === Test 1: Non-root loop — prefix runs once, body runs N times === +-- +-- Graph: INSERT into prefix_table ~> df.loop(INSERT into body_table ~> break after 3) +-- Expected: prefix_table has exactly 1 row after completion, body_table has exactly 3 rows. + +DROP TABLE IF EXISTS test_nonroot_prefix; +DROP TABLE IF EXISTS test_nonroot_body; +CREATE TABLE test_nonroot_prefix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_nonroot_body (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t1 AS +SELECT df.start( + df.seq( + 'INSERT INTO test_nonroot_prefix DEFAULT VALUES', + df.loop( + 'INSERT INTO test_nonroot_body DEFAULT VALUES' + ~> ( + 'SELECT COUNT(*) >= 3 FROM test_nonroot_body' + ?> df.break() + !> df.sleep(1) + ) + ) + ), + 'test-nonroot-loop-prefix' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_prefix INT; + v_body INT; +BEGIN + SELECT instance_id INTO v_id FROM _t1; + RAISE NOTICE 'Test 1 - non-root loop prefix: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-prefix]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_prefix FROM test_nonroot_prefix; + SELECT COUNT(*) INTO v_body FROM test_nonroot_body; + + IF v_prefix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-prefix]: prefix ran % time(s) (expected 1); ' + 'prefix nodes must not be re-executed on each loop iteration', v_prefix; + END IF; + + IF v_body != 3 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-prefix]: body ran % time(s) (expected 3)', v_body; + END IF; + + RAISE NOTICE 'PASSED: non-root loop prefix — prefix ran once, body ran 3 times'; +END $$; + +DROP TABLE _t1; +DROP TABLE test_nonroot_prefix; +DROP TABLE test_nonroot_body; + +-- === Test 2: Non-root loop — prefix once, body N times, suffix once === +-- +-- Graph: INSERT prefix ~> df.loop(body, break after 2) ~> INSERT suffix +-- Expected: prefix_table = 1 row, body_table = 2 rows, suffix_table = 1 row. + +DROP TABLE IF EXISTS test_nonroot2_prefix; +DROP TABLE IF EXISTS test_nonroot2_body; +DROP TABLE IF EXISTS test_nonroot2_suffix; +CREATE TABLE test_nonroot2_prefix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_nonroot2_body (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_nonroot2_suffix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t2 AS +SELECT df.start( + df.seq( + 'INSERT INTO test_nonroot2_prefix DEFAULT VALUES', + df.seq( + df.loop( + 'INSERT INTO test_nonroot2_body DEFAULT VALUES' + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_nonroot2_body' + ?> df.break() + !> df.sleep(1) + ) + ), + 'INSERT INTO test_nonroot2_suffix DEFAULT VALUES' + ) + ), + 'test-nonroot-loop-prefix-suffix' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_prefix INT; + v_body INT; + v_suffix INT; +BEGIN + SELECT instance_id INTO v_id FROM _t2; + RAISE NOTICE 'Test 2 - non-root loop prefix+suffix: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_prefix FROM test_nonroot2_prefix; + SELECT COUNT(*) INTO v_body FROM test_nonroot2_body; + SELECT COUNT(*) INTO v_suffix FROM test_nonroot2_suffix; + + IF v_prefix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: prefix ran % time(s) (expected 1)', v_prefix; + END IF; + + IF v_body != 2 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: body ran % time(s) (expected 2)', v_body; + END IF; + + IF v_suffix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: suffix ran % time(s) (expected 1)', v_suffix; + END IF; + + RAISE NOTICE 'PASSED: non-root loop prefix+suffix — prefix once, body twice, suffix once'; +END $$; + +DROP TABLE _t2; +DROP TABLE test_nonroot2_prefix; +DROP TABLE test_nonroot2_body; +DROP TABLE test_nonroot2_suffix; + +-- === Test 3: Non-root loop — named result from prefix available inside loop body === +-- +-- Verify that named results accumulated before the loop are still accessible +-- inside the loop body after continue_as_new (they are preserved in the loop +-- sub-orchestration's input). + +DROP TABLE IF EXISTS test_nonroot3_log; +CREATE TABLE test_nonroot3_log (id SERIAL, val TEXT, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t3 AS +SELECT df.start( + df.seq( + ('SELECT ''hello'' AS greeting' |=> 'prefix_result'), + df.loop( + ($$INSERT INTO test_nonroot3_log (val) + VALUES ($prefix_result) + RETURNING val$$ + |=> 'last_val') + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_nonroot3_log' + ?> df.break() + !> df.sleep(1) + ) + ) + ), + 'test-nonroot-loop-named-result' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_cnt INT; + v_val TEXT; +BEGIN + SELECT instance_id INTO v_id FROM _t3; + RAISE NOTICE 'Test 3 - non-root loop uses prefix named result: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-named]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_cnt FROM test_nonroot3_log; + IF v_cnt != 2 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-named]: expected 2 rows, got %', v_cnt; + END IF; + + SELECT val INTO v_val FROM test_nonroot3_log ORDER BY id LIMIT 1; + IF v_val != 'hello' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-named]: expected ''hello'', got ''%''', v_val; + END IF; + + RAISE NOTICE 'PASSED: non-root loop uses prefix named result across iterations'; +END $$; + +DROP TABLE _t3; +DROP TABLE test_nonroot3_log; + +-- === Test 4: Nested loop — a loop body that itself contains a loop === +-- +-- Each df.loop() spawns an execute_loop sub-orchestration, so a nested loop spawns a +-- child execute_loop from *within* another execute_loop generation. This verifies that +-- continue_as_new in the outer loop does not disturb the inner loop and vice versa. +-- +-- Outer loop runs 2 iterations (break when outer_marker has 2 rows); each outer iteration +-- runs an inner loop that inserts exactly one row and breaks immediately. +-- Expected: outer_marker = 2 rows, inner_table = 2 rows. + +DROP TABLE IF EXISTS test_nested_outer; +DROP TABLE IF EXISTS test_nested_inner; +CREATE TABLE test_nested_outer (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_nested_inner (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t4 AS +SELECT df.start( + df.loop( + 'INSERT INTO test_nested_outer DEFAULT VALUES' + ~> df.loop( + 'INSERT INTO test_nested_inner DEFAULT VALUES' + ~> df.break() + ) + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_nested_outer' + ?> df.break() + !> df.sleep(1) + ) + ), + 'test-nested-loop' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_outer INT; + v_inner INT; +BEGIN + SELECT instance_id INTO v_id FROM _t4; + RAISE NOTICE 'Test 4 - nested loop: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nested]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_outer FROM test_nested_outer; + SELECT COUNT(*) INTO v_inner FROM test_nested_inner; + + IF v_outer != 2 THEN + RAISE EXCEPTION 'TEST FAILED [nested]: outer ran % time(s) (expected 2)', v_outer; + END IF; + + IF v_inner != 2 THEN + RAISE EXCEPTION 'TEST FAILED [nested]: inner ran % time(s) (expected 2)', v_inner; + END IF; + + RAISE NOTICE 'PASSED: nested loop — outer ran twice, inner ran once per outer iteration'; +END $$; + +DROP TABLE _t4; +DROP TABLE test_nested_outer; +DROP TABLE test_nested_inner; + +-- === Test 5: Loop inside a JOIN branch === +-- +-- JOIN branches execute as execute_subtree sub-orchestrations, so a loop in a branch +-- spawns an execute_loop child from *within* execute_subtree. This verifies the loop +-- sub-orchestration nests correctly under a parallel branch and that the JOIN still +-- completes once both branches finish. +-- +-- Left branch inserts one row; right branch loops until join_loop has 2 rows. +-- Expected: join_left = 1 row, join_loop = 2 rows. + +DROP TABLE IF EXISTS test_join_left; +DROP TABLE IF EXISTS test_join_loop; +CREATE TABLE test_join_left (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_join_loop (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t5 AS +SELECT df.start( + 'INSERT INTO test_join_left DEFAULT VALUES' + & df.loop( + 'INSERT INTO test_join_loop DEFAULT VALUES' + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_join_loop' + ?> df.break() + !> df.sleep(1) + ) + ), + 'test-loop-in-join-branch' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_left INT; + v_loop INT; +BEGIN + SELECT instance_id INTO v_id FROM _t5; + RAISE NOTICE 'Test 5 - loop in JOIN branch: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [loop-in-join]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_left FROM test_join_left; + SELECT COUNT(*) INTO v_loop FROM test_join_loop; + + IF v_left != 1 THEN + RAISE EXCEPTION 'TEST FAILED [loop-in-join]: left branch ran % time(s) (expected 1)', v_left; + END IF; + + IF v_loop != 2 THEN + RAISE EXCEPTION 'TEST FAILED [loop-in-join]: loop branch body ran % time(s) (expected 2)', v_loop; + END IF; + + RAISE NOTICE 'PASSED: loop in JOIN branch — left ran once, loop body ran twice'; +END $$; + +DROP TABLE _t5; +DROP TABLE test_join_left; +DROP TABLE test_join_loop; + +-- === Test 6: Non-root while-loop — prefix once, while-condition exit, suffix once === +-- +-- The earlier tests exit via df.break(); this one exits via a false while-condition +-- (df.loop(body, condition)). The condition node also runs inside the loop +-- sub-orchestration, so this exercises the while-false exit path across generations. +-- +-- Graph: INSERT prefix ~> df.loop(body, 'COUNT < 3') ~> INSERT suffix +-- Expected: prefix = 1 row, body = 3 rows (loop stops when count reaches 3), suffix = 1 row. + +DROP TABLE IF EXISTS test_while_prefix; +DROP TABLE IF EXISTS test_while_body; +DROP TABLE IF EXISTS test_while_suffix; +CREATE TABLE test_while_prefix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_while_body (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_while_suffix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t6 AS +SELECT df.start( + df.seq( + 'INSERT INTO test_while_prefix DEFAULT VALUES', + df.seq( + df.loop( + 'INSERT INTO test_while_body DEFAULT VALUES' ~> df.sleep(1), + 'SELECT COUNT(*) < 3 FROM test_while_body' + ), + 'INSERT INTO test_while_suffix DEFAULT VALUES' + ) + ), + 'test-nonroot-while-loop' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_prefix INT; + v_body INT; + v_suffix INT; +BEGIN + SELECT instance_id INTO v_id FROM _t6; + RAISE NOTICE 'Test 6 - non-root while loop: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-while]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_prefix FROM test_while_prefix; + SELECT COUNT(*) INTO v_body FROM test_while_body; + SELECT COUNT(*) INTO v_suffix FROM test_while_suffix; + + IF v_prefix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-while]: prefix ran % time(s) (expected 1)', v_prefix; + END IF; + + IF v_body != 3 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-while]: body ran % time(s) (expected 3)', v_body; + END IF; + + IF v_suffix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-while]: suffix ran % time(s) (expected 1)', v_suffix; + END IF; + + RAISE NOTICE 'PASSED: non-root while loop — prefix once, body 3x via while-condition, suffix once'; +END $$; + +DROP TABLE _t6; +DROP TABLE test_while_prefix; +DROP TABLE test_while_body; +DROP TABLE test_while_suffix; + +RESET SESSION AUTHORIZATION; +SELECT 'TEST PASSED' AS result;