Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pg_durable"
version = "0.2.2"
version = "0.2.3"
edition = "2021"
license = "MIT"
repository = "https://github.com/Azure/pg_durable"
Expand Down
10 changes: 8 additions & 2 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ SELECT df.start(
SELECT df.cancel('a1b2c3d4', 'Manual stop');

-- Find by label first, then cancel
SELECT instance_id FROM df.list_instances() WHERE label = 'every-minute-tick';
SELECT instance_id FROM df.list_instances(filter_label => 'every-minute-tick');
-- Then cancel with the found ID
SELECT df.cancel('found_id', 'Stopping cron job');
```
Expand Down Expand Up @@ -1389,6 +1389,12 @@ SELECT * FROM df.list_instances('Running');
SELECT * FROM df.list_instances('Completed');
SELECT * FROM df.list_instances('Failed');

-- Filter by label
SELECT * FROM df.list_instances(filter_label => 'my-job');

-- Filter by status and label
SELECT * FROM df.list_instances('Completed', filter_label => 'my-job');

-- With limit
SELECT * FROM df.list_instances(NULL, 10);
```
Expand Down Expand Up @@ -1625,7 +1631,7 @@ GRANT EXECUTE ON FUNCTION df.result(text) TO app_role;
GRANT EXECUTE ON FUNCTION df.cancel(text, text) TO app_role;
GRANT EXECUTE ON FUNCTION df.wait_for_completion(text, integer) TO app_role;
GRANT EXECUTE ON FUNCTION df.run(text) TO app_role;
GRANT EXECUTE ON FUNCTION df.list_instances(text, integer) TO app_role;
GRANT EXECUTE ON FUNCTION df.list_instances(text, integer, text) TO app_role;
GRANT EXECUTE ON FUNCTION df.instance_info(text) TO app_role;
GRANT EXECUTE ON FUNCTION df.instance_nodes(text, integer) TO app_role;
GRANT EXECUTE ON FUNCTION df.instance_executions(text, integer) TO app_role;
Expand Down
3 changes: 2 additions & 1 deletion sql/pg_durable--0.1.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3444,7 +3444,8 @@ SET LOCAL search_path TO @extschema@;
-- pg_durable::monitoring::list_instances
CREATE FUNCTION df."list_instances"(
"status_filter" TEXT DEFAULT NULL, /* core::option::Option<&str> */
"limit_count" INT DEFAULT 100 /* i32 */
"limit_count" INT DEFAULT 100, /* i32 */
"filter_label" TEXT DEFAULT NULL /* core::option::Option<&str> */
) RETURNS TABLE (
"instance_id" TEXT, /* alloc::string::String */
"label" TEXT, /* core::option::Option<alloc::string::String> */
Expand Down
22 changes: 22 additions & 0 deletions sql/pg_durable--0.2.2--0.2.3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- pg_durable upgrade: 0.2.2 → 0.2.3
--
-- Add filter_label parameter to df.list_instances().
-- See issue: Add a filter_label parameter to list_instances function

-- Drop the old signature so we can replace it with the new one.
DROP FUNCTION IF EXISTS df.list_instances(TEXT, INT);

CREATE FUNCTION df."list_instances"(
"status_filter" TEXT DEFAULT NULL,
"limit_count" INT DEFAULT 100,
"filter_label" TEXT DEFAULT NULL
) RETURNS TABLE (
"instance_id" TEXT,
"label" TEXT,
"function_name" TEXT,
"status" TEXT,
"execution_count" bigint,
"output" TEXT
)
LANGUAGE c
AS 'MODULE_PATHNAME', 'list_instances_wrapper';
22 changes: 15 additions & 7 deletions src/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ use crate::types::{new_backend_provider, postgres_connection_string};
// Monitoring Functions
// ============================================================================

/// List all durable function instances, optionally filtered by status.
/// List all durable function instances, optionally filtered by status and/or label.
#[pg_extern(schema = "df")]
pub fn list_instances(
status_filter: default!(Option<&str>, "NULL"),
limit_count: default!(i32, "100"),
filter_label: default!(Option<&str>, "NULL"),
) -> TableIterator<
'static,
(
Expand All @@ -42,16 +43,23 @@ pub fn list_instances(
let user_instances: Vec<(String, Option<String>, String)> = Spi::connect(|client| {
use pgrx::datum::DatumWithOid;

let (sql, args): (&str, Vec<DatumWithOid>) = if let Some(status) = status_filter {
(
let (sql, args): (&str, Vec<DatumWithOid>) = match (status_filter, filter_label) {
(Some(status), Some(label)) => (
"SELECT id, label, status FROM df.instances WHERE status = $1 AND label = $2 ORDER BY created_at DESC LIMIT $3",
vec![status.into(), label.into(), (limit_count as i64).into()],
),
(Some(status), None) => (
"SELECT id, label, status FROM df.instances WHERE status = $1 ORDER BY created_at DESC LIMIT $2",
vec![status.into(), (limit_count as i64).into()],
)
} else {
(
),
(None, Some(label)) => (
"SELECT id, label, status FROM df.instances WHERE label = $1 ORDER BY created_at DESC LIMIT $2",
vec![label.into(), (limit_count as i64).into()],
),
(None, None) => (
"SELECT id, label, status FROM df.instances ORDER BY created_at DESC LIMIT $1",
vec![(limit_count as i64).into()],
)
),
};
let mut instances = Vec::new();
if let Ok(table) = client.select(sql, None, &args) {
Expand Down
32 changes: 31 additions & 1 deletion tests/e2e/sql/05_monitoring_and_explain.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- Merged from: 09_monitoring, 10_explain, 31_explain_plain_sql
-- Tests: list_instances, instance_info, status, result, df.explain() on live and dry-run,
-- df.explain() on plain SQL auto-wrap
-- df.explain() on plain SQL auto-wrap, list_instances with filter_label
SET SESSION AUTHORIZATION df_e2e_user;

-- === Test: 09_monitoring ===
Expand Down Expand Up @@ -31,6 +31,36 @@ BEGIN
IF NOT found THEN
RAISE EXCEPTION 'TEST FAILED: instance not found in list_instances()';
END IF;

-- Test list_instances with filter_label (matching label)
SELECT EXISTS (
SELECT 1 FROM df.list_instances(filter_label => 'test-monitoring-label')
WHERE list_instances.instance_id = inst_id
) INTO found;

IF NOT found THEN
RAISE EXCEPTION 'TEST FAILED: instance not found in list_instances(filter_label => ''test-monitoring-label'')';
END IF;

-- Test list_instances with filter_label (non-matching label returns nothing)
SELECT EXISTS (
SELECT 1 FROM df.list_instances(filter_label => 'no-such-label')
WHERE list_instances.instance_id = inst_id
) INTO found;

IF found THEN
RAISE EXCEPTION 'TEST FAILED: instance should not appear in list_instances with non-matching filter_label';
END IF;

-- Test list_instances with both status_filter and filter_label
SELECT EXISTS (
SELECT 1 FROM df.list_instances('completed', filter_label => 'test-monitoring-label')
WHERE list_instances.instance_id = inst_id
) INTO found;

IF NOT found THEN
RAISE EXCEPTION 'TEST FAILED: instance not found in list_instances with status_filter and filter_label';
END IF;

-- Test instance_info
SELECT i.status INTO info_status FROM df.instance_info(inst_id) i;
Expand Down
Loading