From 4cdaece00e332c2d5681b906a7abeb5b2e77dc08 Mon Sep 17 00:00:00 2001 From: Brian Sheng Date: Fri, 10 Mar 2023 05:17:47 +0200 Subject: [PATCH] Fix invalid parameter issue when force-closing an endpoint Signed-off-by: Brian Sheng --- .../org/apache/spark/shuffle/ucx/UcxWorkerWrapper.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/scala/org/apache/spark/shuffle/ucx/UcxWorkerWrapper.scala b/src/main/scala/org/apache/spark/shuffle/ucx/UcxWorkerWrapper.scala index 6ad88feb..512ea6cc 100755 --- a/src/main/scala/org/apache/spark/shuffle/ucx/UcxWorkerWrapper.scala +++ b/src/main/scala/org/apache/spark/shuffle/ucx/UcxWorkerWrapper.scala @@ -188,7 +188,14 @@ case class UcxWorkerWrapper(worker: UcpWorker, transport: UcxShuffleTransport, i def connectByWorkerAddress(executorId: transport.ExecutorId, workerAddress: ByteBuffer): Unit = { logDebug(s"Worker $this connecting back to $executorId by worker address") val ep = worker.newEndpoint(new UcpEndpointParams().setName(s"Server connection to $executorId") - .setUcpAddress(workerAddress)) + .setUcpAddress(workerAddress) + .setPeerErrorHandlingMode() + .setErrorHandler(new UcpEndpointErrorHandler() { + override def onError(ep: UcpEndpoint, status: Int, errorMsg: String): Unit = { + logError(s"Endpoint to $executorId got an error: $errorMsg") + connections.remove(executorId) + } + })) connections.put(executorId, ep) }