From 937175a1c9647f61439c9beefe0eb3bcb4964809 Mon Sep 17 00:00:00 2001 From: JinRudy <28995387+JinRudy@users.noreply.github.com> Date: Fri, 5 Jun 2026 11:02:34 +0800 Subject: [PATCH] [Fix-16617][Seatunnel] Use master option for engine deploy mode --- .../plugin/task/seatunnel/Constants.java | 1 - .../seatunnel/self/SeatunnelEngineTask.java | 2 +- .../seatunnel/spark/SeatunnelSparkTask.java | 3 +- .../self/SeatunnelEngineTaskTest.java | 116 ++++++++++++++++++ 4 files changed, 118 insertions(+), 4 deletions(-) create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTaskTest.java diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java index 5f0f22b0a1ea..217e7eec16e7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java @@ -24,7 +24,6 @@ private Constants() { } public static final String CONFIG_OPTIONS = "--config"; - public static final String DEPLOY_MODE_OPTIONS = "--deploy-mode"; public static final String MASTER_OPTIONS = "--master"; public static final String STARTUP_SCRIPT_SPARK = "spark"; public static final String STARTUP_SCRIPT_FLINK = "flink"; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java index f870dfdcd9b7..55acbd656978 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java @@ -46,7 +46,7 @@ public void init() { public List buildOptions() throws Exception { List args = super.buildOptions(); if (!Objects.isNull(seatunnelParameters.getDeployMode())) { - args.add(Constants.DEPLOY_MODE_OPTIONS); + args.add(Constants.MASTER_OPTIONS); args.add(seatunnelParameters.getDeployMode().getCommand()); } if (StringUtils.isNotBlank(seatunnelParameters.getOthers())) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java index cb1e6aeb3e75..0e0deadc907f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel.spark; -import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.DEPLOY_MODE_OPTIONS; import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.MASTER_OPTIONS; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -51,7 +50,7 @@ public void init() { @Override public List buildOptions() throws Exception { List args = super.buildOptions(); - args.add(DEPLOY_MODE_OPTIONS); + args.add("--deploy-mode"); args.add(seatunnelParameters.getDeployMode().getCommand()); MasterTypeEnum master = DeployModeEnum.local == seatunnelParameters.getDeployMode() ? MasterTypeEnum.LOCAL diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTaskTest.java new file mode 100644 index 000000000000..16552c7ba42b --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTaskTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.seatunnel.self; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.seatunnel.DeployModeEnum; + +import org.apache.commons.io.FileUtils; + +import java.lang.reflect.Field; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +public class SeatunnelEngineTaskTest { + + private static final String EXECUTE_PATH = "/tmp"; + private static final String TASK_ID = "1234"; + private static final String RAW_SCRIPT = + "env {\n job.mode = \"BATCH\"\n}\nsource {\n FakeSource {}\n}\nsink {\n Console {}\n}"; + + private MockedStatic mockedStaticFileUtils; + + @BeforeEach + public void setUp() { + mockedStaticFileUtils = Mockito.mockStatic(FileUtils.class); + } + + @AfterEach + public void after() { + mockedStaticFileUtils.close(); + } + + @Test + public void buildOptionsUsesMasterForDeployMode() throws Exception { + SeatunnelEngineParameters seatunnelParameters = newSeatunnelEngineParameters(); + seatunnelParameters.setDeployMode(DeployModeEnum.cluster); + + SeatunnelEngineTask task = newSeatunnelEngineTask(seatunnelParameters); + + String command = String.join(" ", task.buildOptions()); + String expectedCommand = String.format("--config %s/seatunnel_%s.conf --master cluster", EXECUTE_PATH, TASK_ID); + Assertions.assertEquals(expectedCommand, command); + } + + @Test + public void buildOptionsOmitsMasterWhenDeployModeMissing() throws Exception { + SeatunnelEngineParameters seatunnelParameters = newSeatunnelEngineParameters(); + + SeatunnelEngineTask task = newSeatunnelEngineTask(seatunnelParameters); + + String command = String.join(" ", task.buildOptions()); + String expectedCommand = String.format("--config %s/seatunnel_%s.conf", EXECUTE_PATH, TASK_ID); + Assertions.assertEquals(expectedCommand, command); + } + + @Test + public void buildOptionsAppendsOthersWhenPresent() throws Exception { + SeatunnelEngineParameters seatunnelParameters = newSeatunnelEngineParameters(); + seatunnelParameters.setDeployMode(DeployModeEnum.local); + seatunnelParameters.setOthers("--name demo-job"); + + SeatunnelEngineTask task = newSeatunnelEngineTask(seatunnelParameters); + + String command = String.join(" ", task.buildOptions()); + String expectedCommand = + String.format("--config %s/seatunnel_%s.conf --master local --name demo-job", EXECUTE_PATH, TASK_ID); + Assertions.assertEquals(expectedCommand, command); + } + + private SeatunnelEngineParameters newSeatunnelEngineParameters() { + SeatunnelEngineParameters seatunnelParameters = new SeatunnelEngineParameters(); + seatunnelParameters.setUseCustom(true); + seatunnelParameters.setRawScript(RAW_SCRIPT); + return seatunnelParameters; + } + + private SeatunnelEngineTask newSeatunnelEngineTask(SeatunnelEngineParameters seatunnelParameters) throws Exception { + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setExecutePath(EXECUTE_PATH); + taskExecutionContext.setTaskAppId(TASK_ID); + taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters)); + + SeatunnelEngineTask task = new SeatunnelEngineTask(taskExecutionContext); + task.setSeatunnelParameters(seatunnelParameters); + setPrivateSeatunnelParameters(task, seatunnelParameters); + return task; + } + + private void setPrivateSeatunnelParameters(SeatunnelEngineTask task, + SeatunnelEngineParameters seatunnelParameters) throws Exception { + Field field = SeatunnelEngineTask.class.getDeclaredField("seatunnelParameters"); + field.setAccessible(true); + field.set(task, seatunnelParameters); + } +}