Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ public interface ITaskGroupCoordinator extends AutoCloseable {
*/
void releaseTaskGroupSlot(TaskInstance taskInstance);

/**
* Check if the given task instance is currently waiting for a task group slot.
* <p>
* A task is waiting for a slot if it has an active TaskGroupQueue record with
* {@link TaskGroupQueueStatus#WAIT_QUEUE} status. This is a runtime state check,
* not a static configuration check like {@link #needAcquireTaskGroupSlot(TaskInstance)}.
*
* @param taskInstance the task instance
* @return true if the task is currently waiting for a task group slot
*/
boolean isTaskWaitingForTaskGroupSlot(final TaskInstance taskInstance);

/**
* Close the TaskGroupCoordinator, once closed, the coordinator will not work until you have started the coordinator again.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,22 @@ private void dealWithWaitingTaskGroupQueue() {
}
}

@Override
public boolean isTaskWaitingForTaskGroupSlot(final TaskInstance taskInstance) {
if (taskInstance == null || taskInstance.getId() == null) {
return false;
}
if (!TaskGroupUtils.isUsingTaskGroup(taskInstance)) {
return false;
}
List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId());
if (CollectionUtils.isEmpty(taskGroupQueues)) {
return false;
}
return taskGroupQueues.stream()
.anyMatch(q -> q.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE);
}

@Override
public boolean needAcquireTaskGroupSlot(final TaskInstance taskInstance) {
if (taskInstance == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ public void onPauseEvent(final IWorkflowExecution workflowExecution,
taskExecution.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecution));
return;
}
// Task not in dispatch queue, check if it's waiting for TaskGroup slot
if (taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskExecution.getTaskInstance())) {
// Release the TaskGroupQueue first to prevent TaskGroupCoordinator
// from concurrently acquiring the slot and racing with this pause
taskGroupCoordinator.releaseTaskGroupSlot(taskExecution.getTaskInstance());
log.info("Task: {} is waiting for TaskGroup slot, pause it directly", taskExecution.getName());
taskExecution.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecution));
return;
}
log.info("The task[id={}] is submitted and already dispatched, cannot pause, will try to pause it after 5s",
taskExecution.getId());
taskExecution.getWorkflowEventBus()
Expand All @@ -167,6 +176,15 @@ public void onKillEvent(final IWorkflowExecution workflowExecution,
taskExecution.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecution));
return;
}
// Task not in dispatch queue, check if it's waiting for TaskGroup slot
if (taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskExecution.getTaskInstance())) {
// Release the TaskGroupQueue first to prevent TaskGroupCoordinator
// from concurrently acquiring the slot and racing with this kill
taskGroupCoordinator.releaseTaskGroupSlot(taskExecution.getTaskInstance());
log.info("Task: {} is waiting for TaskGroup slot, kill it directly", taskExecution.getName());
taskExecution.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecution));
return;
}
log.info("The task[id={}] is submitted and already dispatched, cannot kill, will kill it after 5s",
taskExecution.getId());
taskExecution.getWorkflowEventBus()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.mockito.Mockito.when;

import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
Expand Down Expand Up @@ -149,6 +150,37 @@ void needToReleaseTaskGroupSlot() {
assertTrue(taskGroupCoordinator.needToReleaseTaskGroupSlot(taskInstance));
}

@Test
void isTaskWaitingForTaskGroupSlot() {
// Task not using task group -> false
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskGroupId(0);
assertFalse(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));

// Task using task group but no TaskGroupQueue records -> false
taskInstance.setTaskGroupId(1);
taskInstance.setId(100);
when(taskGroupQueueDao.queryByTaskInstanceId(100)).thenReturn(Lists.newArrayList());
assertFalse(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));

// Task using task group with TaskGroupQueue but no WAIT_QUEUE status -> false
TaskGroupQueue acquireSuccessQueue = new TaskGroupQueue();
acquireSuccessQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
when(taskGroupQueueDao.queryByTaskInstanceId(100)).thenReturn(Lists.newArrayList(acquireSuccessQueue));
assertFalse(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));

// Task using task group with TaskGroupQueue in WAIT_QUEUE status -> true
TaskGroupQueue waitQueue = new TaskGroupQueue();
waitQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
when(taskGroupQueueDao.queryByTaskInstanceId(100)).thenReturn(Lists.newArrayList(waitQueue));
assertTrue(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));

// Task using task group with mixed statuses including WAIT_QUEUE -> true
when(taskGroupQueueDao.queryByTaskInstanceId(100))
.thenReturn(Lists.newArrayList(acquireSuccessQueue, waitQueue));
assertTrue(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance));
}

@Test
void releaseTaskGroupSlot() {
// TaskInstance is NULL
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.task.statemachine;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
import org.apache.dolphinscheduler.server.master.engine.task.execution.ITaskExecution;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.execution.IWorkflowExecution;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class TaskSubmittedStateActionTest {

@InjectMocks
private TaskSubmittedStateAction taskSubmittedStateAction;

@Mock
private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;

@Mock
private TaskInstanceDao taskInstanceDao;

@Mock
private ITaskGroupCoordinator taskGroupCoordinator;

@Mock
private IWorkflowExecution workflowExecution;

@Mock
private ITaskExecution taskExecution;

@Mock
private WorkflowEventBus workflowEventBus;

private TaskInstance taskInstance;

@BeforeEach
void setUp() {
taskInstance = new TaskInstance();
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
taskInstance.setName("testTask");
taskInstance.setId(1);

when(taskExecution.getTaskInstance()).thenReturn(taskInstance);
when(taskExecution.getWorkflowEventBus()).thenReturn(workflowEventBus);
when(taskExecution.getName()).thenReturn("testTask");
when(taskExecution.getId()).thenReturn(1);
}

@Test
void onPauseEvent_taskWaitingForTaskGroupSlot_shouldPauseDirectly() {
// Task not in dispatch queue
when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
// Task is waiting for TaskGroup slot
when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(true);

taskSubmittedStateAction.onPauseEvent(workflowExecution, taskExecution, null);

verify(taskGroupCoordinator).releaseTaskGroupSlot(taskInstance);
verify(workflowEventBus).publish(any(TaskPausedLifecycleEvent.class));
// Should NOT publish a delayed TaskPauseLifecycleEvent
verify(workflowEventBus, never()).publish(any(TaskPauseLifecycleEvent.class));
}

@Test
void onPauseEvent_taskInDispatchQueue_shouldPauseDirectly() {
// Task is in dispatch queue
when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(true);

taskSubmittedStateAction.onPauseEvent(workflowExecution, taskExecution, null);

verify(workflowEventBus).publish(any(TaskPausedLifecycleEvent.class));
// Should NOT check TaskGroup since task was removed from dispatch queue
verify(taskGroupCoordinator, never()).isTaskWaitingForTaskGroupSlot(any());
}

@Test
void onPauseEvent_taskNotInDispatchQueueAndNotWaiting_shouldRetryAfter5s() {
// Task not in dispatch queue
when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
// Task is NOT waiting for TaskGroup slot
when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(false);

taskSubmittedStateAction.onPauseEvent(workflowExecution, taskExecution, null);

verify(taskGroupCoordinator, never()).releaseTaskGroupSlot(any());
// Should publish a delayed TaskPauseLifecycleEvent (retry after 5s)
verify(workflowEventBus).publish(any(TaskPauseLifecycleEvent.class));
}

@Test
void onKillEvent_taskWaitingForTaskGroupSlot_shouldKillDirectly() {
// Task not in dispatch queue
when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
// Task is waiting for TaskGroup slot
when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(true);

taskSubmittedStateAction.onKillEvent(workflowExecution, taskExecution, null);

verify(taskGroupCoordinator).releaseTaskGroupSlot(taskInstance);
verify(workflowEventBus).publish(any(TaskKilledLifecycleEvent.class));
// Should NOT publish a delayed TaskKillLifecycleEvent
verify(workflowEventBus, never()).publish(any(TaskKillLifecycleEvent.class));
}

@Test
void onKillEvent_taskInDispatchQueue_shouldKillDirectly() {
// Task is in dispatch queue
when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(true);

taskSubmittedStateAction.onKillEvent(workflowExecution, taskExecution, null);

verify(workflowEventBus).publish(any(TaskKilledLifecycleEvent.class));
// Should NOT check TaskGroup since task was removed from dispatch queue
verify(taskGroupCoordinator, never()).isTaskWaitingForTaskGroupSlot(any());
}

@Test
void onKillEvent_taskNotInDispatchQueueAndNotWaiting_shouldRetryAfter5s() {
// Task not in dispatch queue
when(workerGroupDispatcherCoordinator.removeTask(taskExecution)).thenReturn(false);
// Task is NOT waiting for TaskGroup slot
when(taskGroupCoordinator.isTaskWaitingForTaskGroupSlot(taskInstance)).thenReturn(false);

taskSubmittedStateAction.onKillEvent(workflowExecution, taskExecution, null);

verify(taskGroupCoordinator, never()).releaseTaskGroupSlot(any());
// Should publish a delayed TaskKillLifecycleEvent (retry after 5s)
verify(workflowEventBus).publish(any(TaskKillLifecycleEvent.class));
}
}
Loading