feat(rust_streams): PipelineStats and Batch/Filter metrics#309
Conversation
Introduce pipeline_stats.rs mirroring Python PipelineStats: buffered per-step exec, error, and max timing, flushed every 10s via the metrics crate (streams.pipeline.*) with a step label. No Arroyo dependency for recording. Plumb step_name through RuntimeOperator Batch/Filter. BatchStep records step_exec per row and step_timing on flush; Filter records around the Python predicate. Remove duplicate Python metrics from PredicateFilter. Rust tests cover aggregation and throttling; use scripts/rust-envvars with cargo test per Makefile. Made-with: Cursor
Put step_name before optional Batch parameters so rust_streams.pyi is valid Python (required args before defaults). Apply cargo fmt to satisfy the lint job. Made-with: Cursor
```rust
impl PipelineStats {
    fn with_flush_interval(flush_interval: Duration) -> Self {
        Self {
            inner: Mutex::new(Inner::new()),
```
Why do we need the additional buffering? Couldn't the metrics backend take care of this?
We used to do that, and it turned out to be a major performance issue (addressed in the Python version in #305).
In a nutshell:
- Multiple stats are collected at every step of the pipeline, including trivial steps, which makes the collection extremely high-throughput.
- The metrics backend does additional work (see the profile attached to the linked PR), such as tag normalization, because it has to stay general. It does not know there is only one tag; that ended up being the culprit of the overhead.
- Stats are much more lightweight: on every call we just bump a counter, which is viable.

Now, the Rust version holds its state in a mutex, which may turn out to be a problem; I will profile that part. The reason we need a lightweight, buffered version of the metrics is the one described in the Python PR. We did not retrofit the buffering into the metrics backend because we get speed in exchange for flexibility in the stats abstraction, and we want to preserve that flexibility in the backend.
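The buffering idea discussed here can be sketched as follows. This is a hypothetical, simplified illustration, not the actual rust_streams `PipelineStats`: the names `StatsBuffer`, `Inner`, `bump`, and `flush_if_due` are made up, and the real code flushes through the `metrics` crate with a `step` label. The point is only that the hot path bumps an in-memory counter behind a mutex, and the backend sees one aggregated value per step per flush window.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

// Per-step counters accumulated between flushes.
struct Inner {
    counts: HashMap<String, u64>,
    last_flush: Instant,
}

// Illustrative buffered-stats holder (names are hypothetical).
struct StatsBuffer {
    inner: Mutex<Inner>,
    flush_interval: Duration,
}

impl StatsBuffer {
    fn with_flush_interval(flush_interval: Duration) -> Self {
        Self {
            inner: Mutex::new(Inner {
                counts: HashMap::new(),
                last_flush: Instant::now(),
            }),
            flush_interval,
        }
    }

    // Hot path: just bump a counter. No tag normalization, no backend call.
    fn bump(&self, step: &str) {
        let mut inner = self.inner.lock().unwrap();
        *inner.counts.entry(step.to_string()).or_insert(0) += 1;
    }

    // Cold path: when the window has elapsed, drain the buffer and emit one
    // aggregated value per step to the real metrics backend via `emit`.
    fn flush_if_due(&self, emit: impl Fn(&str, u64)) {
        let mut inner = self.inner.lock().unwrap();
        if inner.last_flush.elapsed() < self.flush_interval {
            return;
        }
        for (step, count) in inner.counts.drain() {
            emit(&step, count);
        }
        inner.last_flush = Instant::now();
    }
}
```

With a 10-second window, steps that process thousands of rows per second produce a handful of backend calls per window instead of one per row.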
```diff
 fn submit(&mut self, message: Message<RoutedValue>) -> Result<(), SubmitError<RoutedValue>> {
-    if self.route != message.payload().route {
+    if self.route != message.payload().route || message.payload().payload.is_watermark_msg() {
```
Moved here in order not to count watermarks in metrics
Should we add a tag or use a different metric for watermarks? I would think we'd still like metrics on watermark messages, but tracked separately from main messages.
I think this is a good point. More importantly, we should emit metrics both when watermarks are emitted and when they are committed.
Let me think it over.
I will send a PR that covers this for the overall pipeline:
https://linear.app/getsentry/issue/STREAM-921/add-metrics-to-track-watermarks-per-step
evanh left a comment:
Generally approved, added one comment.
Summary
We are not collecting metrics from Rust steps. This adds the equivalent abstractions we have in Python.
The next step will be to replace the Python implementations with the Rust ones.
Adds a Rust `pipeline_stats` module aligned with Python `PipelineStats` (`stats.py`): in-memory aggregation of per-step exec counts, error counts, and max duration per flush window (10s), then emission through the `metrics` crate only (`streams.pipeline.input.messages`, `errors`, `duration` with a `step` label). No Arroyo types on the recording path.

Changes

- Adds `step_name` on `Batch` and `Filter`; the Python adapter passes pipeline step names.
- `BatchStep` records `step_exec` per ingested streaming row and `step_timing` on batch flush (`b.flush()`).
- `Filter` records `step_exec`/`step_timing`/`step_error` around the Python predicate; the `PredicateFilter` closure no longer calls Python `input_metrics`/`output_metrics`, to avoid double counting.

Made with Cursor
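The Filter recording described in the summary can be illustrated with a hedged, self-contained sketch. This is not the rust_streams implementation: `filter_with_stats` and the `record` hook are hypothetical stand-ins for the step plus its stats recorder, and the real predicate crosses into Python. It only shows the shape of recording `step_exec`, `step_timing`, and `step_error` around each predicate call.

```rust
use std::time::Instant;

// Hypothetical sketch: run `predicate` over each row, keeping rows where it
// returns Ok(true), and record stats around every call via `record`.
// `record(metric_name, value)` stands in for the buffered stats recorder.
fn filter_with_stats<T>(
    rows: Vec<T>,
    predicate: impl Fn(&T) -> Result<bool, String>,
    mut record: impl FnMut(&str, f64),
) -> Vec<T> {
    let mut kept = Vec::new();
    for row in rows {
        let start = Instant::now();
        record("step_exec", 1.0); // one execution counted per row
        match predicate(&row) {
            Ok(true) => kept.push(row),
            Ok(false) => {}                       // filtered out, still counted
            Err(_) => record("step_error", 1.0),  // predicate raised
        }
        // timing covers the predicate call itself
        record("step_timing", start.elapsed().as_secs_f64());
    }
    kept
}
```

Because the recorder only buffers counters (as in the stats module above), wrapping every row this way stays cheap even for trivial predicates.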