Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.getTlsFileForClient;
import static org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState;
import io.netty.channel.EventLoopGroup;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.NetworkErrorTestBase;
import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
Expand Down Expand Up @@ -99,7 +99,7 @@ public void testAutoClusterFailover(boolean enabledTls) throws Exception {
.tlsTrustCertsFilePath(CA_CERT_FILE_PATH);
}
final PulsarClient client = clientBuilder.build();
final EventLoopGroup executor = WhiteboxImpl.getInternalState(failover, "executor");
final ScheduledExecutorService executor = WhiteboxImpl.getInternalState(failover, "executor");
final PulsarServiceState[] stateArray =
WhiteboxImpl.getInternalState(failover, "pulsarServiceStateArray");

Expand Down Expand Up @@ -160,7 +160,7 @@ public void testInitializeCanOnlyBeCalledOnce() throws Exception {
* and producer/lookup operations are kept out of the polling loop so a slow message send does
* not consume the convergence budget.
*/
private static void awaitStatesAndIndex(EventLoopGroup executor, PulsarServiceState[] stateArray,
private static void awaitStatesAndIndex(ScheduledExecutorService executor, PulsarServiceState[] stateArray,
SameAuthParamsLookupAutoClusterFailover failover,
int expectedIndex,
PulsarServiceState... expectedStates) {
Expand All @@ -170,7 +170,7 @@ private static void awaitStatesAndIndex(EventLoopGroup executor, PulsarServiceSt
});
}

private static void assertStatesEqual(EventLoopGroup executor, PulsarServiceState[] stateArray,
private static void assertStatesEqual(ScheduledExecutorService executor, PulsarServiceState[] stateArray,
PulsarServiceState... expected) throws Exception {
CompletableFuture<PulsarServiceState[]> snapshot = new CompletableFuture<>();
executor.submit(() -> snapshot.complete(stateArray.clone()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
package org.apache.pulsar.client.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.CustomLog;
import lombok.Getter;
Expand All @@ -35,7 +36,6 @@
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;

/**
* A service URL provider that probes multiple Pulsar service URLs with the same authentication
Expand All @@ -50,7 +50,7 @@
public class SameAuthParamsLookupAutoClusterFailover implements ServiceUrlProvider {

private PulsarClientImpl pulsarClient;
private EventLoopGroup executor;
private ScheduledExecutorService executor;
private volatile boolean closed;
private ScheduledFuture<?> scheduledCheckTask;
@Getter
Expand Down Expand Up @@ -79,9 +79,12 @@ public synchronized void initialize(PulsarClient client) {
}
this.currentPulsarServiceIndex = 0;
this.pulsarClient = (PulsarClientImpl) client;
this.executor = EventLoopUtil.newEventLoopGroup(1, false,
this.executor = Executors.newSingleThreadScheduledExecutor(
new ExecutorProvider.ExtendedThreadFactory("broker-service-url-check"));
scheduledCheckTask = executor.scheduleAtFixedRate(() -> {
// Use fixed-delay (not fixed-rate) scheduling: a probe can block up to its timeout, and with a
// plain single-threaded scheduled executor fixed-rate runs would otherwise pile up back-to-back
// and monopolize the thread. Fixed-delay leaves a gap after each check completes.
scheduledCheckTask = executor.scheduleWithFixedDelay(() -> {
try {
if (closed) {
return;
Expand Down
Loading