diff --git a/Cargo.toml b/Cargo.toml index cb769e6c..317e864a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pg_durable" -version = "0.2.2" +version = "0.2.3" edition = "2021" license = "MIT" repository = "https://github.com/Azure/pg_durable" diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 45941e98..83bb9d42 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -1032,7 +1032,7 @@ SELECT df.start( SELECT df.cancel('a1b2c3d4', 'Manual stop'); -- Find by label first, then cancel -SELECT instance_id FROM df.list_instances() WHERE label = 'every-minute-tick'; +SELECT instance_id FROM df.list_instances(filter_label => 'every-minute-tick'); -- Then cancel with the found ID SELECT df.cancel('found_id', 'Stopping cron job'); ``` @@ -1389,6 +1389,12 @@ SELECT * FROM df.list_instances('Running'); SELECT * FROM df.list_instances('Completed'); SELECT * FROM df.list_instances('Failed'); +-- Filter by label +SELECT * FROM df.list_instances(filter_label => 'my-job'); + +-- Filter by status and label +SELECT * FROM df.list_instances('Completed', filter_label => 'my-job'); + -- With limit SELECT * FROM df.list_instances(NULL, 10); ``` @@ -1625,7 +1631,7 @@ GRANT EXECUTE ON FUNCTION df.result(text) TO app_role; GRANT EXECUTE ON FUNCTION df.cancel(text, text) TO app_role; GRANT EXECUTE ON FUNCTION df.wait_for_completion(text, integer) TO app_role; GRANT EXECUTE ON FUNCTION df.run(text) TO app_role; -GRANT EXECUTE ON FUNCTION df.list_instances(text, integer) TO app_role; +GRANT EXECUTE ON FUNCTION df.list_instances(text, integer, text) TO app_role; GRANT EXECUTE ON FUNCTION df.instance_info(text) TO app_role; GRANT EXECUTE ON FUNCTION df.instance_nodes(text, integer) TO app_role; GRANT EXECUTE ON FUNCTION df.instance_executions(text, integer) TO app_role; diff --git a/sql/pg_durable--0.1.1.sql b/sql/pg_durable--0.1.1.sql index 221c1cdb..7dbc426b 100644 --- a/sql/pg_durable--0.1.1.sql +++ b/sql/pg_durable--0.1.1.sql @@ -3444,7 +3444,8 @@ SET LOCAL search_path TO @extschema@; -- pg_durable::monitoring::list_instances CREATE FUNCTION df."list_instances"( "status_filter" TEXT DEFAULT NULL, /* core::option::Option<&str> */ - "limit_count" INT DEFAULT 100 /* i32 */ + "limit_count" INT DEFAULT 100, /* i32 */ + "filter_label" TEXT DEFAULT NULL /* core::option::Option<&str> */ ) RETURNS TABLE ( "instance_id" TEXT, /* alloc::string::String */ "label" TEXT, /* core::option::Option */ diff --git a/sql/pg_durable--0.2.2--0.2.3.sql b/sql/pg_durable--0.2.2--0.2.3.sql new file mode 100644 index 00000000..e2d6d2c0 --- /dev/null +++ b/sql/pg_durable--0.2.2--0.2.3.sql @@ -0,0 +1,22 @@ +-- pg_durable upgrade: 0.2.2 → 0.2.3 +-- +-- Add filter_label parameter to df.list_instances(). +-- See issue: Add a filter_label parameter to list_instances function + +-- Drop the old signature so we can replace it with the new one. +DROP FUNCTION IF EXISTS df.list_instances(TEXT, INT); + +CREATE FUNCTION df."list_instances"( + "status_filter" TEXT DEFAULT NULL, + "limit_count" INT DEFAULT 100, + "filter_label" TEXT DEFAULT NULL +) RETURNS TABLE ( + "instance_id" TEXT, + "label" TEXT, + "function_name" TEXT, + "status" TEXT, + "execution_count" bigint, + "output" TEXT +) +LANGUAGE c +AS 'MODULE_PATHNAME', 'list_instances_wrapper'; diff --git a/src/monitoring.rs b/src/monitoring.rs index fabd7599..fb8aa261 100644 --- a/src/monitoring.rs +++ b/src/monitoring.rs @@ -11,11 +11,12 @@ use crate::types::{new_backend_provider, postgres_connection_string}; // Monitoring Functions // ============================================================================ -/// List all durable function instances, optionally filtered by status. +/// List all durable function instances, optionally filtered by status and/or label. #[pg_extern(schema = "df")] pub fn list_instances( status_filter: default!(Option<&str>, "NULL"), limit_count: default!(i32, "100"), + filter_label: default!(Option<&str>, "NULL"), ) -> TableIterator< 'static, ( @@ -42,16 +43,23 @@ pub fn list_instances( let user_instances: Vec<(String, Option, String)> = Spi::connect(|client| { use pgrx::datum::DatumWithOid; - let (sql, args): (&str, Vec) = if let Some(status) = status_filter { - ( + let (sql, args): (&str, Vec) = match (status_filter, filter_label) { + (Some(status), Some(label)) => ( + "SELECT id, label, status FROM df.instances WHERE status = $1 AND label = $2 ORDER BY created_at DESC LIMIT $3", + vec![status.into(), label.into(), (limit_count as i64).into()], + ), + (Some(status), None) => ( "SELECT id, label, status FROM df.instances WHERE status = $1 ORDER BY created_at DESC LIMIT $2", vec![status.into(), (limit_count as i64).into()], - ) - } else { - ( + ), + (None, Some(label)) => ( + "SELECT id, label, status FROM df.instances WHERE label = $1 ORDER BY created_at DESC LIMIT $2", + vec![label.into(), (limit_count as i64).into()], + ), + (None, None) => ( "SELECT id, label, status FROM df.instances ORDER BY created_at DESC LIMIT $1", vec![(limit_count as i64).into()], - ) + ), }; let mut instances = Vec::new(); if let Ok(table) = client.select(sql, None, &args) { diff --git a/tests/e2e/sql/05_monitoring_and_explain.sql b/tests/e2e/sql/05_monitoring_and_explain.sql index 4996ae43..b21141bb 100644 --- a/tests/e2e/sql/05_monitoring_and_explain.sql +++ b/tests/e2e/sql/05_monitoring_and_explain.sql @@ -1,6 +1,6 @@ -- Merged from: 09_monitoring, 10_explain, 31_explain_plain_sql -- Tests: list_instances, instance_info, status, result, df.explain() on live and dry-run, --- df.explain() on plain SQL auto-wrap +-- df.explain() on plain SQL auto-wrap, list_instances with filter_label SET SESSION AUTHORIZATION df_e2e_user; -- === Test: 09_monitoring === @@ -31,6 +31,36 @@ BEGIN IF NOT found THEN RAISE EXCEPTION 'TEST FAILED: instance not found in list_instances()'; END IF; + + -- Test list_instances with filter_label (matching label) + SELECT EXISTS ( + SELECT 1 FROM df.list_instances(filter_label => 'test-monitoring-label') + WHERE list_instances.instance_id = inst_id + ) INTO found; + + IF NOT found THEN + RAISE EXCEPTION 'TEST FAILED: instance not found in list_instances(filter_label => ''test-monitoring-label'')'; + END IF; + + -- Test list_instances with filter_label (non-matching label returns nothing) + SELECT EXISTS ( + SELECT 1 FROM df.list_instances(filter_label => 'no-such-label') + WHERE list_instances.instance_id = inst_id + ) INTO found; + + IF found THEN + RAISE EXCEPTION 'TEST FAILED: instance should not appear in list_instances with non-matching filter_label'; + END IF; + + -- Test list_instances with both status_filter and filter_label + SELECT EXISTS ( + SELECT 1 FROM df.list_instances('completed', filter_label => 'test-monitoring-label') + WHERE list_instances.instance_id = inst_id + ) INTO found; + + IF NOT found THEN + RAISE EXCEPTION 'TEST FAILED: instance not found in list_instances with status_filter and filter_label'; + END IF; -- Test instance_info SELECT i.status INTO info_status FROM df.instance_info(inst_id) i;