diff --git a/sentry_streams/sentry_streams/adapters/arroyo/reduce.py b/sentry_streams/sentry_streams/adapters/arroyo/reduce.py index 9bbef10f..089b4038 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/reduce.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/reduce.py @@ -19,7 +19,7 @@ from arroyo.types import BaseValue, FilteredPayload, Message, Partition, Value from sentry_streams.adapters.arroyo.routes import Route, RoutedValue -from sentry_streams.metrics import Metric, get_metrics, get_size +from sentry_streams.metrics.stats import get_stats from sentry_streams.pipeline.function_template import ( Accumulator, GroupBy, @@ -82,30 +82,22 @@ def __init__( ) -> None: self.acc = acc() self.start_time: float | None = None - self.metrics = get_metrics() - self.tags = {"step": name} + self.stats = get_stats() + self._step = name def add(self, value: PipelineMessage[InputType]) -> Self: if self.start_time is None: self.start_time = time.time() - self.metrics.increment(Metric.INPUT_MESSAGES, tags=self.tags) - size = get_size(value.payload) - if size is not None: - self.metrics.increment(Metric.INPUT_BYTES, tags=self.tags, value=size) + self.stats.step_exec(self._step) self.acc.add(value) return self def get_value(self) -> OutputType: result = self.acc.get_value() - self.metrics.increment(Metric.OUTPUT_MESSAGES, tags=self.tags) - size = get_size(result) - if size is not None: - self.metrics.increment(Metric.OUTPUT_BYTES, tags=self.tags, value=size) - duration = time.time() - self.start_time if self.start_time is not None else 0 - self.metrics.timing(Metric.DURATION, duration, tags=self.tags) + self.stats.step_timing(self._step, duration) return result def merge(self, other: Self) -> Self: diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 5e367966..7d894730 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -35,12 +35,10 @@ StepConfig, ) from sentry_streams.metrics import ( - Metric, MetricsConfig, configure_metrics, - get_metrics, - get_size, ) +from sentry_streams.metrics.stats import get_stats from sentry_streams.pipeline.function_template import ( InputType, OutputType, @@ -103,42 +101,29 @@ def _metrics_wrapped_function( Module-level wrapper function for adding metrics to step functions. This is defined at module level to be picklable for multiprocessing. """ - msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None - start_time = input_metrics(step_name, msg_size) - has_error = output_size = None + input_metrics(step_name) + has_error = None + start_time = time.time() try: result = application_function(msg) - output_size = get_size(result) return result except Exception as e: has_error = str(e.__class__.__name__) raise e finally: - output_metrics(step_name, has_error, start_time, output_size) + output_metrics(step_name, has_error, start_time) -def input_metrics(name: str, message_size: int | None) -> float: - metrics = get_metrics() - tags = {"step": name} - metrics.increment(Metric.INPUT_MESSAGES, tags=tags) - if message_size is not None: - metrics.increment(Metric.INPUT_BYTES, tags=tags, value=message_size) - return time.time() +def input_metrics(name: str) -> None: + stats = get_stats() + stats.step_exec(name) -def output_metrics( - name: str, error: str | None, start_time: float, message_size: int | None -) -> None: - metrics = get_metrics() - tags = {"step": name} +def output_metrics(name: str, error: str | None, start_time: float) -> None: + stats = get_stats() if error: - tags["error"] = error - metrics.increment(Metric.ERRORS, tags=tags) - - metrics.increment(Metric.OUTPUT_MESSAGES, tags=tags) - if message_size is not None: - metrics.increment(Metric.OUTPUT_BYTES, tags=tags, value=message_size) - metrics.timing(Metric.DURATION, time.time() - start_time, tags=tags) + stats.step_error(name) + stats.step_timing(name, time.time() - start_time) def build_initial_offset(offset_reset: str) -> InitialOffset: @@ -326,11 +311,12 @@ def sink(self, step: Sink[Any], stream: Route) -> Route: # Fix this to wrap the actual step instead of just the object_generator. # This will at least capture the number of calls to the step, if nothing else. def wrapped_generator() -> str: - start_time = input_metrics(step.name, None) + input_metrics(step.name) + start_time = time.time() try: return step.object_generator() finally: - output_metrics(step.name, None, start_time, None) + output_metrics(step.name, None, start_time) logger.info(f"Adding GCS sink: {step.name} to pipeline") self.__consumers[stream.source].add_step( @@ -450,18 +436,17 @@ def filter(self, step: Filter[Any], stream: Route) -> Route: elif isinstance(step, PredicateFilter): def filter_msg(msg: Message[Any]) -> bool: - msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None - start_time = input_metrics(step.name, msg_size) - has_error = output_size = None + input_metrics(step.name) + has_error = None + start_time = time.time() try: result = step.resolved_function(msg) - output_size = get_size(result) return result except Exception as e: has_error = str(e.__class__.__name__) raise e finally: - output_metrics(step.name, has_error, start_time, output_size) + output_metrics(step.name, has_error, start_time) self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg)) return stream @@ -537,8 +522,8 @@ def router( route = RustRoute(stream.source, stream.waypoints) def routing_function(msg: Message[Any]) -> str: - msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None - start_time = input_metrics(step.name, msg_size) + input_metrics(step.name) + start_time = time.time() has_error = None try: waypoint = step.routing_function(msg) @@ -548,7 +533,7 @@ def routing_function(msg: Message[Any]) -> str: has_error = str(e.__class__.__name__) raise e finally: - output_metrics(step.name, has_error, start_time, None) + output_metrics(step.name, has_error, start_time) logger.info(f"Adding router: {step.name} to pipeline") self.__consumers[stream.source].add_step( diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py index bc2a1f11..d0c967b7 100644 --- a/sentry_streams/sentry_streams/metrics/metrics.py +++ b/sentry_streams/sentry_streams/metrics/metrics.py @@ -371,7 +371,7 @@ class Metrics: """ def __init__(self, backend: MetricsBackend) -> None: - self.__backend = backend + self._backend = backend def increment( self, @@ -382,19 +382,19 @@ def increment( """ Increments a counter metric by a given value. """ - self.__backend.increment(name.value, value, tags=tags) + self._backend.increment(name.value, value, tags=tags) def gauge(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None: """ Sets a gauge metric to the given value. """ - self.__backend.gauge(name.value, value, tags=tags) + self._backend.gauge(name.value, value, tags=tags) def timing(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None: """ Records a timing metric. """ - self.__backend.timing(name.value, value, tags=tags) + self._backend.timing(name.value, value, tags=tags) class ArroyoMetricsBackend: @@ -432,7 +432,8 @@ def timing( self.__backend.timing(name, value, tags=_tags_from_mapping(tags)) -_metrics_backend: Optional[MetricsBackend] = None +_raw_metrics: Optional[Metrics] = None +_metrics: Optional[Metrics] = None _dummy_metrics_backend = DummyMetricsBackend() @@ -458,22 +459,43 @@ def configure_metrics(config: MetricsConfig, force: bool = False) -> None: ``config.json``) so worker processes can rebuild the same backends under ``spawn`` multiprocessing. """ - global _metrics_backend + global _metrics + global _raw_metrics if not force: - assert _metrics_backend is None, "Metrics is already set" + assert _metrics is None, "Metrics is already set" + assert _raw_metrics is None, "Raw metrics backend is already set" inner = build_metrics_backend(config) - _metrics_backend = BufferedMetricsBackend( + # TODO: Consider removing the buffered backend entirely now that we have + # pipeline stats. + buffered_backend = BufferedMetricsBackend( inner, throttle_interval_sec=_buffer_throttle_interval_sec(config), ) - arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend)) + _metrics = Metrics(buffered_backend) + _raw_metrics = Metrics(inner) + arroyo_configure_metrics(ArroyoMetricsBackend(buffered_backend)) def get_metrics() -> Metrics: - if _metrics_backend is None: - return Metrics(_dummy_metrics_backend) - return Metrics(_metrics_backend) + """ + Gets the currently configured buffered metrics adapter. + """ + global _metrics + if _metrics is None: + _metrics = Metrics(_dummy_metrics_backend) + + return _metrics + + +def get_raw_metrics() -> Metrics: + """ + Gets the currently configured raw metrics backend without buffering. + """ + global _raw_metrics + if _raw_metrics is None: + _raw_metrics = Metrics(_dummy_metrics_backend) + return _raw_metrics def get_size(obj: Any) -> int | None: diff --git a/sentry_streams/sentry_streams/metrics/stats.py b/sentry_streams/sentry_streams/metrics/stats.py new file mode 100644 index 00000000..e8dddd07 --- /dev/null +++ b/sentry_streams/sentry_streams/metrics/stats.py @@ -0,0 +1,59 @@ +import time +from collections import defaultdict + +from sentry_streams.metrics import Metrics +from sentry_streams.metrics.metrics import Metric, get_raw_metrics + +FLUSH_TIME = 10 + + +class PipielineStats: + + def __init__(self, metrics: Metrics) -> None: + self._metrics = metrics + + self._exec_buffer: dict[str, int] = defaultdict(int) + self._error_buffer: dict[str, int] = defaultdict(int) + self._timing_buffer: dict[str, float] = defaultdict(float) + + self.__last_flush_time = 0.0 + + def step_exec(self, step: str) -> None: + self._exec_buffer[step] += 1 + self._maybe_flush() + + def step_error(self, step: str) -> None: + self._error_buffer[step] += 1 + self._maybe_flush() + + def step_timing(self, step: str, value: float) -> None: + if self._timing_buffer[step] < value: + self._timing_buffer[step] = value + self._maybe_flush() + + def _maybe_flush(self) -> None: + if time.time() - self.__last_flush_time >= FLUSH_TIME: + self.__last_flush_time = time.time() + for step, value in self._exec_buffer.items(): + tags = {"step": step} + self._metrics.increment(Metric.INPUT_MESSAGES, value, tags) + for step, value in self._error_buffer.items(): + tags = {"step": step} + self._metrics.increment(Metric.ERRORS, value, tags) + for step, fvalue in self._timing_buffer.items(): + tags = {"step": step} + self._metrics.timing(Metric.DURATION, fvalue, tags) + + self._exec_buffer = defaultdict(int) + self._error_buffer = defaultdict(int) + self._timing_buffer = defaultdict(float) + + +_stats: PipielineStats | None = None + + +def get_stats() -> PipielineStats: + global _stats + if _stats is None: + _stats = PipielineStats(get_raw_metrics()) + return _stats diff --git a/sentry_streams/tests/metrics/test_stats.py b/sentry_streams/tests/metrics/test_stats.py new file mode 100644 index 00000000..20ad27ae --- /dev/null +++ b/sentry_streams/tests/metrics/test_stats.py @@ -0,0 +1,91 @@ +from unittest.mock import MagicMock, call, patch + +from sentry_streams.metrics.metrics import DummyMetricsBackend, Metric, Metrics +from sentry_streams.metrics.stats import PipielineStats + + +def _make_stats() -> tuple[PipielineStats, MagicMock]: + """Build :class:`PipielineStats` with a mocked backend; return ``(stats, inner_backend)``.""" + inner = MagicMock(spec=DummyMetricsBackend) + return PipielineStats(Metrics(inner)), inner + + +@patch("time.time") +def test_correct_values_are_flushed( + _mock_time: MagicMock, +) -> None: + """ + After buffered exec / error / timing data, a flush should emit the correct + ``increment`` / ``timing`` calls on the raw metrics backend (string names + tags). + """ + stats, inner = _make_stats() + _mock_time.return_value = 100.0 + stats._maybe_flush() # Flush to set last flush time + stats.step_exec("in_step") + stats.step_exec("in_step") + stats.step_error("err_step") + stats.step_timing("timer_step", 0.1) + _mock_time.return_value = 120.0 + stats.step_timing("timer_step", 0.05) # max is 0.1 + + inner.increment.assert_has_calls( + [ + call(Metric.INPUT_MESSAGES.value, 2, tags={"step": "in_step"}), + call(Metric.ERRORS.value, 1, tags={"step": "err_step"}), + ], + any_order=True, + ) + inner.timing.assert_called_once_with(Metric.DURATION.value, 0.1, tags={"step": "timer_step"}) + + +@patch("time.time") +def test_no_flush_before_deadline( + _mock_time: MagicMock, +) -> None: + """``_maybe_flush`` is a no-op (no backend calls) until ``FLUSH_TIME`` has passed since last.""" + _mock_time.return_value = 100.0 + stats, inner = _make_stats() + stats._maybe_flush() + stats.step_exec("a") + _mock_time.return_value = 105.0 + inner.increment.assert_not_called() + inner.timing.assert_not_called() + # Last flush time is only set on a successful flush + last_flush: float = object.__getattribute__(stats, "_PipielineStats__last_flush_time") + assert last_flush == 100.0 + exec_buf: dict[str, int] = stats._exec_buffer # noqa: SLF001 + assert exec_buf["a"] == 1 + + +@patch("time.time", return_value=0.0) +def test_pipieline_stats_flush_clears_buffers(mock_time: MagicMock) -> None: + """A successful flush clears in-memory counts so a later flush only reports new data.""" + stats, inner = _make_stats() + mock_time.return_value = 100.0 + stats.step_exec("s") + inner.reset_mock() + mock_time.return_value = 100.0 + inner.assert_not_called() + mock_time.return_value = 110.0 + stats.step_exec("s") + inner.increment.assert_called_once_with(Metric.INPUT_MESSAGES.value, 1, tags={"step": "s"}) + + +@patch("time.time", return_value=20.0) +def test_pipieline_stats_multiple_steps_one_flush(_mock_time: MagicMock) -> None: + """One flush can emit several backend calls, one per buffered step (each tag set).""" + stats, inner = _make_stats() + _mock_time.return_value = 100.0 + stats._maybe_flush() # Flush to set last flush time + stats.step_exec("step_1") + stats.step_exec("step_1") + _mock_time.return_value = 120.0 + stats.step_exec("step_2") + assert inner.increment.call_count == 2 + inner.increment.assert_has_calls( + [ + call(Metric.INPUT_MESSAGES.value, 2, tags={"step": "step_1"}), + call(Metric.INPUT_MESSAGES.value, 1, tags={"step": "step_2"}), + ], + any_order=True, + ) diff --git a/sentry_streams/tests/pipeline/test_metrics.py b/sentry_streams/tests/pipeline/test_metrics.py index f014b162..08eea60d 100644 --- a/sentry_streams/tests/pipeline/test_metrics.py +++ b/sentry_streams/tests/pipeline/test_metrics.py @@ -20,6 +20,7 @@ MetricsConfig, build_metrics_backend, configure_metrics, + get_metrics, ) @@ -36,9 +37,11 @@ def _buffered_inner_backend(buffered: BufferedMetricsBackend) -> MetricsBackend: @pytest.fixture(autouse=True) def reset_metrics_backend() -> Generator[None, None, None]: - metrics_module._metrics_backend = None + metrics_module._metrics = None + metrics_module._raw_metrics = None yield - metrics_module._metrics_backend = None + metrics_module._metrics = None + metrics_module._raw_metrics = None def test_metric_enum_values() -> None: @@ -368,18 +371,19 @@ def test_configure_metrics_dummy(mock_arroyo_configure: Any) -> None: cfg: MetricsConfig = {"type": "dummy"} configure_metrics(cfg) + metrics = get_metrics() + inner = metrics._backend - wrapped = metrics_module._metrics_backend - assert isinstance(wrapped, BufferedMetricsBackend) - assert isinstance(_buffered_inner_backend(wrapped), DummyMetricsBackend) + assert isinstance(inner, BufferedMetricsBackend) + assert isinstance(_buffered_inner_backend(inner), DummyMetricsBackend) assert ( - object.__getattribute__(wrapped, "_BufferedMetricsBackend__throttle_interval_sec") + object.__getattribute__(inner, "_BufferedMetricsBackend__throttle_interval_sec") == METRICS_FREQUENCY_SEC ) mock_arroyo_configure.assert_called_once() arroyo_backend = mock_arroyo_configure.call_args[0][0] assert isinstance(arroyo_backend, ArroyoMetricsBackend) - assert object.__getattribute__(arroyo_backend, "_ArroyoMetricsBackend__backend") is wrapped + assert object.__getattribute__(arroyo_backend, "_ArroyoMetricsBackend__backend") is inner @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") @@ -392,24 +396,33 @@ def test_configure_metrics_datadog(mock_dogstatsd: Any, mock_arroyo_configure: A "tags": {}, } - configure_metrics(cfg) + configure_metrics( + cfg, + ) - wrapped = metrics_module._metrics_backend - assert isinstance(wrapped, BufferedMetricsBackend) - assert isinstance(_buffered_inner_backend(wrapped), DatadogMetricsBackend) + metrics = get_metrics() + inner = metrics._backend + assert isinstance(inner, BufferedMetricsBackend) + assert isinstance(_buffered_inner_backend(inner), DatadogMetricsBackend) mock_arroyo_configure.assert_called_once() def test_configure_metrics_already_set() -> None: - configure_metrics({"type": "dummy"}) + configure_metrics( + {"type": "dummy"}, + ) with pytest.raises(AssertionError, match="Metrics is already set"): - configure_metrics({"type": "dummy"}) + configure_metrics( + {"type": "dummy"}, + ) @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") def test_configure_metrics_force(mock_arroyo_configure: Any) -> None: - configure_metrics({"type": "dummy"}) + configure_metrics( + {"type": "dummy"}, + ) configure_metrics( { "type": "datadog", @@ -420,7 +433,7 @@ def test_configure_metrics_force(mock_arroyo_configure: Any) -> None: force=True, ) - wrapped = metrics_module._metrics_backend + wrapped = get_metrics()._backend assert isinstance(wrapped, BufferedMetricsBackend) assert isinstance(_buffered_inner_backend(wrapped), DatadogMetricsBackend) @@ -429,8 +442,10 @@ def test_configure_metrics_force(mock_arroyo_configure: Any) -> None: def test_configure_metrics_log_uses_config_throttle_interval( mock_arroyo_configure: Any, ) -> None: - configure_metrics({"type": "log", "period_sec": 33.0, "tags": {}}) - wrapped = metrics_module._metrics_backend + configure_metrics( + {"type": "log", "period_sec": 33.0, "tags": {}}, + ) + wrapped = get_metrics()._backend assert isinstance(wrapped, BufferedMetricsBackend) assert ( object.__getattribute__(wrapped, "_BufferedMetricsBackend__throttle_interval_sec") == 33.0 diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index 36724897..04987762 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -54,11 +54,13 @@ def reset_metrics_backend() -> Generator[None, None, None]: import sentry_streams.metrics.metrics # Reset before test runs (setup) - sentry_streams.metrics.metrics._metrics_backend = None + sentry_streams.metrics.metrics._metrics = None + sentry_streams.metrics.metrics._raw_metrics = None arroyo.utils.metrics._metrics_backend = None yield # Reset after test completes (teardown) - sentry_streams.metrics.metrics._metrics_backend = None + sentry_streams.metrics.metrics._metrics = None + sentry_streams.metrics.metrics._raw_metrics = None arroyo.utils.metrics._metrics_backend = None