Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions be/src/exec/operator/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,7 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
tnode.agg_node.__isset.agg_sort_infos ? tnode.agg_node.agg_sort_infos[i] : dummy,
tnode.agg_node.grouping_exprs.empty(), false, &evaluator));
_aggregate_evaluators.push_back(evaluator);
_is_merge |= evaluator->is_merge();
}

if (tnode.agg_node.__isset.agg_sort_info_by_group_key) {
Expand Down Expand Up @@ -943,7 +944,6 @@ Status AggSinkOperatorX::_init_aggregate_evaluators(RuntimeState* state) {

Status AggSinkOperatorX::_calc_aggregate_evaluators() {
_offsets_of_aggregate_states.resize(_aggregate_evaluators.size());
_is_merge = false;
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_offsets_of_aggregate_states[i] = _total_size_of_aggregate_states;

Expand All @@ -966,9 +966,6 @@ Status AggSinkOperatorX::_calc_aggregate_evaluators() {
(_total_size_of_aggregate_states + alignment_of_next_state - 1) /
alignment_of_next_state * alignment_of_next_state;
}
if (_aggregate_evaluators[i]->is_merge()) {
_is_merge = true;
}
}
return Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/operator/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ class AggSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<AggSinkLoca
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution(
state);
}
if (!_needs_finalize && !state->enable_local_exchange_before_agg()) {
if (!_needs_finalize && !state->enable_local_exchange_before_agg() &&
!(_is_merge && _child && _child->is_serial_operator())) {
return DataSinkOperatorX<AggSinkLocalState>::required_data_distribution(state);
}
return _is_colocate && _require_bucket_distribution
Expand Down
2 changes: 2 additions & 0 deletions regression-test/data/nereids_syntax_p0/agg_4_phase.out
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@
2 -4,-4 1 b 1
3 -4 1 f 1

-- !serial_exchange_distinct_sum --
45 44850
25 changes: 24 additions & 1 deletion regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,35 @@ suite("agg_4_phase") {
from agg_4_phase_tbl2
group by id
order by id"""

sql "drop table if exists agg_4_phase_serial_exchange_tbl"
sql """
create table agg_4_phase_serial_exchange_tbl (
pk int,
x int,
y int
) engine=olap
duplicate key(pk)
distributed by hash(pk) buckets 10
properties("replication_num"="1");
"""
sql """
insert into agg_4_phase_serial_exchange_tbl
select number, number % 10, number from numbers("number" = "300");
"""
multi_sql """
set runtime_filter_type= "BLOOM_FILTER,MIN_MAX";
set enable_runtime_filter_prune= "false";
set exchange_multi_blocks_byte_size= "4722978";
set parallel_pipeline_task_num= "3";
set parallel_pipeline_task_num= "4";
set experimental_parallel_scan_min_rows_per_scanner= "256";
set enable_strong_consistency_read= "true";
set runtime_filter_wait_infinitely= "true";
set enable_share_hash_table_for_broadcast_join= "false";
set experimental_parallel_scan_max_scanners_count= "8";
set disable_streaming_preaggregations= "true";
set experimental_use_serial_exchange= "true";
set enable_local_exchange_before_agg= "false";
"""
qt_phase4_multi_distinct """
select
Expand All @@ -108,4 +125,10 @@ set experimental_use_serial_exchange= "true";
from agg_4_phase_tbl2
group by id
order by id"""

qt_serial_exchange_distinct_sum """
select
sum(distinct x),
sum(y)
from agg_4_phase_serial_exchange_tbl"""
}
Loading