Skip to content

Rax memory tracking: manual deltas#6

Draft
liorsve wants to merge 8 commits into
unstablefrom
rax-data-tracking-caller
Draft

Rax memory tracking: manual deltas#6
liorsve wants to merge 8 commits into
unstablefrom
rax-data-tracking-caller

Conversation

@liorsve
Copy link
Copy Markdown
Owner

@liorsve liorsve commented Apr 3, 2026

Summary

Incremental O(1) memory tracking for streams and vset. Replaces the O(n) walk in objectComputeSize() with running counters updated at every mutation site. This approach uses manual before/after deltas — no rax struct changes beyond a read-only logical size field.

Design

Two counters on the stream struct

tracked_data_bytes = Σ lpBytes(listpacks)
                   + Σ sdsReqSize(consumer names)
                   + Σ sizeof(streamCG/NACK/Consumer)

tracked_overhead   = Σ raxLogicalSize deltas across all sub-rax trees

tracked_data_bytes contains everything the user created — listpack payloads, consumer group structs, NACK structs, consumer structs, and consumer names. tracked_overhead contains only rax internal node overhead (headers, key bytes, child pointers, padding). Stream total memory = tracked_overhead + tracked_data_bytes.

Why logical sizes

rax already has alloc_size tracking allocation sizes via zmalloc_usable_size. We add a parallel logical_size field using raxNodeCurrentLength — struct arithmetic without allocator calls. Logical sizes are deterministic, platform-independent, and consistent with how we track other data structures (hashtable, quicklist).

Why this must be inside rax.c

rax node overhead cannot be computed by callers — raxNodeCurrentLength depends on internal node fields (iskey, isnull, iscompr, size) not exposed outside rax.c. The iterator only visits key nodes, not internal routing nodes. Only rax.c sees all node mutations (splits, compressions, resizes).

rax changes (rax.h / rax.c)

  • Added logical_size field to rax struct — updated at every site where alloc_size is updated (~15 sites). Uses raxNodeCurrentLength instead of rax_ptr_alloc_size.
  • raxLogicalSize() — returns the logical_size field. Callers use it for before/after delta computation.
  • raxFreeWithCallbackAndContext(rax, cb, ctx) — same as raxFreeWithCallback but the callback receives a context pointer (see "Destruction callbacks" below for why this is needed).

Stream changes (stream.h / t_stream.c)

Tracking at mutation sites

Every rax mutation in stream command handlers is wrapped with:

size_t before = raxLogicalSize(rax);
raxInsert(rax, ...);
s->tracked_overhead += raxLogicalSize(rax) - before;

Manual sizeof/lpBytes/sdsReqSize additions for data tracking. ~45 manual sites across:

  • streamNew — initialize counters
  • streamAppendItem — listpack data delta (new node + append to existing)
  • streamTrim / streamIteratorRemoveEntry — listpack removal (whole node + in-place)
  • streamCreateCG — CG struct + sub-rax trees
  • Consumer creation (4 sites: XREADGROUP, CREATECONSUMER, XCLAIM, XAUTOCLAIM)
  • NACK delivery, XACK, XCLAIM/XAUTOCLAIM stale removal + reassignment
  • XGROUP DESTROY, XGROUP DELCONSUMER
  • streamDup — copy all tracked data

Destruction callbacks

When a stream is freed, raxFreeWithCallback recursively walks all nodes and calls a callback to free data (e.g., zfree for NACKs, lpFree for listpacks). But these standard callbacks don't know about tracking counters — they just release memory. The tracking counters still hold accumulated sizes that need to be subtracted.

The standard raxFreeWithCallback signature is void (*cb)(void *data) — it receives the data pointer but not the stream pointer, so it can't update the counters. raxFreeWithCallbackAndContext extends this to void (*cb)(void *data, void *ctx), allowing the stream to pass itself as context. Custom callbacks piggyback on the existing recursive walk:

void streamFreeNACKWithTracking(void *data, void *ctx) {
    stream *s = ctx;
    s->tracked_data_bytes -= sizeof(streamNACK);
    zfree(data);
}

4 such callbacks (NACK, consumer, CG, listpack) mirror the insert-side tracking. No extra iteration — the subtraction happens during the same walk that frees the data.

Verification

streamVerifyTracking() walks the entire stream hierarchy and compares against tracked counters. Exposed via DEBUG STREAM-VERIFY-TRACKING <key> for Tcl integration tests.

Vset changes (vset.c / vset.h)

  • vsetRaxState wrapper struct — holds rax *r + tracking counters. Required because vset is typedef void * with no struct for counters.
  • vsetInnerBucketDataSize() — logical size per bucket type (VECTOR: sizeof(pVector) + lensizeof(void), HT: hashtableMemUsage, SINGLE/NONE: 0).
  • Same before/after delta pattern as streams. ~14 manual sites.
  • O(1) vsetBucketMemUsage_RAX.
  • vsetVerifyTracking() for test verification.
  • Bugfix: splitBucketIfPossible RELOCATE path incorrectly added bucket size (should be zero delta — same bucket, different rax key).

Tests

Stream unit tests (11 gtest tests)

AppendEntries, Trim, TrimByID, TrimEncodingBoundary, IteratorRemoveAll, ConsumerGroupCreate, FullLifecycle, DestroyConsumerGroup, DelConsumerWithNACKs, StreamDup, Fuzzer (2000 random ops).

Limitation: Several unit tests (FullLifecycle, DestroyConsumerGroup, DelConsumerWithNACKs, StreamDup, Fuzzer) manually mirror the production tracking code — they call internal APIs like streamCreateConsumer and raxInsert directly, then manually add sizeof(streamNACK) etc. to the counters, replicating what the command handlers do. This is necessary because functions like streamCreateConsumer, streamDelConsumer, and streamCreateNACK don't take stream *s — the tracking lives in the command handlers that call them, and those command handlers require a full server with client connections, parsed arguments, and reply buffers that can't be set up in a unit test. The unit tests verify the tracking formula and ground truth walk are correct, but don't exercise the actual command handler code paths.

Tcl integration tests (16 tests)

The Tcl tests cover that gap — they run against a live server and exercise real command handlers (XADD, XREADGROUP, XACK, XCLAIM, XAUTOCLAIM, XGROUP CREATE/DESTROY/CREATECONSUMER/DELCONSUMER, XTRIM, XDEL), then call DEBUG STREAM-VERIFY-TRACKING to verify the production tracking is correct. Any bug in a command handler's tracking would be caught here.

Vset tracking tests (9 gtest tests)

TrackingAddToRax, TrackingRemoveFromRax, TrackingExpireFromRax, TrackingUpdateInRax, TrackingSameBucketPromotion, TrackingVectorToHashtable, TrackingDefrag, TrackingShrinkFromRax, TrackingFuzzer (10000 random ops).

liorsve added 5 commits April 3, 2026 12:12
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
liorsve added 3 commits April 3, 2026 18:32
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant