Skip to content

Rax memory tracking: external pointer#7

Draft
liorsve wants to merge 4 commits intounstablefrom
rax-external-pointer
Draft

Rax memory tracking: external pointer#7
liorsve wants to merge 4 commits intounstablefrom
rax-external-pointer

Conversation

@liorsve
Copy link
Copy Markdown
Owner

@liorsve liorsve commented Apr 3, 2026

Summary

Incremental O(1) memory tracking for streams and vset. The rax struct gets an external_logical_size pointer — every node mutation inside rax.c propagates the logical size delta to a caller-owned counter. For streams, all sub-rax trees point to the same stream-level counter, making rax overhead tracking fully automatic. Data tracking (struct sizes, listpack bytes, consumer names) remains manual at each command handler site.

Design

Two counters on the stream struct

```
tracked_data_bytes = Σ lpBytes(listpacks)
+ Σ sdsReqSize(consumer names)
+ Σ sizeof(streamCG/NACK/Consumer)

tracked_overhead = rax node overhead only (automatic via external pointer)
```

`tracked_overhead` is never written by stream code directly — only by rax.c internals via the external pointer. Stream command handlers only touch `tracked_data_bytes`.

How the external pointer works

Each rax instance has a `size_t *external_logical_size` pointer, NULL by default. When set via `raxSetExternalLogicalSize(rax, &counter)`, the current tree's logical size is added to the counter immediately. From that point, every node mutation inside rax.c (insert, remove, split, compress, resize, free) propagates the logical size delta to `*external_logical_size` using `raxNodeCurrentLength` (logical sizes, not allocation sizes).

A stream has 5+ rax trees (`s->rax`, `s->cgroups`, per-CG `cg->pel` and `cg->consumers`, per-consumer `consumer->pel`). All point their `external_logical_size` to `&s->tracked_overhead`. Deltas from all trees accumulate into one counter — O(1) to read the total.

Non-tracking rax users (ACL, tracking, cluster) are unaffected — the pointer is NULL by default, and the NULL check before each delta write adds negligible overhead.

Why logical sizes must be tracked inside rax.c

`raxNodeCurrentLength` depends on internal node fields (`iskey`, `isnull`, `iscompr`, `size`) not exposed outside rax.c. The rax iterator only visits key nodes, not internal routing nodes. Only rax.c code sees all node mutations (splits, compressions, resizes). Callers cannot compute rax node overhead externally.

Destruction

When a stream is freed, `raxFreeWithCallback` recursively walks all nodes and calls a callback to free data (e.g., `zfree` for NACKs, `lpFree` for listpacks). The rax overhead part is automatically subtracted by `raxExternalOverheadDelta` during this walk. But the data part (sizeof, lpBytes, sdsReqSize) still needs manual subtraction.

The standard `raxFreeWithCallback` signature is `void (*cb)(void *data)` — it receives the data pointer but not the stream pointer, so it can't update the stream's data counter. `raxFreeWithCallbackAndContext` extends this to `void (*cb)(void *data, void *ctx)`, allowing custom callbacks that subtract data sizes during the existing recursive walk:

```c
void streamFreeNACKWithTracking(void *data, void *ctx) {
stream *s = ctx;
s->tracked_data_bytes -= sizeof(streamNACK);
zfree(data);
}
```

4 such callbacks (NACK, consumer, CG, listpack) handle destruction tracking. No extra iteration.

rax changes (rax.h / rax.c)

  • Added `external_logical_size` pointer to rax struct (+8 bytes per instance).
  • `raxSetExternalLogicalSize(rax, ptr)` — connects a tree and adds initial size.
  • `raxExternalOverheadDelta(rax, delta)` — propagates at ~15 internal mutation sites.
  • `raxFreeWithCallbackAndContext()` — context-aware free for stream destruction.
  • `raxComputeLogicalSize()` — O(n) recursive walk, used only by test verification functions.

Stream changes (stream.h / t_stream.c)

Initialization

`streamNew()` creates `s->rax` and connects it via `raxSetExternalLogicalSize(s->rax, &s->tracked_overhead)`. Both counters start at zero (the external pointer adds the initial rax overhead automatically).

Automatic rax overhead tracking

Every rax operation on any connected tree (insert, remove, free) automatically updates `tracked_overhead`. No stream-side code needed for rax overhead.

Manual data tracking (~25 sites)

Data sizes are tracked manually at each command handler site:

  • `streamAppendItem` — listpack data delta (new node + append to existing, 3 manual sites for in-place mutations)
  • `streamTrim` / `streamIteratorRemoveEntry` — listpack removal (whole node + in-place)
  • `streamCreateCG` — `tracked_data_bytes += sizeof(streamCG)`, plus `raxSetExternalLogicalSize` on `cg->pel` and `cg->consumers` to connect new sub-trees
  • Consumer creation (4 sites: XREADGROUP, CREATECONSUMER, XCLAIM, XAUTOCLAIM) — `sizeof(streamConsumer) + sdsReqSize(name)`, plus `raxSetExternalLogicalSize` on `consumer->pel`
  • NACK delivery (`streamReplyWithRange`) — `sizeof(streamNACK)`
  • XACK, XCLAIM/XAUTOCLAIM stale NACK removal — subtract `sizeof(streamNACK)`
  • XGROUP DESTROY — subtract `sizeof(streamCG)`, then `raxFreeWithCallbackAndContext` on sub-trees
  • XGROUP DELCONSUMER — subtract `sizeof(streamConsumer) + NACKs + name`
  • `streamDup` — track all copied data in new stream

Verification

`streamVerifyTracking()` walks the entire stream hierarchy, computes both counters independently using `raxComputeLogicalSize()` for overhead and struct/lpBytes/sdsReqSize walks for data, then compares against tracked values. Exposed via `DEBUG STREAM-VERIFY-TRACKING `.

Vset changes (vset.c / vset.h)

Wrapper struct

Vset is `typedef void *` — a tagged pointer with no struct for counters. A `vsetRaxState` wrapper holds `rax *r`, `tracked_data_bytes`, and `tracked_rax_overhead`. The tagged VSET_BUCKET_RAX pointer points to this wrapper instead of the rax directly. All existing `vsetBucketRax()` calls go through the wrapper transparently.

Rax overhead tracking

The rax's `external_logical_size` points to `&state->tracked_rax_overhead`. Automatic — zero vset code needed.

Inner bucket data tracking (~8 manual sites)

`vsetInnerBucketDataSize()` computes logical size per bucket type (VECTOR: sizeof(pVector) + lensizeof(void), HT: hashtableMemUsage, SINGLE/NONE: 0). Manual before/after deltas at bucket mutation sites:

  • `insertToBucket_RAX` — bucket creation and type transitions (SINGLE→VECTOR, VECTOR→HT)
  • `splitBucketIfPossible` — bucket split (SPLIT path only; RELOCATE has zero delta)
  • `removeEntryFromRaxBucket` — bucket shrink or removal
  • `vsetBucketRemoveExpired_RAX` — partial bucket expiry

O(1) memory reporting

`vsetBucketMemUsage_RAX` = `sizeof(vsetRaxState) + state->tracked_rax_overhead + state->tracked_data_bytes`

Defrag and free

Wrapper is defragged alongside the rax. Free paths (`freeVsetBucket`, `shrinkRaxBucketIfPossible`, empty-rax cleanup) free the wrapper after the rax.

`vsetVerifyTracking()` walks inner buckets and compares against `tracked_data_bytes`. Used only by unit tests.

Tests

Stream unit tests (11 gtest tests)

AppendEntries, Trim, TrimByID, TrimEncodingBoundary, IteratorRemoveAll, ConsumerGroupCreate, FullLifecycle, DestroyConsumerGroup, DelConsumerWithNACKs, StreamDup, Fuzzer (2000 random ops).

Tests that involve CGs/consumers/NACKs manually add sizeof/sdsReqSize to `tracked_data_bytes` because the internal APIs (`streamCreateConsumer`, `streamDelConsumer`, etc.) don't take `stream *s` — the tracking lives in command handlers that require a full server with client connections, parsed arguments, and reply buffers. The unit tests verify the tracking formula and ground truth walk are correct.

Tcl integration tests (16 tests)

Exercise real command handlers against a live server (XADD, XREADGROUP, XACK, XCLAIM, XAUTOCLAIM, XGROUP CREATE/DESTROY/CREATECONSUMER/DELCONSUMER, XTRIM, XDEL). Verify via `DEBUG STREAM-VERIFY-TRACKING`. These cover the actual production tracking paths that unit tests can't reach.

Vset tracking tests (9 gtest tests)

TrackingAddToRax, TrackingRemoveFromRax, TrackingExpireFromRax, TrackingUpdateInRax, TrackingSameBucketPromotion, TrackingVectorToHashtable, TrackingDefrag, TrackingShrinkFromRax, TrackingFuzzer (10000 random ops).

liorsve added 3 commits April 3, 2026 12:20
…s of data bytes

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
@liorsve liorsve force-pushed the rax-external-pointer branch 5 times, most recently from f09840b to d445e60 Compare April 5, 2026 11:20
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

minor fixes

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
@liorsve liorsve force-pushed the rax-external-pointer branch from d445e60 to 987caeb Compare April 5, 2026 11:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant