Draft
Conversation
…s of data bytes Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
f09840b to
d445e60
Compare
Signed-off-by: Lior Sventitzky <liorsve@amazon.com> minor fixes Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
d445e60 to
987caeb
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Incremental O(1) memory tracking for streams and vset. The rax struct gets an
external_logical_sizepointer — every node mutation inside rax.c propagates the logical size delta to a caller-owned counter. For streams, all sub-rax trees point to the same stream-level counter, making rax overhead tracking fully automatic. Data tracking (struct sizes, listpack bytes, consumer names) remains manual at each command handler site.Design
Two counters on the stream struct
```
tracked_data_bytes = Σ lpBytes(listpacks)
+ Σ sdsReqSize(consumer names)
+ Σ sizeof(streamCG/NACK/Consumer)
tracked_overhead = rax node overhead only (automatic via external pointer)
```
`tracked_overhead` is never written by stream code directly — only by rax.c internals via the external pointer. Stream command handlers only touch `tracked_data_bytes`.
How the external pointer works
Each rax instance has a `size_t *external_logical_size` pointer, NULL by default. When set via `raxSetExternalLogicalSize(rax, &counter)`, the current tree's logical size is added to the counter immediately. From that point, every node mutation inside rax.c (insert, remove, split, compress, resize, free) propagates the logical size delta to `*external_logical_size` using `raxNodeCurrentLength` (logical sizes, not allocation sizes).
A stream has 5+ rax trees (`s->rax`, `s->cgroups`, per-CG `cg->pel` and `cg->consumers`, per-consumer `consumer->pel`). All point their `external_logical_size` to `&s->tracked_overhead`. Deltas from all trees accumulate into one counter — O(1) to read the total.
Non-tracking rax users (ACL, tracking, cluster) are unaffected — the pointer is NULL by default, and the NULL check before each delta write adds negligible overhead.
Why logical sizes must be tracked inside rax.c
`raxNodeCurrentLength` depends on internal node fields (`iskey`, `isnull`, `iscompr`, `size`) not exposed outside rax.c. The rax iterator only visits key nodes, not internal routing nodes. Only rax.c code sees all node mutations (splits, compressions, resizes). Callers cannot compute rax node overhead externally.
Destruction
When a stream is freed, `raxFreeWithCallback` recursively walks all nodes and calls a callback to free data (e.g., `zfree` for NACKs, `lpFree` for listpacks). The rax overhead part is automatically subtracted by `raxExternalOverheadDelta` during this walk. But the data part (sizeof, lpBytes, sdsReqSize) still needs manual subtraction.
The standard `raxFreeWithCallback` signature is `void (*cb)(void *data)` — it receives the data pointer but not the stream pointer, so it can't update the stream's data counter. `raxFreeWithCallbackAndContext` extends this to `void (*cb)(void *data, void *ctx)`, allowing custom callbacks that subtract data sizes during the existing recursive walk:
```c
void streamFreeNACKWithTracking(void *data, void *ctx) {
stream *s = ctx;
s->tracked_data_bytes -= sizeof(streamNACK);
zfree(data);
}
```
4 such callbacks (NACK, consumer, CG, listpack) handle destruction tracking. No extra iteration.
rax changes (rax.h / rax.c)
Stream changes (stream.h / t_stream.c)
Initialization
`streamNew()` creates `s->rax` and connects it via `raxSetExternalLogicalSize(s->rax, &s->tracked_overhead)`. Both counters start at zero (the external pointer adds the initial rax overhead automatically).
Automatic rax overhead tracking
Every rax operation on any connected tree (insert, remove, free) automatically updates `tracked_overhead`. No stream-side code needed for rax overhead.
Manual data tracking (~25 sites)
Data sizes are tracked manually at each command handler site:
Verification
`streamVerifyTracking()` walks the entire stream hierarchy, computes both counters independently using `raxComputeLogicalSize()` for overhead and struct/lpBytes/sdsReqSize walks for data, then compares against tracked values. Exposed via `DEBUG STREAM-VERIFY-TRACKING `.
Vset changes (vset.c / vset.h)
Wrapper struct
Vset is `typedef void *` — a tagged pointer with no struct for counters. A `vsetRaxState` wrapper holds `rax *r`, `tracked_data_bytes`, and `tracked_rax_overhead`. The tagged VSET_BUCKET_RAX pointer points to this wrapper instead of the rax directly. All existing `vsetBucketRax()` calls go through the wrapper transparently.
Rax overhead tracking
The rax's `external_logical_size` points to `&state->tracked_rax_overhead`. Automatic — zero vset code needed.
Inner bucket data tracking (~8 manual sites)
`vsetInnerBucketDataSize()` computes logical size per bucket type (VECTOR: sizeof(pVector) + lensizeof(void), HT: hashtableMemUsage, SINGLE/NONE: 0). Manual before/after deltas at bucket mutation sites:
O(1) memory reporting
`vsetBucketMemUsage_RAX` = `sizeof(vsetRaxState) + state->tracked_rax_overhead + state->tracked_data_bytes`
Defrag and free
Wrapper is defragged alongside the rax. Free paths (`freeVsetBucket`, `shrinkRaxBucketIfPossible`, empty-rax cleanup) free the wrapper after the rax.
`vsetVerifyTracking()` walks inner buckets and compares against `tracked_data_bytes`. Used only by unit tests.
Tests
Stream unit tests (11 gtest tests)
AppendEntries, Trim, TrimByID, TrimEncodingBoundary, IteratorRemoveAll, ConsumerGroupCreate, FullLifecycle, DestroyConsumerGroup, DelConsumerWithNACKs, StreamDup, Fuzzer (2000 random ops).
Tests that involve CGs/consumers/NACKs manually add sizeof/sdsReqSize to `tracked_data_bytes` because the internal APIs (`streamCreateConsumer`, `streamDelConsumer`, etc.) don't take `stream *s` — the tracking lives in command handlers that require a full server with client connections, parsed arguments, and reply buffers. The unit tests verify the tracking formula and ground truth walk are correct.
Tcl integration tests (16 tests)
Exercise real command handlers against a live server (XADD, XREADGROUP, XACK, XCLAIM, XAUTOCLAIM, XGROUP CREATE/DESTROY/CREATECONSUMER/DELCONSUMER, XTRIM, XDEL). Verify via `DEBUG STREAM-VERIFY-TRACKING`. These cover the actual production tracking paths that unit tests can't reach.
Vset tracking tests (9 gtest tests)
TrackingAddToRax, TrackingRemoveFromRax, TrackingExpireFromRax, TrackingUpdateInRax, TrackingSameBucketPromotion, TrackingVectorToHashtable, TrackingDefrag, TrackingShrinkFromRax, TrackingFuzzer (10000 random ops).