# Reduce the impact of metrics on throughput #304
```diff
@@ -2,6 +2,7 @@
 import functools
 import logging
+import random
 import time
 from dataclasses import replace
 from typing import (
@@ -39,7 +40,6 @@
     MetricsConfig,
     configure_metrics,
     get_metrics,
-    get_size,
 )
 from sentry_streams.pipeline.function_template import (
     InputType,
@@ -97,47 +97,50 @@ def initializer(metrics_config: MetricsConfig) -> None:


 def _metrics_wrapped_function(
-    step_name: str, application_function: Callable[[Message[Any]], Any], msg: Message[Any]
+    step_name: str,
+    sample_rate: float,
+    application_function: Callable[[Message[Any]], Any],
+    msg: Message[Any],
 ) -> Any:
     """
     Module-level wrapper function for adding metrics to step functions.
     This is defined at module level to be picklable for multiprocessing.
     """
-    msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None
-    start_time = input_metrics(step_name, msg_size)
-    has_error = output_size = None
+    input_metrics(step_name, sample_rate)
+    has_error = None
+    start_time = time.time()
     try:
         result = application_function(msg)
-        output_size = get_size(result)
         return result
     except Exception as e:
         has_error = str(e.__class__.__name__)
         raise e
     finally:
-        output_metrics(step_name, has_error, start_time, output_size)
+        output_metrics(step_name, has_error, start_time, sample_rate)


-def input_metrics(name: str, message_size: int | None) -> float:
+def input_metrics(name: str, sample_rate: float) -> None:
+    if random.random() > sample_rate:
+        return
     metrics = get_metrics()
     tags = {"step": name}
-    metrics.increment(Metric.INPUT_MESSAGES, tags=tags)
-    if message_size is not None:
-        metrics.increment(Metric.INPUT_BYTES, tags=tags, value=message_size)
-    return time.time()
+    metrics.increment(Metric.INPUT_MESSAGES, value=1 / sample_rate, tags=tags)


 def output_metrics(
-    name: str, error: str | None, start_time: float, message_size: int | None
+    name: str,
+    error: str | None,
+    start_time: float,
+    sample_rate: float,
 ) -> None:
+    if random.random() > sample_rate:
+        return
     metrics = get_metrics()
     tags = {"step": name}
     if error:
         tags["error"] = error
         metrics.increment(Metric.ERRORS, tags=tags)

     metrics.increment(Metric.OUTPUT_MESSAGES, tags=tags)
-    if message_size is not None:
-        metrics.increment(Metric.OUTPUT_BYTES, tags=tags, value=message_size)

     metrics.timing(Metric.DURATION, time.time() - start_time, tags=tags)
```

> **Error counter not scaled to compensate for sampling** — High Severity
>
> Bug: The `ERRORS` metric is not scaled by `1 / sample_rate` to account for sampling, unlike `INPUT_MESSAGES`, leading to undercounted errors.
>
> Suggested Fix: In the `output_metrics` block, scale the `ERRORS` metric by `1 / sample_rate` to match the handling of `INPUT_MESSAGES`. Change `metrics.increment(Metric.ERRORS, tags=tags)` to `metrics.increment(Metric.ERRORS, value=1 / sample_rate, tags=tags)`.
>
> Reviewed by Cursor Bugbot for commit 6e7d58f.
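The sampling scheme above trades per-message metric calls for statistical accuracy: each call rolls `random.random()` once, and the few calls that pass emit a value scaled by `1 / sample_rate`, so the expected total still matches the true count. A minimal, self-contained sketch of that idea, with a plain dict standing in for the real metrics backend:

```python
import random

def sampled_increment(counts: dict[str, float], name: str, sample_rate: float) -> None:
    # Skip most calls; only ~sample_rate of them reach the backend.
    if random.random() > sample_rate:
        return
    # Scale the emitted value so the *expected* total matches the true count:
    # E[total] = true_count * sample_rate * (1 / sample_rate) = true_count.
    counts[name] = counts.get(name, 0.0) + 1 / sample_rate

counts: dict[str, float] = {}
for _ in range(100_000):
    sampled_increment(counts, "input_messages", sample_rate=0.1)

print(counts["input_messages"])  # ~100_000 on average, with sampling noise
```

This is also why Bugbot's comment matters: any counter emitted inside the sampled branch without the `1 / sample_rate` scaling (here, `ERRORS`) is systematically undercounted by a factor of `sample_rate`.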
```diff
@@ -237,13 +240,15 @@ def __init__(
         steps_config: Mapping[str, StepConfig],
         metrics_config: MetricsConfig,
         write_healthcheck: bool = False,
+        metrics_sample_rate: float = 0.1,
     ) -> None:
         super().__init__()
         self.steps_config = steps_config
         self.__metrics_config = metrics_config
         self.__write_healthcheck = write_healthcheck
         self.__consumers: MutableMapping[str, ArroyoConsumer] = {}
         self.__chains = TransformChains()
+        self.__metrics_sample_rate = metrics_sample_rate

     @classmethod
     def build(  # type: ignore[override]
@@ -255,8 +260,9 @@ def build(  # type: ignore[override]
         adapter_config = config.get("adapter_config") or {}
         arroyo_config = adapter_config.get("arroyo") or {}
         write_healthcheck = bool(arroyo_config.get("write_healthcheck", False))
+        metrics_sample_rate = float(arroyo_config.get("metrics_sample_rate", 0.1))

-        return cls(steps_config, metrics_config, write_healthcheck)
+        return cls(steps_config, metrics_config, write_healthcheck, metrics_sample_rate)

     def __close_chain(self, stream: Route) -> None:
         if self.__chains.exists(stream):
```
```diff
@@ -326,11 +332,12 @@ def sink(self, step: Sink[Any], stream: Route) -> Route:
         # Fix this to wrap the actual step instead of just the object_generator.
         # This will at least capture the number of calls to the step, if nothing else.
         def wrapped_generator() -> str:
-            start_time = input_metrics(step.name, None)
+            start_time = time.time()
+            input_metrics(step.name, self.__metrics_sample_rate)
             try:
                 return step.object_generator()
             finally:
-                output_metrics(step.name, None, start_time, None)
+                output_metrics(step.name, None, start_time, self.__metrics_sample_rate)

         logger.info(f"Adding GCS sink: {step.name} to pipeline")
         self.__consumers[stream.source].add_step(
@@ -381,7 +388,7 @@ def map(self, step: Map[Any, Any], stream: Route) -> Route:
         application_function = step.resolved_function

         wrapped_function = functools.partial(
-            _metrics_wrapped_function, step.name, application_function
+            _metrics_wrapped_function, step.name, self.__metrics_sample_rate, application_function
         )

         step = replace(step, function=wrapped_function)
```
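`functools.partial` keeps the wrapped map step picklable, which the docstring on `_metrics_wrapped_function` calls out as the reason the wrapper lives at module level: the pickle payload is just a reference to the module-level function plus the bound arguments. A quick standalone illustration (the function and arguments here are made up):

```python
import functools
import pickle

# Module-level functions pickle by reference; closures and lambdas do not.
def _wrapped(step_name: str, sample_rate: float, msg: str) -> str:
    return f"{step_name}@{sample_rate}: {msg}"

# A partial over a module-level function is also picklable, because pickle
# stores the function reference plus the bound positional arguments.
bound = functools.partial(_wrapped, "mymap", 0.1)
restored = pickle.loads(pickle.dumps(bound))
print(restored("hello"))  # mymap@0.1: hello

# By contrast, pickle.dumps(lambda m: m) raises PicklingError.
```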
```diff
@@ -450,18 +457,17 @@ def filter(self, step: Filter[Any], stream: Route) -> Route:
         elif isinstance(step, PredicateFilter):

             def filter_msg(msg: Message[Any]) -> bool:
-                msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None
-                start_time = input_metrics(step.name, msg_size)
-                has_error = output_size = None
+                input_metrics(step.name, self.__metrics_sample_rate)
+                has_error = None
+                start_time = time.time()
                 try:
                     result = step.resolved_function(msg)
-                    output_size = get_size(result)
                     return result
                 except Exception as e:
                     has_error = str(e.__class__.__name__)
                     raise e
                 finally:
-                    output_metrics(step.name, has_error, start_time, output_size)
+                    output_metrics(step.name, has_error, start_time, self.__metrics_sample_rate)

             self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg))
             return stream
@@ -537,8 +543,8 @@ def router(
         route = RustRoute(stream.source, stream.waypoints)

         def routing_function(msg: Message[Any]) -> str:
-            msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None
-            start_time = input_metrics(step.name, msg_size)
+            input_metrics(step.name, self.__metrics_sample_rate)
+            start_time = time.time()
             has_error = None
             try:
                 waypoint = step.routing_function(msg)
@@ -548,7 +554,7 @@ def routing_function(msg: Message[Any]) -> str:
                 has_error = str(e.__class__.__name__)
                 raise e
             finally:
-                output_metrics(step.name, has_error, start_time, None)
+                output_metrics(step.name, has_error, start_time, self.__metrics_sample_rate)

         logger.info(f"Adding router: {step.name} to pipeline")
         self.__consumers[stream.source].add_step(
```
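The sink, filter, and router wrappers all repeat the same shape: sampled input metric, `time.time()`, `try`/`except`/`finally`, sampled output metric. A decorator-style sketch of that shared pattern, with `print` standing in for the metrics calls (not the project's API, just an illustration):

```python
import functools
import random
import time
from typing import Any, Callable

def with_step_metrics(step_name: str, sample_rate: float) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Sketch of the instrumentation pattern repeated by the wrappers above."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if random.random() <= sample_rate:
                print(f"{step_name}: input (scaled by {1 / sample_rate:.0f})")
            start_time = time.time()
            has_error = None
            try:
                return fn(*args, **kwargs)
            except Exception as e:
                has_error = e.__class__.__name__
                raise
            finally:
                if random.random() <= sample_rate:
                    tag = f" error={has_error}" if has_error else ""
                    print(f"{step_name}: output in {time.time() - start_time:.6f}s{tag}")
        return wrapper
    return decorator

@with_step_metrics("myfilter", sample_rate=0.1)
def keep_even(n: int) -> bool:
    return n % 2 == 0

print(keep_even(4))  # True; the metric lines print for roughly 10% of calls
```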
```diff
@@ -432,7 +432,7 @@ def timing(
         self.__backend.timing(name, value, tags=_tags_from_mapping(tags))


-_metrics_backend: Optional[MetricsBackend] = None
+_metrics: Optional[Metrics] = None
 _dummy_metrics_backend = DummyMetricsBackend()


@@ -458,22 +458,25 @@ def configure_metrics(config: MetricsConfig, force: bool = False) -> None:
     ``config.json``) so worker processes can rebuild the same backends under
     ``spawn`` multiprocessing.
     """
-    global _metrics_backend
+    global _metrics
     if not force:
-        assert _metrics_backend is None, "Metrics is already set"
+        assert _metrics is None, "Metrics is already set"

     inner = build_metrics_backend(config)
-    _metrics_backend = BufferedMetricsBackend(
+    backend = BufferedMetricsBackend(
         inner,
         throttle_interval_sec=_buffer_throttle_interval_sec(config),
     )
-    arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend))
+    _metrics = Metrics(backend)
+    arroyo_configure_metrics(ArroyoMetricsBackend(backend))


 def get_metrics() -> Metrics:
-    if _metrics_backend is None:
-        return Metrics(_dummy_metrics_backend)
-    return Metrics(_metrics_backend)
+    global _metrics
+    if _metrics is None:
+        _metrics = Metrics(_dummy_metrics_backend)
+
+    return _metrics


 def get_size(obj: Any) -> int | None:
```

> **Caching dummy metrics breaks later configuration call** — Medium Severity
>
> Comment on lines 474 to +479.
>
> Bug: The test fixture […]
>
> Suggested Fix: In the […]
>
> Additional Locations (1)
>
> Reviewed by Cursor Bugbot for commit f2ca949.
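The ordering hazard Bugbot flags comes from `get_metrics` now caching a dummy-backed `Metrics` instance in the module global: once that happens, `configure_metrics` (which asserts `_metrics is None` unless `force=True`) can no longer run. A stripped-down sketch of the failure mode, with stand-in classes for the real backends:

```python
from typing import Optional

class DummyBackend: ...
class RealBackend: ...

class Metrics:
    def __init__(self, backend: object) -> None:
        self.backend = backend

_metrics: Optional[Metrics] = None

def get_metrics() -> Metrics:
    global _metrics
    if _metrics is None:
        # First call before configuration permanently fills the global
        # with a dummy-backed instance.
        _metrics = Metrics(DummyBackend())
    return _metrics

def configure_metrics(force: bool = False) -> None:
    global _metrics
    if not force:
        # Trips if get_metrics() ran first, even though no real backend
        # was ever configured.
        assert _metrics is None, "Metrics is already set"
    _metrics = Metrics(RealBackend())

get_metrics()  # e.g. an early, pre-configuration call in a test or at import time
try:
    configure_metrics()
except AssertionError as e:
    print(f"configure_metrics failed: {e}")  # Metrics is already set
```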