[fix](pipeline) Fix wake up early without terminate call#63539
Open
BiteTheDDDDt wants to merge 1 commit into
Open
[fix](pipeline) Fix wake up early without terminate call#63539BiteTheDDDDt wants to merge 1 commit into
BiteTheDDDDt wants to merge 1 commit into
Conversation
```
Thread A (正在执行 HashJoin Build Task) Thread B (下游 pipeline 全部完成)
──────────────────────────────────────── ──────────────────────────────────
Defer 开始执行:
line 475: 读取 _wake_up_early → false
decrement_running_task() 触发
make_all_runnable():
line 127: set_wake_up_early() → true
line 132: terminate()
→ finish_dep.set_always_ready()
line 481: else if (_eos && !_spilling &&
!_is_pending_finish())
_is_pending_finish() = false ← 因为 always_ready!
line 483: *done = true
← 注意: _sink->terminate() 从未被调用!
close_task():
task->close(OK):
```
This pull request addresses a subtle race condition in the pipeline task
execution logic and adds a targeted test to verify the fix. The main
improvement ensures that operator termination is reliably triggered even
in the presence of concurrent state changes, preventing operators from
being left in an inconsistent state. Additionally, the pull request
introduces a debug point for precise testing and includes minor test
code cleanups.
**Race condition fix and test coverage:**
* Fixed a race condition in `PipelineTask::execute()` by reordering the
logic to ensure `terminate()` is always called if required, even when
another thread updates task state between checks. Added a debug point to
simulate the race for testing.
* Added a new test `TEST_TERMINATE_RACE_FIX` in `pipeline_task_test.cpp`
that uses the debug point to reliably reproduce and verify the race
condition fix, ensuring operator termination is not skipped.
**Test infrastructure and cleanup:**
* Included `debug_points.h` and `common/config.h` in
`pipeline_task_test.cpp` to support debug point injection and
configuration toggling for the new test.
[[1]](diffhunk://#diff-262afd1bf43b83333335fec0b00b65ab0b0241315fd3ceb98c5b3d568971052fR21)
[[2]](diffhunk://#diff-262afd1bf43b83333335fec0b00b65ab0b0241315fd3ceb98c5b3d568971052fR36)
* Minor formatting cleanup in an existing test case for readability.
(cherry picked from commit 2b9a1a5)
Contributor
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
Contributor
There was a problem hiding this comment.
Pull request overview
This PR backports the fix from #61679 to branch-4.0 to address a pipeline task race where an early wake-up can cause a task to finish without calling operator terminate(), leaving some operators (e.g., runtime filter producers) in an invalid state during close.
Changes:
- Reorders termination logic in
PipelineTask::execute()so operator termination happens after the pending-finish check, ensuring early wake-up is observed before close. - Adds a debug point to deterministically simulate the problematic interleaving.
- Adds a new unit test (
TEST_TERMINATE_RACE_FIX) to reproduce and validate the race fix.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| be/src/pipeline/pipeline_task.cpp | Reorders termination relative to _is_pending_finish() and adds a debug point hook to simulate the race. |
| be/test/pipeline/pipeline_task_test.cpp | Adds a new unit test that uses the debug point to validate operators are terminated under the targeted race. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+1252
to
+1265
| sink_finish_dep->set_ready(); | ||
| config::enable_debug_points = true; | ||
| DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if"); | ||
| { | ||
| bool done = false; | ||
| EXPECT_TRUE(task->execute(&done).ok()); | ||
| EXPECT_TRUE(task->_eos); | ||
| EXPECT_TRUE(done); | ||
| EXPECT_TRUE(task->_wake_up_early); | ||
| EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated); | ||
| EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated); | ||
| } | ||
| DebugPoints::instance()->clear(); | ||
| config::enable_debug_points = false; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What problem does this PR solve?
Issue Number: None
Related PR: #61679
Problem Summary: Backport #61679 to branch-4.0. A pipeline task can race with downstream early wake-up: one thread may observe
_wake_up_earlyas false, then another thread sets_wake_up_earlyand unblocks finish dependencies, and the first thread later sees_is_pending_finish()as false and finishes without calling operatorterminate(). For hash join build tasks this can leave runtime filter producers inWAITING_FOR_SYNCED_SIZE; during close/build,insert()expectsWAITING_FOR_DATAand reports an invalid runtime filter producer state. This change moves operator termination after the pending-finish check so the second_wake_up_earlyread observes the early wake-up and terminates operators before close.Release note
None
Check List (For Author)
build-support/check-format.sh be/src/pipeline/pipeline_task.cpp be/test/pipeline/pipeline_task_test.cppninja -C be/build_Release src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.oninja -C be/ut_build_ASAN src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.o test/CMakeFiles/doris_be_test.dir/exec/pipeline/pipeline_task_test.cpp.o test/doris_be_testbe/ut_build_ASAN/test/doris_be_test --gtest_filter=PipelineTaskTest.TEST_TERMINATE_RACE_FIX --gtest_print_time=true