Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,39 @@ SELECT started_at, last_seen_at,

The background worker updates `last_seen_at` every ~5 seconds as part of its normal operation.

### Data Retention (Automatic Pruning)

To keep `df.instances` and `df.nodes` from growing without bound, the background
worker runs a **best-effort pruning pass roughly once an hour** that deletes old
**terminal** instances (status `completed`, `failed`, or `cancelled`) and their
associated `df.nodes` rows. Running and pending instances are **never** pruned,
regardless of age.

The policy is currently fixed (no configuration GUC):

- **Hard cap — at most 10,000 terminal instances are retained, regardless of
age.** The newest 10,000 terminal instances are kept; any beyond that are
pruned even if they are only minutes old.
- **Retention window — 30 days.** Terminal instances older than 30 days are
pruned even if the table holds fewer than 10,000 of them.

Equivalently, a terminal instance is retained only while it is **both** among the
newest 10,000 terminal instances **and** less than 30 days old; otherwise it is
eligible for pruning.

Notes:

- "Age" — and the newest-first ordering used for the hard cap — is measured from
`completed_at` when set (instances that reached `completed`), otherwise from
`created_at`. `updated_at` is intentionally **not** used, because it is
user-writable and would let a low-privilege user influence pruning.
- Pruning runs in a single transaction with foreign-key constraints deferred:
matching `df.nodes` rows are deleted first, then the instance rows.
- Pruning is best-effort: if a pass fails it is logged and retried on the next
interval; it never stops workflow execution.
- If you need to retain terminal history beyond these limits (e.g. for auditing),
copy the rows you care about into your own table before they age out.

---

## User Isolation & Privileges
Expand Down
198 changes: 198 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,51 @@ mod tests {
})
}

/// Quotes `s` as a SQL string literal: the text is wrapped in single quotes and
/// every single quote inside it is doubled.
fn sql_literal(s: &str) -> String {
    // Two extra bytes for the surrounding quotes; doubled quotes may grow it further.
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        if ch == '\'' {
            quoted.push_str("''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}

/// Builds a `postgres://<role>@<host>:<port>/<database>` connection string that
/// targets the database and role of the current session.
///
/// Panics if either SPI lookup fails or returns no value.
fn test_database_connection_string() -> String {
    use crate::types::{get_host, get_port};

    let db_name = Spi::get_one::<String>("SELECT pg_catalog.current_database()::text")
        .expect("current_database query should succeed")
        .expect("current_database should return a value");

    // Log in as whoever owns this session instead of the worker role GUC, whose
    // default is "postgres". Under the pgrx test cluster the superuser is the OS
    // account, so a "postgres" role is not guaranteed to exist, whereas the
    // session's own role always exists and is able to log in.
    let session_role = Spi::get_one::<String>("SELECT current_user::text")
        .expect("current_user query should succeed")
        .expect("current_user should return a value");

    let (host, port) = (get_host(), get_port());
    format!("postgres://{session_role}@{host}:{port}/{db_name}")
}

/// Deletes the `df.nodes` and `df.instances` rows whose ids appear in `id_list`
/// and commits the deletion.
///
/// `id_list` is spliced verbatim into an `IN (...)` clause, so it must already be
/// a comma-separated list of quoted SQL literals. Both deletes run in a single
/// transaction after `SET CONSTRAINTS ALL DEFERRED`, nodes first and instances
/// second. Any failure panics with a message naming the failing step.
async fn delete_prune_test_rows(pool: &sqlx::PgPool, id_list: &str) {
    let mut cleanup_tx = pool.begin().await.expect("begin cleanup transaction");

    sqlx::query("SET CONSTRAINTS ALL DEFERRED")
        .execute(&mut *cleanup_tx)
        .await
        .expect("defer cleanup constraints");

    // Each statement is paired with the panic message used if it fails.
    let deletes = [
        (
            format!("DELETE FROM df.nodes WHERE instance_id IN ({id_list})"),
            "clean prune test nodes",
        ),
        (
            format!("DELETE FROM df.instances WHERE id IN ({id_list})"),
            "clean prune test instances",
        ),
    ];
    for (statement, failure_msg) in deletes {
        sqlx::query(&statement)
            .execute(&mut *cleanup_tx)
            .await
            .expect(failure_msg);
    }

    cleanup_tx.commit().await.expect("commit cleanup");
}

// ========================================================================
// Unit Tests - DSL Node Creation
// ========================================================================
Expand Down Expand Up @@ -1398,6 +1443,159 @@ mod tests {
assert!(backend_duroxide_schema().contains("duroxide"));
}

// Integration test for `crate::worker::prune_terminal_instances_transaction`.
//
// Inserts six fixture instances (five terminal, one running) and one node per
// instance inside a transaction, runs the prune with the arguments `30` and `2`
// (the retention=30d / max_keep=2 described in the fixture SQL comment), and
// asserts that only aa261001, aa261002 and aa261006 survive together with their
// nodes. The fixture transaction is rolled back at the end, so none of the rows
// inserted here are committed.
#[pg_test]
fn test_prune_terminal_instances_respects_age_and_keep_count() {
let conn = test_database_connection_string();

// The body uses sqlx (async), so it is driven on a current-thread tokio runtime.
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("tokio runtime");

let stats = rt.block_on(async {
// The pool is capped at a single connection.
let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(1)
.connect(&conn)
.await
.expect("connect to test database");

// Instance ids used by the fixtures; `id_list` is the same set rendered as a
// comma-separated list of quoted SQL literals for use inside `IN (...)`.
let test_ids = [
"aa261001", "aa261002", "aa261003", "aa261004", "aa261005", "aa261006",
];
let id_list = test_ids
.iter()
.map(|id| sql_literal(id))
.collect::<Vec<_>>()
.join(", ");

// Start from a clean slate: delete (and commit the deletion of) any existing
// rows that already use the fixture ids.
delete_prune_test_rows(&pool, &id_list).await;

// Everything from here to the rollback below happens in one transaction.
let mut fixture_tx = pool.begin().await.expect("begin fixture transaction");
// Constraints are deferred before inserting; presumably df.instances.root_node
// and df.nodes.instance_id reference each other, so neither table could be
// filled first otherwise — TODO confirm against the schema.
sqlx::query("SET CONSTRAINTS ALL DEFERRED")
.execute(&mut *fixture_tx)
.await
.expect("defer fixture constraints");
// Insert the six instances. created_at is now() minus age_days; completed_at is
// set only where has_completed_at is true; updated_at is pushed 3650 days into
// the future where forge_future_updated_at is true.
sqlx::query(
r#"
-- max_keep=2, retention=30d. Terminal rows are ranked newest-first
-- by COALESCE(completed_at, created_at):
-- aa261001 (1d, r1) -> keep (within cap, young)
-- aa261002 (2d, r2) -> keep (within cap, young)
-- aa261003 (3d, r3) -> PRUNE by the hard cap even though it is
-- only 3 days old (rank > max_keep)
-- aa261004 (40d, r4) -> PRUNE (beyond cap and past retention)
-- aa261005 (50d, r5) -> PRUNE (beyond cap and past retention)
-- aa261006 (running) -> keep (non-terminal, never considered)
-- aa261004/aa261005 are 'failed'/'cancelled' (NULL completed_at) with
-- a forged far-future `updated_at`; they must still rank/age by their
-- old `created_at`, proving `updated_at` is not trusted.
WITH fixtures(id, label, root_node, status, age_days, has_completed_at, forge_future_updated_at) AS (
VALUES
('aa261001', 'keep-recent-1', 'bb261001', 'completed', 1, true, false),
('aa261002', 'keep-recent-2', 'bb261002', 'completed', 2, true, false),
('aa261003', 'prune-young-over-cap', 'bb261003', 'completed', 3, true, false),
('aa261004', 'prune-old-failed-forged', 'bb261004', 'failed', 40, false, true),
('aa261005', 'prune-old-cancelled-forged', 'bb261005', 'cancelled', 50, false, true),
('aa261006', 'keep-running', 'bb261006', 'running', 90, false, false)
)
INSERT INTO df.instances
(id, label, root_node, status, submitted_by, created_at, updated_at, completed_at)
SELECT id,
label,
root_node,
status,
current_user::regrole,
pg_catalog.now() - (age_days::int * INTERVAL '1 day'),
CASE WHEN forge_future_updated_at
THEN pg_catalog.now() + INTERVAL '3650 days'
ELSE pg_catalog.now() - (age_days::int * INTERVAL '1 day')
END,
CASE WHEN has_completed_at
THEN pg_catalog.now() - (age_days::int * INTERVAL '1 day')
ELSE NULL
END
FROM fixtures;
"#,
)
.execute(&mut *fixture_tx)
.await
.expect("insert prune instances");

// Insert exactly one node per instance; the node ids (bb2610xx) match the
// root_node values inserted above, and each node shares its instance's age.
sqlx::query(
r#"
WITH fixtures(id, instance_id, status, age_days) AS (
VALUES
('bb261001', 'aa261001', 'completed', 1),
('bb261002', 'aa261002', 'completed', 2),
('bb261003', 'aa261003', 'completed', 3),
('bb261004', 'aa261004', 'failed', 40),
('bb261005', 'aa261005', 'completed', 50),
('bb261006', 'aa261006', 'running', 90)
)
INSERT INTO df.nodes
(id, instance_id, node_type, query, status, submitted_by, created_at, updated_at)
SELECT id,
instance_id,
'SQL',
'SELECT 1',
status,
current_user::regrole,
pg_catalog.now() - (age_days::int * INTERVAL '1 day'),
pg_catalog.now() - (age_days::int * INTERVAL '1 day')
FROM fixtures;
"#,
)
.execute(&mut *fixture_tx)
.await
.expect("insert prune nodes");

// Run the prune inside the fixture transaction so its deletes are rolled back
// with the fixtures.
// NOTE(review): no id filter is passed, so the prune presumably ranks every
// terminal row in df.instances, not only the fixtures. Unrelated terminal rows
// newer than the fixtures could push aa261001/aa261002 past max_keep=2 —
// confirm the test database holds no other terminal instances.
let stats =
crate::worker::prune_terminal_instances_transaction(&mut fixture_tx, 30, 2)
.await
.expect("prune terminal instances");

// Of the six fixture instances, three should be left after pruning.
let remaining_instances: i64 = sqlx::query_scalar(&format!(
"SELECT pg_catalog.count(*)::bigint FROM df.instances WHERE id IN ({id_list})"
))
.fetch_one(&mut *fixture_tx)
.await
.expect("count remaining instances");
assert_eq!(remaining_instances, 3);

// Each instance had one node, so the same number of nodes should remain.
let remaining_nodes: i64 = sqlx::query_scalar(&format!(
"SELECT pg_catalog.count(*)::bigint FROM df.nodes WHERE instance_id IN ({id_list})"
))
.fetch_one(&mut *fixture_tx)
.await
.expect("count remaining nodes");
assert_eq!(remaining_nodes, 3);

// Check which rows survived, not just how many: the two newest terminal
// instances and the running one.
let remaining_ids: Vec<String> = sqlx::query_scalar(&format!(
"SELECT id FROM df.instances WHERE id IN ({id_list}) ORDER BY id"
))
.fetch_all(&mut *fixture_tx)
.await
.expect("load remaining ids");
assert_eq!(remaining_ids, vec!["aa261001", "aa261002", "aa261006"]);

// Discard the fixtures and the prune's deletes.
fixture_tx
.rollback()
.await
.expect("rollback prune test fixture");

pool.close().await;
stats
});

// aa261003, aa261004 and aa261005 were pruned, each along with its single node.
assert_eq!(
stats,
crate::worker::PruneStats {
instances_deleted: 3,
nodes_deleted: 3,
}
);
}

// ========================================================================
// Unit Tests - Workflow Variables
// ========================================================================
Expand Down
Loading
Loading