[fix](be) Preserve shuffle for serial merge aggregation#63529
[fix](be) Preserve shuffle for serial merge aggregation#63529BiteTheDDDDt wants to merge 2 commits into
Conversation
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: When experimental serial exchange is enabled and local exchange before aggregation is disabled, a serial exchange source can be followed by a passthrough local exchange before a non-finalizing merge aggregation. This breaks the hash distribution required by DISTINCT aggregation, so duplicate distinct keys can be merged by different local tasks and later partial sums can produce incorrect results. Preserve hash shuffle distribution for serial-child merge aggregation and compute the merge flag during init so local exchange planning can see it.
### Release note
Fix occasional incorrect DISTINCT aggregate results when serial exchange is enabled.
### Check List (For Author)
- Test: Manual test
- build-support/clang-format.sh
- build-support/check-format.sh
- ./run-regression-test.sh --run -d nereids_syntax_p0 -s agg_4_phase (reproduced failure on old BE before patched binary update)
- Behavior changed: Yes. Serial-child merge aggregation now preserves hash local shuffle when required for correctness.
- Does this need documentation: No
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
|
/review |
There was a problem hiding this comment.
Pull request overview
Fixes incorrect results for DISTINCT merge aggregation when experimental_use_serial_exchange is enabled and enable_local_exchange_before_agg is disabled by ensuring hash local shuffle is preserved for merge aggregations that require partition expressions (rather than inserting a passthrough local exchange that breaks key distribution).
Changes:
- Preserve
HASH_SHUFFLE/BUCKET_HASH_SHUFFLElocal exchange for non-finalizing merge aggregation when the child operator is serial and partition expressions exist. - Compute the
_is_mergeflag earlier duringAggSinkOperatorX::init()to make it available during local exchange planning (which runs beforeprepare()). - Add a regression test covering
sum(distinct ...)correctness under serial exchange + disabled local-exchange-before-agg.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy | Adds a serial-exchange DISTINCT aggregate query and required session settings. |
| regression-test/data/nereids_syntax_p0/agg_4_phase.out | Adds expected output for the new regression case. |
| be/src/exec/operator/aggregation_sink_operator.h | Adjusts required input data distribution to preserve hash shuffle for merge agg with serial child. |
| be/src/exec/operator/aggregation_sink_operator.cpp | Moves merge-flag computation into init() (from prepare() path) to support earlier planning. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| TSortInfo dummy; | ||
| for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) { | ||
| AggFnEvaluator* evaluator = nullptr; | ||
| RETURN_IF_ERROR(AggFnEvaluator::create( | ||
| _pool, tnode.agg_node.aggregate_functions[i], | ||
| tnode.agg_node.__isset.agg_sort_infos ? tnode.agg_node.agg_sort_infos[i] : dummy, | ||
| tnode.agg_node.grouping_exprs.empty(), false, &evaluator)); | ||
| _aggregate_evaluators.push_back(evaluator); | ||
| _is_merge |= evaluator->is_merge(); | ||
| } |
There was a problem hiding this comment.
I did not add a duplicate inline comment because the only blocking issue I found is already covered by the existing thread on be/src/exec/operator/aggregation_sink_operator.cpp: _is_merge is now derived only during init(), while BE test/operator construction paths can prepare mocked aggregation sinks without that initialization path. That existing thread should be resolved before approval.
Critical checkpoint conclusions:
- Goal/test: The PR targets preserving hash shuffle for serial merge aggregation and adds a regression case for serial exchange plus distinct aggregation. The test covers the intended end-to-end result, but the existing
_is_mergelifecycle issue leaves non-standard/test construction paths uncovered. - Scope: The reviewable PR diff is focused on aggregation distribution and its regression test.
- Concurrency/lifecycle: No new concurrency primitives are introduced. Lifecycle is the main risk, specifically
_is_mergedepending oninit()rather than the evaluator calculation path used by existing tests/mocks. - Config/compatibility/persistence: No new configuration, wire/storage format, or persistence changes.
- Parallel paths: Streaming aggregation and normal aggregation distribution paths were checked; the change is targeted to non-finalize merge aggregation with a serial child.
- Testing: The added regression test is deterministic because it returns a single row. Missing coverage remains for the BE test path already identified in the existing inline thread.
- Observability/performance: No new observability is required; the added distribution decision should not add unnecessary work except in the intended serial merge case.
User focus: No additional user-provided review focus was specified.
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: The previous version handled serial-child merge aggregation with an early return inside the disabled local-exchange-before-agg branch. Rewrite the condition so merge aggregation with a serial child naturally falls through to the existing hash distribution path, while preserving the null child guard during pipeline construction.
### Release note
None
### Check List (For Author)
- Test: Manual test
- build-support/clang-format.sh
- build-support/check-format.sh
- Behavior changed: No. This only clarifies the condition added for serial-child merge aggregation.
- Does this need documentation: No
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
run buildall |
TPC-H: Total hot run time: 31364 ms |
TPC-DS: Total hot run time: 169132 ms |
TPC-H: Total hot run time: 31252 ms |
TPC-DS: Total hot run time: 168868 ms |
What problem does this PR solve?
Issue Number: None
Related PR: #62438
Problem Summary: PR #62438 added
enable_local_exchange_before_aggto allow skipping local exchange before non-finalizing aggregation. That optimization used!_needs_finalizeas the condition, but non-finalizing aggregation includes both first-phase update/serialize aggregation and merge/serialize aggregation.When
experimental_use_serial_exchangeis enabled andenable_local_exchange_before_aggis disabled, a serial exchange source can be followed by the defaultPASSTHROUGHlocal exchange before a non-finalizing merge aggregation. For DISTINCT aggregation, the merge aggregation is the stage that deduplicates distinct keys after hash exchange.PASSTHROUGHonly restores local parallelism and does not preserve key distribution, so duplicate distinct keys can be processed by different local tasks and later partial sums can produce incorrect results.This PR keeps the knob behavior for aggregation stages that can safely skip the local hash exchange, but excludes merge aggregation with a serial child from the skip path. That case falls through to the existing
HASH_SHUFFLE/BUCKET_HASH_SHUFFLEdistribution requirement. The PR also computes the merge flag duringAggSinkOperatorX::init()because local exchange planning runs beforeprepare().Release note
Fix occasional incorrect DISTINCT aggregate results when serial exchange is enabled.
Check List (For Author)
build-support/clang-format.shbuild-support/check-format.sh./run-regression-test.sh --run -d nereids_syntax_p0 -s agg_4_phase(reproduced failure on old BE before patched binary update)