diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
index 6fd885a21fbd..34f926866cb9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
@@ -91,6 +91,18 @@ public interface ITaskGroupCoordinator extends AutoCloseable {
      */
     void releaseTaskGroupSlot(TaskInstance taskInstance);
 
+    /**
+     * Check if the given task instance is currently waiting for a task group slot.
+     * <p>
+     * A task is waiting for a slot if it has an active TaskGroupQueue record with
+     * {@link TaskGroupQueueStatus#WAIT_QUEUE} status. This is a runtime state check,
+     * not a static configuration check like {@link #needAcquireTaskGroupSlot(TaskInstance)}.
+     *
+     * @param taskInstance the task instance
+     * @return true if the task is currently waiting for a task group slot
+     */
+    boolean isTaskWaitingForTaskGroupSlot(final TaskInstance taskInstance);
+
     /**
      * Close the TaskGroupCoordinator, once closed, the coordinator will not work until you have started the coordinator again.
      */
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
index 1a0312c70f9f..7bbad9c3abb1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
@@ -317,6 +317,22 @@ private void dealWithWaitingTaskGroupQueue() {
         }
     }
 
+    @Override
+    public boolean isTaskWaitingForTaskGroupSlot(final TaskInstance taskInstance) {
+        if (taskInstance == null || taskInstance.getId() == null) {
+            return false;
+        }
+        if (!TaskGroupUtils.isUsingTaskGroup(taskInstance)) {
+            return false;
+        }
+        List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId());
+        if (CollectionUtils.isEmpty(taskGroupQueues)) {
+            return false;
+        }
+        return taskGroupQueues.stream()
+                .anyMatch(q -> q.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE);
+    }
+
     @Override
     public boolean needAcquireTaskGroupSlot(final TaskInstance taskInstance) {
         if (taskInstance == null) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
index 22c71cc1d7ce..855b3545fcc5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
@@ -143,6 +143,15 @@ public void onPauseEvent(final IWorkflowExecution workflowExecution,
             taskExecution.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecution));
             return;
         }
+        // Task not in dispatch queue, check if it's waiting for TaskGroup slot
+        if (taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskExecution.getTaskInstance())) {
+            // Release the TaskGroupQueue first to prevent TaskGroupCoordinator
+            // from concurrently acquiring the slot and racing with this pause
+            taskGroupCoordinator.releaseTaskGroupSlot(taskExecution.getTaskInstance());
+            log.info("Task: {} is waiting for TaskGroup slot, pause it directly", taskExecution.getName());
+            taskExecution.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecution));
+            return;
+        }
         log.info("The task[id={}] is submitted and already dispatched, cannot pause, will try to pause it after 5s",
                 taskExecution.getId());
         taskExecution.getWorkflowEventBus()
@@ -167,6 +176,15 @@ public void onKillEvent(final IWorkflowExecution workflowExecution,
             taskExecution.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecution));
             return;
         }
+        // Task not in dispatch queue, check if it's waiting for TaskGroup slot
+        if (taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskExecution.getTaskInstance())) {
+            // Release the TaskGroupQueue first to prevent TaskGroupCoordinator
+            // from concurrently acquiring the slot and racing with this kill
+            taskGroupCoordinator.releaseTaskGroupSlot(taskExecution.getTaskInstance());
+            log.info("Task: {} is waiting for TaskGroup slot, kill it directly", taskExecution.getName());
+            taskExecution.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecution));
+            return;
+        }
         log.info("The task[id={}] is submitted and already dispatched, cannot kill, will kill it after 5s",
                 taskExecution.getId());
         taskExecution.getWorkflowEventBus()
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
index 6f4b7d516ca4..98e88be55815 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
@@ -26,6 +26,7 @@
 import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskGroup;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
@@ -149,6 +150,41 @@ void needToReleaseTaskGroupSlot() {
         assertTrue(taskGroupCoordinator.needToReleaseTaskGroupSlot(taskInstance));
     }
 
+    @Test
+    void isTaskWaitingForTaskGroupSlot() {
+        // TaskInstance is NULL -> false
+        assertFalse(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(null));
+
+        // Task not using task group -> false
+        // (the id is set so the null-id guard does not short-circuit this case)
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(100);
+        taskInstance.setTaskGroupId(0);
+        assertFalse(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));
+
+        // Task using task group but no TaskGroupQueue records -> false
+        taskInstance.setTaskGroupId(1);
+        when(taskGroupQueueDao.queryByTaskInstanceId(100)).thenReturn(Lists.newArrayList());
+        assertFalse(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));
+
+        // Task using task group with TaskGroupQueue but no WAIT_QUEUE status -> false
+        TaskGroupQueue acquireSuccessQueue = new TaskGroupQueue();
+        acquireSuccessQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+        when(taskGroupQueueDao.queryByTaskInstanceId(100)).thenReturn(Lists.newArrayList(acquireSuccessQueue));
+        assertFalse(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));
+
+        // Task using task group with TaskGroupQueue in WAIT_QUEUE status -> true
+        TaskGroupQueue waitQueue = new TaskGroupQueue();
+        waitQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
+        when(taskGroupQueueDao.queryByTaskInstanceId(100)).thenReturn(Lists.newArrayList(waitQueue));
+        assertTrue(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));
+
+        // Task using task group with mixed statuses including WAIT_QUEUE -> true
+        when(taskGroupQueueDao.queryByTaskInstanceId(100))
+                .thenReturn(Lists.newArrayList(acquireSuccessQueue, waitQueue));
+        assertTrue(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));
+    }
+
     @Test
     void releaseTaskGroupSlot() {
         // TaskInstance is NULL
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateActionTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateActionTest.java
new file mode 100644
index 000000000000..373561a5db47
--- /dev/null
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateActionTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.task.statemachine;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
+import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
+import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
+import org.apache.dolphinscheduler.server.master.engine.task.execution.ITaskExecution;
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
+import org.apache.dolphinscheduler.server.master.engine.workflow.execution.IWorkflowExecution;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class TaskSubmittedStateActionTest {
+
+    @InjectMocks
+    private TaskSubmittedStateAction taskSubmittedStateAction;
+
+    @Mock
+    private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
+
+    @Mock
+    private TaskInstanceDao taskInstanceDao;
+
+    @Mock
+    private ITaskGroupCoordinator taskGroupCoordinator;
+
+    @Mock
+    private IWorkflowExecution workflowExecution;
+
+    @Mock
+    private ITaskExecution taskExecution;
+
+    @Mock
+    private WorkflowEventBus workflowEventBus;
+
+    private TaskInstance taskInstance;
+
+    @BeforeEach
+    void setUp() {
+        taskInstance = new TaskInstance();
+        taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
+        taskInstance.setName("testTask");
+        taskInstance.setId(1);
+
+        when(taskExecution.getTaskInstance()).thenReturn(taskInstance);
+        when(taskExecution.getWorkflowEventBus()).thenReturn(workflowEventBus);
+        when(taskExecution.getName()).thenReturn("testTask");
+        when(taskExecution.getId()).thenReturn(1);
+    }
+
+    @Test
+    void onPauseEvent_taskWaitingForTaskGroupSlot_shouldPauseDirectly() {
+        // Task not in dispatch queue
+        when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
+        // Task is waiting for TaskGroup slot
+        when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(true);
+
+        taskSubmittedStateAction.onPauseEvent(workflowExecution, taskExecution, null);
+
+        verify(taskGroupCoordinator).releaseTaskGroupSlot(taskInstance);
+        verify(workflowEventBus).publish(any(TaskPausedLifecycleEvent.class));
+        // Should NOT publish a delayed TaskPauseLifecycleEvent
+        verify(workflowEventBus, never()).publish(any(TaskPauseLifecycleEvent.class));
+    }
+
+    @Test
+    void onPauseEvent_taskInDispatchQueue_shouldPauseDirectly() {
+        // Task is in dispatch queue
+        when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(true);
+
+        taskSubmittedStateAction.onPauseEvent(workflowExecution, taskExecution, null);
+
+        verify(workflowEventBus).publish(any(TaskPausedLifecycleEvent.class));
+        // Should NOT check TaskGroup since task was removed from dispatch queue
+        verify(taskGroupCoordinator, never()).isTaskWaitingForTaskGroupSlot(any());
+    }
+
+    @Test
+    void onPauseEvent_taskNotInDispatchQueueAndNotWaiting_shouldRetryAfter5s() {
+        // Task not in dispatch queue
+        when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
+        // Task is NOT waiting for TaskGroup slot
+        when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(false);
+
+        taskSubmittedStateAction.onPauseEvent(workflowExecution, taskExecution, null);
+
+        verify(taskGroupCoordinator, never()).releaseTaskGroupSlot(any());
+        // Should publish a delayed TaskPauseLifecycleEvent (retry after 5s)
+        verify(workflowEventBus).publish(any(TaskPauseLifecycleEvent.class));
+    }
+
+    @Test
+    void onKillEvent_taskWaitingForTaskGroupSlot_shouldKillDirectly() {
+        // Task not in dispatch queue
+        when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
+        // Task is waiting for TaskGroup slot
+        when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(true);
+
+        taskSubmittedStateAction.onKillEvent(workflowExecution, taskExecution, null);
+
+        verify(taskGroupCoordinator).releaseTaskGroupSlot(taskInstance);
+        verify(workflowEventBus).publish(any(TaskKilledLifecycleEvent.class));
+        // Should NOT publish a delayed TaskKillLifecycleEvent
+        verify(workflowEventBus, never()).publish(any(TaskKillLifecycleEvent.class));
+    }
+
+    @Test
+    void onKillEvent_taskInDispatchQueue_shouldKillDirectly() {
+        // Task is in dispatch queue
+        when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(true);
+
+        taskSubmittedStateAction.onKillEvent(workflowExecution, taskExecution, null);
+
+        verify(workflowEventBus).publish(any(TaskKilledLifecycleEvent.class));
+        // Should NOT check TaskGroup since task was removed from dispatch queue
+        verify(taskGroupCoordinator, never()).isTaskWaitingForTaskGroupSlot(any());
+    }
+
+    @Test
+    void onKillEvent_taskNotInDispatchQueueAndNotWaiting_shouldRetryAfter5s() {
+        // Task not in dispatch queue
+        when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
+        // Task is NOT waiting for TaskGroup slot
+        when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(false);
+
+        taskSubmittedStateAction.onKillEvent(workflowExecution, taskExecution, null);
+
+        verify(taskGroupCoordinator, never()).releaseTaskGroupSlot(any());
+        // Should publish a delayed TaskKillLifecycleEvent (retry after 5s)
+        verify(workflowEventBus).publish(any(TaskKillLifecycleEvent.class));
+    }
+}