diff --git a/sentry_streams/Cargo.lock b/sentry_streams/Cargo.lock index aa9aeb70..d3042d04 100644 --- a/sentry_streams/Cargo.lock +++ b/sentry_streams/Cargo.lock @@ -1577,6 +1577,8 @@ dependencies = [ "log", "metrics", "metrics-exporter-dogstatsd", + "metrics-util", + "ordered-float", "parking_lot", "pyo3", "rand 0.8.5", diff --git a/sentry_streams/Cargo.toml b/sentry_streams/Cargo.toml index 98b94e22..3f4c6f09 100644 --- a/sentry_streams/Cargo.toml +++ b/sentry_streams/Cargo.toml @@ -36,6 +36,8 @@ extension-module = ["pyo3/extension-module"] cli = ["dep:clap", "pyo3/auto-initialize"] [dev-dependencies] +metrics-util = { version = "0.20.1", default-features = false, features = ["debugging"] } +ordered-float = { version = "5", default-features = false } parking_lot = "0.12.1" pyo3 = { version = "*", features = ["auto-initialize"] } diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index f93c2b93..f0708f6f 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -197,6 +197,7 @@ def finalize_chain( chains: TransformChains, route: Route, metrics_config: MetricsConfig, + segment_label: str, ) -> RuntimeOperator: rust_route = RustRoute(route.source, route.waypoints) config, func = chains.finalize(route) @@ -209,6 +210,7 @@ def finalize_chain( ) return RuntimeOperator.PythonAdapter( + segment_label, rust_route, MultiprocessDelegateFactory( func, @@ -226,7 +228,7 @@ def finalize_chain( ) else: logger.info(f"Finalizing chain for route {route} without multiprocessing") - return RuntimeOperator.Map(rust_route, lambda msg: func(msg).to_inner()) + return RuntimeOperator.Map(segment_label, rust_route, lambda msg: func(msg).to_inner()) class RustArroyoAdapter(StreamAdapter[Route, Route]): @@ -259,8 +261,9 @@ def build( # type: ignore[override] def __close_chain(self, stream: Route) -> None: if 
self.__chains.exists(stream): logger.info(f"Closing transformation chain: {stream} and adding to pipeline") + segment_label = self.__chains.segment_label(stream) self.__consumers[stream.source].add_step( - finalize_chain(self.__chains, stream, self.__metrics_config) + finalize_chain(self.__chains, stream, self.__metrics_config, segment_label), ) def get_consumer(self, source: str) -> ArroyoConsumer: @@ -332,19 +335,22 @@ def wrapped_generator() -> str: logger.info(f"Adding GCS sink: {step.name} to pipeline") self.__consumers[stream.source].add_step( - RuntimeOperator.GCSSink(route, step.bucket, wrapped_generator, step.thread_count) + RuntimeOperator.GCSSink( + step.name, route, step.bucket, wrapped_generator, step.thread_count + ), ) elif isinstance(step, DevNullSink): logger.info(f"Adding DevNull sink: {step.name} to pipeline") self.__consumers[stream.source].add_step( RuntimeOperator.DevNullSink( + step.name, route, batch_size=step.batch_size, batch_time_ms=step.batch_time_ms, average_sleep_time_ms=step.average_sleep_time_ms, max_sleep_time_ms=step.max_sleep_time_ms, - ) + ), ) # Our fallback for now since there's no other Sink type @@ -353,10 +359,11 @@ def wrapped_generator() -> str: logger.info(f"Adding stream sink: {step.name} to pipeline") self.__consumers[stream.source].add_step( RuntimeOperator.StreamSink( + step.name, route, step.stream_name, build_kafka_producer_config(step.name, self.steps_config), - ) + ), ) return stream @@ -449,7 +456,9 @@ def filter_msg(msg: Message[Any]) -> bool: route = RustRoute(stream.source, stream.waypoints) logger.info(f"Adding filter: {step.name} to pipeline") - self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg)) + self.__consumers[stream.source].add_step( + RuntimeOperator.Filter(step.name, route, filter_msg) + ) return stream @@ -472,7 +481,7 @@ def reduce( logger.info(f"Adding reduce: {step.name} to pipeline") self.__consumers[stream.source].add_step( - RuntimeOperator.PythonAdapter(route, 
ReduceDelegateFactory(step)) + RuntimeOperator.PythonAdapter(step.name, route, ReduceDelegateFactory(step)), ) return stream @@ -495,8 +504,10 @@ def broadcast( logger.info(f"Adding broadcast: {step.name} to pipeline") self.__consumers[stream.source].add_step( RuntimeOperator.Broadcast( - route, downstream_routes=[branch.root.name for branch in step.routes] - ) + step.name, + route, + downstream_routes=[branch.root.name for branch in step.routes], + ), ) return build_branches(stream, step.routes) @@ -534,8 +545,11 @@ def routing_function(msg: Message[Any]) -> str: logger.info(f"Adding router: {step.name} to pipeline") self.__consumers[stream.source].add_step( RuntimeOperator.Router( - route, routing_function, cast(Sequence[str], step.routing_table.values()) - ) + step.name, + route, + routing_function, + cast(Sequence[str], step.routing_table.values()), + ), ) return build_branches(stream, step.routing_table.values()) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py b/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py index 278c9bb0..0c51039b 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py @@ -90,6 +90,18 @@ def add_map(self, route: Route, step: Map[Any, Any]) -> None: raise ValueError(f"Chain {route} not initialized") self.__chains[hashable_route].steps.append(step) + def segment_label(self, route: Route) -> str: + """ + Metric label for the chained map segment: the first map step's name, before `finalize`. 
+ """ + hashable_route = _hashable_route(route) + if hashable_route not in self.__chains: + raise ValueError(f"No chain for route {route}") + steps = self.__chains[hashable_route].steps + if not steps: + return route.source + return steps[0].name + def finalize( self, route: Route ) -> Tuple[MultiProcessConfig | None, Callable[[Message[Any]], Message[Any]]]: diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 6f58ffbb..417bccab 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -59,17 +59,26 @@ class PyMetricConfig: def flush_interval_ms(self) -> int | None: ... class RuntimeOperator: + @property + def step_name(self) -> str: ... @classmethod - def Map(cls, route: Route, function: Callable[[Message[Any]], Any]) -> Self: ... + def Map(cls, step_name: str, route: Route, function: Callable[[Message[Any]], Any]) -> Self: ... @classmethod - def Filter(cls, route: Route, function: Callable[[Message[Any]], bool]) -> Self: ... + def Filter( + cls, step_name: str, route: Route, function: Callable[[Message[Any]], bool] + ) -> Self: ... @classmethod def StreamSink( - cls, route: Route, topic_name: str, kafka_config: PyKafkaProducerConfig + cls, + step_name: str, + route: Route, + topic_name: str, + kafka_config: PyKafkaProducerConfig, ) -> Self: ... @classmethod def GCSSink( cls, + step_name: str, route: Route, bucket: str, object_generator: Callable[[], str], @@ -78,6 +87,7 @@ class RuntimeOperator: @classmethod def DevNullSink( cls, + step_name: str, route: Route, batch_size: int | None = None, batch_time_ms: float | None = None, @@ -87,14 +97,17 @@ class RuntimeOperator: @classmethod def Router( cls, + step_name: str, route: Route, function: Callable[[Message[Any]], str], downstream_routes: Sequence[str], ) -> Self: ... @classmethod - def Broadcast(cls, route: Route, downstream_routes: Sequence[str]) -> Self: ... 
+ def Broadcast(cls, step_name: str, route: Route, downstream_routes: Sequence[str]) -> Self: ... @classmethod - def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ... + def PythonAdapter( + cls, step_name: str, route: Route, delegate_Factory: RustOperatorFactory + ) -> Self: ... class ArroyoConsumer: def __init__( diff --git a/sentry_streams/src/backpressure_metrics.rs b/sentry_streams/src/backpressure_metrics.rs new file mode 100644 index 00000000..996032e5 --- /dev/null +++ b/sentry_streams/src/backpressure_metrics.rs @@ -0,0 +1,189 @@ +//! This module tracks backpressure that can arise during the processing +//! of messages through the pipeline. +//! Every step in arroyo can raise a MessageRejected error to signal backpressure +//! to the previous step in the pipeline. +//! Tracking this event is critical to understand the bottleneck in the pipeline. +//! +//! Here we track these events by wrapping a strategy and producing counters and +//! event-duration histograms to track backpressure. + +use sentry_arroyo::processing::strategies::{ + CommitRequest, ProcessingStrategy, StrategyError, SubmitError, +}; +use sentry_arroyo::types::Message; +use std::time::{Duration, Instant}; + +use crate::metrics::{record_counter, record_histogram}; +use crate::routes::RoutedValue; + +/// Tracks a single backpressure event for histogram sampling. +/// Events still open when the consumer stops are not recorded. +#[derive(Debug, Default)] +pub struct BackpressureTracker { + start: Option<Instant>, +} + +impl BackpressureTracker { + /// Start or continue an event (timer begins on first rejection in a run). + pub fn on_message_rejected(&mut self) { + if self.start.is_none() { + self.start = Some(Instant::now()); + } + } + + /// If an event was active, record one histogram sample (milliseconds) and clear.
+ pub fn on_success(&mut self, step: &str, duration_metric: &str) { + if let Some(start) = self.start.take() { + let elapsed_ms = duration_ms(start.elapsed()); + record_histogram(duration_metric, &[("step", step)], elapsed_ms); + } + } +} + +fn duration_ms(d: Duration) -> f64 { + d.as_secs_f64() * 1000.0 +} + +/// Increment `send_backpressure` and extend the send-duration event tracker. +pub fn record_send_rejected(step: &str, tracker: &mut BackpressureTracker) { + record_counter("send_backpressure", &[("step", step)], 1); + tracker.on_message_rejected(); +} + +/// Increment `receive_backpressure` and extend the receive-duration event tracker. +pub fn record_rcvd_rejected(step: &str, tracker: &mut BackpressureTracker) { + record_counter("receive_backpressure", &[("step", step)], 1); + tracker.on_message_rejected(); +} + +/// End a send backpressure event after a successful submit path. +pub fn send_on_success(step: &str, tracker: &mut BackpressureTracker) { + tracker.on_success(step, "send_backpressure_duration"); +} + +/// End a receive backpressure event after downstream accepted a submit. +pub fn recv_on_success(step: &str, tracker: &mut BackpressureTracker) { + tracker.on_success(step, "receive_backpressure_duration"); +} + +/// Wraps the downstream step and records receive-side backpressure when `inner.submit` returns +/// `MessageRejected`. +pub struct BackpressureNext { + inner: Box<dyn ProcessingStrategy<RoutedValue>>, + step_label: String, + recv_tracker: BackpressureTracker, +} + +impl BackpressureNext { + pub fn new(inner: Box<dyn ProcessingStrategy<RoutedValue>>, step_label: String) -> Self { + Self { + inner, + step_label, + recv_tracker: BackpressureTracker::default(), + } + } +} + +impl ProcessingStrategy<RoutedValue> for BackpressureNext { + fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> { + // `SubmitError::MessageRejected` is only returned from `submit`, not from `poll`. + // The wrapped `next` may still see backpressure when the inner strategy calls + // `submit` during its own `poll` (e.g.
watermark flush, retrying pending messages); + // those calls go through `BackpressureNext::submit` and are instrumented there. + self.inner.poll() + } + + fn submit(&mut self, message: Message<RoutedValue>) -> Result<(), SubmitError<RoutedValue>> { + match self.inner.submit(message) { + Ok(()) => { + recv_on_success(&self.step_label, &mut self.recv_tracker); + Ok(()) + } + Err(SubmitError::MessageRejected(mr)) => { + record_rcvd_rejected(&self.step_label, &mut self.recv_tracker); + Err(SubmitError::MessageRejected(mr)) + } + Err(e) => Err(e), + } + } + + fn terminate(&mut self) { + self.inner.terminate(); + } + + fn join(&mut self, timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> { + self.inner.join(timeout) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::fake_strategy::FakeStrategy; + use crate::messages::RoutedValuePayload; + use crate::metrics::{init_streams_recorder_for_tests, PIPELINE_METRIC_PREFIX}; + use crate::routes::{Route, RoutedValue}; + use chrono::Utc; + use metrics::{with_local_recorder, Key, Label}; + use metrics_util::debugging::{DebugValue, DebuggingRecorder}; + use metrics_util::{CompositeKey, MetricKind}; + use sentry_arroyo::types::{Message, Partition, Topic}; + use std::collections::BTreeMap; + use std::sync::{Arc, Mutex}; + + #[test] + fn backpressure_next_increments_receive_backpressure_when_inner_rejects() { + init_streams_recorder_for_tests(); + let debug = DebuggingRecorder::new(); + let snapshotter = debug.snapshotter(); + let step_label = "upstream_map"; + + let routed = RoutedValue { + route: Route::new("src".to_string(), vec![]), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 1), + }; + let msg = + Message::new_broker_message(routed, Partition::new(Topic::new("t"), 0), 1, Utc::now()); + + let fake = FakeStrategy::new( + Arc::new(Mutex::new(Vec::new())), + Arc::new(Mutex::new(Vec::new())), + true, + ); + let mut next = BackpressureNext::new(Box::new(fake), step_label.to_string()); + + with_local_recorder(&debug, || match
next.submit(msg) { + Err(SubmitError::MessageRejected(_)) => {} + other => panic!("expected MessageRejected, got {other:?}"), + }); + + let key = CompositeKey::new( + MetricKind::Counter, + Key::from_parts( + format!("{PIPELINE_METRIC_PREFIX}.receive_backpressure"), + vec![Label::new("step", step_label)], + ), + ); + let map = snapshotter.snapshot().into_hashmap(); + assert_eq!(map.get(&key), Some(&(None, None, DebugValue::Counter(1)))); + } + + #[test] + fn event_consecutive_rejections_one_histogram_on_success() { + let mut t = BackpressureTracker::default(); + t.on_message_rejected(); + std::thread::sleep(Duration::from_millis(2)); + t.on_message_rejected(); + // No panic; second rejection does not restart timer + assert!(t.start.is_some()); + t.on_success("step_a", "receive_backpressure_duration"); + assert!(t.start.is_none()); + } + + #[test] + fn event_success_without_rejection_no_histogram() { + let mut t = BackpressureTracker::default(); + t.on_success("step_b", "send_backpressure_duration"); + assert!(t.start.is_none()); + } +} diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 3b77dee7..68554a44 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -5,6 +5,7 @@ //! and all the steps following that source. //! The pipeline is built by adding RuntimeOperators to the consumer. +use crate::backpressure_metrics::BackpressureNext; use crate::commit_policy::WatermarkCommitOffsets; use crate::kafka_config::PyKafkaConsumerConfig; use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload}; @@ -97,8 +98,7 @@ impl ArroyoConsumer { } /// Add a step to the Consumer pipeline at the end of it. - /// This class is supposed to be instantiated by the Python adapter - /// so it takes the steps descriptor as a Py. + /// Backpressure metric labels use each step's [`RuntimeOperator::pipeline_step_name`]. 
fn add_step(&mut self, step: Py<RuntimeOperator>) { self.steps.push(step); } @@ -223,6 +223,10 @@ pub fn build_chain( for step in steps.iter().rev() { next = build(step, next, Box::new(Noop {}), concurrency_config); } + let next = Box::new(BackpressureNext::new( + next, + format!("WatermarkEmitter:{source}"), + )); let watermark_step = Box::new(WatermarkEmitter::new( next, Route { @@ -380,6 +384,7 @@ mod tests { let r = Py::new( py, RuntimeOperator::Map { + step_name: "map".to_string(), route: Route::new("source".to_string(), vec![]), function: callable, }, diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index c8913a39..7d114ac1 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -1,4 +1,5 @@ use pyo3::prelude::*; +mod backpressure_metrics; mod broadcaster; mod callers; mod commit_policy; diff --git a/sentry_streams/src/metrics.rs b/sentry_streams/src/metrics.rs index 8749dc2f..e23815ba 100644 --- a/sentry_streams/src/metrics.rs +++ b/sentry_streams/src/metrics.rs @@ -2,14 +2,84 @@ use crate::metrics_config::PyMetricConfig; use metrics::Label; use metrics_exporter_dogstatsd::DogStatsDBuilder; use sentry_arroyo::metrics::{init as arroyo_init, Metric, MetricType, Recorder}; +use std::sync::OnceLock; use std::time::Duration; use tracing::{error, info, warn}; +/// Prefix for all stream pipeline metrics emitted from this crate (matches Arroyo facade keys). +pub const PIPELINE_METRIC_PREFIX: &str = "streams.pipeline"; + +static STREAMS_RECORDER: OnceLock<StreamsMetricsRecorder> = OnceLock::new(); + +/// Installs [`StreamsMetricsRecorder`] so [`record_counter`] / [`record_histogram`] emit. For unit tests; first call wins. +#[cfg(test)] +pub(crate) fn init_streams_recorder_for_tests() { + let _ = STREAMS_RECORDER.set(StreamsMetricsRecorder); +} + +/// Records pipeline metrics with the standard prefix. Configured default tags come from DogStatsD +/// [`DogStatsDBuilder::with_global_labels`] (not duplicated here).
+#[derive(Debug, Default, Clone, Copy)] +pub struct StreamsMetricsRecorder; + +impl StreamsMetricsRecorder { + fn labels(extra: &[(&str, &str)]) -> Vec<(String, String)> { + extra + .iter() + .map(|(k, v)| ((*k).to_string(), (*v).to_string())) + .collect() + } + + fn full_key(name: &str) -> String { + format!("{}.{}", PIPELINE_METRIC_PREFIX, name) + } + + pub fn record_counter(&self, name: &str, extra: &[(&str, &str)], value: u64) { + let key = Self::full_key(name); + let labels = Self::labels(extra); + metrics::counter!(key, &labels).increment(value); + } + + pub fn record_histogram(&self, name: &str, extra: &[(&str, &str)], value: f64) { + let key = Self::full_key(name); + let labels = Self::labels(extra); + metrics::histogram!(key, &labels).record(value); + } + + pub fn record_gauge(&self, name: &str, extra: &[(&str, &str)], value: f64) { + let key = Self::full_key(name); + let labels = Self::labels(extra); + metrics::gauge!(key, &labels).set(value); + } +} + +/// Records a counter if [`configure_metrics`] has installed the recorder; otherwise a no-op (same idea as Arroyo's global recorder). +pub(crate) fn record_counter(name: &str, extra: &[(&str, &str)], value: u64) { + if let Some(r) = STREAMS_RECORDER.get() { + r.record_counter(name, extra, value); + } +} + +/// Records a histogram sample if the recorder is installed; otherwise a no-op. +pub(crate) fn record_histogram(name: &str, extra: &[(&str, &str)], value: f64) { + if let Some(r) = STREAMS_RECORDER.get() { + r.record_histogram(name, extra, value); + } +} + +/// Records a gauge if the recorder is installed; otherwise a no-op. 
+#[allow(dead_code)] +pub(crate) fn record_gauge(name: &str, extra: &[(&str, &str)], value: f64) { + if let Some(r) = STREAMS_RECORDER.get() { + r.record_gauge(name, extra, value); + } +} + struct MetricsFacadeRecorder; impl Recorder for MetricsFacadeRecorder { fn record_metric(&self, metric: Metric<'_>) { - let key = format!("streams.pipeline.{}", metric.key); + let key = format!("{}.{}", PIPELINE_METRIC_PREFIX, metric.key); let value_f64 = match metric.value { sentry_arroyo::metrics::MetricValue::I64(v) => v as f64, sentry_arroyo::metrics::MetricValue::U64(v) => v as f64, @@ -53,7 +123,6 @@ pub fn configure_metrics(metric_config: Option) { .with_remote_address(&format!("{}:{}", host, port)) .expect("Failed to parse address"); - // Apply global tags if configured let default_tags = metric_config.tags().unwrap_or_default(); if !default_tags.is_empty() { info!( @@ -70,7 +139,6 @@ pub fn configure_metrics(metric_config: Option) { builder = builder.with_global_labels(labels); } - // Apply flush interval if configured if let Some(flush_interval_ms) = metric_config.flush_interval_ms() { let duration = Duration::from_millis(flush_interval_ms); info!("Configuring metrics flush interval to {:?}", duration); @@ -87,6 +155,10 @@ pub fn configure_metrics(metric_config: Option) { } } + if STREAMS_RECORDER.set(StreamsMetricsRecorder).is_err() { + warn!("Streams pipeline metrics recorder was already initialized; skipping"); + } + if arroyo_init(MetricsFacadeRecorder).is_err() { warn!("Arroyo metrics recorder already initialized, skipping"); return; @@ -95,3 +167,76 @@ pub fn configure_metrics(metric_config: Option) { info!("Successfully initialized arroyo metrics"); } } + +/// Tests use [`metrics::with_local_recorder`] so emission goes to a [`metrics_util::debugging::DebuggingRecorder`] +/// without installing a global `metrics` recorder. See . 
+#[cfg(test)] +mod tests { + use super::{StreamsMetricsRecorder, PIPELINE_METRIC_PREFIX}; + use metrics::{with_local_recorder, Key, Label}; + use metrics_util::debugging::{DebugValue, DebuggingRecorder}; + use metrics_util::{CompositeKey, MetricKind}; + use ordered_float::OrderedFloat; + + fn expected_key(name: &str, labels: Vec