From 82f74996efb3ca7316310c57fdc8afb9c6db77b8 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Wed, 29 Apr 2026 15:48:48 -0400 Subject: [PATCH 1/3] fix: Add a received->pushed latency metric --- src/push/mod.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/push/mod.rs b/src/push/mod.rs index 39a98be9..6e6714a8 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -1,3 +1,4 @@ +use chrono::Utc; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -187,7 +188,7 @@ impl PushPool { match push_task( worker, - activation, + activation.clone(), callback_url, timeout, grpc_shared_secret.as_slice(), @@ -198,6 +199,17 @@ impl PushPool { metrics::counter!("push.delivery", "result" => "ok").increment(1); debug!(task_id = %id, "Activation sent to worker"); + let act = activation.clone(); + let received_to_push_latency = act.received_latency(Utc::now()); + if received_to_push_latency > 0 { + metrics::histogram!( + "push.received_to_push.latency", + "namespace" => act.namespace, + "taskname" => act.taskname, + ) + .record(received_to_push_latency as f64); + } + if let Err(e) = store.mark_activation_processing(&id).await { metrics::counter!("push.mark_activation_processing", "result" => "error").increment(1); From a94777f41853c5796bab8f56217547bfc4b15815 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Wed, 29 Apr 2026 15:52:17 -0400 Subject: [PATCH 2/3] fix --- src/push/mod.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/push/mod.rs b/src/push/mod.rs index 6e6714a8..a710234e 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -199,15 +199,17 @@ impl PushPool { metrics::counter!("push.delivery", "result" => "ok").increment(1); debug!(task_id = %id, "Activation sent to worker"); - let act = activation.clone(); - let received_to_push_latency = act.received_latency(Utc::now()); - if received_to_push_latency > 0 { - metrics::histogram!( - "push.received_to_push.latency", - "namespace" => act.namespace, - "taskname" => act.taskname, - ) - .record(received_to_push_latency as f64); + if activation.processing_attempts < 1 { + let act = activation.clone(); + let received_to_push_latency = act.received_latency(Utc::now()); + if received_to_push_latency > 0 { + metrics::histogram!( + "push.received_to_push.latency", + "namespace" => act.namespace, + "taskname" => act.taskname, + ) + .record(received_to_push_latency as f64); + } } if let Err(e) = store.mark_activation_processing(&id).await { From 8172e16441b589e36c7b44fb564f8ea573bd25da Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Thu, 30 Apr 2026 13:04:34 -0400 Subject: [PATCH 3/3] fix" ; --- src/push/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/push/mod.rs b/src/push/mod.rs index a710234e..5696ce77 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -200,13 +200,12 @@ impl PushPool { debug!(task_id = %id, "Activation sent to worker"); if activation.processing_attempts < 1 { - let act = activation.clone(); - let received_to_push_latency = act.received_latency(Utc::now()); + let received_to_push_latency = activation.received_latency(Utc::now()); if received_to_push_latency > 0 { metrics::histogram!( "push.received_to_push.latency", - "namespace" => act.namespace, - "taskname" => act.taskname, + "namespace" => activation.namespace, + "taskname" => activation.taskname, ) .record(received_to_push_latency as f64); }