feat(capture): emit counter metrics as deltas instead of cumulative values#1696
Open
scottopell wants to merge 2 commits intomainfrom
Open
feat(capture): emit counter metrics as deltas instead of cumulative values#1696scottopell wants to merge 2 commits intomainfrom
scottopell wants to merge 2 commits intomainfrom
Conversation
…alues Convert registry counter capture from cumulative to delta semantics: - Add `counter_prev` field to StateMachine to track previous counter values - First observation of each counter is skipped (used as baseline) - Subsequent observations emit delta = current - prev - Counter resets handled via saturating_sub (emit 0 if current < prev) Add first_tick tracking to Accumulator to prevent outputting data for ticks before a key was first observed: - Add `counter_first_tick` and `gauge_first_tick` HashMaps - Skip output in flush() and drain() for ticks before first_tick Add parquet schema versioning: - `lading.schema_version = "2"` - `lading.counter_semantics = "delta"` This change enables downstream consumers to use counter values directly without post-processing to calculate deltas. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
GeorgeHahn
reviewed
Jan 13, 2026
Comment on lines
188
to
194
| .set_key_value_metadata(Some(vec![ | ||
| KeyValue::new("lading.schema_version".to_string(), "2".to_string()), | ||
| KeyValue::new( | ||
| "lading.counter_semantics".to_string(), | ||
| "delta".to_string(), | ||
| ), | ||
| ])) |
blt
reviewed
Jan 16, 2026
| /// Previous counter values for delta calculation. | ||
| /// First observation of each counter is skipped to avoid emitting | ||
| /// the entire cumulative value as a "delta". | ||
| counter_prev: FxHashMap<Key, u64>, |
Collaborator
There was a problem hiding this comment.
Hmm the only thing I think I have as a concern with this PR is how this new map interacts with expiration. I don't think it does, so this is a steady accumulation site?
|
|
||
| // Verify we actually produced output | ||
| assert!(last_counter_value.is_some(), "No counter values in output"); | ||
| // Note: First counter observation is skipped, so we have 599 counter values |
Collaborator
There was a problem hiding this comment.
I was confused by this, would be worth stating that it's skipped because of the delta calculation. I guess that is done later L1584 - L1586.
Comment on lines
+310
to
+313
| /// Tick when each counter key was first written (for delta counter support) | ||
| counter_first_tick: FxHashMap<Key, u64>, | ||
| /// Tick when each gauge key was first written | ||
| gauge_first_tick: FxHashMap<Key, u64>, |
Collaborator
There was a problem hiding this comment.
Rather than have two whole separate maps I would almost rather than we alter
counters: FxHashMap<Key, { first_tick: u64, buf: [u64; BUFFER_SIZE]}>,
That way the information is co-local in memory and we avoid two additional maps, lookups.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
lading.schema_version = "2",lading.counter_semantics = "delta")Changes
StateMachine
counter_prevfield to track previous counter valuesdelta = current - prevsaturating_sub(emit 0 if current < prev)Accumulator
counter_first_tickandgauge_first_tickHashMapsflush()anddrain()for ticks before a key'sfirst_tickParquet Format
lading.schema_version = "2"lading.counter_semantics = "delta"Test plan
🤖 Generated with Claude Code