Rax memory tracking: external pointer + callback#8
Draft
Conversation
…s of data bytes Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
8ea4b95 to
d4f4882
Compare
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
d4f4882 to
21638c1
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Incremental O(1) memory tracking for streams and vset. Extends the external pointer approach with a `dataGetSize` callback — rax automatically tracks data payload sizes on insert, remove, and recursive free. For streams, this eliminates ~22 manual data tracking sites and all 4 destruction callbacks. Only 4 manual sites remain (listpack mutations in streamAppendItem, streamTrim, streamIteratorRemoveEntry, and streamDup).
Design
Two counters on the stream struct
```
tracked_data_bytes = Σ lpBytes(listpacks) [manual, 3 sites]
+ Σ sizeof(streamCG/NACK/Consumer) [automatic via callback]
+ Σ sdsReqSize(consumer names) [automatic via callback]
tracked_overhead = rax node overhead only [automatic via external pointer]
```
`tracked_overhead` is written only by rax.c (via `external_logical_size`). `tracked_data_bytes` is written by rax.c (via `dataGetSize` callback) for insert/remove/free, and manually for in-place listpack mutations.
How the external pointer works
Same as the external pointer approach: each rax instance has `size_t *external_logical_size`, all sub-rax trees point to `&s->tracked_overhead`, rax node overhead is fully automatic.
How the dataGetSize callback works
Each rax instance can optionally have a `dataGetSize` callback + `external_tracked_data` pointer. When set via `raxSetDataTracking(rax, callback, &counter)`:
The callback does NOT fire on overwrite (existing key, new data pointer) — the old data pointer may be stale (e.g., listpack was reallocated in-place by the caller). Overwrites are handled manually via `raxAdjustTrackedDataBytes()`.
Per-tree callback configuration for streams
```
s->rax: no callback (raxInsert fires before listpack is fully built)
s->cgroups: dataGetSize returns sizeof(streamCG)
cg->pel: dataGetSize returns sizeof(streamNACK)
cg->consumers: dataGetSize returns sizeof(streamConsumer) + sdsReqSize(name)
consumer->pel: no callback (shared NACK refs, not counted)
```
All `external_tracked_data` → `&s->tracked_data_bytes`. All `external_logical_size` → `&s->tracked_overhead`.
Why s->rax has no callback
`streamAppendItem` calls `raxInsert` to create a new node BEFORE the listpack is fully built — primary entry fields are written, the node is inserted, then actual entry data is appended afterward. The callback would fire on the first insert with a partial `lpBytes` value, and the subsequent overwrite with the correct size wouldn't trigger it. All 3 listpack mutation sites are tracked manually.
Why vset doesn't use the callback
Vset's `removeFromBucket_VECTOR` frees the pVector bucket before calling `raxRemove`. If the callback tried to read the freed bucket pointer, it would be use-after-free. Since `raxSetData` sites dominate vset's mutation patterns (~8 sites, none of which fire the callback), the callback would save few sites while adding complexity. Vset keeps manual tracking.
Destruction
With the callback, destruction is fully automatic for streams. During `raxRecursiveFree`, rax calls `dataGetSize(data)` for each node before freeing — subtracting the data size from `*external_tracked_data`. The rax overhead is subtracted via `raxExternalOverheadDelta`. The standard `raxFreeWithCallback` with plain free functions (`zfree`, `lpFree`, `streamFreeCG`) suffices. No `raxFreeWithCallbackAndContext` needed. The 4 custom destruction callbacks from the external pointer approach are eliminated.
rax changes (rax.h / rax.c)
Total rax struct cost: +24 bytes per instance (3 pointers). All NULL by default.
Stream changes (stream.h / t_stream.c)
Initialization
`streamNew()` connects `s->rax` overhead tracking. `streamCreateCG()` connects `s->cgroups`, `cg->pel`, `cg->consumers` with both external pointer and callback. Consumer creation in command handlers connects `consumer->pel` overhead only.
What's automatic
What's manual (4 sites)
Verification
`streamVerifyTracking()` walks the entire stream hierarchy and compares both counters. Exposed via `DEBUG STREAM-VERIFY-TRACKING `.
Vset changes (vset.c / vset.h)
Wrapper struct
`vsetRaxState` wrapper holds `rax *r`, `tracked_data_bytes`, `tracked_rax_overhead`. Tagged pointer points to wrapper. All `vsetBucketRax()` calls go through wrapper.
Rax overhead tracking
Automatic via `external_logical_size` → `&state->tracked_rax_overhead`.
Inner bucket data tracking (~8 manual sites)
`vsetInnerBucketDataSize()` computes logical size per bucket type. Manual before/after deltas at all mutation sites (`insertToBucket_RAX`, `splitBucketIfPossible`, `removeEntryFromRaxBucket`, `vsetBucketRemoveExpired_RAX`). The callback is not used (see "Why vset doesn't use the callback" above).
O(1) memory reporting
`vsetBucketMemUsage_RAX` = `sizeof(vsetRaxState) + state->tracked_rax_overhead + state->tracked_data_bytes`
`vsetVerifyTracking()` walks inner buckets and compares against `tracked_data_bytes`. Used only by unit tests.
Tests
Stream unit tests (11 gtest tests)
AppendEntries, Trim, TrimByID, TrimEncodingBoundary, IteratorRemoveAll, ConsumerGroupCreate, FullLifecycle, DestroyConsumerGroup, DelConsumerWithNACKs, StreamDup, Fuzzer (2000 random ops).
Tests that involve CGs/consumers/NACKs don't need manual sizeof/sdsReqSize tracking — the callbacks handle it automatically via `raxInsert`. Tests just call `raxInsert(cg->pel, ...)` and the NACK size is auto-tracked. Consumer creation via `streamCreateConsumer` auto-tracks the consumer struct + name via the callback on `cg->consumers`. Only the `raxSetExternalLogicalSize` call for `consumer->pel` overhead tracking needs explicit setup.
Tcl integration tests (16 tests)
Exercise real command handlers against a live server. Verify via `DEBUG STREAM-VERIFY-TRACKING`. These cover the actual production tracking paths.
Vset tracking tests (9 gtest tests)
TrackingAddToRax, TrackingRemoveFromRax, TrackingExpireFromRax, TrackingUpdateInRax, TrackingSameBucketPromotion, TrackingVectorToHashtable, TrackingDefrag, TrackingShrinkFromRax, TrackingFuzzer (10000 random ops).