From 7b6f972e8906824842c202352fa4754a07828910 Mon Sep 17 00:00:00 2001
From: eye-gu <734164350@qq.com>
Date: Fri, 12 Jun 2026 16:58:09 +0800
Subject: [PATCH 1/3] [Fix-18338] check task if it's waiting for TaskGroup slot
when pause/kill
---
.../master/engine/ITaskGroupCoordinator.java | 12 ++++++++++++
.../master/engine/TaskGroupCoordinator.java | 10 ++++++++++
.../statemachine/TaskSubmittedStateAction.java | 18 ++++++++++++++++++
3 files changed, 40 insertions(+)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
index 6fd885a21fbd..34f926866cb9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
@@ -91,6 +91,18 @@ public interface ITaskGroupCoordinator extends AutoCloseable {
*/
void releaseTaskGroupSlot(TaskInstance taskInstance);
+ /**
+ * Check if the given task instance is currently waiting for a task group slot.
+ *
+ * A task is waiting for a slot if it has an active TaskGroupQueue record with
+ * {@link TaskGroupQueueStatus#WAIT_QUEUE} status. This is a runtime state check,
+ * not a static configuration check like {@link #needAcquireTaskGroupSlot(TaskInstance)}.
+ *
+ * @param taskInstance the task instance
+ * @return true if the task is currently waiting for a task group slot
+ */
+ boolean isTaskWaitingForTaskGroupSlot(final TaskInstance taskInstance);
+
/**
* Close the TaskGroupCoordinator, once closed, the coordinator will not work until you have started the coordinator again.
*/
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
index 1a0312c70f9f..2012e968be28 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
@@ -317,6 +317,16 @@ private void dealWithWaitingTaskGroupQueue() {
}
}
+ @Override
+ public boolean isTaskWaitingForTaskGroupSlot(final TaskInstance taskInstance) {
+ if (!TaskGroupUtils.isUsingTaskGroup(taskInstance)) {
+ return false;
+ }
+ List taskGroupQueues = taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId());
+ return taskGroupQueues.stream()
+ .anyMatch(q -> q.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE);
+ }
+
@Override
public boolean needAcquireTaskGroupSlot(final TaskInstance taskInstance) {
if (taskInstance == null) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
index 22c71cc1d7ce..855b3545fcc5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
@@ -143,6 +143,15 @@ public void onPauseEvent(final IWorkflowExecution workflowExecution,
taskExecution.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecution));
return;
}
+ // Task not in dispatch queue, check if it's waiting for TaskGroup slot
+ if (taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskExecution.getTaskInstance())) {
+ // Release the TaskGroupQueue first to prevent TaskGroupCoordinator
+ // from concurrently acquiring the slot and racing with this pause
+ taskGroupCoordinator.releaseTaskGroupSlot(taskExecution.getTaskInstance());
+ log.info("Task: {} is waiting for TaskGroup slot, pause it directly", taskExecution.getName());
+ taskExecution.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecution));
+ return;
+ }
log.info("The task[id={}] is submitted and already dispatched, cannot pause, will try to pause it after 5s",
taskExecution.getId());
taskExecution.getWorkflowEventBus()
@@ -167,6 +176,15 @@ public void onKillEvent(final IWorkflowExecution workflowExecution,
taskExecution.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecution));
return;
}
+ // Task not in dispatch queue, check if it's waiting for TaskGroup slot
+ if (taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskExecution.getTaskInstance())) {
+ // Release the TaskGroupQueue first to prevent TaskGroupCoordinator
+ // from concurrently acquiring the slot and racing with this kill
+ taskGroupCoordinator.releaseTaskGroupSlot(taskExecution.getTaskInstance());
+ log.info("Task: {} is waiting for TaskGroup slot, kill it directly", taskExecution.getName());
+ taskExecution.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecution));
+ return;
+ }
log.info("The task[id={}] is submitted and already dispatched, cannot kill, will kill it after 5s",
taskExecution.getId());
taskExecution.getWorkflowEventBus()
From d483c760addb85b3a66d53dba9f32ca935712e1f Mon Sep 17 00:00:00 2001
From: eye-gu <734164350@qq.com>
Date: Fri, 12 Jun 2026 17:10:18 +0800
Subject: [PATCH 2/3] add tests
---
.../engine/TaskGroupCoordinatorTest.java | 32 ++++
.../TaskSubmittedStateActionTest.java | 168 ++++++++++++++++++
2 files changed, 200 insertions(+)
create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateActionTest.java
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
index 6f4b7d516ca4..98e88be55815 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
@@ -26,6 +26,7 @@
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
@@ -149,6 +150,37 @@ void needToReleaseTaskGroupSlot() {
assertTrue(taskGroupCoordinator.needToReleaseTaskGroupSlot(taskInstance));
}
+ @Test
+ void isTaskWaitingForTaskGroupSlot() {
+ // Task not using task group -> false
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setTaskGroupId(0);
+ assertFalse(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));
+
+ // Task using task group but no TaskGroupQueue records -> false
+ taskInstance.setTaskGroupId(1);
+ taskInstance.setId(100);
+ when(taskGroupQueueDao.queryByTaskInstanceId(100)).thenReturn(Lists.newArrayList());
+ assertFalse(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));
+
+ // Task using task group with TaskGroupQueue but no WAIT_QUEUE status -> false
+ TaskGroupQueue acquireSuccessQueue = new TaskGroupQueue();
+ acquireSuccessQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+ when(taskGroupQueueDao.queryByTaskInstanceId(100)).thenReturn(Lists.newArrayList(acquireSuccessQueue));
+ assertFalse(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));
+
+ // Task using task group with TaskGroupQueue in WAIT_QUEUE status -> true
+ TaskGroupQueue waitQueue = new TaskGroupQueue();
+ waitQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
+ when(taskGroupQueueDao.queryByTaskInstanceId(100)).thenReturn(Lists.newArrayList(waitQueue));
+ assertTrue(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));
+
+ // Task using task group with mixed statuses including WAIT_QUEUE -> true
+ when(taskGroupQueueDao.queryByTaskInstanceId(100))
+ .thenReturn(Lists.newArrayList(acquireSuccessQueue, waitQueue));
+ assertTrue(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));
+ }
+
@Test
void releaseTaskGroupSlot() {
// TaskInstance is NULL
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateActionTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateActionTest.java
new file mode 100644
index 000000000000..373561a5db47
--- /dev/null
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateActionTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.task.statemachine;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
+import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
+import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
+import org.apache.dolphinscheduler.server.master.engine.task.execution.ITaskExecution;
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
+import org.apache.dolphinscheduler.server.master.engine.workflow.execution.IWorkflowExecution;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class TaskSubmittedStateActionTest {
+
+ @InjectMocks
+ private TaskSubmittedStateAction taskSubmittedStateAction;
+
+ @Mock
+ private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
+
+ @Mock
+ private TaskInstanceDao taskInstanceDao;
+
+ @Mock
+ private ITaskGroupCoordinator taskGroupCoordinator;
+
+ @Mock
+ private IWorkflowExecution workflowExecution;
+
+ @Mock
+ private ITaskExecution taskExecution;
+
+ @Mock
+ private WorkflowEventBus workflowEventBus;
+
+ private TaskInstance taskInstance;
+
+ @BeforeEach
+ void setUp() {
+ taskInstance = new TaskInstance();
+ taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
+ taskInstance.setName("testTask");
+ taskInstance.setId(1);
+
+ when(taskExecution.getTaskInstance()).thenReturn(taskInstance);
+ when(taskExecution.getWorkflowEventBus()).thenReturn(workflowEventBus);
+ when(taskExecution.getName()).thenReturn("testTask");
+ when(taskExecution.getId()).thenReturn(1);
+ }
+
+ @Test
+ void onPauseEvent_taskWaitingForTaskGroupSlot_shouldPauseDirectly() {
+ // Task not in dispatch queue
+ when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
+ // Task is waiting for TaskGroup slot
+ when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(true);
+
+ taskSubmittedStateAction.onPauseEvent(workflowExecution, taskExecution, null);
+
+ verify(taskGroupCoordinator).releaseTaskGroupSlot(taskInstance);
+ verify(workflowEventBus).publish(any(TaskPausedLifecycleEvent.class));
+ // Should NOT publish a delayed TaskPauseLifecycleEvent
+ verify(workflowEventBus, never()).publish(any(TaskPauseLifecycleEvent.class));
+ }
+
+ @Test
+ void onPauseEvent_taskInDispatchQueue_shouldPauseDirectly() {
+ // Task is in dispatch queue
+ when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(true);
+
+ taskSubmittedStateAction.onPauseEvent(workflowExecution, taskExecution, null);
+
+ verify(workflowEventBus).publish(any(TaskPausedLifecycleEvent.class));
+ // Should NOT check TaskGroup since task was removed from dispatch queue
+ verify(taskGroupCoordinator, never()).isTaskWaitingForTaskGroupSlot(any());
+ }
+
+ @Test
+ void onPauseEvent_taskNotInDispatchQueueAndNotWaiting_shouldRetryAfter5s() {
+ // Task not in dispatch queue
+ when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
+ // Task is NOT waiting for TaskGroup slot
+ when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(false);
+
+ taskSubmittedStateAction.onPauseEvent(workflowExecution, taskExecution, null);
+
+ verify(taskGroupCoordinator, never()).releaseTaskGroupSlot(any());
+ // Should publish a delayed TaskPauseLifecycleEvent (retry after 5s)
+ verify(workflowEventBus).publish(any(TaskPauseLifecycleEvent.class));
+ }
+
+ @Test
+ void onKillEvent_taskWaitingForTaskGroupSlot_shouldKillDirectly() {
+ // Task not in dispatch queue
+ when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
+ // Task is waiting for TaskGroup slot
+ when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(true);
+
+ taskSubmittedStateAction.onKillEvent(workflowExecution, taskExecution, null);
+
+ verify(taskGroupCoordinator).releaseTaskGroupSlot(taskInstance);
+ verify(workflowEventBus).publish(any(TaskKilledLifecycleEvent.class));
+ // Should NOT publish a delayed TaskKillLifecycleEvent
+ verify(workflowEventBus, never()).publish(any(TaskKillLifecycleEvent.class));
+ }
+
+ @Test
+ void onKillEvent_taskInDispatchQueue_shouldKillDirectly() {
+ // Task is in dispatch queue
+ when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(true);
+
+ taskSubmittedStateAction.onKillEvent(workflowExecution, taskExecution, null);
+
+ verify(workflowEventBus).publish(any(TaskKilledLifecycleEvent.class));
+ // Should NOT check TaskGroup since task was removed from dispatch queue
+ verify(taskGroupCoordinator, never()).isTaskWaitingForTaskGroupSlot(any());
+ }
+
+ @Test
+ void onKillEvent_taskNotInDispatchQueueAndNotWaiting_shouldRetryAfter5s() {
+ // Task not in dispatch queue
+ when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
+ // Task is NOT waiting for TaskGroup slot
+ when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(false);
+
+ taskSubmittedStateAction.onKillEvent(workflowExecution, taskExecution, null);
+
+ verify(taskGroupCoordinator, never()).releaseTaskGroupSlot(any());
+ // Should publish a delayed TaskKillLifecycleEvent (retry after 5s)
+ verify(workflowEventBus).publish(any(TaskKillLifecycleEvent.class));
+ }
+}
From 2e1b97a1a0bd1372a4dc72780218c0f61e7d56e8 Mon Sep 17 00:00:00 2001
From: eye-gu <734164350@qq.com>
Date: Fri, 12 Jun 2026 18:15:13 +0800
Subject: [PATCH 3/3] add null check
---
.../server/master/engine/TaskGroupCoordinator.java | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
index 2012e968be28..7bbad9c3abb1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
@@ -319,10 +319,16 @@ private void dealWithWaitingTaskGroupQueue() {
@Override
public boolean isTaskWaitingForTaskGroupSlot(final TaskInstance taskInstance) {
+ if (taskInstance == null || taskInstance.getId() == null) {
+ return false;
+ }
if (!TaskGroupUtils.isUsingTaskGroup(taskInstance)) {
return false;
}
List taskGroupQueues = taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId());
+ if (CollectionUtils.isEmpty(taskGroupQueues)) {
+ return false;
+ }
return taskGroupQueues.stream()
.anyMatch(q -> q.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE);
}