diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 6d2021f9..793b5eba 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -192,6 +192,8 @@ df.sql('SELECT 1') ~> df.sql('SELECT 2') | `df.clearvars()` | Clear all durable function variables | `df.clearvars()` | | `df.wait_for_signal(name)` | Wait for external signal | `df.wait_for_signal('approval')` | | `df.wait_for_signal(name, timeout)` | Wait with timeout (seconds) | `df.wait_for_signal('approval', 3600)` | +| `df.await_instance(id, timeout)` | Durably wait for another instance | `df.await_instance('a1b2c3d4', 300)` | +| `df.call_child(func, label, options)` | Start a child workflow and wait for it | `df.call_child('SELECT 1', 'child-job', '{"timeout_seconds":300}')` | | `df.signal(id, name, data)` | Send signal to instance | `df.signal('a1b2', 'go', '{}')` | ### Operators @@ -1153,6 +1155,22 @@ SELECT df.start( -- (e.g., via a webhook endpoint that calls df.signal) ``` +### Example: Parent waits for child workflow + +```sql +SELECT df.start( + df.call_child( + 'SELECT json_build_object(''report_id'', 42, ''status'', ''ready'')', + 'generate-report', + '{"timeout_seconds": 300}' + ) |=> 'child' + ~> 'INSERT INTO audit_log(payload) VALUES ($child::jsonb)', + 'parent-workflow' +); +``` + +`df.call_child(...)` returns a JSON envelope with the child `instance_id`, final `status`, and child `result`. To wait on an already-started instance instead, use `df.await_instance(...)`. 
+ --- ## Multi-Database Support diff --git a/docs/child-orchestration-design.md b/docs/child-orchestration-design.md new file mode 100644 index 00000000..6bfcc066 --- /dev/null +++ b/docs/child-orchestration-design.md @@ -0,0 +1,77 @@ +# Child orchestration primitives (`df.call_child` / `df.await_instance`) + +## Summary + +pg_durable now exposes two graph-composable primitives for parent-waits-for-child flows: + +- `df.await_instance(instance_id text, timeout_seconds int default null)` +- `df.call_child(fut text, label text default null, options jsonb default null)` + +Both return Durofut JSON, so they compose naturally inside `df.seq`, `df.join`, `df.race`, and operator-based graphs. + +## API shape + +```sql +-- Wait for an already-started instance to reach a terminal state. +df.await_instance(instance_id text, timeout_seconds int default null) returns text + +-- Start a child workflow and then durably wait for it. +df.call_child( + fut text, + label text default null, + options jsonb default null +) returns text +``` + +### Supported `df.call_child` options + +- `timeout_seconds` — timeout passed to the wait phase +- `database` — database argument forwarded to the child `df.start(...)` +- `on_failure` — currently only `"raise"` is supported + +## Semantics in v0.2.1 + +### Result shape + +On success, both primitives resolve to a JSON envelope: + +```json +{ + "instance_id": "", + "status": "completed", + "result": +} +``` + +If the child output is valid JSON, it is embedded as JSON. Otherwise it is returned as a JSON string. + +### Failure semantics + +- `completed` child → success envelope above +- `failed` / `cancelled` child → raises in the parent +- timeout → raises in the parent + +This settles the default behavior as **raise on non-success**. + +### Cancellation propagation + +Parent cancellation does **not** automatically cancel children started via `df.call_child` in this release. 
Children are regular durable instances started through `df.start(...)`, so they continue independently unless cancelled separately. + +### Identity exposure + +The child `instance_id` is included in the success envelope so callers can inspect child state through existing monitoring APIs. + +### Variable and label inheritance + +- Labels do not inherit automatically; `df.call_child(..., label => ...)` sets the child label explicitly. +- `df.vars` inheritance uses the existing `df.start(...)` behavior because `df.call_child` starts the child through `df.start(...)`. Variables visible to the running child are whatever `df.start(...)` captures in that child-starting SQL step. + +## Implementation notes + +- `df.await_instance` is implemented as a dedicated `AWAIT_INSTANCE` node type. +- The orchestration polls child status durably through an activity plus a durable timer, so the parent suspends without holding a backend session. +- `df.call_child` is a convenience wrapper that expands to: + 1. a SQL node that calls `df.start(...)` for the child and stores the returned child `instance_id` + 2. an `AWAIT_INSTANCE` node that waits on that stored `instance_id` + +This keeps the implementation small while still giving users a first-class, graph-composable child orchestration primitive. diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index 6c717d9d..6242d749 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -235,6 +235,12 @@ what the upgrade script handles, and any backward compatibility considerations. - **Scenario B1 considerations:** No backward compatibility concern. `df.if_rows` is a new function that doesn't exist in v0.1.1 schemas — it simply won't be callable until the customer runs `ALTER EXTENSION UPDATE`. The `.so` symbol exists but is never invoked from old schemas. All other changes (substitution engine rewrite, `Result` return type) are internal to orchestration code and don't touch any SQL queries or table schemas. 
- **Scenario B2 considerations:** No data migration needed. The change is purely additive (new function) with no table or column changes. +#### Child orchestration helpers — df.await_instance / df.call_child +- **DDL change:** Upgrade script adds `df.await_instance(text, integer)` and `df.call_child(text, text, jsonb)` and refreshes helper SQL definitions (`df.ensure_durofut`, `df.grant_usage`) so upgraded installations recognize the new `AWAIT_INSTANCE` node type and can grant the new functions. +- **Scenario A considerations:** Fresh install picks up the new C-language functions from pgrx-generated SQL; the upgrade script must add matching `CREATE FUNCTION` entries and keep helper SQL bodies in sync. +- **Scenario B1 considerations:** No binary-compatibility risk for pre-upgrade schemas. These are additive SQL entrypoints: older schemas simply cannot call them until `ALTER EXTENSION UPDATE`, while existing SQL queries and table shapes remain unchanged. +- **Scenario B2 considerations:** No table/data migration is required. Existing instances continue to work, and upgraded roles should re-run `df.grant_usage(...)` if they rely on the helper to grant access to newly added functions. + #### Connection Limits — GUC-controlled pool sizing and backpressure - **DDL change:** None. All changes are runtime-only (pool consolidation, semaphore backpressure, new GUCs). - **Scenario A considerations:** No schema changes — the `df` schema equivalence contract is unchanged. diff --git a/sql/pg_durable--0.2.0--0.2.1.sql b/sql/pg_durable--0.2.0--0.2.1.sql index 088b0bfd..ffacb53c 100644 --- a/sql/pg_durable--0.2.0--0.2.1.sql +++ b/sql/pg_durable--0.2.0--0.2.1.sql @@ -1,5 +1,132 @@ -- pg_durable upgrade: 0.2.0 → 0.2.1 -- --- No schema changes in this release. --- This upgrade removes the dependency on the `ring` crate (switched to --- native-tls), includes security hardening fixes, and other bug fixes. 
+-- Add durable child-orchestration helpers and refresh helper/grant functions +-- so upgraded installations can use the new API surface immediately. + +CREATE FUNCTION df."await_instance"( + "instance_id" TEXT, + "timeout_seconds" INT DEFAULT NULL +) RETURNS TEXT +LANGUAGE c +AS 'MODULE_PATHNAME', 'await_instance_wrapper'; + +CREATE FUNCTION df."call_child"( + "fut" TEXT, + "label" TEXT DEFAULT NULL, + "options" JSONB DEFAULT NULL +) RETURNS TEXT +LANGUAGE c +AS 'MODULE_PATHNAME', 'call_child_wrapper'; + +CREATE OR REPLACE FUNCTION df.ensure_durofut(val text) RETURNS text AS $$ +DECLARE + node_type_val text; +BEGIN + BEGIN + node_type_val := (val::jsonb)->>'node_type'; + IF node_type_val IS NOT NULL THEN + IF node_type_val NOT IN ('SQL', 'THEN', 'IF', 'JOIN', 'LOOP', 'BREAK', 'RACE', 'SLEEP', 'WAIT_SCHEDULE', 'HTTP', 'SIGNAL', 'AWAIT_INSTANCE') THEN + RAISE EXCEPTION 'Unknown node_type ''%''. Valid types: SQL, THEN, IF, JOIN, LOOP, BREAK, RACE, SLEEP, WAIT_SCHEDULE, HTTP, SIGNAL, AWAIT_INSTANCE', node_type_val; + END IF; + RETURN val; + END IF; + EXCEPTION WHEN invalid_text_representation THEN + NULL; + WHEN raise_exception THEN + RAISE; + WHEN OTHERS THEN + NULL; + END; + + RETURN df.sql(val); +END; +$$ LANGUAGE plpgsql IMMUTABLE SET search_path = pg_catalog, df, pg_temp; + +CREATE OR REPLACE FUNCTION df.grant_usage( + p_role TEXT, + include_http boolean DEFAULT false, + with_grant boolean DEFAULT false +) +RETURNS VOID +LANGUAGE plpgsql +SET search_path = pg_catalog, df, pg_temp +AS $fn$ +DECLARE + grant_opt TEXT := ''; + func_sig TEXT; + func_sigs TEXT[] := ARRAY[ + 'df.sql(text)', + 'df.seq(text, text)', + 'df.as(text, text)', + 'df.sleep(bigint)', + 'df.wait_for_schedule(text)', + 'df.loop(text, text)', + 'df.break(text)', + 'df.if(text, text, text)', + 'df.if_rows(text, text, text)', + 'df.join(text, text)', + 'df.join3(text, text, text)', + 'df.race(text, text)', + 'df.wait_for_signal(text, integer)', + 'df.await_instance(text, integer)', + 
'df.call_child(text, text, jsonb)', + 'df.signal(text, text, text)', + 'df.start(text, text, text)', + 'df.setvar(text, text)', + 'df.getvar(text)', + 'df.unsetvar(text)', + 'df.clearvars()', + 'df.status(text)', + 'df.result(text)', + 'df.cancel(text, text)', + 'df.wait_for_completion(text, integer)', + 'df.run(text)', + 'df.list_instances(text, integer)', + 'df.instance_info(text)', + 'df.instance_nodes(text, integer)', + 'df.instance_executions(text, integer)', + 'df.metrics()', + 'df.as_op(text, text)', + 'df.if_then_op(text, text)', + 'df.if_else_op(text, text)', + 'df.ensure_durofut(text)', + 'df.loop_prefix_op(text)', + 'df.version()', + 'df.debug_connection()', + 'df.explain(text)', + 'df.target_database()' + ]; +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = p_role) THEN + RAISE EXCEPTION 'role "%" does not exist', p_role; + END IF; + + IF with_grant THEN + grant_opt := ' WITH GRANT OPTION'; + END IF; + + EXECUTE format('GRANT USAGE ON SCHEMA df TO %I', p_role) || grant_opt; + + FOREACH func_sig IN ARRAY func_sigs LOOP + EXECUTE format('GRANT EXECUTE ON FUNCTION %s TO %I', func_sig, p_role) || grant_opt; + END LOOP; + + IF include_http THEN + EXECUTE format('GRANT EXECUTE ON FUNCTION df.http(text, text, text, jsonb, integer) TO %I', p_role) || grant_opt; + END IF; + + IF with_grant THEN + EXECUTE format('GRANT EXECUTE ON FUNCTION df.grant_usage(text, boolean, boolean) TO %I', p_role) || grant_opt; + EXECUTE format('GRANT EXECUTE ON FUNCTION df.revoke_usage(text) TO %I', p_role) || grant_opt; + END IF; + + EXECUTE format('GRANT SELECT ON df.instances TO %I', p_role) || grant_opt; + EXECUTE format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) || grant_opt; + EXECUTE format('GRANT SELECT ON df.nodes TO %I', p_role) || grant_opt; + EXECUTE format('GRANT INSERT (id, label, root_node, submitted_by, database) ON df.instances TO %I', p_role) || grant_opt; + EXECUTE format('GRANT INSERT (id, instance_id, node_type, query, 
result_name, left_node, right_node, submitted_by, database) ON df.nodes TO %I', p_role) || grant_opt; + EXECUTE format('GRANT SELECT, INSERT, UPDATE, DELETE ON df.vars TO %I', p_role) || grant_opt; + + RAISE NOTICE 'pg_durable: granted df usage privileges to "%"', p_role; +END; +$fn$; diff --git a/src/activities/get_instance_state.rs b/src/activities/get_instance_state.rs new file mode 100644 index 00000000..075caea4 --- /dev/null +++ b/src/activities/get_instance_state.rs @@ -0,0 +1,40 @@ +//! GetInstanceState activity - reads status/result for a durable instance + +use duroxide::ActivityContext; +use sqlx::{PgPool, Row}; +use std::sync::Arc; + +/// Activity name for registration and scheduling +pub const NAME: &str = "pg_durable::activity::get-instance-state"; + +/// Read the current state of an instance from df.instances/df.nodes. +pub async fn execute( + ctx: ActivityContext, + pool: Arc<PgPool>, + instance_id: String, +) -> Result<String, String> { + ctx.trace_info(format!("Reading state for instance {instance_id}")); + + let row = sqlx::query( + r#"SELECT i.status, n.result::text AS result + FROM df.instances i + LEFT JOIN df.nodes n ON n.id = i.root_node + WHERE i.id = $1"#, + ) + .bind(&instance_id) + .fetch_optional(pool.as_ref()) + .await + .map_err(|e| format!("Failed to read instance state for {instance_id}: {e}"))?; + + let Some(row) = row else { + return Err(format!("Instance not found: {instance_id}")); + }; + + let payload = serde_json::json!({ + "instance_id": instance_id, + "status": row.get::<String, _>("status"), + "result": row.get::<Option<String>, _>("result"), + }); + + Ok(payload.to_string()) +} diff --git a/src/activities/mod.rs b/src/activities/mod.rs index ae1a7db0..07bfced0 100644 --- a/src/activities/mod.rs +++ b/src/activities/mod.rs @@ -5,6 +5,7 @@ pub mod execute_http; pub mod execute_sql; +pub mod get_instance_state; pub mod load_function_graph; pub mod update_instance_status; pub mod update_node_status; diff --git a/src/dsl.rs b/src/dsl.rs index e6179887..1d155718 100644 --- 
a/src/dsl.rs +++ b/src/dsl.rs @@ -25,6 +25,32 @@ fn is_in_workflow_context() -> bool { result.as_deref() == Some("true") } +fn sql_text_literal(value: &str) -> String { + format!("'{}'", value.replace('\'', "''")) +} + +fn hex_encode(value: &str) -> String { + let mut out = String::with_capacity(value.len() * 2); + for byte in value.as_bytes() { + use std::fmt::Write as _; + let _ = write!(&mut out, "{byte:02x}"); + } + out +} + +fn stable_internal_result_name(parts: &[&str]) -> String { + let mut hash = 14695981039346656037u64; + for part in parts { + for byte in part.as_bytes() { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(1099511628211); + } + hash ^= 0xff; + hash = hash.wrapping_mul(1099511628211); + } + format!("__df_call_child_{hash:016x}") +} + // ============================================================================ // Version & Debug Functions // ============================================================================ @@ -567,6 +593,135 @@ pub fn wait_for_signal(name: &str, timeout_seconds: default!(Option, "NULL" .to_json() } +/// Durably waits for another pg_durable instance to reach a terminal state. +/// +/// The returned JSON envelope contains the child instance_id, terminal status, +/// and completed result. Failed/cancelled children raise an error in the parent. +#[pg_extern(schema = "df")] +pub fn await_instance(instance_id: &str, timeout_seconds: default!(Option<i32>, "NULL")) -> String { + if instance_id.is_empty() { + pgrx::error!("Instance ID cannot be empty"); + } + + if let Some(timeout) = timeout_seconds { + if timeout <= 0 { + pgrx::error!("Timeout must be positive"); + } + } + + let config = serde_json::json!({ + "instance_id": instance_id, + "timeout_seconds": timeout_seconds + }); + + Durofut { + node_type: "AWAIT_INSTANCE".to_string(), + query: Some(config.to_string()), + ..Default::default() + } + .to_json() +} + +/// Starts a child workflow and durably waits for its terminal result. 
+/// +/// Supported options today: +/// - timeout_seconds: integer timeout for the wait phase +/// - database: target database for the child df.start() call +/// - on_failure: only "raise" is currently supported +#[pg_extern(schema = "df")] +pub fn call_child( + fut: &str, + label: default!(Option<&str>, "NULL"), + options: default!(Option<pgrx::JsonB>, "NULL"), +) -> String { + let child_fut = match Durofut::ensure_strict(fut) { + Ok(d) => d, + Err(e) => pgrx::error!("Invalid child durable function: {}", e), + }; + if let Err(e) = child_fut.validate_recursive() { + pgrx::error!("Invalid child durable function graph: {}", e); + } + + let mut timeout_seconds: Option<i32> = None; + let mut database: Option<&str> = None; + + if let Some(opts) = options.as_ref().map(|jsonb| &jsonb.0) { + let obj = opts + .as_object() + .unwrap_or_else(|| pgrx::error!("df.call_child options must be a JSON object")); + + for key in obj.keys() { + match key.as_str() { + "timeout_seconds" | "database" | "on_failure" => {} + other => { + pgrx::error!( + "df.call_child: unsupported option '{}'. 
Supported options: timeout_seconds, database, on_failure", + other + ); + } + } + } + + if let Some(timeout_value) = obj.get("timeout_seconds") { + let timeout = timeout_value.as_i64().unwrap_or_else(|| { + pgrx::error!("df.call_child timeout_seconds must be an integer") + }); + if timeout <= 0 || timeout > i64::from(i32::MAX) { + pgrx::error!( + "df.call_child timeout_seconds must be between 1 and {}", + i32::MAX + ); + } + timeout_seconds = Some(timeout as i32); + } + + if let Some(database_value) = obj.get("database") { + database = Some( + database_value + .as_str() + .unwrap_or_else(|| pgrx::error!("df.call_child database must be a string")), + ); + } + + if let Some(on_failure_value) = obj.get("on_failure") { + let on_failure = on_failure_value + .as_str() + .unwrap_or_else(|| pgrx::error!("df.call_child on_failure must be a string")); + if on_failure != "raise" { + pgrx::error!("df.call_child only supports on_failure = 'raise' in this release"); + } + } + } + + let child_graph_json = child_fut.to_json(); + let internal_result_name = stable_internal_result_name(&[ + &child_graph_json, + label.unwrap_or(""), + database.unwrap_or(""), + ]); + + let label_arg = label + .map(sql_text_literal) + .unwrap_or_else(|| "NULL".to_string()); + let database_arg = database + .map(sql_text_literal) + .unwrap_or_else(|| "NULL".to_string()); + let start_query = format!( + "SELECT df.start(convert_from(decode({}, 'hex'), 'UTF8'), {}, {}) AS instance_id", + sql_text_literal(&hex_encode(&child_graph_json)), + label_arg, + database_arg + ); + + let start_node = as_named(&sql(&start_query), &internal_result_name); + let wait_node = await_instance( + &format!("${internal_result_name}.instance_id"), + timeout_seconds, + ); + + then_fn(&start_node, &wait_node) +} + /// Send a signal to a running durable function instance. 
/// /// # Arguments diff --git a/src/lib.rs b/src/lib.rs index 85b2e280..95511261 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -379,6 +379,8 @@ DECLARE 'df.join3(text, text, text)', 'df.race(text, text)', 'df.wait_for_signal(text, integer)', + 'df.await_instance(text, integer)', + 'df.call_child(text, text, jsonb)', 'df.signal(text, text, text)', 'df.start(text, text, text)', 'df.setvar(text, text)', @@ -721,8 +723,8 @@ BEGIN node_type_val := (val::jsonb)->>'node_type'; IF node_type_val IS NOT NULL THEN -- Has a node_type - validate it - IF node_type_val NOT IN ('SQL', 'THEN', 'IF', 'JOIN', 'LOOP', 'BREAK', 'RACE', 'SLEEP', 'WAIT_SCHEDULE', 'HTTP', 'SIGNAL') THEN - RAISE EXCEPTION 'Unknown node_type ''%''. Valid types: SQL, THEN, IF, JOIN, LOOP, BREAK, RACE, SLEEP, WAIT_SCHEDULE, HTTP, SIGNAL', node_type_val; + IF node_type_val NOT IN ('SQL', 'THEN', 'IF', 'JOIN', 'LOOP', 'BREAK', 'RACE', 'SLEEP', 'WAIT_SCHEDULE', 'HTTP', 'SIGNAL', 'AWAIT_INSTANCE') THEN + RAISE EXCEPTION 'Unknown node_type ''%''. 
Valid types: SQL, THEN, IF, JOIN, LOOP, BREAK, RACE, SLEEP, WAIT_SCHEDULE, HTTP, SIGNAL, AWAIT_INSTANCE', node_type_val; END IF; RETURN val; END IF; @@ -988,6 +990,71 @@ mod tests { assert_eq!(fut.node_type, "WAIT_SCHEDULE"); } + #[pg_test] + fn test_await_instance_creates_valid_node() { + let json = crate::dsl::await_instance("abcd1234", Some(30)); + let fut = Durofut::from_json(&json); + assert_eq!(fut.node_type, "AWAIT_INSTANCE"); + + let config: serde_json::Value = serde_json::from_str(fut.query.as_ref().unwrap()).unwrap(); + assert_eq!(config["instance_id"], "abcd1234"); + assert_eq!(config["timeout_seconds"], 30); + } + + #[pg_test] + fn test_call_child_builds_start_then_await_graph() { + let child_graph = crate::dsl::then_fn( + &crate::dsl::as_named(&crate::dsl::sql("SELECT 42 AS value"), "child_local"), + "SELECT $child_local.value", + ); + let json = crate::dsl::call_child( + &child_graph, + Some("child-label"), + Some(pgrx::JsonB(serde_json::json!({"timeout_seconds": 15}))), + ); + let json_again = crate::dsl::call_child( + &child_graph, + Some("child-label"), + Some(pgrx::JsonB(serde_json::json!({"timeout_seconds": 15}))), + ); + assert_eq!( + json, json_again, + "call_child output should be deterministic" + ); + + let fut = Durofut::from_json(&json); + assert_eq!(fut.node_type, "THEN"); + + let start_node = fut.left_node.as_ref().expect("start node missing"); + assert_eq!(start_node.node_type, "SQL"); + let internal_name = start_node + .result_name + .as_ref() + .expect("internal name missing"); + assert!(internal_name.starts_with("__df_call_child_")); + assert!(start_node + .query + .as_ref() + .expect("start query missing") + .contains("decode(")); + assert!( + !start_node + .query + .as_ref() + .expect("start query missing") + .contains("$child_local"), + "child graph placeholders should be encoded before entering the parent SQL node" + ); + + let await_node = fut.right_node.as_ref().expect("await node missing"); + assert_eq!(await_node.node_type, 
"AWAIT_INSTANCE"); + assert!(await_node + .query + .as_ref() + .expect("await config missing") + .contains(&format!("${internal_name}.instance_id"))); + } + #[pg_test] fn test_loop_creates_loop_node() { let body = crate::dsl::sql("SELECT 1"); diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index e295c353..ba90af83 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -37,6 +37,13 @@ struct SubtreeEnvelope { results: HashMap, } +#[derive(serde::Serialize, serde::Deserialize)] +struct AwaitedInstanceState { + instance_id: String, + status: String, + result: Option<String>, +} + /// Execute a complete function graph pub async fn execute(ctx: OrchestrationContext, input_json: String) -> Result { let input: FunctionInput = serde_json::from_str(&input_json) @@ -289,6 +296,9 @@ async fn execute_node_inner( "race" => execute_race_node(ctx, graph, node, node_id, results, exec_ctx).await, "http" => execute_http_node(ctx, node, node_id, results, exec_ctx, &sys_vars).await, "signal" => execute_signal_node(ctx, node, node_id, results).await, + "await_instance" => { + execute_await_instance_node(ctx, node, node_id, results, exec_ctx, &sys_vars).await + } "break" => execute_break_node(ctx, node, node_id).await, other => Err(format!("Unknown node type: {other}")), } @@ -1011,3 +1021,87 @@ async fn execute_signal_node( Ok(result_str) } + +fn json_text_to_value(raw: &str) -> serde_json::Value { + serde_json::from_str(raw).unwrap_or_else(|_| serde_json::Value::String(raw.to_string())) +} + +async fn execute_await_instance_node( + ctx: &OrchestrationContext, + node: &FunctionNode, + node_id: &str, + results: &mut HashMap<String, String>, + exec_ctx: &ExecutionContext, + sys_vars: &SystemVars, +) -> Result<String, String> { + const POLL_INTERVAL: Duration = Duration::from_secs(1); + + let config_str = node + .query + .as_ref() + .ok_or_else(|| format!("AWAIT_INSTANCE node {node_id} has no config"))?; + let 
config: serde_json::Value = serde_json::from_str(config_str) + .map_err(|e| format!("Invalid AWAIT_INSTANCE config: {e}"))?; + + let instance_template = config["instance_id"] + .as_str() + .ok_or_else(|| "AWAIT_INSTANCE missing instance_id".to_string())?; + let instance_id = substitute_all_raw(instance_template, results, &exec_ctx.vars, sys_vars)?; + if instance_id.is_empty() { + return Err("AWAIT_INSTANCE resolved to an empty instance_id".to_string()); + } + + let timeout_seconds = config["timeout_seconds"].as_u64(); + let mut waited_seconds = 0u64; + + loop { + let state_json = ctx + .schedule_activity(activities::get_instance_state::NAME, instance_id.clone()) + .await?; + let state: AwaitedInstanceState = serde_json::from_str(&state_json) + .map_err(|e| format!("Failed to parse awaited instance state: {e}"))?; + + match state.status.to_lowercase().as_str() { + "completed" => { + let envelope = serde_json::json!({ + "instance_id": state.instance_id, + "status": "completed", + "result": state + .result + .as_deref() + .map(json_text_to_value) + .unwrap_or(serde_json::Value::Null) + }); + let result_str = envelope.to_string(); + if let Some(name) = &node.result_name { + results.insert(name.clone(), result_str.clone()); + } + return Ok(result_str); + } + "failed" | "cancelled" => { + let detail = state + .result + .as_deref() + .map(|raw| json_text_to_value(raw).to_string()) + .unwrap_or_else(|| "null".to_string()); + return Err(format!( + "Awaited instance {} ended with status {}: {}", + state.instance_id, state.status, detail + )); + } + _ => {} + } + + if let Some(timeout) = timeout_seconds { + if waited_seconds >= timeout { + return Err(format!( + "Timed out after {}s waiting for instance {}", + timeout, instance_id + )); + } + } + + ctx.schedule_timer(POLL_INTERVAL).await; + waited_seconds += POLL_INTERVAL.as_secs(); + } +} diff --git a/src/registry.rs b/src/registry.rs index bb0a57ee..6d48b07d 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -16,6 +16,7 
@@ pub fn create_activity_registry(pool: Arc, semaphore: Arc) -> let status_pool = pool.clone(); let node_status_pool = pool.clone(); let http_pool = pool.clone(); + let instance_state_pool = pool.clone(); ActivityRegistry::builder() .register(activities::execute_sql::NAME, move |ctx: ActivityContext, input_json: String| { @@ -38,6 +39,10 @@ pub fn create_activity_registry(pool: Arc, semaphore: Arc) -> let pool = http_pool.clone(); async move { activities::execute_http::execute(ctx, pool, config_json).await } }) + .register(activities::get_instance_state::NAME, move |ctx: ActivityContext, instance_id: String| { + let pool = instance_state_pool.clone(); + async move { activities::get_instance_state::execute(ctx, pool, instance_id).await } + }) .build() } diff --git a/src/types.rs b/src/types.rs index 43a762db..f90f5378 100644 --- a/src/types.rs +++ b/src/types.rs @@ -807,6 +807,7 @@ pub const VALID_NODE_TYPES: &[&str] = &[ "WAIT_SCHEDULE", "HTTP", "SIGNAL", + "AWAIT_INSTANCE", ]; /// The Durofut type represents a "durable future" - a reference to a node in the function graph. 
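Note on the `AWAIT_INSTANCE` wait loop added in `src/orchestrations/execute_function_graph.rs`: the timeout appears to be tracked by counting one-second poll intervals rather than by reading a clock, and the timeout check runs after each status poll. The following is a minimal standalone sketch of that control flow, not the orchestration code itself. `WaitOutcome`, `simulate_wait`, and the scripted `statuses` slice are hypothetical stand-ins for the `get-instance-state` activity and the durable timer.

```rust
/// Terminal outcome of the simulated wait (hypothetical type, for illustration only).
#[derive(Debug, PartialEq)]
enum WaitOutcome {
    Completed { polls: u64 },
    Failed(String),
    TimedOut { waited_seconds: u64 },
}

/// Simulates the poll loop. `statuses[i]` is the status seen by poll `i`;
/// once the script runs out, the last status repeats ("running" if empty).
/// Note: with `timeout_seconds == None` and no terminal status this never returns.
fn simulate_wait(statuses: &[&str], timeout_seconds: Option<u64>) -> WaitOutcome {
    const POLL_INTERVAL_SECS: u64 = 1;
    let mut waited_seconds = 0u64;
    let mut polls = 0u64;

    loop {
        // Stand-in for scheduling the get-instance-state activity.
        let status = statuses
            .get(polls as usize)
            .or(statuses.last())
            .copied()
            .unwrap_or("running");
        polls += 1;

        // Status comparison is case-insensitive, as in the diff.
        match status.to_lowercase().as_str() {
            "completed" => return WaitOutcome::Completed { polls },
            "failed" | "cancelled" => return WaitOutcome::Failed(status.to_string()),
            _ => {}
        }

        // The timeout is checked only after a non-terminal poll.
        if let Some(timeout) = timeout_seconds {
            if waited_seconds >= timeout {
                return WaitOutcome::TimedOut { waited_seconds };
            }
        }

        // Stand-in for the durable timer: only the interval is counted.
        waited_seconds += POLL_INTERVAL_SECS;
    }
}
```

Under these assumptions, a timeout of `N` seconds allows `N + 1` polls before the wait fails, and time spent inside the status activity itself is not counted toward the timeout.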
diff --git a/tests/e2e/sql/49_child_orchestration.sql b/tests/e2e/sql/49_child_orchestration.sql new file mode 100644 index 00000000..31b38888 --- /dev/null +++ b/tests/e2e/sql/49_child_orchestration.sql @@ -0,0 +1,124 @@ +-- Tests: df.call_child convenience wrapper and df.await_instance durable wait +SET SESSION AUTHORIZATION df_e2e_user; + +DROP TABLE IF EXISTS test_child_orchestration_log; +CREATE TABLE test_child_orchestration_log ( + id SERIAL PRIMARY KEY, + msg TEXT NOT NULL, + data JSONB NOT NULL +); + +-- === Test 1: df.call_child starts a child instance and waits for completion === +CREATE TEMP TABLE _test_call_child_parent (instance_id TEXT); + +INSERT INTO _test_call_child_parent +SELECT df.start( + df.call_child( + 'SELECT json_build_object(''value'', 42, ''kind'', ''child'')', + 'call-child-child', + '{"timeout_seconds": 30}'::jsonb + ) |=> 'child' + ~> 'INSERT INTO test_child_orchestration_log (msg, data) VALUES (''call_child'', $child::jsonb)', + 'call-child-parent' +); + +DO $$ +DECLARE + parent_id TEXT; + parent_status TEXT; + child_instance_id TEXT; +BEGIN + SELECT instance_id INTO parent_id FROM _test_call_child_parent; + SELECT df.wait_for_completion(parent_id, 30) INTO parent_status; + + IF parent_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED: call_child parent status = %', parent_status; + END IF; + + SELECT data->>'instance_id' + INTO child_instance_id + FROM test_child_orchestration_log + WHERE msg = 'call_child' + ORDER BY id DESC + LIMIT 1; + + IF child_instance_id IS NULL OR child_instance_id = parent_id THEN + RAISE EXCEPTION 'TEST FAILED: call_child did not record a distinct child instance_id'; + END IF; + + IF NOT EXISTS ( + SELECT 1 + FROM test_child_orchestration_log + WHERE msg = 'call_child' + AND data->>'status' = 'completed' + AND data->'result'->'rows'->0->'json_build_object'->>'value' = '42' + AND data->'result'->'rows'->0->'json_build_object'->>'kind' = 'child' + ) THEN + RAISE EXCEPTION 'TEST FAILED: call_child 
result envelope missing expected child output'; + END IF; + + IF NOT EXISTS ( + SELECT 1 + FROM df.instances + WHERE id = child_instance_id + AND label = 'call-child-child' + AND status = 'completed' + ) THEN + RAISE EXCEPTION 'TEST FAILED: child instance metadata missing or not completed'; + END IF; + + RAISE NOTICE 'TEST PASSED: call_child'; +END $$; + +DROP TABLE _test_call_child_parent; +DELETE FROM test_child_orchestration_log; + +-- === Test 2: df.await_instance durably waits on an existing instance === +CREATE TEMP TABLE _test_direct_child AS +SELECT df.start( + 'SELECT json_build_object(''source'', ''direct-child'', ''ok'', true)', + 'direct-await-child' +) AS child_instance_id; + +CREATE TEMP TABLE _test_direct_parent AS +SELECT df.start( + df.await_instance((SELECT child_instance_id FROM _test_direct_child), 30) |=> 'awaited' + ~> 'INSERT INTO test_child_orchestration_log (msg, data) VALUES (''await_instance'', $awaited::jsonb)', + 'direct-await-parent' +) AS parent_instance_id; + +DO $$ +DECLARE + parent_id TEXT; + child_id TEXT; + parent_status TEXT; +BEGIN + SELECT parent_instance_id INTO parent_id FROM _test_direct_parent; + SELECT child_instance_id INTO child_id FROM _test_direct_child; + SELECT df.wait_for_completion(parent_id, 30) INTO parent_status; + + IF parent_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED: await_instance parent status = %', parent_status; + END IF; + + IF NOT EXISTS ( + SELECT 1 + FROM test_child_orchestration_log + WHERE msg = 'await_instance' + AND data->>'instance_id' = child_id + AND data->>'status' = 'completed' + AND data->'result'->'rows'->0->'json_build_object'->>'source' = 'direct-child' + AND (data->'result'->'rows'->0->'json_build_object'->>'ok')::boolean = true + ) THEN + RAISE EXCEPTION 'TEST FAILED: await_instance result envelope missing expected child output'; + END IF; + + RAISE NOTICE 'TEST PASSED: await_instance'; +END $$; + +DROP TABLE _test_direct_parent; +DROP TABLE _test_direct_child; +DROP 
TABLE test_child_orchestration_log; + +RESET SESSION AUTHORIZATION; +SELECT 'TEST PASSED' AS result;
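Note on how `df.call_child` in `src/dsl.rs` builds the child-start step: the child graph JSON is hex-encoded before it is embedded in the parent's SQL node, so `$name` placeholders that belong to the child do not appear literally in the parent's query text (the unit test checks that `$child_local` is absent from the start query). The sketch below reproduces that string construction using only the standard library. `build_start_query` is a hypothetical helper name introduced for illustration, while `sql_text_literal` and `hex_encode` mirror the helpers in the diff.

```rust
use std::fmt::Write as _;

/// Quote a value as a SQL text literal, doubling embedded single quotes.
fn sql_text_literal(value: &str) -> String {
    format!("'{}'", value.replace('\'', "''"))
}

/// Lowercase hex encoding of the value's UTF-8 bytes.
fn hex_encode(value: &str) -> String {
    let mut out = String::with_capacity(value.len() * 2);
    for byte in value.as_bytes() {
        // Writing to a String cannot fail, so the result is ignored.
        let _ = write!(&mut out, "{byte:02x}");
    }
    out
}

/// Hypothetical helper: assembles the SQL that starts the child instance.
/// The graph travels as hex and is decoded by PostgreSQL at execution time.
fn build_start_query(
    child_graph_json: &str,
    label: Option<&str>,
    database: Option<&str>,
) -> String {
    let label_arg = label
        .map(sql_text_literal)
        .unwrap_or_else(|| "NULL".to_string());
    let database_arg = database
        .map(sql_text_literal)
        .unwrap_or_else(|| "NULL".to_string());
    format!(
        "SELECT df.start(convert_from(decode({}, 'hex'), 'UTF8'), {}, {}) AS instance_id",
        sql_text_literal(&hex_encode(child_graph_json)),
        label_arg,
        database_arg
    )
}
```

Because hex output contains only `0-9a-f`, the encoded graph cannot contain quotes or `$` characters, which is why only the label and database arguments need quote escaping.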