18 changes: 5 additions & 13 deletions sentry_streams/sentry_streams/adapters/arroyo/reduce.py
@@ -19,7 +19,7 @@
from arroyo.types import BaseValue, FilteredPayload, Message, Partition, Value

from sentry_streams.adapters.arroyo.routes import Route, RoutedValue
from sentry_streams.metrics import Metric, get_metrics, get_size
from sentry_streams.metrics.stats import get_stats
from sentry_streams.pipeline.function_template import (
Accumulator,
GroupBy,
@@ -82,30 +82,22 @@ def __init__(
) -> None:
self.acc = acc()
self.start_time: float | None = None
self.metrics = get_metrics()
self.tags = {"step": name}
self.stats = get_stats()
self._step = name

def add(self, value: PipelineMessage[InputType]) -> Self:
if self.start_time is None:
self.start_time = time.time()
self.metrics.increment(Metric.INPUT_MESSAGES, tags=self.tags)
size = get_size(value.payload)
if size is not None:
self.metrics.increment(Metric.INPUT_BYTES, tags=self.tags, value=size)

self.stats.step_exec(self._step)
self.acc.add(value)

return self

def get_value(self) -> OutputType:
result = self.acc.get_value()
self.metrics.increment(Metric.OUTPUT_MESSAGES, tags=self.tags)
size = get_size(result)
if size is not None:
self.metrics.increment(Metric.OUTPUT_BYTES, tags=self.tags, value=size)

duration = time.time() - self.start_time if self.start_time is not None else 0
self.metrics.timing(Metric.DURATION, duration, tags=self.tags)
self.stats.step_timing(self._step, duration)
return result

def merge(self, other: Self) -> Self:
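For context on the wrapper edited above: the reduce step drives a generic `Accumulator` through an add/get_value/merge protocol. A minimal sketch of a conforming accumulator (the `Concat` class below is hypothetical; only the method shapes are taken from the diff):

```python
from typing import Self


class Concat:
    """Hypothetical accumulator: folds string payloads into one window value."""

    def __init__(self) -> None:
        self._parts: list[str] = []

    def add(self, value: str) -> Self:
        # Fold one message payload into the running window state.
        self._parts.append(value)
        return self

    def get_value(self) -> str:
        # Emit the accumulated result when the window closes.
        return ",".join(self._parts)

    def merge(self, other: Self) -> Self:
        # Combine partial windows, e.g. from different partitions.
        self._parts.extend(other._parts)
        return self
```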
59 changes: 22 additions & 37 deletions sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
@@ -35,12 +35,10 @@
StepConfig,
)
from sentry_streams.metrics import (
Metric,
MetricsConfig,
configure_metrics,
get_metrics,
get_size,
)
from sentry_streams.metrics.stats import get_stats
from sentry_streams.pipeline.function_template import (
InputType,
OutputType,
@@ -103,42 +101,29 @@ def _metrics_wrapped_function(
Module-level wrapper function for adding metrics to step functions.
This is defined at module level to be picklable for multiprocessing.
"""
msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None
start_time = input_metrics(step_name, msg_size)
has_error = output_size = None
input_metrics(step_name)
has_error = None
start_time = time.time()
try:
result = application_function(msg)
output_size = get_size(result)
return result
except Exception as e:
has_error = str(e.__class__.__name__)
raise e
finally:
output_metrics(step_name, has_error, start_time, output_size)
output_metrics(step_name, has_error, start_time)


def input_metrics(name: str, message_size: int | None) -> float:
metrics = get_metrics()
tags = {"step": name}
metrics.increment(Metric.INPUT_MESSAGES, tags=tags)
if message_size is not None:
metrics.increment(Metric.INPUT_BYTES, tags=tags, value=message_size)
return time.time()
def input_metrics(name: str) -> None:
stats = get_stats()
stats.step_exec(name)


def output_metrics(
name: str, error: str | None, start_time: float, message_size: int | None
) -> None:
metrics = get_metrics()
tags = {"step": name}
def output_metrics(name: str, error: str | None, start_time: float) -> None:
stats = get_stats()
if error:
tags["error"] = error
metrics.increment(Metric.ERRORS, tags=tags)

metrics.increment(Metric.OUTPUT_MESSAGES, tags=tags)
if message_size is not None:
metrics.increment(Metric.OUTPUT_BYTES, tags=tags, value=message_size)
metrics.timing(Metric.DURATION, time.time() - start_time, tags=tags)
stats.step_error(name)
stats.step_timing(name, time.time() - start_time)


def build_initial_offset(offset_reset: str) -> InitialOffset:
@@ -326,11 +311,12 @@ def sink(self, step: Sink[Any], stream: Route) -> Route:
# Fix this to wrap the actual step instead of just the object_generator.
# This will at least capture the number of calls to the step, if nothing else.
def wrapped_generator() -> str:
start_time = input_metrics(step.name, None)
input_metrics(step.name)
start_time = time.time()
try:
return step.object_generator()
finally:
output_metrics(step.name, None, start_time, None)
output_metrics(step.name, None, start_time)

logger.info(f"Adding GCS sink: {step.name} to pipeline")
self.__consumers[stream.source].add_step(
@@ -450,18 +436,17 @@ def filter(self, step: Filter[Any], stream: Route) -> Route:
elif isinstance(step, PredicateFilter):

def filter_msg(msg: Message[Any]) -> bool:
msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None
start_time = input_metrics(step.name, msg_size)
has_error = output_size = None
input_metrics(step.name)
has_error = None
start_time = time.time()
try:
result = step.resolved_function(msg)
output_size = get_size(result)
return result
except Exception as e:
has_error = str(e.__class__.__name__)
raise e
finally:
output_metrics(step.name, has_error, start_time, output_size)
output_metrics(step.name, has_error, start_time)

self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg))
return stream
@@ -537,8 +522,8 @@ def router(
route = RustRoute(stream.source, stream.waypoints)

def routing_function(msg: Message[Any]) -> str:
msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None
start_time = input_metrics(step.name, msg_size)
input_metrics(step.name)
start_time = time.time()
has_error = None
try:
waypoint = step.routing_function(msg)
@@ -548,7 +533,7 @@ def routing_function(msg: Message[Any]) -> str:
has_error = str(e.__class__.__name__)
raise e
finally:
output_metrics(step.name, has_error, start_time, None)
output_metrics(step.name, has_error, start_time)

logger.info(f"Adding router: {step.name} to pipeline")
self.__consumers[stream.source].add_step(
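The instrument-around-call shape above repeats across `_metrics_wrapped_function`, `filter_msg`, `wrapped_generator`, and `routing_function`. A generic sketch of the same pattern as a decorator (the `with_step_stats` helper is hypothetical, not part of this PR):

```python
import time
from functools import wraps
from typing import Any, Callable, TypeVar

T = TypeVar("T")


def with_step_stats(step_name: str, stats: Any) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Count an execution on entry, record the error on failure,
    and record wall-clock duration on exit."""

    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            stats.step_exec(step_name)
            start = time.time()
            try:
                return fn(*args, **kwargs)
            except Exception:
                stats.step_error(step_name)
                raise
            finally:
                stats.step_timing(step_name, time.time() - start)

        return wrapper

    return decorator
```

Note that the PR keeps `_metrics_wrapped_function` at module level rather than using a closure like this, since its docstring says it must stay picklable for multiprocessing; the decorator form only illustrates the shape.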
46 changes: 34 additions & 12 deletions sentry_streams/sentry_streams/metrics/metrics.py
@@ -371,7 +371,7 @@ class Metrics:
"""

def __init__(self, backend: MetricsBackend) -> None:
self.__backend = backend
self._backend = backend

def increment(
self,
@@ -382,19 +382,19 @@ def increment(
"""
Increments a counter metric by a given value.
"""
self.__backend.increment(name.value, value, tags=tags)
self._backend.increment(name.value, value, tags=tags)

def gauge(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
"""
Sets a gauge metric to the given value.
"""
self.__backend.gauge(name.value, value, tags=tags)
self._backend.gauge(name.value, value, tags=tags)

def timing(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
"""
Records a timing metric.
"""
self.__backend.timing(name.value, value, tags=tags)
self._backend.timing(name.value, value, tags=tags)


class ArroyoMetricsBackend:
@@ -432,7 +432,8 @@ def timing(
self.__backend.timing(name, value, tags=_tags_from_mapping(tags))


_metrics_backend: Optional[MetricsBackend] = None
_raw_metrics: Optional[Metrics] = None
_metrics: Optional[Metrics] = None
_dummy_metrics_backend = DummyMetricsBackend()


@@ -458,22 +459,43 @@ def configure_metrics(config: MetricsConfig, force: bool = False) -> None:
``config.json``) so worker processes can rebuild the same backends under
``spawn`` multiprocessing.
"""
global _metrics_backend
global _metrics
global _raw_metrics
if not force:
assert _metrics_backend is None, "Metrics is already set"
assert _metrics is None, "Metrics is already set"
assert _raw_metrics is None, "Raw metrics backend is already set"

inner = build_metrics_backend(config)
_metrics_backend = BufferedMetricsBackend(
# TODO: Consider removing the buffered backend entirely now that we have
# pipeline stats.
buffered_backend = BufferedMetricsBackend(
inner,
throttle_interval_sec=_buffer_throttle_interval_sec(config),
)
arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend))
_metrics = Metrics(buffered_backend)
_raw_metrics = Metrics(inner)
arroyo_configure_metrics(ArroyoMetricsBackend(buffered_backend))


def get_metrics() -> Metrics:
if _metrics_backend is None:
return Metrics(_dummy_metrics_backend)
return Metrics(_metrics_backend)
"""
Gets the currently configured buffered metrics adapter.
"""
global _metrics
if _metrics is None:
_metrics = Metrics(_dummy_metrics_backend)

return _metrics


def get_raw_metrics() -> Metrics:
"""
Gets the currently configured raw metrics backend without buffering.
"""
global _raw_metrics
if _raw_metrics is None:
_raw_metrics = Metrics(_dummy_metrics_backend)
return _raw_metrics


def get_size(obj: Any) -> int | None:
Expand Down
59 changes: 59 additions & 0 deletions sentry_streams/sentry_streams/metrics/stats.py
@@ -0,0 +1,59 @@
import time
from collections import defaultdict

from sentry_streams.metrics import Metrics
from sentry_streams.metrics.metrics import Metric, get_raw_metrics

FLUSH_TIME = 10


class PipielineStats:

Class name PipielineStats is misspelled

Low Severity

The newly introduced class is named PipielineStats (extra i) instead of PipelineStats, and the misspelling has propagated into the global type annotation, the get_stats() return type, the test imports, and the name-mangled attribute string _PipielineStats__last_flush_time. Renaming after wider adoption is harder than fixing it now while the symbol is still internal.

Reviewed by Cursor Bugbot for commit cf3d1e4.
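One mechanism worth spelling out from this comment: double-underscore attributes are name-mangled with the class name, which is how the typo leaks into the attribute string that tests must use. A standalone illustration (the class body is illustrative; the mangling behavior is standard Python):

```python
class PipielineStats:  # the misspelled name from this diff
    def __init__(self) -> None:
        self.__last_flush_time = 0.0  # mangled using the class name


stats = PipielineStats()
# Python rewrites __last_flush_time to _PipielineStats__last_flush_time,
# so renaming the class silently changes this externally visible name.
assert hasattr(stats, "_PipielineStats__last_flush_time")
```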


def __init__(self, metrics: Metrics) -> None:
self._metrics = metrics

self._exec_buffer: dict[str, int] = defaultdict(int)
self._error_buffer: dict[str, int] = defaultdict(int)
self._timing_buffer: dict[str, float] = defaultdict(float)

self.__last_flush_time = 0.0

def step_exec(self, step: str) -> None:
self._exec_buffer[step] += 1
self._maybe_flush()

def step_error(self, step: str) -> None:
self._error_buffer[step] += 1
self._maybe_flush()

def step_timing(self, step: str, value: float) -> None:
if self._timing_buffer[step] < value:
self._timing_buffer[step] = value
self._maybe_flush()

def _maybe_flush(self) -> None:
if time.time() - self.__last_flush_time >= FLUSH_TIME:
self.__last_flush_time = time.time()
for step, value in self._exec_buffer.items():
tags = {"step": step}
self._metrics.increment(Metric.INPUT_MESSAGES, value, tags)
for step, value in self._error_buffer.items():
tags = {"step": step}
self._metrics.increment(Metric.ERRORS, value, tags)
for step, fvalue in self._timing_buffer.items():
tags = {"step": step}
self._metrics.timing(Metric.DURATION, fvalue, tags)

step_timing reports max-of-window instead of distribution

Medium Severity

step_timing keeps only the largest value seen per step in a 10-second window and then emits it via Metrics.timing(Metric.DURATION, …). Downstream backends (Datadog dogstatsd timing, log backend) treat each timing sample as a histogram observation, so dashboards and percentiles built on duration will now see a single max-per-window sample instead of the previous per-call distribution. Effective p50/p95/p99 will collapse toward the max and the call rate carried by the timing metric disappears.

Reviewed by Cursor Bugbot for commit cf3d1e4.
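If the per-call distribution matters, one possible direction (a sketch only, not this PR's code; `DistributionTimingBuffer` and its `emit` callback are hypothetical) is to buffer every sample and flush each observation, so backend-side p50/p95/p99 remain meaningful:

```python
import time
from collections import defaultdict
from typing import Callable

FLUSH_TIME = 10


class DistributionTimingBuffer:
    """Keeps every timing sample per step and flushes each observation,
    preserving the per-call distribution instead of one max per window."""

    def __init__(self, emit: Callable[[str, float], None]) -> None:
        # e.g. emit = lambda step, v: metrics.timing(Metric.DURATION, v, {"step": step})
        self._emit = emit
        self._samples: dict[str, list[float]] = defaultdict(list)
        self._last_flush = 0.0

    def step_timing(self, step: str, value: float) -> None:
        self._samples[step].append(value)
        self._maybe_flush()

    def _maybe_flush(self) -> None:
        now = time.time()
        if now - self._last_flush < FLUSH_TIME:
            return
        self._last_flush = now
        for step, samples in self._samples.items():
            for sample in samples:
                self._emit(step, sample)  # one histogram observation per call
        self._samples = defaultdict(list)
```

The tradeoff is memory proportional to call volume inside the window; sampling or a sketching structure (e.g. t-digest) would bound it.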


self._exec_buffer = defaultdict(int)
self._error_buffer = defaultdict(int)
self._timing_buffer = defaultdict(float)


_stats: PipielineStats | None = None


def get_stats() -> PipielineStats:
global _stats
if _stats is None:
_stats = PipielineStats(get_raw_metrics())
return _stats

_stats retains stale raw metrics after reconfigure

Medium Severity

The _stats singleton captures the Metrics instance from get_raw_metrics() on its first call. When configure_metrics(force=True) later updates the global metrics backend, _stats continues to use the stale Metrics object. This routes subsequent pipeline stats to the wrong backend and causes test isolation issues.

Reviewed by Cursor Bugbot for commit cf3d1e4.
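A minimal fix sketch (the `reset_stats` helper is hypothetical and assumes `configure_metrics(force=True)` invokes it, so the next `get_stats()` call rebinds to the freshly configured backend):

```python
# In sentry_streams/metrics/stats.py (sketch):
def reset_stats() -> None:
    """Drop the cached singleton so the next get_stats() call
    re-reads get_raw_metrics() and picks up the current backend."""
    global _stats
    _stats = None


# In configure_metrics() in metrics.py, after rebuilding backends (sketch):
#     if force:
#         reset_stats()
```

An alternative is to have `_maybe_flush` call `get_raw_metrics()` at flush time rather than capturing the `Metrics` object in `__init__`.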

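Taken together, step instrumentation now flows through the buffered stats path. A short usage sketch of the API introduced in this diff (the step name and `do_work` body are illustrative; assumes a metrics backend was configured via `configure_metrics`):

```python
import time

from sentry_streams.metrics.stats import get_stats


def do_work() -> None:  # hypothetical step body
    pass


stats = get_stats()
stats.step_exec("my_step")  # counted into INPUT_MESSAGES for the step
start = time.time()
try:
    do_work()
except Exception:
    stats.step_error("my_step")  # counted into ERRORS
    raise
finally:
    stats.step_timing("my_step", time.time() - start)  # max kept per window

# Buffers flush to the raw (unbuffered) metrics backend at most once
# every FLUSH_TIME (10) seconds, tagged with {"step": name}.
```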