Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,15 +409,41 @@ Status PipelineTask::execute(bool* done) {

// If task is woke up early, we should terminate all operators, and this task could be closed immediately.
if (_wake_up_early) {
terminate();
THROW_IF_ERROR(_root->terminate(_state));
THROW_IF_ERROR(_sink->terminate(_state));
_eos = true;
*done = true;
} else if (_eos && !_spilling &&
(fragment_context->is_canceled() || !_is_pending_finish())) {
// Debug point for testing the race condition fix: inject set_wake_up_early() +
// terminate() here to simulate Thread B writing A then B between Thread A's two
// reads of _wake_up_early.
DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
set_wake_up_early();
terminate();
});
*done = true;
}

// NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check
// above, not before. This ordering is critical to avoid a race condition:
//
// Pipeline::make_all_runnable() writes in this order:
// (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready]
//
// If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
// window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
// then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would
// then set *done=true without ever calling operator terminate(), causing close() to run
// on operators that were never properly terminated (e.g. RuntimeFilterProducer still in
// WAITING_FOR_SYNCED_SIZE state when insert() is called).
//
// By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
// if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
// A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
if (_wake_up_early) {
terminate();
THROW_IF_ERROR(_root->terminate(_state));
THROW_IF_ERROR(_sink->terminate(_state));
}
}};
const auto query_id = _state->query_id();
// If this task is already EOS and block is empty (which means we already output all blocks),
Expand Down
79 changes: 79 additions & 0 deletions be/test/pipeline/pipeline_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "common/config.h"
#include "common/status.h"
#include "dummy_task_queue.h"
#include "pipeline/dependency.h"
Expand All @@ -30,6 +31,7 @@
#include "testutil/mock/mock_thread_mem_tracker_mgr.h"
#include "testutil/mock/mock_workload_group_mgr.h"
#include "thrift_builder.h"
#include "util/debug_points.h"

namespace doris::pipeline {

Expand Down Expand Up @@ -1186,4 +1188,81 @@ TEST_F(PipelineTaskTest, TEST_INJECT_SHARED_STATE) {
}
}

// Test for the race condition fix between _wake_up_early and _is_pending_finish().
//
// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate()
// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false
// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to
// _always_ready from B), Thread A would set *done=true without calling operator terminate().
//
// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's
// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent
// read, ensuring terminate() is always called.
//
// This test uses a debug point injected into the else-if branch to simulate the exact bad timing:
// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false
// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check.
TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());

int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
}
_query_ctx->get_execution_dependency()->set_ready();

auto* sink_finish_dep =
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
EXPECT_NE(sink_finish_dep, nullptr);
sink_finish_dep->block();

task->_operators.front()->cast<DummyOperator>()._eos = true;
{
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_TRUE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
}

sink_finish_dep->set_ready();
config::enable_debug_points = true;
DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
{
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_TRUE(task->_eos);
EXPECT_TRUE(done);
EXPECT_TRUE(task->_wake_up_early);
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
}
DebugPoints::instance()->clear();
config::enable_debug_points = false;
Comment on lines +1252 to +1265
}

} // namespace doris::pipeline
Loading