The aggregate executor retains the raw msgpack bytes of every row in a matched group until scan completes. A multi-million-row aggregation OOMs the data-plane core — contradicting the code's own comment that claims streaming.
## Summary

`handle_aggregate` accumulates every matching document's raw bytes into `HashMap<String, Vec<Vec<u8>>> binary_groups` and only computes aggregate functions after the full scan finishes. The inline comment at line 304 claims `O(chunk_size + num_groups)` memory, but the code path is `O(total_matching_docs × avg_doc_size)`.
## Current code

`nodedb/src/data/executor/handlers/aggregate.rs:67-92` — `group_doc`:
```rust
fn group_doc(
    value: &[u8],
    group_by: &[String],
    filter_predicates: &[ScanFilter],
    use_field_index: bool,
    binary_groups: &mut std::collections::HashMap<String, Vec<Vec<u8>>>,
) {
    if use_field_index {
        let idx = msgpack_scan::FieldIndex::build(value, 0)
            .unwrap_or_else(msgpack_scan::FieldIndex::empty);
        if !filter_predicates.iter().all(|f| f.matches_binary_indexed(value, &idx)) {
            return;
        }
        let key = msgpack_scan::group_key::build_group_key_indexed(value, group_by, &idx);
        binary_groups.entry(key).or_default().push(value.to_vec()); // ← full doc clone
    } else {
        if !filter_predicates.iter().all(|f| f.matches_binary(value)) {
            return;
        }
        let key = msgpack_scan::build_group_key(value, group_by);
        binary_groups.entry(key).or_default().push(value.to_vec()); // ← full doc clone
    }
}
```
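The retention pattern reduces to a standalone sketch (hypothetical `group_doc_model`, no codebase types or filtering): the map's key count stays at O(num_groups), but the cloned bytes grow with every matching row.

```rust
use std::collections::HashMap;

// Hypothetical stripped-down model of the group_doc pattern above:
// every matching document's bytes are cloned into the group map.
fn group_doc_model(doc: &[u8], key: String, groups: &mut HashMap<String, Vec<Vec<u8>>>) {
    groups.entry(key).or_default().push(doc.to_vec()); // full clone per row
}

fn main() {
    let mut groups: HashMap<String, Vec<Vec<u8>>> = HashMap::new();
    let doc = vec![0u8; 100]; // stand-in for a small msgpack document
    for i in 0..10_000u32 {
        group_doc_model(&doc, format!("g{}", i % 10), &mut groups); // 10 groups
    }
    let retained: usize = groups.values().flatten().map(|d| d.len()).sum();
    assert_eq!(groups.len(), 10);       // key count is O(num_groups)
    assert_eq!(retained, 1_000_000);    // retained bytes are O(total_rows)
}
```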
`nodedb/src/data/executor/handlers/aggregate.rs:301-343` — scan driver with the misleading comment:
```rust
// ── Streaming aggregation: process documents in chunks ──
// Instead of loading all documents into memory, scan in chunks of
// 10K docs, group + aggregate each chunk, then merge partial results.
// Memory: O(chunk_size + num_groups) instead of O(all_docs).
...
let mut binary_groups: std::collections::HashMap<String, Vec<Vec<u8>>> =
    std::collections::HashMap::new();
let chunk_size = 10_000;
let scan_result = self
    .scan_collection(tid, collection, scan_limit)
    .map(|docs| {
        for chunk in docs.chunks(chunk_size) {
            for (_, value) in chunk {
                group_doc(value, group_by, &filter_predicates,
                          use_field_index, &mut binary_groups);
            }
        }
    });
```
The `chunks(10_000)` loop only paces iteration — it never bounds memory, because `binary_groups` accumulates across chunks. Aggregate functions are invoked on the collected bytes only at line 361 onward, after the scan finishes.
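For contrast, a genuinely O(chunk_size + num_groups) design would fold each document into a small per-group running state during the scan and drop the raw bytes immediately. A minimal sketch with hypothetical names (`Acc`, `fold_doc`), not the executor's actual API:

```rust
use std::collections::HashMap;

// Hypothetical per-group running state: enough for COUNT/SUM/AVG
// without retaining any document bytes.
#[derive(Default)]
struct Acc {
    count: u64,
    sum: f64,
}

// Fold one extracted value into its group's accumulator. The raw doc
// can be dropped as soon as this returns, so memory is O(num_groups).
fn fold_doc(groups: &mut HashMap<String, Acc>, key: String, v: f64) {
    let acc = groups.entry(key).or_default();
    acc.count += 1;
    acc.sum += v;
}

fn main() {
    let mut groups: HashMap<String, Acc> = HashMap::new();
    for i in 0..1_000_000u64 {
        fold_doc(&mut groups, format!("g{}", i % 1_000), 1.0);
    }
    // 1 M rows, 1 000 groups: state is 1 000 accumulators, not 1 M docs.
    assert_eq!(groups.len(), 1_000);
    let avg = groups["g0"].sum / groups["g0"].count as f64;
    assert_eq!(avg, 1.0);
}
```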
## Why it's broken
- For `SELECT g, SUM(v), AVG(v) FROM t GROUP BY g` over N rows with D distinct groups, memory is Σ raw_msgpack_bytes(matching rows), not `O(D)`.
- Works for small tables, but detonates at ~10 M rows with ~1 KB docs: ~10 GB RSS before a single result row is emitted.
- Paired with `count_distinct` / `array_agg_distinct` (which independently accumulate `HashSet<Vec<u8>>`), the retained memory roughly doubles.
- No spill, no cap, no early error — the executor just allocates until the OS kills the process.
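The RSS figures above are simple arithmetic; a back-of-envelope helper (hypothetical name `retained_bytes`, not part of the codebase) makes the estimate explicit:

```rust
// Hypothetical estimate of bytes retained by the aggregate path
// before any output row is produced.
fn retained_bytes(rows: u64, avg_doc_bytes: u64, selectivity: f64) -> u64 {
    ((rows as f64 * selectivity) as u64) * avg_doc_bytes
}

fn main() {
    let gib = |b: u64| b as f64 / (1u64 << 30) as f64;
    // The ~10 GB figure above: 10 M matching rows × 1 KB docs ≈ 9.3 GiB.
    println!("{:.1} GiB", gib(retained_bytes(10_000_000, 1_000, 1.0)));
    // The reproduction case: 5 M docs, avg doc size assumed 1 KB here.
    println!("{:.1} GiB", gib(retained_bytes(5_000_000, 1_000, 1.0)));
}
```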
## Reproduction

```sql
CREATE COLLECTION t (...);
-- Insert 5 M small docs with 1000 distinct values in column g.
SELECT g, SUM(v), AVG(v) FROM t GROUP BY g;
-- RSS climbs to ~(5M × avg_doc_bytes) before the first row is emitted.
```
Routed to this path via any schemaless collection or any aggregate that falls back from the native columnar fast-path.
## Notes

- Found during a CPU/memory audit sweep of `nodedb-query/src/*` and `nodedb/src/data/executor/handlers/`.