diff --git a/bin/raydp-submit b/bin/raydp-submit index bd3a4de4..c236634f 100755 --- a/bin/raydp-submit +++ b/bin/raydp-submit @@ -143,6 +143,12 @@ added_args+=("spark.driver.host=$RAY_DRIVER_NODE_IP") added_args+=("--conf") added_args+=("spark.driver.bindAddress=$RAY_DRIVER_NODE_IP") +# JvmExitGuard: force JVM exit after Spark app ends if non-daemon threads keep it alive. +# Set RAYDP_JVM_EXIT_TIMEOUT env var to override the default timeout (300s) or disable (0). +if [ -n "$RAYDP_JVM_EXIT_TIMEOUT" ]; then + added_confs+=("-Draydp.jvm.exit.timeout=$RAYDP_JVM_EXIT_TIMEOUT") +fi + # Find the java binary if [ -n "${JAVA_HOME}" ]; then RUNNER="${JAVA_HOME}/bin/java" diff --git a/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java b/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java index ecaa747e..1b9b35d9 100644 --- a/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java +++ b/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java @@ -18,21 +18,31 @@ package org.apache.spark.deploy.raydp; import java.util.List; +import java.util.Optional; import io.ray.api.ActorHandle; import io.ray.api.Ray; public class ExternalShuffleServiceUtils { + private static String getShuffleServiceActorName(String node) { + return "raydp-shuffle-service-" + node.replace('.', '-'); + } + public static ActorHandle createShuffleService( String node, List options) { + String actorName = getShuffleServiceActorName(node); + Optional> existing = Ray.getActor(actorName); + if (existing.isPresent()) { + return existing.get(); + } + return Ray.actor(RayExternalShuffleService::new) + .setName(actorName) .setResource("node:" + node, 0.01) - .setJvmOptions(options).remote(); - } - - public static void startShuffleService( - ActorHandle handle) { - handle.task(RayExternalShuffleService::start).remote(); + .setJvmOptions(options) + .setMaxRestarts(-1) + .setMaxTaskRetries(-1) + .remote(); } public static void stopShuffleService( diff --git a/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/RayAppMasterUtils.java b/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/RayAppMasterUtils.java index c40873dc..5984d2a1 100644 --- a/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/RayAppMasterUtils.java +++ b/core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/RayAppMasterUtils.java @@ -57,6 +57,16 @@ public static Map getRestartedExecutors( return handle.task(RayAppMaster::getRestartedExecutors).remote().get(); } + public static boolean finishApplication( + ActorHandle handle, + String appId, + String stateName, + int exitCode, + String diagnostics) { + return handle.task(RayAppMaster::finishApplication, appId, stateName, exitCode, diagnostics) + .remote().get(); + } + public static void stopAppMaster( ActorHandle handle) { handle.task(RayAppMaster::stop).remote().get(); diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 829517e5..be1dbaf3 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -50,6 +50,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl import org.apache.spark._ import org.apache.spark.api.r.RUtils +import org.apache.spark.deploy.raydp.{DriverAppMasterReporter, DriverExitState, JvmExitGuard} import org.apache.spark.deploy.rest._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -1011,6 +1012,20 @@ object SparkSubmit extends CommandLineUtils with Logging { private val CLASS_NOT_FOUND_EXIT_STATUS = 101 + private def finalizeDriverTermination(): Unit = { + val snapshot = DriverExitState.current() + if (DriverExitState.isTerminal(snapshot.state)) { + DriverAppMasterReporter.tryReportAndCleanup() + JvmExitGuard.arm(snapshot.exitCode) + } + } + + private def describeFailure(t: Throwable): String = { + val message = Option(t.getMessage).filter(_.nonEmpty) + .getOrElse("No additional diagnostics available.") + s"${t.getClass.getName}: $message" + } + // Following constants are visible for testing. private[deploy] val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication" @@ -1020,6 +1035,19 @@ object SparkSubmit extends CommandLineUtils with Logging { "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication" override def main(args: Array[String]): Unit = { + DriverExitState.reset() + DriverAppMasterReporter.reset() + val originalExitFn = exitFn + exitFn = (exitCode: Int) => { + if (exitCode == JvmExitGuard.EXIT_SUCCESS) { + DriverExitState.trySetFinished() + } else { + DriverExitState.trySetFailed(exitCode, s"SparkSubmit exited with status $exitCode") + } + finalizeDriverTermination() + originalExitFn(exitCode) + } + val submit = new SparkSubmit() { self => @@ -1050,7 +1078,18 @@ object SparkSubmit extends CommandLineUtils with Logging { } - submit.doSubmit(args) + try { + submit.doSubmit(args) + DriverExitState.trySetFinished() + finalizeDriverTermination() + } catch { + case t: Throwable => + DriverExitState.trySetFailed(JvmExitGuard.EXIT_APP_FAILED, describeFailure(t)) + finalizeDriverTermination() + throw t + } finally { + exitFn = originalExitFn + } } /** diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/ApplicationInfo.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/ApplicationInfo.scala index 4091ccdd..0e00fd53 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/ApplicationInfo.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/ApplicationInfo.scala @@ -25,6 +25,7 @@ import io.ray.api.ActorHandle import org.apache.spark.executor.RayDPExecutor import org.apache.spark.internal.Logging +import org.apache.spark.deploy.raydp.JvmExitGuard import org.apache.spark.raydp.RayExecutorUtils import org.apache.spark.resource.ResourceInformation import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} @@ -53,6 +54,8 @@ private[spark] class ApplicationInfo( var removedExecutors: ArrayBuffer[ExecutorDesc] = _ var coresGranted: Int = _ var endTime: Long = _ + var exitCode: Int = _ + var diagnostics: String = _ private var nextExecutorId: Int = _ // this only count those registered executors and minus removed executors private var registeredExecutors: Int = 0 @@ -65,6 +68,8 @@ private[spark] class ApplicationInfo( addressToExecutorId = new HashMap[RpcAddress, String] executorIdToHandler = new HashMap[String, ActorHandle[RayDPExecutor]] endTime = -1L + exitCode = JvmExitGuard.EXIT_SUCCESS + diagnostics = null nextExecutorId = 0 removedExecutors = new ArrayBuffer[ExecutorDesc] } @@ -165,9 +170,21 @@ private[spark] class ApplicationInfo( def resetRetryCount(): Unit = _retryCount = 0 + def finish(endState: ApplicationState.Value, endExitCode: Int, endDiagnostics: String): Boolean = + synchronized { + if (isFinished) { + false + } else { + state = endState + exitCode = endExitCode + diagnostics = endDiagnostics + endTime = System.currentTimeMillis() + true + } + } + def markFinished(endState: ApplicationState.Value): Unit = { - state = endState - endTime = System.currentTimeMillis() + finish(endState, JvmExitGuard.EXIT_SUCCESS, null) } def isFinished: Boolean = { diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/DriverAppMasterReporter.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/DriverAppMasterReporter.scala new file mode 100644 index 00000000..1747dca6 --- /dev/null +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/DriverAppMasterReporter.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.raydp + +import java.util.concurrent.atomic.AtomicBoolean + +import scala.util.control.NonFatal + +import io.ray.api.ActorHandle + +import org.apache.spark.internal.Logging + +object DriverAppMasterReporter extends Logging { + + private val reported = new AtomicBoolean(false) + + private var appId: String = null + private var masterHandle: ActorHandle[RayAppMaster] = null + + def reset(): Unit = synchronized { + reported.set(false) + appId = null + masterHandle = null + } + + def bind(appId: String): Unit = synchronized { + if (!reported.get()) { + this.appId = appId + } + } + + def bindMasterHandle(masterHandle: ActorHandle[RayAppMaster]): Unit = synchronized { + if (!reported.get()) { + this.masterHandle = masterHandle + } + } + + def tryReportAndCleanup(): Boolean = { + val snapshot = DriverExitState.current() + if (!DriverExitState.isTerminal(snapshot.state)) { + logDebug(s"Skip AppMaster report because driver state is not terminal: ${snapshot.state}") + false + } else { + val binding = synchronized { + if (!reported.compareAndSet(false, true)) None + else Some((appId, masterHandle)) + } + binding match { + case None => false + case Some((currentAppId, currentMasterHandle)) => + try { + if (currentAppId != null && currentMasterHandle != null) { + RayAppMasterUtils.finishApplication( + currentMasterHandle, + currentAppId, + snapshot.state.toString, + snapshot.exitCode, + snapshot.diagnostics) + } else { + logWarning("Skip reporting terminal application state because AppMaster binding " + + "is incomplete.") + } + } catch { + case NonFatal(e) => + logWarning("Failed to report terminal application state to AppMaster", e) + } finally { + if (currentMasterHandle != null) { + try { + RayAppMasterUtils.stopAppMaster(currentMasterHandle) + } catch { + case NonFatal(e) => + logWarning("Failed to stop AppMaster during driver cleanup", e) + } + } + synchronized { + appId = null + masterHandle = null + } + } + true + } + } + } +} diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/DriverExitState.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/DriverExitState.scala new file mode 100644 index 00000000..e3ebc6e4 --- /dev/null +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/DriverExitState.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.raydp + +object DriverExitState { + + case class Snapshot(state: ApplicationState.Value, exitCode: Int, diagnostics: String) + + private var snapshot = Snapshot(ApplicationState.UNKNOWN, JvmExitGuard.EXIT_SUCCESS, null) + + def reset(): Unit = synchronized { + snapshot = Snapshot(ApplicationState.UNKNOWN, JvmExitGuard.EXIT_SUCCESS, null) + } + + def current(): Snapshot = synchronized { + snapshot + } + + def isTerminal(state: ApplicationState.Value): Boolean = { + state == ApplicationState.FINISHED || + state == ApplicationState.FAILED || + state == ApplicationState.KILLED + } + + def trySetFinished(): Boolean = synchronized { + trySet(ApplicationState.FINISHED, JvmExitGuard.EXIT_SUCCESS, null) + } + + def trySetFailed(exitCode: Int, diagnostics: String): Boolean = synchronized { + trySet(ApplicationState.FAILED, normalizedFailureCode(exitCode), diagnostics) + } + + def trySetKilled(exitCode: Int, diagnostics: String): Boolean = synchronized { + val normalizedExitCode = if (exitCode == JvmExitGuard.EXIT_SUCCESS) { + JvmExitGuard.EXIT_KILLED + } else { + exitCode + } + trySet(ApplicationState.KILLED, normalizedExitCode, diagnostics) + } + + private def trySet( + state: ApplicationState.Value, + exitCode: Int, + diagnostics: String): Boolean = { + if (isTerminal(snapshot.state)) { + false + } else { + snapshot = Snapshot(state, exitCode, diagnostics) + true + } + } + + private def normalizedFailureCode(exitCode: Int): Int = { + if (exitCode == JvmExitGuard.EXIT_SUCCESS) { + JvmExitGuard.EXIT_APP_FAILED + } else { + exitCode + } + } +} diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/JvmExitGuard.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/JvmExitGuard.scala new file mode 100644 index 00000000..2c35a9c6 --- /dev/null +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/JvmExitGuard.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.raydp + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils +import scala.concurrent.duration._ + +/** + * Force JVM process to exit after the Spark application ends, even when non-daemon + * threads (e.g., Hudi Embedded Timeline Server) prevent natural JVM termination. + * + * When running Spark on Ray via `raydp-submit`, the JVM process started by the shell + * script may hang after `SparkContext.stop()` because certain non-daemon threads keep + * the JVM alive. KubeRay RayJob relies on the entrypoint process exiting to determine + * job completion, so this causes RayJobs to stay in RUNNING state indefinitely. + * + * This guard starts a daemon countdown thread when the driver reaches a terminal state. + * If the JVM hasn't exited within the configured timeout, it calls `System.exit()` to + * force termination. + * + * Configuration via JVM system properties: + * -Draydp.jvm.exit.timeout=300 (seconds, default: 300, 0 = disabled) + */ +object JvmExitGuard extends Logging { + + val EXIT_SUCCESS = 0 + val EXIT_APP_FAILED = 1 + val EXIT_KILLED = 143 + + /** Timeout in seconds before forced exit. Read from system property, default 300. */ + val EXIT_TIMEOUT_SEC: Int = { + val value = System.getProperty("raydp.jvm.exit.timeout", "300") + try { + value.toInt + } catch { + case _: NumberFormatException => + logWarning(s"Invalid raydp.jvm.exit.timeout value: $value, using default 300") + 300 + } + } + + /** Guard state, protected by synchronized on the singleton object. */ + private var triggered = false + private var exitCode = EXIT_SUCCESS + + /** + * Arm the delayed exit guard after the driver reaches a terminal state. + */ + def arm(appExitCode: Int = EXIT_SUCCESS): Unit = { + if (EXIT_TIMEOUT_SEC <= 0) { + logInfo("JvmExitGuard: disabled (raydp.jvm.exit.timeout <= 0)") + return + } + + JvmExitGuard.synchronized { + if (triggered) { + logDebug("JvmExitGuard: already triggered, ignoring duplicate call") + return + } + triggered = true + exitCode = appExitCode + } + + logInfo(s"JvmExitGuard: armed delayed JVM exit (exitCode=$exitCode). " + + s"Will force JVM exit in ${EXIT_TIMEOUT_SEC}s if it doesn't terminate naturally. " + + s"(Configure via -Draydp.jvm.exit.timeout=N, 0=disabled)") + + // Daemon thread: does NOT prevent JVM from exiting naturally. + // If the JVM exits on its own before the timeout, this thread is + // simply terminated along with the JVM. + val guardThread = new Thread(new Runnable { + override def run(): Unit = { + try { + val deadline = System.nanoTime() + EXIT_TIMEOUT_SEC.seconds.toNanos + while (System.nanoTime() < deadline) { + Thread.sleep(1000) + } + + logWarning(s"JvmExitGuard: JVM still alive after ${EXIT_TIMEOUT_SEC}s. " + + s"Forcing System.exit($exitCode).") + + try { + System.exit(exitCode) + } catch { + case _: IllegalStateException => + // JVM is already in shutdown sequence; fall back to Runtime.halt() + // which bypasses all shutdown hooks and terminates immediately. + logWarning("JvmExitGuard: System.exit() failed (already shutting down), " + + "falling back to Runtime.halt()") + Runtime.getRuntime.halt(exitCode) + } + } catch { + case _: InterruptedException => + logDebug("JvmExitGuard: interrupted, JVM likely exited naturally") + case e: Throwable => + // If the guard thread itself is failing, force halt as last resort. + logError(s"JvmExitGuard: unexpected error in guard thread", e) + try { + Runtime.getRuntime.halt(exitCode) + } catch { + case _: Throwable => // give up + } + } + } + }, "jvm-exit-guard") + + guardThread.setDaemon(true) + guardThread.start() + logDebug("JvmExitGuard: countdown thread started") + } +} diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/Messages.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/Messages.scala index 15e8ffc5..77838c48 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/Messages.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/Messages.scala @@ -26,7 +26,11 @@ case class RegisterApplication(appDescription: ApplicationDescription, driver: R case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends RayDPDeployMessage -case class UnregisterApplication(appId: String) extends RayDPDeployMessage +case class FinishApplication( + appId: String, + state: ApplicationState.Value, + exitCode: Int, + diagnostics: String) extends RayDPDeployMessage case class RegisterExecutor(executorId: String, nodeIp: String) extends RayDPDeployMessage diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala index f4cc823d..752d2468 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala @@ -83,6 +83,15 @@ class RayAppMaster(host: String, def getRestartedExecutors(): java.util.Map[String, String] = restartedExecutors.asJava + def finishApplication( + appId: String, + stateName: String, + exitCode: Int, + diagnostics: String): Boolean = { + endpoint.askSync[Boolean]( + FinishApplication(appId, ApplicationState.withName(stateName), exitCode, diagnostics)) + } + /** * This is used to represent the Spark on Ray cluster URL. */ @@ -141,13 +150,13 @@ class RayAppMaster(host: String, logInfo("Registered app " + appDescription.name + " with ID " + app.id) driver.send(RegisteredApplication(app.id, self)) schedule() - - case UnregisterApplication(appId) => - assert(appInfo != null && appInfo.id == appId) - appInfo.markFinished(ApplicationState.FINISHED) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case FinishApplication(appId, state, exitCode, diagnostics) => + assert(appInfo != null && appInfo.id == appId) + context.reply(appInfo.finish(state, exitCode, diagnostics)) + case RegisterExecutor(executorId, executorIp) => val success = appInfo.registerExecutor(executorId) if (success) { diff --git a/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala b/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala index dce83a78..daf8f0b3 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala @@ -59,7 +59,12 @@ class RayCoarseGrainedSchedulerBackend( private val launcherBackend = new LauncherBackend() { override protected def conf: SparkConf = sc.conf - override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED) + override protected def onStopRequest(): Unit = { + DriverExitState.trySetKilled( + JvmExitGuard.EXIT_KILLED, + "Spark launcher requested application stop.") + stop(SparkAppHandle.State.KILLED) + } } def prependPreferPath(cp: String): String = { @@ -93,6 +98,7 @@ class RayCoarseGrainedSchedulerBackend( masterHandle = RayAppMasterUtils.createAppMaster(cp, null, options.toBuffer.asJava, appMasterResources.toMap.asJava) + DriverAppMasterReporter.bindMasterHandle(masterHandle) uri = new URI(RayAppMasterUtils.getMasterUrl(masterHandle)) } else { uri = new URI(sparkUrl) @@ -195,9 +201,6 @@ class RayCoarseGrainedSchedulerBackend( override def stop(): Unit = { stop(SparkAppHandle.State.FINISHED) - if (masterHandle != null) { - RayAppMasterUtils.stopAppMaster(masterHandle) - } } def parseRayDPResourceRequirements(sparkConf: SparkConf): Map[String, Double] = { @@ -259,6 +262,7 @@ class RayCoarseGrainedSchedulerBackend( appId.set(id) launcherBackend.setAppId(id) appMasterRef.set(ref) + DriverAppMasterReporter.bind(id) registrationBarrier.release() } @@ -304,7 +308,10 @@ class RayCoarseGrainedSchedulerBackend( if (stopped.compareAndSet(false, true)) { try { super.stop() // this will stop all executors - appMasterRef.get.send(UnregisterApplication(appId.get)) + if (finalState == SparkAppHandle.State.KILLED) { + DriverAppMasterReporter.tryReportAndCleanup() + JvmExitGuard.arm(DriverExitState.current().exitCode) + } } finally { appMasterRef.set(null) launcherBackend.setState(finalState)