From dbaee6e6618cc8ada45cadf083115b27c5292fdd Mon Sep 17 00:00:00 2001 From: Copilot <223556219+Copilot@users.noreply.github.com> Date: Fri, 22 May 2026 15:54:12 +0800 Subject: [PATCH 1/2] [fix](be) Preserve shuffle for serial merge aggregation ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: When experimental serial exchange is enabled and local exchange before aggregation is disabled, a serial exchange source can be followed by a passthrough local exchange before a non-finalizing merge aggregation. This breaks the hash distribution required by DISTINCT aggregation: rows with the same distinct key can be merged by different local tasks, so the later partial sums can count that key more than once and produce incorrect results. Preserve hash shuffle distribution for serial-child merge aggregation and compute the merge flag during init so local exchange planning can see it. ### Release note Fix occasional incorrect DISTINCT aggregate results when serial exchange is enabled. ### Check List (For Author) - Test: Manual test - build-support/clang-format.sh - build-support/check-format.sh - ./run-regression-test.sh --run -d nereids_syntax_p0 -s agg_4_phase (reproduced the failure on the old BE binary before updating to the patched binary) - Behavior changed: Yes. Serial-child merge aggregation now preserves hash local shuffle when required for correctness. 
- Does this need documentation: No Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../operator/aggregation_sink_operator.cpp | 5 +--- .../exec/operator/aggregation_sink_operator.h | 6 +++++ .../data/nereids_syntax_p0/agg_4_phase.out | 2 ++ .../nereids_syntax_p0/agg_4_phase.groovy | 25 ++++++++++++++++++- 4 files changed, 33 insertions(+), 5 deletions(-) diff --git a/be/src/exec/operator/aggregation_sink_operator.cpp b/be/src/exec/operator/aggregation_sink_operator.cpp index f6a9c2cdc4211d..677a13330e2932 100644 --- a/be/src/exec/operator/aggregation_sink_operator.cpp +++ b/be/src/exec/operator/aggregation_sink_operator.cpp @@ -871,6 +871,7 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { tnode.agg_node.__isset.agg_sort_infos ? tnode.agg_node.agg_sort_infos[i] : dummy, tnode.agg_node.grouping_exprs.empty(), false, &evaluator)); _aggregate_evaluators.push_back(evaluator); + _is_merge |= evaluator->is_merge(); } if (tnode.agg_node.__isset.agg_sort_info_by_group_key) { @@ -943,7 +944,6 @@ Status AggSinkOperatorX::_init_aggregate_evaluators(RuntimeState* state) { Status AggSinkOperatorX::_calc_aggregate_evaluators() { _offsets_of_aggregate_states.resize(_aggregate_evaluators.size()); - _is_merge = false; for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states; @@ -966,9 +966,6 @@ Status AggSinkOperatorX::_calc_aggregate_evaluators() { (_total_size_of_aggregate_states + alignment_of_next_state - 1) / alignment_of_next_state * alignment_of_next_state; } - if (_aggregate_evaluators[i]->is_merge()) { - _is_merge = true; - } } return Status::OK(); } diff --git a/be/src/exec/operator/aggregation_sink_operator.h b/be/src/exec/operator/aggregation_sink_operator.h index f0cc6b70d4e567..f25a5e5962ce95 100644 --- a/be/src/exec/operator/aggregation_sink_operator.h +++ b/be/src/exec/operator/aggregation_sink_operator.h @@ -164,6 +164,12 @@ class 
AggSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorXenable_local_exchange_before_agg()) { + if (_is_merge && _child && _child->is_serial_operator()) { + return _is_colocate && _require_bucket_distribution + ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + } return DataSinkOperatorX::required_data_distribution(state); } return _is_colocate && _require_bucket_distribution diff --git a/regression-test/data/nereids_syntax_p0/agg_4_phase.out b/regression-test/data/nereids_syntax_p0/agg_4_phase.out index 97a97b8816d80e..0d22b7039e9b2f 100644 --- a/regression-test/data/nereids_syntax_p0/agg_4_phase.out +++ b/regression-test/data/nereids_syntax_p0/agg_4_phase.out @@ -17,3 +17,5 @@ 2 -4,-4 1 b 1 3 -4 1 f 1 +-- !serial_exchange_distinct_sum -- +45 44850 diff --git a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy index e38c92018ff257..772ba119b163a7 100644 --- a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy +++ b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy @@ -85,11 +85,27 @@ suite("agg_4_phase") { from agg_4_phase_tbl2 group by id order by id""" + + sql "drop table if exists agg_4_phase_serial_exchange_tbl" + sql """ + create table agg_4_phase_serial_exchange_tbl ( + pk int, + x int, + y int + ) engine=olap + duplicate key(pk) + distributed by hash(pk) buckets 10 + properties("replication_num"="1"); + """ + sql """ + insert into agg_4_phase_serial_exchange_tbl + select number, number % 10, number from numbers("number" = "300"); + """ multi_sql """ set runtime_filter_type= "BLOOM_FILTER,MIN_MAX"; set enable_runtime_filter_prune= "false"; set exchange_multi_blocks_byte_size= "4722978"; -set parallel_pipeline_task_num= "3"; +set parallel_pipeline_task_num= "4"; set experimental_parallel_scan_min_rows_per_scanner= "256"; set enable_strong_consistency_read= "true"; set 
runtime_filter_wait_infinitely= "true"; @@ -97,6 +113,7 @@ set enable_share_hash_table_for_broadcast_join= "false"; set experimental_parallel_scan_max_scanners_count= "8"; set disable_streaming_preaggregations= "true"; set experimental_use_serial_exchange= "true"; +set enable_local_exchange_before_agg= "false"; """ qt_phase4_multi_distinct """ select @@ -108,4 +125,10 @@ set experimental_use_serial_exchange= "true"; from agg_4_phase_tbl2 group by id order by id""" + + qt_serial_exchange_distinct_sum """ + select + sum(distinct x), + sum(y) + from agg_4_phase_serial_exchange_tbl""" } From a8e496ea1def13252d8ee1f5793244d1e90efa49 Mon Sep 17 00:00:00 2001 From: Copilot <223556219+Copilot@users.noreply.github.com> Date: Fri, 22 May 2026 16:16:25 +0800 Subject: [PATCH 2/2] [fix](be) Clarify serial merge aggregation distribution ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: The previous version handled serial-child merge aggregation with an early return inside the branch taken when local exchange before aggregation is disabled. Rewrite the condition so merge aggregation with a serial child naturally falls through to the existing hash distribution path, while preserving the null child guard during pipeline construction. ### Release note None ### Check List (For Author) - Test: Manual test - build-support/clang-format.sh - build-support/check-format.sh - Behavior changed: No. This only clarifies the condition added for serial-child merge aggregation. 
- Does this need documentation: No Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- be/src/exec/operator/aggregation_sink_operator.h | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/be/src/exec/operator/aggregation_sink_operator.h b/be/src/exec/operator/aggregation_sink_operator.h index f25a5e5962ce95..605bb62a1dd4b3 100644 --- a/be/src/exec/operator/aggregation_sink_operator.h +++ b/be/src/exec/operator/aggregation_sink_operator.h @@ -163,13 +163,8 @@ class AggSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX::required_data_distribution( state); } - if (!_needs_finalize && !state->enable_local_exchange_before_agg()) { - if (_is_merge && _child && _child->is_serial_operator()) { - return _is_colocate && _require_bucket_distribution - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, - _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); - } + if (!_needs_finalize && !state->enable_local_exchange_before_agg() && + !(_is_merge && _child && _child->is_serial_operator())) { return DataSinkOperatorX::required_data_distribution(state); } return _is_colocate && _require_bucket_distribution