
[fix](be) Preserve shuffle for serial merge aggregation #63529

Open
BiteTheDDDDt wants to merge 2 commits into apache:master from BiteTheDDDDt:fix-serial-exchange-distinct-agg

@BiteTheDDDDt
Contributor

@BiteTheDDDDt BiteTheDDDDt commented May 22, 2026

What problem does this PR solve?

Issue Number: None

Related PR: #62438

Problem Summary: PR #62438 added enable_local_exchange_before_agg to allow skipping local exchange before non-finalizing aggregation. That optimization used !_needs_finalize as the condition, but non-finalizing aggregation includes both first-phase update/serialize aggregation and merge/serialize aggregation.

When experimental_use_serial_exchange is enabled and enable_local_exchange_before_agg is disabled, a serial exchange source can be followed by the default PASSTHROUGH local exchange before a non-finalizing merge aggregation. For DISTINCT aggregation, the merge aggregation is the stage that deduplicates distinct keys after hash exchange. PASSTHROUGH only restores local parallelism and does not preserve key distribution. As a result, duplicates of the same distinct key can be processed by different local tasks, each of which keeps its own copy, and the partial sums computed afterwards can be incorrect.
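
The failure mode described above can be illustrated with a small model. This is only a sketch under stated assumptions, not Doris code: `sum_distinct`, the `LocalExchange` enum, and the round-robin modelling of PASSTHROUGH are hypothetical simplifications. Each task deduplicates the keys it sees and emits a partial sum, and the partial sums are then added together.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <set>
#include <vector>

// Hypothetical model, not the Doris implementation.
enum class LocalExchange { PASSTHROUGH, HASH_SHUFFLE };

// Each local task deduplicates the keys routed to it and contributes a
// partial sum; the caller receives the sum of all partial sums.
int64_t sum_distinct(const std::vector<int64_t>& rows, int num_tasks, LocalExchange type) {
    assert(num_tasks > 0);
    const std::size_t n = static_cast<std::size_t>(num_tasks);
    std::vector<std::set<int64_t>> per_task_keys(n);
    for (std::size_t i = 0; i < rows.size(); ++i) {
        // Assumption: PASSTHROUGH is modelled as round-robin by arrival order,
        // so routing ignores the key. HASH_SHUFFLE routes by key, so equal
        // keys always land in the same task.
        const std::size_t task = (type == LocalExchange::HASH_SHUFFLE)
                                         ? std::hash<int64_t>{}(rows[i]) % n
                                         : i % n;
        per_task_keys[task].insert(rows[i]);
    }
    int64_t total = 0;
    for (const auto& keys : per_task_keys) {
        for (int64_t key : keys) {
            total += key;
        }
    }
    return total;
}
```

With rows `{1, 1, 2, 2}` and two tasks, the hash-routed variant yields 3, while the round-robin variant sends one copy of each key to each task and yields 6.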

This PR keeps the knob behavior for aggregation stages that can safely skip the local hash exchange, but excludes merge aggregation with a serial child from the skip path. That case falls through to the existing HASH_SHUFFLE / BUCKET_HASH_SHUFFLE distribution requirement. The PR also computes the merge flag during AggSinkOperatorX::init() because local exchange planning runs before prepare().
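
As a rough sketch of the decision described in this paragraph (the struct, field names, and function below are hypothetical stand-ins, not the actual members of AggSinkOperatorX), the skip path might be modelled like this. It assumes the skip only applies when the knob is disabled, as the summary states.

```cpp
#include <cassert>

// Hypothetical, simplified view of the inputs to the distribution decision.
struct AggSinkModel {
    bool enable_local_exchange_before_agg = false;  // session knob
    bool needs_finalize = false;                    // finalizing aggregation stage
    bool is_merge = false;                          // merge/serialize aggregation
    bool child_is_serial = false;                   // e.g. a serial exchange source
};

// Returns true when the local hash exchange may be skipped. Returning false
// means the case falls through to the existing hash distribution requirement.
bool skips_local_hash_exchange(const AggSinkModel& s) {
    if (s.enable_local_exchange_before_agg) {
        return false;  // knob enabled: no skipping
    }
    if (s.needs_finalize) {
        return false;  // only non-finalizing stages were ever eligible
    }
    if (s.is_merge && s.child_is_serial) {
        return false;  // the case this PR excludes from the skip path
    }
    return true;
}
```

In this model, a first-phase aggregation or a merge aggregation with a non-serial child still skips the hash exchange; only the serial-child merge case changes.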

Release note

Fix occasional incorrect DISTINCT aggregate results when serial exchange is enabled.

Check List (For Author)

  • Test: Manual test
    • build-support/clang-format.sh
    • build-support/check-format.sh
    • ./run-regression-test.sh --run -d nereids_syntax_p0 -s agg_4_phase (reproduced failure on old BE before patched binary update)
  • Behavior changed: Yes. Serial-child merge aggregation now preserves hash local shuffle when required for correctness.
  • Does this need documentation: No

### What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary: When experimental serial exchange is enabled and local exchange before aggregation is disabled, a serial exchange source can be followed by a passthrough local exchange before a non-finalizing merge aggregation. This breaks the hash distribution required by DISTINCT aggregation, so duplicate distinct keys can be merged by different local tasks and later partial sums can produce incorrect results. Preserve hash shuffle distribution for serial-child merge aggregation and compute the merge flag during init so local exchange planning can see it.

### Release note

Fix occasional incorrect DISTINCT aggregate results when serial exchange is enabled.

### Check List (For Author)

- Test: Manual test
    - build-support/clang-format.sh
    - build-support/check-format.sh
    - ./run-regression-test.sh --run -d nereids_syntax_p0 -s agg_4_phase (reproduced failure on old BE before patched binary update)
- Behavior changed: Yes. Serial-child merge aggregation now preserves hash local shuffle when required for correctness.
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings May 22, 2026 07:54
@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@BiteTheDDDDt
Contributor Author

run buildall

@BiteTheDDDDt
Contributor Author

/review

Contributor

Copilot AI left a comment


Pull request overview

Fixes incorrect results for DISTINCT merge aggregation when experimental_use_serial_exchange is enabled and enable_local_exchange_before_agg is disabled. The fix preserves the hash local shuffle for merge aggregations that require partition expressions, instead of inserting a passthrough local exchange that breaks key distribution.

Changes:

  • Preserve HASH_SHUFFLE / BUCKET_HASH_SHUFFLE local exchange for non-finalizing merge aggregation when the child operator is serial and partition expressions exist.
  • Compute the _is_merge flag earlier during AggSinkOperatorX::init() to make it available during local exchange planning (which runs before prepare()).
  • Add a regression test covering sum(distinct ...) correctness under serial exchange + disabled local-exchange-before-agg.
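
The ordering concern in the second bullet can be sketched as follows. This is a minimal model with hypothetical class and method names, assuming only what the bullet states: planning runs after init() and before prepare(), so a flag computed in prepare() would still hold its default value when planning reads it.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for an aggregate function evaluator.
struct EvaluatorModel {
    bool merge;
    bool is_merge() const { return merge; }
};

// Hypothetical lifecycle model: init() -> planning -> prepare().
class AggSinkLifecycleModel {
public:
    void init(const std::vector<EvaluatorModel>& evaluators) {
        _evaluators = evaluators;
        for (const auto& evaluator : _evaluators) {
            _is_merge |= evaluator.is_merge();  // computed early, visible to planning
        }
    }

    // Models local exchange planning, which reads the flag before prepare().
    bool planning_needs_hash_shuffle(bool child_is_serial) const {
        assert(!_prepared);
        return _is_merge && child_is_serial;
    }

    void prepare() { _prepared = true; }

private:
    std::vector<EvaluatorModel> _evaluators;
    bool _is_merge = false;
    bool _prepared = false;
};
```

A sink that never goes through init() keeps the default `_is_merge = false` in this model, which is the kind of path a mocked operator could take.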

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy | Adds a serial-exchange DISTINCT aggregate query and required session settings. |
| regression-test/data/nereids_syntax_p0/agg_4_phase.out | Adds expected output for the new regression case. |
| be/src/exec/operator/aggregation_sink_operator.h | Adjusts required input data distribution to preserve hash shuffle for merge agg with serial child. |
| be/src/exec/operator/aggregation_sink_operator.cpp | Moves merge-flag computation into init() (from prepare() path) to support earlier planning. |


Comment on lines 866 to 875
    TSortInfo dummy;
    for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
        AggFnEvaluator* evaluator = nullptr;
        RETURN_IF_ERROR(AggFnEvaluator::create(
                _pool, tnode.agg_node.aggregate_functions[i],
                tnode.agg_node.__isset.agg_sort_infos ? tnode.agg_node.agg_sort_infos[i] : dummy,
                tnode.agg_node.grouping_exprs.empty(), false, &evaluator));
        _aggregate_evaluators.push_back(evaluator);
        _is_merge |= evaluator->is_merge();
    }
Contributor

@github-actions github-actions Bot left a comment


I did not add a duplicate inline comment because the only blocking issue I found is already covered by the existing thread on be/src/exec/operator/aggregation_sink_operator.cpp: _is_merge is now derived only during init(), while BE test/operator construction paths can prepare mocked aggregation sinks without that initialization path. That existing thread should be resolved before approval.

Critical checkpoint conclusions:

  • Goal/test: The PR targets preserving hash shuffle for serial merge aggregation and adds a regression case for serial exchange plus distinct aggregation. The test covers the intended end-to-end result, but the existing _is_merge lifecycle issue leaves non-standard/test construction paths uncovered.
  • Scope: The reviewable PR diff is focused on aggregation distribution and its regression test.
  • Concurrency/lifecycle: No new concurrency primitives are introduced. Lifecycle is the main risk, specifically _is_merge depending on init() rather than the evaluator calculation path used by existing tests/mocks.
  • Config/compatibility/persistence: No new configuration, wire/storage format, or persistence changes.
  • Parallel paths: Streaming aggregation and normal aggregation distribution paths were checked; the change is targeted to non-finalize merge aggregation with a serial child.
  • Testing: The added regression test is deterministic because it returns a single row. Missing coverage remains for the BE test path already identified in the existing inline thread.
  • Observability/performance: No new observability is required; the added distribution decision should not add unnecessary work except in the intended serial merge case.

User focus: No additional user-provided review focus was specified.

### What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary: The previous version handled serial-child merge aggregation with an early return inside the disabled local-exchange-before-agg branch. Rewrite the condition so merge aggregation with a serial child naturally falls through to the existing hash distribution path, while preserving the null child guard during pipeline construction.
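
The two shapes of the condition can be compared with a small model. The function names, the `ChildModel` type, and the treatment of a null child as "not serial" are assumptions made for illustration; the actual code in aggregation_sink_operator.h is not reproduced here.

```cpp
#include <cassert>

// Hypothetical stand-ins for the child operator and the resulting path.
struct ChildModel {
    bool serial;
};
enum class Path { SKIP_HASH, HASH };

// Earlier shape: an early return inside the "knob disabled" branch.
Path early_return_form(bool knob_enabled, bool needs_finalize, bool is_merge,
                       const ChildModel* child) {
    if (!knob_enabled && !needs_finalize) {
        if (is_merge && child != nullptr && child->serial) {
            return Path::HASH;
        }
        return Path::SKIP_HASH;
    }
    return Path::HASH;
}

// Rewritten shape: the serial-merge case is excluded in the condition itself
// and falls through to the hash path. The null check guards child access.
Path fall_through_form(bool knob_enabled, bool needs_finalize, bool is_merge,
                       const ChildModel* child) {
    const bool serial_merge = is_merge && child != nullptr && child->serial;
    if (!knob_enabled && !needs_finalize && !serial_merge) {
        return Path::SKIP_HASH;
    }
    return Path::HASH;
}

// Exhaustively checks that both shapes agree on every input combination.
bool forms_agree() {
    const ChildModel serial{true};
    const ChildModel parallel{false};
    const ChildModel* children[] = {nullptr, &serial, &parallel};
    for (int bits = 0; bits < 8; ++bits) {
        const bool knob = (bits & 1) != 0;
        const bool finalize = (bits & 2) != 0;
        const bool merge = (bits & 4) != 0;
        for (const ChildModel* child : children) {
            if (early_return_form(knob, finalize, merge, child) !=
                fall_through_form(knob, finalize, merge, child)) {
                return false;
            }
        }
    }
    return true;
}
```

Because the two forms agree on all inputs in this model, the rewrite is consistent with the "Behavior changed: No" entry in the checklist.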

### Release note

None

### Check List (For Author)

- Test: Manual test
    - build-support/clang-format.sh
    - build-support/check-format.sh
- Behavior changed: No. This only clarifies the condition added for serial-child merge aggregation.
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@BiteTheDDDDt
Contributor Author

run buildall

@hello-stephen
Contributor

TPC-H: Total hot run time: 31364 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit dbaee6e6618cc8ada45cadf083115b27c5292fdd, data reload: false

------ Round 1 ----------------------------------
orders	Doris	NULL	NULL	0	0	0	NULL	0	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	17764	4003	3812	3812
q2	q3	10805	1394	803	803
q4	4687	475	342	342
q5	7588	2314	2077	2077
q6	239	169	145	145
q7	941	794	636	636
q8	9425	1803	1617	1617
q9	5097	4939	4906	4906
q10	6398	2093	1810	1810
q11	439	281	243	243
q12	631	426	302	302
q13	18117	3384	2808	2808
q14	261	252	233	233
q15	q16	818	764	710	710
q17	1001	958	908	908
q18	7073	5705	5636	5636
q19	1294	1247	1133	1133
q20	548	518	297	297
q21	6149	2940	2631	2631
q22	474	434	315	315
Total cold run time: 99749 ms
Total hot run time: 31364 ms

----- Round 2, with runtime_filter_mode=off -----
orders	Doris	NULL	NULL	150000000	42	6422171781	NULL	22778155	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	4574	4599	4455	4455
q2	q3	4865	5212	4563	4563
q4	2133	2210	1404	1404
q5	4966	4575	4666	4575
q6	229	179	133	133
q7	1863	1733	1530	1530
q8	2328	2063	2161	2063
q9	7662	7165	7208	7165
q10	4492	4404	3985	3985
q11	521	378	351	351
q12	714	723	507	507
q13	2973	3355	2821	2821
q14	274	270	262	262
q15	q16	678	699	609	609
q17	1255	1239	1240	1239
q18	7206	6849	6653	6653
q19	1092	1076	1092	1076
q20	2221	2210	1932	1932
q21	5285	4601	4499	4499
q22	513	450	425	425
Total cold run time: 55844 ms
Total hot run time: 50247 ms

@hello-stephen
Contributor

TPC-DS: Total hot run time: 169132 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit dbaee6e6618cc8ada45cadf083115b27c5292fdd, data reload: false

query5	4328	654	531	531
query6	329	227	205	205
query7	4232	570	308	308
query8	319	228	222	222
query9	8830	3933	3946	3933
query10	443	341	308	308
query11	5803	2402	2217	2217
query12	184	125	139	125
query13	1256	651	402	402
query14	5919	5384	5058	5058
query14_1	4299	4316	4344	4316
query15	213	201	177	177
query16	1009	468	439	439
query17	946	716	579	579
query18	2435	496	348	348
query19	215	202	167	167
query20	135	139	127	127
query21	212	133	112	112
query22	13663	13634	13381	13381
query23	17210	16291	16121	16121
query23_1	16169	16173	16167	16167
query24	7385	1758	1294	1294
query24_1	1297	1319	1316	1316
query25	589	502	455	455
query26	1325	310	169	169
query27	2732	554	329	329
query28	4519	1937	1922	1922
query29	970	631	497	497
query30	300	233	197	197
query31	1114	1062	937	937
query32	86	75	71	71
query33	536	350	298	298
query34	1183	1087	621	621
query35	765	797	694	694
query36	1303	1355	1217	1217
query37	157	102	89	89
query38	3220	3147	3091	3091
query39	930	921	905	905
query39_1	884	885	869	869
query40	228	151	128	128
query41	77	63	61	61
query42	110	108	111	108
query43	329	325	289	289
query44	
query45	209	199	194	194
query46	1072	1241	697	697
query47	2287	2312	2257	2257
query48	408	415	283	283
query49	632	484	383	383
query50	1006	335	244	244
query51	4362	4283	4339	4283
query52	109	106	94	94
query53	247	283	199	199
query54	318	271	253	253
query55	95	92	85	85
query56	315	315	324	315
query57	1415	1420	1321	1321
query58	292	271	264	264
query59	1548	1608	1398	1398
query60	331	323	345	323
query61	160	156	155	155
query62	675	624	561	561
query63	236	200	200	200
query64	2422	798	644	644
query65	
query66	1734	468	350	350
query67	29404	29951	29710	29710
query68	
query69	475	341	286	286
query70	1019	1006	989	989
query71	302	274	274	274
query72	3009	2734	2593	2593
query73	814	822	451	451
query74	5104	4911	4740	4740
query75	2675	2624	2292	2292
query76	2282	1128	771	771
query77	409	417	340	340
query78	12146	11952	11698	11698
query79	1287	1071	723	723
query80	614	578	487	487
query81	455	284	248	248
query82	231	159	124	124
query83	284	288	256	256
query84	297	144	116	116
query85	961	614	550	550
query86	363	343	328	328
query87	3409	3381	3214	3214
query88	3530	2666	2635	2635
query89	419	388	334	334
query90	2165	177	168	168
query91	178	167	141	141
query92	82	78	72	72
query93	1500	1323	836	836
query94	530	372	305	305
query95	673	375	376	375
query96	1055	805	325	325
query97	2670	2675	2543	2543
query98	233	231	226	226
query99	1089	1113	976	976
Total cold run time: 250590 ms
Total hot run time: 169132 ms

@hello-stephen
Contributor

TPC-H: Total hot run time: 31252 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit a8e496ea1def13252d8ee1f5793244d1e90efa49, data reload: false

------ Round 1 ----------------------------------
orders	Doris	NULL	NULL	0	0	0	NULL	0	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	17590	3852	3904	3852
q2	q3	10766	1352	810	810
q4	4685	480	349	349
q5	7647	2275	2112	2112
q6	249	175	141	141
q7	949	788	626	626
q8	9350	1634	1620	1620
q9	5151	4941	4879	4879
q10	6399	2083	1790	1790
q11	461	276	248	248
q12	627	432	300	300
q13	18105	3382	2821	2821
q14	263	258	232	232
q15	q16	790	783	707	707
q17	942	914	954	914
q18	6974	5940	5541	5541
q19	1310	1268	1062	1062
q20	513	390	323	323
q21	6447	2883	2587	2587
q22	461	372	338	338
Total cold run time: 99679 ms
Total hot run time: 31252 ms

----- Round 2, with runtime_filter_mode=off -----
orders	Doris	NULL	NULL	150000000	42	6422171781	NULL	22778155	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	4718	4457	4598	4457
q2	q3	4830	5169	4552	4552
q4	2136	2200	1388	1388
q5	4701	4739	4582	4582
q6	232	178	137	137
q7	1815	1890	1564	1564
q8	2336	2079	2061	2061
q9	7715	7683	7166	7166
q10	4450	4377	3969	3969
q11	551	383	343	343
q12	699	712	503	503
q13	2986	3376	2809	2809
q14	268	287	248	248
q15	q16	675	710	599	599
q17	1246	1235	1217	1217
q18	7414	6749	6718	6718
q19	1082	1109	1105	1105
q20	2234	2220	1929	1929
q21	5266	4580	4491	4491
q22	530	464	401	401
Total cold run time: 55884 ms
Total hot run time: 50239 ms

@hello-stephen
Contributor

TPC-DS: Total hot run time: 168868 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit a8e496ea1def13252d8ee1f5793244d1e90efa49, data reload: false

query5	4302	666	515	515
query6	352	209	194	194
query7	4215	571	299	299
query8	336	229	216	216
query9	8869	3967	3940	3940
query10	448	333	325	325
query11	5796	2407	2185	2185
query12	173	128	121	121
query13	1252	590	398	398
query14	5900	5344	4984	4984
query14_1	4334	4336	4282	4282
query15	210	200	180	180
query16	1005	452	431	431
query17	950	703	582	582
query18	2457	490	348	348
query19	228	197	160	160
query20	136	138	129	129
query21	220	142	118	118
query22	13721	13593	13491	13491
query23	17190	16447	16032	16032
query23_1	16084	16094	16136	16094
query24	7555	1749	1300	1300
query24_1	1306	1285	1320	1285
query25	518	459	404	404
query26	1344	318	172	172
query27	2656	554	334	334
query28	4498	1943	1953	1943
query29	1027	622	486	486
query30	299	234	199	199
query31	1111	1071	957	957
query32	88	74	71	71
query33	537	360	290	290
query34	1191	1181	642	642
query35	764	806	673	673
query36	1316	1316	1209	1209
query37	170	108	92	92
query38	3204	3159	3063	3063
query39	930	929	884	884
query39_1	876	879	877	877
query40	230	155	129	129
query41	72	69	69	69
query42	111	114	111	111
query43	322	328	284	284
query44	
query45	220	205	204	204
query46	1078	1211	715	715
query47	2333	2344	2155	2155
query48	419	405	313	313
query49	644	503	402	402
query50	1020	340	251	251
query51	4343	4438	4246	4246
query52	107	104	93	93
query53	266	288	206	206
query54	329	282	266	266
query55	97	95	90	90
query56	326	315	320	315
query57	1401	1413	1281	1281
query58	313	285	276	276
query59	1566	1634	1476	1476
query60	344	331	319	319
query61	177	180	178	178
query62	673	630	558	558
query63	241	203	212	203
query64	2479	847	671	671
query65	
query66	1757	499	372	372
query67	30106	29978	29859	29859
query68	
query69	462	350	313	313
query70	1062	999	954	954
query71	310	319	252	252
query72	3002	2701	2429	2429
query73	850	749	424	424
query74	5094	4895	4758	4758
query75	2669	2575	2248	2248
query76	2276	1153	734	734
query77	391	406	323	323
query78	12151	12242	11508	11508
query79	1499	1052	750	750
query80	768	543	437	437
query81	471	276	235	235
query82	1404	157	118	118
query83	361	281	243	243
query84	309	141	112	112
query85	921	531	461	461
query86	433	328	304	304
query87	3413	3395	3238	3238
query88	3576	2654	2635	2635
query89	441	380	335	335
query90	1833	183	179	179
query91	180	173	142	142
query92	80	80	75	75
query93	1481	1546	923	923
query94	607	350	341	341
query95	669	393	358	358
query96	1012	779	330	330
query97	2686	2685	2565	2565
query98	229	225	228	225
query99	1114	1105	963	963
Total cold run time: 253312 ms
Total hot run time: 168868 ms


4 participants