Intermediate result blocked approach to aggregation memory management #15591
Rachelint wants to merge 102 commits into
|
Hi @Rachelint, I think I have an alternative proposal that seems relatively easy to implement. |
Thanks a lot. The design in this PR indeed still introduces quite a few code changes... I tried not to modify anything about
But I found that this approach introduces too much extra cost... Maybe we place the |
cc37eba to f690940
95c6a36 to a4c6f42
2100a5b to 0ee951c
|
Finished development (and testing) of all the needed common structs!
|
c51d409 to 2863809
|
It is very close, just need to add more tests! |
31d660d to 2b8dd1e
- Promote `push_block`/`pop_block` to `BlockStore` trait methods so any block store can be drained generically. `FlatBlockStore` implements them as direct replace + `mem::take`; `BlockedBlockStore` introduces `EmitContext` to lazily move accumulation blocks out on first pop and drain via cursor.
- Replace the `VecBlockStore<T>` extension trait with a `VecBlockStore<T, S>` struct that wraps any `S: BlockStore<Vec<T>>` and implements `emit` purely via `push_block`/`pop_block`, removing the per-store `emit` impls.
- Update `PrimitiveGroupsState` and `GroupValuesPrimitiveState` to bound the inner store with `BlockStore<Vec<V>> + Send`, hold the wrapper as `VecBlockStore<V, VB>`, and add `V: Send` where the closure passed to `NullState::accumulate` requires it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

…anBlock>
- Replace the `SeenValueStore` extension trait (and the per-store `emit` impls on `FlatBlockStore<BooleanBlock>` / `BlockedBlockStore<BooleanBlock>`) with a `SeenValueStore<S>` struct that wraps any `S: BlockStore<BooleanBlock>` and implements `emit` purely via `pop_block` + `BooleanBlock::finish` + `push_block`.
- Update `SeenValues` and `NullState` to bound `S` with `BlockStore<BooleanBlock>` and hold the wrapper as `SeenValueStore<S>`; `NullState::new` wraps the empty builder internally so callers stay unchanged.
- Update `PrimitiveGroupsState` bound from `SeenValueStore + Send` to `BlockStore<BooleanBlock> + Send`.
- Keep only the inherent methods that have call sites (`set_bit`, `size`, `resize`, `num_blocks`, `emit`) plus `Index`/`IndexMut`; drop the unused `push_block`/`pop_block`/`allocate_block`/`is_empty`/`clear` delegators.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
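To make the `push_block`/`pop_block` idea from the commit messages above concrete, here is a minimal sketch. It is not the PR's code: the trait is reduced to two methods, `QueueBlockStore` is a hypothetical stand-in for `BlockedBlockStore` (without `EmitContext` or the cursor), and `drain_all` is a hypothetical helper that only illustrates draining any store generically.

```rust
use std::collections::VecDeque;
use std::mem;

/// Simplified stand-in for the `BlockStore` trait named in the commit message.
/// The real trait in the PR has more methods; this signature is an assumption.
pub trait BlockStore<B> {
    fn push_block(&mut self, block: B);
    fn pop_block(&mut self) -> Option<B>;
}

/// Holds at most one block: push replaces it, pop takes it out.
pub struct FlatBlockStore<B> {
    block: Option<B>,
}

impl<B> FlatBlockStore<B> {
    pub fn new() -> Self {
        Self { block: None }
    }
}

impl<B> BlockStore<B> for FlatBlockStore<B> {
    fn push_block(&mut self, block: B) {
        // Direct replace of the single block.
        self.block = Some(block);
    }

    fn pop_block(&mut self) -> Option<B> {
        // `mem::take` leaves `None` behind and returns the old value.
        mem::take(&mut self.block)
    }
}

/// Hypothetical multi-block store: blocks are popped in insertion order.
pub struct QueueBlockStore<B> {
    blocks: VecDeque<B>,
}

impl<B> QueueBlockStore<B> {
    pub fn new() -> Self {
        Self { blocks: VecDeque::new() }
    }
}

impl<B> BlockStore<B> for QueueBlockStore<B> {
    fn push_block(&mut self, block: B) {
        self.blocks.push_back(block);
    }

    fn pop_block(&mut self) -> Option<B> {
        self.blocks.pop_front()
    }
}

/// Hypothetical helper: drains any store through the trait methods only,
/// concatenating the blocks in pop order.
pub fn drain_all<T, S: BlockStore<Vec<T>>>(store: &mut S) -> Vec<T> {
    let mut out = Vec::new();
    while let Some(mut block) = store.pop_block() {
        out.append(&mut block);
    }
    out
}
```

Because `drain_all` only depends on the trait, the same function works for the single-block and the multi-block store, which is the property the commit message is after when it removes the per-store `emit` impls.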
|
Marking as a draft as I don't think this one is ready to merge quite yet and I am trying to clean up the review / merge queue |
Yes, and I think the whole feature will be suitable to push forward once the aggregation refactoring is stable. However, two parts are actually included in this:
How about we split this PR into two or more? And push forward part one ( |
|
And before splitting, I will continue to make and prove the refactoring of |
I think it's a good idea, this is important work and it would be easier to review if split into smaller PRs. |
I think the steps are
The performance seems to be a nearly solved issue, the PoC already showed high cardinality cases are faster (with several micro optimizations left on the table), low cardinality is slightly slower but @alamb's suggestion in #22712 (comment) is doable I think, to bring back the performance. I suggest not trying to parallelize steps 1 and 2, as they will likely conflict with each other. Step 3 should be highly parallelizable. As for the refactoring progress, I'd estimate it's about 50% complete. I haven't seen any major technical blockers so far—just need some time to better structure the implementation. |
Makes sense.
Yes, and based on my earlier experiments I think it actually makes little difference to performance (some steps improve, such as removing the slicing of record batches and the `Vec` resizing, while others regress, such as needing to perform two index operations, so in the end there is nearly no difference); it is just a better memory management approach. |
I disagree, since the memory management is directly tied to performance via the spilling mechanism when running with memory limits configured. See #22526 (comment)
So I believe the new "blocked" approach will have significant performance improvements in production-like workloads. |
I agree we could proceed first without worrying too much about the benchmark numbers. This is a tradeoff between micro-optimizations and algorithmic improvements to memory efficiency. I think giving up 10%-ish performance for an architectural win is already a good idea. Realistically, I also believe it should be possible to avoid the regressions entirely with some low-level optimizations, but we'd better discuss those opportunities later. |
Good point, the "no difference to performance" result may only hold for the benchmark. |
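The trade-off discussed in this thread (no `Vec` resizing, at the cost of two index operations per access) can be sketched as follows. This is an illustrative toy, not the PR's data structure: `Blocked`, its methods, and the flat-index scheme are assumptions made for the example.

```rust
/// Hypothetical blocked container: values live in fixed-capacity blocks,
/// so growing the container never copies existing values.
pub struct Blocked<T> {
    blocks: Vec<Vec<T>>,
    block_size: usize,
}

impl<T> Blocked<T> {
    pub fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be non-zero");
        Self { blocks: Vec::new(), block_size }
    }

    /// Appends a value and returns its flat index.
    pub fn push(&mut self, value: T) -> usize {
        let block_size = self.block_size;
        // A new block is needed when there is none yet or the last one is full.
        let needs_block = self
            .blocks
            .last()
            .map_or(true, |block| block.len() == block_size);
        if needs_block {
            // Only the new block is allocated; full blocks are left untouched.
            self.blocks.push(Vec::with_capacity(block_size));
        }
        let block_id = self.blocks.len() - 1;
        let block = &mut self.blocks[block_id];
        block.push(value);
        block_id * block_size + (block.len() - 1)
    }

    /// Looks a value up with two index operations: first the block,
    /// then the offset inside that block.
    pub fn get(&self, index: usize) -> Option<&T> {
        let block_id = index / self.block_size;
        let offset = index % self.block_size;
        self.blocks.get(block_id)?.get(offset)
    }

    pub fn num_blocks(&self) -> usize {
        self.blocks.len()
    }
}
```

A flat `Vec<T>` needs a single index operation but reallocates and copies when it outgrows its capacity; the sketch above only ever allocates one more block. The outer `Vec` of block handles can still grow, but it holds one small entry per block rather than one per value.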
|
We can run benchmarks with memory limits to force spilling if that helps |
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
Hello @adriangb, would it be possible to authorize me to trigger benchmarks through the bot? |
|
run benchmarks clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger
Comparing intermeidate-result-blocked-approach (5869167) to a27f030 (merge-base) diff using: clickbench_partitioned |
|
🤖 Benchmark completed (GKE) | trigger
Resource Usage
clickbench_partitioned: base (merge-base)
clickbench_partitioned: branch |
Which issue does this PR close?
Rationale for this change
As mentioned in #7065, we use a single `Vec` to manage aggregation intermediate results in both `GroupAccumulator` and `GroupValues`.
It is simple but not efficient enough for high-cardinality aggregation, because when the `Vec` is not large enough, we need to allocate a new `Vec` and copy all data from the old one.
So this PR introduces a blocked approach to manage the aggregation intermediate results. The `Vec` is never resized in this approach; instead, the data is split into blocks, and when the capacity is not enough, we just allocate a new block. See #7065 for details.
What changes are included in this PR?
`PrimitiveGroupsAccumulator` and `GroupValuesPrimitive` as the example
Are these changes tested?
Tested by existing tests, plus new unit tests and new fuzz tests.
Are there any user-facing changes?
Two functions are added to the `GroupValues` and `GroupAccumulator` traits.
But as you can see, there are default implementations for them, and users can choose to actually support the blocked approach when they want better performance for their UDAFs.
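The opt-in mechanism described above can be sketched like this. The trait name `BlockedOptIn`, the method names, their signatures, and the helper `try_enable_blocked` are all hypothetical, chosen only to show how default implementations keep existing UDAFs compiling unchanged; the actual trait methods are defined in the PR diff.

```rust
/// Hypothetical opt-in trait. Both methods have defaults, so existing
/// implementers do not need to change anything.
pub trait BlockedOptIn {
    /// Default: the blocked approach is not supported.
    fn supports_blocked_groups(&self) -> bool {
        false
    }

    /// Default: accept "no blocking", reject any request to enable it.
    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<(), String> {
        match block_size {
            None => Ok(()),
            Some(_) => Err("blocked groups are not supported".to_string()),
        }
    }
}

/// An existing UDAF accumulator that relies on the defaults.
pub struct LegacyUdaf;

impl BlockedOptIn for LegacyUdaf {}

/// A UDAF accumulator that opts in by overriding both methods.
pub struct BlockedUdaf {
    pub block_size: Option<usize>,
}

impl BlockedOptIn for BlockedUdaf {
    fn supports_blocked_groups(&self) -> bool {
        true
    }

    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<(), String> {
        if block_size == Some(0) {
            return Err("block size must be non-zero".to_string());
        }
        self.block_size = block_size;
        Ok(())
    }
}

/// Hypothetical caller-side check: enable blocked mode only when the
/// accumulator says it supports it and accepts the requested size.
pub fn try_enable_blocked(acc: &mut dyn BlockedOptIn, block_size: usize) -> bool {
    acc.supports_blocked_groups() && acc.alter_block_size(Some(block_size)).is_ok()
}
```

With this shape, an accumulator that does nothing stays on the flat path, and one that overrides the two methods is switched to blocked mode by the caller.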