diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index dbe52cbabc0d99..afd40596241738 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -409,15 +409,41 @@ Status PipelineTask::execute(bool* done) { // If task is woke up early, we should terminate all operators, and this task could be closed immediately. if (_wake_up_early) { - terminate(); - THROW_IF_ERROR(_root->terminate(_state)); - THROW_IF_ERROR(_sink->terminate(_state)); _eos = true; *done = true; } else if (_eos && !_spilling && (fragment_context->is_canceled() || !_is_pending_finish())) { + // Debug point for testing the race condition fix: inject set_wake_up_early() + + // terminate() here to simulate Thread B writing A then B between Thread A's two + // reads of _wake_up_early. + DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", { + set_wake_up_early(); + terminate(); + }); *done = true; } + + // NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check + // above, not before. This ordering is critical to avoid a race condition: + // + // Pipeline::make_all_runnable() writes in this order: + // (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready] + // + // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a + // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B, + // then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would + // then set *done=true without ever calling operator terminate(), causing close() to run + // on operators that were never properly terminated (e.g. RuntimeFilterProducer still in + // WAITING_FOR_SYNCED_SIZE state when insert() is called). + // + // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A), + // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe + // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called. + if (_wake_up_early) { + terminate(); + THROW_IF_ERROR(_root->terminate(_state)); + THROW_IF_ERROR(_sink->terminate(_state)); + } }}; const auto query_id = _state->query_id(); // If this task is already EOS and block is empty (which means we already output all blocks), diff --git a/be/test/pipeline/pipeline_task_test.cpp b/be/test/pipeline/pipeline_task_test.cpp index 8a8e16e3c4da1d..a629344363a134 100644 --- a/be/test/pipeline/pipeline_task_test.cpp +++ b/be/test/pipeline/pipeline_task_test.cpp @@ -18,6 +18,7 @@ #include #include +#include "common/config.h" #include "common/status.h" #include "dummy_task_queue.h" #include "pipeline/dependency.h" @@ -30,6 +31,7 @@ #include "testutil/mock/mock_thread_mem_tracker_mgr.h" #include "testutil/mock/mock_workload_group_mgr.h" #include "thrift_builder.h" +#include "util/debug_points.h" namespace doris::pipeline { @@ -1186,4 +1188,81 @@ TEST_F(PipelineTaskTest, TEST_INJECT_SHARED_STATE) { } } +// Test for the race condition fix between _wake_up_early and _is_pending_finish(). +// +// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate() +// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false +// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to +// _always_ready from B), Thread A would set *done=true without calling operator terminate(). +// +// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's +// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent +// read, ensuring terminate() is always called. +// +// This test uses a debug point injected into the else-if branch to simulate the exact bad timing: +// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false +// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check. +TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) { + auto num_instances = 1; + auto pip_id = 0; + auto task_id = 0; + auto pip = std::make_shared(pip_id, num_instances, num_instances); + { + OperatorPtr source_op; + source_op.reset(new DummyOperator()); + EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok()); + + int op_id = 1; + int node_id = 2; + int dest_id = 3; + DataSinkOperatorPtr sink_op; + sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id)); + EXPECT_TRUE(pip->set_sink(sink_op).ok()); + } + auto profile = std::make_shared("Pipeline : " + std::to_string(pip_id)); + std::map, std::vector>>> + shared_state_map; + _runtime_state->resize_op_id_to_local_state(-1); + auto task = std::make_shared(pip, task_id, _runtime_state.get(), _context, + profile.get(), shared_state_map, task_id); + task->_exec_time_slice = 10'000'000'000ULL; + { + std::vector scan_range; + int sender_id = 0; + TDataSink tsink; + EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); + } + _query_ctx->get_execution_dependency()->set_ready(); + + auto* sink_finish_dep = + _runtime_state->get_sink_local_state()->cast().finishdependency(); + EXPECT_NE(sink_finish_dep, nullptr); + sink_finish_dep->block(); + + task->_operators.front()->cast()._eos = true; + { + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + EXPECT_TRUE(task->_eos); + EXPECT_FALSE(done); + EXPECT_FALSE(task->_wake_up_early); + } + + sink_finish_dep->set_ready(); + config::enable_debug_points = true; + DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if"); + { + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + EXPECT_TRUE(task->_eos); + EXPECT_TRUE(done); + EXPECT_TRUE(task->_wake_up_early); + EXPECT_TRUE(task->_operators.front()->cast()._terminated); + EXPECT_TRUE(task->_sink->cast()._terminated); + } + DebugPoints::instance()->clear(); + config::enable_debug_points = false; +} + } // namespace doris::pipeline