Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ private Constants() {
}

public static final String CONFIG_OPTIONS = "--config";
public static final String DEPLOY_MODE_OPTIONS = "--deploy-mode";
public static final String MASTER_OPTIONS = "--master";
public static final String STARTUP_SCRIPT_SPARK = "spark";
public static final String STARTUP_SCRIPT_FLINK = "flink";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void init() {
public List<String> buildOptions() throws Exception {
List<String> args = super.buildOptions();
if (!Objects.isNull(seatunnelParameters.getDeployMode())) {
args.add(Constants.DEPLOY_MODE_OPTIONS);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should remove Constants.DEPLOY_MODE_OPTIONS since it's useless.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 935c4b6 by removing Constants.DEPLOY_MODE_OPTIONS and inlining the Spark task --deploy-mode argument to preserve the existing Spark command behavior.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You remove the usage of Constants.DEPLOY_MODE_OPTIONS. But it didn't modify its test and remove this constant.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in the latest push: Constants.DEPLOY_MODE_OPTIONS is removed, and the engine path is covered by SeatunnelEngineTaskTest.

args.add(Constants.MASTER_OPTIONS);
args.add(seatunnelParameters.getDeployMode().getCommand());
}
if (StringUtils.isNotBlank(seatunnelParameters.getOthers())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.plugin.task.seatunnel.spark;

import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.DEPLOY_MODE_OPTIONS;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.MASTER_OPTIONS;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
Expand Down Expand Up @@ -51,7 +50,7 @@ public void init() {
@Override
public List<String> buildOptions() throws Exception {
List<String> args = super.buildOptions();
args.add(DEPLOY_MODE_OPTIONS);
args.add("--deploy-mode");
args.add(seatunnelParameters.getDeployMode().getCommand());

MasterTypeEnum master = DeployModeEnum.local == seatunnelParameters.getDeployMode() ? MasterTypeEnum.LOCAL
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.task.seatunnel.self;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.seatunnel.DeployModeEnum;

import org.apache.commons.io.FileUtils;

import java.lang.reflect.Field;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class SeatunnelEngineTaskTest {

Check warning on line 35 in dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTaskTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ6_xZPVwhkEO4EgyLbX&open=AZ6_xZPVwhkEO4EgyLbX&pullRequest=18315

private static final String EXECUTE_PATH = "/tmp";
private static final String TASK_ID = "1234";
private static final String RAW_SCRIPT =
"env {\n job.mode = \"BATCH\"\n}\nsource {\n FakeSource {}\n}\nsink {\n Console {}\n}";

private MockedStatic<FileUtils> mockedStaticFileUtils;

@BeforeEach
public void setUp() {

Check warning on line 45 in dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTaskTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ6_xZPVwhkEO4EgyLbS&open=AZ6_xZPVwhkEO4EgyLbS&pullRequest=18315
mockedStaticFileUtils = Mockito.mockStatic(FileUtils.class);
}

@AfterEach
public void after() {

Check warning on line 50 in dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTaskTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ6_xZPVwhkEO4EgyLbT&open=AZ6_xZPVwhkEO4EgyLbT&pullRequest=18315
mockedStaticFileUtils.close();
}

@Test
public void buildOptionsUsesMasterForDeployMode() throws Exception {

Check warning on line 55 in dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTaskTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ6_xZPVwhkEO4EgyLbU&open=AZ6_xZPVwhkEO4EgyLbU&pullRequest=18315
SeatunnelEngineParameters seatunnelParameters = newSeatunnelEngineParameters();
seatunnelParameters.setDeployMode(DeployModeEnum.cluster);

SeatunnelEngineTask task = newSeatunnelEngineTask(seatunnelParameters);

String command = String.join(" ", task.buildOptions());
String expectedCommand = String.format("--config %s/seatunnel_%s.conf --master cluster", EXECUTE_PATH, TASK_ID);
Assertions.assertEquals(expectedCommand, command);
}

@Test
public void buildOptionsOmitsMasterWhenDeployModeMissing() throws Exception {

Check warning on line 67 in dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTaskTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ6_xZPVwhkEO4EgyLbV&open=AZ6_xZPVwhkEO4EgyLbV&pullRequest=18315
SeatunnelEngineParameters seatunnelParameters = newSeatunnelEngineParameters();

SeatunnelEngineTask task = newSeatunnelEngineTask(seatunnelParameters);

String command = String.join(" ", task.buildOptions());
String expectedCommand = String.format("--config %s/seatunnel_%s.conf", EXECUTE_PATH, TASK_ID);
Assertions.assertEquals(expectedCommand, command);
}

@Test
public void buildOptionsAppendsOthersWhenPresent() throws Exception {

Check warning on line 78 in dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTaskTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ6_xZPVwhkEO4EgyLbW&open=AZ6_xZPVwhkEO4EgyLbW&pullRequest=18315
SeatunnelEngineParameters seatunnelParameters = newSeatunnelEngineParameters();
seatunnelParameters.setDeployMode(DeployModeEnum.local);
seatunnelParameters.setOthers("--name demo-job");

SeatunnelEngineTask task = newSeatunnelEngineTask(seatunnelParameters);

String command = String.join(" ", task.buildOptions());
String expectedCommand =
String.format("--config %s/seatunnel_%s.conf --master local --name demo-job", EXECUTE_PATH, TASK_ID);
Assertions.assertEquals(expectedCommand, command);
}

private SeatunnelEngineParameters newSeatunnelEngineParameters() {
SeatunnelEngineParameters seatunnelParameters = new SeatunnelEngineParameters();
seatunnelParameters.setUseCustom(true);
seatunnelParameters.setRawScript(RAW_SCRIPT);
return seatunnelParameters;
}

private SeatunnelEngineTask newSeatunnelEngineTask(SeatunnelEngineParameters seatunnelParameters) throws Exception {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setExecutePath(EXECUTE_PATH);
taskExecutionContext.setTaskAppId(TASK_ID);

Check warning on line 101 in dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTaskTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "setTaskAppId"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ6_xZPVwhkEO4EgyLbY&open=AZ6_xZPVwhkEO4EgyLbY&pullRequest=18315
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));

SeatunnelEngineTask task = new SeatunnelEngineTask(taskExecutionContext);
task.setSeatunnelParameters(seatunnelParameters);
setPrivateSeatunnelParameters(task, seatunnelParameters);
return task;
}

private void setPrivateSeatunnelParameters(SeatunnelEngineTask task,
SeatunnelEngineParameters seatunnelParameters) throws Exception {
Field field = SeatunnelEngineTask.class.getDeclaredField("seatunnelParameters");
field.setAccessible(true);
field.set(task, seatunnelParameters);
}
}
Loading