
Introduce lightweight stats#305

Open
fpacifici wants to merge 6 commits into main from fpacifici/add_stats

Conversation

@fpacifici
Collaborator

@fpacifici fpacifici commented Apr 26, 2026

Recording metrics at every step still has a major impact on throughput.

In this profile we ran the protobuf parsing of a message + metrics + message instantiation 1M times:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)                                                                                    
  1000000    1.957    0.000   12.888    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py:55(fake_transform)                                                                                                                                                     
  1000000    0.867    0.000    5.923    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py:34(output_metrics)                                                                                                                                                     
  1199970    0.442    0.000    2.962    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/metrics/metrics.py:380(increment)          
  1000000    0.510    0.000    2.678    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/pipeline/msg_codecs.py:45(msg_parser)      
  2199970    0.779    0.000    2.567    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/metrics/metrics.py:302(__add_to_buffer)    
  1000000    0.363    0.000    2.430    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/metrics/metrics.py:397(timing)             
  1199970    0.450    0.000    2.192    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/metrics/metrics.py:326(increment)          
  1000000    0.374    0.000    1.800    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/metrics/metrics.py:339(timing)             
  2199970    1.190    0.000    1.787    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/metrics/metrics.py:319(__hash_tags)        
  1000000    0.325    0.000    1.294    0.000 /Users/filippopacifici/code/streams/sentry_streams/.venv/lib/python3.11/site-packages/sentry_kafka_schemas/codecs/protobuf.py:34(decode)                                                                                                                                
  1000000    0.970    0.000    0.970    0.000 {method 'ParseFromString' of 'google._upb._message.Message' objects}                                         
  1000000    0.279    0.000    0.853    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py:20(input_metrics)                                                                                                                                                      
  1000000    0.503    0.000    0.805    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/pipeline/msg_codecs.py:32(_get_codec_from_msg)                                   
  2000000    0.493    0.000    0.744    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/metrics/metrics.py:488(get_size)
  2199970    0.454    0.000    0.601    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/metrics/metrics.py:343(__throttled_flush)
  2199970    0.450    0.000    0.595    0.000 /opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/enum.py:193(__get__)                            
  2199970    0.370    0.000    0.370    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/metrics/metrics.py:320(<listcomp>)
  3000555    0.271    0.000    0.271    0.000 {built-in method builtins.isinstance}
  1000000    0.177    0.000    0.249    0.000 {built-in method builtins.hasattr}
  3299957    0.222    0.000    0.222    0.000 {built-in method time.time}
  3000000    0.209    0.000    0.209    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/pipeline/message.py:201(payload)
  1000000    0.173    0.000    0.173    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/pipeline/message.py:105(__init__)
  2000000    0.166    0.000    0.166    0.000 /Users/filippopacifici/code/streams/sentry_streams/sentry_streams/pipeline/message.py:213(schema)
  1000000    0.161    0.000    0.163    0.000 /Users/filippopacifici/code/streams/sentry_streams/.venv/lib/python3.11/site-packages/sentry_kafka_schemas/sentry_kafka_schemas.py:147(get_codec)
  2199970    0.145    0.000    0.145    0.000 /opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/enum.py:12

Roughly 3/4 of the time is spent recording metrics in the buffered backend. This does not include sending the metrics, just the buffering logic.

This is not unexpected:

  • we record metrics at every step, and a step's own logic can be minimal, sometimes cheaper than recording the metrics themselves.
  • we record five metrics per step.

This PR makes the process lighter by introducing the PipelineStats abstraction, a very lightweight way to collect stats per step: it is passed a step name and, for each call, simply increments a counter in a dictionary.

It also removes some metrics entirely.
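The hot path described above can be sketched roughly as follows. This is an illustrative sketch, not the PR's actual code: the method names (`incr`), the `FLUSH_INTERVAL_SEC` constant, and the `"processed"` metric name are assumptions, though the PR's snippets suggest a similar throttled-flush design with a 10-second window.

```python
import time
from collections import defaultdict


FLUSH_INTERVAL_SEC = 10  # the PR uses a similar FLUSH_TIME constant


class PipelineStats:
    """Lightweight per-step counters, flushed to the real Metrics
    backend at most once per FLUSH_INTERVAL_SEC seconds.
    Illustrative sketch only; the PR's class differs in detail."""

    def __init__(self, metrics) -> None:
        self._metrics = metrics
        self._counters: dict[str, int] = defaultdict(int)
        self._last_flush = time.time()

    def incr(self, step: str, value: int = 1) -> None:
        # Hot path: one dict increment, no tag hashing, no per-call
        # metric object allocation.
        self._counters[step] += value
        self._maybe_flush()

    def _maybe_flush(self) -> None:
        now = time.time()
        if now - self._last_flush < FLUSH_INTERVAL_SEC:
            return
        # Cold path: forward the aggregated counts to the real backend.
        for step, count in self._counters.items():
            self._metrics.increment("processed", count, {"step": step})
        self._counters.clear()
        self._last_flush = now
```

The point of the design is that the per-message cost collapses to a dictionary increment and a single `time.time()` comparison, instead of the tag hashing and buffering visible in the profile above.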

@fpacifici fpacifici requested a review from a team as a code owner April 26, 2026 21:38
Comment thread sentry_streams/sentry_streams/metrics/stats.py Outdated
Comment thread sentry_streams/sentry_streams/metrics/metrics.py
Comment thread sentry_streams/sentry_streams/metrics/stats.py
Comment thread sentry_streams/sentry_streams/metrics/stats.py

@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 3 potential issues.


Reviewed by Cursor Bugbot for commit cf3d1e4.

global _stats
if _stats is None:
_stats = PipielineStats(get_raw_metrics())
return _stats


_stats retains stale raw metrics after reconfigure

Medium Severity

The _stats singleton captures the Metrics instance from get_raw_metrics() on its first call. When configure_metrics(force=True) later updates the global metrics backend, _stats continues to use the stale Metrics object. This routes subsequent pipeline stats to the wrong backend and causes test isolation issues.
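One way to avoid the stale capture, sketched below under assumptions (the class and method names are illustrative, and a `get_raw_metrics`-style accessor is assumed to exist as in the PR), is to store a resolver callable instead of the backend itself, so the current backend is looked up at flush time:

```python
# Hypothetical fix sketch: hold a resolver, not the Metrics instance,
# so a later configure_metrics(force=True) is honored on the next flush.
class LazyPipelineStats:
    def __init__(self, metrics_resolver) -> None:
        # metrics_resolver is a callable, e.g. get_raw_metrics.
        self._resolve = metrics_resolver
        self._counters: dict[str, int] = {}

    def incr(self, step: str, value: int = 1) -> None:
        self._counters[step] = self._counters.get(step, 0) + value

    def flush(self) -> None:
        metrics = self._resolve()  # re-resolved on every flush
        for step, count in self._counters.items():
            metrics.increment("processed", count, {"step": step})
        self._counters.clear()
```

The extra indirection costs one function call per flush, not per message, so it does not undo the hot-path savings.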


FLUSH_TIME = 10


class PipielineStats:


Class name PipielineStats is misspelled

Low Severity

The newly introduced class is named PipielineStats (extra i) instead of PipelineStats, and the misspelling has propagated into the global type annotation, the get_stats() return type, the test imports, and the name-mangled attribute string _PipielineStats__last_flush_time. Renaming after wider adoption is harder than fixing it now while the symbol is still internal.



self._metrics.increment(Metric.ERRORS, value, tags)
for step, fvalue in self._timing_buffer.items():
tags = {"step": step}
self._metrics.timing(Metric.DURATION, fvalue, tags)


step_timing reports max-of-window instead of distribution

Medium Severity

step_timing keeps only the largest value seen per step in a 10-second window and then emits it via Metrics.timing(Metric.DURATION, …). Downstream backends (Datadog dogstatsd timing, log backend) treat each timing sample as a histogram observation, so dashboards and percentiles built on duration will now see a single max-per-window sample instead of the previous per-call distribution. Effective p50/p95/p99 will collapse toward the max and the call rate carried by the timing metric disappears.
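If the per-call distribution matters, one alternative, sketched here as an assumption rather than the PR's code, is to buffer every timing sample and emit each one at flush time, trading a little memory for fidelity (names like `TimingBuffer` and the `"duration"` metric are illustrative):

```python
from collections import defaultdict


class TimingBuffer:
    """Buffers every timing sample per step and emits them all on
    flush, so downstream histograms and percentiles still see the
    full distribution. Illustrative sketch only."""

    def __init__(self, metrics) -> None:
        self._metrics = metrics
        self._samples: dict[str, list[float]] = defaultdict(list)

    def record(self, step: str, duration: float) -> None:
        # Hot path: one list append, no backend call.
        self._samples[step].append(duration)

    def flush(self) -> None:
        # Emit each sample individually so p50/p95/p99 remain
        # computable from the raw observations.
        for step, values in self._samples.items():
            for v in values:
                self._metrics.timing("duration", v, {"step": step})
        self._samples.clear()
```

This still batches the expensive backend calls into the flush window while avoiding the max-of-window collapse described above; the cost is holding one float per call until the next flush.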



