From 312e4180b495bb02d4e89cb412a098b360f61fec Mon Sep 17 00:00:00 2001 From: Nanook Date: Wed, 24 Jun 2026 04:38:24 +0000 Subject: [PATCH 1/4] fix: prune old terminal instances --- src/lib.rs | 176 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/worker.rs | 115 +++++++++++++++++++++++++++++++++ 2 files changed, 291 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 60fce55..924dbb2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -858,6 +858,44 @@ mod tests { }) } + fn sql_literal(s: &str) -> String { + format!("'{}'", s.replace('\'', "''")) + } + + fn test_database_connection_string() -> String { + use crate::types::{get_host, get_port, get_worker_role}; + + let database = Spi::get_one::("SELECT pg_catalog.current_database()") + .expect("current_database query should succeed") + .expect("current_database should return a value"); + format!( + "postgres://{}@{}:{}/{}", + get_worker_role(), + get_host(), + get_port(), + database + ) + } + + async fn delete_prune_test_rows(pool: &sqlx::PgPool, id_list: &str) { + let mut tx = pool.begin().await.expect("begin cleanup transaction"); + sqlx::query("SET CONSTRAINTS ALL DEFERRED") + .execute(&mut *tx) + .await + .expect("defer cleanup constraints"); + sqlx::query(&format!( + "DELETE FROM df.nodes WHERE instance_id IN ({id_list})" + )) + .execute(&mut *tx) + .await + .expect("clean prune test nodes"); + sqlx::query(&format!("DELETE FROM df.instances WHERE id IN ({id_list})")) + .execute(&mut *tx) + .await + .expect("clean prune test instances"); + tx.commit().await.expect("commit cleanup"); + } + // ======================================================================== // Unit Tests - DSL Node Creation // ======================================================================== @@ -1398,6 +1436,144 @@ mod tests { assert!(backend_duroxide_schema().contains("duroxide")); } + #[pg_test] + fn test_prune_terminal_instances_respects_age_and_keep_count() { + let conn = test_database_connection_string(); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("tokio runtime"); + + let stats = rt.block_on(async { + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(1) + .connect(&conn) + .await + .expect("connect to test database"); + + let test_ids = [ + "aa261001", "aa261002", "aa261003", "aa261004", "aa261005", "aa261006", + ]; + let id_list = test_ids + .iter() + .map(|id| sql_literal(id)) + .collect::>() + .join(", "); + + delete_prune_test_rows(&pool, &id_list).await; + + let mut fixture_tx = pool.begin().await.expect("begin fixture transaction"); + sqlx::query("SET CONSTRAINTS ALL DEFERRED") + .execute(&mut *fixture_tx) + .await + .expect("defer fixture constraints"); + sqlx::query( + r#" + WITH fixtures(id, label, root_node, status, age_days, has_completed_at) AS ( + VALUES + ('aa261001', 'prune-old-completed', 'bb261001', 'completed', 60, true), + ('aa261002', 'prune-old-failed', 'bb261002', 'failed', 50, false), + ('aa261003', 'keep-old-because-rank', 'bb261003', 'completed', 40, true), + ('aa261004', 'keep-recent-terminal', 'bb261004', 'completed', 5, true), + ('aa261005', 'keep-running', 'bb261005', 'running', 90, false), + ('aa261006', 'prune-old-cancelled', 'bb261006', 'cancelled', 80, false) + ) + INSERT INTO df.instances + (id, label, root_node, status, submitted_by, created_at, updated_at, completed_at) + SELECT id, + label, + root_node, + status, + current_user::regrole, + pg_catalog.now() - (age_days::int * INTERVAL '1 day'), + pg_catalog.now() - (age_days::int * INTERVAL '1 day'), + CASE WHEN has_completed_at + THEN pg_catalog.now() - (age_days::int * INTERVAL '1 day') + ELSE NULL + END + FROM fixtures; + "#, + ) + .execute(&mut *fixture_tx) + .await + .expect("insert prune instances"); + + sqlx::query( + r#" + WITH fixtures(id, instance_id, status, age_days) AS ( + VALUES + ('bb261001', 'aa261001', 'completed', 60), + ('bb261002', 'aa261002', 'failed', 50), + ('bb261003', 'aa261003', 'completed', 40), + ('bb261004', 'aa261004', 'completed', 5), + ('bb261005', 'aa261005', 'running', 90), + ('bb261006', 'aa261006', 'completed', 80) + ) + INSERT INTO df.nodes + (id, instance_id, node_type, query, status, submitted_by, created_at, updated_at) + SELECT id, + instance_id, + 'SQL', + 'SELECT 1', + status, + current_user::regrole, + pg_catalog.now() - (age_days::int * INTERVAL '1 day'), + pg_catalog.now() - (age_days::int * INTERVAL '1 day') + FROM fixtures; + "#, + ) + .execute(&mut *fixture_tx) + .await + .expect("insert prune nodes"); + + let stats = + crate::worker::prune_terminal_instances_transaction(&mut fixture_tx, 30, 2) + .await + .expect("prune terminal instances"); + + let remaining_instances: i64 = sqlx::query_scalar(&format!( + "SELECT pg_catalog.count(*)::bigint FROM df.instances WHERE id IN ({id_list})" + )) + .fetch_one(&mut *fixture_tx) + .await + .expect("count remaining instances"); + assert_eq!(remaining_instances, 3); + + let remaining_nodes: i64 = sqlx::query_scalar(&format!( + "SELECT pg_catalog.count(*)::bigint FROM df.nodes WHERE instance_id IN ({id_list})" + )) + .fetch_one(&mut *fixture_tx) + .await + .expect("count remaining nodes"); + assert_eq!(remaining_nodes, 3); + + let remaining_ids: Vec = sqlx::query_scalar(&format!( + "SELECT id FROM df.instances WHERE id IN ({id_list}) ORDER BY id" + )) + .fetch_all(&mut *fixture_tx) + .await + .expect("load remaining ids"); + assert_eq!(remaining_ids, vec!["aa261003", "aa261004", "aa261005"]); + + fixture_tx + .rollback() + .await + .expect("rollback prune test fixture"); + + pool.close().await; + stats + }); + + assert_eq!( + stats, + crate::worker::PruneStats { + instances_deleted: 3, + nodes_deleted: 3, + } + ); + } + // ======================================================================== // Unit Tests - Workflow Variables // ======================================================================== diff --git a/src/worker.rs b/src/worker.rs index 976a3fd..387922d 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -21,6 +21,16 @@ use crate::types::{ postgres_connection_string, resolve_duroxide_schema_pool, worker_provider_config, }; +const TERMINAL_INSTANCE_PRUNE_INTERVAL: Duration = Duration::from_secs(60 * 60); +const TERMINAL_INSTANCE_RETENTION_DAYS: i32 = 30; +const TERMINAL_INSTANCE_MIN_KEEP: i64 = 10_000; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct PruneStats { + pub instances_deleted: i64, + pub nodes_deleted: i64, +} + /// Initialize tracing subscriber for duroxide logs. /// Must be called before Runtime::start_with_store() to capture all logs. fn init_tracing() { @@ -228,6 +238,7 @@ async fn run_duroxide_runtime() { run_until_extension_dropped_or_shutdown( &poll_pool, + &mgmt_pool, duroxide_runtime, EXTENSION_DROP_POLL_INTERVAL, SHUTDOWN_CHECK_INTERVAL, @@ -627,8 +638,91 @@ async fn check_epoch_sentinel(pool: &sqlx::PgPool, epoch_id: &str) -> bool { matches!(result, Ok(Some(_))) } +async fn prune_terminal_instances(pool: &sqlx::PgPool) -> Result { + prune_terminal_instances_with_limits( + pool, + TERMINAL_INSTANCE_RETENTION_DAYS, + TERMINAL_INSTANCE_MIN_KEEP, + ) + .await +} + +pub(crate) async fn prune_terminal_instances_with_limits( + pool: &sqlx::PgPool, + retention_days: i32, + min_keep: i64, +) -> Result { + let mut tx = pool.begin().await?; + let stats = prune_terminal_instances_transaction(&mut tx, retention_days, min_keep).await?; + tx.commit().await?; + + Ok(stats) +} + +pub(crate) async fn prune_terminal_instances_transaction( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + retention_days: i32, + min_keep: i64, +) -> Result { + sqlx::query("SET CONSTRAINTS ALL DEFERRED") + .execute(&mut **tx) + .await?; + + let (instances_deleted, nodes_deleted): (i64, i64) = sqlx::query_as( + r#" + WITH terminal_instances AS ( + SELECT + ranked.id, + ranked.terminal_at, + pg_catalog.row_number() OVER ( + ORDER BY ranked.terminal_at DESC NULLS LAST, ranked.id DESC + ) AS terminal_rank + FROM ( + SELECT + id, + COALESCE(completed_at, updated_at, created_at) AS terminal_at + FROM df.instances + WHERE status OPERATOR(pg_catalog.=) ANY (ARRAY['completed', 'failed', 'cancelled']) + ) ranked + ), + prune_candidates AS ( + SELECT id + FROM terminal_instances + WHERE terminal_rank OPERATOR(pg_catalog.>) $1 + AND terminal_at OPERATOR(pg_catalog.<) + pg_catalog.now() OPERATOR(pg_catalog.-) pg_catalog.make_interval(days => $2::int) + ), + deleted_nodes AS ( + DELETE FROM df.nodes n + USING prune_candidates pc + WHERE n.instance_id OPERATOR(pg_catalog.=) pc.id + RETURNING n.instance_id + ), + deleted_instances AS ( + DELETE FROM df.instances i + USING prune_candidates pc + WHERE i.id OPERATOR(pg_catalog.=) pc.id + RETURNING i.id + ) + SELECT + (SELECT pg_catalog.count(*)::bigint FROM deleted_instances) AS instances_deleted, + (SELECT pg_catalog.count(*)::bigint FROM deleted_nodes) AS nodes_deleted + "#, + ) + .bind(min_keep) + .bind(retention_days) + .fetch_one(&mut **tx) + .await?; + + Ok(PruneStats { + instances_deleted, + nodes_deleted, + }) +} + async fn run_until_extension_dropped_or_shutdown( poll_pool: &sqlx::PgPool, + maintenance_pool: &sqlx::PgPool, duroxide_runtime: Arc, drop_poll_interval: Duration, shutdown_check_interval: Duration, @@ -639,6 +733,12 @@ async fn run_until_extension_dropped_or_shutdown( let mut drop_check = tokio::time::interval(drop_poll_interval); drop_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + let mut prune_check = tokio::time::interval_at( + tokio::time::Instant::now() + TERMINAL_INSTANCE_PRUNE_INTERVAL, + TERMINAL_INSTANCE_PRUNE_INTERVAL, + ); + prune_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { tokio::select! { _ = tokio::time::sleep(shutdown_check_interval) => { @@ -658,6 +758,21 @@ async fn run_until_extension_dropped_or_shutdown( break; } } + _ = prune_check.tick() => { + match prune_terminal_instances(maintenance_pool).await { + Ok(stats) if stats.instances_deleted > 0 || stats.nodes_deleted > 0 => { + log!( + "pg_durable: pruned {} terminal instance(s) and {} node row(s)", + stats.instances_deleted, + stats.nodes_deleted + ); + } + Ok(_) => {} + Err(e) => { + log!("pg_durable: terminal instance pruning failed: {}", e); + } + } + } } } From c455288871aba229d07a2d48c6aef3b5879c28a4 Mon Sep 17 00:00:00 2001 From: Nanook Date: Wed, 24 Jun 2026 19:12:23 +0000 Subject: [PATCH 2/4] test: cast current_database to text in prune test helper --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 924dbb2..9af09f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -865,7 +865,7 @@ mod tests { fn test_database_connection_string() -> String { use crate::types::{get_host, get_port, get_worker_role}; - let database = Spi::get_one::("SELECT pg_catalog.current_database()") + let database = Spi::get_one::("SELECT pg_catalog.current_database()::text") .expect("current_database query should succeed") .expect("current_database should return a value"); format!( From 717e021bb5479e0c3425ead12daee4799e262383 Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Thu, 25 Jun 2026 12:12:28 +0000 Subject: [PATCH 3/4] fix: correct prune query and harden ranking against forged updated_at MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix operator precedence bug in the prune query: explicit OPERATOR(pg_catalog.*) operators share one precedence and are left-associative, so 'terminal_at < now() - interval' parsed as '(terminal_at < now()) - interval' (boolean - interval). Parenthesize the now() - make_interval() expression. - Rank/age terminal rows by COALESCE(completed_at, created_at) instead of COALESCE(completed_at, updated_at, created_at). PUBLIC has column-level UPDATE on updated_at and failed/cancelled rows have NULL completed_at, so including updated_at let a low-privilege user forge the prune ordering/age. completed_at and created_at are not user-writable. - Connect the prune pgrx test as current_user instead of the worker-role GUC (defaults to 'postgres', which does not exist in the pgrx test cluster) — fixes the CI unit-test failure. - Extend the test with forged far-future updated_at on the failed and cancelled fixtures to lock in the hardening as a regression test. --- src/lib.rs | 37 +++++++++++++++++++++++++++---------- src/worker.rs | 11 +++++++++-- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9af09f9..a840417 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -863,14 +863,21 @@ mod tests { } fn test_database_connection_string() -> String { - use crate::types::{get_host, get_port, get_worker_role}; + use crate::types::{get_host, get_port}; let database = Spi::get_one::("SELECT pg_catalog.current_database()::text") .expect("current_database query should succeed") .expect("current_database should return a value"); + // Connect as the current session role rather than the worker role GUC + // (which defaults to "postgres"). The pgrx test cluster's superuser is + // the OS account, so "postgres" may not exist; the current role always + // does and can log in. + let role = Spi::get_one::("SELECT current_user::text") + .expect("current_user query should succeed") + .expect("current_user should return a value"); format!( "postgres://{}@{}:{}/{}", - get_worker_role(), + role, get_host(), get_port(), database @@ -1470,14 +1477,21 @@ mod tests { .expect("defer fixture constraints"); sqlx::query( r#" - WITH fixtures(id, label, root_node, status, age_days, has_completed_at) AS ( + -- `forge_future_updated_at` rows simulate a low-privilege user + -- who set `updated_at` far into the future on their own terminal + -- rows (PUBLIC has column-level UPDATE on `updated_at`). These + -- rows have NULL `completed_at` ('failed'/'cancelled'), so if the + -- prune ranked/aged by `updated_at` they would float to the top of + -- the keep-window and never age out. The pruner must ignore + -- `updated_at` and still prune them by their (old) `created_at`. + WITH fixtures(id, label, root_node, status, age_days, has_completed_at, forge_future_updated_at) AS ( VALUES - ('aa261001', 'prune-old-completed', 'bb261001', 'completed', 60, true), - ('aa261002', 'prune-old-failed', 'bb261002', 'failed', 50, false), - ('aa261003', 'keep-old-because-rank', 'bb261003', 'completed', 40, true), - ('aa261004', 'keep-recent-terminal', 'bb261004', 'completed', 5, true), - ('aa261005', 'keep-running', 'bb261005', 'running', 90, false), - ('aa261006', 'prune-old-cancelled', 'bb261006', 'cancelled', 80, false) + ('aa261001', 'prune-old-completed', 'bb261001', 'completed', 60, true, false), + ('aa261002', 'prune-old-failed', 'bb261002', 'failed', 50, false, true), + ('aa261003', 'keep-old-because-rank', 'bb261003', 'completed', 40, true, false), + ('aa261004', 'keep-recent-terminal', 'bb261004', 'completed', 5, true, false), + ('aa261005', 'keep-running', 'bb261005', 'running', 90, false, false), + ('aa261006', 'prune-old-cancelled', 'bb261006', 'cancelled', 80, false, true) ) INSERT INTO df.instances (id, label, root_node, status, submitted_by, created_at, updated_at, completed_at) @@ -1487,7 +1501,10 @@ mod tests { status, current_user::regrole, pg_catalog.now() - (age_days::int * INTERVAL '1 day'), - pg_catalog.now() - (age_days::int * INTERVAL '1 day'), + CASE WHEN forge_future_updated_at + THEN pg_catalog.now() + INTERVAL '3650 days' + ELSE pg_catalog.now() - (age_days::int * INTERVAL '1 day') + END, CASE WHEN has_completed_at THEN pg_catalog.now() - (age_days::int * INTERVAL '1 day') ELSE NULL diff --git a/src/worker.rs b/src/worker.rs index 387922d..2495af9 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -680,7 +680,14 @@ pub(crate) async fn prune_terminal_instances_transaction( FROM ( SELECT id, - COALESCE(completed_at, updated_at, created_at) AS terminal_at + -- Rank/age only by server-controlled timestamps. `updated_at` + -- is intentionally excluded: PUBLIC has column-level UPDATE on + -- it, and for 'failed'/'cancelled' rows `completed_at` is NULL, + -- so including `updated_at` would let a low-privilege user forge + -- the prune ordering/age. `completed_at` (set by the worker on + -- completion) and `created_at` (insert-time default, not + -- grantable) are not user-writable. + COALESCE(completed_at, created_at) AS terminal_at FROM df.instances WHERE status OPERATOR(pg_catalog.=) ANY (ARRAY['completed', 'failed', 'cancelled']) ) ranked @@ -690,7 +697,7 @@ pub(crate) async fn prune_terminal_instances_transaction( FROM terminal_instances WHERE terminal_rank OPERATOR(pg_catalog.>) $1 AND terminal_at OPERATOR(pg_catalog.<) - pg_catalog.now() OPERATOR(pg_catalog.-) pg_catalog.make_interval(days => $2::int) + (pg_catalog.now() OPERATOR(pg_catalog.-) pg_catalog.make_interval(days => $2::int)) ), deleted_nodes AS ( DELETE FROM df.nodes n From a8146d9a50ce22d2ce9bf6c627a7a10f275873ce Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Thu, 25 Jun 2026 12:27:27 +0000 Subject: [PATCH 4/4] feat: enforce hard 10,000-instance cap on terminal pruning Previously a terminal instance was pruned only when it was BOTH beyond the newest 10,000 AND older than 30 days, so the retained set could grow without bound when more than 10,000 terminal instances completed within the retention window. Change the keep predicate from AND to OR so an instance is pruned when it is beyond the newest 10,000 (hard cap, regardless of age) OR older than 30 days. The retained terminal count therefore never exceeds 10,000. - Rename TERMINAL_INSTANCE_MIN_KEEP -> TERMINAL_INSTANCE_MAX_KEEP and the function parameter min_keep -> max_keep to reflect the cap semantics. - Update the prune unit test to cover a young row pruned by the cap, old rows pruned by retention, non-terminal retention, and the forged-updated_at regression. - Document the retention/cap policy in USER_GUIDE.md (Monitoring section). --- USER_GUIDE.md | 33 +++++++++++++++++++++++++++++++++ src/lib.rs | 45 +++++++++++++++++++++++++-------------------- src/worker.rs | 26 +++++++++++++++++++------- 3 files changed, 77 insertions(+), 27 deletions(-) diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 7fca2ec..61e6d7c 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -1507,6 +1507,39 @@ SELECT started_at, last_seen_at, The background worker updates `last_seen_at` every ~5 seconds as part of its normal operation. +### Data Retention (Automatic Pruning) + +To keep `df.instances` and `df.nodes` from growing without bound, the background +worker runs a **best-effort pruning pass roughly once an hour** that deletes old +**terminal** instances (status `completed`, `failed`, or `cancelled`) and their +associated `df.nodes` rows. Running and pending instances are **never** pruned, +regardless of age. + +The policy is currently fixed (no configuration GUC): + +- **Hard cap — at most 10,000 terminal instances are retained, regardless of + age.** The newest 10,000 terminal instances are kept; any beyond that are + pruned even if they are only minutes old. +- **Retention window — 30 days.** Terminal instances older than 30 days are + pruned even if the table holds fewer than 10,000 of them. + +Equivalently, a terminal instance is retained only while it is **both** among the +newest 10,000 terminal instances **and** less than 30 days old; otherwise it is +eligible for pruning. + +Notes: + +- "Age" is measured from `completed_at` when set (instances that reached + `completed`), otherwise from `created_at`. `updated_at` is intentionally **not** + used, because it is user-writable and would let a low-privilege user influence + pruning. +- Pruning runs in a single transaction with foreign-key constraints deferred: + matching `df.nodes` rows are deleted first, then the instance rows. +- Pruning is best-effort: if a pass fails it is logged and retried on the next + interval; it never stops workflow execution. +- If you need to retain terminal history beyond these limits (e.g. for auditing), + copy the rows you care about into your own table before they age out. + --- ## User Isolation & Privileges diff --git a/src/lib.rs b/src/lib.rs index a840417..5f8a200 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1477,21 +1477,26 @@ mod tests { .expect("defer fixture constraints"); sqlx::query( r#" - -- `forge_future_updated_at` rows simulate a low-privilege user - -- who set `updated_at` far into the future on their own terminal - -- rows (PUBLIC has column-level UPDATE on `updated_at`). These - -- rows have NULL `completed_at` ('failed'/'cancelled'), so if the - -- prune ranked/aged by `updated_at` they would float to the top of - -- the keep-window and never age out. The pruner must ignore - -- `updated_at` and still prune them by their (old) `created_at`. + -- max_keep=2, retention=30d. Terminal rows are ranked newest-first + -- by COALESCE(completed_at, created_at): + -- aa261001 (1d, r1) -> keep (within cap, young) + -- aa261002 (2d, r2) -> keep (within cap, young) + -- aa261003 (3d, r3) -> PRUNE by the hard cap even though it is + -- only 3 days old (rank > max_keep) + -- aa261004 (40d, r4) -> PRUNE (beyond cap and past retention) + -- aa261005 (50d, r5) -> PRUNE (beyond cap and past retention) + -- aa261006 (running) -> keep (non-terminal, never considered) + -- aa261004/aa261005 are 'failed'/'cancelled' (NULL completed_at) with + -- a forged far-future `updated_at`; they must still rank/age by their + -- old `created_at`, proving `updated_at` is not trusted. WITH fixtures(id, label, root_node, status, age_days, has_completed_at, forge_future_updated_at) AS ( VALUES - ('aa261001', 'prune-old-completed', 'bb261001', 'completed', 60, true, false), - ('aa261002', 'prune-old-failed', 'bb261002', 'failed', 50, false, true), - ('aa261003', 'keep-old-because-rank', 'bb261003', 'completed', 40, true, false), - ('aa261004', 'keep-recent-terminal', 'bb261004', 'completed', 5, true, false), - ('aa261005', 'keep-running', 'bb261005', 'running', 90, false, false), - ('aa261006', 'prune-old-cancelled', 'bb261006', 'cancelled', 80, false, true) + ('aa261001', 'keep-recent-1', 'bb261001', 'completed', 1, true, false), + ('aa261002', 'keep-recent-2', 'bb261002', 'completed', 2, true, false), + ('aa261003', 'prune-young-over-cap', 'bb261003', 'completed', 3, true, false), + ('aa261004', 'prune-old-failed-forged', 'bb261004', 'failed', 40, false, true), + ('aa261005', 'prune-old-cancelled-forged', 'bb261005', 'cancelled', 50, false, true), + ('aa261006', 'keep-running', 'bb261006', 'running', 90, false, false) ) INSERT INTO df.instances (id, label, root_node, status, submitted_by, created_at, updated_at, completed_at) @@ -1520,12 +1525,12 @@ mod tests { r#" WITH fixtures(id, instance_id, status, age_days) AS ( VALUES - ('bb261001', 'aa261001', 'completed', 60), - ('bb261002', 'aa261002', 'failed', 50), - ('bb261003', 'aa261003', 'completed', 40), - ('bb261004', 'aa261004', 'completed', 5), - ('bb261005', 'aa261005', 'running', 90), - ('bb261006', 'aa261006', 'completed', 80) + ('bb261001', 'aa261001', 'completed', 1), + ('bb261002', 'aa261002', 'completed', 2), + ('bb261003', 'aa261003', 'completed', 3), + ('bb261004', 'aa261004', 'failed', 40), + ('bb261005', 'aa261005', 'completed', 50), + ('bb261006', 'aa261006', 'running', 90) ) INSERT INTO df.nodes (id, instance_id, node_type, query, status, submitted_by, created_at, updated_at) @@ -1571,7 +1576,7 @@ mod tests { .fetch_all(&mut *fixture_tx) .await .expect("load remaining ids"); - assert_eq!(remaining_ids, vec!["aa261003", "aa261004", "aa261005"]); + assert_eq!(remaining_ids, vec!["aa261001", "aa261002", "aa261006"]); fixture_tx .rollback() diff --git a/src/worker.rs b/src/worker.rs index 2495af9..2bd3bf7 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -22,8 +22,15 @@ use crate::types::{ }; const TERMINAL_INSTANCE_PRUNE_INTERVAL: Duration = Duration::from_secs(60 * 60); +// Retention policy for terminal ('completed'/'failed'/'cancelled') instances. +// A terminal instance is pruned when it is OUTSIDE the newest +// TERMINAL_INSTANCE_MAX_KEEP rows (a hard cap enforced regardless of age) OR +// older than TERMINAL_INSTANCE_RETENTION_DAYS. Equivalently, an instance is +// retained only while it is BOTH among the newest TERMINAL_INSTANCE_MAX_KEEP +// terminal rows AND younger than the retention window. The number of retained +// terminal instances therefore never exceeds TERMINAL_INSTANCE_MAX_KEEP. const TERMINAL_INSTANCE_RETENTION_DAYS: i32 = 30; -const TERMINAL_INSTANCE_MIN_KEEP: i64 = 10_000; +const TERMINAL_INSTANCE_MAX_KEEP: i64 = 10_000; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct PruneStats { @@ -642,7 +649,7 @@ async fn prune_terminal_instances(pool: &sqlx::PgPool) -> Result Result Result { let mut tx = pool.begin().await?; - let stats = prune_terminal_instances_transaction(&mut tx, retention_days, min_keep).await?; + let stats = prune_terminal_instances_transaction(&mut tx, retention_days, max_keep).await?; tx.commit().await?; Ok(stats) @@ -662,7 +669,7 @@ pub(crate) async fn prune_terminal_instances_with_limits( pub(crate) async fn prune_terminal_instances_transaction( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, retention_days: i32, - min_keep: i64, + max_keep: i64, ) -> Result { sqlx::query("SET CONSTRAINTS ALL DEFERRED") .execute(&mut **tx) @@ -695,8 +702,13 @@ pub(crate) async fn prune_terminal_instances_transaction( prune_candidates AS ( SELECT id FROM terminal_instances + -- Prune when the row is beyond the newest $1 terminal instances (a + -- hard cap enforced regardless of age) OR older than the retention + -- window ($2 days). Retained rows are thus always within the newest + -- $1 AND younger than the retention window, so the retained terminal + -- count never exceeds $1. WHERE terminal_rank OPERATOR(pg_catalog.>) $1 - AND terminal_at OPERATOR(pg_catalog.<) + OR terminal_at OPERATOR(pg_catalog.<) (pg_catalog.now() OPERATOR(pg_catalog.-) pg_catalog.make_interval(days => $2::int)) ), deleted_nodes AS ( @@ -716,7 +728,7 @@ pub(crate) async fn prune_terminal_instances_transaction( (SELECT pg_catalog.count(*)::bigint FROM deleted_nodes) AS nodes_deleted "#, ) - .bind(min_keep) + .bind(max_keep) .bind(retention_days) .fetch_one(&mut **tx) .await?;