From 21d88f584f68f4c174471f838c96f60aa188c8ab Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Thu, 30 Apr 2026 23:37:31 -0700 Subject: [PATCH 1/4] feat(rust_streams): add PipelineStats and instrument Batch/Filter Introduce pipeline_stats.rs mirroring Python PipelineStats: buffered per-step exec, error, and max timing, flushed every 10s via the metrics crate (streams.pipeline.*) with a step label. No Arroyo dependency for recording. Plumb step_name through RuntimeOperator Batch/Filter. BatchStep records step_exec per row and step_timing on flush; Filter records around the Python predicate. Remove duplicate Python metrics from PredicateFilter. Rust tests cover aggregation and throttling; use scripts/rust-envvars with cargo test per Makefile. Made-with: Cursor --- .../adapters/arroyo/rust_arroyo.py | 17 +- .../sentry_streams/rust_streams.pyi | 8 +- sentry_streams/src/batch_step.rs | 22 +- sentry_streams/src/filter_step.rs | 28 +- sentry_streams/src/lib.rs | 1 + sentry_streams/src/operators.rs | 24 +- sentry_streams/src/pipeline_stats.rs | 334 ++++++++++++++++++ sentry_streams/src/transformer.rs | 3 +- 8 files changed, 413 insertions(+), 24 deletions(-) create mode 100644 sentry_streams/src/pipeline_stats.rs diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index e8d674c2..c56a509a 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -466,19 +466,11 @@ def filter(self, step: Filter[Any], stream: Route) -> Route: elif isinstance(step, PredicateFilter): def filter_msg(msg: Message[Any]) -> bool: - input_metrics(step.name) - has_error = None - start_time = time.time() - try: - result = step.resolved_function(msg) - return result - except Exception as e: - has_error = str(e.__class__.__name__) - raise e - finally: - output_metrics(step.name, has_error, start_time) + return step.resolved_function(msg) - self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg)) + self.__consumers[stream.source].add_step( + RuntimeOperator.Filter(route, filter_msg, step.name) + ) return stream else: @@ -514,6 +506,7 @@ def reduce( route=route, max_batch_size=step.batch_size, max_batch_time_ms=max_batch_time_ms, + step_name=step.name, ) ) return stream diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 1954d9bf..03c97de2 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -89,7 +89,12 @@ class RuntimeOperator: @classmethod def Map(cls, route: Route, function: Callable[[Message[Any]], Any]) -> Self: ... @classmethod - def Filter(cls, route: Route, function: Callable[[Message[Any]], bool]) -> Self: ... + def Filter( + cls, + route: Route, + function: Callable[[Message[Any]], bool], + step_name: str, + ) -> Self: ... @classmethod def HeaderFilter(cls, route: Route, header_name: str, expected_value: int) -> Self: ... @classmethod @@ -128,6 +133,7 @@ class RuntimeOperator: route: Route, max_batch_size: int | None = None, max_batch_time_ms: float | None = None, + step_name: str, ) -> Self: ... @classmethod def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ... diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index d9089c0f..121a2f7d 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -6,6 +6,7 @@ //! //! The GIL is taken only to build the list on flush (after [`Message::into_payload`] in submit). use crate::messages::{into_pyany, PyAnyMessage, PyStreamingMessage, RoutedValuePayload}; +use crate::pipeline_stats::get_stats; use crate::routes::{Route, RoutedValue}; use crate::time_helpers::current_epoch; use crate::utils::traced_with_gil; @@ -18,7 +19,7 @@ use sentry_arroyo::processing::strategies::{ use sentry_arroyo::types::{Message, Partition}; use sentry_arroyo::utils::timing::Deadline; use std::collections::{BTreeMap, VecDeque}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option { match first { @@ -165,6 +166,7 @@ pub struct BatchStep { next_step: Box>, route: Route, + step_name: String, max_batch_size: Option, max_batch_time: Option, /// `None` until the first streaming message in a window. @@ -193,11 +195,13 @@ impl BatchStep { route: Route, max_batch_size: Option, max_batch_time: Option, + step_name: String, next_step: Box>, ) -> Self { Self { next_step, route, + step_name, max_batch_size, max_batch_time, batch: None, @@ -254,7 +258,12 @@ impl BatchStep { // We create a synthetic watermark to avoid waiting for the next batch to complete before // allowing the consumer to commit. let committable_for_synthetic = b.current_offsets_snapshot(); + let flush_start = Instant::now(); let batch_msg = b.flush()?; + get_stats().step_timing( + &self.step_name, + flush_start.elapsed().as_secs_f64(), + ); self.batch = None; let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); @@ -302,12 +311,14 @@ pub fn build_batch_step( route: &Route, max_batch_size: Option, max_batch_time: Option, + step_name: String, next: Box>, ) -> Box> { Box::new(BatchStep::new( route.clone(), max_batch_size, max_batch_time, + step_name, next, )) } @@ -369,6 +380,7 @@ impl ProcessingStrategy for BatchStep { .expect("open batch") .append(committable, pysm); } + get_stats().step_exec(&self.step_name); Ok(()) } } @@ -604,7 +616,13 @@ mod tests { let sub = Arc::new(Mutex::new(Vec::new())); let wms = Arc::new(Mutex::new(Vec::new())); let next = FakeStrategy::new(sub.clone(), wms.clone(), false); - let step = BatchStep::new(route, max_n, max_t, Box::new(next)); + let step = BatchStep::new( + route, + max_n, + max_t, + "test_batch".to_string(), + Box::new(next), + ); (step, sub, wms) } diff --git a/sentry_streams/src/filter_step.rs b/sentry_streams/src/filter_step.rs index 02a925d8..a0bf5598 100644 --- a/sentry_streams/src/filter_step.rs +++ b/sentry_streams/src/filter_step.rs @@ -1,5 +1,6 @@ use crate::callers::{try_apply_py, ApplyError}; use crate::messages::RoutedValuePayload; +use crate::pipeline_stats::get_stats; use crate::routes::{Route, RoutedValue}; use crate::utils::traced_with_gil; use pyo3::{Py, PyAny}; @@ -7,12 +8,13 @@ use sentry_arroyo::processing::strategies::{ CommitRequest, ProcessingStrategy, StrategyError, SubmitError, }; use sentry_arroyo::types::{InnerMessage, Message}; -use std::time::Duration; +use std::time::{Duration, Instant}; pub struct Filter { pub callable: Py, pub next_step: Box>, pub route: Route, + pub step_name: String, } impl Filter { @@ -26,11 +28,13 @@ impl Filter { callable: Py, next_step: Box>, route: Route, + step_name: String, ) -> Self { Self { callable, next_step, route, + step_name, } } } @@ -52,6 +56,9 @@ impl ProcessingStrategy for Filter { unreachable!("Watermark message trying to be passed to filter function.") }; + let stats = get_stats(); + stats.step_exec(&self.step_name); + let start = Instant::now(); let res = traced_with_gil!(|py| { try_apply_py( py, @@ -60,13 +67,24 @@ impl ProcessingStrategy for Filter { ) .and_then(|py_res| py_res.is_truthy(py).map_err(|_| ApplyError::ApplyFailed)) }); + let elapsed = start.elapsed().as_secs_f64(); match (res, &message.inner_message) { - (Ok(true), _) => self.next_step.submit(message), - (Ok(false), _) => Ok(()), + (Ok(true), _) => { + stats.step_timing(&self.step_name, elapsed); + self.next_step.submit(message) + } + (Ok(false), _) => { + stats.step_timing(&self.step_name, elapsed); + Ok(()) + } (Err(ApplyError::ApplyFailed), _) => panic!("Python filter function raised exception that is not sentry_streams.pipeline.exception.InvalidMessageError"), (Err(ApplyError::InvalidMessage), InnerMessage::AnyMessage(..)) => panic!("Got exception while processing AnyMessage, Arroyo cannot handle error on AnyMessage"), - (Err(ApplyError::InvalidMessage), InnerMessage::BrokerMessage(broker_message)) => Err(SubmitError::InvalidMessage(broker_message.into())), + (Err(ApplyError::InvalidMessage), InnerMessage::BrokerMessage(broker_message)) => { + stats.step_error(&self.step_name); + stats.step_timing(&self.step_name, elapsed); + Err(SubmitError::InvalidMessage(broker_message.into())) + } } } @@ -118,6 +136,7 @@ mod tests { build_filter( &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), callable, + "test_step".to_string(), Box::new(next_step), ) }) @@ -222,6 +241,7 @@ mod tests { let mut strategy = build_filter( &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), callable, + "test_step".to_string(), Box::new(next_step), ); diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index c5c6e56a..6615161a 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -13,6 +13,7 @@ mod kafka_config; mod messages; mod metrics; mod metrics_config; +mod pipeline_stats; mod mocks; mod operators; mod python_operator; diff --git a/sentry_streams/src/operators.rs b/sentry_streams/src/operators.rs index 78113d99..692f9d88 100644 --- a/sentry_streams/src/operators.rs +++ b/sentry_streams/src/operators.rs @@ -40,7 +40,11 @@ pub enum RuntimeOperator { /// This translates to a custom Arroyo strategy (Filter step) where a function /// is provided to transform the message payload into a bool. #[pyo3(name = "Filter")] - Filter { route: Route, function: Py }, + Filter { + route: Route, + function: Py, + step_name: String, + }, /// Filter by integer equality on a message header (Rust-only, no Python predicate). #[pyo3(name = "HeaderFilter")] @@ -102,6 +106,7 @@ pub enum RuntimeOperator { max_batch_size: Option, /// Wall-clock duration in milliseconds; `None` means no time limit (size-only batch). max_batch_time_ms: Option, + step_name: String, }, /// Delegates messages processing to a Python operator that provides /// the same kind of interface as an Arroyo strategy. This is meant @@ -126,11 +131,15 @@ pub fn build( let func_ref = traced_with_gil!(|py| function.clone_ref(py)); build_map(route, func_ref, next) } - RuntimeOperator::Filter { function, route } => { + RuntimeOperator::Filter { + function, + route, + step_name, + } => { // All functions (Python and Rust) are called the same way now // Rust functions automatically release the GIL internally let func_ref = traced_with_gil!(|py| function.clone_ref(py)); - build_filter(route, func_ref, next) + build_filter(route, func_ref, step_name.clone(), next) } RuntimeOperator::HeaderFilter { route, @@ -200,9 +209,16 @@ pub fn build( route, max_batch_size, max_batch_time_ms, + step_name, } => { let max_t = max_batch_time_ms.map(|ms| Duration::from_secs_f64((ms / 1000.0).max(0.0))); - build_batch_step(route, *max_batch_size, max_t, next) + build_batch_step( + route, + *max_batch_size, + max_t, + step_name.clone(), + next, + ) } RuntimeOperator::PythonAdapter { route, diff --git a/sentry_streams/src/pipeline_stats.rs b/sentry_streams/src/pipeline_stats.rs new file mode 100644 index 00000000..b7560cd4 --- /dev/null +++ b/sentry_streams/src/pipeline_stats.rs @@ -0,0 +1,334 @@ +//! Buffered pipeline step metrics (same semantics as `sentry_streams.metrics.stats`). +//! Records via the `metrics` crate only — no Arroyo types. + +use std::collections::HashMap; +use std::sync::{Mutex, OnceLock}; +use std::time::{Duration, Instant}; + +const METRIC_INPUT_MESSAGES: &str = "streams.pipeline.input.messages"; +const METRIC_ERRORS: &str = "streams.pipeline.errors"; +const METRIC_DURATION: &str = "streams.pipeline.duration"; + +struct Inner { + exec_buffer: HashMap, + error_buffer: HashMap, + timing_buffer: HashMap, + last_flush: Instant, +} + +impl Inner { + fn new() -> Self { + Self { + exec_buffer: HashMap::new(), + error_buffer: HashMap::new(), + timing_buffer: HashMap::new(), + last_flush: Instant::now(), + } + } +} + +/// Aggregates per-step exec / error / timing and flushes periodically to DogStatsD via `metrics`. +pub struct PipelineStats { + inner: Mutex, + flush_interval: Duration, +} + +impl PipelineStats { + fn with_flush_interval(flush_interval: Duration) -> Self { + Self { + inner: Mutex::new(Inner::new()), + flush_interval, + } + } + + /// Production flush interval (10 seconds), matching Python `FLUSH_TIME`. + pub fn new() -> Self { + Self::with_flush_interval(Duration::from_secs(10)) + } + + fn flush_emit_locked(inner: &mut Inner) { + for (step, value) in inner.exec_buffer.drain() { + let labels = vec![("step".to_string(), step)]; + metrics::counter!(METRIC_INPUT_MESSAGES, &labels).increment(value); + } + for (step, value) in inner.error_buffer.drain() { + let labels = vec![("step".to_string(), step)]; + metrics::counter!(METRIC_ERRORS, &labels).increment(value); + } + for (step, fvalue) in inner.timing_buffer.drain() { + let labels = vec![("step".to_string(), step)]; + metrics::histogram!(METRIC_DURATION, &labels).record(fvalue); + } + inner.last_flush = Instant::now(); + } + + fn maybe_flush(&self) { + let mut inner = self.inner.lock().unwrap(); + if inner.last_flush.elapsed() >= self.flush_interval { + Self::flush_emit_locked(&mut inner); + } + } + + pub fn step_exec(&self, step: &str) { + let mut inner = self.inner.lock().unwrap(); + *inner.exec_buffer.entry(step.to_string()).or_insert(0) += 1; + drop(inner); + self.maybe_flush(); + } + + pub fn step_error(&self, step: &str) { + let mut inner = self.inner.lock().unwrap(); + *inner.error_buffer.entry(step.to_string()).or_insert(0) += 1; + drop(inner); + self.maybe_flush(); + } + + pub fn step_timing(&self, step: &str, value_secs: f64) { + let mut inner = self.inner.lock().unwrap(); + let entry = inner.timing_buffer.entry(step.to_string()).or_insert(0.0); + if *entry < value_secs { + *entry = value_secs; + } + drop(inner); + self.maybe_flush(); + } + + #[cfg(test)] + fn with_flush_interval_for_test(flush_interval: Duration) -> Self { + Self::with_flush_interval(flush_interval) + } + + /// Emit and clear buffers ignoring the throttle (for unit tests). + #[cfg(test)] + pub(crate) fn flush_emit_for_test(&self) { + let mut inner = self.inner.lock().unwrap(); + Self::flush_emit_locked(&mut inner); + } + + #[cfg(test)] + pub(crate) fn exec_count(&self, step: &str) -> u64 { + self.inner + .lock() + .unwrap() + .exec_buffer + .get(step) + .copied() + .unwrap_or(0) + } + +} + +static GLOBAL_STATS: OnceLock = OnceLock::new(); + +pub fn get_stats() -> &'static PipelineStats { + GLOBAL_STATS.get_or_init(PipelineStats::new) +} + +#[cfg(test)] +mod tests { + use super::*; + use metrics::{Key, KeyName, Metadata, Recorder, SharedString, Unit}; + use std::sync::Arc; + + #[derive(Default)] + struct CaptureRecorder { + counters: Arc>>, + histograms: Arc>>, + } + + impl Recorder for CaptureRecorder { + fn describe_counter(&self, _key: KeyName, _unit: Option, _description: SharedString) {} + + fn describe_gauge(&self, _key: KeyName, _unit: Option, _description: SharedString) {} + + fn describe_histogram(&self, _key: KeyName, _unit: Option, _description: SharedString) {} + + fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> metrics::Counter { + metrics::Counter::from_arc(Arc::new(CaptureCounter { + key: key.clone(), + counters: Arc::clone(&self.counters), + })) + } + + fn register_gauge(&self, _key: &Key, _metadata: &Metadata<'_>) -> metrics::Gauge { + metrics::Gauge::noop() + } + + fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> metrics::Histogram { + metrics::Histogram::from_arc(Arc::new(CaptureHistogram { + key: key.clone(), + histograms: Arc::clone(&self.histograms), + })) + } + } + + struct CaptureCounter { + key: Key, + counters: Arc>>, + } + + impl metrics::CounterFn for CaptureCounter { + fn increment(&self, value: u64) { + self.counters.lock().unwrap().push((self.key.clone(), value)); + } + + fn absolute(&self, _value: u64) {} + } + + struct CaptureHistogram { + key: Key, + histograms: Arc>>, + } + + impl metrics::HistogramFn for CaptureHistogram { + fn record(&self, value: f64) { + self.histograms.lock().unwrap().push((self.key.clone(), value)); + } + } + + fn key_name(key: &Key) -> String { + key.name().to_string() + } + + fn labels_include_step(key: &Key, step: &str) -> bool { + key.labels().any(|l| l.key() == "step" && l.value() == step) + } + + #[test] + fn flush_emits_aggregated_counters_and_max_timing() { + let counters = Arc::new(Mutex::new(Vec::new())); + let histograms = Arc::new(Mutex::new(Vec::new())); + let rec = Arc::new(CaptureRecorder { + counters: Arc::clone(&counters), + histograms: Arc::clone(&histograms), + }); + let stats = PipelineStats::with_flush_interval_for_test(Duration::from_secs(3600)); + stats.step_exec("in_step"); + stats.step_exec("in_step"); + stats.step_error("err_step"); + stats.step_timing("timer_step", 0.1); + stats.step_timing("timer_step", 0.05); + let _guard = metrics::set_default_local_recorder(rec.as_ref()); + stats.flush_emit_for_test(); + + let c = counters.lock().unwrap(); + let input: u64 = c + .iter() + .filter(|(k, _)| key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "in_step")) + .map(|(_, v)| *v) + .sum(); + assert_eq!(input, 2); + + let errs: u64 = c + .iter() + .filter(|(k, _)| key_name(k) == METRIC_ERRORS && labels_include_step(k, "err_step")) + .map(|(_, v)| *v) + .sum(); + assert_eq!(errs, 1); + + let h = histograms.lock().unwrap(); + let dur: Vec = h + .iter() + .filter(|(k, _)| key_name(k) == METRIC_DURATION && labels_include_step(k, "timer_step")) + .map(|(_, v)| *v) + .collect(); + assert_eq!(dur, vec![0.1]); + } + + #[test] + fn no_auto_flush_before_interval() { + let stats = PipelineStats::with_flush_interval_for_test(Duration::from_secs(3600)); + stats.step_exec("a"); + assert_eq!(stats.exec_count("a"), 1); + } + + #[test] + fn flush_clears_buffers() { + let counters = Arc::new(Mutex::new(Vec::new())); + let histograms = Arc::new(Mutex::new(Vec::new())); + let rec = Arc::new(CaptureRecorder { + counters: Arc::clone(&counters), + histograms: Arc::clone(&histograms), + }); + let stats = PipelineStats::with_flush_interval_for_test(Duration::from_secs(3600)); + stats.step_exec("s"); + let _guard = metrics::set_default_local_recorder(rec.as_ref()); + stats.flush_emit_for_test(); + assert_eq!( + counters + .lock() + .unwrap() + .iter() + .filter(|(k, _)| labels_include_step(k, "s")) + .count(), + 1 + ); + counters.lock().unwrap().clear(); + stats.step_exec("s"); + stats.flush_emit_for_test(); + let c = counters.lock().unwrap(); + let sum: u64 = c + .iter() + .filter(|(k, _)| key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "s")) + .map(|(_, v)| *v) + .sum(); + assert_eq!(sum, 1); + } + + #[test] + fn multiple_steps_one_flush() { + let counters = Arc::new(Mutex::new(Vec::new())); + let histograms = Arc::new(Mutex::new(Vec::new())); + let rec = Arc::new(CaptureRecorder { + counters: Arc::clone(&counters), + histograms: Arc::clone(&histograms), + }); + let stats = PipelineStats::with_flush_interval_for_test(Duration::from_secs(3600)); + stats.step_exec("step_1"); + stats.step_exec("step_1"); + stats.step_exec("step_2"); + let _guard = metrics::set_default_local_recorder(rec.as_ref()); + stats.flush_emit_for_test(); + + let c = counters.lock().unwrap(); + let n1: u64 = c + .iter() + .filter(|(k, _)| key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "step_1")) + .map(|(_, v)| *v) + .sum(); + let n2: u64 = c + .iter() + .filter(|(k, _)| key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "step_2")) + .map(|(_, v)| *v) + .sum(); + assert_eq!(n1, 2); + assert_eq!(n2, 1); + } + + #[test] + fn throttle_flushes_after_interval() { + let counters = Arc::new(Mutex::new(Vec::new())); + let histograms = Arc::new(Mutex::new(Vec::new())); + let rec = Arc::new(CaptureRecorder { + counters: Arc::clone(&counters), + histograms: Arc::clone(&histograms), + }); + let stats = PipelineStats::with_flush_interval_for_test(Duration::from_millis(50)); + let _guard = metrics::set_default_local_recorder(rec.as_ref()); + stats.step_exec("throttle_step"); + assert!(counters.lock().unwrap().is_empty()); + + std::thread::sleep(Duration::from_millis(120)); + stats.step_exec("throttle_step"); + + let c = counters.lock().unwrap(); + let sum: u64 = c + .iter() + .filter(|(k, _)| { + key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "throttle_step") + }) + .map(|(_, v)| *v) + .sum(); + assert_eq!(sum, 2, "expected throttle flush to emit both execs"); + } +} diff --git a/sentry_streams/src/transformer.rs b/sentry_streams/src/transformer.rs index 4e500e17..931b5353 100644 --- a/sentry_streams/src/transformer.rs +++ b/sentry_streams/src/transformer.rs @@ -59,10 +59,11 @@ pub fn build_map( pub fn build_filter( route: &Route, callable: Py, + step_name: String, next: Box>, ) -> Box> { let copied_route = route.clone(); - Box::new(Filter::new(callable, next, copied_route)) + Box::new(Filter::new(callable, next, copied_route, step_name)) } #[cfg(test)] From 2ba7e735660a7fc6e9d4d16a27d9d3c3a4485d0e Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 1 May 2026 00:23:13 -0700 Subject: [PATCH 2/4] fix(ci): reorder Batch fields for stub syntax and rustfmt Put step_name before optional Batch parameters so rust_streams.pyi is valid Python (required args before defaults). Apply cargo fmt to satisfy the lint job. Made-with: Cursor --- .../adapters/arroyo/rust_arroyo.py | 2 +- .../sentry_streams/rust_streams.pyi | 2 +- sentry_streams/src/batch_step.rs | 5 +-- sentry_streams/src/lib.rs | 2 +- sentry_streams/src/operators.rs | 12 ++----- sentry_streams/src/pipeline_stats.rs | 34 ++++++++++++++----- 6 files changed, 33 insertions(+), 24 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index c56a509a..088066bf 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -504,9 +504,9 @@ def reduce( self.__consumers[stream.source].add_step( RuntimeOperator.Batch( route=route, + step_name=step.name, max_batch_size=step.batch_size, max_batch_time_ms=max_batch_time_ms, - step_name=step.name, ) ) return stream diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 03c97de2..618213ee 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -131,9 +131,9 @@ class RuntimeOperator: def Batch( cls, route: Route, + step_name: str, max_batch_size: int | None = None, max_batch_time_ms: float | None = None, - step_name: str, ) -> Self: ... @classmethod def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ... diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 121a2f7d..c2a4d3a8 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -260,10 +260,7 @@ impl BatchStep { let committable_for_synthetic = b.current_offsets_snapshot(); let flush_start = Instant::now(); let batch_msg = b.flush()?; - get_stats().step_timing( - &self.step_name, - flush_start.elapsed().as_secs_f64(), - ); + get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64()); self.batch = None; let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index 6615161a..b8819a2b 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -13,9 +13,9 @@ mod kafka_config; mod messages; mod metrics; mod metrics_config; -mod pipeline_stats; mod mocks; mod operators; +mod pipeline_stats; mod python_operator; mod routers; mod routes; diff --git a/sentry_streams/src/operators.rs b/sentry_streams/src/operators.rs index 692f9d88..1b591ac3 100644 --- a/sentry_streams/src/operators.rs +++ b/sentry_streams/src/operators.rs @@ -102,11 +102,11 @@ pub enum RuntimeOperator { #[pyo3(name = "Batch")] Batch { route: Route, + step_name: String, /// `None` means no size limit (time-only window). max_batch_size: Option, /// Wall-clock duration in milliseconds; `None` means no time limit (size-only batch). max_batch_time_ms: Option, - step_name: String, }, /// Delegates messages processing to a Python operator that provides /// the same kind of interface as an Arroyo strategy. This is meant @@ -207,18 +207,12 @@ pub fn build( } RuntimeOperator::Batch { route, + step_name, max_batch_size, max_batch_time_ms, - step_name, } => { let max_t = max_batch_time_ms.map(|ms| Duration::from_secs_f64((ms / 1000.0).max(0.0))); - build_batch_step( - route, - *max_batch_size, - max_t, - step_name.clone(), - next, - ) + build_batch_step(route, *max_batch_size, max_t, step_name.clone(), next) } RuntimeOperator::PythonAdapter { route, diff --git a/sentry_streams/src/pipeline_stats.rs b/sentry_streams/src/pipeline_stats.rs index b7560cd4..d9fec5ee 100644 --- a/sentry_streams/src/pipeline_stats.rs +++ b/sentry_streams/src/pipeline_stats.rs @@ -115,7 +115,6 @@ impl PipelineStats { .copied() .unwrap_or(0) } - } static GLOBAL_STATS: OnceLock = OnceLock::new(); @@ -137,11 +136,18 @@ mod tests { } impl Recorder for CaptureRecorder { - fn describe_counter(&self, _key: KeyName, _unit: Option, _description: SharedString) {} + fn describe_counter(&self, _key: KeyName, _unit: Option, _description: SharedString) { + } fn describe_gauge(&self, _key: KeyName, _unit: Option, _description: SharedString) {} - fn describe_histogram(&self, _key: KeyName, _unit: Option, _description: SharedString) {} + fn describe_histogram( + &self, + _key: KeyName, + _unit: Option, + _description: SharedString, + ) { + } fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> metrics::Counter { metrics::Counter::from_arc(Arc::new(CaptureCounter { @@ -169,7 +175,10 @@ mod tests { impl metrics::CounterFn for CaptureCounter { fn increment(&self, value: u64) { - self.counters.lock().unwrap().push((self.key.clone(), value)); + self.counters + .lock() + .unwrap() + .push((self.key.clone(), value)); } fn absolute(&self, _value: u64) {} @@ -182,7 +191,10 @@ mod tests { impl metrics::HistogramFn for CaptureHistogram { fn record(&self, value: f64) { - self.histograms.lock().unwrap().push((self.key.clone(), value)); + self.histograms + .lock() + .unwrap() + .push((self.key.clone(), value)); } } @@ -214,7 +226,9 @@ mod tests { let c = counters.lock().unwrap(); let input: u64 = c .iter() - .filter(|(k, _)| key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "in_step")) + .filter(|(k, _)| { + key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "in_step") + }) .map(|(_, v)| *v) .sum(); assert_eq!(input, 2); @@ -293,12 +307,16 @@ mod tests { let c = counters.lock().unwrap(); let n1: u64 = c .iter() - .filter(|(k, _)| key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "step_1")) + .filter(|(k, _)| { + key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "step_1") + }) .map(|(_, v)| *v) .sum(); let n2: u64 = c .iter() - .filter(|(k, _)| key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "step_2")) + .filter(|(k, _)| { + key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "step_2") + }) .map(|(_, v)| *v) .sum(); assert_eq!(n1, 2); From d8b1cfa85690dcabcbf0b1c6f219a507dcfc6b34 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 1 May 2026 11:37:11 -0700 Subject: [PATCH 3/4] Add step name to the headers filter --- .../adapters/arroyo/rust_arroyo.py | 7 ++--- .../sentry_streams/rust_streams.pyi | 8 +++++- sentry_streams/src/header_filter_step.rs | 26 ++++++++++++++++--- sentry_streams/src/operators.rs | 10 ++++++- 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 088066bf..0d0b8647 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -457,6 +457,7 @@ def filter(self, step: Filter[Any], stream: Route) -> Route: self.__consumers[stream.source].add_step( RuntimeOperator.HeaderFilter( route=route, + step_name=step.name, header_name=step.header_name, expected_value=step.value, ) @@ -464,12 +465,8 @@ def filter(self, step: Filter[Any], stream: Route) -> Route: return stream elif isinstance(step, PredicateFilter): - - def filter_msg(msg: Message[Any]) -> bool: - return step.resolved_function(msg) - self.__consumers[stream.source].add_step( - RuntimeOperator.Filter(route, filter_msg, step.name) + RuntimeOperator.Filter(route, lambda msg: step.resolved_function(msg), step.name) ) return stream diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 618213ee..84257e98 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -96,7 +96,13 @@ class RuntimeOperator: step_name: str, ) -> Self: ... @classmethod - def HeaderFilter(cls, route: Route, header_name: str, expected_value: int) -> Self: ... + def HeaderFilter( + cls, + route: Route, + step_name: str, + header_name: str, + expected_value: int, + ) -> Self: ... @classmethod def StreamSink( cls, route: Route, topic_name: str, kafka_config: PyKafkaProducerConfig diff --git a/sentry_streams/src/header_filter_step.rs b/sentry_streams/src/header_filter_step.rs index e3020784..ebffcde7 100644 --- a/sentry_streams/src/header_filter_step.rs +++ b/sentry_streams/src/header_filter_step.rs @@ -1,6 +1,7 @@ //! Filter messages by integer equality on a named header, without calling Python. use crate::messages::PyStreamingMessage; use crate::messages::RoutedValuePayload; +use crate::pipeline_stats::get_stats; use crate::routes::{Route, RoutedValue}; use crate::utils::traced_with_gil; use sentry_arroyo::processing::strategies::{ @@ -59,6 +60,7 @@ fn streaming_message_headers(msg: &PyStreamingMessage) -> Vec<(String, Vec)> pub struct HeaderIntEqualityFilter { header_name: String, expected: i64, + step_name: String, next_step: Box>, route: Route, } @@ -67,12 +69,14 @@ impl HeaderIntEqualityFilter { pub fn new( header_name: String, expected: i64, + step_name: String, next_step: Box>, route: Route, ) -> Self { Self { header_name, expected, + step_name, next_step, route, } @@ -85,22 +89,28 @@ impl ProcessingStrategy for HeaderIntEqualityFilter { } fn submit(&mut self, message: Message) -> Result<(), SubmitError> { - if self.route != message.payload().route { + if self.route != message.payload().route || message.payload().payload.is_watermark_msg() { return self.next_step.submit(message); } let RoutedValuePayload::PyStreamingMessage(ref py_streaming_msg) = message.payload().payload else { - // Watermarks and any other non-streaming payload: pass through (same as build_map in transformer.rs). return self.next_step.submit(message); }; + let stats = get_stats(); + stats.step_exec(&self.step_name); let headers = streaming_message_headers(py_streaming_msg); - match header_int_equality_decision(&headers, &self.header_name, self.expected) { + let decision = header_int_equality_decision(&headers, &self.header_name, self.expected); + + match decision { Ok(true) => self.next_step.submit(message), Ok(false) => Ok(()), - Err(()) => Err(invalid_message_submit_error(&message)), + Err(()) => { + stats.step_error(&self.step_name); + Err(invalid_message_submit_error(&message)) + } } } @@ -117,11 +127,13 @@ pub fn build_header_int_filter( route: &Route, header_name: String, expected: i64, + step_name: String, next: Box>, ) -> Box> { Box::new(HeaderIntEqualityFilter::new( header_name, expected, + step_name, next, route.clone(), )) @@ -217,6 +229,7 @@ mod tests { &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), "pid".to_string(), 42, + "test_step".to_string(), Box::new(next_step), ); @@ -272,6 +285,7 @@ mod tests { &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), "pid".to_string(), 42, + "test_step".to_string(), Box::new(next_step), ); @@ -303,6 +317,7 @@ mod tests { &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), "pid".to_string(), 42, + "test_step".to_string(), Box::new(FakeStrategy::new(Arc::default(), Arc::default(), false)), ); @@ -339,6 +354,7 @@ mod tests { &Route::new("s".to_string(), vec!["w".to_string()]), "k".to_string(), -1, + "test_step".to_string(), Box::new(next_step), ); @@ -368,6 +384,7 @@ mod tests { &Route::new("source".to_string(), vec![]), "h".to_string(), 1, + "test_step".to_string(), Box::new(FakeStrategy::new(Arc::default(), submitted_wm_clone, false)), ); @@ -391,6 +408,7 @@ mod tests { &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), "x".to_string(), 1, + "test_step".to_string(), Box::new(FakeStrategy::new(submitted, Arc::default(), false)), ); diff --git a/sentry_streams/src/operators.rs b/sentry_streams/src/operators.rs index 1b591ac3..a368be15 100644 --- a/sentry_streams/src/operators.rs +++ b/sentry_streams/src/operators.rs @@ -50,6 +50,7 @@ pub enum RuntimeOperator { #[pyo3(name = "HeaderFilter")] HeaderFilter { route: Route, + step_name: String, header_name: String, expected_value: i64, }, @@ -143,9 +144,16 @@ pub fn build( } RuntimeOperator::HeaderFilter { route, + step_name, header_name, expected_value, - } => build_header_int_filter(route, header_name.clone(), *expected_value, next), + } => build_header_int_filter( + route, + header_name.clone(), + *expected_value, + step_name.clone(), + next, + ), RuntimeOperator::StreamSink { route, topic_name, From 059c640bfa486202a9a2a25d12c359d223e98037 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 1 May 2026 12:04:45 -0700 Subject: [PATCH 4/4] Remove lock and move to threadlocal --- sentry_streams/src/pipeline_stats.rs | 246 ++++++++++++++++----------- 1 file changed, 144 insertions(+), 102 deletions(-) diff --git a/sentry_streams/src/pipeline_stats.rs b/sentry_streams/src/pipeline_stats.rs index d9fec5ee..719448c4 100644 --- a/sentry_streams/src/pipeline_stats.rs +++ b/sentry_streams/src/pipeline_stats.rs @@ -1,133 +1,161 @@ //! Buffered pipeline step metrics (same semantics as `sentry_streams.metrics.stats`). //! Records via the `metrics` crate only — no Arroyo types. +//! +//! State is **thread-local**: each OS thread has its own buffers. The consumer +//! `submit` path is expected to call into this from one thread per pipeline; do +//! not rely on cross-thread aggregation here. +use std::cell::RefCell; use std::collections::HashMap; -use std::sync::{Mutex, OnceLock}; use std::time::{Duration, Instant}; const METRIC_INPUT_MESSAGES: &str = "streams.pipeline.input.messages"; const METRIC_ERRORS: &str = "streams.pipeline.errors"; const METRIC_DURATION: &str = "streams.pipeline.duration"; -struct Inner { +thread_local! { + static PIPELINE_STATS: RefCell = RefCell::new(PipelineStatsState::new()); +} + +/// Per-thread pipeline stats buffers and flush timer. +struct PipelineStatsState { exec_buffer: HashMap, error_buffer: HashMap, timing_buffer: HashMap, last_flush: Instant, + flush_interval: Duration, } -impl Inner { +impl PipelineStatsState { fn new() -> Self { + Self::with_flush_interval(Duration::from_secs(10)) + } + + fn with_flush_interval(flush_interval: Duration) -> Self { Self { exec_buffer: HashMap::new(), error_buffer: HashMap::new(), timing_buffer: HashMap::new(), last_flush: Instant::now(), - } - } -} - -/// Aggregates per-step exec / error / timing and flushes periodically to DogStatsD via `metrics`. -pub struct PipelineStats { - inner: Mutex, - flush_interval: Duration, -} - -impl PipelineStats { - fn with_flush_interval(flush_interval: Duration) -> Self { - Self { - inner: Mutex::new(Inner::new()), flush_interval, } } - /// Production flush interval (10 seconds), matching Python `FLUSH_TIME`. - pub fn new() -> Self { - Self::with_flush_interval(Duration::from_secs(10)) - } - - fn flush_emit_locked(inner: &mut Inner) { - for (step, value) in inner.exec_buffer.drain() { + fn flush_emit(&mut self) { + for (step, value) in self.exec_buffer.drain() { let labels = vec![("step".to_string(), step)]; metrics::counter!(METRIC_INPUT_MESSAGES, &labels).increment(value); } - for (step, value) in inner.error_buffer.drain() { + for (step, value) in self.error_buffer.drain() { let labels = vec![("step".to_string(), step)]; metrics::counter!(METRIC_ERRORS, &labels).increment(value); } - for (step, fvalue) in inner.timing_buffer.drain() { + for (step, fvalue) in self.timing_buffer.drain() { let labels = vec![("step".to_string(), step)]; metrics::histogram!(METRIC_DURATION, &labels).record(fvalue); } - inner.last_flush = Instant::now(); + self.last_flush = Instant::now(); } - fn maybe_flush(&self) { - let mut inner = self.inner.lock().unwrap(); - if inner.last_flush.elapsed() >= self.flush_interval { - Self::flush_emit_locked(&mut inner); + fn maybe_flush(&mut self) { + if self.last_flush.elapsed() >= self.flush_interval { + self.flush_emit(); } } - pub fn step_exec(&self, step: &str) { - let mut inner = self.inner.lock().unwrap(); - *inner.exec_buffer.entry(step.to_string()).or_insert(0) += 1; - drop(inner); + fn step_exec(&mut self, step: &str) { + *self.exec_buffer.entry(step.to_string()).or_insert(0) += 1; self.maybe_flush(); } - pub fn step_error(&self, step: &str) { - let mut inner = self.inner.lock().unwrap(); - *inner.error_buffer.entry(step.to_string()).or_insert(0) += 1; - drop(inner); + fn step_error(&mut self, step: &str) { + *self.error_buffer.entry(step.to_string()).or_insert(0) += 1; self.maybe_flush(); } - pub fn step_timing(&self, step: &str, value_secs: f64) { - let mut inner = self.inner.lock().unwrap(); - let entry = inner.timing_buffer.entry(step.to_string()).or_insert(0.0); + fn step_timing(&mut self, step: &str, value_secs: f64) { + let entry = self.timing_buffer.entry(step.to_string()).or_insert(0.0); if *entry < value_secs { *entry = value_secs; } - drop(inner); self.maybe_flush(); } #[cfg(test)] - fn with_flush_interval_for_test(flush_interval: Duration) -> Self { - Self::with_flush_interval(flush_interval) + fn exec_count(&self, step: &str) -> u64 { + self.exec_buffer.get(step).copied().unwrap_or(0) + } + + #[cfg(test)] + fn flush_emit_for_test(&mut self) { + self.flush_emit(); + } +} + +/// Handle to this thread's [`PipelineStatsState`]; cheap to copy (zero-sized). +#[derive(Clone, Copy)] +pub struct PipelineStats; + +impl PipelineStats { + pub fn step_exec(self, step: &str) { + PIPELINE_STATS.with(|cell| { + let mut s = cell.borrow_mut(); + s.step_exec(step); + }); + } + + pub fn step_error(self, step: &str) { + PIPELINE_STATS.with(|cell| { + let mut s = cell.borrow_mut(); + s.step_error(step); + }); + } + + pub fn step_timing(self, step: &str, value_secs: f64) { + PIPELINE_STATS.with(|cell| { + let mut s = cell.borrow_mut(); + s.step_timing(step, value_secs); + }); } - /// Emit and clear buffers ignoring the throttle (for unit tests). #[cfg(test)] - pub(crate) fn flush_emit_for_test(&self) { - let mut inner = self.inner.lock().unwrap(); - Self::flush_emit_locked(&mut inner); + pub(crate) fn flush_emit_for_test(self) { + PIPELINE_STATS.with(|cell| { + let mut s = cell.borrow_mut(); + s.flush_emit_for_test(); + }); } #[cfg(test)] - pub(crate) fn exec_count(&self, step: &str) -> u64 { - self.inner - .lock() - .unwrap() - .exec_buffer - .get(step) - .copied() - .unwrap_or(0) + pub(crate) fn exec_count(self, step: &str) -> u64 { + PIPELINE_STATS.with(|cell| { + let s = cell.borrow(); + s.exec_count(step) + }) } } -static GLOBAL_STATS: OnceLock = OnceLock::new(); +pub fn get_stats() -> PipelineStats { + PipelineStats +} -pub fn get_stats() -> &'static PipelineStats { - GLOBAL_STATS.get_or_init(PipelineStats::new) +#[cfg(test)] +pub(crate) fn configure_pipeline_stats_for_test(flush_interval: Duration) { + PIPELINE_STATS.with(|cell| { + *cell.borrow_mut() = PipelineStatsState::with_flush_interval(flush_interval); + }); +} + +#[cfg(test)] +pub(crate) fn reset_pipeline_stats_for_test() { + configure_pipeline_stats_for_test(Duration::from_secs(10)); } #[cfg(test)] mod tests { use super::*; use metrics::{Key, KeyName, Metadata, Recorder, SharedString, Unit}; - use std::sync::Arc; + use std::sync::{Arc, Mutex}; #[derive(Default)] struct CaptureRecorder { @@ -206,24 +234,52 @@ mod tests { key.labels().any(|l| l.key() == "step" && l.value() == step) } + fn prepare_pipeline_stats_for_test(flush_interval: Duration) { + reset_pipeline_stats_for_test(); + configure_pipeline_stats_for_test(flush_interval); + } + + /// Reset pipeline stats, apply flush interval, and wire a [`CaptureRecorder`] for `metrics` locals. + struct CaptureTestContext { + counters: Arc>>, + histograms: Arc>>, + recorder: Arc, + } + + impl CaptureTestContext { + fn new(flush_interval: Duration) -> Self { + prepare_pipeline_stats_for_test(flush_interval); + let counters = Arc::new(Mutex::new(Vec::new())); + let histograms = Arc::new(Mutex::new(Vec::new())); + let recorder = Arc::new(CaptureRecorder { + counters: Arc::clone(&counters), + histograms: Arc::clone(&histograms), + }); + Self { + counters, + histograms, + recorder, + } + } + + fn install_metrics_recorder(&self) -> metrics::LocalRecorderGuard<'_> { + metrics::set_default_local_recorder(self.recorder.as_ref()) + } + } + #[test] fn flush_emits_aggregated_counters_and_max_timing() { - let counters = Arc::new(Mutex::new(Vec::new())); - let histograms = Arc::new(Mutex::new(Vec::new())); - let rec = Arc::new(CaptureRecorder { - counters: Arc::clone(&counters), - histograms: Arc::clone(&histograms), - }); - let stats = PipelineStats::with_flush_interval_for_test(Duration::from_secs(3600)); + let ctx = CaptureTestContext::new(Duration::from_secs(3600)); + let stats = get_stats(); stats.step_exec("in_step"); stats.step_exec("in_step"); stats.step_error("err_step"); stats.step_timing("timer_step", 0.1); stats.step_timing("timer_step", 0.05); - let _guard = metrics::set_default_local_recorder(rec.as_ref()); + let _guard = ctx.install_metrics_recorder(); stats.flush_emit_for_test(); - let c = counters.lock().unwrap(); + let c = ctx.counters.lock().unwrap(); let input: u64 = c .iter() .filter(|(k, _)| { @@ -240,7 +296,7 @@ mod tests { .sum(); assert_eq!(errs, 1); - let h = histograms.lock().unwrap(); + let h = ctx.histograms.lock().unwrap(); let dur: Vec = h .iter() .filter(|(k, _)| key_name(k) == METRIC_DURATION && labels_include_step(k, "timer_step")) @@ -251,25 +307,21 @@ mod tests { #[test] fn no_auto_flush_before_interval() { - let stats = PipelineStats::with_flush_interval_for_test(Duration::from_secs(3600)); + prepare_pipeline_stats_for_test(Duration::from_secs(3600)); + let stats = get_stats(); stats.step_exec("a"); assert_eq!(stats.exec_count("a"), 1); } #[test] fn flush_clears_buffers() { - let counters = Arc::new(Mutex::new(Vec::new())); - let histograms = Arc::new(Mutex::new(Vec::new())); - let rec = Arc::new(CaptureRecorder { - counters: Arc::clone(&counters), - histograms: Arc::clone(&histograms), - }); - let stats = PipelineStats::with_flush_interval_for_test(Duration::from_secs(3600)); + let ctx = CaptureTestContext::new(Duration::from_secs(3600)); + let stats = get_stats(); stats.step_exec("s"); - let _guard = metrics::set_default_local_recorder(rec.as_ref()); + let _guard = ctx.install_metrics_recorder(); stats.flush_emit_for_test(); assert_eq!( - counters + ctx.counters .lock() .unwrap() .iter() @@ -277,10 +329,10 @@ mod tests { .count(), 1 ); - counters.lock().unwrap().clear(); + ctx.counters.lock().unwrap().clear(); stats.step_exec("s"); stats.flush_emit_for_test(); - let c = counters.lock().unwrap(); + let c = ctx.counters.lock().unwrap(); let sum: u64 = c .iter() .filter(|(k, _)| key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "s")) @@ -291,20 +343,15 @@ mod tests { #[test] fn multiple_steps_one_flush() { - let counters = Arc::new(Mutex::new(Vec::new())); - let histograms = Arc::new(Mutex::new(Vec::new())); - let rec = Arc::new(CaptureRecorder { - counters: Arc::clone(&counters), - histograms: Arc::clone(&histograms), - }); - let stats = PipelineStats::with_flush_interval_for_test(Duration::from_secs(3600)); + let ctx = CaptureTestContext::new(Duration::from_secs(3600)); + let stats = get_stats(); stats.step_exec("step_1"); stats.step_exec("step_1"); stats.step_exec("step_2"); - let _guard = metrics::set_default_local_recorder(rec.as_ref()); + let _guard = ctx.install_metrics_recorder(); stats.flush_emit_for_test(); - let c = counters.lock().unwrap(); + let c = ctx.counters.lock().unwrap(); let n1: u64 = c .iter() .filter(|(k, _)| { @@ -325,21 +372,16 @@ mod tests { #[test] fn throttle_flushes_after_interval() { - let counters = Arc::new(Mutex::new(Vec::new())); - let histograms = Arc::new(Mutex::new(Vec::new())); - let rec = Arc::new(CaptureRecorder { - counters: Arc::clone(&counters), - histograms: Arc::clone(&histograms), - }); - let stats = PipelineStats::with_flush_interval_for_test(Duration::from_millis(50)); - let _guard = metrics::set_default_local_recorder(rec.as_ref()); + let ctx = CaptureTestContext::new(Duration::from_millis(50)); + let stats = get_stats(); + let _guard = ctx.install_metrics_recorder(); stats.step_exec("throttle_step"); - assert!(counters.lock().unwrap().is_empty()); + assert!(ctx.counters.lock().unwrap().is_empty()); std::thread::sleep(Duration::from_millis(120)); stats.step_exec("throttle_step"); - let c = counters.lock().unwrap(); + let c = ctx.counters.lock().unwrap(); let sum: u64 = c .iter() .filter(|(k, _)| {