From 0a84b0bbd572c5dbfaaab0789aa3662d88cad56f Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 18 Jun 2026 15:02:52 -0700 Subject: [PATCH] [fix][meta] Run ledger-underreplication notification callbacks off the metadata-store listener thread PulsarLedgerUnderreplicationManager.handleNotification is registered as the metadata-store change listener and runs on the metadata-store notification thread, inside synchronized(this). When a relevant z-node changed, it invoked the registered BookKeeper GenericCallbacks (replicationEnabledCallbacks / lostBookieRecoveryDelayCallbacks) inline. Those callbacks can perform synchronous metadata-store reads (the class documents this near notifyUnderReplicationLedgerChanged), so they blocked the metadata-store notification thread -- and held the manager's monitor -- while doing metadata IO. Run the registered callbacks on a dedicated single-threaded executor instead of the notification thread. The callback list is still snapshotted and cleared synchronously under the monitor, and notifyAll() (the primary wait/notify used by the blocking poll methods) stays synchronous; only the callback invocation is offloaded, so it runs off the notification thread and without holding the monitor. The single thread preserves notification ordering, and close() shuts the executor down. Callbacks re-read current state when they re-register, so this is eventually consistent. --- .../PulsarLedgerUnderreplicationManager.java | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java index 38d7a5cd54a61..1c9648af175bc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java @@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT; import com.google.common.base.Joiner; +import io.netty.util.concurrent.DefaultThreadFactory; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; @@ -37,6 +38,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import java.util.regex.Matcher; @@ -116,6 +119,11 @@ long getLedgerNodeVersion() { private final List> lostBookieRecoveryDelayCallbacks = new ArrayList<>(); + // Registered callbacks can perform synchronous metadata-store reads, so run them on a dedicated + // single-threaded executor instead of the metadata-store notification thread (and outside the lock). + private final ExecutorService notificationCallbackExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-underreplication-notification")); + private static class PulsarUnderreplicatedLedger extends UnderreplicatedLedger { PulsarUnderreplicatedLedger(long ledgerId) { super(ledgerId); @@ -246,13 +254,15 @@ private void handleNotification(Notification n) { callbackList = new ArrayList<>(lostBookieRecoveryDelayCallbacks); lostBookieRecoveryDelayCallbacks.clear(); } - for (BookkeeperInternalCallbacks.GenericCallback callback : callbackList) { - try { - callback.operationComplete(0, null); - } catch (Exception e) { - log.warn().exception(e).log("lostBookieRecoveryDelayCallbacks handle error"); + notificationCallbackExecutor.execute(() -> { + for (BookkeeperInternalCallbacks.GenericCallback callback : callbackList) { + try { + callback.operationComplete(0, null); + } catch (Exception e) { + log.warn().exception(e).log("lostBookieRecoveryDelayCallbacks handle error"); + } } - } + }); return; } if (replicationDisablePath.equals(n.getPath()) && n.getType() == NotificationType.Deleted) { @@ -263,13 +273,15 @@ private void handleNotification(Notification n) { callbackList = new ArrayList<>(replicationEnabledCallbacks); replicationEnabledCallbacks.clear(); } - for (BookkeeperInternalCallbacks.GenericCallback callback : callbackList) { - try { - callback.operationComplete(0, null); - } catch (Exception e) { - log.warn().exception(e).log("replicationEnabledCallbacks handle error"); + notificationCallbackExecutor.execute(() -> { + for (BookkeeperInternalCallbacks.GenericCallback callback : callbackList) { + try { + callback.operationComplete(0, null); + } catch (Exception e) { + log.warn().exception(e).log("replicationEnabledCallbacks handle error"); + } } - } + }); } } } @@ -670,6 +682,7 @@ public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationExcept @Override public void close() throws ReplicationException.UnavailableException { log.debug("close()"); + notificationCallbackExecutor.shutdownNow(); try { for (Map.Entry e : heldLocks.entrySet()) { store.delete(e.getValue().getLockPath(), Optional.empty())