
feat(rust_streams): PipelineStats and Batch/Filter metrics #309

Merged
fpacifici merged 4 commits into main from fpacifici/rust_metrics
May 4, 2026

Conversation

fpacifici (Collaborator) commented May 1, 2026

Summary

We are not collecting metrics from Rust steps. This adds the equivalent abstractions we have in Python.
The next step will be to replace the Python ones with the Rust ones.

Adds a Rust pipeline_stats module aligned with the Python PipelineStats (stats.py): in-memory aggregation of per-step exec counts, error counts, and max duration per flush window (10s), then emission through the metrics crate only (streams.pipeline.input.messages, streams.pipeline.errors, and streams.pipeline.duration, each with a step label). No Arroyo types on the recording path.
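
A minimal sketch of that buffering scheme, for illustration only: the StepStats/Inner layout and method names are assumptions, the metric names follow the description above, and emission assumes the metrics crate's 0.22-style macros (counter!/gauge! returning handles).

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

#[derive(Default)]
struct StepStats {
    exec_count: u64,
    error_count: u64,
    max_duration: Duration,
}

struct Inner {
    per_step: HashMap<String, StepStats>,
    last_flush: Instant,
}

struct PipelineStats {
    inner: Mutex<Inner>,
    flush_interval: Duration, // 10s per the description above
}

impl PipelineStats {
    fn with_flush_interval(flush_interval: Duration) -> Self {
        Self {
            inner: Mutex::new(Inner {
                per_step: HashMap::new(),
                last_flush: Instant::now(),
            }),
            flush_interval,
        }
    }

    // Hot path: bump an in-memory counter, no metrics backend work.
    fn step_exec(&self, step: &str) {
        let mut inner = self.inner.lock().unwrap();
        inner.per_step.entry(step.to_string()).or_default().exec_count += 1;
        self.maybe_flush(&mut inner);
    }

    fn step_error(&self, step: &str) {
        let mut inner = self.inner.lock().unwrap();
        inner.per_step.entry(step.to_string()).or_default().error_count += 1;
        self.maybe_flush(&mut inner);
    }

    fn step_timing(&self, step: &str, d: Duration) {
        let mut inner = self.inner.lock().unwrap();
        let stats = inner.per_step.entry(step.to_string()).or_default();
        stats.max_duration = stats.max_duration.max(d);
        self.maybe_flush(&mut inner);
    }

    // Cold path: once per flush window, emit aggregates through the
    // metrics facade with a single `step` label.
    fn maybe_flush(&self, inner: &mut Inner) {
        if inner.last_flush.elapsed() < self.flush_interval {
            return;
        }
        for (step, stats) in inner.per_step.drain() {
            metrics::counter!("streams.pipeline.input.messages", "step" => step.clone())
                .increment(stats.exec_count);
            metrics::counter!("streams.pipeline.errors", "step" => step.clone())
                .increment(stats.error_count);
            metrics::gauge!("streams.pipeline.duration", "step" => step)
                .set(stats.max_duration.as_secs_f64());
        }
        inner.last_flush = Instant::now();
    }
}
```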

Changes

  • RuntimeOperator: step_name on Batch and Filter; Python adapter passes pipeline step names.
  • BatchStep: step_exec per ingested streaming row; step_timing on batch flush (b.flush()).
  • Filter: step_exec / step_timing / step_error around the Python predicate; PredicateFilter closure no longer calls Python input_metrics / output_metrics to avoid double counting (see the sketch after this list).
  • Tests: Rust unit tests for flush behavior and throttle (local metrics recorder).
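
A hypothetical sketch of the Filter bookkeeping from the list above, written against the PipelineStats sketch in the Summary; the function shape and error type stand in for the real adapter code around the Python predicate.

```rust
use std::time::Instant;

// Sketch only: `stats` is the PipelineStats type sketched above, and
// `predicate` stands in for the call into the Python predicate.
fn run_filter<T>(
    stats: &PipelineStats,
    step_name: &str,
    predicate: impl Fn(&T) -> Result<bool, String>,
    payload: &T,
) -> Option<bool> {
    stats.step_exec(step_name); // one exec per message through the step
    let start = Instant::now();
    match predicate(payload) {
        Ok(keep) => {
            stats.step_timing(step_name, start.elapsed());
            Some(keep)
        }
        Err(_) => {
            stats.step_error(step_name); // predicate failed on the Python side
            None
        }
    }
}
```

All counting happens on the Rust side here, which is why the PredicateFilter closure must stop calling the Python input_metrics / output_metrics: otherwise each message would be counted twice.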

Made with Cursor

Introduce pipeline_stats.rs mirroring Python PipelineStats: buffered per-step exec, error, and max timing, flushed every 10s via the metrics crate (streams.pipeline.*) with a step label. No Arroyo dependency for recording.

Plumb step_name through RuntimeOperator Batch/Filter. BatchStep records step_exec per row and step_timing on flush; Filter records around the Python predicate. Remove duplicate Python metrics from PredicateFilter.

Rust tests cover aggregation and throttling (see the test sketch below); run them with scripts/rust-envvars and cargo test, per the Makefile.

Made-with: Cursor
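
A hypothetical shape for such a test, exercising the PipelineStats sketch from the Summary; it assumes metrics 0.22's with_local_recorder and metrics-util's DebuggingRecorder, which may differ from the local recorder the PR actually uses.

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use metrics_util::debugging::DebuggingRecorder;
    use std::time::Duration;

    #[test]
    fn flushes_aggregated_counts() {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        metrics::with_local_recorder(&recorder, || {
            // A zero interval makes every record call flush immediately.
            let stats = PipelineStats::with_flush_interval(Duration::ZERO);
            stats.step_exec("filter");
        });
        // The flush should have emitted the aggregated counter with a
        // `step` label, rather than one emission per message.
        let snapshot = snapshotter.snapshot().into_vec();
        assert!(snapshot
            .iter()
            .any(|(key, _, _, _)| key.key().name() == "streams.pipeline.input.messages"));
    }
}
```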
fpacifici requested a review from a team as a code owner on May 1, 2026 at 06:37
Put step_name before optional Batch parameters so rust_streams.pyi is valid Python (required args before defaults). Apply cargo fmt to satisfy the lint job.

Made-with: Cursor
Comment thread on sentry_streams/src/pipeline_stats.rs (outdated)
```rust
impl PipelineStats {
    fn with_flush_interval(flush_interval: Duration) -> Self {
        Self {
            inner: Mutex::new(Inner::new()),
```
Member:

Why do we need the additional buffering? Couldn't the metrics backend take care of this?

fpacifici (Collaborator, author):

We used to have exactly that, and it turned out to be a major performance issue (addressed in the Python version in #305).

In a nutshell:

  • Multiple stats are collected at every step of the pipeline, including trivial steps, which makes the collection path extremely high throughput.
  • The metrics backend does additional work (see the profile attached to the linked PR), such as tag normalization, because it has to be general: it does not know there is only one tag. That turned out to be the culprit of the overhead.
  • Stats are much more lightweight: on every call we just bump a counter, which is viable.

Now, the Rust version is behind a mutex, which may turn out to be a problem; I will profile that part. Still, the reason we need a lightweight version of the buffered metrics is the one described in the Python PR. We did not retrofit it into the metrics buffer because the stats abstraction trades flexibility for speed, and we want to preserve that flexibility in the metrics backend.
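
To make the trade-off concrete, here is a rough sketch of the two recording paths under comparison (shapes assumed): the buffered hot path is a mutex lock plus a hash-map bump, while the unbuffered variant pays the metrics facade's label handling on every message.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

struct Counts(Mutex<HashMap<&'static str, u64>>);

impl Counts {
    // Buffered path, once per message: a lock plus an O(1) counter bump.
    fn record(&self, step: &'static str) {
        *self.0.lock().unwrap().entry(step).or_insert(0) += 1;
    }
}

// Unbuffered alternative the PR avoids: every message goes through the
// metrics facade and its label handling, the overhead profiled in the
// Python counterpart (#305).
fn record_unbuffered(step: &'static str) {
    metrics::counter!("streams.pipeline.input.messages", "step" => step).increment(1);
}
```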


```diff
 fn submit(&mut self, message: Message<RoutedValue>) -> Result<(), SubmitError<RoutedValue>> {
-    if self.route != message.payload().route {
+    if self.route != message.payload().route || message.payload().payload.is_watermark_msg() {
```
fpacifici (Collaborator, author) commented May 1, 2026:

Moved here so that watermarks are not counted in the metrics.

Member:

Should we add a tag or use a different metric for watermarks? I would think we'd still like metrics on watermark messages, but tracked separately from main messages.

fpacifici (Collaborator, author):

I think this is a good point. More importantly, we should emit metrics both when watermarks are emitted and when they are committed.

Let me think it over. I will send a PR covering the overall pipeline:
https://linear.app/getsentry/issue/STREAM-921/add-metrics-to-track-watermarks-per-step
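
One hypothetical shape for the tagged approach suggested above (not what this PR does); the message_type label name is illustrative.

```rust
// Hypothetical: distinguish watermarks from data messages with a label
// instead of skipping them entirely.
fn count_message(step: &'static str, is_watermark: bool) {
    let kind = if is_watermark { "watermark" } else { "data" };
    metrics::counter!("streams.pipeline.input.messages", "step" => step, "message_type" => kind)
        .increment(1);
}
```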

evanh (Member) left a comment:

Generally approved, added one comment.



fpacifici merged commit 865f51b into main on May 4, 2026
22 checks passed
