diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index e8d674c2..0d0b8647 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -457,6 +457,7 @@ def filter(self, step: Filter[Any], stream: Route) -> Route: self.__consumers[stream.source].add_step( RuntimeOperator.HeaderFilter( route=route, + step_name=step.name, header_name=step.header_name, expected_value=step.value, ) @@ -464,21 +465,9 @@ def filter(self, step: Filter[Any], stream: Route) -> Route: return stream elif isinstance(step, PredicateFilter): - - def filter_msg(msg: Message[Any]) -> bool: - input_metrics(step.name) - has_error = None - start_time = time.time() - try: - result = step.resolved_function(msg) - return result - except Exception as e: - has_error = str(e.__class__.__name__) - raise e - finally: - output_metrics(step.name, has_error, start_time) - - self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg)) + self.__consumers[stream.source].add_step( + RuntimeOperator.Filter(route, lambda msg: step.resolved_function(msg), step.name) + ) return stream else: @@ -512,6 +501,7 @@ def reduce( self.__consumers[stream.source].add_step( RuntimeOperator.Batch( route=route, + step_name=step.name, max_batch_size=step.batch_size, max_batch_time_ms=max_batch_time_ms, ) diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 1954d9bf..84257e98 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -89,9 +89,20 @@ class RuntimeOperator: @classmethod def Map(cls, route: Route, function: Callable[[Message[Any]], Any]) -> Self: ... @classmethod - def Filter(cls, route: Route, function: Callable[[Message[Any]], bool]) -> Self: ... + def Filter( + cls, + route: Route, + function: Callable[[Message[Any]], bool], + step_name: str, + ) -> Self: ... @classmethod - def HeaderFilter(cls, route: Route, header_name: str, expected_value: int) -> Self: ... + def HeaderFilter( + cls, + route: Route, + step_name: str, + header_name: str, + expected_value: int, + ) -> Self: ... @classmethod def StreamSink( cls, route: Route, topic_name: str, kafka_config: PyKafkaProducerConfig @@ -126,6 +137,7 @@ class RuntimeOperator: def Batch( cls, route: Route, + step_name: str, max_batch_size: int | None = None, max_batch_time_ms: float | None = None, ) -> Self: ... diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index d9089c0f..c2a4d3a8 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -6,6 +6,7 @@ //! //! The GIL is taken only to build the list on flush (after [`Message::into_payload`] in submit). use crate::messages::{into_pyany, PyAnyMessage, PyStreamingMessage, RoutedValuePayload}; +use crate::pipeline_stats::get_stats; use crate::routes::{Route, RoutedValue}; use crate::time_helpers::current_epoch; use crate::utils::traced_with_gil; @@ -18,7 +19,7 @@ use sentry_arroyo::processing::strategies::{ use sentry_arroyo::types::{Message, Partition}; use sentry_arroyo::utils::timing::Deadline; use std::collections::{BTreeMap, VecDeque}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option { match first { @@ -165,6 +166,7 @@ pub struct BatchStep { next_step: Box>, route: Route, + step_name: String, max_batch_size: Option, max_batch_time: Option, /// `None` until the first streaming message in a window. @@ -193,11 +195,13 @@ impl BatchStep { route: Route, max_batch_size: Option, max_batch_time: Option, + step_name: String, next_step: Box>, ) -> Self { Self { next_step, route, + step_name, max_batch_size, max_batch_time, batch: None, @@ -254,7 +258,9 @@ impl BatchStep { // We create a synthetic watermark to avoid waiting for the next batch to complete before // allowing the consumer to commit. let committable_for_synthetic = b.current_offsets_snapshot(); + let flush_start = Instant::now(); let batch_msg = b.flush()?; + get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64()); self.batch = None; let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); @@ -302,12 +308,14 @@ pub fn build_batch_step( route: &Route, max_batch_size: Option, max_batch_time: Option, + step_name: String, next: Box>, ) -> Box> { Box::new(BatchStep::new( route.clone(), max_batch_size, max_batch_time, + step_name, next, )) } @@ -369,6 +377,7 @@ impl ProcessingStrategy for BatchStep { .expect("open batch") .append(committable, pysm); } + get_stats().step_exec(&self.step_name); Ok(()) } } @@ -604,7 +613,13 @@ mod tests { let sub = Arc::new(Mutex::new(Vec::new())); let wms = Arc::new(Mutex::new(Vec::new())); let next = FakeStrategy::new(sub.clone(), wms.clone(), false); - let step = BatchStep::new(route, max_n, max_t, Box::new(next)); + let step = BatchStep::new( + route, + max_n, + max_t, + "test_batch".to_string(), + Box::new(next), + ); (step, sub, wms) } diff --git a/sentry_streams/src/filter_step.rs b/sentry_streams/src/filter_step.rs index 02a925d8..a0bf5598 100644 --- a/sentry_streams/src/filter_step.rs +++ b/sentry_streams/src/filter_step.rs @@ -1,5 +1,6 @@ use crate::callers::{try_apply_py, ApplyError}; use crate::messages::RoutedValuePayload; +use crate::pipeline_stats::get_stats; use crate::routes::{Route, RoutedValue}; use crate::utils::traced_with_gil; use pyo3::{Py, PyAny}; @@ -7,12 +8,13 @@ use sentry_arroyo::processing::strategies::{ CommitRequest, ProcessingStrategy, StrategyError, SubmitError, }; use sentry_arroyo::types::{InnerMessage, Message}; -use std::time::Duration; +use std::time::{Duration, Instant}; pub struct Filter { pub callable: Py, pub next_step: Box>, pub route: Route, + pub step_name: String, } impl Filter { @@ -26,11 +28,13 @@ impl Filter { callable: Py, next_step: Box>, route: Route, + step_name: String, ) -> Self { Self { callable, next_step, route, + step_name, } } } @@ -52,6 +56,9 @@ impl ProcessingStrategy for Filter { unreachable!("Watermark message trying to be passed to filter function.") }; + let stats = get_stats(); + stats.step_exec(&self.step_name); + let start = Instant::now(); let res = traced_with_gil!(|py| { try_apply_py( py, @@ -60,13 +67,24 @@ impl ProcessingStrategy for Filter { ) .and_then(|py_res| py_res.is_truthy(py).map_err(|_| ApplyError::ApplyFailed)) }); + let elapsed = start.elapsed().as_secs_f64(); match (res, &message.inner_message) { - (Ok(true), _) => self.next_step.submit(message), - (Ok(false), _) => Ok(()), + (Ok(true), _) => { + stats.step_timing(&self.step_name, elapsed); + self.next_step.submit(message) + } + (Ok(false), _) => { + stats.step_timing(&self.step_name, elapsed); + Ok(()) + } (Err(ApplyError::ApplyFailed), _) => panic!("Python filter function raised exception that is not sentry_streams.pipeline.exception.InvalidMessageError"), (Err(ApplyError::InvalidMessage), InnerMessage::AnyMessage(..)) => panic!("Got exception while processing AnyMessage, Arroyo cannot handle error on AnyMessage"), - (Err(ApplyError::InvalidMessage), InnerMessage::BrokerMessage(broker_message)) => Err(SubmitError::InvalidMessage(broker_message.into())), + (Err(ApplyError::InvalidMessage), InnerMessage::BrokerMessage(broker_message)) => { + stats.step_error(&self.step_name); + stats.step_timing(&self.step_name, elapsed); + Err(SubmitError::InvalidMessage(broker_message.into())) + } } } @@ -118,6 +136,7 @@ mod tests { build_filter( &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), callable, + "test_step".to_string(), Box::new(next_step), ) }) @@ -222,6 +241,7 @@ mod tests { let mut strategy = build_filter( &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), callable, + "test_step".to_string(), Box::new(next_step), ); diff --git a/sentry_streams/src/header_filter_step.rs b/sentry_streams/src/header_filter_step.rs index e3020784..ebffcde7 100644 --- a/sentry_streams/src/header_filter_step.rs +++ b/sentry_streams/src/header_filter_step.rs @@ -1,6 +1,7 @@ //! Filter messages by integer equality on a named header, without calling Python. use crate::messages::PyStreamingMessage; use crate::messages::RoutedValuePayload; +use crate::pipeline_stats::get_stats; use crate::routes::{Route, RoutedValue}; use crate::utils::traced_with_gil; use sentry_arroyo::processing::strategies::{ @@ -59,6 +60,7 @@ fn streaming_message_headers(msg: &PyStreamingMessage) -> Vec<(String, Vec)> pub struct HeaderIntEqualityFilter { header_name: String, expected: i64, + step_name: String, next_step: Box>, route: Route, } @@ -67,12 +69,14 @@ impl HeaderIntEqualityFilter { pub fn new( header_name: String, expected: i64, + step_name: String, next_step: Box>, route: Route, ) -> Self { Self { header_name, expected, + step_name, next_step, route, } @@ -85,22 +89,28 @@ impl ProcessingStrategy for HeaderIntEqualityFilter { } fn submit(&mut self, message: Message) -> Result<(), SubmitError> { - if self.route != message.payload().route { + if self.route != message.payload().route || message.payload().payload.is_watermark_msg() { return self.next_step.submit(message); } let RoutedValuePayload::PyStreamingMessage(ref py_streaming_msg) = message.payload().payload else { - // Watermarks and any other non-streaming payload: pass through (same as build_map in transformer.rs). return self.next_step.submit(message); }; + let stats = get_stats(); + stats.step_exec(&self.step_name); let headers = streaming_message_headers(py_streaming_msg); - match header_int_equality_decision(&headers, &self.header_name, self.expected) { + let decision = header_int_equality_decision(&headers, &self.header_name, self.expected); + + match decision { Ok(true) => self.next_step.submit(message), Ok(false) => Ok(()), - Err(()) => Err(invalid_message_submit_error(&message)), + Err(()) => { + stats.step_error(&self.step_name); + Err(invalid_message_submit_error(&message)) + } } } @@ -117,11 +127,13 @@ pub fn build_header_int_filter( route: &Route, header_name: String, expected: i64, + step_name: String, next: Box>, ) -> Box> { Box::new(HeaderIntEqualityFilter::new( header_name, expected, + step_name, next, route.clone(), )) @@ -217,6 +229,7 @@ mod tests { &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), "pid".to_string(), 42, + "test_step".to_string(), Box::new(next_step), ); @@ -272,6 +285,7 @@ mod tests { &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), "pid".to_string(), 42, + "test_step".to_string(), Box::new(next_step), ); @@ -303,6 +317,7 @@ mod tests { &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), "pid".to_string(), 42, + "test_step".to_string(), Box::new(FakeStrategy::new(Arc::default(), Arc::default(), false)), ); @@ -339,6 +354,7 @@ mod tests { &Route::new("s".to_string(), vec!["w".to_string()]), "k".to_string(), -1, + "test_step".to_string(), Box::new(next_step), ); @@ -368,6 +384,7 @@ mod tests { &Route::new("source".to_string(), vec![]), "h".to_string(), 1, + "test_step".to_string(), Box::new(FakeStrategy::new(Arc::default(), submitted_wm_clone, false)), ); @@ -391,6 +408,7 @@ mod tests { &Route::new("source1".to_string(), vec!["waypoint1".to_string()]), "x".to_string(), 1, + "test_step".to_string(), Box::new(FakeStrategy::new(submitted, Arc::default(), false)), ); diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index c5c6e56a..b8819a2b 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -15,6 +15,7 @@ mod metrics; mod metrics_config; mod mocks; mod operators; +mod pipeline_stats; mod python_operator; mod routers; mod routes; diff --git a/sentry_streams/src/operators.rs b/sentry_streams/src/operators.rs index 78113d99..a368be15 100644 --- a/sentry_streams/src/operators.rs +++ b/sentry_streams/src/operators.rs @@ -40,12 +40,17 @@ pub enum RuntimeOperator { /// This translates to a custom Arroyo strategy (Filter step) where a function /// is provided to transform the message payload into a bool. #[pyo3(name = "Filter")] - Filter { route: Route, function: Py }, + Filter { + route: Route, + function: Py, + step_name: String, + }, /// Filter by integer equality on a message header (Rust-only, no Python predicate). #[pyo3(name = "HeaderFilter")] HeaderFilter { route: Route, + step_name: String, header_name: String, expected_value: i64, }, @@ -98,6 +103,7 @@ pub enum RuntimeOperator { #[pyo3(name = "Batch")] Batch { route: Route, + step_name: String, /// `None` means no size limit (time-only window). max_batch_size: Option, /// Wall-clock duration in milliseconds; `None` means no time limit (size-only batch). @@ -126,17 +132,28 @@ pub fn build( let func_ref = traced_with_gil!(|py| function.clone_ref(py)); build_map(route, func_ref, next) } - RuntimeOperator::Filter { function, route } => { + RuntimeOperator::Filter { + function, + route, + step_name, + } => { // All functions (Python and Rust) are called the same way now // Rust functions automatically release the GIL internally let func_ref = traced_with_gil!(|py| function.clone_ref(py)); - build_filter(route, func_ref, next) + build_filter(route, func_ref, step_name.clone(), next) } RuntimeOperator::HeaderFilter { route, + step_name, header_name, expected_value, - } => build_header_int_filter(route, header_name.clone(), *expected_value, next), + } => build_header_int_filter( + route, + header_name.clone(), + *expected_value, + step_name.clone(), + next, + ), RuntimeOperator::StreamSink { route, topic_name, @@ -198,11 +215,12 @@ pub fn build( } RuntimeOperator::Batch { route, + step_name, max_batch_size, max_batch_time_ms, } => { let max_t = max_batch_time_ms.map(|ms| Duration::from_secs_f64((ms / 1000.0).max(0.0))); - build_batch_step(route, *max_batch_size, max_t, next) + build_batch_step(route, *max_batch_size, max_t, step_name.clone(), next) } RuntimeOperator::PythonAdapter { route, diff --git a/sentry_streams/src/pipeline_stats.rs b/sentry_streams/src/pipeline_stats.rs new file mode 100644 index 00000000..719448c4 --- /dev/null +++ b/sentry_streams/src/pipeline_stats.rs @@ -0,0 +1,394 @@ +//! Buffered pipeline step metrics (same semantics as `sentry_streams.metrics.stats`). +//! Records via the `metrics` crate only — no Arroyo types. +//! +//! State is **thread-local**: each OS thread has its own buffers. The consumer +//! `submit` path is expected to call into this from one thread per pipeline; do +//! not rely on cross-thread aggregation here. + +use std::cell::RefCell; +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +const METRIC_INPUT_MESSAGES: &str = "streams.pipeline.input.messages"; +const METRIC_ERRORS: &str = "streams.pipeline.errors"; +const METRIC_DURATION: &str = "streams.pipeline.duration"; + +thread_local! { + static PIPELINE_STATS: RefCell = RefCell::new(PipelineStatsState::new()); +} + +/// Per-thread pipeline stats buffers and flush timer. +struct PipelineStatsState { + exec_buffer: HashMap, + error_buffer: HashMap, + timing_buffer: HashMap, + last_flush: Instant, + flush_interval: Duration, +} + +impl PipelineStatsState { + fn new() -> Self { + Self::with_flush_interval(Duration::from_secs(10)) + } + + fn with_flush_interval(flush_interval: Duration) -> Self { + Self { + exec_buffer: HashMap::new(), + error_buffer: HashMap::new(), + timing_buffer: HashMap::new(), + last_flush: Instant::now(), + flush_interval, + } + } + + fn flush_emit(&mut self) { + for (step, value) in self.exec_buffer.drain() { + let labels = vec![("step".to_string(), step)]; + metrics::counter!(METRIC_INPUT_MESSAGES, &labels).increment(value); + } + for (step, value) in self.error_buffer.drain() { + let labels = vec![("step".to_string(), step)]; + metrics::counter!(METRIC_ERRORS, &labels).increment(value); + } + for (step, fvalue) in self.timing_buffer.drain() { + let labels = vec![("step".to_string(), step)]; + metrics::histogram!(METRIC_DURATION, &labels).record(fvalue); + } + self.last_flush = Instant::now(); + } + + fn maybe_flush(&mut self) { + if self.last_flush.elapsed() >= self.flush_interval { + self.flush_emit(); + } + } + + fn step_exec(&mut self, step: &str) { + *self.exec_buffer.entry(step.to_string()).or_insert(0) += 1; + self.maybe_flush(); + } + + fn step_error(&mut self, step: &str) { + *self.error_buffer.entry(step.to_string()).or_insert(0) += 1; + self.maybe_flush(); + } + + fn step_timing(&mut self, step: &str, value_secs: f64) { + let entry = self.timing_buffer.entry(step.to_string()).or_insert(0.0); + if *entry < value_secs { + *entry = value_secs; + } + self.maybe_flush(); + } + + #[cfg(test)] + fn exec_count(&self, step: &str) -> u64 { + self.exec_buffer.get(step).copied().unwrap_or(0) + } + + #[cfg(test)] + fn flush_emit_for_test(&mut self) { + self.flush_emit(); + } +} + +/// Handle to this thread's [`PipelineStatsState`]; cheap to copy (zero-sized). +#[derive(Clone, Copy)] +pub struct PipelineStats; + +impl PipelineStats { + pub fn step_exec(self, step: &str) { + PIPELINE_STATS.with(|cell| { + let mut s = cell.borrow_mut(); + s.step_exec(step); + }); + } + + pub fn step_error(self, step: &str) { + PIPELINE_STATS.with(|cell| { + let mut s = cell.borrow_mut(); + s.step_error(step); + }); + } + + pub fn step_timing(self, step: &str, value_secs: f64) { + PIPELINE_STATS.with(|cell| { + let mut s = cell.borrow_mut(); + s.step_timing(step, value_secs); + }); + } + + #[cfg(test)] + pub(crate) fn flush_emit_for_test(self) { + PIPELINE_STATS.with(|cell| { + let mut s = cell.borrow_mut(); + s.flush_emit_for_test(); + }); + } + + #[cfg(test)] + pub(crate) fn exec_count(self, step: &str) -> u64 { + PIPELINE_STATS.with(|cell| { + let s = cell.borrow(); + s.exec_count(step) + }) + } +} + +pub fn get_stats() -> PipelineStats { + PipelineStats +} + +#[cfg(test)] +pub(crate) fn configure_pipeline_stats_for_test(flush_interval: Duration) { + PIPELINE_STATS.with(|cell| { + *cell.borrow_mut() = PipelineStatsState::with_flush_interval(flush_interval); + }); +} + +#[cfg(test)] +pub(crate) fn reset_pipeline_stats_for_test() { + configure_pipeline_stats_for_test(Duration::from_secs(10)); +} + +#[cfg(test)] +mod tests { + use super::*; + use metrics::{Key, KeyName, Metadata, Recorder, SharedString, Unit}; + use std::sync::{Arc, Mutex}; + + #[derive(Default)] + struct CaptureRecorder { + counters: Arc>>, + histograms: Arc>>, + } + + impl Recorder for CaptureRecorder { + fn describe_counter(&self, _key: KeyName, _unit: Option, _description: SharedString) { + } + + fn describe_gauge(&self, _key: KeyName, _unit: Option, _description: SharedString) {} + + fn describe_histogram( + &self, + _key: KeyName, + _unit: Option, + _description: SharedString, + ) { + } + + fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> metrics::Counter { + metrics::Counter::from_arc(Arc::new(CaptureCounter { + key: key.clone(), + counters: Arc::clone(&self.counters), + })) + } + + fn register_gauge(&self, _key: &Key, _metadata: &Metadata<'_>) -> metrics::Gauge { + metrics::Gauge::noop() + } + + fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> metrics::Histogram { + metrics::Histogram::from_arc(Arc::new(CaptureHistogram { + key: key.clone(), + histograms: Arc::clone(&self.histograms), + })) + } + } + + struct CaptureCounter { + key: Key, + counters: Arc>>, + } + + impl metrics::CounterFn for CaptureCounter { + fn increment(&self, value: u64) { + self.counters + .lock() + .unwrap() + .push((self.key.clone(), value)); + } + + fn absolute(&self, _value: u64) {} + } + + struct CaptureHistogram { + key: Key, + histograms: Arc>>, + } + + impl metrics::HistogramFn for CaptureHistogram { + fn record(&self, value: f64) { + self.histograms + .lock() + .unwrap() + .push((self.key.clone(), value)); + } + } + + fn key_name(key: &Key) -> String { + key.name().to_string() + } + + fn labels_include_step(key: &Key, step: &str) -> bool { + key.labels().any(|l| l.key() == "step" && l.value() == step) + } + + fn prepare_pipeline_stats_for_test(flush_interval: Duration) { + reset_pipeline_stats_for_test(); + configure_pipeline_stats_for_test(flush_interval); + } + + /// Reset pipeline stats, apply flush interval, and wire a [`CaptureRecorder`] for `metrics` locals. + struct CaptureTestContext { + counters: Arc>>, + histograms: Arc>>, + recorder: Arc, + } + + impl CaptureTestContext { + fn new(flush_interval: Duration) -> Self { + prepare_pipeline_stats_for_test(flush_interval); + let counters = Arc::new(Mutex::new(Vec::new())); + let histograms = Arc::new(Mutex::new(Vec::new())); + let recorder = Arc::new(CaptureRecorder { + counters: Arc::clone(&counters), + histograms: Arc::clone(&histograms), + }); + Self { + counters, + histograms, + recorder, + } + } + + fn install_metrics_recorder(&self) -> metrics::LocalRecorderGuard<'_> { + metrics::set_default_local_recorder(self.recorder.as_ref()) + } + } + + #[test] + fn flush_emits_aggregated_counters_and_max_timing() { + let ctx = CaptureTestContext::new(Duration::from_secs(3600)); + let stats = get_stats(); + stats.step_exec("in_step"); + stats.step_exec("in_step"); + stats.step_error("err_step"); + stats.step_timing("timer_step", 0.1); + stats.step_timing("timer_step", 0.05); + let _guard = ctx.install_metrics_recorder(); + stats.flush_emit_for_test(); + + let c = ctx.counters.lock().unwrap(); + let input: u64 = c + .iter() + .filter(|(k, _)| { + key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "in_step") + }) + .map(|(_, v)| *v) + .sum(); + assert_eq!(input, 2); + + let errs: u64 = c + .iter() + .filter(|(k, _)| key_name(k) == METRIC_ERRORS && labels_include_step(k, "err_step")) + .map(|(_, v)| *v) + .sum(); + assert_eq!(errs, 1); + + let h = ctx.histograms.lock().unwrap(); + let dur: Vec = h + .iter() + .filter(|(k, _)| key_name(k) == METRIC_DURATION && labels_include_step(k, "timer_step")) + .map(|(_, v)| *v) + .collect(); + assert_eq!(dur, vec![0.1]); + } + + #[test] + fn no_auto_flush_before_interval() { + prepare_pipeline_stats_for_test(Duration::from_secs(3600)); + let stats = get_stats(); + stats.step_exec("a"); + assert_eq!(stats.exec_count("a"), 1); + } + + #[test] + fn flush_clears_buffers() { + let ctx = CaptureTestContext::new(Duration::from_secs(3600)); + let stats = get_stats(); + stats.step_exec("s"); + let _guard = ctx.install_metrics_recorder(); + stats.flush_emit_for_test(); + assert_eq!( + ctx.counters + .lock() + .unwrap() + .iter() + .filter(|(k, _)| labels_include_step(k, "s")) + .count(), + 1 + ); + ctx.counters.lock().unwrap().clear(); + stats.step_exec("s"); + stats.flush_emit_for_test(); + let c = ctx.counters.lock().unwrap(); + let sum: u64 = c + .iter() + .filter(|(k, _)| key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "s")) + .map(|(_, v)| *v) + .sum(); + assert_eq!(sum, 1); + } + + #[test] + fn multiple_steps_one_flush() { + let ctx = CaptureTestContext::new(Duration::from_secs(3600)); + let stats = get_stats(); + stats.step_exec("step_1"); + stats.step_exec("step_1"); + stats.step_exec("step_2"); + let _guard = ctx.install_metrics_recorder(); + stats.flush_emit_for_test(); + + let c = ctx.counters.lock().unwrap(); + let n1: u64 = c + .iter() + .filter(|(k, _)| { + key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "step_1") + }) + .map(|(_, v)| *v) + .sum(); + let n2: u64 = c + .iter() + .filter(|(k, _)| { + key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "step_2") + }) + .map(|(_, v)| *v) + .sum(); + assert_eq!(n1, 2); + assert_eq!(n2, 1); + } + + #[test] + fn throttle_flushes_after_interval() { + let ctx = CaptureTestContext::new(Duration::from_millis(50)); + let stats = get_stats(); + let _guard = ctx.install_metrics_recorder(); + stats.step_exec("throttle_step"); + assert!(ctx.counters.lock().unwrap().is_empty()); + + std::thread::sleep(Duration::from_millis(120)); + stats.step_exec("throttle_step"); + + let c = ctx.counters.lock().unwrap(); + let sum: u64 = c + .iter() + .filter(|(k, _)| { + key_name(k) == METRIC_INPUT_MESSAGES && labels_include_step(k, "throttle_step") + }) + .map(|(_, v)| *v) + .sum(); + assert_eq!(sum, 2, "expected throttle flush to emit both execs"); + } +} diff --git a/sentry_streams/src/transformer.rs b/sentry_streams/src/transformer.rs index 4e500e17..931b5353 100644 --- a/sentry_streams/src/transformer.rs +++ b/sentry_streams/src/transformer.rs @@ -59,10 +59,11 @@ pub fn build_map( pub fn build_filter( route: &Route, callable: Py, + step_name: String, next: Box>, ) -> Box> { let copied_route = route.clone(); - Box::new(Filter::new(callable, next, copied_route)) + Box::new(Filter::new(callable, next, copied_route, step_name)) } #[cfg(test)]