diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java index 38d7a5cd54a61..1c9648af175bc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java @@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT; import com.google.common.base.Joiner; +import io.netty.util.concurrent.DefaultThreadFactory; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; @@ -37,6 +38,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import java.util.regex.Matcher; @@ -116,6 +119,11 @@ long getLedgerNodeVersion() { private final List> lostBookieRecoveryDelayCallbacks = new ArrayList<>(); + // Registered callbacks can perform synchronous metadata-store reads, so run them on a dedicated + // single-threaded executor instead of the metadata-store notification thread (and outside the lock). + private final ExecutorService notificationCallbackExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-underreplication-notification")); + private static class PulsarUnderreplicatedLedger extends UnderreplicatedLedger { PulsarUnderreplicatedLedger(long ledgerId) { super(ledgerId); @@ -246,13 +254,15 @@ private void handleNotification(Notification n) { callbackList = new ArrayList<>(lostBookieRecoveryDelayCallbacks); lostBookieRecoveryDelayCallbacks.clear(); } - for (BookkeeperInternalCallbacks.GenericCallback callback : callbackList) { - try { - callback.operationComplete(0, null); - } catch (Exception e) { - log.warn().exception(e).log("lostBookieRecoveryDelayCallbacks handle error"); + notificationCallbackExecutor.execute(() -> { + for (BookkeeperInternalCallbacks.GenericCallback callback : callbackList) { + try { + callback.operationComplete(0, null); + } catch (Exception e) { + log.warn().exception(e).log("lostBookieRecoveryDelayCallbacks handle error"); + } } - } + }); return; } if (replicationDisablePath.equals(n.getPath()) && n.getType() == NotificationType.Deleted) { @@ -263,13 +273,15 @@ private void handleNotification(Notification n) { callbackList = new ArrayList<>(replicationEnabledCallbacks); replicationEnabledCallbacks.clear(); } - for (BookkeeperInternalCallbacks.GenericCallback callback : callbackList) { - try { - callback.operationComplete(0, null); - } catch (Exception e) { - log.warn().exception(e).log("replicationEnabledCallbacks handle error"); + notificationCallbackExecutor.execute(() -> { + for (BookkeeperInternalCallbacks.GenericCallback callback : callbackList) { + try { + callback.operationComplete(0, null); + } catch (Exception e) { + log.warn().exception(e).log("replicationEnabledCallbacks handle error"); + } } - } + }); } } } @@ -670,6 +682,7 @@ public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationExcept @Override public void close() throws ReplicationException.UnavailableException { log.debug("close()"); + notificationCallbackExecutor.shutdownNow(); try { for (Map.Entry e : heldLocks.entrySet()) { store.delete(e.getValue().getLockPath(), Optional.empty())