From 257bd53d3cc4d90381772ebd697e285f832e6a38 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Sun, 26 Apr 2026 14:15:05 -0700 Subject: [PATCH 1/6] Avoid reinstantiation of the metrics backend --- .../sentry_streams/metrics/metrics.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py index bc2a1f11..950f8703 100644 --- a/sentry_streams/sentry_streams/metrics/metrics.py +++ b/sentry_streams/sentry_streams/metrics/metrics.py @@ -432,7 +432,7 @@ def timing( self.__backend.timing(name, value, tags=_tags_from_mapping(tags)) -_metrics_backend: Optional[MetricsBackend] = None +_metrics: Optional[Metrics] = None _dummy_metrics_backend = DummyMetricsBackend() @@ -458,22 +458,25 @@ def configure_metrics(config: MetricsConfig, force: bool = False) -> None: ``config.json``) so worker processes can rebuild the same backends under ``spawn`` multiprocessing. """ - global _metrics_backend + global _metrics if not force: - assert _metrics_backend is None, "Metrics is already set" + assert _metrics is None, "Metrics is already set" inner = build_metrics_backend(config) - _metrics_backend = BufferedMetricsBackend( + backend = BufferedMetricsBackend( inner, throttle_interval_sec=_buffer_throttle_interval_sec(config), ) - arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend)) + _metrics = Metrics(backend) + arroyo_configure_metrics(ArroyoMetricsBackend(backend)) def get_metrics() -> Metrics: - if _metrics_backend is None: - return Metrics(_dummy_metrics_backend) - return Metrics(_metrics_backend) + global _metrics + if _metrics is None: + _metrics = Metrics(_dummy_metrics_backend) + + return _metrics def get_size(obj: Any) -> int | None: From 15baf676fe762df303f9c0ee00533b45b20af4b2 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Sun, 26 Apr 2026 14:38:32 -0700 Subject: [PATCH 2/6] Introduce stats ---
.../sentry_streams/adapters/arroyo/reduce.py | 18 ++---- .../adapters/arroyo/rust_arroyo.py | 59 +++++++------------ .../sentry_streams/metrics/metrics.py | 8 +-- .../sentry_streams/metrics/stats.py | 26 ++++++++ sentry_streams/tests/pipeline/test_metrics.py | 47 +++++++++------ 5 files changed, 87 insertions(+), 71 deletions(-) create mode 100644 sentry_streams/sentry_streams/metrics/stats.py diff --git a/sentry_streams/sentry_streams/adapters/arroyo/reduce.py b/sentry_streams/sentry_streams/adapters/arroyo/reduce.py index 9bbef10f..089b4038 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/reduce.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/reduce.py @@ -19,7 +19,7 @@ from arroyo.types import BaseValue, FilteredPayload, Message, Partition, Value from sentry_streams.adapters.arroyo.routes import Route, RoutedValue -from sentry_streams.metrics import Metric, get_metrics, get_size +from sentry_streams.metrics.stats import get_stats from sentry_streams.pipeline.function_template import ( Accumulator, GroupBy, @@ -82,30 +82,22 @@ def __init__( ) -> None: self.acc = acc() self.start_time: float | None = None - self.metrics = get_metrics() - self.tags = {"step": name} + self.stats = get_stats() + self._step = name def add(self, value: PipelineMessage[InputType]) -> Self: if self.start_time is None: self.start_time = time.time() - self.metrics.increment(Metric.INPUT_MESSAGES, tags=self.tags) - size = get_size(value.payload) - if size is not None: - self.metrics.increment(Metric.INPUT_BYTES, tags=self.tags, value=size) + self.stats.step_exec(self._step) self.acc.add(value) return self def get_value(self) -> OutputType: result = self.acc.get_value() - self.metrics.increment(Metric.OUTPUT_MESSAGES, tags=self.tags) - size = get_size(result) - if size is not None: - self.metrics.increment(Metric.OUTPUT_BYTES, tags=self.tags, value=size) - duration = time.time() - self.start_time if self.start_time is not None else 0 - self.metrics.timing(Metric.DURATION, 
duration, tags=self.tags) + self.stats.step_timing(self._step, duration) return result def merge(self, other: Self) -> Self: diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 5e367966..7d894730 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -35,12 +35,10 @@ StepConfig, ) from sentry_streams.metrics import ( - Metric, MetricsConfig, configure_metrics, - get_metrics, - get_size, ) +from sentry_streams.metrics.stats import get_stats from sentry_streams.pipeline.function_template import ( InputType, OutputType, @@ -103,42 +101,29 @@ def _metrics_wrapped_function( Module-level wrapper function for adding metrics to step functions. This is defined at module level to be picklable for multiprocessing. """ - msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None - start_time = input_metrics(step_name, msg_size) - has_error = output_size = None + input_metrics(step_name) + has_error = None + start_time = time.time() try: result = application_function(msg) - output_size = get_size(result) return result except Exception as e: has_error = str(e.__class__.__name__) raise e finally: - output_metrics(step_name, has_error, start_time, output_size) + output_metrics(step_name, has_error, start_time) -def input_metrics(name: str, message_size: int | None) -> float: - metrics = get_metrics() - tags = {"step": name} - metrics.increment(Metric.INPUT_MESSAGES, tags=tags) - if message_size is not None: - metrics.increment(Metric.INPUT_BYTES, tags=tags, value=message_size) - return time.time() +def input_metrics(name: str) -> None: + stats = get_stats() + stats.step_exec(name) -def output_metrics( - name: str, error: str | None, start_time: float, message_size: int | None -) -> None: - metrics = get_metrics() - tags = {"step": name} +def output_metrics(name: str, error: str | None, 
start_time: float) -> None: + stats = get_stats() if error: - tags["error"] = error - metrics.increment(Metric.ERRORS, tags=tags) - - metrics.increment(Metric.OUTPUT_MESSAGES, tags=tags) - if message_size is not None: - metrics.increment(Metric.OUTPUT_BYTES, tags=tags, value=message_size) - metrics.timing(Metric.DURATION, time.time() - start_time, tags=tags) + stats.step_error(name) + stats.step_timing(name, time.time() - start_time) def build_initial_offset(offset_reset: str) -> InitialOffset: @@ -326,11 +311,12 @@ def sink(self, step: Sink[Any], stream: Route) -> Route: # Fix this to wrap the actual step instead of just the object_generator. # This will at least capture the number of calls to the step, if nothing else. def wrapped_generator() -> str: - start_time = input_metrics(step.name, None) + input_metrics(step.name) + start_time = time.time() try: return step.object_generator() finally: - output_metrics(step.name, None, start_time, None) + output_metrics(step.name, None, start_time) logger.info(f"Adding GCS sink: {step.name} to pipeline") self.__consumers[stream.source].add_step( @@ -450,18 +436,17 @@ def filter(self, step: Filter[Any], stream: Route) -> Route: elif isinstance(step, PredicateFilter): def filter_msg(msg: Message[Any]) -> bool: - msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None - start_time = input_metrics(step.name, msg_size) - has_error = output_size = None + input_metrics(step.name) + has_error = None + start_time = time.time() try: result = step.resolved_function(msg) - output_size = get_size(result) return result except Exception as e: has_error = str(e.__class__.__name__) raise e finally: - output_metrics(step.name, has_error, start_time, output_size) + output_metrics(step.name, has_error, start_time) self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg)) return stream @@ -537,8 +522,8 @@ def router( route = RustRoute(stream.source, stream.waypoints) def routing_function(msg: 
Message[Any]) -> str: - msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None - start_time = input_metrics(step.name, msg_size) + input_metrics(step.name) + start_time = time.time() has_error = None try: waypoint = step.routing_function(msg) @@ -548,7 +533,7 @@ def routing_function(msg: Message[Any]) -> str: has_error = str(e.__class__.__name__) raise e finally: - output_metrics(step.name, has_error, start_time, None) + output_metrics(step.name, has_error, start_time) logger.info(f"Adding router: {step.name} to pipeline") self.__consumers[stream.source].add_step( diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py index 950f8703..cb0d05dc 100644 --- a/sentry_streams/sentry_streams/metrics/metrics.py +++ b/sentry_streams/sentry_streams/metrics/metrics.py @@ -371,7 +371,7 @@ class Metrics: """ def __init__(self, backend: MetricsBackend) -> None: - self.__backend = backend + self._backend = backend def increment( self, @@ -382,19 +382,19 @@ def increment( """ Increments a counter metric by a given value. """ - self.__backend.increment(name.value, value, tags=tags) + self._backend.increment(name.value, value, tags=tags) def gauge(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None: """ Sets a gauge metric to the given value. """ - self.__backend.gauge(name.value, value, tags=tags) + self._backend.gauge(name.value, value, tags=tags) def timing(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None: """ Records a timing metric. 
""" - self.__backend.timing(name.value, value, tags=tags) + self._backend.timing(name.value, value, tags=tags) class ArroyoMetricsBackend: diff --git a/sentry_streams/sentry_streams/metrics/stats.py b/sentry_streams/sentry_streams/metrics/stats.py new file mode 100644 index 00000000..747c760b --- /dev/null +++ b/sentry_streams/sentry_streams/metrics/stats.py @@ -0,0 +1,26 @@ +from arroyo.utils.metrics import Metrics, get_metrics + + +class PipielineStats: + + def __init__(self, metrics: Metrics) -> None: + self._metrics = metrics + + def step_exec(self, step: str) -> None: + pass + + def step_error(self, step: str) -> None: + pass + + def step_timing(self, step: str, value: float) -> None: + pass + + +_stats: PipielineStats | None = None + + +def get_stats() -> PipielineStats: + global _stats + if _stats is None: + _stats = PipielineStats(get_metrics()) + return _stats diff --git a/sentry_streams/tests/pipeline/test_metrics.py b/sentry_streams/tests/pipeline/test_metrics.py index f014b162..f0da7e29 100644 --- a/sentry_streams/tests/pipeline/test_metrics.py +++ b/sentry_streams/tests/pipeline/test_metrics.py @@ -20,6 +20,7 @@ MetricsConfig, build_metrics_backend, configure_metrics, + get_metrics, ) @@ -36,9 +37,9 @@ def _buffered_inner_backend(buffered: BufferedMetricsBackend) -> MetricsBackend: @pytest.fixture(autouse=True) def reset_metrics_backend() -> Generator[None, None, None]: - metrics_module._metrics_backend = None + metrics_module._metrics = None yield - metrics_module._metrics_backend = None + metrics_module._metrics = None def test_metric_enum_values() -> None: @@ -368,18 +369,19 @@ def test_configure_metrics_dummy(mock_arroyo_configure: Any) -> None: cfg: MetricsConfig = {"type": "dummy"} configure_metrics(cfg) + metrics = get_metrics() + inner = metrics._backend - wrapped = metrics_module._metrics_backend - assert isinstance(wrapped, BufferedMetricsBackend) - assert isinstance(_buffered_inner_backend(wrapped), DummyMetricsBackend) + assert 
isinstance(inner, BufferedMetricsBackend) + assert isinstance(_buffered_inner_backend(inner), DummyMetricsBackend) assert ( - object.__getattribute__(wrapped, "_BufferedMetricsBackend__throttle_interval_sec") + object.__getattribute__(inner, "_BufferedMetricsBackend__throttle_interval_sec") == METRICS_FREQUENCY_SEC ) mock_arroyo_configure.assert_called_once() arroyo_backend = mock_arroyo_configure.call_args[0][0] assert isinstance(arroyo_backend, ArroyoMetricsBackend) - assert object.__getattribute__(arroyo_backend, "_ArroyoMetricsBackend__backend") is wrapped + assert object.__getattribute__(arroyo_backend, "_ArroyoMetricsBackend__backend") is inner @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") @@ -392,24 +394,33 @@ def test_configure_metrics_datadog(mock_dogstatsd: Any, mock_arroyo_configure: A "tags": {}, } - configure_metrics(cfg) + configure_metrics( + cfg, + ) - wrapped = metrics_module._metrics_backend - assert isinstance(wrapped, BufferedMetricsBackend) - assert isinstance(_buffered_inner_backend(wrapped), DatadogMetricsBackend) + metrics = get_metrics() + inner = metrics._backend + assert isinstance(inner, BufferedMetricsBackend) + assert isinstance(_buffered_inner_backend(inner), DatadogMetricsBackend) mock_arroyo_configure.assert_called_once() def test_configure_metrics_already_set() -> None: - configure_metrics({"type": "dummy"}) + configure_metrics( + {"type": "dummy"}, + ) with pytest.raises(AssertionError, match="Metrics is already set"): - configure_metrics({"type": "dummy"}) + configure_metrics( + {"type": "dummy"}, + ) @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") def test_configure_metrics_force(mock_arroyo_configure: Any) -> None: - configure_metrics({"type": "dummy"}) + configure_metrics( + {"type": "dummy"}, + ) configure_metrics( { "type": "datadog", @@ -420,7 +431,7 @@ def test_configure_metrics_force(mock_arroyo_configure: Any) -> None: force=True, ) - wrapped = metrics_module._metrics_backend + 
wrapped = get_metrics()._backend assert isinstance(wrapped, BufferedMetricsBackend) assert isinstance(_buffered_inner_backend(wrapped), DatadogMetricsBackend) @@ -429,8 +440,10 @@ def test_configure_metrics_force(mock_arroyo_configure: Any) -> None: def test_configure_metrics_log_uses_config_throttle_interval( mock_arroyo_configure: Any, ) -> None: - configure_metrics({"type": "log", "period_sec": 33.0, "tags": {}}) - wrapped = metrics_module._metrics_backend + configure_metrics( + {"type": "log", "period_sec": 33.0, "tags": {}}, + ) + wrapped = get_metrics()._backend assert isinstance(wrapped, BufferedMetricsBackend) assert ( object.__getattribute__(wrapped, "_BufferedMetricsBackend__throttle_interval_sec") == 33.0 From 17d846b8a4c02154b9e4c9cb05f31a1457943bed Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Sun, 26 Apr 2026 15:06:54 -0700 Subject: [PATCH 3/6] Implement stats --- .../sentry_streams/metrics/stats.py | 38 +++++++- sentry_streams/tests/metrics/test_stats.py | 92 +++++++++++++++++++ 2 files changed, 126 insertions(+), 4 deletions(-) create mode 100644 sentry_streams/tests/metrics/test_stats.py diff --git a/sentry_streams/sentry_streams/metrics/stats.py b/sentry_streams/sentry_streams/metrics/stats.py index 747c760b..70440c30 100644 --- a/sentry_streams/sentry_streams/metrics/stats.py +++ b/sentry_streams/sentry_streams/metrics/stats.py @@ -1,4 +1,10 @@ -from arroyo.utils.metrics import Metrics, get_metrics +import time +from collections import defaultdict + +from sentry_streams.metrics import Metrics, get_metrics +from sentry_streams.metrics.metrics import Metric + +FLUSH_TIME = 10 class PipielineStats: @@ -6,14 +12,38 @@ class PipielineStats: def __init__(self, metrics: Metrics) -> None: self._metrics = metrics + self._exec_buffer: dict[str, int] = defaultdict(int) + self._error_buffer: dict[str, int] = defaultdict(int) + self._timing_buffer: dict[str, float] = defaultdict(float) + + self.__last_flush_time = 0.0 + def step_exec(self, step: str) 
-> None: - pass + self._exec_buffer[step] += 1 def step_error(self, step: str) -> None: - pass + self._error_buffer[step] += 1 def step_timing(self, step: str, value: float) -> None: - pass + if self._timing_buffer[step] < value: + self._timing_buffer[step] = value + + def _maybe_flush(self) -> None: + if time.time() - self.__last_flush_time >= FLUSH_TIME: + self.__last_flush_time = time.time() + for step, value in self._exec_buffer.items(): + tags = {"step": step} + self._metrics.increment(Metric.INPUT_MESSAGES, value, tags) + for step, value in self._error_buffer.items(): + tags = {"step": step} + self._metrics.increment(Metric.ERRORS, value, tags) + for step, fvalue in self._timing_buffer.items(): + tags = {"step": step} + self._metrics.timing(Metric.DURATION, fvalue, tags) + + self._exec_buffer = defaultdict(int) + self._error_buffer = defaultdict(int) + self._timing_buffer = defaultdict(float) _stats: PipielineStats | None = None diff --git a/sentry_streams/tests/metrics/test_stats.py b/sentry_streams/tests/metrics/test_stats.py new file mode 100644 index 00000000..6dd566fa --- /dev/null +++ b/sentry_streams/tests/metrics/test_stats.py @@ -0,0 +1,92 @@ +from unittest.mock import MagicMock, call, patch + +from sentry_streams.metrics.metrics import DummyMetricsBackend, Metric, Metrics +from sentry_streams.metrics.stats import PipielineStats + + +def _make_stats() -> tuple[PipielineStats, MagicMock]: + """Build :class:`PipielineStats` with a mocked backend; return ``(stats, inner_backend)``.""" + inner = MagicMock(spec=DummyMetricsBackend) + return PipielineStats(Metrics(inner)), inner + + +@patch("time.time") +def test_correct_values_are_flushed( + _mock_time: MagicMock, +) -> None: + """ + After buffered exec / error / timing data, a flush should emit the correct + ``increment`` / ``timing`` calls on the raw metrics backend (string names + tags). 
+ """ + stats, inner = _make_stats() + _mock_time.return_value = 100.0 + stats.step_exec("in_step") + stats.step_exec("in_step") + stats.step_error("err_step") + stats.step_timing("timer_step", 0.1) + stats.step_timing("timer_step", 0.05) # max is 0.1 + _mock_time.return_value = 120.0 + stats._maybe_flush() + inner.increment.assert_has_calls( + [ + call(Metric.INPUT_MESSAGES.value, 2, tags={"step": "in_step"}), + call(Metric.ERRORS.value, 1, tags={"step": "err_step"}), + ], + any_order=True, + ) + inner.timing.assert_called_once_with(Metric.DURATION.value, 0.1, tags={"step": "timer_step"}) + + +@patch("time.time") +def test_no_flush_before_deadline( + _mock_time: MagicMock, +) -> None: + """``_maybe_flush`` is a no-op (no backend calls) until ``FLUSH_TIME`` has passed since last.""" + _mock_time.return_value = 100.0 + stats, inner = _make_stats() + stats._maybe_flush() + stats.step_exec("a") + _mock_time.return_value = 105.0 + stats._maybe_flush() + inner.increment.assert_not_called() + inner.timing.assert_not_called() + # Last flush time is only set on a successful flush + last_flush: float = object.__getattribute__(stats, "_PipielineStats__last_flush_time") + assert last_flush == 100.0 + exec_buf: dict[str, int] = stats._exec_buffer # noqa: SLF001 + assert exec_buf["a"] == 1 + + +@patch("time.time", return_value=0.0) +def test_pipieline_stats_flush_clears_buffers(mock_time: MagicMock) -> None: + """A successful flush clears in-memory counts so a later flush only reports new data.""" + stats, inner = _make_stats() + mock_time.return_value = 100.0 + stats.step_exec("s") + stats._maybe_flush() + inner.reset_mock() + mock_time.return_value = 100.0 + stats._maybe_flush() + inner.assert_not_called() + mock_time.return_value = 110.0 + stats.step_exec("s") + stats._maybe_flush() + inner.increment.assert_called_once_with(Metric.INPUT_MESSAGES.value, 1, tags={"step": "s"}) + + +@patch("time.time", return_value=20.0) +def 
test_pipieline_stats_multiple_steps_one_flush(_mock_time: MagicMock) -> None: + """One flush can emit several backend calls, one per buffered step (each tag set).""" + stats, inner = _make_stats() + stats.step_exec("step_1") + stats.step_exec("step_1") + stats.step_exec("step_2") + stats._maybe_flush() + assert inner.increment.call_count == 2 + inner.increment.assert_has_calls( + [ + call(Metric.INPUT_MESSAGES.value, 2, tags={"step": "step_1"}), + call(Metric.INPUT_MESSAGES.value, 1, tags={"step": "step_2"}), + ], + any_order=True, + ) From a94272c101d44a71b76a591768ecba007e5e651e Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Sun, 26 Apr 2026 15:08:42 -0700 Subject: [PATCH 4/6] Fix typing --- sentry_streams/tests/test_load_runtime.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index 36724897..df38e75a 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -54,11 +54,11 @@ def reset_metrics_backend() -> Generator[None, None, None]: import sentry_streams.metrics.metrics # Reset before test runs (setup) - sentry_streams.metrics.metrics._metrics_backend = None + sentry_streams.metrics.metrics._metrics = None arroyo.utils.metrics._metrics_backend = None yield # Reset after test completes (teardown) - sentry_streams.metrics.metrics._metrics_backend = None + sentry_streams.metrics.metrics._metrics = None arroyo.utils.metrics._metrics_backend = None From 30dc9fc359340d4c7897095bfc4fd0bbf9c4bee7 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Sun, 26 Apr 2026 15:16:46 -0700 Subject: [PATCH 5/6] reuse the metrics adapter --- .../sentry_streams/metrics/metrics.py | 25 ++++++++++++++++--- .../sentry_streams/metrics/stats.py | 6 ++--- sentry_streams/tests/pipeline/test_metrics.py | 2 ++ sentry_streams/tests/test_load_runtime.py | 2 ++ 4 files changed, 29 insertions(+), 6 deletions(-) diff --git 
a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py index cb0d05dc..d0c967b7 100644 --- a/sentry_streams/sentry_streams/metrics/metrics.py +++ b/sentry_streams/sentry_streams/metrics/metrics.py @@ -432,6 +432,7 @@ def timing( self.__backend.timing(name, value, tags=_tags_from_mapping(tags)) +_raw_metrics: Optional[Metrics] = None _metrics: Optional[Metrics] = None _dummy_metrics_backend = DummyMetricsBackend() @@ -459,19 +460,27 @@ def configure_metrics(config: MetricsConfig, force: bool = False) -> None: ``spawn`` multiprocessing. """ global _metrics + global _raw_metrics if not force: assert _metrics is None, "Metrics is already set" + assert _raw_metrics is None, "Raw metrics backend is already set" inner = build_metrics_backend(config) - backend = BufferedMetricsBackend( + # TODO: Consider removing the buffered backend entirely now that we have + # pipeline stats. + buffered_backend = BufferedMetricsBackend( inner, throttle_interval_sec=_buffer_throttle_interval_sec(config), ) - _metrics = Metrics(backend) - arroyo_configure_metrics(ArroyoMetricsBackend(backend)) + _metrics = Metrics(buffered_backend) + _raw_metrics = Metrics(inner) + arroyo_configure_metrics(ArroyoMetricsBackend(buffered_backend)) def get_metrics() -> Metrics: + """ + Gets the currently configured buffered metrics adapter. + """ global _metrics if _metrics is None: _metrics = Metrics(_dummy_metrics_backend) @@ -479,6 +488,16 @@ def get_metrics() -> Metrics: return _metrics +def get_raw_metrics() -> Metrics: + """ + Gets the currently configured raw metrics backend without buffering. 
+ """ + global _raw_metrics + if _raw_metrics is None: + _raw_metrics = Metrics(_dummy_metrics_backend) + return _raw_metrics + + def get_size(obj: Any) -> int | None: # TODO: Make this work for all types if isinstance(obj, (str, bytes)): diff --git a/sentry_streams/sentry_streams/metrics/stats.py b/sentry_streams/sentry_streams/metrics/stats.py index 70440c30..d211fe80 100644 --- a/sentry_streams/sentry_streams/metrics/stats.py +++ b/sentry_streams/sentry_streams/metrics/stats.py @@ -1,8 +1,8 @@ import time from collections import defaultdict -from sentry_streams.metrics import Metrics, get_metrics -from sentry_streams.metrics.metrics import Metric +from sentry_streams.metrics import Metrics +from sentry_streams.metrics.metrics import Metric, get_raw_metrics FLUSH_TIME = 10 @@ -52,5 +52,5 @@ def _maybe_flush(self) -> None: def get_stats() -> PipielineStats: global _stats if _stats is None: - _stats = PipielineStats(get_metrics()) + _stats = PipielineStats(get_raw_metrics()) return _stats diff --git a/sentry_streams/tests/pipeline/test_metrics.py b/sentry_streams/tests/pipeline/test_metrics.py index f0da7e29..08eea60d 100644 --- a/sentry_streams/tests/pipeline/test_metrics.py +++ b/sentry_streams/tests/pipeline/test_metrics.py @@ -38,8 +38,10 @@ def _buffered_inner_backend(buffered: BufferedMetricsBackend) -> MetricsBackend: @pytest.fixture(autouse=True) def reset_metrics_backend() -> Generator[None, None, None]: metrics_module._metrics = None + metrics_module._raw_metrics = None yield metrics_module._metrics = None + metrics_module._raw_metrics = None def test_metric_enum_values() -> None: diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index df38e75a..04987762 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -55,10 +55,12 @@ def reset_metrics_backend() -> Generator[None, None, None]: # Reset before test runs (setup) sentry_streams.metrics.metrics._metrics = 
None + sentry_streams.metrics.metrics._raw_metrics = None arroyo.utils.metrics._metrics_backend = None yield # Reset after test completes (teardown) sentry_streams.metrics.metrics._metrics = None + sentry_streams.metrics.metrics._raw_metrics = None arroyo.utils.metrics._metrics_backend = None From cf3d1e4a6bae721717c62e0c4863c1014ae1148a Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Sun, 26 Apr 2026 15:25:45 -0700 Subject: [PATCH 6/6] Fix flushing bug --- sentry_streams/sentry_streams/metrics/stats.py | 3 +++ sentry_streams/tests/metrics/test_stats.py | 13 ++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sentry_streams/sentry_streams/metrics/stats.py b/sentry_streams/sentry_streams/metrics/stats.py index d211fe80..e8dddd07 100644 --- a/sentry_streams/sentry_streams/metrics/stats.py +++ b/sentry_streams/sentry_streams/metrics/stats.py @@ -20,13 +20,16 @@ def __init__(self, metrics: Metrics) -> None: def step_exec(self, step: str) -> None: self._exec_buffer[step] += 1 + self._maybe_flush() def step_error(self, step: str) -> None: self._error_buffer[step] += 1 + self._maybe_flush() def step_timing(self, step: str, value: float) -> None: if self._timing_buffer[step] < value: self._timing_buffer[step] = value + self._maybe_flush() def _maybe_flush(self) -> None: if time.time() - self.__last_flush_time >= FLUSH_TIME: diff --git a/sentry_streams/tests/metrics/test_stats.py b/sentry_streams/tests/metrics/test_stats.py index 6dd566fa..20ad27ae 100644 --- a/sentry_streams/tests/metrics/test_stats.py +++ b/sentry_streams/tests/metrics/test_stats.py @@ -20,13 +20,14 @@ def test_correct_values_are_flushed( """ stats, inner = _make_stats() _mock_time.return_value = 100.0 + stats._maybe_flush() # Flush to set last flush time stats.step_exec("in_step") stats.step_exec("in_step") stats.step_error("err_step") stats.step_timing("timer_step", 0.1) - stats.step_timing("timer_step", 0.05) # max is 0.1 _mock_time.return_value = 120.0 - 
stats._maybe_flush() + stats.step_timing("timer_step", 0.05) # max is 0.1 + inner.increment.assert_has_calls( [ call(Metric.INPUT_MESSAGES.value, 2, tags={"step": "in_step"}), @@ -47,7 +48,6 @@ def test_no_flush_before_deadline( stats._maybe_flush() stats.step_exec("a") _mock_time.return_value = 105.0 - stats._maybe_flush() inner.increment.assert_not_called() inner.timing.assert_not_called() # Last flush time is only set on a successful flush @@ -63,14 +63,11 @@ def test_pipieline_stats_flush_clears_buffers(mock_time: MagicMock) -> None: stats, inner = _make_stats() mock_time.return_value = 100.0 stats.step_exec("s") - stats._maybe_flush() inner.reset_mock() mock_time.return_value = 100.0 - stats._maybe_flush() inner.assert_not_called() mock_time.return_value = 110.0 stats.step_exec("s") - stats._maybe_flush() inner.increment.assert_called_once_with(Metric.INPUT_MESSAGES.value, 1, tags={"step": "s"}) @@ -78,10 +75,12 @@ def test_pipieline_stats_flush_clears_buffers(mock_time: MagicMock) -> None: def test_pipieline_stats_multiple_steps_one_flush(_mock_time: MagicMock) -> None: """One flush can emit several backend calls, one per buffered step (each tag set).""" stats, inner = _make_stats() + _mock_time.return_value = 100.0 + stats._maybe_flush() # Flush to set last flush time stats.step_exec("step_1") stats.step_exec("step_1") + _mock_time.return_value = 120.0 stats.step_exec("step_2") - stats._maybe_flush() assert inner.increment.call_count == 2 inner.increment.assert_has_calls( [