[fix](pipeline) Fix wake up early without terminate call#63539

Open
BiteTheDDDDt wants to merge 1 commit into apache:branch-4.0 from BiteTheDDDDt:fix-branch40-pipeline-terminate-race

@BiteTheDDDDt
Contributor

What problem does this PR solve?

Issue Number: None

Related PR: #61679

Problem Summary: Backport #61679 to branch-4.0. A pipeline task can race with downstream early wake-up: one thread may observe _wake_up_early as false, then another thread sets _wake_up_early and unblocks finish dependencies, and the first thread later sees _is_pending_finish() as false and finishes without calling operator terminate(). For hash join build tasks this can leave runtime filter producers in WAITING_FOR_SYNCED_SIZE; during close/build, insert() expects WAITING_FOR_DATA and reports an invalid runtime filter producer state. This change moves operator termination after the pending-finish check so the second _wake_up_early read observes the early wake-up and terminates operators before close.

Release note

None

Check List (For Author)

  • Test: Unit Test
    • build-support/check-format.sh be/src/pipeline/pipeline_task.cpp be/test/pipeline/pipeline_task_test.cpp
    • ninja -C be/build_Release src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.o
    • ninja -C be/ut_build_ASAN src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.o test/CMakeFiles/doris_be_test.dir/exec/pipeline/pipeline_task_test.cpp.o test/doris_be_test
    • be/ut_build_ASAN/test/doris_be_test --gtest_filter=PipelineTaskTest.TEST_TERMINATE_RACE_FIX --gtest_print_time=true
  • Behavior changed: No
  • Does this need documentation: No

```
Thread A (running the HashJoin build task)         Thread B (all downstream pipelines finished)
────────────────────────────────────────           ──────────────────────────────────
Defer starts executing:
  line 475: reads _wake_up_early → false
                                                   decrement_running_task() fires
                                                   make_all_runnable():
                                                     line 127: set_wake_up_early() → true
                                                     line 132: terminate()
                                                       → finish_dep.set_always_ready()

  line 481: else if (_eos && !_spilling &&
              !_is_pending_finish())
            _is_pending_finish() = false ← because of always_ready!
  line 483: *done = true
  ← note: _sink->terminate() is never called!

close_task():
  task->close(OK):
```
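The interleaving above can be modeled with a small, single-threaded sketch. This is not the Doris code: `ToyTask`, `finish_old`, `finish_new`, and `downstream_wakes_up` are hypothetical names, and the `inject_race` flag stands in for Thread B running between Thread A's two checks. It only illustrates why re-reading the wake-up flag after the pending-finish check closes the window.

```cpp
#include <atomic>
#include <cassert>

// Toy model of the race; all names here are hypothetical, not Doris APIs.
struct ToyTask {
    std::atomic<bool> wake_up_early{false};
    std::atomic<bool> finish_dep_ready{false};
    bool eos = true;
    bool terminated = false;

    bool is_pending_finish() const { return !finish_dep_ready.load(); }
    void terminate() { terminated = true; }

    // Stand-in for Thread B: set the flag, then unblock the finish dependency.
    void downstream_wakes_up() {
        wake_up_early.store(true);
        finish_dep_ready.store(true);
    }

    // Old ordering: terminate only if the first read of the flag is true.
    bool finish_old(bool inject_race) {
        bool done = false;
        if (wake_up_early.load()) {
            terminate();
            done = true;
        } else {
            if (inject_race) downstream_wakes_up();  // Thread B runs here
            if (eos && !is_pending_finish()) done = true;
        }
        return done;
    }

    // New ordering: do the pending-finish check first, then read the flag
    // again and terminate if an early wake-up happened in between.
    bool finish_new(bool inject_race) {
        bool done = false;
        if (wake_up_early.load()) {
            done = true;
        } else {
            if (inject_race) downstream_wakes_up();  // Thread B runs here
            if (eos && !is_pending_finish()) done = true;
        }
        if (wake_up_early.load()) terminate();  // second read sees the wake-up
        return done;
    }
};
```

With `inject_race` set, `finish_old` reports done while `terminated` stays false, which mirrors the skipped `_sink->terminate()` in the diagram; `finish_new` reports done with `terminated` set.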

This pull request addresses a subtle race condition in the pipeline task
execution logic and adds a targeted test to verify the fix. The main
improvement ensures that operator termination is reliably triggered even
in the presence of concurrent state changes, preventing operators from
being left in an inconsistent state. Additionally, the pull request
introduces a debug point for precise testing and includes minor test
code cleanups.

**Race condition fix and test coverage:**

* Fixed a race condition in `PipelineTask::execute()` by reordering the
logic to ensure `terminate()` is always called if required, even when
another thread updates task state between checks. Added a debug point to
simulate the race for testing.
* Added a new test `TEST_TERMINATE_RACE_FIX` in `pipeline_task_test.cpp`
that uses the debug point to reliably reproduce and verify the race
condition fix, ensuring operator termination is not skipped.
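As a rough illustration of how a named debug point can make such a race deterministic in a test, here is a minimal sketch. It does not use the Doris `DebugPoints` API; `ToyDebugPoints`, `ToyState`, `observe_after_hook`, and the point name `toy.between_reads` are all hypothetical. The idea is only that the code under test fires a named hook between its two reads, and the test registers a callback that mutates state at exactly that moment.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical registry of named hooks; not the Doris DebugPoints class.
class ToyDebugPoints {
public:
    void add(const std::string& name, std::function<void()> hook) {
        hooks_[name] = std::move(hook);
    }
    void clear() { hooks_.clear(); }
    // Runs the hook registered under `name`, if any; otherwise does nothing.
    void fire(const std::string& name) const {
        auto it = hooks_.find(name);
        if (it != hooks_.end() && it->second) it->second();
    }

private:
    std::unordered_map<std::string, std::function<void()>> hooks_;
};

struct ToyState {
    bool wake_up_early = false;
};

// Code under test: reads the flag, fires the debug point, reads it again.
// Returns true only when the flag flipped between the two reads.
bool observe_after_hook(const ToyDebugPoints& points, ToyState& state) {
    bool first = state.wake_up_early;
    points.fire("toy.between_reads");
    bool second = state.wake_up_early;
    return !first && second;
}
```

A test would register a callback such as `points.add("toy.between_reads", [&s] { s.wake_up_early = true; });`, call `observe_after_hook`, and then call `clear()` so the hook does not leak into later tests.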

**Test infrastructure and cleanup:**

* Included `debug_points.h` and `common/config.h` in
`pipeline_task_test.cpp` to support debug point injection and
configuration toggling for the new test.
* Minor formatting cleanup in an existing test case for readability.

(cherry picked from commit 2b9a1a5)
Copilot AI review requested due to automatic review settings May 22, 2026 09:31
@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

Contributor

Copilot AI left a comment

Pull request overview

This PR backports the fix from #61679 to branch-4.0 to address a pipeline task race where an early wake-up can cause a task to finish without calling operator terminate(), leaving some operators (e.g., runtime filter producers) in an invalid state during close.

Changes:

  • Reorders termination logic in PipelineTask::execute() so operator termination happens after the pending-finish check, ensuring early wake-up is observed before close.
  • Adds a debug point to deterministically simulate the problematic interleaving.
  • Adds a new unit test (TEST_TERMINATE_RACE_FIX) to reproduce and validate the race fix.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| be/src/pipeline/pipeline_task.cpp | Reorders termination relative to _is_pending_finish() and adds a debug point hook to simulate the race. |
| be/test/pipeline/pipeline_task_test.cpp | Adds a new unit test that uses the debug point to validate operators are terminated under the targeted race. |

Comment on lines +1252 to +1265
sink_finish_dep->set_ready();
config::enable_debug_points = true;
DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
{
    bool done = false;
    EXPECT_TRUE(task->execute(&done).ok());
    EXPECT_TRUE(task->_eos);
    EXPECT_TRUE(done);
    EXPECT_TRUE(task->_wake_up_early);
    EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
    EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
}
DebugPoints::instance()->clear();
config::enable_debug_points = false;
3 participants