From f03430e406d46301c8a3cde62c8f34ab5810f2a8 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 18 Jun 2026 14:53:38 -0700 Subject: [PATCH 1/2] [fix][client] Run the failover health probe off the Netty event-loop thread SameAuthParamsLookupAutoClusterFailover periodically probes broker health with a blocking getLookup(url).getBroker(...).get(3, SECONDS). The periodic task was scheduled on a Netty EventLoopGroup (EventLoopUtil.newEventLoopGroup(1, ...)), so the blocking probe ran on an event-loop thread. Use a plain single-thread ScheduledExecutorService (Executors.newSingleThreadScheduledExecutor) for the periodic health check, matching the sibling AutoClusterFailover, so the blocking probe no longer occupies a Netty event-loop thread. scheduleAtFixedRate and shutdownNow are unchanged; the executor is dedicated solely to this check. --- .../impl/SameAuthParamsLookupAutoClusterFailover.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java index 8d1bd777a7f6c..d7fe951f088bb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java @@ -19,10 +19,11 @@ package org.apache.pulsar.client.impl; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.ScheduledFuture; import java.util.Arrays; import java.util.HashSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import lombok.CustomLog; import lombok.Getter; @@ -35,7 +36,6 @@ import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.netty.EventLoopUtil; /** * A service URL provider that probes multiple Pulsar service URLs with the same authentication @@ -50,7 +50,7 @@ public class SameAuthParamsLookupAutoClusterFailover implements ServiceUrlProvider { private PulsarClientImpl pulsarClient; - private EventLoopGroup executor; + private ScheduledExecutorService executor; private volatile boolean closed; private ScheduledFuture scheduledCheckTask; @Getter @@ -79,7 +79,7 @@ public synchronized void initialize(PulsarClient client) { } this.currentPulsarServiceIndex = 0; this.pulsarClient = (PulsarClientImpl) client; - this.executor = EventLoopUtil.newEventLoopGroup(1, false, + this.executor = Executors.newSingleThreadScheduledExecutor( new ExecutorProvider.ExtendedThreadFactory("broker-service-url-check")); scheduledCheckTask = executor.scheduleAtFixedRate(() -> { try { From caa81d51d29f12ca3cf00bed4e8e9ce605c6528b Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 18 Jun 2026 17:49:11 -0700 Subject: [PATCH 2/2] [fix][client] Schedule the failover health check with fixed delay and fix its broker test Follow-up to the executor change in this PR, fixing the CI failure in org.apache.pulsar.broker.SameAuthParamsLookupAutoClusterFailoverTest. 1. The broker integration test reflects the private 'executor' field and typed it as io.netty.channel.EventLoopGroup; the field is now a ScheduledExecutorService, so the reflective cast threw ClassCastException. Update the three type references in the test to ScheduledExecutorService (it only uses execute/submit). 2. The test schedules the health check every 100ms while one service (a dead dummy proxy) blocks its probe for ~3s. On a plain single-threaded ScheduledExecutorService, scheduleAtFixedRate runs such slow checks back-to-back (catch-up) and monopolizes the thread, starving the task the test submits to the executor (a Netty EventLoopGroup interleaves immediate tasks, which is why it passed before). Use scheduleWithFixedDelay so a gap is left after each check; this is also better for a blocking health probe, which fixed-rate would otherwise issue continuously while a service is down. --- .../SameAuthParamsLookupAutoClusterFailoverTest.java | 8 ++++---- .../impl/SameAuthParamsLookupAutoClusterFailover.java | 5 ++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java index 60324baf07697..910792a192b48 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java @@ -21,11 +21,11 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.getTlsFileForClient; import static org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState; -import io.netty.channel.EventLoopGroup; import java.net.ServerSocket; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.NetworkErrorTestBase; import org.apache.pulsar.broker.service.OneWayReplicatorTestBase; @@ -99,7 +99,7 @@ public void testAutoClusterFailover(boolean enabledTls) throws Exception { .tlsTrustCertsFilePath(CA_CERT_FILE_PATH); } final PulsarClient client = clientBuilder.build(); - final EventLoopGroup executor = WhiteboxImpl.getInternalState(failover, "executor"); + final ScheduledExecutorService executor = WhiteboxImpl.getInternalState(failover, "executor"); final PulsarServiceState[] stateArray = WhiteboxImpl.getInternalState(failover, "pulsarServiceStateArray"); @@ -160,7 +160,7 @@ public void testInitializeCanOnlyBeCalledOnce() throws Exception { * and producer/lookup operations are kept out of the polling loop so a slow message send does * not consume the convergence budget. */ - private static void awaitStatesAndIndex(EventLoopGroup executor, PulsarServiceState[] stateArray, + private static void awaitStatesAndIndex(ScheduledExecutorService executor, PulsarServiceState[] stateArray, SameAuthParamsLookupAutoClusterFailover failover, int expectedIndex, PulsarServiceState... expectedStates) { @@ -170,7 +170,7 @@ private static void awaitStatesAndIndex(EventLoopGroup executor, PulsarServiceSt }); } - private static void assertStatesEqual(EventLoopGroup executor, PulsarServiceState[] stateArray, + private static void assertStatesEqual(ScheduledExecutorService executor, PulsarServiceState[] stateArray, PulsarServiceState... expected) throws Exception { CompletableFuture snapshot = new CompletableFuture<>(); executor.submit(() -> snapshot.complete(stateArray.clone())); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java index d7fe951f088bb..4e0878733fed7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java @@ -81,7 +81,10 @@ public synchronized void initialize(PulsarClient client) { this.pulsarClient = (PulsarClientImpl) client; this.executor = Executors.newSingleThreadScheduledExecutor( new ExecutorProvider.ExtendedThreadFactory("broker-service-url-check")); - scheduledCheckTask = executor.scheduleAtFixedRate(() -> { + // Use fixed-delay (not fixed-rate) scheduling: a probe can block up to its timeout, and with a + // plain single-threaded scheduled executor fixed-rate runs would otherwise pile up back-to-back + // and monopolize the thread. Fixed-delay leaves a gap after each check completes. + scheduledCheckTask = executor.scheduleWithFixedDelay(() -> { try { if (closed) { return;