taskInstances = repository.queryTaskInstance(workflow);
+ Assertions
+ .assertThat(taskInstances)
+ .hasSize(2)
+ .allMatch(taskInstance -> TaskExecutionStatus.SUCCESS.equals(taskInstance.getState()) &&
+ taskInstance.getTaskGroupId() == context.getTaskGroups().get(0).getId());
+
+ final TaskInstance taskA = taskInstances.stream()
+ .filter(t -> "A".equals(t.getName()))
+ .findFirst().get();
+ final TaskInstance taskB = taskInstances.stream()
+ .filter(t -> "B".equals(t.getName()))
+ .findFirst().get();
+ // TaskA's task group priority is smaller than B
+ Assertions.assertThat(taskA.getStartTime()).isAfter(taskB.getStartTime());
+ Assertions.assertThat(taskA.getEndTime()).isAfter(taskB.getEndTime());
+
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
deleted file mode 100644
index d62c52f45d4a..000000000000
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++ /dev/null
@@ -1,1967 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.integration.cases;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.awaitility.Awaitility.await;
-
-import org.apache.dolphinscheduler.common.enums.AlertType;
-import org.apache.dolphinscheduler.common.enums.FailureStrategy;
-import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.TaskDependType;
-import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
-import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
-import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
-import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
-import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
-import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
-import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
-
-import org.apache.commons.lang3.time.DateUtils;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.function.Consumer;
-
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * The integration test for starting a workflow from workflow definition.
- * In each test method, will create different workflow from yaml, and then trigger it, and do assertions.
- *
The method name should be clear to describe the test scenario.
- */
-public class WorkflowStartTestCase extends AbstractMasterIntegrationTestCase {
-
- @Test
- @DisplayName("Test start a workflow with one fake task(A) success")
- public void testStartWorkflow_with_oneSuccessTask() {
- final String yaml = "/it/start/workflow_with_one_fake_task_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
- .matches(
- workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- assertThat(taskInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
- });
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one fake task(A) dry run success")
- public void testStartWorkflow_with_oneSuccessTaskDryRun() {
- final String yaml = "/it/start/workflow_with_one_fake_task_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .dryRun(Flag.YES)
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
- .matches(
- workflowInstance -> workflowInstance.getDryRun() == Flag.YES.getCode());
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
- });
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with two fake task(A) has the same name")
- public void testStartWorkflow_contains_duplicateTaskName() {
- final String yaml = "/it/start/workflow_with_duplicate_task_name.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
- .isEqualTo(WorkflowExecutionStatus.FAILURE);
- assertThat(repository.queryTaskInstance(workflowInstanceId)).isEmpty();
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one fake task(A) using serial wait strategy")
- public void testStartWorkflow_with_serialWaitStrategy() {
- final String yaml = "/it/start/workflow_with_serial_wait_strategy.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId1 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
- final Integer workflowInstanceId2 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
- final Integer workflowInstanceId3 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- assertThat(repository.queryWorkflowInstance(workflowInstanceId1).getState())
- .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
- assertThat(repository.queryWorkflowInstance(workflowInstanceId2).getState())
- .isEqualTo(WorkflowExecutionStatus.SERIAL_WAIT);
- assertThat(repository.queryWorkflowInstance(workflowInstanceId3).getState())
- .isEqualTo(WorkflowExecutionStatus.SERIAL_WAIT);
- });
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- final WorkflowInstance workflowInstance1 = repository.queryWorkflowInstance(workflowInstanceId1);
- final WorkflowInstance workflowInstance2 = repository.queryWorkflowInstance(workflowInstanceId2);
- final WorkflowInstance workflowInstance3 = repository.queryWorkflowInstance(workflowInstanceId3);
- assertThat(workflowInstance1.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
- assertThat(workflowInstance2.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
- assertThat(workflowInstance2.getEndTime())
- .isAtLeast(DateUtils.addSeconds(workflowInstance1.getEndTime(), 5));
- assertThat(workflowInstance3.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
- assertThat(workflowInstance3.getEndTime())
- .isAtLeast(DateUtils.addSeconds(workflowInstance2.getEndTime(), 5));
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one fake task(A) using serial discard strategy")
- public void testStartWorkflow_with_serialDiscardStrategy() {
- final String yaml = "/it/start/workflow_with_serial_discard_strategy.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId1 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
- final Integer workflowInstanceId2 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
- final Integer workflowInstanceId3 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- final WorkflowInstance workflowInstance1 = repository.queryWorkflowInstance(workflowInstanceId1);
- final WorkflowInstance workflowInstance2 = repository.queryWorkflowInstance(workflowInstanceId2);
- final WorkflowInstance workflowInstance3 = repository.queryWorkflowInstance(workflowInstanceId3);
- assertThat(workflowInstance1.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
- assertThat(workflowInstance2.getState()).isEqualTo(WorkflowExecutionStatus.STOP);
- assertThat(workflowInstance2.getEndTime()).isNotNull();
- assertThat(workflowInstance2.getEndTime()).isAtLeast(workflowInstance2.getStartTime());
- assertThat(workflowInstance3.getState()).isEqualTo(WorkflowExecutionStatus.STOP);
- assertThat(workflowInstance3.getEndTime()).isNotNull();
- assertThat(workflowInstance3.getEndTime()).isAtLeast(workflowInstance3.getStartTime());
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one fake task(A) using serial priority strategy")
- public void testStartWorkflow_with_serialPriorityStrategy() {
- final String yaml = "/it/start/workflow_with_serial_priority_strategy.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId1 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
- final Integer workflowInstanceId2 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
- final Integer workflowInstanceId3 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- final WorkflowInstance workflowInstance1 = repository.queryWorkflowInstance(workflowInstanceId1);
- final WorkflowInstance workflowInstance2 = repository.queryWorkflowInstance(workflowInstanceId2);
- final WorkflowInstance workflowInstance3 = repository.queryWorkflowInstance(workflowInstanceId3);
- assertThat(workflowInstance1.getState()).isEqualTo(WorkflowExecutionStatus.STOP);
- assertThat(workflowInstance1.getEndTime()).isNotNull();
- assertThat(workflowInstance1.getEndTime()).isAtLeast(workflowInstance1.getStartTime());
- assertThat(workflowInstance2.getState()).isEqualTo(WorkflowExecutionStatus.STOP);
- assertThat(workflowInstance2.getEndTime()).isNotNull();
- assertThat(workflowInstance2.getEndTime()).isAtLeast(workflowInstance2.getStartTime());
- assertThat(workflowInstance3.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with three fake task(A) using end failure strategy")
- public void testStartWorkflow_with_threeFakeTask_usingFailureStrategyEnd() {
- final String yaml = "/it/start/workflow_with_three_parallel_fake_task_using_failure_strategy.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .failureStrategy(FailureStrategy.END)
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
- .isEqualTo(WorkflowExecutionStatus.FAILURE);
- Assertions.assertThat(repository.queryTaskInstance(workflow))
- .hasSize(3)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
-
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with two fake task(A) using task group")
- public void testStartWorkflow_with_successTaskUsingTaskGroup() {
- final String yaml = "/it/start/workflow_with_fake_tasks_using_task_group.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
-
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(2))
- .atLeast(Duration.ofSeconds(20))
- .untilAsserted(() -> {
- final List taskInstances = repository.queryTaskInstance(workflow);
- Assertions
- .assertThat(taskInstances)
- .hasSize(2)
- .allMatch(taskInstance -> TaskExecutionStatus.SUCCESS.equals(taskInstance.getState()) &&
- taskInstance.getTaskGroupId() == context.getTaskGroups().get(0).getId());
-
- final TaskInstance taskA = taskInstances.stream()
- .filter(t -> "A".equals(t.getName()))
- .findFirst().get();
- final TaskInstance taskB = taskInstances.stream()
- .filter(t -> "B".equals(t.getName()))
- .findFirst().get();
- // TaskA's task group priority is smaller than B
- Assertions.assertThat(taskA.getStartTime()).isAfter(taskB.getStartTime());
- Assertions.assertThat(taskA.getEndTime()).isAfter(taskB.getEndTime());
-
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one fake task(A) using environment config")
- public void testStartWorkflow_with_oneSuccessTaskUsingEnvironmentConfig() {
- final String yaml = "/it/start/workflow_with_one_fake_task_using_environment_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
-
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one success switch task and two fake task")
- public void testStartWorkflow_with_oneSuccessSwitch_twoFakeTask() {
- final String yaml = "/it/start/workflow_with_one_success_switch_two_fake_task.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .failureStrategy(FailureStrategy.CONTINUE)
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
-
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
- .matches(
- workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
- .matches(
- workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("switch_task");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("success_branch");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
-
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one failed switch task and two fake task")
- public void testStartWorkflow_with_oneFailedSwitch_twoFakeTask() {
- final String yaml = "/it/start/workflow_with_one_failed_switch_two_fake_task.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
-
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
- .matches(
- workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
- .matches(
- workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("switch_task");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("default_branch");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
-
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one sub workflow task(A) success")
- public void testStartWorkflow_with_subWorkflowTask_success() {
- final String yaml = "/it/start/workflow_with_sub_workflow_task_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
-
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
- .matches(
- workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
- .matches(
- workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());
-
- final List subWorkflowInstance =
- repository.queryWorkflowInstance(context.getWorkflows().get(1));
- Assertions
- .assertThat(subWorkflowInstance)
- .hasSize(1)
- .satisfiesExactly(workflowInstance -> {
- assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
- assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES);
- assertThat(workflowInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
- });
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("sub_logic_task");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
-
- Assertions
- .assertThat(repository.queryTaskInstance(subWorkflowInstance.get(0).getId()))
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("fake_task");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one sub workflow task(A) dry run, will not execute")
- public void testStartWorkflow_with_subWorkflowTask_dryRunSuccess() {
- final String yaml = "/it/start/workflow_with_sub_workflow_task_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .dryRun(Flag.YES)
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
-
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
- .matches(
- workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
- .matches(
- workflowInstance -> workflowInstance.getDryRun() == Flag.YES.getCode());
-
- final List subWorkflowInstance =
- repository.queryWorkflowInstance(context.getWorkflows().get(1));
- Assertions
- .assertThat(subWorkflowInstance)
- .isEmpty();
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("sub_logic_task");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one fake task(A) with multiple predecessors run success")
- void testStartWorkflow_with_oneTaskWithMultiplePredecessors_runSuccess() {
- final String yaml = "/it/start/workflow_with_one_fake_task_with_multiple_predecessors_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .hasSize(4)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("D");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one fake task(A) with multiple predecessors run failed")
- void testStartWorkflow_with_oneTaskWithMultiplePredecessors_runFailed() {
- final String yaml = "/it/start/workflow_with_one_fake_task_with_multiple_predecessors_failed.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .failureStrategy(FailureStrategy.CONTINUE)
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE);
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .hasSize(3)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with shared downstream task when failed predecessor finishes first using continue failure strategy")
- void testStartWorkflow_with_sharedDownstreamTask_whenFailedPredecessorFinishFirst_usingFailureStrategyContinue() {
- final String yaml =
- "/it/start/workflow_with_shared_downstream_task_when_failed_predecessor_finish_first.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .failureStrategy(FailureStrategy.CONTINUE)
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE)
- .matches(workflowInstance -> workflowInstance.getEndTime() != null);
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one sub workflow task(A) failed")
- public void testStartWorkflow_with_subWorkflowTask_failed() {
- final String yaml = "/it/start/workflow_with_sub_workflow_task_failed.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
-
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE)
- .matches(
- workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO);
-
- final List subWorkflowInstance =
- repository.queryWorkflowInstance(context.getWorkflows().get(1));
- Assertions
- .assertThat(subWorkflowInstance)
- .hasSize(1)
- .satisfiesExactly(workflowInstance -> {
- assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.FAILURE);
- assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES);
- });
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("sub_logic_task");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
-
- Assertions
- .assertThat(repository.queryTaskInstance(subWorkflowInstance.get(0).getId()))
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("fake_task");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow which using workflow params")
- public void testStartWorkflow_usingWorkflowParam() {
- final String yaml = "/it/start/workflow_with_global_param.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.FAILURE));
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow which using command params")
- public void testStartWorkflow_usingCommandParam() {
- final String yaml = "/it/start/workflow_with_global_param.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final RunWorkflowCommandParam runWorkflowCommandParam = RunWorkflowCommandParam.builder()
- .commandParams(Lists.newArrayList(Property.builder()
- .prop("name")
- .direct(Direct.IN)
- .type(DataType.VARCHAR)
- .value("commandParam")
- .build()))
- .build();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(runWorkflowCommandParam)
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.FAILURE));
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow contains fake task using local param will be overwrite by varpool")
- public void testStartWorkflow_fakeTask_usingLocalParamOverWriteByVarPool() {
- final String yaml = "/it/start/workflow_with_local_param_overwrite_by_varpool.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final RunWorkflowCommandParam runWorkflowCommandParam = RunWorkflowCommandParam.builder()
- .build();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(runWorkflowCommandParam)
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- List assertVarPools = Lists.newArrayList(
- Property.builder().prop("output").direct(Direct.OUT).type(DataType.VARCHAR).value("1").build());
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> {
- assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
- assertThat(VarPoolUtils.deserializeVarPool(workflowInstance.getVarPool()))
- .isEqualTo(assertVarPools);
- });
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(3)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(VarPoolUtils.deserializeVarPool(taskInstance.getVarPool()))
- .isEqualTo(assertVarPools);
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(VarPoolUtils.deserializeVarPool(taskInstance.getVarPool()))
- .isEqualTo(assertVarPools);
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C");
- assertThat(VarPoolUtils.deserializeVarPool(taskInstance.getVarPool()))
- .isEqualTo(assertVarPools);
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow which using null key params")
- public void testStartWorkflow_usingNullKeyParam() {
- final String yaml = "/it/start/workflow_with_null_key_param.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final RunWorkflowCommandParam runWorkflowCommandParam = RunWorkflowCommandParam.builder()
- .commandParams(Lists.newArrayList(Property.builder()
- .prop(null)
- .direct(Direct.IN)
- .type(DataType.VARCHAR)
- .value("commandParam")
- .build()))
- .build();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(runWorkflowCommandParam)
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.SUCCESS));
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one fake task(A) failed")
- public void testStartWorkflow_with_oneFailedTask() {
- final String yaml = "/it/start/workflow_with_one_fake_task_failed.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.FAILURE));
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one fake task(A) fatal")
- public void testStartWorkflow_with_oneFatalTask() {
- final String yaml = "/it/start/workflow_with_one_fake_task_fatal.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.FAILURE));
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one fake task(A) failed")
- public void testStartWorkflow_with_oneFailedTaskWithRetry() {
- final String yaml = "/it/start/workflow_with_one_fake_task_failed_with_retry.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(3))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.FAILURE));
-
- final List taskInstances = repository.queryTaskInstance(workflow);
- Assertions
- .assertThat(taskInstances)
- .allSatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- })
- .hasSize(2);
-
- final TaskInstance taskInstance = taskInstances.get(0);
- Assertions
- .assertThat(taskInstance)
- .matches(task -> task.getRetryTimes() == 0)
- .matches(task -> task.getFlag() == Flag.NO)
- .isNotNull();
-
- final TaskInstance latestTaskInstance = taskInstances.get(1);
- Assertions
- .assertThat(latestTaskInstance)
- .matches(task -> task.getRetryTimes() == 1)
- .matches(task -> task.getFlag() == Flag.YES)
- .isNotNull();
- assertThat(latestTaskInstance.getFirstSubmitTime()).isEqualTo(taskInstance.getFirstSubmitTime());
- assertThat(latestTaskInstance.getSubmitTime())
- .isAtLeast(DateUtils.addSeconds(taskInstance.getSubmitTime(), -65));
- assertThat(latestTaskInstance.getSubmitTime())
- .isAtMost(DateUtils.addMinutes(taskInstance.getSubmitTime(), 65));
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with two serial fake tasks(A -> B) success")
- public void testStartWorkflow_with_twoSerialSuccessTask() {
- String yaml = "/it/start/workflow_with_two_serial_fake_task_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.SUCCESS))
- .hasSize(1);
-
- final List taskInstances = repository.queryTaskInstance(workflow);
- Assertions
- .assertThat(taskInstances)
- .hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with two serial fake tasks(A(failed) -> B) success")
- public void testStartWorkflow_with_twoSerialFailedTask() {
- final String yaml = "/it/start/workflow_with_two_serial_fake_task_failed.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.FAILURE));
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with two parallel fake tasks(A, B) success")
- public void testStartWorkflow_with_twoParallelSuccessTask() {
- final String yaml = "/it/start/workflow_with_two_parallel_fake_task_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.SUCCESS));
-
- final List taskInstances = repository.queryTaskInstance(workflow);
- Assertions
- .assertThat(taskInstances)
- .hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with two parallel fake tasks(A(failed), B(failed)) success")
- public void testStartWorkflow_with_twoParallelFailedTask() {
- final String yaml = "/it/start/workflow_with_two_parallel_fake_task_failed.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .filteredOn(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE)
- .hasSize(1);
-
- final List taskInstances = repository.queryTaskInstance(workflow);
- Assertions
- .assertThat(taskInstances)
- .hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with three parallel three fake tasks(A1->A2->A3, B1->B2->B3, C1->C2->C3) success")
- public void testStartWorkflow_with_threeParallelSuccessTask() {
- final String yaml = "/it/start/workflow_with_three_parallel_three_fake_task_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .filteredOn(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
- .hasSize(1);
-
- final List taskInstances = repository.queryTaskInstance(workflow);
- Assertions
- .assertThat(taskInstances)
- .hasSize(9)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A1");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A2");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A3");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B1");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B2");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B3");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C1");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C2");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C3");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with three parallel three fake tasks(A1->A2->A3, B1->B2->B3, C1->C2->C3) success")
- public void testStartWorkflowFromStartNodes_with_threeParallelSuccessTask() {
- final String yaml = "/it/start/workflow_with_three_parallel_three_fake_task_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final RunWorkflowCommandParam runWorkflowCommandParam = RunWorkflowCommandParam.builder()
- .startNodes(Lists.newArrayList(6L))
- .build();
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(runWorkflowCommandParam)
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .filteredOn(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
- .hasSize(1);
-
- final List taskInstances = repository.queryTaskInstance(workflow);
- Assertions
- .assertThat(taskInstances)
- .hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C2");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C3");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow which using workflow built in params")
- public void testStartWorkflow_usingWorkflowBuiltInParam() {
- final String yaml = "/it/start/workflow_with_built_in_param.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.SUCCESS));
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(2)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow which contains a dep task with timeout kill strategy")
- public void testStartWorkflow_withTimeoutKillTask() {
- final String yaml = "/it/start/workflow_with_timeout_kill_task.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_kill_task");
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofSeconds(90))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.STOP));
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(1)
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("dep_task_with_timeout_killed");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow which contains a dep task will be kill by system timeout")
- public void testStartWorkflow_withSystemTimeoutKillTask() {
- masterConfig.getServerLoadProtection().setMaxTaskInstanceRuntime(Duration.ofMinutes(1));
-
- final String yaml = "/it/start/workflow_with_system_timeout_kill_task.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_kill_task");
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofSeconds(90))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.STOP));
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(1)
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("dep_task_with_timeout_killed");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with task depend type TASK_ONLY")
- public void testStartWorkflow_withTaskOnlyStrategy() {
- final String yaml = "/it/start/workflow_with_task_only_strategy.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam().withStartNodes(Lists.newArrayList(1L)))
- .taskDependType(TaskDependType.TASK_ONLY)
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.SUCCESS));
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(1)
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with task which successors is forbidden")
- public void testStartWorkflow_withTaskSuccessorsIsForbidden() {
- final String yaml = "/it/start/workflow_with_task_successors_is_forbidden.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.SUCCESS));
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(2)
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- }, (Consumer) taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C1");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one condition task(B) when one fake predecessor task(A) run success")
- void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runSuccess() {
- final String yaml = "/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .hasSize(3)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run success")
- void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runSuccess() {
- final String yaml =
- "/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_success.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .hasSize(3)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("C");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("D");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one condition task(B) when one fake predecessor task(A) run failed")
- void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFailed() {
- final String yaml = "/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_failed.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .hasSize(3)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("D");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one condition task(B) when one fake predecessor task(A) run fatal")
- void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFatal() {
- final String yaml = "/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_fatal.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .hasSize(3)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("B");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- })
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("D");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run failed")
- void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFailed() {
- final String yaml =
- "/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_failed.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE);
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .hasSize(1)
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run fatal")
- void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFatal() {
- final String yaml =
- "/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_fatal.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(parentWorkflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofMinutes(1))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE);
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflowInstanceId))
- .hasSize(1)
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
- });
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow whose task specifies a non-existent worker group when dispatch timeout is enabled")
- public void testTaskFail_with_workerGroupNotFoundAndTimeoutEnabled() {
- TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
- taskDispatchPolicy.setDispatchTimeoutEnabled(true);
- taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofSeconds(10));
- this.masterConfig.setTaskDispatchPolicy(taskDispatchPolicy);
-
- final String yaml = "/it/start/workflow_with_worker_group_not_found.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofSeconds(30))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(1)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getWorkerGroup()).isEqualTo("workerGroupNotFound");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
-
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.FAILURE));
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow whose task specifies a non-existent worker group when dispatch timeout is disabled")
- public void testTaskRemainsSubmittedSuccess_with_workerGroupNotFoundAndTimeoutDisabled() {
- TaskDispatchPolicy policy = new TaskDispatchPolicy();
- policy.setDispatchTimeoutEnabled(false);
- this.masterConfig.setTaskDispatchPolicy(policy);
-
- final String yaml = "/it/start/workflow_with_worker_group_not_found.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofSeconds(30))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(1)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getWorkerGroup()).isEqualTo("workerGroupNotFound");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUBMITTED_SUCCESS);
- });
-
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION));
-
- });
-
- // This test intentionally leaves the workflow running, so we skip the resource cleanup check.
- // masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow when no available worker and dispatch timeout is enabled")
- public void testTaskFail_with_noAvailableWorkerAndTimeoutEnabled() {
- TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
- taskDispatchPolicy.setDispatchTimeoutEnabled(true);
- taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofSeconds(10));
- this.masterConfig.setTaskDispatchPolicy(taskDispatchPolicy);
-
- final String yaml = "/it/start/workflow_with_no_available_worker.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofSeconds(30))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(1)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getWorkerGroup()).isEqualTo("default");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
- });
-
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.FAILURE));
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow when no available worker and dispatch timeout is disabled")
- public void testTaskRemainsSubmittedSuccess_with_noAvailableWorkerAndTimeoutDisabled() {
- TaskDispatchPolicy policy = new TaskDispatchPolicy();
- policy.setDispatchTimeoutEnabled(false);
- this.masterConfig.setTaskDispatchPolicy(policy);
-
- final String yaml = "/it/start/workflow_with_no_available_worker.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .build();
- workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofSeconds(30))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(1)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("A");
- assertThat(taskInstance.getWorkerGroup()).isEqualTo("default");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUBMITTED_SUCCESS);
- });
-
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION));
- });
-
- // This test intentionally leaves the workflow running, so we skip the resource cleanup check.
- // masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow when timeout should trigger alert when warningGroupId is set")
- public void testWorkflowTimeout_WithAlertGroup_ShouldSendAlert() {
- final String yaml = "/it/start/workflow_with_workflow_timeout_alert.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getOneWorkflow();
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .warningGroupId(workflow.getWarningGroupId())
- .build();
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await().atMost(Duration.ofMinutes(2))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(
- workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(1)
- .anySatisfy(taskInstance -> {
- assertThat(taskInstance.getName()).isEqualTo("long_running_task");
- assertThat(taskInstance.getWorkerGroup()).isEqualTo("default");
- assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
- });
- Assertions
- .assertThat(repository.queryAlert(workflowInstanceId))
- .hasSize(1)
- .anySatisfy(alert -> {
- assertThat(alert.getTitle()).isEqualTo("Workflow Timeout Warn");
- assertThat(alert.getProjectCode()).isEqualTo(1);
- assertThat(alert.getWorkflowDefinitionCode()).isEqualTo(1);
- assertThat(alert.getAlertType()).isEqualTo(AlertType.WORKFLOW_INSTANCE_TIMEOUT);
- });
- });
-
- masterContainer.assertAllResourceReleased();
- }
- @Test
- @DisplayName("Test start a workflow which contains a dep task with timeout warn strategy")
- public void testStartWorkflow_withTimeoutWarnTask() {
- masterConfig.getServerLoadProtection().setEnabled(false);
-
- final String yaml = "/it/start/workflow_with_timeout_warn_task.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_warn_task");
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO
- .builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .warningGroupId(workflow.getWarningGroupId())
- .build();
-
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofSeconds(90))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(
- workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION));
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(1)
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName())
- .isEqualTo("dep_task_with_timeout_warn");
- assertThat(taskInstance.getState())
- .isEqualTo(TaskExecutionStatus.RUNNING_EXECUTION);
- });
-
- Assertions
- .assertThat(repository.queryAlert(workflowInstanceId))
- .isNotEmpty()
- .anySatisfy(alert -> {
- assertThat(alert.getAlertType())
- .isEqualTo(AlertType.TASK_TIMEOUT);
- });
- });
-
- workflowOperator.stopWorkflowInstance(workflowInstanceId);
- await()
- .atMost(Duration.ofSeconds(30))
- .untilAsserted(() -> Assertions.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
- .matches(w -> w.getState() == WorkflowExecutionStatus.STOP));
- masterContainer.assertAllResourceReleased();
- }
-
- @Test
- @DisplayName("Test start a workflow which contains a dep task with timeout warn failed strategy")
- public void testStartWorkflow_withTimeoutWarnFailedTask() {
- masterConfig.getServerLoadProtection().setEnabled(false);
-
- final String yaml = "/it/start/workflow_with_timeout_warnfailed_task.yaml";
- final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
- final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_warnfailed_task");
-
- final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO
- .builder()
- .workflowDefinition(workflow)
- .runWorkflowCommandParam(new RunWorkflowCommandParam())
- .warningGroupId(workflow.getWarningGroupId())
- .build();
-
- final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
-
- await()
- .atMost(Duration.ofSeconds(90))
- .untilAsserted(() -> {
- Assertions
- .assertThat(repository.queryWorkflowInstance(workflow))
- .satisfiesExactly(workflowInstance -> assertThat(
- workflowInstance.getState())
- .isEqualTo(WorkflowExecutionStatus.STOP));
-
- Assertions
- .assertThat(repository.queryTaskInstance(workflow))
- .hasSize(1)
- .satisfiesExactly(taskInstance -> {
- assertThat(taskInstance.getName())
- .isEqualTo("dep_task_with_timeout_warnfailed");
- assertThat(taskInstance.getState())
- .isEqualTo(TaskExecutionStatus.KILL);
- });
-
- Assertions
- .assertThat(repository.queryAlert(workflowInstanceId))
- .isNotEmpty()
- .anySatisfy(alert -> {
- assertThat(alert.getAlertType())
- .isEqualTo(AlertType.TASK_TIMEOUT);
- });
- });
-
- masterContainer.assertAllResourceReleased();
- }
-
-}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTimeoutTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTimeoutTestCase.java
new file mode 100644
index 000000000000..935290104fc2
--- /dev/null
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTimeoutTestCase.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.integration.cases;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import org.apache.dolphinscheduler.common.enums.AlertType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
+import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
+import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
+import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
+
+import java.time.Duration;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for workflow start timeout and alert scenarios.
+ */
+public class WorkflowStartTimeoutTestCase extends AbstractMasterIntegrationTestCase {
+
+ @Test
+ @DisplayName("Test start a workflow which contains a dep task with timeout kill strategy")
+ public void testStartWorkflow_withTimeoutKillTask() {
+ final String yaml = "/it/start/workflow_with_timeout_kill_task.yaml";
+ final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_kill_task");
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofSeconds(90))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
+ .isEqualTo(WorkflowExecutionStatus.STOP));
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .satisfiesExactly(taskInstance -> {
+ assertThat(taskInstance.getName()).isEqualTo("dep_task_with_timeout_killed");
+ assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ @DisplayName("Test start a workflow which contains a dep task will be kill by system timeout")
+ public void testStartWorkflow_withSystemTimeoutKillTask() {
+ masterConfig.getServerLoadProtection().setMaxTaskInstanceRuntime(Duration.ofMinutes(1));
+
+ final String yaml = "/it/start/workflow_with_system_timeout_kill_task.yaml";
+ final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_kill_task");
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofSeconds(90))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
+ .isEqualTo(WorkflowExecutionStatus.STOP));
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .satisfiesExactly(taskInstance -> {
+ assertThat(taskInstance.getName()).isEqualTo("dep_task_with_timeout_killed");
+ assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ @DisplayName("Test start a workflow when timeout should trigger alert when warningGroupId is set")
+ public void testWorkflowTimeout_WithAlertGroup_ShouldSendAlert() {
+ final String yaml = "/it/start/workflow_with_workflow_timeout_alert.yaml";
+ final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .warningGroupId(workflow.getWarningGroupId())
+ .build();
+ final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await().atMost(Duration.ofMinutes(2))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+ .matches(
+ workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(taskInstance -> {
+ assertThat(taskInstance.getName()).isEqualTo("long_running_task");
+ assertThat(taskInstance.getWorkerGroup()).isEqualTo("default");
+ assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ });
+ Assertions
+ .assertThat(repository.queryAlert(workflowInstanceId))
+ .hasSize(1)
+ .anySatisfy(alert -> {
+ assertThat(alert.getTitle()).isEqualTo("Workflow Timeout Warn");
+ assertThat(alert.getProjectCode()).isEqualTo(1);
+ assertThat(alert.getWorkflowDefinitionCode()).isEqualTo(1);
+ assertThat(alert.getAlertType()).isEqualTo(AlertType.WORKFLOW_INSTANCE_TIMEOUT);
+ });
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ @DisplayName("Test start a workflow which contains a dep task with timeout warn strategy")
+ public void testStartWorkflow_withTimeoutWarnTask() {
+ masterConfig.getServerLoadProtection().setEnabled(false);
+
+ final String yaml = "/it/start/workflow_with_timeout_warn_task.yaml";
+ final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_warn_task");
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO
+ .builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .warningGroupId(workflow.getWarningGroupId())
+ .build();
+
+ final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofSeconds(90))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance -> assertThat(
+ workflowInstance.getState())
+ .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION));
+
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .satisfiesExactly(taskInstance -> {
+ assertThat(taskInstance.getName())
+ .isEqualTo("dep_task_with_timeout_warn");
+ assertThat(taskInstance.getState())
+ .isEqualTo(TaskExecutionStatus.RUNNING_EXECUTION);
+ });
+
+ Assertions
+ .assertThat(repository.queryAlert(workflowInstanceId))
+ .isNotEmpty()
+ .anySatisfy(alert -> {
+ assertThat(alert.getAlertType())
+ .isEqualTo(AlertType.TASK_TIMEOUT);
+ });
+ });
+
+ workflowOperator.stopWorkflowInstance(workflowInstanceId);
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> Assertions.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+ .matches(w -> w.getState() == WorkflowExecutionStatus.STOP));
+ masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ @DisplayName("Test start a workflow which contains a dep task with timeout warn failed strategy")
+ public void testStartWorkflow_withTimeoutWarnFailedTask() {
+ masterConfig.getServerLoadProtection().setEnabled(false);
+
+ final String yaml = "/it/start/workflow_with_timeout_warnfailed_task.yaml";
+ final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_warnfailed_task");
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO
+ .builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .warningGroupId(workflow.getWarningGroupId())
+ .build();
+
+ final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofSeconds(90))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance -> assertThat(
+ workflowInstance.getState())
+ .isEqualTo(WorkflowExecutionStatus.STOP));
+
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .satisfiesExactly(taskInstance -> {
+ assertThat(taskInstance.getName())
+ .isEqualTo("dep_task_with_timeout_warnfailed");
+ assertThat(taskInstance.getState())
+ .isEqualTo(TaskExecutionStatus.KILL);
+ });
+
+ Assertions
+ .assertThat(repository.queryAlert(workflowInstanceId))
+ .isNotEmpty()
+ .anySatisfy(alert -> {
+ assertThat(alert.getAlertType())
+ .isEqualTo(AlertType.TASK_TIMEOUT);
+ });
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
+}