diff --git a/USER_GUIDE.md b/USER_GUIDE.md index fce8fad..59c1f35 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -1520,6 +1520,39 @@ SELECT started_at, last_seen_at, The background worker updates `last_seen_at` every ~5 seconds as part of its normal operation. +### Data Retention (Automatic Pruning) + +To keep `df.instances` and `df.nodes` from growing without bound, the background +worker runs a **best-effort pruning pass roughly once an hour** that deletes old +**terminal** instances (status `completed`, `failed`, or `cancelled`) and their +associated `df.nodes` rows. Running and pending instances are **never** pruned, +regardless of age. + +The policy is currently fixed (no configuration GUC): + +- **Hard cap — at most 10,000 terminal instances are retained, regardless of + age.** The newest 10,000 terminal instances are kept; any beyond that are + pruned even if they are only minutes old. +- **Retention window — 30 days.** Terminal instances older than 30 days are + pruned even if the table holds fewer than 10,000 of them. + +Equivalently, a terminal instance is retained only while it is **both** among the +newest 10,000 terminal instances **and** less than 30 days old; otherwise it is +eligible for pruning. + +Notes: + +- "Age" is measured from `completed_at` when set (instances that reached + `completed`), otherwise from `created_at`. `updated_at` is intentionally **not** + used, because it is user-writable and would let a low-privilege user influence + pruning. +- Pruning runs in a single transaction with foreign-key constraints deferred: + matching `df.nodes` rows are deleted first, then the instance rows. +- Pruning is best-effort: if a pass fails it is logged and retried on the next + interval; it never stops workflow execution. +- If you need to retain terminal history beyond these limits (e.g. for auditing), + copy the rows you care about into your own table before they age out. + --- ## User Isolation & Privileges diff --git a/src/lib.rs b/src/lib.rs index 60fce55..5f8a200 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -858,6 +858,51 @@ mod tests { }) } + fn sql_literal(s: &str) -> String { + format!("'{}'", s.replace('\'', "''")) + } + + fn test_database_connection_string() -> String { + use crate::types::{get_host, get_port}; + + let database = Spi::get_one::("SELECT pg_catalog.current_database()::text") + .expect("current_database query should succeed") + .expect("current_database should return a value"); + // Connect as the current session role rather than the worker role GUC + // (which defaults to "postgres"). The pgrx test cluster's superuser is + // the OS account, so "postgres" may not exist; the current role always + // does and can log in. + let role = Spi::get_one::("SELECT current_user::text") + .expect("current_user query should succeed") + .expect("current_user should return a value"); + format!( + "postgres://{}@{}:{}/{}", + role, + get_host(), + get_port(), + database + ) + } + + async fn delete_prune_test_rows(pool: &sqlx::PgPool, id_list: &str) { + let mut tx = pool.begin().await.expect("begin cleanup transaction"); + sqlx::query("SET CONSTRAINTS ALL DEFERRED") + .execute(&mut *tx) + .await + .expect("defer cleanup constraints"); + sqlx::query(&format!( + "DELETE FROM df.nodes WHERE instance_id IN ({id_list})" + )) + .execute(&mut *tx) + .await + .expect("clean prune test nodes"); + sqlx::query(&format!("DELETE FROM df.instances WHERE id IN ({id_list})")) + .execute(&mut *tx) + .await + .expect("clean prune test instances"); + tx.commit().await.expect("commit cleanup"); + } + // ======================================================================== // Unit Tests - DSL Node Creation // ======================================================================== @@ -1398,6 +1443,159 @@ mod tests { assert!(backend_duroxide_schema().contains("duroxide")); } + #[pg_test] + fn test_prune_terminal_instances_respects_age_and_keep_count() { + let conn = test_database_connection_string(); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("tokio runtime"); + + let stats = rt.block_on(async { + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(1) + .connect(&conn) + .await + .expect("connect to test database"); + + let test_ids = [ + "aa261001", "aa261002", "aa261003", "aa261004", "aa261005", "aa261006", + ]; + let id_list = test_ids + .iter() + .map(|id| sql_literal(id)) + .collect::>() + .join(", "); + + delete_prune_test_rows(&pool, &id_list).await; + + let mut fixture_tx = pool.begin().await.expect("begin fixture transaction"); + sqlx::query("SET CONSTRAINTS ALL DEFERRED") + .execute(&mut *fixture_tx) + .await + .expect("defer fixture constraints"); + sqlx::query( + r#" + -- max_keep=2, retention=30d. Terminal rows are ranked newest-first + -- by COALESCE(completed_at, created_at): + -- aa261001 (1d, r1) -> keep (within cap, young) + -- aa261002 (2d, r2) -> keep (within cap, young) + -- aa261003 (3d, r3) -> PRUNE by the hard cap even though it is + -- only 3 days old (rank > max_keep) + -- aa261004 (40d, r4) -> PRUNE (beyond cap and past retention) + -- aa261005 (50d, r5) -> PRUNE (beyond cap and past retention) + -- aa261006 (running) -> keep (non-terminal, never considered) + -- aa261004/aa261005 are 'failed'/'cancelled' (NULL completed_at) with + -- a forged far-future `updated_at`; they must still rank/age by their + -- old `created_at`, proving `updated_at` is not trusted. + WITH fixtures(id, label, root_node, status, age_days, has_completed_at, forge_future_updated_at) AS ( + VALUES + ('aa261001', 'keep-recent-1', 'bb261001', 'completed', 1, true, false), + ('aa261002', 'keep-recent-2', 'bb261002', 'completed', 2, true, false), + ('aa261003', 'prune-young-over-cap', 'bb261003', 'completed', 3, true, false), + ('aa261004', 'prune-old-failed-forged', 'bb261004', 'failed', 40, false, true), + ('aa261005', 'prune-old-cancelled-forged', 'bb261005', 'cancelled', 50, false, true), + ('aa261006', 'keep-running', 'bb261006', 'running', 90, false, false) + ) + INSERT INTO df.instances + (id, label, root_node, status, submitted_by, created_at, updated_at, completed_at) + SELECT id, + label, + root_node, + status, + current_user::regrole, + pg_catalog.now() - (age_days::int * INTERVAL '1 day'), + CASE WHEN forge_future_updated_at + THEN pg_catalog.now() + INTERVAL '3650 days' + ELSE pg_catalog.now() - (age_days::int * INTERVAL '1 day') + END, + CASE WHEN has_completed_at + THEN pg_catalog.now() - (age_days::int * INTERVAL '1 day') + ELSE NULL + END + FROM fixtures; + "#, + ) + .execute(&mut *fixture_tx) + .await + .expect("insert prune instances"); + + sqlx::query( + r#" + WITH fixtures(id, instance_id, status, age_days) AS ( + VALUES + ('bb261001', 'aa261001', 'completed', 1), + ('bb261002', 'aa261002', 'completed', 2), + ('bb261003', 'aa261003', 'completed', 3), + ('bb261004', 'aa261004', 'failed', 40), + ('bb261005', 'aa261005', 'completed', 50), + ('bb261006', 'aa261006', 'running', 90) + ) + INSERT INTO df.nodes + (id, instance_id, node_type, query, status, submitted_by, created_at, updated_at) + SELECT id, + instance_id, + 'SQL', + 'SELECT 1', + status, + current_user::regrole, + pg_catalog.now() - (age_days::int * INTERVAL '1 day'), + pg_catalog.now() - (age_days::int * INTERVAL '1 day') + FROM fixtures; + "#, + ) + .execute(&mut *fixture_tx) + .await + .expect("insert prune nodes"); + + let stats = + crate::worker::prune_terminal_instances_transaction(&mut fixture_tx, 30, 2) + .await + .expect("prune terminal instances"); + + let remaining_instances: i64 = sqlx::query_scalar(&format!( + "SELECT pg_catalog.count(*)::bigint FROM df.instances WHERE id IN ({id_list})" + )) + .fetch_one(&mut *fixture_tx) + .await + .expect("count remaining instances"); + assert_eq!(remaining_instances, 3); + + let remaining_nodes: i64 = sqlx::query_scalar(&format!( + "SELECT pg_catalog.count(*)::bigint FROM df.nodes WHERE instance_id IN ({id_list})" + )) + .fetch_one(&mut *fixture_tx) + .await + .expect("count remaining nodes"); + assert_eq!(remaining_nodes, 3); + + let remaining_ids: Vec = sqlx::query_scalar(&format!( + "SELECT id FROM df.instances WHERE id IN ({id_list}) ORDER BY id" + )) + .fetch_all(&mut *fixture_tx) + .await + .expect("load remaining ids"); + assert_eq!(remaining_ids, vec!["aa261001", "aa261002", "aa261006"]); + + fixture_tx + .rollback() + .await + .expect("rollback prune test fixture"); + + pool.close().await; + stats + }); + + assert_eq!( + stats, + crate::worker::PruneStats { + instances_deleted: 3, + nodes_deleted: 3, + } + ); + } + // ======================================================================== // Unit Tests - Workflow Variables // ======================================================================== diff --git a/src/worker.rs b/src/worker.rs index 976a3fd..2bd3bf7 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -21,6 +21,23 @@ use crate::types::{ postgres_connection_string, resolve_duroxide_schema_pool, worker_provider_config, }; +const TERMINAL_INSTANCE_PRUNE_INTERVAL: Duration = Duration::from_secs(60 * 60); +// Retention policy for terminal ('completed'/'failed'/'cancelled') instances. +// A terminal instance is pruned when it is OUTSIDE the newest +// TERMINAL_INSTANCE_MAX_KEEP rows (a hard cap enforced regardless of age) OR +// older than TERMINAL_INSTANCE_RETENTION_DAYS. Equivalently, an instance is +// retained only while it is BOTH among the newest TERMINAL_INSTANCE_MAX_KEEP +// terminal rows AND younger than the retention window. The number of retained +// terminal instances therefore never exceeds TERMINAL_INSTANCE_MAX_KEEP. +const TERMINAL_INSTANCE_RETENTION_DAYS: i32 = 30; +const TERMINAL_INSTANCE_MAX_KEEP: i64 = 10_000; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct PruneStats { + pub instances_deleted: i64, + pub nodes_deleted: i64, +} + /// Initialize tracing subscriber for duroxide logs. /// Must be called before Runtime::start_with_store() to capture all logs. fn init_tracing() { @@ -228,6 +245,7 @@ async fn run_duroxide_runtime() { run_until_extension_dropped_or_shutdown( &poll_pool, + &mgmt_pool, duroxide_runtime, EXTENSION_DROP_POLL_INTERVAL, SHUTDOWN_CHECK_INTERVAL, @@ -627,8 +645,103 @@ async fn check_epoch_sentinel(pool: &sqlx::PgPool, epoch_id: &str) -> bool { matches!(result, Ok(Some(_))) } +async fn prune_terminal_instances(pool: &sqlx::PgPool) -> Result { + prune_terminal_instances_with_limits( + pool, + TERMINAL_INSTANCE_RETENTION_DAYS, + TERMINAL_INSTANCE_MAX_KEEP, + ) + .await +} + +pub(crate) async fn prune_terminal_instances_with_limits( + pool: &sqlx::PgPool, + retention_days: i32, + max_keep: i64, +) -> Result { + let mut tx = pool.begin().await?; + let stats = prune_terminal_instances_transaction(&mut tx, retention_days, max_keep).await?; + tx.commit().await?; + + Ok(stats) +} + +pub(crate) async fn prune_terminal_instances_transaction( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + retention_days: i32, + max_keep: i64, +) -> Result { + sqlx::query("SET CONSTRAINTS ALL DEFERRED") + .execute(&mut **tx) + .await?; + + let (instances_deleted, nodes_deleted): (i64, i64) = sqlx::query_as( + r#" + WITH terminal_instances AS ( + SELECT + ranked.id, + ranked.terminal_at, + pg_catalog.row_number() OVER ( + ORDER BY ranked.terminal_at DESC NULLS LAST, ranked.id DESC + ) AS terminal_rank + FROM ( + SELECT + id, + -- Rank/age only by server-controlled timestamps. `updated_at` + -- is intentionally excluded: PUBLIC has column-level UPDATE on + -- it, and for 'failed'/'cancelled' rows `completed_at` is NULL, + -- so including `updated_at` would let a low-privilege user forge + -- the prune ordering/age. `completed_at` (set by the worker on + -- completion) and `created_at` (insert-time default, not + -- grantable) are not user-writable. + COALESCE(completed_at, created_at) AS terminal_at + FROM df.instances + WHERE status OPERATOR(pg_catalog.=) ANY (ARRAY['completed', 'failed', 'cancelled']) + ) ranked + ), + prune_candidates AS ( + SELECT id + FROM terminal_instances + -- Prune when the row is beyond the newest $1 terminal instances (a + -- hard cap enforced regardless of age) OR older than the retention + -- window ($2 days). Retained rows are thus always within the newest + -- $1 AND younger than the retention window, so the retained terminal + -- count never exceeds $1. + WHERE terminal_rank OPERATOR(pg_catalog.>) $1 + OR terminal_at OPERATOR(pg_catalog.<) + (pg_catalog.now() OPERATOR(pg_catalog.-) pg_catalog.make_interval(days => $2::int)) + ), + deleted_nodes AS ( + DELETE FROM df.nodes n + USING prune_candidates pc + WHERE n.instance_id OPERATOR(pg_catalog.=) pc.id + RETURNING n.instance_id + ), + deleted_instances AS ( + DELETE FROM df.instances i + USING prune_candidates pc + WHERE i.id OPERATOR(pg_catalog.=) pc.id + RETURNING i.id + ) + SELECT + (SELECT pg_catalog.count(*)::bigint FROM deleted_instances) AS instances_deleted, + (SELECT pg_catalog.count(*)::bigint FROM deleted_nodes) AS nodes_deleted + "#, + ) + .bind(max_keep) + .bind(retention_days) + .fetch_one(&mut **tx) + .await?; + + Ok(PruneStats { + instances_deleted, + nodes_deleted, + }) +} + async fn run_until_extension_dropped_or_shutdown( poll_pool: &sqlx::PgPool, + maintenance_pool: &sqlx::PgPool, duroxide_runtime: Arc, drop_poll_interval: Duration, shutdown_check_interval: Duration, @@ -639,6 +752,12 @@ async fn run_until_extension_dropped_or_shutdown( let mut drop_check = tokio::time::interval(drop_poll_interval); drop_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + let mut prune_check = tokio::time::interval_at( + tokio::time::Instant::now() + TERMINAL_INSTANCE_PRUNE_INTERVAL, + TERMINAL_INSTANCE_PRUNE_INTERVAL, + ); + prune_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { tokio::select! { _ = tokio::time::sleep(shutdown_check_interval) => { @@ -658,6 +777,21 @@ async fn run_until_extension_dropped_or_shutdown( break; } } + _ = prune_check.tick() => { + match prune_terminal_instances(maintenance_pool).await { + Ok(stats) if stats.instances_deleted > 0 || stats.nodes_deleted > 0 => { + log!( + "pg_durable: pruned {} terminal instance(s) and {} node row(s)", + stats.instances_deleted, + stats.nodes_deleted + ); + } + Ok(_) => {} + Err(e) => { + log!("pg_durable: terminal instance pruning failed: {}", e); + } + } + } } }