From 4bd6e7574d21a3f8f79d24492958a6b5496ba712 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Mon, 30 Mar 2026 23:40:21 +0200 Subject: [PATCH 1/4] feat(streams): Add backpressure metrics for consumer strategies Instrument ProcessingStrategy steps with counters and episode histograms for MessageRejected: send_backpressure vs receive_backpressure and matching duration series, using a BackpressureNext wrapper and explicit tracking for StreamSink produce and PythonAdapter. Each metric is labeled by step (operator kind and route). Incomplete episodes at shutdown are not emitted. Co-Authored-By: Cursor Made-with: Cursor --- sentry_streams/src/backpressure_metrics.rs | 139 +++++++++++++++++++++ sentry_streams/src/consumer.rs | 11 +- sentry_streams/src/lib.rs | 1 + sentry_streams/src/operators.rs | 42 ++++++- sentry_streams/src/python_operator.rs | 18 ++- sentry_streams/src/sinks.rs | 33 ++++- 6 files changed, 236 insertions(+), 8 deletions(-) create mode 100644 sentry_streams/src/backpressure_metrics.rs diff --git a/sentry_streams/src/backpressure_metrics.rs b/sentry_streams/src/backpressure_metrics.rs new file mode 100644 index 00000000..beebc05f --- /dev/null +++ b/sentry_streams/src/backpressure_metrics.rs @@ -0,0 +1,139 @@ +//! Backpressure metrics: counters and episode-duration histograms for +//! `MessageRejected` (send vs receive). Incomplete episodes at shutdown are dropped. + +use sentry_arroyo::processing::strategies::{ + CommitRequest, ProcessingStrategy, StrategyError, SubmitError, +}; +use sentry_arroyo::types::Message; +use std::time::{Duration, Instant}; + +use crate::routes::RoutedValue; + +const PREFIX: &str = "streams.pipeline"; + +/// Tracks a single backpressure episode for histogram sampling. +/// Episodes still open when the consumer stops are not recorded. +#[derive(Debug, Default)] +pub struct EpisodeTracker { + start: Option, +} + +impl EpisodeTracker { + /// Start or continue an episode (timer begins on first rejection in a run). 
+ pub fn on_message_rejected(&mut self) { + if self.start.is_none() { + self.start = Some(Instant::now()); + } + } + + /// If an episode was active, record one histogram sample (milliseconds) and clear. + pub fn on_success(&mut self, step: &str, duration_metric: &str) { + if let Some(start) = self.start.take() { + let elapsed_ms = duration_ms(start.elapsed()); + let key = format!("{}.{}", PREFIX, duration_metric); + let labels = vec![("step".to_string(), step.to_string())]; + metrics::histogram!(key, &labels).record(elapsed_ms); + } + } +} + +fn duration_ms(d: Duration) -> f64 { + d.as_secs_f64() * 1000.0 +} + +fn counter_increment(key: String, step: &str) { + let labels = vec![("step".to_string(), step.to_string())]; + metrics::counter!(key, &labels).increment(1); +} + +/// Increment `send_backpressure` and extend the send-duration episode tracker. +pub fn record_send_rejected(step: &str, tracker: &mut EpisodeTracker) { + counter_increment(format!("{}.send_backpressure", PREFIX), step); + tracker.on_message_rejected(); +} + +/// Increment `receive_backpressure` and extend the receive-duration episode tracker. +pub fn record_rcvd_rejected(step: &str, tracker: &mut EpisodeTracker) { + counter_increment(format!("{}.receive_backpressure", PREFIX), step); + tracker.on_message_rejected(); +} + +/// End a send backpressure episode after a successful submit path. +pub fn send_on_success(step: &str, tracker: &mut EpisodeTracker) { + tracker.on_success(step, "send_backpressure_duration"); +} + +/// End a receive backpressure episode after downstream accepted a submit. +pub fn recv_on_success(step: &str, tracker: &mut EpisodeTracker) { + tracker.on_success(step, "receive_backpressure_duration"); +} + +/// Wraps the downstream step and records receive-side backpressure when `inner.submit` returns +/// `MessageRejected`. 
+pub struct BackpressureNext { + inner: Box>, + step_label: String, + recv_tracker: EpisodeTracker, +} + +impl BackpressureNext { + pub fn new(inner: Box>, step_label: String) -> Self { + Self { + inner, + step_label, + recv_tracker: EpisodeTracker::default(), + } + } +} + +impl ProcessingStrategy for BackpressureNext { + fn poll(&mut self) -> Result, StrategyError> { + self.inner.poll() + } + + fn submit(&mut self, message: Message) -> Result<(), SubmitError> { + match self.inner.submit(message) { + Ok(()) => { + recv_on_success(&self.step_label, &mut self.recv_tracker); + Ok(()) + } + Err(SubmitError::MessageRejected(mr)) => { + record_rcvd_rejected(&self.step_label, &mut self.recv_tracker); + Err(SubmitError::MessageRejected(mr)) + } + Err(e) => Err(e), + } + } + + fn terminate(&mut self) { + self.inner.terminate(); + } + + fn join(&mut self, timeout: Option) -> Result, StrategyError> { + self.inner.join(timeout) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn episode_consecutive_rejections_one_histogram_on_success() { + let mut t = EpisodeTracker::default(); + t.on_message_rejected(); + std::thread::sleep(Duration::from_millis(2)); + t.on_message_rejected(); + // No panic; second rejection does not restart timer + assert!(t.start.is_some()); + t.on_success("step_a", "receive_backpressure_duration"); + assert!(t.start.is_none()); + } + + #[test] + fn episode_success_without_rejection_no_histogram() { + let mut t = EpisodeTracker::default(); + t.on_success("step_b", "send_backpressure_duration"); + assert!(t.start.is_none()); + } +} diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 3b77dee7..174a4f33 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -5,6 +5,7 @@ //! and all the steps following that source. //! The pipeline is built by adding RuntimeOperators to the consumer. 
+use crate::backpressure_metrics::BackpressureNext; use crate::commit_policy::WatermarkCommitOffsets; use crate::kafka_config::PyKafkaConsumerConfig; use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload}; @@ -223,6 +224,10 @@ pub fn build_chain( for step in steps.iter().rev() { next = build(step, next, Box::new(Noop {}), concurrency_config); } + let next = Box::new(BackpressureNext::new( + next, + format!("WatermarkEmitter:{source}"), + )); let watermark_step = Box::new(WatermarkEmitter::new( next, Route { @@ -238,7 +243,11 @@ pub fn build_chain( Ok(to_routed_value(&copied_source, message, &copied_schema)) }; - let converter = RunTask::new(conversion_function, watermark_step); + let watermark_wrapped = Box::new(BackpressureNext::new( + watermark_step, + format!("KafkaPayloadConverter:{source}"), + )); + let converter = RunTask::new(conversion_function, watermark_wrapped); let chain: Box> = Box::new(converter); if write_healthcheck { diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index c8913a39..7d114ac1 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -1,4 +1,5 @@ use pyo3::prelude::*; +mod backpressure_metrics; mod broadcaster; mod callers; mod commit_policy; diff --git a/sentry_streams/src/operators.rs b/sentry_streams/src/operators.rs index bba20563..9c82b00c 100644 --- a/sentry_streams/src/operators.rs +++ b/sentry_streams/src/operators.rs @@ -1,3 +1,4 @@ +use crate::backpressure_metrics::BackpressureNext; use crate::broadcaster::Broadcaster; use crate::kafka_config::PyKafkaProducerConfig; use crate::python_operator::PythonAdapter; @@ -91,13 +92,49 @@ pub enum RuntimeOperator { }, } +/// Stable label for `step` on backpressure metrics (one series per runtime operator). 
+pub fn metric_label_for_runtime_operator(op: &RuntimeOperator) -> String { + fn route_seg(r: &Route) -> String { + if r.waypoints.is_empty() { + r.source.clone() + } else { + format!("{}>{}", r.source, r.waypoints.join(">")) + } + } + match op { + RuntimeOperator::Map { route, .. } => format!("Map:{}", route_seg(route)), + RuntimeOperator::Filter { route, .. } => format!("Filter:{}", route_seg(route)), + RuntimeOperator::StreamSink { + route, topic_name, .. + } => { + format!("StreamSink:{}:{}", route_seg(route), topic_name) + } + RuntimeOperator::GCSSink { route, bucket, .. } => { + format!("GCSSink:{}:{}", route_seg(route), bucket) + } + RuntimeOperator::DevNullSink { route, .. } => { + format!("DevNullSink:{}", route_seg(route)) + } + RuntimeOperator::Broadcast { route, .. } => { + format!("Broadcast:{}", route_seg(route)) + } + RuntimeOperator::Router { route, .. } => format!("Router:{}", route_seg(route)), + RuntimeOperator::PythonAdapter { route, .. } => { + format!("PythonAdapter:{}", route_seg(route)) + } + } +} + pub fn build( step: &Py, next: Box>, terminator_strategy: Box>, concurrency_config: &ConcurrencyConfig, ) -> Box> { - match step.get() { + let op = step.get(); + let label = metric_label_for_runtime_operator(&op); + let next = Box::new(BackpressureNext::new(next, label.clone())); + match op { RuntimeOperator::Map { function, route } => { // All functions (Python and Rust) are called the same way now // Rust functions automatically release the GIL internally @@ -123,6 +160,7 @@ pub fn build( topic_name, next, terminator_strategy, + label, )) } RuntimeOperator::GCSSink { @@ -174,7 +212,7 @@ pub fn build( delegate_factory, } => { let factory = traced_with_gil!(|py| { delegate_factory.clone_ref(py) }); - Box::new(PythonAdapter::new(route.clone(), factory, next)) + Box::new(PythonAdapter::new(route.clone(), factory, next, label)) } RuntimeOperator::Broadcast { route, diff --git a/sentry_streams/src/python_operator.rs 
b/sentry_streams/src/python_operator.rs index 1a62a620..b5a17968 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -2,6 +2,7 @@ //! processing strategy that delegates the processing of messages to the //! python operator. +use crate::backpressure_metrics::{record_send_rejected, send_on_success, EpisodeTracker}; use crate::committable::{clone_committable, convert_committable_to_py, convert_py_committable}; use crate::messages::{PyWatermark, RoutedValuePayload, WatermarkMessage}; use crate::routes::{Route, RoutedValue}; @@ -38,6 +39,8 @@ pub struct PythonAdapter { // TODO: Add a mutex here next_strategy: Box>, commit_request_carried_over: Option, + backpressure_step_label: String, + send_tracker: EpisodeTracker, } impl PythonAdapter { @@ -45,6 +48,7 @@ impl PythonAdapter { route: Route, delegate_factory: Py, next_strategy: Box>, + backpressure_step_label: String, ) -> Self { traced_with_gil!(|py| { let processing_step = delegate_factory.call_method0(py, "build").unwrap(); @@ -55,6 +59,8 @@ impl PythonAdapter { next_strategy, transformed_messages: VecDeque::new(), commit_request_carried_over: None, + backpressure_step_label, + send_tracker: EpisodeTracker::default(), } }) } @@ -118,8 +124,13 @@ impl ProcessingStrategy for PythonAdapter { /// /// Any other exception is unexpected and triggers a panic. 
fn submit(&mut self, message: Message) -> Result<(), SubmitError> { + let label = &self.backpressure_step_label; if self.route != message.payload().route { - return self.next_strategy.submit(message); + let r = self.next_strategy.submit(message); + if r.is_ok() { + send_on_success(label, &mut self.send_tracker); + } + return r; } let committable = match &message.payload().payload { @@ -142,10 +153,12 @@ impl ProcessingStrategy for PythonAdapter { .call_method1(py, "submit", (python_payload, py_committable)); let Err(py_err) = res else { + send_on_success(label, &mut self.send_tracker); return Ok(()); }; if py_err.is_instance(py, &py.get_type::()) { + record_send_rejected(label, &mut self.send_tracker); Err(SubmitError::MessageRejected( sentry_arroyo::processing::strategies::MessageRejected { message }, )) @@ -377,6 +390,7 @@ class RustOperatorDelegateFactory: Route::new("source1".to_string(), vec!["waypoint1".to_string()]), instance.unbind(), Box::new(Noop {}), + String::from("PythonAdapter:test"), ); let message = make_msg(py, "ok"); @@ -428,6 +442,7 @@ class RustOperatorDelegateFactory: Route::new("source1".to_string(), vec!["waypoint1".to_string()]), instance.unbind(), Box::new(next_step), + String::from("PythonAdapter:test"), ); let message = make_msg(py, "ok"); @@ -497,6 +512,7 @@ class RustOperatorDelegateFactory: Route::new("source1".to_string(), vec!["waypoint1".to_string()]), instance.unbind(), Box::new(next_step), + String::from("PythonAdapter:test"), ); let message = make_msg(py, "ok"); diff --git a/sentry_streams/src/sinks.rs b/sentry_streams/src/sinks.rs index dd539087..dea300d5 100644 --- a/sentry_streams/src/sinks.rs +++ b/sentry_streams/src/sinks.rs @@ -4,6 +4,9 @@ //! As all the strategies in the Arroyo streaming pipeline adapter, //! This checks whether a message should be processed or forwarded //! via the `Route` attribute. 
+use crate::backpressure_metrics::{ + record_rcvd_rejected, record_send_rejected, recv_on_success, send_on_success, EpisodeTracker, +}; use crate::messages::{PyStreamingMessage, RoutedValuePayload, Watermark, WatermarkMessage}; use crate::routes::{Route, RoutedValue}; use crate::utils::traced_with_gil; @@ -153,6 +156,9 @@ pub struct StreamSink { /// passed to the transformer function and been mutated. message_carried_over: Option>, commit_request_carried_over: Option, + backpressure_step_label: String, + send_tracker: EpisodeTracker, + produce_recv_tracker: EpisodeTracker, } impl StreamSink { @@ -163,6 +169,7 @@ impl StreamSink { topic: &str, next_strategy: Box>, terminator_strategy: Box>, + backpressure_step_label: String, ) -> Self { let produce_strategy = Produce::new( terminator_strategy, @@ -177,6 +184,9 @@ impl StreamSink { next_strategy, message_carried_over: None, commit_request_carried_over: None, + backpressure_step_label, + send_tracker: EpisodeTracker::default(), + produce_recv_tracker: EpisodeTracker::default(), } } } @@ -198,17 +208,21 @@ impl ProcessingStrategy for StreamSink { produce_commit_request, ); + let label = &self.backpressure_step_label; if let Some(message) = self.message_carried_over.take() { match self.produce_strategy.submit(message) { Err(SubmitError::MessageRejected(MessageRejected { message: transformed_message, })) => { + record_rcvd_rejected(label, &mut self.produce_recv_tracker); self.message_carried_over = Some(transformed_message); } Err(SubmitError::InvalidMessage(invalid_message)) => { return Err(invalid_message.into()); } - Ok(_) => {} + Ok(_) => { + recv_on_success(label, &mut self.produce_recv_tracker); + } } } @@ -219,27 +233,36 @@ impl ProcessingStrategy for StreamSink { /// corresponds to the route in the message correspond to the Route /// attribute in this strategy. 
fn submit(&mut self, message: Message) -> Result<(), SubmitError> { + let label = &self.backpressure_step_label; if self.message_carried_over.is_some() { + record_send_rejected(label, &mut self.send_tracker); return Err(SubmitError::MessageRejected(MessageRejected { message })); } // TODO: pass watermark messages on to produce step once produce step is async and we have a wrapper // around it to handle watermarks if self.route != message.payload().route || message.payload().payload.is_watermark_msg() { - self.next_strategy.submit(message) + let r = self.next_strategy.submit(message); + if r.is_ok() { + send_on_success(label, &mut self.send_tracker); + } + r } else { match self.produce_strategy.submit(to_kafka_payload(message)) { Err(SubmitError::MessageRejected(MessageRejected { message: transformed_message, })) => { - println!("Message rejected: {:?}", transformed_message); + record_rcvd_rejected(label, &mut self.produce_recv_tracker); self.message_carried_over = Some(transformed_message); } Err(SubmitError::InvalidMessage(invalid_message)) => { return Err(SubmitError::InvalidMessage(invalid_message)); } - Ok(_) => {} + Ok(_) => { + recv_on_success(label, &mut self.produce_recv_tracker); + } } + send_on_success(label, &mut self.send_tracker); Ok(()) } } @@ -369,6 +392,7 @@ mod tests { "result-topic", Box::new(next_step), Box::new(Noop {}), + String::from("StreamSink:test"), ); let watermark_val = RoutedValue { @@ -409,6 +433,7 @@ mod tests { "result-topic", Box::new(next_step), Box::new(terminator), + String::from("StreamSink:test"), ); traced_with_gil!(|py| { From ba1b60feb637c17d60618df223aa1bd4330ec486 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Tue, 31 Mar 2026 00:30:01 +0200 Subject: [PATCH 2/4] ref(streams): Align backpressure metrics with pipeline step names Use DSL Step.name for the step label: add_step(step, step_name) on the Rust consumer, parallel step_names in build_chain, and wire names from rust_arroyo (including segment_label for chained 
maps). Rename EpisodeTracker to BackpressureTracker. Document BackpressureNext::poll vs submit for MessageRejected. Co-Authored-By: Cursor Made-with: Cursor --- .../adapters/arroyo/rust_arroyo.py | 26 +++++++++---- .../adapters/arroyo/steps_chain.py | 12 ++++++ .../sentry_streams/rust_streams.pyi | 2 +- sentry_streams/src/backpressure_metrics.rs | 24 +++++++----- sentry_streams/src/consumer.rs | 38 ++++++++++++++++--- sentry_streams/src/operators.rs | 38 ++----------------- sentry_streams/src/python_operator.rs | 6 +-- sentry_streams/src/sinks.rs | 11 +++--- sentry_streams/src/watermark.rs | 2 + 9 files changed, 93 insertions(+), 66 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index f93c2b93..88da39c2 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -259,8 +259,10 @@ def build( # type: ignore[override] def __close_chain(self, stream: Route) -> None: if self.__chains.exists(stream): logger.info(f"Closing transformation chain: {stream} and adding to pipeline") + segment_label = self.__chains.segment_label(stream) self.__consumers[stream.source].add_step( - finalize_chain(self.__chains, stream, self.__metrics_config) + finalize_chain(self.__chains, stream, self.__metrics_config), + segment_label, ) def get_consumer(self, source: str) -> ArroyoConsumer: @@ -332,7 +334,8 @@ def wrapped_generator() -> str: logger.info(f"Adding GCS sink: {step.name} to pipeline") self.__consumers[stream.source].add_step( - RuntimeOperator.GCSSink(route, step.bucket, wrapped_generator, step.thread_count) + RuntimeOperator.GCSSink(route, step.bucket, wrapped_generator, step.thread_count), + step.name, ) elif isinstance(step, DevNullSink): @@ -344,7 +347,8 @@ def wrapped_generator() -> str: batch_time_ms=step.batch_time_ms, average_sleep_time_ms=step.average_sleep_time_ms, 
max_sleep_time_ms=step.max_sleep_time_ms, - ) + ), + step.name, ) # Our fallback for now since there's no other Sink type @@ -356,7 +360,8 @@ def wrapped_generator() -> str: route, step.stream_name, build_kafka_producer_config(step.name, self.steps_config), - ) + ), + step.name, ) return stream @@ -449,7 +454,9 @@ def filter_msg(msg: Message[Any]) -> bool: route = RustRoute(stream.source, stream.waypoints) logger.info(f"Adding filter: {step.name} to pipeline") - self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg)) + self.__consumers[stream.source].add_step( + RuntimeOperator.Filter(route, filter_msg), step.name + ) return stream @@ -472,7 +479,8 @@ def reduce( logger.info(f"Adding reduce: {step.name} to pipeline") self.__consumers[stream.source].add_step( - RuntimeOperator.PythonAdapter(route, ReduceDelegateFactory(step)) + RuntimeOperator.PythonAdapter(route, ReduceDelegateFactory(step)), + step.name, ) return stream @@ -496,7 +504,8 @@ def broadcast( self.__consumers[stream.source].add_step( RuntimeOperator.Broadcast( route, downstream_routes=[branch.root.name for branch in step.routes] - ) + ), + step.name, ) return build_branches(stream, step.routes) @@ -535,7 +544,8 @@ def routing_function(msg: Message[Any]) -> str: self.__consumers[stream.source].add_step( RuntimeOperator.Router( route, routing_function, cast(Sequence[str], step.routing_table.values()) - ) + ), + step.name, ) return build_branches(stream, step.routing_table.values()) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py b/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py index 278c9bb0..423714ee 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py @@ -90,6 +90,18 @@ def add_map(self, route: Route, step: Map[Any, Any]) -> None: raise ValueError(f"Chain {route} not initialized") self.__chains[hashable_route].steps.append(step) + def 
segment_label(self, route: Route) -> str: + """ + Metric label for the chained map segment (joined step names), before `finalize`. + """ + hashable_route = _hashable_route(route) + if hashable_route not in self.__chains: + raise ValueError(f"No chain for route {route}") + steps = self.__chains[hashable_route].steps + if not steps: + return route.source + return ">".join(step.name for step in steps) + def finalize( self, route: Route ) -> Tuple[MultiProcessConfig | None, Callable[[Message[Any]], Message[Any]]]: diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 6f58ffbb..ce6a7c82 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -106,7 +106,7 @@ class ArroyoConsumer: metric_config: PyMetricConfig | None = None, write_healthcheck: bool = False, ) -> None: ... - def add_step(self, step: RuntimeOperator) -> None: ... + def add_step(self, step: RuntimeOperator, step_name: str) -> None: ... def run(self) -> None: ... def shutdown(self) -> None: ... diff --git a/sentry_streams/src/backpressure_metrics.rs b/sentry_streams/src/backpressure_metrics.rs index beebc05f..cb0f5bd5 100644 --- a/sentry_streams/src/backpressure_metrics.rs +++ b/sentry_streams/src/backpressure_metrics.rs @@ -14,11 +14,11 @@ const PREFIX: &str = "streams.pipeline"; /// Tracks a single backpressure episode for histogram sampling. /// Episodes still open when the consumer stops are not recorded. #[derive(Debug, Default)] -pub struct EpisodeTracker { +pub struct BackpressureTracker { start: Option, } -impl EpisodeTracker { +impl BackpressureTracker { /// Start or continue an episode (timer begins on first rejection in a run). pub fn on_message_rejected(&mut self) { if self.start.is_none() { @@ -47,24 +47,24 @@ fn counter_increment(key: String, step: &str) { } /// Increment `send_backpressure` and extend the send-duration episode tracker. 
-pub fn record_send_rejected(step: &str, tracker: &mut EpisodeTracker) { +pub fn record_send_rejected(step: &str, tracker: &mut BackpressureTracker) { counter_increment(format!("{}.send_backpressure", PREFIX), step); tracker.on_message_rejected(); } /// Increment `receive_backpressure` and extend the receive-duration episode tracker. -pub fn record_rcvd_rejected(step: &str, tracker: &mut EpisodeTracker) { +pub fn record_rcvd_rejected(step: &str, tracker: &mut BackpressureTracker) { counter_increment(format!("{}.receive_backpressure", PREFIX), step); tracker.on_message_rejected(); } /// End a send backpressure episode after a successful submit path. -pub fn send_on_success(step: &str, tracker: &mut EpisodeTracker) { +pub fn send_on_success(step: &str, tracker: &mut BackpressureTracker) { tracker.on_success(step, "send_backpressure_duration"); } /// End a receive backpressure episode after downstream accepted a submit. -pub fn recv_on_success(step: &str, tracker: &mut EpisodeTracker) { +pub fn recv_on_success(step: &str, tracker: &mut BackpressureTracker) { tracker.on_success(step, "receive_backpressure_duration"); } @@ -73,7 +73,7 @@ pub fn recv_on_success(step: &str, tracker: &mut EpisodeTracker) { pub struct BackpressureNext { inner: Box>, step_label: String, - recv_tracker: EpisodeTracker, + recv_tracker: BackpressureTracker, } impl BackpressureNext { @@ -81,13 +81,17 @@ impl BackpressureNext { Self { inner, step_label, - recv_tracker: EpisodeTracker::default(), + recv_tracker: BackpressureTracker::default(), } } } impl ProcessingStrategy for BackpressureNext { fn poll(&mut self) -> Result, StrategyError> { + // `SubmitError::MessageRejected` is only returned from `submit`, not from `poll`. + // The wrapped `next` may still see backpressure when the inner strategy calls + // `submit` during its own `poll` (e.g. watermark flush, retrying pending messages); + // those calls go through `BackpressureNext::submit` and are instrumented there. 
self.inner.poll() } @@ -120,7 +124,7 @@ mod tests { #[test] fn episode_consecutive_rejections_one_histogram_on_success() { - let mut t = EpisodeTracker::default(); + let mut t = BackpressureTracker::default(); t.on_message_rejected(); std::thread::sleep(Duration::from_millis(2)); t.on_message_rejected(); @@ -132,7 +136,7 @@ mod tests { #[test] fn episode_success_without_rejection_no_histogram() { - let mut t = EpisodeTracker::default(); + let mut t = BackpressureTracker::default(); t.on_success("step_b", "send_backpressure_duration"); assert!(t.start.is_none()); } diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 174a4f33..6b17e837 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -58,6 +58,9 @@ pub struct ArroyoConsumer { steps: Vec>, + /// `Step.name` for each entry in `steps` (same order). + step_names: Vec, + /// The ProcessorHandle allows the main thread to stop the StreamingProcessor /// from a different thread. handle: Option, @@ -90,6 +93,7 @@ impl ArroyoConsumer { source, schema, steps: Vec::new(), + step_names: Vec::new(), handle: None, concurrency_config: Arc::new(ConcurrencyConfig::new(1)), metric_config, @@ -98,10 +102,10 @@ impl ArroyoConsumer { } /// Add a step to the Consumer pipeline at the end of it. - /// This class is supposed to be instantiated by the Python adapter - /// so it takes the steps descriptor as a Py. - fn add_step(&mut self, step: Py) { + /// `step_name` must be the pipeline DSL step name (`Step.name`) for backpressure metric labels. + fn add_step(&mut self, step: Py, step_name: String) { self.steps.push(step); + self.step_names.push(step_name); } /// Runs the consumer. 
@@ -116,6 +120,7 @@ impl ArroyoConsumer { let factory = ArroyoStreamingFactory::new( self.source.clone(), &self.steps, + &self.step_names, self.concurrency_config.clone(), self.schema.clone(), self.write_healthcheck, @@ -215,14 +220,26 @@ fn to_routed_value( pub fn build_chain( source: &str, steps: &[Py], + step_names: &[String], ending_strategy: Box>, concurrency_config: &ConcurrencyConfig, schema: &Option, write_healthcheck: bool, ) -> Box> { + assert_eq!( + steps.len(), + step_names.len(), + "steps and step_names must have the same length" + ); let mut next = ending_strategy; - for step in steps.iter().rev() { - next = build(step, next, Box::new(Noop {}), concurrency_config); + for (step, name) in steps.iter().rev().zip(step_names.iter().rev()) { + next = build( + step, + name.as_str(), + next, + Box::new(Noop {}), + concurrency_config, + ); } let next = Box::new(BackpressureNext::new( next, @@ -260,6 +277,7 @@ pub fn build_chain( struct ArroyoStreamingFactory { source: String, steps: Vec>, + step_names: Vec, concurrency_config: Arc, schema: Option, write_healthcheck: bool, @@ -270,10 +288,16 @@ impl ArroyoStreamingFactory { fn new( source: String, steps: &[Py], + step_names: &[String], concurrency_config: Arc, schema: Option, write_healthcheck: bool, ) -> Self { + assert_eq!( + steps.len(), + step_names.len(), + "steps and step_names must have the same length" + ); let steps_copy = traced_with_gil!(|py| { steps .iter() @@ -284,6 +308,7 @@ impl ArroyoStreamingFactory { ArroyoStreamingFactory { source, steps: steps_copy, + step_names: step_names.to_vec(), concurrency_config, schema, write_healthcheck, @@ -296,6 +321,7 @@ impl ProcessingStrategyFactory for ArroyoStreamingFactory { build_chain( &self.source, &self.steps, + &self.step_names, // TODO: once Broadcast/Router work properly, count how many total downstream // branches a pipeline has and pass that value to the Watermark Box::new(WatermarkCommitOffsets::new(1)), @@ -402,9 +428,11 @@ mod tests { let 
next_step = FakeStrategy::new(submitted_messages, submitted_watermarks, false); let concurrency_config = ConcurrencyConfig::new(1); + let step_names = vec![String::from("map")]; let mut chain = build_chain( "source", &steps, + &step_names, Box::new(next_step), &concurrency_config, &Some("schema".to_string()), diff --git a/sentry_streams/src/operators.rs b/sentry_streams/src/operators.rs index 9c82b00c..9578601e 100644 --- a/sentry_streams/src/operators.rs +++ b/sentry_streams/src/operators.rs @@ -92,47 +92,17 @@ pub enum RuntimeOperator { }, } -/// Stable label for `step` on backpressure metrics (one series per runtime operator). -pub fn metric_label_for_runtime_operator(op: &RuntimeOperator) -> String { - fn route_seg(r: &Route) -> String { - if r.waypoints.is_empty() { - r.source.clone() - } else { - format!("{}>{}", r.source, r.waypoints.join(">")) - } - } - match op { - RuntimeOperator::Map { route, .. } => format!("Map:{}", route_seg(route)), - RuntimeOperator::Filter { route, .. } => format!("Filter:{}", route_seg(route)), - RuntimeOperator::StreamSink { - route, topic_name, .. - } => { - format!("StreamSink:{}:{}", route_seg(route), topic_name) - } - RuntimeOperator::GCSSink { route, bucket, .. } => { - format!("GCSSink:{}:{}", route_seg(route), bucket) - } - RuntimeOperator::DevNullSink { route, .. } => { - format!("DevNullSink:{}", route_seg(route)) - } - RuntimeOperator::Broadcast { route, .. } => { - format!("Broadcast:{}", route_seg(route)) - } - RuntimeOperator::Router { route, .. } => format!("Router:{}", route_seg(route)), - RuntimeOperator::PythonAdapter { route, .. } => { - format!("PythonAdapter:{}", route_seg(route)) - } - } -} - +/// Wires `next` with backpressure instrumentation using the pipeline step name (DSL `Step.name`) +/// passed from Python when the step is registered on the consumer. 
pub fn build( step: &Py, + pipeline_step_name: &str, next: Box>, terminator_strategy: Box>, concurrency_config: &ConcurrencyConfig, ) -> Box> { let op = step.get(); - let label = metric_label_for_runtime_operator(&op); + let label = pipeline_step_name.to_string(); let next = Box::new(BackpressureNext::new(next, label.clone())); match op { RuntimeOperator::Map { function, route } => { diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index b5a17968..6edf75ae 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -2,7 +2,7 @@ //! processing strategy that delegates the processing of messages to the //! python operator. -use crate::backpressure_metrics::{record_send_rejected, send_on_success, EpisodeTracker}; +use crate::backpressure_metrics::{record_send_rejected, send_on_success, BackpressureTracker}; use crate::committable::{clone_committable, convert_committable_to_py, convert_py_committable}; use crate::messages::{PyWatermark, RoutedValuePayload, WatermarkMessage}; use crate::routes::{Route, RoutedValue}; @@ -40,7 +40,7 @@ pub struct PythonAdapter { next_strategy: Box>, commit_request_carried_over: Option, backpressure_step_label: String, - send_tracker: EpisodeTracker, + send_tracker: BackpressureTracker, } impl PythonAdapter { @@ -60,7 +60,7 @@ impl PythonAdapter { transformed_messages: VecDeque::new(), commit_request_carried_over: None, backpressure_step_label, - send_tracker: EpisodeTracker::default(), + send_tracker: BackpressureTracker::default(), } }) } diff --git a/sentry_streams/src/sinks.rs b/sentry_streams/src/sinks.rs index dea300d5..4e3d1877 100644 --- a/sentry_streams/src/sinks.rs +++ b/sentry_streams/src/sinks.rs @@ -5,7 +5,8 @@ //! This checks whether a message should be processed or forwarded //! via the `Route` attribute. 
use crate::backpressure_metrics::{ - record_rcvd_rejected, record_send_rejected, recv_on_success, send_on_success, EpisodeTracker, + record_rcvd_rejected, record_send_rejected, recv_on_success, send_on_success, + BackpressureTracker, }; use crate::messages::{PyStreamingMessage, RoutedValuePayload, Watermark, WatermarkMessage}; use crate::routes::{Route, RoutedValue}; @@ -157,8 +158,8 @@ pub struct StreamSink { message_carried_over: Option>, commit_request_carried_over: Option, backpressure_step_label: String, - send_tracker: EpisodeTracker, - produce_recv_tracker: EpisodeTracker, + send_tracker: BackpressureTracker, + produce_recv_tracker: BackpressureTracker, } impl StreamSink { @@ -185,8 +186,8 @@ impl StreamSink { message_carried_over: None, commit_request_carried_over: None, backpressure_step_label, - send_tracker: EpisodeTracker::default(), - produce_recv_tracker: EpisodeTracker::default(), + send_tracker: BackpressureTracker::default(), + produce_recv_tracker: BackpressureTracker::default(), } } } diff --git a/sentry_streams/src/watermark.rs b/sentry_streams/src/watermark.rs index cd9f2dd0..ea80689f 100644 --- a/sentry_streams/src/watermark.rs +++ b/sentry_streams/src/watermark.rs @@ -208,9 +208,11 @@ mod tests { }, ) .unwrap(); + let step_names = vec![String::from("map")]; let mut watermark_step = build_chain( "source", &[map_step], + &step_names, Box::new(WatermarkCommitOffsets::new(1)), &ConcurrencyConfig::new(1), &None, From 8f9c8eff23bb736062b75bfca1aa060a515713aa Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Tue, 31 Mar 2026 11:56:50 +0200 Subject: [PATCH 3/4] ref(streams): Store step names on runtime operators Move pipeline step labels into RuntimeOperator so consumer assembly no longer tracks a parallel step_names list. This lets Arroyo segment finalization pass a single first-step label through operator construction for backpressure metrics. 
Co-Authored-By: GPT-5 Codex Made-with: Cursor --- .../adapters/arroyo/rust_arroyo.py | 32 +++++---- .../adapters/arroyo/steps_chain.py | 4 +- .../sentry_streams/rust_streams.pyi | 25 +++++-- sentry_streams/src/consumer.rs | 44 ++---------- sentry_streams/src/operators.rs | 72 +++++++++++++++---- sentry_streams/src/watermark.rs | 3 +- sentry_streams/tests/test_pipeline.py | 8 ++- 7 files changed, 112 insertions(+), 76 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 88da39c2..f0708f6f 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -197,6 +197,7 @@ def finalize_chain( chains: TransformChains, route: Route, metrics_config: MetricsConfig, + segment_label: str, ) -> RuntimeOperator: rust_route = RustRoute(route.source, route.waypoints) config, func = chains.finalize(route) @@ -209,6 +210,7 @@ def finalize_chain( ) return RuntimeOperator.PythonAdapter( + segment_label, rust_route, MultiprocessDelegateFactory( func, @@ -226,7 +228,7 @@ def finalize_chain( ) else: logger.info(f"Finalizing chain for route {route} without multiprocessing") - return RuntimeOperator.Map(rust_route, lambda msg: func(msg).to_inner()) + return RuntimeOperator.Map(segment_label, rust_route, lambda msg: func(msg).to_inner()) class RustArroyoAdapter(StreamAdapter[Route, Route]): @@ -261,8 +263,7 @@ def __close_chain(self, stream: Route) -> None: logger.info(f"Closing transformation chain: {stream} and adding to pipeline") segment_label = self.__chains.segment_label(stream) self.__consumers[stream.source].add_step( - finalize_chain(self.__chains, stream, self.__metrics_config), - segment_label, + finalize_chain(self.__chains, stream, self.__metrics_config, segment_label), ) def get_consumer(self, source: str) -> ArroyoConsumer: @@ -334,21 +335,22 @@ def wrapped_generator() -> str: 
logger.info(f"Adding GCS sink: {step.name} to pipeline") self.__consumers[stream.source].add_step( - RuntimeOperator.GCSSink(route, step.bucket, wrapped_generator, step.thread_count), - step.name, + RuntimeOperator.GCSSink( + step.name, route, step.bucket, wrapped_generator, step.thread_count + ), ) elif isinstance(step, DevNullSink): logger.info(f"Adding DevNull sink: {step.name} to pipeline") self.__consumers[stream.source].add_step( RuntimeOperator.DevNullSink( + step.name, route, batch_size=step.batch_size, batch_time_ms=step.batch_time_ms, average_sleep_time_ms=step.average_sleep_time_ms, max_sleep_time_ms=step.max_sleep_time_ms, ), - step.name, ) # Our fallback for now since there's no other Sink type @@ -357,11 +359,11 @@ def wrapped_generator() -> str: logger.info(f"Adding stream sink: {step.name} to pipeline") self.__consumers[stream.source].add_step( RuntimeOperator.StreamSink( + step.name, route, step.stream_name, build_kafka_producer_config(step.name, self.steps_config), ), - step.name, ) return stream @@ -455,7 +457,7 @@ def filter_msg(msg: Message[Any]) -> bool: logger.info(f"Adding filter: {step.name} to pipeline") self.__consumers[stream.source].add_step( - RuntimeOperator.Filter(route, filter_msg), step.name + RuntimeOperator.Filter(step.name, route, filter_msg) ) return stream @@ -479,8 +481,7 @@ def reduce( logger.info(f"Adding reduce: {step.name} to pipeline") self.__consumers[stream.source].add_step( - RuntimeOperator.PythonAdapter(route, ReduceDelegateFactory(step)), - step.name, + RuntimeOperator.PythonAdapter(step.name, route, ReduceDelegateFactory(step)), ) return stream @@ -503,9 +504,10 @@ def broadcast( logger.info(f"Adding broadcast: {step.name} to pipeline") self.__consumers[stream.source].add_step( RuntimeOperator.Broadcast( - route, downstream_routes=[branch.root.name for branch in step.routes] + step.name, + route, + downstream_routes=[branch.root.name for branch in step.routes], ), - step.name, ) return build_branches(stream, 
step.routes) @@ -543,9 +545,11 @@ def routing_function(msg: Message[Any]) -> str: logger.info(f"Adding router: {step.name} to pipeline") self.__consumers[stream.source].add_step( RuntimeOperator.Router( - route, routing_function, cast(Sequence[str], step.routing_table.values()) + step.name, + route, + routing_function, + cast(Sequence[str], step.routing_table.values()), ), - step.name, ) return build_branches(stream, step.routing_table.values()) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py b/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py index 423714ee..0c51039b 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py @@ -92,7 +92,7 @@ def add_map(self, route: Route, step: Map[Any, Any]) -> None: def segment_label(self, route: Route) -> str: """ - Metric label for the chained map segment (joined step names), before `finalize`. + Metric label for the chained map segment: the first map step's name, before `finalize`. """ hashable_route = _hashable_route(route) if hashable_route not in self.__chains: @@ -100,7 +100,7 @@ def segment_label(self, route: Route) -> str: steps = self.__chains[hashable_route].steps if not steps: return route.source - return ">".join(step.name for step in steps) + return steps[0].name def finalize( self, route: Route diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index ce6a7c82..417bccab 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -59,17 +59,26 @@ class PyMetricConfig: def flush_interval_ms(self) -> int | None: ... class RuntimeOperator: + @property + def step_name(self) -> str: ... @classmethod - def Map(cls, route: Route, function: Callable[[Message[Any]], Any]) -> Self: ... + def Map(cls, step_name: str, route: Route, function: Callable[[Message[Any]], Any]) -> Self: ... 
@classmethod - def Filter(cls, route: Route, function: Callable[[Message[Any]], bool]) -> Self: ... + def Filter( + cls, step_name: str, route: Route, function: Callable[[Message[Any]], bool] + ) -> Self: ... @classmethod def StreamSink( - cls, route: Route, topic_name: str, kafka_config: PyKafkaProducerConfig + cls, + step_name: str, + route: Route, + topic_name: str, + kafka_config: PyKafkaProducerConfig, ) -> Self: ... @classmethod def GCSSink( cls, + step_name: str, route: Route, bucket: str, object_generator: Callable[[], str], @@ -78,6 +87,7 @@ class RuntimeOperator: @classmethod def DevNullSink( cls, + step_name: str, route: Route, batch_size: int | None = None, batch_time_ms: float | None = None, @@ -87,14 +97,17 @@ class RuntimeOperator: @classmethod def Router( cls, + step_name: str, route: Route, function: Callable[[Message[Any]], str], downstream_routes: Sequence[str], ) -> Self: ... @classmethod - def Broadcast(cls, route: Route, downstream_routes: Sequence[str]) -> Self: ... + def Broadcast(cls, step_name: str, route: Route, downstream_routes: Sequence[str]) -> Self: ... @classmethod - def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ... + def PythonAdapter( + cls, step_name: str, route: Route, delegate_Factory: RustOperatorFactory + ) -> Self: ... class ArroyoConsumer: def __init__( @@ -106,7 +119,7 @@ class ArroyoConsumer: metric_config: PyMetricConfig | None = None, write_healthcheck: bool = False, ) -> None: ... - def add_step(self, step: RuntimeOperator, step_name: str) -> None: ... + def add_step(self, step: RuntimeOperator) -> None: ... def run(self) -> None: ... def shutdown(self) -> None: ... diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 6b17e837..68554a44 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -58,9 +58,6 @@ pub struct ArroyoConsumer { steps: Vec>, - /// `Step.name` for each entry in `steps` (same order). 
- step_names: Vec, - /// The ProcessorHandle allows the main thread to stop the StreamingProcessor /// from a different thread. handle: Option, @@ -93,7 +90,6 @@ impl ArroyoConsumer { source, schema, steps: Vec::new(), - step_names: Vec::new(), handle: None, concurrency_config: Arc::new(ConcurrencyConfig::new(1)), metric_config, @@ -102,10 +98,9 @@ impl ArroyoConsumer { } /// Add a step to the Consumer pipeline at the end of it. - /// `step_name` must be the pipeline DSL step name (`Step.name`) for backpressure metric labels. - fn add_step(&mut self, step: Py, step_name: String) { + /// Backpressure metric labels use each step's [`RuntimeOperator::pipeline_step_name`]. + fn add_step(&mut self, step: Py) { self.steps.push(step); - self.step_names.push(step_name); } /// Runs the consumer. @@ -120,7 +115,6 @@ impl ArroyoConsumer { let factory = ArroyoStreamingFactory::new( self.source.clone(), &self.steps, - &self.step_names, self.concurrency_config.clone(), self.schema.clone(), self.write_healthcheck, @@ -220,26 +214,14 @@ fn to_routed_value( pub fn build_chain( source: &str, steps: &[Py], - step_names: &[String], ending_strategy: Box>, concurrency_config: &ConcurrencyConfig, schema: &Option, write_healthcheck: bool, ) -> Box> { - assert_eq!( - steps.len(), - step_names.len(), - "steps and step_names must have the same length" - ); let mut next = ending_strategy; - for (step, name) in steps.iter().rev().zip(step_names.iter().rev()) { - next = build( - step, - name.as_str(), - next, - Box::new(Noop {}), - concurrency_config, - ); + for step in steps.iter().rev() { + next = build(step, next, Box::new(Noop {}), concurrency_config); } let next = Box::new(BackpressureNext::new( next, @@ -260,11 +242,7 @@ pub fn build_chain( Ok(to_routed_value(&copied_source, message, &copied_schema)) }; - let watermark_wrapped = Box::new(BackpressureNext::new( - watermark_step, - format!("KafkaPayloadConverter:{source}"), - )); - let converter = RunTask::new(conversion_function, 
watermark_wrapped); + let converter = RunTask::new(conversion_function, watermark_step); let chain: Box> = Box::new(converter); if write_healthcheck { @@ -277,7 +255,6 @@ pub fn build_chain( struct ArroyoStreamingFactory { source: String, steps: Vec>, - step_names: Vec, concurrency_config: Arc, schema: Option, write_healthcheck: bool, @@ -288,16 +265,10 @@ impl ArroyoStreamingFactory { fn new( source: String, steps: &[Py], - step_names: &[String], concurrency_config: Arc, schema: Option, write_healthcheck: bool, ) -> Self { - assert_eq!( - steps.len(), - step_names.len(), - "steps and step_names must have the same length" - ); let steps_copy = traced_with_gil!(|py| { steps .iter() @@ -308,7 +279,6 @@ impl ArroyoStreamingFactory { ArroyoStreamingFactory { source, steps: steps_copy, - step_names: step_names.to_vec(), concurrency_config, schema, write_healthcheck, @@ -321,7 +291,6 @@ impl ProcessingStrategyFactory for ArroyoStreamingFactory { build_chain( &self.source, &self.steps, - &self.step_names, // TODO: once Broadcast/Router work properly, count how many total downstream // branches a pipeline has and pass that value to the Watermark Box::new(WatermarkCommitOffsets::new(1)), @@ -415,6 +384,7 @@ mod tests { let r = Py::new( py, RuntimeOperator::Map { + step_name: "map".to_string(), route: Route::new("source".to_string(), vec![]), function: callable, }, @@ -428,11 +398,9 @@ mod tests { let next_step = FakeStrategy::new(submitted_messages, submitted_watermarks, false); let concurrency_config = ConcurrencyConfig::new(1); - let step_names = vec![String::from("map")]; let mut chain = build_chain( "source", &steps, - &step_names, Box::new(next_step), &concurrency_config, &Some("schema".to_string()), diff --git a/sentry_streams/src/operators.rs b/sentry_streams/src/operators.rs index 9578601e..782dd028 100644 --- a/sentry_streams/src/operators.rs +++ b/sentry_streams/src/operators.rs @@ -32,18 +32,27 @@ pub enum RuntimeOperator { /// is provided to transform the 
message payload into a different /// one. #[pyo3(name = "Map")] - Map { route: Route, function: Py }, + Map { + step_name: String, + route: Route, + function: Py, + }, /// Represents a Filter step in the streaming pipeline. /// This translates to a custom Arroyo strategy (Filter step) where a function /// is provided to transform the message payload into a bool. #[pyo3(name = "Filter")] - Filter { route: Route, function: Py }, + Filter { + step_name: String, + route: Route, + function: Py, + }, /// Represents a Kafka Producer as a Sink in the pipeline. /// It is translated to an Arroyo Kafka producer. #[pyo3(name = "StreamSink")] StreamSink { + step_name: String, route: Route, topic_name: String, kafka_config: PyKafkaProducerConfig, @@ -51,6 +60,7 @@ pub enum RuntimeOperator { #[pyo3(name = "GCSSink")] GCSSink { + step_name: String, route: Route, bucket: String, object_generator: Py, @@ -61,6 +71,7 @@ pub enum RuntimeOperator { /// Simulates batching with configurable delays. #[pyo3(name = "DevNullSink")] DevNullSink { + step_name: String, route: Route, batch_size: Option, batch_time_ms: Option, @@ -71,6 +82,7 @@ pub enum RuntimeOperator { /// message and submits a copy of that message to each downstream route. #[pyo3(name = "Broadcast")] Broadcast { + step_name: String, route: Route, downstream_routes: Py, }, @@ -78,6 +90,7 @@ pub enum RuntimeOperator { /// to one of the downstream routes. #[pyo3(name = "Router")] Router { + step_name: String, route: Route, routing_function: Py, downstream_routes: Py, @@ -87,37 +100,66 @@ pub enum RuntimeOperator { /// to simplify the porting of python strategies to Rust. #[pyo3(name = "PythonAdapter")] PythonAdapter { + step_name: String, route: Route, delegate_factory: Py, }, } -/// Wires `next` with backpressure instrumentation using the pipeline step name (DSL `Step.name`) -/// passed from Python when the step is registered on the consumer. +impl RuntimeOperator { + /// Pipeline DSL `Step.name` for backpressure metric labels. 
+ pub fn pipeline_step_name(&self) -> &str { + match self { + RuntimeOperator::Map { step_name, .. } + | RuntimeOperator::Filter { step_name, .. } + | RuntimeOperator::StreamSink { step_name, .. } + | RuntimeOperator::GCSSink { step_name, .. } + | RuntimeOperator::DevNullSink { step_name, .. } + | RuntimeOperator::Broadcast { step_name, .. } + | RuntimeOperator::Router { step_name, .. } + | RuntimeOperator::PythonAdapter { step_name, .. } => step_name.as_str(), + } + } +} + +#[pymethods] +impl RuntimeOperator { + /// Pipeline DSL step name (`Step.name`). + #[getter] + fn step_name(&self) -> String { + self.pipeline_step_name().to_string() + } +} + +/// Wires `next` with backpressure instrumentation using [`RuntimeOperator::pipeline_step_name`]. pub fn build( step: &Py, - pipeline_step_name: &str, next: Box>, terminator_strategy: Box>, concurrency_config: &ConcurrencyConfig, ) -> Box> { let op = step.get(); - let label = pipeline_step_name.to_string(); + let label = op.pipeline_step_name().to_string(); let next = Box::new(BackpressureNext::new(next, label.clone())); match op { - RuntimeOperator::Map { function, route } => { - // All functions (Python and Rust) are called the same way now - // Rust functions automatically release the GIL internally + RuntimeOperator::Map { + step_name: _, + function, + route, + } => { let func_ref = traced_with_gil!(|py| function.clone_ref(py)); build_map(route, func_ref, next) } - RuntimeOperator::Filter { function, route } => { - // All functions (Python and Rust) are called the same way now - // Rust functions automatically release the GIL internally + RuntimeOperator::Filter { + step_name: _, + function, + route, + } => { let func_ref = traced_with_gil!(|py| function.clone_ref(py)); build_filter(route, func_ref, next) } RuntimeOperator::StreamSink { + step_name: _, route, topic_name, kafka_config, @@ -134,6 +176,7 @@ pub fn build( )) } RuntimeOperator::GCSSink { + step_name: _, route, bucket, object_generator, @@ -150,6 +193,7 @@ 
pub fn build( )) } RuntimeOperator::DevNullSink { + step_name: _, route, batch_size, batch_time_ms, @@ -167,9 +211,9 @@ pub fn build( )) } RuntimeOperator::Router { + step_name: _, route, routing_function, - // TODO: Router step will use downstream_routes once it's fixed to work with watermarks #[allow(unused_variables)] downstream_routes, } => { @@ -178,6 +222,7 @@ pub fn build( build_router(route, func_ref, next) } RuntimeOperator::PythonAdapter { + step_name: _, route, delegate_factory, } => { @@ -185,6 +230,7 @@ pub fn build( Box::new(PythonAdapter::new(route.clone(), factory, next, label)) } RuntimeOperator::Broadcast { + step_name: _, route, downstream_routes, } => { diff --git a/sentry_streams/src/watermark.rs b/sentry_streams/src/watermark.rs index ea80689f..e67300bb 100644 --- a/sentry_streams/src/watermark.rs +++ b/sentry_streams/src/watermark.rs @@ -203,16 +203,15 @@ mod tests { let map_step = Py::new( py, RuntimeOperator::Map { + step_name: "map".to_string(), route: Route::new("source".to_string(), vec![]), function: callable, }, ) .unwrap(); - let step_names = vec![String::from("map")]; let mut watermark_step = build_chain( "source", &[map_step], - &step_names, Box::new(WatermarkCommitOffsets::new(1)), &ConcurrencyConfig::new(1), &None, diff --git a/sentry_streams/tests/test_pipeline.py b/sentry_streams/tests/test_pipeline.py index a6fcf59d..a44e9880 100644 --- a/sentry_streams/tests/test_pipeline.py +++ b/sentry_streams/tests/test_pipeline.py @@ -341,13 +341,18 @@ def generate_file_name() -> str: route = Route(source="test-source", waypoints=["step1", "step2"]) RuntimeOperator.GCSSink( - route=route, bucket="my-bucket", object_generator=generate_file_name, thread_count=1 + step_name="gcs_step", + route=route, + bucket="my-bucket", + object_generator=generate_file_name, + thread_count=1, ) def test_devnullsink_instantiation() -> None: route = Route(source="test-source", waypoints=["step1", "step2"]) RuntimeOperator.DevNullSink( + step_name="devnull", 
route=route, batch_size=None, batch_time_ms=None, @@ -357,6 +362,7 @@ def test_devnullsink_instantiation() -> None: # Test with batch configuration RuntimeOperator.DevNullSink( + step_name="devnull_batched", route=route, batch_size=100, batch_time_ms=10000.0, # 10 seconds in ms From dcbb381b010780966dab847eed6bf28ce24459fe Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Tue, 31 Mar 2026 12:46:03 +0200 Subject: [PATCH 4/4] feat(streams): Centralize Rust pipeline metrics emission Route backpressure counters and duration histograms through STREAMS_RECORDER and shared streams.pipeline helpers. Restore DogStatsD global labels on the exporter. Add unit tests using metrics with_local_recorder plus DebuggingRecorder, a merged StreamsMetricsRecorder test, and BackpressureNext wired to FakeStrategy for MessageRejected. Co-Authored-By: GPT-5 Codex Made-with: Cursor --- sentry_streams/Cargo.lock | 2 + sentry_streams/Cargo.toml | 2 + sentry_streams/src/backpressure_metrics.rs | 94 +++++++++---- sentry_streams/src/metrics.rs | 151 ++++++++++++++++++++- sentry_streams/uv.lock | 2 +- 5 files changed, 223 insertions(+), 28 deletions(-) diff --git a/sentry_streams/Cargo.lock b/sentry_streams/Cargo.lock index aa9aeb70..d3042d04 100644 --- a/sentry_streams/Cargo.lock +++ b/sentry_streams/Cargo.lock @@ -1577,6 +1577,8 @@ dependencies = [ "log", "metrics", "metrics-exporter-dogstatsd", + "metrics-util", + "ordered-float", "parking_lot", "pyo3", "rand 0.8.5", diff --git a/sentry_streams/Cargo.toml b/sentry_streams/Cargo.toml index 98b94e22..3f4c6f09 100644 --- a/sentry_streams/Cargo.toml +++ b/sentry_streams/Cargo.toml @@ -36,6 +36,8 @@ extension-module = ["pyo3/extension-module"] cli = ["dep:clap", "pyo3/auto-initialize"] [dev-dependencies] +metrics-util = { version = "0.20.1", default-features = false, features = ["debugging"] } +ordered-float = { version = "5", default-features = false } parking_lot = "0.12.1" pyo3 = { version = "*", features = ["auto-initialize"] } diff --git 
 a/sentry_streams/src/backpressure_metrics.rs b/sentry_streams/src/backpressure_metrics.rs index cb0f5bd5..996032e5 100644 --- a/sentry_streams/src/backpressure_metrics.rs +++ b/sentry_streams/src/backpressure_metrics.rs @@ -1,5 +1,11 @@ -//! Backpressure metrics: counters and episode-duration histograms for -//! `MessageRejected` (send vs receive). Incomplete episodes at shutdown are dropped. +//! This module tracks backpressure that can arise during the processing +//! of messages through the pipeline. +//! Every step in arroyo can raise a MessageRejected error to signal backpressure +//! to the previous step in the pipeline. +//! Tracking this event is critical to understand the bottleneck in the pipeline. +//! +//! Here we track these events by wrapping a strategy and producing counters and +//! event-duration histograms to track backpressure. use sentry_arroyo::processing::strategies::{ CommitRequest, ProcessingStrategy, StrategyError, SubmitError, @@ -7,32 +13,29 @@ use sentry_arroyo::processing::strategies::{ use sentry_arroyo::types::Message; use std::time::{Duration, Instant}; +use crate::metrics::{record_counter, record_histogram}; use crate::routes::RoutedValue; -const PREFIX: &str = "streams.pipeline"; - -/// Tracks a single backpressure episode for histogram sampling. -/// Episodes still open when the consumer stops are not recorded. +/// Tracks a single backpressure event for histogram sampling. +/// Events still open when the consumer stops are not recorded. #[derive(Debug, Default)] pub struct BackpressureTracker { start: Option, } impl BackpressureTracker { - /// Start or continue an episode (timer begins on first rejection in a run). + /// Start or continue an event (timer begins on first rejection in a run). pub fn on_message_rejected(&mut self) { if self.start.is_none() { self.start = Some(Instant::now()); } } - /// If an episode was active, record one histogram sample (milliseconds) and clear. 
+ /// If an event was active, record one histogram sample (milliseconds) and clear. pub fn on_success(&mut self, step: &str, duration_metric: &str) { if let Some(start) = self.start.take() { let elapsed_ms = duration_ms(start.elapsed()); - let key = format!("{}.{}", PREFIX, duration_metric); - let labels = vec![("step".to_string(), step.to_string())]; - metrics::histogram!(key, &labels).record(elapsed_ms); + record_histogram(duration_metric, &[("step", step)], elapsed_ms); } } } @@ -41,29 +44,24 @@ fn duration_ms(d: Duration) -> f64 { d.as_secs_f64() * 1000.0 } -fn counter_increment(key: String, step: &str) { - let labels = vec![("step".to_string(), step.to_string())]; - metrics::counter!(key, &labels).increment(1); -} - -/// Increment `send_backpressure` and extend the send-duration episode tracker. +/// Increment `send_backpressure` and extend the send-duration event tracker. pub fn record_send_rejected(step: &str, tracker: &mut BackpressureTracker) { - counter_increment(format!("{}.send_backpressure", PREFIX), step); + record_counter("send_backpressure", &[("step", step)], 1); tracker.on_message_rejected(); } -/// Increment `receive_backpressure` and extend the receive-duration episode tracker. +/// Increment `receive_backpressure` and extend the receive-duration event tracker. pub fn record_rcvd_rejected(step: &str, tracker: &mut BackpressureTracker) { - counter_increment(format!("{}.receive_backpressure", PREFIX), step); + record_counter("receive_backpressure", &[("step", step)], 1); tracker.on_message_rejected(); } -/// End a send backpressure episode after a successful submit path. +/// End a send backpressure event after a successful submit path. pub fn send_on_success(step: &str, tracker: &mut BackpressureTracker) { tracker.on_success(step, "send_backpressure_duration"); } -/// End a receive backpressure episode after downstream accepted a submit. +/// End a receive backpressure event after downstream accepted a submit. 
pub fn recv_on_success(step: &str, tracker: &mut BackpressureTracker) { tracker.on_success(step, "receive_backpressure_duration"); } @@ -121,9 +119,57 @@ impl ProcessingStrategy for BackpressureNext { #[cfg(test)] mod tests { use super::*; + use crate::fake_strategy::FakeStrategy; + use crate::messages::RoutedValuePayload; + use crate::metrics::{init_streams_recorder_for_tests, PIPELINE_METRIC_PREFIX}; + use crate::routes::{Route, RoutedValue}; + use chrono::Utc; + use metrics::{with_local_recorder, Key, Label}; + use metrics_util::debugging::{DebugValue, DebuggingRecorder}; + use metrics_util::{CompositeKey, MetricKind}; + use sentry_arroyo::types::{Message, Partition, Topic}; + use std::collections::BTreeMap; + use std::sync::{Arc, Mutex}; + + #[test] + fn backpressure_next_increments_receive_backpressure_when_inner_rejects() { + init_streams_recorder_for_tests(); + let debug = DebuggingRecorder::new(); + let snapshotter = debug.snapshotter(); + let step_label = "upstream_map"; + + let routed = RoutedValue { + route: Route::new("src".to_string(), vec![]), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 1), + }; + let msg = + Message::new_broker_message(routed, Partition::new(Topic::new("t"), 0), 1, Utc::now()); + + let fake = FakeStrategy::new( + Arc::new(Mutex::new(Vec::new())), + Arc::new(Mutex::new(Vec::new())), + true, + ); + let mut next = BackpressureNext::new(Box::new(fake), step_label.to_string()); + + with_local_recorder(&debug, || match next.submit(msg) { + Err(SubmitError::MessageRejected(_)) => {} + other => panic!("expected MessageRejected, got {other:?}"), + }); + + let key = CompositeKey::new( + MetricKind::Counter, + Key::from_parts( + format!("{PIPELINE_METRIC_PREFIX}.receive_backpressure"), + vec![Label::new("step", step_label)], + ), + ); + let map = snapshotter.snapshot().into_hashmap(); + assert_eq!(map.get(&key), Some(&(None, None, DebugValue::Counter(1)))); + } #[test] - fn 
episode_consecutive_rejections_one_histogram_on_success() { + fn event_consecutive_rejections_one_histogram_on_success() { let mut t = BackpressureTracker::default(); t.on_message_rejected(); std::thread::sleep(Duration::from_millis(2)); @@ -135,7 +181,7 @@ mod tests { } #[test] - fn episode_success_without_rejection_no_histogram() { + fn event_success_without_rejection_no_histogram() { let mut t = BackpressureTracker::default(); t.on_success("step_b", "send_backpressure_duration"); assert!(t.start.is_none()); diff --git a/sentry_streams/src/metrics.rs b/sentry_streams/src/metrics.rs index 8749dc2f..e23815ba 100644 --- a/sentry_streams/src/metrics.rs +++ b/sentry_streams/src/metrics.rs @@ -2,14 +2,84 @@ use crate::metrics_config::PyMetricConfig; use metrics::Label; use metrics_exporter_dogstatsd::DogStatsDBuilder; use sentry_arroyo::metrics::{init as arroyo_init, Metric, MetricType, Recorder}; +use std::sync::OnceLock; use std::time::Duration; use tracing::{error, info, warn}; +/// Prefix for all stream pipeline metrics emitted from this crate (matches Arroyo facade keys). +pub const PIPELINE_METRIC_PREFIX: &str = "streams.pipeline"; + +static STREAMS_RECORDER: OnceLock = OnceLock::new(); + +/// Installs [`StreamsMetricsRecorder`] so [`record_counter`] / [`record_histogram`] emit. For unit tests; first call wins. +#[cfg(test)] +pub(crate) fn init_streams_recorder_for_tests() { + let _ = STREAMS_RECORDER.set(StreamsMetricsRecorder); +} + +/// Records pipeline metrics with the standard prefix. Configured default tags come from DogStatsD +/// [`DogStatsDBuilder::with_global_labels`] (not duplicated here). 
+#[derive(Debug, Default, Clone, Copy)] +pub struct StreamsMetricsRecorder; + +impl StreamsMetricsRecorder { + fn labels(extra: &[(&str, &str)]) -> Vec<(String, String)> { + extra + .iter() + .map(|(k, v)| ((*k).to_string(), (*v).to_string())) + .collect() + } + + fn full_key(name: &str) -> String { + format!("{}.{}", PIPELINE_METRIC_PREFIX, name) + } + + pub fn record_counter(&self, name: &str, extra: &[(&str, &str)], value: u64) { + let key = Self::full_key(name); + let labels = Self::labels(extra); + metrics::counter!(key, &labels).increment(value); + } + + pub fn record_histogram(&self, name: &str, extra: &[(&str, &str)], value: f64) { + let key = Self::full_key(name); + let labels = Self::labels(extra); + metrics::histogram!(key, &labels).record(value); + } + + pub fn record_gauge(&self, name: &str, extra: &[(&str, &str)], value: f64) { + let key = Self::full_key(name); + let labels = Self::labels(extra); + metrics::gauge!(key, &labels).set(value); + } +} + +/// Records a counter if [`configure_metrics`] has installed the recorder; otherwise a no-op (same idea as Arroyo's global recorder). +pub(crate) fn record_counter(name: &str, extra: &[(&str, &str)], value: u64) { + if let Some(r) = STREAMS_RECORDER.get() { + r.record_counter(name, extra, value); + } +} + +/// Records a histogram sample if the recorder is installed; otherwise a no-op. +pub(crate) fn record_histogram(name: &str, extra: &[(&str, &str)], value: f64) { + if let Some(r) = STREAMS_RECORDER.get() { + r.record_histogram(name, extra, value); + } +} + +/// Records a gauge if the recorder is installed; otherwise a no-op. 
+#[allow(dead_code)] +pub(crate) fn record_gauge(name: &str, extra: &[(&str, &str)], value: f64) { + if let Some(r) = STREAMS_RECORDER.get() { + r.record_gauge(name, extra, value); + } +} + struct MetricsFacadeRecorder; impl Recorder for MetricsFacadeRecorder { fn record_metric(&self, metric: Metric<'_>) { - let key = format!("streams.pipeline.{}", metric.key); + let key = format!("{}.{}", PIPELINE_METRIC_PREFIX, metric.key); let value_f64 = match metric.value { sentry_arroyo::metrics::MetricValue::I64(v) => v as f64, sentry_arroyo::metrics::MetricValue::U64(v) => v as f64, @@ -53,7 +123,6 @@ pub fn configure_metrics(metric_config: Option) { .with_remote_address(&format!("{}:{}", host, port)) .expect("Failed to parse address"); - // Apply global tags if configured let default_tags = metric_config.tags().unwrap_or_default(); if !default_tags.is_empty() { info!( @@ -70,7 +139,6 @@ pub fn configure_metrics(metric_config: Option) { builder = builder.with_global_labels(labels); } - // Apply flush interval if configured if let Some(flush_interval_ms) = metric_config.flush_interval_ms() { let duration = Duration::from_millis(flush_interval_ms); info!("Configuring metrics flush interval to {:?}", duration); @@ -87,6 +155,10 @@ pub fn configure_metrics(metric_config: Option) { } } + if STREAMS_RECORDER.set(StreamsMetricsRecorder).is_err() { + warn!("Streams pipeline metrics recorder was already initialized; skipping"); + } + if arroyo_init(MetricsFacadeRecorder).is_err() { warn!("Arroyo metrics recorder already initialized, skipping"); return; @@ -95,3 +167,76 @@ pub fn configure_metrics(metric_config: Option) { info!("Successfully initialized arroyo metrics"); } } + +/// Tests use [`metrics::with_local_recorder`] so emission goes to a [`metrics_util::debugging::DebuggingRecorder`] +/// without installing a global `metrics` recorder. See . 
+#[cfg(test)] +mod tests { + use super::{StreamsMetricsRecorder, PIPELINE_METRIC_PREFIX}; + use metrics::{with_local_recorder, Key, Label}; + use metrics_util::debugging::{DebugValue, DebuggingRecorder}; + use metrics_util::{CompositeKey, MetricKind}; + use ordered_float::OrderedFloat; + + fn expected_key(name: &str, labels: Vec