From ecbfda56fc24d21f4f217c7d015d1226e7f0c9cd Mon Sep 17 00:00:00 2001 From: Mryange Date: Fri, 22 May 2026 20:16:47 +0800 Subject: [PATCH] upd --- be/src/exec/exchange/local_exchanger.cpp | 9 ++ .../exec/pipeline/local_exchanger_test.cpp | 92 +++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/be/src/exec/exchange/local_exchanger.cpp b/be/src/exec/exchange/local_exchanger.cpp index 620aae737050d6..0840dda18a2e08 100644 --- a/be/src/exec/exchange/local_exchanger.cpp +++ b/be/src/exec/exchange/local_exchanger.cpp @@ -167,6 +167,9 @@ Status ShuffleExchanger::get_block(RuntimeState* state, Block* block, bool* eos, mutable_block = VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->_data_block); RETURN_IF_ERROR(get_data()); + if (mutable_block.rows() > 0) { + *block = mutable_block.to_block(); + } } return Status::OK(); } @@ -425,6 +428,9 @@ Status BroadcastExchanger::get_block(RuntimeState* state, Block* block, bool* eo RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->_data_block, partitioned_block.second.offset_start, partitioned_block.second.length)); + if (mutable_block.rows() > 0) { + *block = mutable_block.to_block(); + } } return Status::OK(); @@ -573,6 +579,9 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, Block* block mutable_block = VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->_data_block); RETURN_IF_ERROR(get_data()); + if (mutable_block.rows() > 0) { + *block = mutable_block.to_block(); + } } return Status::OK(); } diff --git a/be/test/exec/pipeline/local_exchanger_test.cpp b/be/test/exec/pipeline/local_exchanger_test.cpp index 3051625a3ee530..b16a3be6573e2a 100644 --- a/be/test/exec/pipeline/local_exchanger_test.cpp +++ b/be/test/exec/pipeline/local_exchanger_test.cpp @@ -1153,6 +1153,98 @@ TEST_F(LocalExchangerTest, AdaptivePassthroughExchanger) { } } +TEST_F(LocalExchangerTest, AdaptivePassthroughExchangerReturnsCopiedSliceWithSharedOutputColumn) { + int num_sink = 1; + int num_sources = 4; + int free_block_limit = 0; + const auto num_rows_per_block = num_sources * 3; + const auto expected_rows_per_source = num_rows_per_block / num_sources; + config::local_exchange_buffer_mem_limit = 1024 * 1024; + + std::vector> sink_local_states; + std::vector> source_local_states; + sink_local_states.resize(num_sink); + source_local_states.resize(num_sources); + auto profile = std::make_shared(""); + auto shared_state = LocalExchangeSharedState::create_shared(num_sources); + shared_state->exchanger = + AdaptivePassthroughExchanger::create_unique(num_sink, num_sources, free_block_limit); + auto sink_dep = std::make_shared(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); + sink_dep->set_shared_state(shared_state.get()); + shared_state->sink_deps.push_back(sink_dep); + shared_state->create_source_dependencies(num_sources, 0, 0, "TEST"); + + auto* exchanger = (AdaptivePassthroughExchanger*)shared_state->exchanger.get(); + for (size_t i = 0; i < num_sink; i++) { + sink_local_states[i] = std::make_unique(nullptr, nullptr); + sink_local_states[i]->_exchanger = shared_state->exchanger.get(); + sink_local_states[i]->_compute_hash_value_timer = + ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i)); + sink_local_states[i]->_distribute_timer = + ADD_TIMER(profile, "distribute_timer" + std::to_string(i)); + sink_local_states[i]->_channel_id = i; + sink_local_states[i]->_ins_idx = i; + sink_local_states[i]->_shared_state = shared_state.get(); + sink_local_states[i]->_dependency = sink_dep.get(); + sink_local_states[i]->_memory_used_counter = profile->AddHighWaterMarkCounter( + "SinkMemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1); + } + for (size_t i = 0; i < num_sources; i++) { + source_local_states[i] = + std::make_unique(_runtime_state.get(), nullptr); + source_local_states[i]->_exchanger = shared_state->exchanger.get(); + source_local_states[i]->_get_block_failed_counter = + ADD_TIMER(profile, "_get_block_failed_counter" + std::to_string(i)); + source_local_states[i]->_copy_data_timer = + ADD_TIMER(profile, "_copy_data_timer" + std::to_string(i)); + source_local_states[i]->_channel_id = i; + source_local_states[i]->_shared_state = shared_state.get(); + source_local_states[i]->_dependency = shared_state->get_dep_by_channel_id(i).front().get(); + source_local_states[i]->_memory_used_counter = profile->AddHighWaterMarkCounter( + "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1); + shared_state->mem_counters[i] = source_local_states[i]->_memory_used_counter; + } + + Block in_block; + DataTypePtr int_type = std::make_shared(); + auto int_col0 = ColumnInt32::create(); + for (int i = 0; i < num_rows_per_block; ++i) { + int_col0->insert_value(i); + } + in_block.insert({std::move(int_col0), int_type, "test_int_col0"}); + bool in_eos = false; + SinkInfo sink_info = {.channel_id = &sink_local_states[0]->_channel_id, + .partitioner = sink_local_states[0]->_partitioner.get(), + .local_state = sink_local_states[0].get(), + .shuffle_idx_to_instance_idx = nullptr, + .ins_idx = 0}; + ASSERT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos, + {sink_local_states[0]->_compute_hash_value_timer, + sink_local_states[0]->_distribute_timer, nullptr}, + sink_info), + Status::OK()); + + Block output_block; + output_block.insert({ColumnInt32::create(), int_type, "test_int_col0"}); + ColumnPtr shared_empty_column = output_block.get_by_position(0).column; + ASSERT_EQ(shared_empty_column->size(), 0); + + bool eos = false; + ASSERT_EQ(exchanger->get_block(_runtime_state.get(), &output_block, &eos, + {nullptr, nullptr, source_local_states[0]->_copy_data_timer}, + {cast_set(source_local_states[0]->_channel_id), + source_local_states[0].get()}), + Status::OK()); + EXPECT_FALSE(eos); + ASSERT_EQ(output_block.rows(), expected_rows_per_source); + + const auto& result_column = + assert_cast(*output_block.get_by_position(0).column); + EXPECT_EQ(result_column.get_element(0), 0); + EXPECT_EQ(result_column.get_element(1), 4); + EXPECT_EQ(result_column.get_element(2), 8); +} + TEST_F(LocalExchangerTest, TestShuffleExchangerWrongMap) { int num_sink = 1; int num_sources = 4;