diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index e855a21c17d9e..fd85eb90c9573 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -707,9 +707,10 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { ? bookkeeperFactory.get() : CompletableFuture.completedFuture(null); return bookkeeperFuture - .thenRun(() -> { + .thenCompose(__ -> { log.info().attr("numLedgers", ledgers.size()).log("Closing ledgers"); //make sure all callbacks is called. + List> remainingFutures = new ArrayList<>(); ledgers.forEach(((ledgerName, ledgerFuture) -> { if (!ledgerFuture.isDone()) { ledgerFuture.completeExceptionally( @@ -719,14 +720,26 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { if (managedLedger == null) { return; } - try { - managedLedger.close(); - } catch (Throwable throwable) { - log.warn().attr("managedLedger", managedLedger.getName()).exception(throwable) - .log("Got exception when closing managed ledger"); - } + // Close asynchronously so a slow close cannot block, serially, the thread + // that completes the shutdown future. + CompletableFuture closeFuture = new CompletableFuture<>(); + remainingFutures.add(closeFuture); + managedLedger.asyncClose(new AsyncCallbacks.CloseCallback() { + @Override + public void closeComplete(Object ctx) { + closeFuture.complete(null); + } + + @Override + public void closeFailed(ManagedLedgerException exception, Object ctx) { + log.warn().attr("managedLedger", managedLedger.getName()).exception(exception) + .log("Got exception when closing managed ledger"); + closeFuture.complete(null); + } + }, null); } })); + return Futures.waitForAll(remainingFutures); }).whenCompleteAsync((__, ___) -> { //wait for tasks in scheduledExecutor executed. openTelemetryManagedCursorStats.close();