From 63d40a2fe545ef2425400158deafb44a119e5ac8 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 18 Jun 2026 14:48:02 -0700 Subject: [PATCH] [fix][ml] Avoid blocking the factory-shutdown thread on serial managed-ledger close ManagedLedgerFactoryImpl.shutdownAsync first initiates an async close (asyncClose) for every managed ledger known at the start of shutdown. A second pass, inside bookkeeperFuture.thenRun(...), then closes any ledgers that were added after that snapshot -- but it did so with a serial, blocking managedLedger.close() (which waits on a CountDownLatch), blocking the thread that completes the shutdown future. Change that second pass from thenRun to thenCompose, and replace the blocking close() with asyncClose(), collecting the per-ledger futures and awaiting them with Futures.waitForAll. In the common case this pass is empty (the first pass has already drained the ledgers map), so it is a no-op; when ledgers were added concurrently during shutdown they now close in parallel without blocking the continuation thread. The first pass and the rest of the shutdown sequence are unchanged. --- .../impl/ManagedLedgerFactoryImpl.java | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index e855a21c17d9e..fd85eb90c9573 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -707,9 +707,10 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { ? bookkeeperFactory.get() : CompletableFuture.completedFuture(null); return bookkeeperFuture - .thenRun(() -> { + .thenCompose(__ -> { log.info().attr("numLedgers", ledgers.size()).log("Closing ledgers"); //make sure all callbacks is called. + List> remainingFutures = new ArrayList<>(); ledgers.forEach(((ledgerName, ledgerFuture) -> { if (!ledgerFuture.isDone()) { ledgerFuture.completeExceptionally( @@ -719,14 +720,26 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { if (managedLedger == null) { return; } - try { - managedLedger.close(); - } catch (Throwable throwable) { - log.warn().attr("managedLedger", managedLedger.getName()).exception(throwable) - .log("Got exception when closing managed ledger"); - } + // Close asynchronously so a slow close cannot block, serially, the thread + // that completes the shutdown future. + CompletableFuture closeFuture = new CompletableFuture<>(); + remainingFutures.add(closeFuture); + managedLedger.asyncClose(new AsyncCallbacks.CloseCallback() { + @Override + public void closeComplete(Object ctx) { + closeFuture.complete(null); + } + + @Override + public void closeFailed(ManagedLedgerException exception, Object ctx) { + log.warn().attr("managedLedger", managedLedger.getName()).exception(exception) + .log("Got exception when closing managed ledger"); + closeFuture.complete(null); + } + }, null); } })); + return Futures.waitForAll(remainingFutures); }).whenCompleteAsync((__, ___) -> { //wait for tasks in scheduledExecutor executed. openTelemetryManagedCursorStats.close();