Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@

import java.io.File;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
Expand Down Expand Up @@ -184,12 +186,31 @@
@Autowired
private FlinkK8sWatcherWrapper k8sWatcherWrapper;

private static final List<Integer> STARTABLE_STATES =
Collections.unmodifiableList(
Arrays.asList(
FlinkAppStateEnum.ADDED.getValue(),
FlinkAppStateEnum.CREATED.getValue(),
FlinkAppStateEnum.FAILED.getValue(),
FlinkAppStateEnum.CANCELED.getValue(),
FlinkAppStateEnum.FINISHED.getValue(),
FlinkAppStateEnum.LOST.getValue(),
FlinkAppStateEnum.TERMINATED.getValue(),
FlinkAppStateEnum.SUCCEEDED.getValue(),
FlinkAppStateEnum.KILLED.getValue(),
FlinkAppStateEnum.POS_TERMINATED.getValue()));

private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap =
new ConcurrentHashMap<>();

private final Map<Long, CompletableFuture<CancelResponse>> cancelFutureMap =
new ConcurrentHashMap<>();

/** Guard against concurrent start/cancel requests for the same application. */
private final Set<Long> pendingStarts = ConcurrentHashMap.newKeySet();

private final Set<Long> pendingCancels = ConcurrentHashMap.newKeySet();

@Override
public void revoke(Long appId) throws ApplicationException {
FlinkApplication application = getById(appId);
Expand Down Expand Up @@ -244,8 +265,37 @@

@Override
public void cancel(FlinkApplication appParam) throws Exception {
if (!pendingCancels.add(appParam.getId())) {
log.warn(
"[StreamPark] Cancel is already in progress for application id={}",
appParam.getId());
return;
}

FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.CANCELLING);
FlinkApplication application = getById(appParam.getId());

Date optionTime = new Date();
Integer optionState =
appParam.getRestoreOrTriggerSavepoint()
? OptionStateEnum.SAVEPOINTING.getValue()
: OptionStateEnum.CANCELLING.getValue();
boolean stateUpdated =
this.lambdaUpdate()
.eq(FlinkApplication::getId, appParam.getId())
.ne(FlinkApplication::getState, FlinkAppStateEnum.CANCELLING.getValue())
.set(FlinkApplication::getState, FlinkAppStateEnum.CANCELLING.getValue())
.set(FlinkApplication::getOptionTime, optionTime)
.set(FlinkApplication::getOptionState, optionState)
.update();
if (!stateUpdated) {
pendingCancels.remove(appParam.getId());
log.warn(
"[StreamPark] Application id={} is already cancelling or not cancellable",
appParam.getId());
return;
}

application.setState(FlinkAppStateEnum.CANCELLING.getValue());

ApplicationLog applicationLog = new ApplicationLog();
Expand All @@ -259,13 +309,9 @@

if (appParam.getRestoreOrTriggerSavepoint()) {
FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
} else {
application.setOptionState(OptionStateEnum.CANCELLING.getValue());
}

application.setOptionTime(new Date());
this.baseMapper.updateById(application);
application.setOptionState(optionState);
application.setOptionTime(optionTime);

Long userId = ServiceHelper.getUserId();
if (!application.getUserId().equals(userId)) {
Expand Down Expand Up @@ -325,6 +371,7 @@

cancelFuture.whenCompleteAsync(
(cancelResponse, throwable) -> {
pendingCancels.remove(application.getId());
cancelFutureMap.remove(application.getId());

if (throwable != null) {
Expand Down Expand Up @@ -380,119 +427,138 @@
}

@Override
public void start(FlinkApplication appParam, boolean auto) throws Exception {

Check failure on line 430 in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 18 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_incubator-streampark&issues=AZ8IBb1lDyical_VQp4I&open=AZ8IBb1lDyical_VQp4I&pullRequest=4385
// 1) check application
final FlinkApplication application = getById(appParam.getId());
AssertUtils.notNull(application);
ApiAlertException.throwIfTrue(
!application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");

if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())
|| FlinkDeployMode.isSessionMode(application.getDeployModeEnum())) {
checkBeforeStart(application);
}

if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) {
if (!pendingStarts.add(application.getId())) {
ApiAlertException.throwIfTrue(
!applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
"[StreamPark] The same task name is already running in the yarn queue");
true, "[StreamPark] The application cannot be started repeatedly.");
}

ApplicationBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
AssertUtils.notNull(buildPipeline);

FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink version");

// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
} else {
if (!application.isNeedRestartOnFailed()) {
return;
boolean startSubmitted = false;
try {
if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())
|| FlinkDeployMode.isSessionMode(application.getDeployModeEnum())) {
checkBeforeStart(application);
}
appParam.setRestoreOrTriggerSavepoint(true);
application.setRestartCount(application.getRestartCount() + 1);
}
// 2) update app state to starting...
starting(application);
ApplicationLog applicationLog = constructAppLog(application);
// set the latest to Effective, (it will only become the current effective at this time)
applicationManageService.toEffective(application);

Map<String, Object> extraParameter = new HashMap<>(0);
if (application.isFlinkSql()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
// Get the sql of the replaced placeholder
String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
flinkSql.setSql(DeflaterUtils.zipString(realSql));
extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null), flinkSql.getSql());
}
if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) {
ApiAlertException.throwIfTrue(
!applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
"[StreamPark] The same task name is already running in the yarn queue");
}

Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application);
String flinkUserJar = userJarAndAppConf.t1;
String appConf = userJarAndAppConf.t2;
ApplicationBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
AssertUtils.notNull(buildPipeline);

BuildResult buildResult = buildPipeline.getBuildResult();
if (FlinkDeployMode.YARN_APPLICATION == application.getDeployModeEnum()) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
}
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink version");

// Get the args after placeholder replacement
String args =
StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs();
String applicationArgs = variableService.replaceVariable(application.getTeamId(), args);
// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
} else {
if (!application.isNeedRestartOnFailed()) {
pendingStarts.remove(application.getId());
return;
}
appParam.setRestoreOrTriggerSavepoint(true);
application.setRestartCount(application.getRestartCount() + 1);
}
// 2) atomically update app state to starting
if (!tryMarkStarting(application.getId())) {
pendingStarts.remove(application.getId());
ApiAlertException.throwIfTrue(
true, "[StreamPark] The application cannot be started repeatedly.");
}
application.setState(FlinkAppStateEnum.STARTING.getValue());
application.setOptionTime(new Date());
ApplicationLog applicationLog = constructAppLog(application);
// set the latest to Effective, (it will only become the current effective at this time)
applicationManageService.toEffective(application);

Map<String, Object> extraParameter = new HashMap<>(0);
if (application.isFlinkSql()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
// Get the sql of the replaced placeholder
String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
flinkSql.setSql(DeflaterUtils.zipString(realSql));
extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null), flinkSql.getSql());
}

Tuple3<String, String, FlinkK8sRestExposedType> clusterIdNamespace =
getNamespaceClusterId(application);
String k8sNamespace = clusterIdNamespace.t1;
String k8sClusterId = clusterIdNamespace.t2;
FlinkK8sRestExposedType exposedType = clusterIdNamespace.t3;
Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application);
String flinkUserJar = userJarAndAppConf.t1;
String appConf = userJarAndAppConf.t2;

String dynamicProperties =
StringUtils.isBlank(appParam.getDynamicProperties())
? application.getDynamicProperties()
: appParam.getDynamicProperties();
BuildResult buildResult = buildPipeline.getBuildResult();
if (FlinkDeployMode.YARN_APPLICATION == application.getDeployModeEnum()) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
}

SubmitRequest submitRequest =
new SubmitRequest(
flinkEnv.getFlinkVersion(),
FlinkDeployMode.of(application.getDeployMode()),
getProperties(application, dynamicProperties),
flinkEnv.getFlinkConf(),
FlinkJobType.of(application.getJobType()),
application.getId(),
new JobID().toHexString(),
application.getJobName(),
appConf,
application.getApplicationType(),
getSavepointPath(appParam),
FlinkRestoreMode.of(appParam.getRestoreMode()),
applicationArgs,
k8sClusterId,
application.getHadoopUser(),
buildResult,
extraParameter,
k8sNamespace,
exposedType);

CompletableFuture<SubmitResponse> future =
CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService);

startFutureMap.put(application.getId(), future);

future.whenCompleteAsync(
(response, throwable) -> {
// 1) remove Future
startFutureMap.remove(application.getId());
// 2) exception
if (throwable != null) {
processForException(appParam, throwable, applicationLog, application);
return;
}
// 3) success
processForSuccess(appParam, response, applicationLog, application);
});
// Get the args after placeholder replacement
String args =
StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs();
String applicationArgs = variableService.replaceVariable(application.getTeamId(), args);

Tuple3<String, String, FlinkK8sRestExposedType> clusterIdNamespace =
getNamespaceClusterId(application);
String k8sNamespace = clusterIdNamespace.t1;
String k8sClusterId = clusterIdNamespace.t2;
FlinkK8sRestExposedType exposedType = clusterIdNamespace.t3;

String dynamicProperties =
StringUtils.isBlank(appParam.getDynamicProperties())
? application.getDynamicProperties()
: appParam.getDynamicProperties();

SubmitRequest submitRequest =
new SubmitRequest(
flinkEnv.getFlinkVersion(),
FlinkDeployMode.of(application.getDeployMode()),
getProperties(application, dynamicProperties),
flinkEnv.getFlinkConf(),
FlinkJobType.of(application.getJobType()),
application.getId(),
new JobID().toHexString(),
application.getJobName(),
appConf,
application.getApplicationType(),
getSavepointPath(appParam),
FlinkRestoreMode.of(appParam.getRestoreMode()),
applicationArgs,
k8sClusterId,
application.getHadoopUser(),
buildResult,
extraParameter,
k8sNamespace,
exposedType);

CompletableFuture<SubmitResponse> future =
CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService);

startFutureMap.put(application.getId(), future);

future.whenCompleteAsync(
(response, throwable) -> {
// 1) remove Future
pendingStarts.remove(application.getId());
startFutureMap.remove(application.getId());
// 2) exception
if (throwable != null) {
processForException(appParam, throwable, applicationLog, application);
return;
}
// 3) success
processForSuccess(appParam, response, applicationLog, application);
});
startSubmitted = true;
} catch (Exception e) {
if (!startSubmitted) {
pendingStarts.remove(application.getId());
}
throw e;
}
}

@Nonnull
Expand Down Expand Up @@ -624,10 +690,13 @@
}
}

private void starting(FlinkApplication application) {
application.setState(FlinkAppStateEnum.STARTING.getValue());
application.setOptionTime(new Date());
updateById(application);
private boolean tryMarkStarting(Long appId) {
return this.lambdaUpdate()
.eq(FlinkApplication::getId, appId)
.in(FlinkApplication::getState, STARTABLE_STATES)
.set(FlinkApplication::getState, FlinkAppStateEnum.STARTING.getValue())
.set(FlinkApplication::getOptionTime, new Date())
.update();
}

private Tuple2<String, String> getUserJarAndAppConf(
Expand Down
Loading
Loading