From 6e7d58f56d3ddf0c9200e403a17ae32291e002e2 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Sat, 25 Apr 2026 21:54:05 -0700 Subject: [PATCH 1/2] Reduce impact of metrics on perf --- .../adapters/arroyo/rust_arroyo.py | 62 ++++++++++--------- .../deployment_config/simple_map_filter.yaml | 1 + 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 5e367966..b1b49f16 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -2,6 +2,7 @@ import functools import logging +import random import time from dataclasses import replace from typing import ( @@ -39,7 +40,6 @@ MetricsConfig, configure_metrics, get_metrics, - get_size, ) from sentry_streams.pipeline.function_template import ( InputType, @@ -97,47 +97,50 @@ def initializer(metrics_config: MetricsConfig) -> None: def _metrics_wrapped_function( - step_name: str, application_function: Callable[[Message[Any]], Any], msg: Message[Any] + step_name: str, + sample_rate: float, + application_function: Callable[[Message[Any]], Any], + msg: Message[Any], ) -> Any: """ Module-level wrapper function for adding metrics to step functions. This is defined at module level to be picklable for multiprocessing. """ - msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None - start_time = input_metrics(step_name, msg_size) - has_error = output_size = None + input_metrics(step_name, sample_rate) + has_error = None + start_time = time.time() try: result = application_function(msg) - output_size = get_size(result) return result except Exception as e: has_error = str(e.__class__.__name__) raise e finally: - output_metrics(step_name, has_error, start_time, output_size) + output_metrics(step_name, has_error, start_time, sample_rate) -def input_metrics(name: str, message_size: int | None) -> float: +def input_metrics(name: str, sample_rate: float) -> None: + if random.random() > sample_rate: + return metrics = get_metrics() tags = {"step": name} - metrics.increment(Metric.INPUT_MESSAGES, tags=tags) - if message_size is not None: - metrics.increment(Metric.INPUT_BYTES, tags=tags, value=message_size) - return time.time() + metrics.increment(Metric.INPUT_MESSAGES, value=1 / sample_rate, tags=tags) def output_metrics( - name: str, error: str | None, start_time: float, message_size: int | None + name: str, + error: str | None, + start_time: float, + sample_rate: float, ) -> None: + if random.random() > sample_rate: + return metrics = get_metrics() tags = {"step": name} if error: tags["error"] = error metrics.increment(Metric.ERRORS, tags=tags) - metrics.increment(Metric.OUTPUT_MESSAGES, tags=tags) - if message_size is not None: - metrics.increment(Metric.OUTPUT_BYTES, tags=tags, value=message_size) metrics.timing(Metric.DURATION, time.time() - start_time, tags=tags) @@ -237,6 +240,7 @@ def __init__( steps_config: Mapping[str, StepConfig], metrics_config: MetricsConfig, write_healthcheck: bool = False, + metrics_sample_rate: float = 0.1, ) -> None: super().__init__() self.steps_config = steps_config @@ -244,6 +248,7 @@ def __init__( self.__write_healthcheck = write_healthcheck self.__consumers: MutableMapping[str, ArroyoConsumer] = {} self.__chains = TransformChains() + self.__metrics_sample_rate = metrics_sample_rate @classmethod def build( # type: ignore[override] @@ -255,8 +260,9 @@ def build( # type: ignore[override] adapter_config = config.get("adapter_config") or {} arroyo_config = adapter_config.get("arroyo") or {} write_healthcheck = bool(arroyo_config.get("write_healthcheck", False)) + metrics_sample_rate = float(arroyo_config.get("metrics_sample_rate", 0.1)) - return cls(steps_config, metrics_config, write_healthcheck) + return cls(steps_config, metrics_config, write_healthcheck, metrics_sample_rate) def __close_chain(self, stream: Route) -> None: if self.__chains.exists(stream): @@ -326,11 +332,12 @@ def sink(self, step: Sink[Any], stream: Route) -> Route: # Fix this to wrap the actual step instead of just the object_generator. # This will at least capture the number of calls to the step, if nothing else. def wrapped_generator() -> str: - start_time = input_metrics(step.name, None) + start_time = time.time() + input_metrics(step.name, self.__metrics_sample_rate) try: return step.object_generator() finally: - output_metrics(step.name, None, start_time, None) + output_metrics(step.name, None, start_time, self.__metrics_sample_rate) logger.info(f"Adding GCS sink: {step.name} to pipeline") self.__consumers[stream.source].add_step( @@ -381,7 +388,7 @@ def map(self, step: Map[Any, Any], stream: Route) -> Route: application_function = step.resolved_function wrapped_function = functools.partial( - _metrics_wrapped_function, step.name, application_function + _metrics_wrapped_function, step.name, self.__metrics_sample_rate, application_function ) step = replace(step, function=wrapped_function) @@ -450,18 +457,17 @@ def filter(self, step: Filter[Any], stream: Route) -> Route: elif isinstance(step, PredicateFilter): def filter_msg(msg: Message[Any]) -> bool: - msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None - start_time = input_metrics(step.name, msg_size) - has_error = output_size = None + input_metrics(step.name, self.__metrics_sample_rate) + has_error = None + start_time = time.time() try: result = step.resolved_function(msg) - output_size = get_size(result) return result except Exception as e: has_error = str(e.__class__.__name__) raise e finally: - output_metrics(step.name, has_error, start_time, output_size) + output_metrics(step.name, has_error, start_time, self.__metrics_sample_rate) self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg)) return stream @@ -537,8 +543,8 @@ def router( route = RustRoute(stream.source, stream.waypoints) def routing_function(msg: Message[Any]) -> str: - msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None - start_time = input_metrics(step.name, msg_size) + input_metrics(step.name, self.__metrics_sample_rate) + start_time = time.time() has_error = None try: waypoint = step.routing_function(msg) @@ -548,7 +554,7 @@ def routing_function(msg: Message[Any]) -> str: has_error = str(e.__class__.__name__) raise e finally: - output_metrics(step.name, has_error, start_time, None) + output_metrics(step.name, has_error, start_time, self.__metrics_sample_rate) logger.info(f"Adding router: {step.name} to pipeline") self.__consumers[stream.source].add_step( diff --git a/sentry_streams/sentry_streams/deployment_config/simple_map_filter.yaml b/sentry_streams/sentry_streams/deployment_config/simple_map_filter.yaml index bdf89fbf..d87de517 100644 --- a/sentry_streams/sentry_streams/deployment_config/simple_map_filter.yaml +++ b/sentry_streams/sentry_streams/deployment_config/simple_map_filter.yaml @@ -8,6 +8,7 @@ pipeline: adapter_config: arroyo: write_healthcheck: true + metrics_sample_rate: 0.01 segments: - steps_config: myinput: From f2ca949cc0d4d39bcc3bd5ca756b3aad55b2e322 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Sat, 25 Apr 2026 22:06:46 -0700 Subject: [PATCH 2/2] Avoid reinstantiation of metrics each time --- .../sentry_streams/metrics/metrics.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py index bc2a1f11..950f8703 100644 --- a/sentry_streams/sentry_streams/metrics/metrics.py +++ b/sentry_streams/sentry_streams/metrics/metrics.py @@ -432,7 +432,7 @@ def timing( self.__backend.timing(name, value, tags=_tags_from_mapping(tags)) -_metrics_backend: Optional[MetricsBackend] = None +_metrics: Optional[Metrics] = None _dummy_metrics_backend = DummyMetricsBackend() @@ -458,22 +458,25 @@ def configure_metrics(config: MetricsConfig, force: bool = False) -> None: ``config.json``) so worker processes can rebuild the same backends under ``spawn`` multiprocessing. """ - global _metrics_backend + global _metrics if not force: - assert _metrics_backend is None, "Metrics is already set" + assert _metrics is None, "Metrics is already set" inner = build_metrics_backend(config) - _metrics_backend = BufferedMetricsBackend( + backend = BufferedMetricsBackend( inner, throttle_interval_sec=_buffer_throttle_interval_sec(config), ) - arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend)) + _metrics = Metrics(backend) + arroyo_configure_metrics(ArroyoMetricsBackend(backend)) def get_metrics() -> Metrics: - if _metrics_backend is None: - return Metrics(_dummy_metrics_backend) - return Metrics(_metrics_backend) + global _metrics + if _metrics is None: + _metrics = Metrics(_dummy_metrics_backend) + + return _metrics def get_size(obj: Any) -> int | None: