18 changes: 18 additions & 0 deletions USER_GUIDE.md
@@ -192,6 +192,8 @@ df.sql('SELECT 1') ~> df.sql('SELECT 2')
| `df.clearvars()` | Clear all durable function variables | `df.clearvars()` |
| `df.wait_for_signal(name)` | Wait for external signal | `df.wait_for_signal('approval')` |
| `df.wait_for_signal(name, timeout)` | Wait with timeout (seconds) | `df.wait_for_signal('approval', 3600)` |
| `df.await_instance(id, timeout)` | Durably wait for another instance | `df.await_instance('a1b2c3d4', 300)` |
| `df.call_child(func, label, options)` | Start a child workflow and wait for it | `df.call_child('SELECT 1', 'child-job', '{"timeout_seconds":300}')` |
| `df.signal(id, name, data)` | Send signal to instance | `df.signal('a1b2', 'go', '{}')` |

### Operators
@@ -1153,6 +1155,22 @@ SELECT df.start(
-- (e.g., via a webhook endpoint that calls df.signal)
```

### Example: Parent waits for child workflow

```sql
SELECT df.start(
    df.call_child(
        'SELECT json_build_object(''report_id'', 42, ''status'', ''ready'')',
        'generate-report',
        '{"timeout_seconds": 300}'
    ) |=> 'child'
    ~> 'INSERT INTO audit_log(payload) VALUES ($child::jsonb)',
    'parent-workflow'
);
```

When the child completes, `df.call_child(...)` resolves to a JSON envelope containing the child `instance_id`, final `status`, and child `result`; the example above captures that envelope as `$child`. To wait on an already-started instance instead, use `df.await_instance(...)`.

---

## Multi-Database Support
77 changes: 77 additions & 0 deletions docs/child-orchestration-design.md
@@ -0,0 +1,77 @@
# Child orchestration primitives (`df.call_child` / `df.await_instance`)

## Summary

pg_durable now exposes two graph-composable primitives for parent-waits-for-child flows:

- `df.await_instance(instance_id text, timeout_seconds int default null)`
- `df.call_child(fut text, label text default null, options jsonb default null)`

Both return Durofut JSON, so they compose naturally inside `df.seq`, `df.join`, `df.race`, and operator-based graphs.

## API shape

```sql
-- Wait for an already-started instance to reach a terminal state.
df.await_instance(instance_id text, timeout_seconds int default null) returns text

-- Start a child workflow and then durably wait for it.
df.call_child(
    fut text,
    label text default null,
    options jsonb default null
) returns text
```

### Supported `df.call_child` options

- `timeout_seconds` — timeout passed to the wait phase
- `database` — database argument forwarded to the child `df.start(...)`
- `on_failure` — currently only `"raise"` is supported
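
The option rules above can be modelled in a few lines of Rust. This is only an illustrative, std-only sketch: the `CallChildOptions` struct and its `validate` method are hypothetical names invented here, not part of pg_durable, and the field types are assumptions based on the option list.

```rust
/// Hypothetical model of the documented `df.call_child` options.
/// Names and types are assumptions for illustration, not pg_durable's code.
#[derive(Debug, Default, PartialEq)]
struct CallChildOptions {
    /// Timeout passed to the wait phase, in seconds.
    timeout_seconds: Option<u32>,
    /// Database argument forwarded to the child `df.start(...)`.
    database: Option<String>,
    /// Failure policy; only "raise" is listed as supported.
    on_failure: Option<String>,
}

impl CallChildOptions {
    /// Accept a missing `on_failure` or "raise"; reject anything else.
    fn validate(&self) -> Result<(), String> {
        match self.on_failure.as_deref() {
            None | Some("raise") => Ok(()),
            Some(other) => Err(format!(
                "unsupported on_failure '{other}': only \"raise\" is supported"
            )),
        }
    }
}
```

Under these assumptions, an options value with `on_failure` unset or set to `"raise"` validates, while any other policy is rejected before the child is started.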

## Semantics in v0.2.1

### Result shape

On success, both primitives resolve to a JSON envelope:

```json
{
  "instance_id": "<child instance id>",
  "status": "completed",
  "result": <child output>
}
```

If the child output is valid JSON, it is embedded as JSON. Otherwise it is returned as a JSON string.

### Failure semantics

- `completed` child → success envelope above
- `failed` / `cancelled` child → raises in the parent
- timeout → raises in the parent

This settles the default behavior as **raise on non-success**.
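
As a rough illustration of the rules above, the mapping from a child's final status to what the parent observes might look like the following. The `WaitOutcome` enum and `resolve_wait` function are hypothetical names used only for this sketch; the handling of an unrecognized status is also an assumption, since the document does not describe that case.

```rust
/// What the parent observes after waiting on a child (illustrative only).
#[derive(Debug, PartialEq)]
enum WaitOutcome {
    /// Child completed: the parent receives the success envelope.
    Success,
    /// Child failed, was cancelled, or the wait timed out: the parent raises.
    Raise(String),
}

/// Map a child's final status to the parent's outcome.
/// `status` is `None` when the wait timed out before a terminal state.
fn resolve_wait(instance_id: &str, status: Option<&str>) -> WaitOutcome {
    match status {
        Some("completed") => WaitOutcome::Success,
        Some(s) if s == "failed" || s == "cancelled" => {
            WaitOutcome::Raise(format!("child {instance_id} ended with status {s}"))
        }
        // Assumption: any other status is treated as an error as well.
        Some(other) => {
            WaitOutcome::Raise(format!("child {instance_id} has unexpected status {other}"))
        }
        None => WaitOutcome::Raise(format!("timed out waiting for child {instance_id}")),
    }
}
```

Only `completed` yields the success envelope; every other path surfaces as an error in the parent, which is the "raise on non-success" default.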

### Cancellation propagation

Parent cancellation does **not** automatically cancel children started via `df.call_child` in this release. Children are regular durable instances started through `df.start(...)`, so they continue independently unless cancelled separately.

### Identity exposure

The child `instance_id` is included in the success envelope so callers can inspect child state through existing monitoring APIs.

### Variable and label inheritance

- Labels do not inherit automatically; `df.call_child(..., label => ...)` sets the child label explicitly.
- `df.vars` inheritance follows the existing `df.start(...)` behavior, because `df.call_child` starts the child through `df.start(...)`. The child sees whatever variables `df.start(...)` captures when the child-starting SQL step runs.

## Implementation notes

- `df.await_instance` is implemented as a dedicated `AWAIT_INSTANCE` node type.
- The orchestration polls child status durably through an activity plus a durable timer, so the parent suspends without holding a backend session.
- `df.call_child` is a convenience wrapper that expands to:
  1. a SQL node that calls `df.start(...)` for the child and stores the returned child `instance_id`
  2. an `AWAIT_INSTANCE` node that waits on that stored `instance_id`

This keeps the implementation small while still giving users a first-class, graph-composable child orchestration primitive.
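
The poll-then-sleep pattern described in the notes above can be sketched with a simulated clock. This is not the actual orchestration code and does not use the duroxide API: `await_instance_sim`, the `get_status` closure, the poll interval, and the non-terminal status `"running"` used in the usage note are all assumptions made for illustration.

```rust
/// Simulate the wait loop: read the child's status (the "activity"), and if
/// it is not terminal, advance a simulated clock (the "durable timer").
/// Returns Ok(status) on a terminal status and Err on timeout.
/// With `timeout_secs = None` the loop waits indefinitely.
fn await_instance_sim<F>(
    mut get_status: F,
    poll_secs: u64,
    timeout_secs: Option<u64>,
) -> Result<String, String>
where
    F: FnMut() -> String,
{
    // Guard against a zero interval so simulated time always advances.
    let step = poll_secs.max(1);
    let mut elapsed = 0u64;
    loop {
        // "Activity": read the child's current status.
        let status = get_status();
        if matches!(status.as_str(), "completed" | "failed" | "cancelled") {
            return Ok(status);
        }
        // Give up once the simulated deadline has been reached.
        if let Some(limit) = timeout_secs {
            if elapsed >= limit {
                return Err(format!("timed out after {elapsed}s"));
            }
        }
        // "Durable timer": advance simulated time instead of blocking.
        elapsed += step;
    }
}
```

For example, a closure that reports `"running"` twice and then `"completed"` resolves to `Ok("completed")`, while one that never leaves `"running"` with a 10-second limit and a 5-second interval returns an error once the simulated clock reaches 10 seconds. In the real system the timer is durable, so the parent is suspended between polls rather than looping in a backend session.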
6 changes: 6 additions & 0 deletions docs/upgrade-testing.md
@@ -235,6 +235,12 @@ what the upgrade script handles, and any backward compatibility considerations.
- **Scenario B1 considerations:** No backward compatibility concern. `df.if_rows` is a new function that doesn't exist in v0.1.1 schemas — it simply won't be callable until the customer runs `ALTER EXTENSION UPDATE`. The `.so` symbol exists but is never invoked from old schemas. All other changes (substitution engine rewrite, `Result` return type) are internal to orchestration code and don't touch any SQL queries or table schemas.
- **Scenario B2 considerations:** No data migration needed. The change is purely additive (new function) with no table or column changes.

#### Child orchestration helpers — df.await_instance / df.call_child
- **DDL change:** Upgrade script adds `df.await_instance(text, integer)` and `df.call_child(text, text, jsonb)` and refreshes helper SQL definitions (`df.ensure_durofut`, `df.grant_usage`) so upgraded installations recognize the new `AWAIT_INSTANCE` node type and can grant the new functions.
- **Scenario A considerations:** Fresh install picks up the new C-language functions from pgrx-generated SQL; the upgrade script must add matching `CREATE FUNCTION` entries and keep helper SQL bodies in sync.
- **Scenario B1 considerations:** No binary-compatibility risk for pre-upgrade schemas. These are additive SQL entrypoints: older schemas simply cannot call them until `ALTER EXTENSION UPDATE`, while existing SQL queries and table shapes remain unchanged.
- **Scenario B2 considerations:** No table/data migration is required. Existing instances continue to work. Administrators should re-run `df.grant_usage(...)` for roles that were granted access through the helper before the upgrade, so those roles also receive `EXECUTE` on the newly added functions.

#### Connection Limits — GUC-controlled pool sizing and backpressure
- **DDL change:** None. All changes are runtime-only (pool consolidation, semaphore backpressure, new GUCs).
- **Scenario A considerations:** No schema changes — the `df` schema equivalence contract is unchanged.
133 changes: 130 additions & 3 deletions sql/pg_durable--0.2.0--0.2.1.sql
@@ -1,5 +1,132 @@
-- pg_durable upgrade: 0.2.0 → 0.2.1
--
-- No schema changes in this release.
-- This upgrade removes the dependency on the `ring` crate (switched to
-- native-tls), includes security hardening fixes, and other bug fixes.
-- Add durable child-orchestration helpers and refresh helper/grant functions
-- so upgraded installations can use the new API surface immediately.

CREATE FUNCTION df."await_instance"(
    "instance_id" TEXT,
    "timeout_seconds" INT DEFAULT NULL
) RETURNS TEXT
LANGUAGE c
AS 'MODULE_PATHNAME', 'await_instance_wrapper';

CREATE FUNCTION df."call_child"(
    "fut" TEXT,
    "label" TEXT DEFAULT NULL,
    "options" JSONB DEFAULT NULL
) RETURNS TEXT
LANGUAGE c
AS 'MODULE_PATHNAME', 'call_child_wrapper';

CREATE OR REPLACE FUNCTION df.ensure_durofut(val text) RETURNS text AS $$
DECLARE
    node_type_val text;
BEGIN
    BEGIN
        node_type_val := (val::jsonb)->>'node_type';
        IF node_type_val IS NOT NULL THEN
            IF node_type_val NOT IN ('SQL', 'THEN', 'IF', 'JOIN', 'LOOP', 'BREAK', 'RACE', 'SLEEP', 'WAIT_SCHEDULE', 'HTTP', 'SIGNAL', 'AWAIT_INSTANCE') THEN
                RAISE EXCEPTION 'Unknown node_type ''%''. Valid types: SQL, THEN, IF, JOIN, LOOP, BREAK, RACE, SLEEP, WAIT_SCHEDULE, HTTP, SIGNAL, AWAIT_INSTANCE', node_type_val;
            END IF;
            RETURN val;
        END IF;
    EXCEPTION WHEN invalid_text_representation THEN
        NULL;
    WHEN raise_exception THEN
        RAISE;
    WHEN OTHERS THEN
        NULL;
    END;

    RETURN df.sql(val);
END;
$$ LANGUAGE plpgsql IMMUTABLE SET search_path = pg_catalog, df, pg_temp;

CREATE OR REPLACE FUNCTION df.grant_usage(
    p_role TEXT,
    include_http boolean DEFAULT false,
    with_grant boolean DEFAULT false
)
RETURNS VOID
LANGUAGE plpgsql
SET search_path = pg_catalog, df, pg_temp
AS $fn$
DECLARE
    grant_opt TEXT := '';
    func_sig TEXT;
    func_sigs TEXT[] := ARRAY[
        'df.sql(text)',
        'df.seq(text, text)',
        'df.as(text, text)',
        'df.sleep(bigint)',
        'df.wait_for_schedule(text)',
        'df.loop(text, text)',
        'df.break(text)',
        'df.if(text, text, text)',
        'df.if_rows(text, text, text)',
        'df.join(text, text)',
        'df.join3(text, text, text)',
        'df.race(text, text)',
        'df.wait_for_signal(text, integer)',
        'df.await_instance(text, integer)',
        'df.call_child(text, text, jsonb)',
        'df.signal(text, text, text)',
        'df.start(text, text, text)',
        'df.setvar(text, text)',
        'df.getvar(text)',
        'df.unsetvar(text)',
        'df.clearvars()',
        'df.status(text)',
        'df.result(text)',
        'df.cancel(text, text)',
        'df.wait_for_completion(text, integer)',
        'df.run(text)',
        'df.list_instances(text, integer)',
        'df.instance_info(text)',
        'df.instance_nodes(text, integer)',
        'df.instance_executions(text, integer)',
        'df.metrics()',
        'df.as_op(text, text)',
        'df.if_then_op(text, text)',
        'df.if_else_op(text, text)',
        'df.ensure_durofut(text)',
        'df.loop_prefix_op(text)',
        'df.version()',
        'df.debug_connection()',
        'df.explain(text)',
        'df.target_database()'
    ];
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = p_role) THEN
        RAISE EXCEPTION 'role "%" does not exist', p_role;
    END IF;

    IF with_grant THEN
        grant_opt := ' WITH GRANT OPTION';
    END IF;

    EXECUTE format('GRANT USAGE ON SCHEMA df TO %I', p_role) || grant_opt;

    FOREACH func_sig IN ARRAY func_sigs LOOP
        EXECUTE format('GRANT EXECUTE ON FUNCTION %s TO %I', func_sig, p_role) || grant_opt;
    END LOOP;

    IF include_http THEN
        EXECUTE format('GRANT EXECUTE ON FUNCTION df.http(text, text, text, jsonb, integer) TO %I', p_role) || grant_opt;
    END IF;

    IF with_grant THEN
        EXECUTE format('GRANT EXECUTE ON FUNCTION df.grant_usage(text, boolean, boolean) TO %I', p_role) || grant_opt;
        EXECUTE format('GRANT EXECUTE ON FUNCTION df.revoke_usage(text) TO %I', p_role) || grant_opt;
    END IF;

    EXECUTE format('GRANT SELECT ON df.instances TO %I', p_role) || grant_opt;
    EXECUTE format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) || grant_opt;
    EXECUTE format('GRANT SELECT ON df.nodes TO %I', p_role) || grant_opt;
    EXECUTE format('GRANT INSERT (id, label, root_node, submitted_by, database) ON df.instances TO %I', p_role) || grant_opt;
    EXECUTE format('GRANT INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes TO %I', p_role) || grant_opt;
    EXECUTE format('GRANT SELECT, INSERT, UPDATE, DELETE ON df.vars TO %I', p_role) || grant_opt;

    RAISE NOTICE 'pg_durable: granted df usage privileges to "%"', p_role;
END;
$fn$;
40 changes: 40 additions & 0 deletions src/activities/get_instance_state.rs
@@ -0,0 +1,40 @@
//! GetInstanceState activity - reads status/result for a durable instance

use duroxide::ActivityContext;
use sqlx::{PgPool, Row};
use std::sync::Arc;

/// Activity name for registration and scheduling
pub const NAME: &str = "pg_durable::activity::get-instance-state";

/// Read the current state of an instance from df.instances/df.nodes.
pub async fn execute(
    ctx: ActivityContext,
    pool: Arc<PgPool>,
    instance_id: String,
) -> Result<String, String> {
    ctx.trace_info(format!("Reading state for instance {instance_id}"));

    let row = sqlx::query(
        r#"SELECT i.status, n.result::text AS result
           FROM df.instances i
           LEFT JOIN df.nodes n ON n.id = i.root_node
           WHERE i.id = $1"#,
    )
    .bind(&instance_id)
    .fetch_optional(pool.as_ref())
    .await
    .map_err(|e| format!("Failed to read instance state for {instance_id}: {e}"))?;

    let Some(row) = row else {
        return Err(format!("Instance not found: {instance_id}"));
    };

    let payload = serde_json::json!({
        "instance_id": instance_id,
        "status": row.get::<String, _>("status"),
        "result": row.get::<Option<String>, _>("result"),
    });

    Ok(payload.to_string())
}
1 change: 1 addition & 0 deletions src/activities/mod.rs
@@ -5,6 +5,7 @@

pub mod execute_http;
pub mod execute_sql;
pub mod get_instance_state;
pub mod load_function_graph;
pub mod update_instance_status;
pub mod update_node_status;