Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 51 additions & 13 deletions src/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ pub fn metrics() -> TableIterator<
),
> {
let pg_conn_str = postgres_connection_string();
let provider_schema = backend_duroxide_schema();

let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand All @@ -315,21 +314,60 @@ pub fn metrics() -> TableIterator<
};

let results = rt.block_on(async {
let store = match new_backend_provider(&pg_conn_str, provider_schema).await {
Ok(s) => s,
let pool = match sqlx::postgres::PgPoolOptions::new()
.max_connections(1)
.connect(&pg_conn_str)
.await
{
Ok(pool) => pool,
Err(_) => return vec![],
};

let client = Client::new(store);

match client.get_system_metrics().await {
Ok(m) => vec![(
m.total_instances as i64,
m.running_instances as i64,
m.completed_instances as i64,
m.failed_instances as i64,
m.total_executions as i64,
m.total_events as i64,
let row: Result<(i64, i64, i64, i64, i64, i64), sqlx::Error> = sqlx::query_as(
r#"
SELECT
i.total_instances,
i.running_instances,
i.completed_instances,
i.failed_instances,
e.total_executions,
h.total_events
FROM (
SELECT
COUNT(*)::BIGINT AS total_instances,
COUNT(*) FILTER (WHERE status = 'running')::BIGINT AS running_instances,
COUNT(*) FILTER (WHERE status = 'completed')::BIGINT AS completed_instances,
COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_instances
FROM df.instances
) i
CROSS JOIN (
SELECT COUNT(*)::BIGINT AS total_executions
FROM duroxide.executions
) e
CROSS JOIN (
SELECT COUNT(*)::BIGINT AS total_events
FROM duroxide.history
) h
"#,
)
.fetch_one(&pool)
.await;

match row {
Ok((
total_instances,
running_instances,
completed_instances,
failed_instances,
total_executions,
total_events,
)) => vec![(
total_instances,
running_instances,
completed_instances,
failed_instances,
total_executions,
total_events,
)],
Err(_) => vec![],
}
Expand Down
74 changes: 74 additions & 0 deletions tests/e2e/sql/05_monitoring_and_explain.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,80 @@ END $$;

DROP TABLE _test_state;

-- Regression: a rolled-back df.start() should not inflate failed_instances in df.metrics().
-- Calls df.start() inside a block that is forced to raise, polls df.metrics() until
-- failed_instances stops changing, then checks each instance counter reported by
-- df.metrics() against a direct count over df.instances.
DO $$
DECLARE
    -- Counters as reported by df.metrics().
    total_metrics BIGINT;
    running_metrics BIGINT;
    completed_metrics BIGINT;
    failed_metrics BIGINT;
    -- Polling state: -1 can never equal a real count, so the first poll always
    -- resets the stability counter.
    previous_failed_metrics BIGINT := -1;
    stable_checks INT := 0;
    attempts INT := 0;
    -- Counters computed directly from df.instances.
    total_instances BIGINT;
    running_instances BIGINT;
    completed_instances BIGINT;
    failed_instances BIGINT;
BEGIN
    -- A block with an EXCEPTION clause is rolled back when an error is trapped, so
    -- database changes made by df.start() here are undone; the handler then
    -- swallows the forced error so the test can continue.
    BEGIN
        PERFORM df.start('SELECT 1', 'rollback-metrics-probe');
        RAISE EXCEPTION 'force rollback';
    EXCEPTION
        WHEN OTHERS THEN NULL;
    END;

    -- Worker waits up to 5s for an instance row after dequeue. Poll until
    -- failed_instances stabilizes for 3 checks after at least ~6s
    -- (12 attempts x 0.5s), giving up after 60 attempts (~30s).
    LOOP
        SELECT m.failed_instances INTO failed_metrics FROM df.metrics() m;

        -- NULL-safe comparison: if df.metrics() yields no row, failed_metrics is
        -- NULL; a plain "=" would never count that as stable and the loop would
        -- always run to the 60-attempt cap.
        IF failed_metrics IS NOT DISTINCT FROM previous_failed_metrics THEN
            stable_checks := stable_checks + 1;
        ELSE
            stable_checks := 0;
            previous_failed_metrics := failed_metrics;
        END IF;

        EXIT WHEN (attempts >= 12 AND stable_checks >= 3) OR attempts >= 60;
        PERFORM pg_sleep(0.5);
        attempts := attempts + 1;
    END LOOP;

    SELECT m.total_instances, m.running_instances, m.completed_instances, m.failed_instances
    INTO total_metrics, running_metrics, completed_metrics, failed_metrics
    FROM df.metrics() m;

    -- Without this guard an empty result leaves every *_metrics variable NULL and
    -- the comparisons below would have nothing meaningful to check.
    IF NOT FOUND THEN
        RAISE EXCEPTION 'TEST FAILED: df.metrics() returned no rows';
    END IF;

    SELECT
        COUNT(*)::BIGINT,
        COUNT(*) FILTER (WHERE lower(status) = 'running')::BIGINT,
        COUNT(*) FILTER (WHERE lower(status) = 'completed')::BIGINT,
        COUNT(*) FILTER (WHERE lower(status) = 'failed')::BIGINT
    INTO total_instances, running_instances, completed_instances, failed_instances
    FROM df.instances;

    -- IS DISTINCT FROM treats NULL as an ordinary value, so a NULL metric is
    -- reported as a mismatch instead of silently skipping the check (as "!=" would).
    IF total_metrics IS DISTINCT FROM total_instances THEN
        RAISE EXCEPTION 'TEST FAILED: metrics total_instances=% does not match df.instances=%',
            total_metrics, total_instances;
    END IF;

    IF running_metrics IS DISTINCT FROM running_instances THEN
        RAISE EXCEPTION 'TEST FAILED: metrics running_instances=% does not match df.instances=%',
            running_metrics, running_instances;
    END IF;

    IF completed_metrics IS DISTINCT FROM completed_instances THEN
        RAISE EXCEPTION 'TEST FAILED: metrics completed_instances=% does not match df.instances=%',
            completed_metrics, completed_instances;
    END IF;

    IF failed_metrics IS DISTINCT FROM failed_instances THEN
        RAISE EXCEPTION 'TEST FAILED: metrics failed_instances=% does not match df.instances=%',
            failed_metrics, failed_instances;
    END IF;

    RAISE NOTICE 'TEST PASSED: rollback metrics consistency';
END $$;

-- === Test: 10_explain ===

-- Test dry-run explain (use $body$ to avoid conflict with inner $$)
Expand Down
Loading