Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,32 @@
package org.apache.spark.deploy.raydp;

import java.util.List;
import java.util.Optional;

import io.ray.api.ActorHandle;
import io.ray.api.Ray;

public class ExternalShuffleServiceUtils {

private static String getShuffleServiceActorName(String node) {
return "raydp-shuffle-service-" + node.replace('.', '-');
}

public static ActorHandle<RayExternalShuffleService> createShuffleService(
String node, List<String> options) {
String actorName = getShuffleServiceActorName(node);
Optional<ActorHandle<RayExternalShuffleService>> existing = Ray.getActor(actorName);
if (existing.isPresent()) {
return existing.get();
}

return Ray.actor(RayExternalShuffleService::new)
.setName(actorName)
.setResource("node:" + node, 0.01)
.setJvmOptions(options).remote();
}

public static void startShuffleService(
ActorHandle<RayExternalShuffleService> handle) {
handle.task(RayExternalShuffleService::start).remote();
.setJvmOptions(options)
.setMaxRestarts(-1)
.setMaxTaskRetries(-1)
.remote();
}

public static void stopShuffleService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,12 @@ class RayAppMaster(host: String,
case RegisterExecutor(executorId, executorIp) =>
val success = appInfo.registerExecutor(executorId)
if (success) {
// external shuffle service is enabled
if (conf.getBoolean("spark.shuffle.service.enabled", false)) {
// the node executor is in has not started shuffle service
if (!nodesWithShuffleService.contains(executorIp)) {
logInfo(s"Starting shuffle service on ${executorIp}")
val service = ExternalShuffleServiceUtils.createShuffleService(
executorIp, shuffleServiceOptions.toBuffer.asJava)
ExternalShuffleServiceUtils.startShuffleService(service)
nodesWithShuffleService(executorIp) = service
}
if (conf.getBoolean("spark.shuffle.service.enabled", false) &&
!nodesWithShuffleService.contains(executorIp)) {
logInfo(s"Ensuring shuffle service on ${executorIp}")
val service = ExternalShuffleServiceUtils.createShuffleService(
executorIp, shuffleServiceOptions.toBuffer.asJava)
nodesWithShuffleService(executorIp) = service
}
setUpExecutor(executorId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy.raydp

import io.ray.api.Ray;
import io.ray.api.Ray

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.ExternalShuffleService
Expand All @@ -28,7 +28,9 @@ class RayExternalShuffleService() extends Logging {
val mgr = new SecurityManager(conf)
val instance = new ExternalShuffleService(conf, mgr)

def start(): Unit = {
start()

final def start(): Unit = {
instance.start()
}

Expand Down
Loading