diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java index d79911d4cf..021dea505f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java @@ -111,6 +111,8 @@ import java.io.File; import java.net.URI; +import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.HashMap; @@ -184,12 +186,31 @@ public class FlinkApplicationActionServiceImpl @Autowired private FlinkK8sWatcherWrapper k8sWatcherWrapper; + private static final List STARTABLE_STATES = + Collections.unmodifiableList( + Arrays.asList( + FlinkAppStateEnum.ADDED.getValue(), + FlinkAppStateEnum.CREATED.getValue(), + FlinkAppStateEnum.FAILED.getValue(), + FlinkAppStateEnum.CANCELED.getValue(), + FlinkAppStateEnum.FINISHED.getValue(), + FlinkAppStateEnum.LOST.getValue(), + FlinkAppStateEnum.TERMINATED.getValue(), + FlinkAppStateEnum.SUCCEEDED.getValue(), + FlinkAppStateEnum.KILLED.getValue(), + FlinkAppStateEnum.POS_TERMINATED.getValue())); + private final Map> startFutureMap = new ConcurrentHashMap<>(); private final Map> cancelFutureMap = new ConcurrentHashMap<>(); + /** Guard against concurrent start/cancel requests for the same application. */ + private final Set pendingStarts = ConcurrentHashMap.newKeySet(); + + private final Set pendingCancels = ConcurrentHashMap.newKeySet(); + @Override public void revoke(Long appId) throws ApplicationException { FlinkApplication application = getById(appId); @@ -244,8 +265,37 @@ public void abort(Long id) { @Override public void cancel(FlinkApplication appParam) throws Exception { + if (!pendingCancels.add(appParam.getId())) { + log.warn( + "[StreamPark] Cancel is already in progress for application id={}", + appParam.getId()); + return; + } + FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.CANCELLING); FlinkApplication application = getById(appParam.getId()); + + Date optionTime = new Date(); + Integer optionState = + appParam.getRestoreOrTriggerSavepoint() + ? OptionStateEnum.SAVEPOINTING.getValue() + : OptionStateEnum.CANCELLING.getValue(); + boolean stateUpdated = + this.lambdaUpdate() + .eq(FlinkApplication::getId, appParam.getId()) + .ne(FlinkApplication::getState, FlinkAppStateEnum.CANCELLING.getValue()) + .set(FlinkApplication::getState, FlinkAppStateEnum.CANCELLING.getValue()) + .set(FlinkApplication::getOptionTime, optionTime) + .set(FlinkApplication::getOptionState, optionState) + .update(); + if (!stateUpdated) { + pendingCancels.remove(appParam.getId()); + log.warn( + "[StreamPark] Application id={} is already cancelling or not cancellable", + appParam.getId()); + return; + } + application.setState(FlinkAppStateEnum.CANCELLING.getValue()); ApplicationLog applicationLog = new ApplicationLog(); @@ -259,13 +309,9 @@ public void cancel(FlinkApplication appParam) throws Exception { if (appParam.getRestoreOrTriggerSavepoint()) { FlinkAppHttpWatcher.addSavepoint(application.getId()); - application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue()); - } else { - application.setOptionState(OptionStateEnum.CANCELLING.getValue()); } - - application.setOptionTime(new Date()); - this.baseMapper.updateById(application); + application.setOptionState(optionState); + application.setOptionTime(optionTime); Long userId = ServiceHelper.getUserId(); if (!application.getUserId().equals(userId)) { @@ -325,6 +371,7 @@ public void cancel(FlinkApplication appParam) throws Exception { cancelFuture.whenCompleteAsync( (cancelResponse, throwable) -> { + pendingCancels.remove(application.getId()); cancelFutureMap.remove(application.getId()); if (throwable != null) { @@ -384,115 +431,134 @@ public void start(FlinkApplication appParam, boolean auto) throws Exception { // 1) check application final FlinkApplication application = getById(appParam.getId()); AssertUtils.notNull(application); - ApiAlertException.throwIfTrue( - !application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly."); - - if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum()) - || FlinkDeployMode.isSessionMode(application.getDeployModeEnum())) { - checkBeforeStart(application); - } - - if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) { + if (!pendingStarts.add(application.getId())) { ApiAlertException.throwIfTrue( - !applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(), - "[StreamPark] The same task name is already running in the yarn queue"); + true, "[StreamPark] The application cannot be started repeatedly."); } - ApplicationBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId()); - AssertUtils.notNull(buildPipeline); - - FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId()); - ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink version"); - - // if manually started, clear the restart flag - if (!auto) { - application.setRestartCount(0); - } else { - if (!application.isNeedRestartOnFailed()) { - return; + boolean startSubmitted = false; + try { + if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum()) + || FlinkDeployMode.isSessionMode(application.getDeployModeEnum())) { + checkBeforeStart(application); } - appParam.setRestoreOrTriggerSavepoint(true); - application.setRestartCount(application.getRestartCount() + 1); - } - // 2) update app state to starting... - starting(application); - ApplicationLog applicationLog = constructAppLog(application); - // set the latest to Effective, (it will only become the current effective at this time) - applicationManageService.toEffective(application); - Map extraParameter = new HashMap<>(0); - if (application.isFlinkSql()) { - FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true); - // Get the sql of the replaced placeholder - String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql()); - flinkSql.setSql(DeflaterUtils.zipString(realSql)); - extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null), flinkSql.getSql()); - } + if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) { + ApiAlertException.throwIfTrue( + !applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(), + "[StreamPark] The same task name is already running in the yarn queue"); + } - Tuple2 userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application); - String flinkUserJar = userJarAndAppConf.t1; - String appConf = userJarAndAppConf.t2; + ApplicationBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId()); + AssertUtils.notNull(buildPipeline); - BuildResult buildResult = buildPipeline.getBuildResult(); - if (FlinkDeployMode.YARN_APPLICATION == application.getDeployModeEnum()) { - buildResult = new ShadedBuildResponse(null, flinkUserJar, true); - } + FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId()); + ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink version"); - // Get the args after placeholder replacement - String args = - StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs(); - String applicationArgs = variableService.replaceVariable(application.getTeamId(), args); + // if manually started, clear the restart flag + if (!auto) { + application.setRestartCount(0); + } else { + if (!application.isNeedRestartOnFailed()) { + pendingStarts.remove(application.getId()); + return; + } + appParam.setRestoreOrTriggerSavepoint(true); + application.setRestartCount(application.getRestartCount() + 1); + } + // 2) atomically update app state to starting + if (!tryMarkStarting(application.getId())) { + pendingStarts.remove(application.getId()); + ApiAlertException.throwIfTrue( + true, "[StreamPark] The application cannot be started repeatedly."); + } + application.setState(FlinkAppStateEnum.STARTING.getValue()); + application.setOptionTime(new Date()); + ApplicationLog applicationLog = constructAppLog(application); + // set the latest to Effective, (it will only become the current effective at this time) + applicationManageService.toEffective(application); + + Map extraParameter = new HashMap<>(0); + if (application.isFlinkSql()) { + FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true); + // Get the sql of the replaced placeholder + String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql()); + flinkSql.setSql(DeflaterUtils.zipString(realSql)); + extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null), flinkSql.getSql()); + } - Tuple3 clusterIdNamespace = - getNamespaceClusterId(application); - String k8sNamespace = clusterIdNamespace.t1; - String k8sClusterId = clusterIdNamespace.t2; - FlinkK8sRestExposedType exposedType = clusterIdNamespace.t3; + Tuple2 userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application); + String flinkUserJar = userJarAndAppConf.t1; + String appConf = userJarAndAppConf.t2; - String dynamicProperties = - StringUtils.isBlank(appParam.getDynamicProperties()) - ? application.getDynamicProperties() - : appParam.getDynamicProperties(); + BuildResult buildResult = buildPipeline.getBuildResult(); + if (FlinkDeployMode.YARN_APPLICATION == application.getDeployModeEnum()) { + buildResult = new ShadedBuildResponse(null, flinkUserJar, true); + } - SubmitRequest submitRequest = - new SubmitRequest( - flinkEnv.getFlinkVersion(), - FlinkDeployMode.of(application.getDeployMode()), - getProperties(application, dynamicProperties), - flinkEnv.getFlinkConf(), - FlinkJobType.of(application.getJobType()), - application.getId(), - new JobID().toHexString(), - application.getJobName(), - appConf, - application.getApplicationType(), - getSavepointPath(appParam), - FlinkRestoreMode.of(appParam.getRestoreMode()), - applicationArgs, - k8sClusterId, - application.getHadoopUser(), - buildResult, - extraParameter, - k8sNamespace, - exposedType); - - CompletableFuture future = - CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService); - - startFutureMap.put(application.getId(), future); - - future.whenCompleteAsync( - (response, throwable) -> { - // 1) remove Future - startFutureMap.remove(application.getId()); - // 2) exception - if (throwable != null) { - processForException(appParam, throwable, applicationLog, application); - return; - } - // 3) success - processForSuccess(appParam, response, applicationLog, application); - }); + // Get the args after placeholder replacement + String args = + StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs(); + String applicationArgs = variableService.replaceVariable(application.getTeamId(), args); + + Tuple3 clusterIdNamespace = + getNamespaceClusterId(application); + String k8sNamespace = clusterIdNamespace.t1; + String k8sClusterId = clusterIdNamespace.t2; + FlinkK8sRestExposedType exposedType = clusterIdNamespace.t3; + + String dynamicProperties = + StringUtils.isBlank(appParam.getDynamicProperties()) + ? application.getDynamicProperties() + : appParam.getDynamicProperties(); + + SubmitRequest submitRequest = + new SubmitRequest( + flinkEnv.getFlinkVersion(), + FlinkDeployMode.of(application.getDeployMode()), + getProperties(application, dynamicProperties), + flinkEnv.getFlinkConf(), + FlinkJobType.of(application.getJobType()), + application.getId(), + new JobID().toHexString(), + application.getJobName(), + appConf, + application.getApplicationType(), + getSavepointPath(appParam), + FlinkRestoreMode.of(appParam.getRestoreMode()), + applicationArgs, + k8sClusterId, + application.getHadoopUser(), + buildResult, + extraParameter, + k8sNamespace, + exposedType); + + CompletableFuture future = + CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService); + + startFutureMap.put(application.getId(), future); + + future.whenCompleteAsync( + (response, throwable) -> { + // 1) remove Future + pendingStarts.remove(application.getId()); + startFutureMap.remove(application.getId()); + // 2) exception + if (throwable != null) { + processForException(appParam, throwable, applicationLog, application); + return; + } + // 3) success + processForSuccess(appParam, response, applicationLog, application); + }); + startSubmitted = true; + } catch (Exception e) { + if (!startSubmitted) { + pendingStarts.remove(application.getId()); + } + throw e; + } } @Nonnull @@ -624,10 +690,13 @@ private boolean checkAppRepeatInYarn(String jobName) { } } - private void starting(FlinkApplication application) { - application.setState(FlinkAppStateEnum.STARTING.getValue()); - application.setOptionTime(new Date()); - updateById(application); + private boolean tryMarkStarting(Long appId) { + return this.lambdaUpdate() + .eq(FlinkApplication::getId, appId) + .in(FlinkApplication::getState, STARTABLE_STATES) + .set(FlinkApplication::getState, FlinkAppStateEnum.STARTING.getValue()) + .set(FlinkApplication::getOptionTime, new Date()) + .update(); } private Tuple2 getUserJarAndAppConf( diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java index 8f40380901..217d614dd6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java @@ -81,6 +81,8 @@ import org.springframework.stereotype.Service; import java.io.File; +import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.HashMap; @@ -130,10 +132,24 @@ public class SparkApplicationActionServiceImpl @Autowired private ResourceService resourceService; + private static final List STARTABLE_STATES = + Collections.unmodifiableList( + Arrays.asList( + SparkAppStateEnum.ADDED.getValue(), + SparkAppStateEnum.FAILED.getValue(), + SparkAppStateEnum.FINISHED.getValue(), + SparkAppStateEnum.LOST.getValue(), + SparkAppStateEnum.SUCCEEDED.getValue(), + SparkAppStateEnum.KILLED.getValue())); + private final Map> startJobFutureMap = new ConcurrentHashMap<>(); private final Map> cancelJobFutureMap = new ConcurrentHashMap<>(); + private final Set pendingStarts = ConcurrentHashMap.newKeySet(); + + private final Set pendingCancels = ConcurrentHashMap.newKeySet(); + @Override public void revoke(Long appId) throws ApplicationException { SparkApplication application = getById(appId); @@ -184,8 +200,32 @@ public void forcedStop(Long id) { @Override public void cancel(SparkApplication appParam) throws Exception { + if (!pendingCancels.add(appParam.getId())) { + log.warn( + "[StreamPark] Cancel is already in progress for application id={}", + appParam.getId()); + return; + } + SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STOPPING); SparkApplication application = getById(appParam.getId()); + + Date optionTime = new Date(); + boolean stateUpdated = + this.lambdaUpdate() + .eq(SparkApplication::getId, appParam.getId()) + .ne(SparkApplication::getState, SparkAppStateEnum.STOPPING.getValue()) + .set(SparkApplication::getState, SparkAppStateEnum.STOPPING.getValue()) + .set(SparkApplication::getOptionTime, optionTime) + .update(); + if (!stateUpdated) { + pendingCancels.remove(appParam.getId()); + log.warn( + "[StreamPark] Application id={} is already stopping or not cancellable", + appParam.getId()); + return; + } + application.setState(SparkAppStateEnum.STOPPING.getValue()); ApplicationLog applicationLog = new ApplicationLog(); @@ -195,8 +235,7 @@ public void cancel(SparkApplication appParam) throws Exception { applicationLog.setCreateTime(new Date()); applicationLog.setClusterId(application.getClusterId()); applicationLog.setUserId(ServiceHelper.getUserId()); - application.setOptionTime(new Date()); - this.baseMapper.updateById(application); + application.setOptionTime(optionTime); Long userId = ServiceHelper.getUserId(); if (!application.getUserId().equals(userId)) { @@ -221,6 +260,7 @@ public void cancel(SparkApplication appParam) throws Exception { cancelJobFutureMap.put(application.getId(), stopFuture); stopFuture.whenComplete( (cancelResponse, throwable) -> { + pendingCancels.remove(application.getId()); cancelJobFutureMap.remove(application.getId()); if (throwable != null) { String exception = ExceptionUtils.stringifyException(throwable); @@ -250,132 +290,150 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { // 1) check application final SparkApplication application = getById(appParam.getId()); AssertUtils.notNull(application); - ApiAlertException.throwIfTrue( - !application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly."); - - SparkEnv sparkEnv = sparkEnvService.getByIdOrDefault(application.getVersionId()); - ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found spark version"); - - if (SparkDeployMode.isYarnMode(application.getDeployModeEnum())) { - checkYarnBeforeStart(application); + if (!pendingStarts.add(application.getId())) { + ApiAlertException.throwIfTrue( + true, "[StreamPark] The application cannot be started repeatedly."); } - ApplicationBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId()); - AssertUtils.notNull(buildPipeline); + boolean startSubmitted = false; + try { + SparkEnv sparkEnv = sparkEnvService.getByIdOrDefault(application.getVersionId()); + ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found spark version"); - // if manually started, clear the restart flag - if (!auto) { - application.setRestartCount(0); - } else { - if (!application.isNeedRestartOnFailed()) { - return; + if (SparkDeployMode.isYarnMode(application.getDeployModeEnum())) { + checkYarnBeforeStart(application); } - application.setRestartCount(application.getRestartCount() + 1); - } - - // 2) update app state to starting... - starting(application); - ApplicationLog applicationLog = new ApplicationLog(); - applicationLog.setJobType(EngineTypeEnum.SPARK.getCode()); - applicationLog.setOptionName(SparkOperationEnum.START.getValue()); - applicationLog.setAppId(application.getId()); - applicationLog.setCreateTime(new Date()); - applicationLog.setUserId(ServiceHelper.getUserId()); + ApplicationBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId()); + AssertUtils.notNull(buildPipeline); - // set the latest to Effective, (it will only become the current effective at this time) - // applicationManageService.toEffective(application); + // if manually started, clear the restart flag + if (!auto) { + application.setRestartCount(0); + } else { + if (!application.isNeedRestartOnFailed()) { + pendingStarts.remove(application.getId()); + return; + } + application.setRestartCount(application.getRestartCount() + 1); + } - Map extraParameter = new HashMap<>(0); - if (application.isSparkSqlJob()) { - SparkSql sparkSql = sparkSqlService.getEffective(application.getId(), true); - // Get the sql of the replaced placeholder - String realSql = variableService.replaceVariable(application.getTeamId(), sparkSql.getSql()); - sparkSql.setSql(DeflaterUtils.zipString(realSql)); - extraParameter.put(ConfigKeys.KEY_SPARK_SQL(null), sparkSql.getSql()); - } + if (!tryMarkStarting(application.getId())) { + pendingStarts.remove(application.getId()); + ApiAlertException.throwIfTrue( + true, "[StreamPark] The application cannot be started repeatedly."); + } + application.setState(SparkAppStateEnum.STARTING.getValue()); + application.setOptionTime(new Date()); + + ApplicationLog applicationLog = new ApplicationLog(); + applicationLog.setJobType(EngineTypeEnum.SPARK.getCode()); + applicationLog.setOptionName(SparkOperationEnum.START.getValue()); + applicationLog.setAppId(application.getId()); + applicationLog.setCreateTime(new Date()); + applicationLog.setUserId(ServiceHelper.getUserId()); + + // set the latest to Effective, (it will only become the current effective at this time) + // applicationManageService.toEffective(application); + + Map extraParameter = new HashMap<>(0); + if (application.isSparkSqlJob()) { + SparkSql sparkSql = sparkSqlService.getEffective(application.getId(), true); + // Get the sql of the replaced placeholder + String realSql = variableService.replaceVariable(application.getTeamId(), sparkSql.getSql()); + sparkSql.setSql(DeflaterUtils.zipString(realSql)); + extraParameter.put(ConfigKeys.KEY_SPARK_SQL(null), sparkSql.getSql()); + } - Tuple2 userJarAndAppConf = getUserJarAndAppConf(sparkEnv, application); - String sparkUserJar = userJarAndAppConf.f0; - String appConf = userJarAndAppConf.f1; + Tuple2 userJarAndAppConf = getUserJarAndAppConf(sparkEnv, application); + String sparkUserJar = userJarAndAppConf.f0; + String appConf = userJarAndAppConf.f1; - BuildResult buildResult = buildPipeline.getBuildResult(); - if (SparkDeployMode.isYarnMode(application.getDeployModeEnum())) { - buildResult = new ShadedBuildResponse(null, sparkUserJar, true); - if (StringUtils.isNotBlank(application.getYarnQueueName())) { - extraParameter.put(ConfigKeys.KEY_SPARK_YARN_QUEUE_NAME(), application.getYarnQueueName()); - } - if (StringUtils.isNotBlank(application.getYarnQueueLabel())) { - extraParameter.put(ConfigKeys.KEY_SPARK_YARN_QUEUE_LABEL(), application.getYarnQueueLabel()); + BuildResult buildResult = buildPipeline.getBuildResult(); + if (SparkDeployMode.isYarnMode(application.getDeployModeEnum())) { + buildResult = new ShadedBuildResponse(null, sparkUserJar, true); + if (StringUtils.isNotBlank(application.getYarnQueueName())) { + extraParameter.put(ConfigKeys.KEY_SPARK_YARN_QUEUE_NAME(), application.getYarnQueueName()); + } + if (StringUtils.isNotBlank(application.getYarnQueueLabel())) { + extraParameter.put(ConfigKeys.KEY_SPARK_YARN_QUEUE_LABEL(), application.getYarnQueueLabel()); + } } - } - // Get the args after placeholder replacement - String applicationArgs = variableService.replaceVariable(application.getTeamId(), application.getAppArgs()); - - SubmitRequest submitRequest = new SubmitRequest( - sparkEnv.getSparkVersion(), - SparkDeployMode.of(application.getDeployMode()), - sparkEnv.getSparkConf(), - SparkJobType.valueOf(application.getJobType()), - application.getId(), - application.getAppName(), - application.getMainClass(), - appConf, - SparkConfigurationUtils.extractPropertiesAsJava(application.getAppProperties()), - SparkConfigurationUtils.extractArgumentsAsJava(applicationArgs), - application.getApplicationType(), - application.getHadoopUser(), - buildResult, - extraParameter); - - CompletableFuture future = CompletableFuture - .supplyAsync(() -> SparkClient.submit(submitRequest), executorService); - - startJobFutureMap.put(application.getId(), future); - future.whenComplete( - (response, throwable) -> { - // 1) remove Future - startJobFutureMap.remove(application.getId()); - - // 2) exception - if (throwable != null) { - String exception = ExceptionUtils.stringifyException(throwable); - applicationLog.setException(exception); - applicationLog.setSuccess(false); - applicationLogService.save(applicationLog); - if (throwable instanceof CancellationException) { - doStopped(application.getId()); - } else { - SparkApplication app = getById(appParam.getId()); - app.setState(SparkAppStateEnum.FAILED.getValue()); - app.setOptionState(SparkOptionStateEnum.NONE.getValue()); - updateById(app); - SparkAppHttpWatcher.unWatching(appParam.getId()); + // Get the args after placeholder replacement + String applicationArgs = variableService.replaceVariable(application.getTeamId(), application.getAppArgs()); + + SubmitRequest submitRequest = new SubmitRequest( + sparkEnv.getSparkVersion(), + SparkDeployMode.of(application.getDeployMode()), + sparkEnv.getSparkConf(), + SparkJobType.valueOf(application.getJobType()), + application.getId(), + application.getAppName(), + application.getMainClass(), + appConf, + SparkConfigurationUtils.extractPropertiesAsJava(application.getAppProperties()), + SparkConfigurationUtils.extractArgumentsAsJava(applicationArgs), + application.getApplicationType(), + application.getHadoopUser(), + buildResult, + extraParameter); + + CompletableFuture future = CompletableFuture + .supplyAsync(() -> SparkClient.submit(submitRequest), executorService); + + startJobFutureMap.put(application.getId(), future); + future.whenComplete( + (response, throwable) -> { + // 1) remove Future + pendingStarts.remove(application.getId()); + startJobFutureMap.remove(application.getId()); + + // 2) exception + if (throwable != null) { + String exception = ExceptionUtils.stringifyException(throwable); + applicationLog.setException(exception); + applicationLog.setSuccess(false); + applicationLogService.save(applicationLog); + if (throwable instanceof CancellationException) { + doStopped(application.getId()); + } else { + SparkApplication app = getById(appParam.getId()); + app.setState(SparkAppStateEnum.FAILED.getValue()); + app.setOptionState(SparkOptionStateEnum.NONE.getValue()); + updateById(app); + SparkAppHttpWatcher.unWatching(appParam.getId()); + } + return; } - return; - } - // 3) success - applicationLog.setSuccess(true); - application.resolveScheduleConf(response.sparkProperties()); - if (StringUtils.isNoneEmpty(response.sparkAppId())) { - application.setClusterId(response.sparkAppId()); - } - applicationLog.setClusterId(response.sparkAppId()); - applicationLog.setTrackingUrl(response.trackingUrl()); - application.setStartTime(new Date()); - application.setEndTime(null); - - // if start completed, will be added task to tracking queue - SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STARTING); - SparkAppHttpWatcher.doWatching(application); - - // update app - updateById(application); - // save log - applicationLogService.save(applicationLog); - }); + // 3) success + applicationLog.setSuccess(true); + application.resolveScheduleConf(response.sparkProperties()); + if (StringUtils.isNoneEmpty(response.sparkAppId())) { + application.setClusterId(response.sparkAppId()); + } + applicationLog.setClusterId(response.sparkAppId()); + applicationLog.setTrackingUrl(response.trackingUrl()); + application.setStartTime(new Date()); + application.setEndTime(null); + + // if start completed, will be added task to tracking queue + SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STARTING); + SparkAppHttpWatcher.doWatching(application); + + // update app + updateById(application); + // save log + applicationLogService.save(applicationLog); + }); + startSubmitted = true; + } catch (Exception e) { + if (!startSubmitted) { + pendingStarts.remove(application.getId()); + } + throw e; + } } /** @@ -403,10 +461,13 @@ private boolean checkAppRepeatInYarn(String jobName) { } } - private void starting(SparkApplication application) { - application.setState(SparkAppStateEnum.STARTING.getValue()); - application.setOptionTime(new Date()); - updateById(application); + private boolean tryMarkStarting(Long appId) { + return this.lambdaUpdate() + .eq(SparkApplication::getId, appId) + .in(SparkApplication::getState, STARTABLE_STATES) + .set(SparkApplication::getState, SparkAppStateEnum.STARTING.getValue()) + .set(SparkApplication::getOptionTime, new Date()) + .update(); } private Tuple2 getUserJarAndAppConf(