From cb0fdea700b7ef7e29b1df4c61af4bc4c86d8bcc Mon Sep 17 00:00:00 2001 From: Pxl Date: Thu, 26 Mar 2026 16:02:33 +0800 Subject: [PATCH] [Bug](pipeline) fix wake up early without terminate call (#61679) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` Thread A (正在执行 HashJoin Build Task) Thread B (下游 pipeline 全部完成) ──────────────────────────────────────── ────────────────────────────────── Defer 开始执行: line 475: 读取 _wake_up_early → false decrement_running_task() 触发 make_all_runnable(): line 127: set_wake_up_early() → true line 132: terminate() → finish_dep.set_always_ready() line 481: else if (_eos && !_spilling && !_is_pending_finish()) _is_pending_finish() = false ← 因为 always_ready! line 483: *done = true ← 注意: _sink->terminate() 从未被调用! close_task(): task->close(OK): ``` This pull request addresses a subtle race condition in the pipeline task execution logic and adds a targeted test to verify the fix. The main improvement ensures that operator termination is reliably triggered even in the presence of concurrent state changes, preventing operators from being left in an inconsistent state. Additionally, the pull request introduces a debug point for precise testing and includes minor test code cleanups. **Race condition fix and test coverage:** * Fixed a race condition in `PipelineTask::execute()` by reordering the logic to ensure `terminate()` is always called if required, even when another thread updates task state between checks. Added a debug point to simulate the race for testing. * Added a new test `TEST_TERMINATE_RACE_FIX` in `pipeline_task_test.cpp` that uses the debug point to reliably reproduce and verify the race condition fix, ensuring operator termination is not skipped. **Test infrastructure and cleanup:** * Included `debug_points.h` and `common/config.h` in `pipeline_task_test.cpp` to support debug point injection and configuration toggling for the new test. [[1]](diffhunk://#diff-262afd1bf43b83333335fec0b00b65ab0b0241315fd3ceb98c5b3d568971052fR21) [[2]](diffhunk://#diff-262afd1bf43b83333335fec0b00b65ab0b0241315fd3ceb98c5b3d568971052fR36) * Minor formatting cleanup in an existing test case for readability. (cherry picked from commit 2b9a1a5523ba837f28393ba96c4416acd6dd9239) --- be/src/pipeline/pipeline_task.cpp | 32 +++++++++- be/test/pipeline/pipeline_task_test.cpp | 79 +++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index dbe52cbabc0d99..afd40596241738 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -409,15 +409,41 @@ Status PipelineTask::execute(bool* done) { // If task is woke up early, we should terminate all operators, and this task could be closed immediately. if (_wake_up_early) { - terminate(); - THROW_IF_ERROR(_root->terminate(_state)); - THROW_IF_ERROR(_sink->terminate(_state)); _eos = true; *done = true; } else if (_eos && !_spilling && (fragment_context->is_canceled() || !_is_pending_finish())) { + // Debug point for testing the race condition fix: inject set_wake_up_early() + + // terminate() here to simulate Thread B writing A then B between Thread A's two + // reads of _wake_up_early. + DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", { + set_wake_up_early(); + terminate(); + }); *done = true; } + + // NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check + // above, not before. This ordering is critical to avoid a race condition: + // + // Pipeline::make_all_runnable() writes in this order: + // (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready] + // + // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a + // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B, + // then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would + // then set *done=true without ever calling operator terminate(), causing close() to run + // on operators that were never properly terminated (e.g. RuntimeFilterProducer still in + // WAITING_FOR_SYNCED_SIZE state when insert() is called). + // + // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A), + // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe + // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called. + if (_wake_up_early) { + terminate(); + THROW_IF_ERROR(_root->terminate(_state)); + THROW_IF_ERROR(_sink->terminate(_state)); + } }}; const auto query_id = _state->query_id(); // If this task is already EOS and block is empty (which means we already output all blocks), diff --git a/be/test/pipeline/pipeline_task_test.cpp b/be/test/pipeline/pipeline_task_test.cpp index 8a8e16e3c4da1d..a629344363a134 100644 --- a/be/test/pipeline/pipeline_task_test.cpp +++ b/be/test/pipeline/pipeline_task_test.cpp @@ -18,6 +18,7 @@ #include #include +#include "common/config.h" #include "common/status.h" #include "dummy_task_queue.h" #include "pipeline/dependency.h" @@ -30,6 +31,7 @@ #include "testutil/mock/mock_thread_mem_tracker_mgr.h" #include "testutil/mock/mock_workload_group_mgr.h" #include "thrift_builder.h" +#include "util/debug_points.h" namespace doris::pipeline { @@ -1186,4 +1188,81 @@ TEST_F(PipelineTaskTest, TEST_INJECT_SHARED_STATE) { } } +// Test for the race condition fix between _wake_up_early and _is_pending_finish(). +// +// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate() +// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false +// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to +// _always_ready from B), Thread A would set *done=true without calling operator terminate(). +// +// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's +// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent +// read, ensuring terminate() is always called. +// +// This test uses a debug point injected into the else-if branch to simulate the exact bad timing: +// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false +// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check. +TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) { + auto num_instances = 1; + auto pip_id = 0; + auto task_id = 0; + auto pip = std::make_shared(pip_id, num_instances, num_instances); + { + OperatorPtr source_op; + source_op.reset(new DummyOperator()); + EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok()); + + int op_id = 1; + int node_id = 2; + int dest_id = 3; + DataSinkOperatorPtr sink_op; + sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id)); + EXPECT_TRUE(pip->set_sink(sink_op).ok()); + } + auto profile = std::make_shared("Pipeline : " + std::to_string(pip_id)); + std::map, std::vector>>> + shared_state_map; + _runtime_state->resize_op_id_to_local_state(-1); + auto task = std::make_shared(pip, task_id, _runtime_state.get(), _context, + profile.get(), shared_state_map, task_id); + task->_exec_time_slice = 10'000'000'000ULL; + { + std::vector scan_range; + int sender_id = 0; + TDataSink tsink; + EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); + } + _query_ctx->get_execution_dependency()->set_ready(); + + auto* sink_finish_dep = + _runtime_state->get_sink_local_state()->cast().finishdependency(); + EXPECT_NE(sink_finish_dep, nullptr); + sink_finish_dep->block(); + + task->_operators.front()->cast()._eos = true; + { + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + EXPECT_TRUE(task->_eos); + EXPECT_FALSE(done); + EXPECT_FALSE(task->_wake_up_early); + } + + sink_finish_dep->set_ready(); + config::enable_debug_points = true; + DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if"); + { + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + EXPECT_TRUE(task->_eos); + EXPECT_TRUE(done); + EXPECT_TRUE(task->_wake_up_early); + EXPECT_TRUE(task->_operators.front()->cast()._terminated); + EXPECT_TRUE(task->_sink->cast()._terminated); + } + DebugPoints::instance()->clear(); + config::enable_debug_points = false; +} + } // namespace doris::pipeline