diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java index 60324baf07697..910792a192b48 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java @@ -21,11 +21,11 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.getTlsFileForClient; import static org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState; -import io.netty.channel.EventLoopGroup; import java.net.ServerSocket; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.NetworkErrorTestBase; import org.apache.pulsar.broker.service.OneWayReplicatorTestBase; @@ -99,7 +99,7 @@ public void testAutoClusterFailover(boolean enabledTls) throws Exception { .tlsTrustCertsFilePath(CA_CERT_FILE_PATH); } final PulsarClient client = clientBuilder.build(); - final EventLoopGroup executor = WhiteboxImpl.getInternalState(failover, "executor"); + final ScheduledExecutorService executor = WhiteboxImpl.getInternalState(failover, "executor"); final PulsarServiceState[] stateArray = WhiteboxImpl.getInternalState(failover, "pulsarServiceStateArray"); @@ -160,7 +160,7 @@ public void testInitializeCanOnlyBeCalledOnce() throws Exception { * and producer/lookup operations are kept out of the polling loop so a slow message send does * not consume the convergence budget. */ - private static void awaitStatesAndIndex(EventLoopGroup executor, PulsarServiceState[] stateArray, + private static void awaitStatesAndIndex(ScheduledExecutorService executor, PulsarServiceState[] stateArray, SameAuthParamsLookupAutoClusterFailover failover, int expectedIndex, PulsarServiceState... expectedStates) { @@ -170,7 +170,7 @@ private static void awaitStatesAndIndex(EventLoopGroup executor, PulsarServiceSt }); } - private static void assertStatesEqual(EventLoopGroup executor, PulsarServiceState[] stateArray, + private static void assertStatesEqual(ScheduledExecutorService executor, PulsarServiceState[] stateArray, PulsarServiceState... expected) throws Exception { CompletableFuture snapshot = new CompletableFuture<>(); executor.submit(() -> snapshot.complete(stateArray.clone())); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java index 8d1bd777a7f6c..4e0878733fed7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java @@ -19,10 +19,11 @@ package org.apache.pulsar.client.impl; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.ScheduledFuture; import java.util.Arrays; import java.util.HashSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import lombok.CustomLog; import lombok.Getter; @@ -35,7 +36,6 @@ import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.netty.EventLoopUtil; /** * A service URL provider that probes multiple Pulsar service URLs with the same authentication @@ -50,7 +50,7 @@ public class SameAuthParamsLookupAutoClusterFailover implements ServiceUrlProvider { private PulsarClientImpl pulsarClient; - private EventLoopGroup executor; + private ScheduledExecutorService executor; private volatile boolean closed; private ScheduledFuture scheduledCheckTask; @Getter @@ -79,9 +79,12 @@ public synchronized void initialize(PulsarClient client) { } this.currentPulsarServiceIndex = 0; this.pulsarClient = (PulsarClientImpl) client; - this.executor = EventLoopUtil.newEventLoopGroup(1, false, + this.executor = Executors.newSingleThreadScheduledExecutor( new ExecutorProvider.ExtendedThreadFactory("broker-service-url-check")); - scheduledCheckTask = executor.scheduleAtFixedRate(() -> { + // Use fixed-delay (not fixed-rate) scheduling: a probe can block up to its timeout, and with a + // plain single-threaded scheduled executor fixed-rate runs would otherwise pile up back-to-back + // and monopolize the thread. Fixed-delay leaves a gap after each check completes. + scheduledCheckTask = executor.scheduleWithFixedDelay(() -> { try { if (closed) { return;