diff --git a/be/src/exec/operator/aggregation_sink_operator.cpp b/be/src/exec/operator/aggregation_sink_operator.cpp index f6a9c2cdc4211d..677a13330e2932 100644 --- a/be/src/exec/operator/aggregation_sink_operator.cpp +++ b/be/src/exec/operator/aggregation_sink_operator.cpp @@ -871,6 +871,7 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { tnode.agg_node.__isset.agg_sort_infos ? tnode.agg_node.agg_sort_infos[i] : dummy, tnode.agg_node.grouping_exprs.empty(), false, &evaluator)); _aggregate_evaluators.push_back(evaluator); + _is_merge |= evaluator->is_merge(); } if (tnode.agg_node.__isset.agg_sort_info_by_group_key) { @@ -943,7 +944,6 @@ Status AggSinkOperatorX::_init_aggregate_evaluators(RuntimeState* state) { Status AggSinkOperatorX::_calc_aggregate_evaluators() { _offsets_of_aggregate_states.resize(_aggregate_evaluators.size()); - _is_merge = false; for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states; @@ -966,9 +966,6 @@ Status AggSinkOperatorX::_calc_aggregate_evaluators() { (_total_size_of_aggregate_states + alignment_of_next_state - 1) / alignment_of_next_state * alignment_of_next_state; } - if (_aggregate_evaluators[i]->is_merge()) { - _is_merge = true; - } } return Status::OK(); } diff --git a/be/src/exec/operator/aggregation_sink_operator.h b/be/src/exec/operator/aggregation_sink_operator.h index f0cc6b70d4e567..605bb62a1dd4b3 100644 --- a/be/src/exec/operator/aggregation_sink_operator.h +++ b/be/src/exec/operator/aggregation_sink_operator.h @@ -163,7 +163,8 @@ class AggSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX::required_data_distribution( state); } - if (!_needs_finalize && !state->enable_local_exchange_before_agg()) { + if (!_needs_finalize && !state->enable_local_exchange_before_agg() && + !(_is_merge && _child && _child->is_serial_operator())) { return DataSinkOperatorX::required_data_distribution(state); } return _is_colocate && _require_bucket_distribution diff --git a/regression-test/data/nereids_syntax_p0/agg_4_phase.out b/regression-test/data/nereids_syntax_p0/agg_4_phase.out index 97a97b8816d80e..0d22b7039e9b2f 100644 --- a/regression-test/data/nereids_syntax_p0/agg_4_phase.out +++ b/regression-test/data/nereids_syntax_p0/agg_4_phase.out @@ -17,3 +17,5 @@ 2 -4,-4 1 b 1 3 -4 1 f 1 +-- !serial_exchange_distinct_sum -- +45 44850 diff --git a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy index e38c92018ff257..772ba119b163a7 100644 --- a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy +++ b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy @@ -85,11 +85,27 @@ suite("agg_4_phase") { from agg_4_phase_tbl2 group by id order by id""" + + sql "drop table if exists agg_4_phase_serial_exchange_tbl" + sql """ + create table agg_4_phase_serial_exchange_tbl ( + pk int, + x int, + y int + ) engine=olap + duplicate key(pk) + distributed by hash(pk) buckets 10 + properties("replication_num"="1"); + """ + sql """ + insert into agg_4_phase_serial_exchange_tbl + select number, number % 10, number from numbers("number" = "300"); + """ multi_sql """ set runtime_filter_type= "BLOOM_FILTER,MIN_MAX"; set enable_runtime_filter_prune= "false"; set exchange_multi_blocks_byte_size= "4722978"; -set parallel_pipeline_task_num= "3"; +set parallel_pipeline_task_num= "4"; set experimental_parallel_scan_min_rows_per_scanner= "256"; set enable_strong_consistency_read= "true"; set runtime_filter_wait_infinitely= "true"; @@ -97,6 +113,7 @@ set enable_share_hash_table_for_broadcast_join= "false"; set experimental_parallel_scan_max_scanners_count= "8"; set disable_streaming_preaggregations= "true"; set experimental_use_serial_exchange= "true"; +set enable_local_exchange_before_agg= "false"; """ qt_phase4_multi_distinct """ select @@ -108,4 +125,10 @@ set experimental_use_serial_exchange= "true"; from agg_4_phase_tbl2 group by id order by id""" + + qt_serial_exchange_distinct_sum """ + select + sum(distinct x), + sum(y) + from agg_4_phase_serial_exchange_tbl""" }