From c4eaa66cdfaf3480a8399db15c61a332da547d81 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 18 Jun 2026 14:24:48 -0700 Subject: [PATCH] [fix][broker] Avoid blocking the PulsarAdmin callback thread on the post-unload load-report write When a namespace isolation policy changes, ClustersBase.filterAndUnloadMatchedNamespaceAsync unloads the affected namespaces and then writes the load report so the cluster rebalances quickly. That write ran inside FutureUtil.waitForAll(unloadAsync...).thenAccept(...), which executes on the thread that completes the adminClient.namespaces().unloadAsync(...) futures -- a PulsarAdmin async-HTTP-client callback thread. LoadManager.writeLoadReportOnZookeeper(true) blocks there: ModularLoadManagerImpl.writeBrokerDataOnZooKeeper does lock.lock() plus a metadata-store updateValue(...).join(). Run the load-report write via thenAcceptAsync(..., pulsar().getExecutor()) so it executes on the broker executor (where short blocking metadata writes are routine) instead of the admin-client callback thread. The REST response still waits for the write (the chained future completes after it); the write remains best-effort (failure logged and swallowed, as before).
--- .../org/apache/pulsar/broker/admin/impl/ClustersBase.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 3b8d7e4ff7820..ea587566da128 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -902,16 +902,18 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus List<CompletableFuture<Void>> futures = clusterLocalNamespaces.stream() .map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName)) .collect(Collectors.toList()); - return FutureUtil.waitForAll(futures).thenAccept(__ -> { + return FutureUtil.waitForAll(futures).thenAcceptAsync(__ -> { try { - // write load info to load manager to make the load happens fast + // Write the load report so the unloaded namespaces rebalance quickly. Run it on the + // broker executor rather than the admin-client callback thread that completes the + // unload futures, because writeLoadReportOnZookeeper blocks on a metadata-store write. pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true); } catch (Exception e) { log.warn() .exception(e) .log("Failed to writeLoadReportOnZookeeper."); } - }); + }, pulsar().getExecutor()); }); }