Rax memory tracking: manual deltas #6
Draft
liorsve wants to merge 8 commits into
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
0da477b to 83c776f
Summary
Incremental O(1) memory tracking for streams and vset. Replaces the O(n) walk in objectComputeSize() with running counters updated at every mutation site. This approach uses manual before/after deltas, with no rax struct changes beyond a read-only logical size field.
Design
Two counters on the stream struct
tracked_data_bytes contains everything the user created: listpack payloads, consumer group structs, NACK structs, consumer structs, and consumer names.
tracked_overhead contains only rax internal node overhead (headers, key bytes, child pointers, padding).
Stream total memory = tracked_overhead + tracked_data_bytes.
Why logical sizes
rax already has alloc_size tracking allocation sizes via zmalloc_usable_size. We add a parallel logical_size field using raxNodeCurrentLength, which is struct arithmetic without allocator calls. Logical sizes are deterministic, platform-independent, and consistent with how we track other data structures (hashtable, quicklist).
Why this must be inside rax.c
rax node overhead cannot be computed by callers: raxNodeCurrentLength depends on internal node fields (iskey, isnull, iscompr, size) not exposed outside rax.c. The iterator only visits key nodes, not internal routing nodes. Only rax.c sees all node mutations (splits, compressions, resizes).
rax changes (rax.h / rax.c)
logical_size field added to the rax struct: updated at every site where alloc_size is updated (~15 sites). Uses raxNodeCurrentLength instead of rax_ptr_alloc_size.
raxLogicalSize(): returns the logical_size field. Callers use it for before/after delta computation.
raxFreeWithCallbackAndContext(rax, cb, ctx): same as raxFreeWithCallback, but the callback receives a context pointer (see "Destruction callbacks" below for why this is needed).
Stream changes (stream.h / t_stream.c)
Tracking at mutation sites
Every rax mutation in stream command handlers is wrapped with a before/after read of raxLogicalSize(); the difference is applied to tracked_overhead.
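The wrapping pattern could look roughly like the sketch below. It is an illustration under stated assumptions, not the PR's code: toyRax, toyStream, toyTrackRaxDelta and the other toy* helpers are hypothetical stand-ins, and the node and payload sizes are passed in by hand. Only the counter names tracked_overhead and tracked_data_bytes and the role of raxLogicalSize() are taken from the description.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for rax: only the logical size matters here. */
typedef struct toyRax {
    size_t logical_size; /* node overhead in bytes, maintained by the tree */
} toyRax;

/* Plays the role of raxLogicalSize(): a read-only accessor. */
size_t toyRaxLogicalSize(const toyRax *r) { return r->logical_size; }

/* Hypothetical mutations. A real insert or remove may split, merge or
 * resize nodes, so the caller cannot predict the size change. */
void toyRaxInsert(toyRax *r, size_t node_bytes) { r->logical_size += node_bytes; }
void toyRaxRemove(toyRax *r, size_t node_bytes) {
    assert(r->logical_size >= node_bytes);
    r->logical_size -= node_bytes;
}

typedef struct toyStream {
    toyRax rax;
    size_t tracked_overhead;   /* rax node overhead only */
    size_t tracked_data_bytes; /* user-created data */
} toyStream;

/* Apply the change in rax logical size since 'before' to tracked_overhead. */
void toyTrackRaxDelta(toyStream *s, size_t before) {
    size_t after = toyRaxLogicalSize(&s->rax);
    if (after >= before) {
        s->tracked_overhead += after - before;
    } else {
        assert(s->tracked_overhead >= before - after);
        s->tracked_overhead -= before - after;
    }
}

/* Mutation site: snapshot, mutate, apply delta, then add data bytes manually. */
void toyStreamAddNode(toyStream *s, size_t node_bytes, size_t payload_bytes) {
    size_t before = toyRaxLogicalSize(&s->rax);
    toyRaxInsert(&s->rax, node_bytes);
    toyTrackRaxDelta(s, before);
    s->tracked_data_bytes += payload_bytes;
}

/* Removal site: same wrapping, with the data bytes subtracted. */
void toyStreamDelNode(toyStream *s, size_t node_bytes, size_t payload_bytes) {
    size_t before = toyRaxLogicalSize(&s->rax);
    toyRaxRemove(&s->rax, node_bytes);
    toyTrackRaxDelta(s, before);
    assert(s->tracked_data_bytes >= payload_bytes);
    s->tracked_data_bytes -= payload_bytes;
}

/* O(1) size query: no walk over the tree. */
size_t toyStreamMemUsage(const toyStream *s) {
    return s->tracked_overhead + s->tracked_data_bytes;
}
```

The point of the snapshot is that the caller never needs to know how the tree reshaped itself; it only observes the net change in logical size.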
Manual sizeof/lpBytes/sdsReqSize additions for data tracking. ~45 manual sites across:
streamNew: initialize counters
streamAppendItem: listpack data delta (new node + append to existing)
streamTrim / streamIteratorRemoveEntry: listpack removal (whole node + in-place)
streamCreateCG: CG struct + sub-rax trees
streamDup: copy all tracked data
Destruction callbacks
When a stream is freed, raxFreeWithCallback recursively walks all nodes and calls a callback to free data (e.g., zfree for NACKs, lpFree for listpacks). But these standard callbacks don't know about tracking counters; they just release memory. The tracking counters still hold accumulated sizes that need to be subtracted.
The standard raxFreeWithCallback signature is void (*cb)(void *data): it receives the data pointer but not the stream pointer, so it can't update the counters. raxFreeWithCallbackAndContext extends this to void (*cb)(void *data, void *ctx), allowing the stream to pass itself as context. Custom callbacks piggyback on the existing recursive walk.
Four such callbacks (NACK, consumer, CG, listpack) mirror the insert-side tracking. No extra iteration is needed: the subtraction happens during the same walk that frees the data.
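A context-aware free callback of this kind might be sketched as follows. This is a simplified illustration, not the PR's implementation: a singly linked list stands in for the rax tree, and toyNode, toyNack, toyStream, toyAddNack, toyFreeNackTracked and toyFreeWithCallbackAndContext are hypothetical names. Only the callback shape void (*cb)(void *data, void *ctx) and the idea of passing the stream as context come from the description.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical container node; a linked list stands in for the rax tree. */
typedef struct toyNode {
    void *data;
    struct toyNode *next;
} toyNode;

/* Hypothetical stream holding only the data counter. */
typedef struct toyStream {
    size_t tracked_data_bytes;
} toyStream;

/* Hypothetical NACK payload. */
typedef struct toyNack {
    long long delivery_time;
    unsigned long delivery_count;
} toyNack;

/* Frees every node, handing each data pointer plus a caller-supplied
 * context to the callback. One walk both frees and untracks. */
void toyFreeWithCallbackAndContext(toyNode *head,
                                   void (*cb)(void *data, void *ctx),
                                   void *ctx) {
    while (head) {
        toyNode *next = head->next;
        if (cb) cb(head->data, ctx);
        free(head);
        head = next;
    }
}

/* Insert side: allocate a NACK and add its size to the counter. */
toyNode *toyAddNack(toyStream *s, toyNode *head) {
    toyNode *n = malloc(sizeof(*n));
    toyNack *nack = calloc(1, sizeof(*nack));
    if (!n || !nack) abort();
    n->data = nack;
    n->next = head;
    s->tracked_data_bytes += sizeof(toyNack);
    return n;
}

/* Free side: mirrors the insert by subtracting the same size, then frees. */
void toyFreeNackTracked(void *data, void *ctx) {
    toyStream *s = ctx; /* the stream passes itself as context */
    assert(s->tracked_data_bytes >= sizeof(toyNack));
    s->tracked_data_bytes -= sizeof(toyNack);
    free(data);
}
```

Because the subtraction uses the same sizeof expression as the insert side, the counter returns to zero once every NACK has been freed.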
Verification
streamVerifyTracking() walks the entire stream hierarchy and compares against tracked counters. Exposed via DEBUG STREAM-VERIFY-TRACKING <key> for Tcl integration tests.
Vset changes (vset.c / vset.h)
vsetRaxState wrapper struct: holds rax *r + tracking counters. Required because vset is typedef void * with no struct for counters.
vsetInnerBucketDataSize(): logical size per bucket type (VECTOR: sizeof(pVector) + len * sizeof(void *), HT: hashtableMemUsage, SINGLE/NONE: 0).
vsetBucketMemUsage_RAX.
vsetVerifyTracking() for test verification.
splitBucketIfPossible RELOCATE path incorrectly added bucket size (should be zero delta: same bucket, different rax key).
Tests
Stream unit tests (11 gtest tests)
AppendEntries, Trim, TrimByID, TrimEncodingBoundary, IteratorRemoveAll, ConsumerGroupCreate, FullLifecycle, DestroyConsumerGroup, DelConsumerWithNACKs, StreamDup, Fuzzer (2000 random ops).
Limitation: Several unit tests (FullLifecycle, DestroyConsumerGroup, DelConsumerWithNACKs, StreamDup, Fuzzer) manually mirror the production tracking code: they call internal APIs like streamCreateConsumer and raxInsert directly, then manually add sizeof(streamNACK) etc. to the counters, replicating what the command handlers do. This is necessary because functions like streamCreateConsumer, streamDelConsumer, and streamCreateNACK don't take stream *s. The tracking lives in the command handlers that call them, and those command handlers require a full server with client connections, parsed arguments, and reply buffers that can't be set up in a unit test. The unit tests verify that the tracking formula and ground truth walk are correct, but don't exercise the actual command handler code paths.
Tcl integration tests (16 tests)
The Tcl tests cover that gap: they run against a live server and exercise real command handlers (XADD, XREADGROUP, XACK, XCLAIM, XAUTOCLAIM, XGROUP CREATE/DESTROY/CREATECONSUMER/DELCONSUMER, XTRIM, XDEL), then call DEBUG STREAM-VERIFY-TRACKING to verify the production tracking is correct. A tracking bug in any of these command handlers would be caught here.
Vset tracking tests (9 gtest tests)
TrackingAddToRax, TrackingRemoveFromRax, TrackingExpireFromRax, TrackingUpdateInRax, TrackingSameBucketPromotion, TrackingVectorToHashtable, TrackingDefrag, TrackingShrinkFromRax, TrackingFuzzer (10000 random ops).