
GROUP BY aggregate materialises every matching document in memory #39

@hollanf

Description

The aggregate executor retains the raw msgpack bytes of every row in a matched group until the scan completes. A multi-million-row aggregation OOMs the data-plane core, contradicting the code's own inline comment, which claims streaming behaviour.

Summary

handle_aggregate accumulates every matching document's raw bytes into HashMap<String, Vec<Vec<u8>>> binary_groups and only computes aggregate functions after the full scan finishes. The inline comment at line 304 claims O(chunk_size + num_groups) memory, but the code path is O(total_matching_docs × avg_doc_size).

Current code

nodedb/src/data/executor/handlers/aggregate.rs:67-92 — group_doc:

fn group_doc(
    value: &[u8],
    group_by: &[String],
    filter_predicates: &[ScanFilter],
    use_field_index: bool,
    binary_groups: &mut std::collections::HashMap<String, Vec<Vec<u8>>>,
) {
    if use_field_index {
        let idx = msgpack_scan::FieldIndex::build(value, 0)
            .unwrap_or_else(msgpack_scan::FieldIndex::empty);
        if !filter_predicates.iter().all(|f| f.matches_binary_indexed(value, &idx)) {
            return;
        }
        let key = msgpack_scan::group_key::build_group_key_indexed(value, group_by, &idx);
        binary_groups.entry(key).or_default().push(value.to_vec());   // ← full doc clone
    } else {
        if !filter_predicates.iter().all(|f| f.matches_binary(value)) {
            return;
        }
        let key = msgpack_scan::build_group_key(value, group_by);
        binary_groups.entry(key).or_default().push(value.to_vec());   // ← full doc clone
    }
}

nodedb/src/data/executor/handlers/aggregate.rs:301-343 — scan driver with the misleading comment:

// ── Streaming aggregation: process documents in chunks ──
// Instead of loading all documents into memory, scan in chunks of
// 10K docs, group + aggregate each chunk, then merge partial results.
// Memory: O(chunk_size + num_groups) instead of O(all_docs).
...
let mut binary_groups: std::collections::HashMap<String, Vec<Vec<u8>>> =
    std::collections::HashMap::new();
let chunk_size = 10_000;
let scan_result = self
    .scan_collection(tid, collection, scan_limit)
    .map(|docs| {
        for chunk in docs.chunks(chunk_size) {
            for (_, value) in chunk {
                group_doc(value, group_by, &filter_predicates,
                          use_field_index, &mut binary_groups);
            }
        }
    });

The chunks(10_000) loop only paces iteration — it never bounds memory, because binary_groups accumulates across chunks. Aggregate functions are invoked on the collected bytes only at line 361 onward, after the scan finishes.
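The effect is easy to demonstrate in isolation. The toy model below (illustrative only — not nodedb's actual types) mirrors the scan driver's shape: a chunked loop pushing cloned document bytes into a persistent map. The retained byte count comes out identical regardless of chunk size, because the map outlives every chunk boundary:

```rust
use std::collections::HashMap;

// Toy model of the scan driver: chunking paces iteration, but the groups
// map (like `binary_groups`) outlives every chunk, so retained bytes grow
// with the total row count, not the chunk size.
fn retained_bytes(total_docs: usize, chunk_size: usize, doc_size: usize, num_groups: u64) -> usize {
    let doc = vec![0u8; doc_size];
    let docs: Vec<(u64, Vec<u8>)> = (0..total_docs)
        .map(|i| (i as u64 % num_groups, doc.clone()))
        .collect();

    let mut groups: HashMap<u64, Vec<Vec<u8>>> = HashMap::new();
    for chunk in docs.chunks(chunk_size) {
        for (key, value) in chunk {
            // Mirrors `binary_groups.entry(key).or_default().push(value.to_vec())`.
            groups.entry(*key).or_default().push(value.clone());
        }
        // Nothing is aggregated or dropped at the chunk boundary.
    }
    groups.values().flatten().map(Vec::len).sum()
}

fn main() {
    // Same retained footprint whether chunks hold 10 000 docs or 100.
    assert_eq!(retained_bytes(50_000, 10_000, 1_000, 8), 50_000_000);
    assert_eq!(retained_bytes(50_000, 100, 1_000, 8), 50_000_000);
    println!("ok");
}
```

Shrinking chunk_size only changes how often the inner loop yields; it cannot cap the map's growth.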

Why it's broken

  • For SELECT g, SUM(v), AVG(v) FROM t GROUP BY g over N rows with D distinct groups, memory is Σ raw_msgpack_bytes(matching rows), not O(D).
  • Works for small tables, detonates at ~10 M rows with ~1 KB docs: ~10 GB RSS before a single result row is emitted.
  • Paired with count_distinct / array_agg_distinct (which independently accumulate HashSet<Vec<u8>>), the memory is doubled.
  • No spill, no cap, no early error — the executor just allocates until the OS kills the process.
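For SUM/AVG/COUNT, a genuinely streaming path would fold each matching document into a small per-group running accumulator at scan time, so memory really is O(num_groups). The sketch below is illustrative only — `RunningAgg` and `fold_doc` are hypothetical names, not nodedb types, and a real fix would also need mergeable or spillable state for count_distinct / array_agg_distinct:

```rust
use std::collections::HashMap;

/// Running state for COUNT/SUM/AVG over one group: O(1) per group,
/// regardless of how many rows fold into it. (Illustrative sketch.)
#[derive(Default, Clone, Copy)]
struct RunningAgg {
    count: u64,
    sum: f64,
}

impl RunningAgg {
    fn fold(&mut self, v: f64) {
        self.count += 1;
        self.sum += v;
    }
    fn avg(&self) -> f64 {
        if self.count == 0 { 0.0 } else { self.sum / self.count as f64 }
    }
}

/// Fold one matching document into its group's accumulator as it is
/// scanned; no raw document bytes are retained.
fn fold_doc(key: String, v: f64, groups: &mut HashMap<String, RunningAgg>) {
    groups.entry(key).or_default().fold(v);
}

fn main() {
    let mut groups = HashMap::new();
    for i in 0..100_000u64 {
        let key = format!("g{}", i % 1_000); // 1 000 distinct groups
        fold_doc(key, 1.0, &mut groups);
    }
    // Only 1 000 accumulators are alive, not 100 000 documents.
    assert_eq!(groups.len(), 1_000);
    assert_eq!(groups["g0"].count, 100);
    println!("{}", groups["g0"].avg());
}
```

The key extraction and filter-matching already done in group_doc could stay as-is; only the `push(value.to_vec())` step would be replaced by a fold into per-group state.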

Reproduction

CREATE COLLECTION t (...);
-- Insert 5 M small docs with 1000 distinct values in column g.
SELECT g, SUM(v), AVG(v) FROM t GROUP BY g;
-- RSS climbs to ~(5M × avg_doc_bytes) before first row is emitted.

Routed to this path via any schemaless collection or any aggregate that falls back from the native columnar fast-path.

Notes

  • Found during a CPU/memory audit sweep of nodedb-query/src/* and nodedb/src/data/executor/handlers/.
