From 66789d1d154840d0a9ad71f2bd1d3e8b1909ae24 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 18 Jun 2026 14:35:36 -0700 Subject: [PATCH] [fix][broker] Avoid blocking the metrics thread on the pending-ack managed ledger TransactionAggregator.generate read the pending-ack store's managed ledger with ((PersistentSubscription) subscription).getPendingAckManageLedger().get() -- an un-timed CompletableFuture.get() running on the prometheus-stats executor (a bounded, 4-thread pool). The guard checkIfPendingAckStoreInit() only verifies pendingAckStoreFuture.isDone(); getStoreManageLedger() then composes pendingAckStoreFuture.thenCompose(store -> store.getManagedLedger()), so the managed-ledger future can still be resolving. A stuck future blocks that metrics thread, and enough of them exhaust the pool, hanging all metric scraping. Use getNow(null) instead of get(): if the managed ledger is already resolved it is used as before; if not, this subscription's pending-ack-store stats are skipped for the current scrape and picked up on the next one. A future that completed exceptionally still throws into the existing catch (unchanged). --- .../stats/prometheus/TransactionAggregator.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java index 9f9bb560c1786..0738dfbb3ac9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java @@ -65,10 +65,15 @@ public static void generate(PulsarService pulsar, PrometheusMetricStreams stream if (!isEventSystemTopic(TopicName.get(subscription.getTopic().getName())) && subscription instanceof PersistentSubscription && ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) { + // Use getNow() rather than get() so a not-yet-resolved managed + // ledger cannot block (and exhaust) the metrics thread pool; the + // stats are simply skipped for this scrape and picked up later. ManagedLedger managedLedger = ((PersistentSubscription) subscription) - .getPendingAckManageLedger().get(); - generateManageLedgerStats(managedLedger, - stream, cluster, namespace, name, subscription.getName()); + .getPendingAckManageLedger().getNow(null); + if (managedLedger != null) { + generateManageLedgerStats(managedLedger, + stream, cluster, namespace, name, subscription.getName()); + } } } catch (Exception e) { log.warn()