From 5b3d03b171efe43de3ba33f8b92cd1a45d889d7f Mon Sep 17 00:00:00 2001 From: Manu Zhang Date: Wed, 1 Apr 2026 23:41:52 +0800 Subject: [PATCH] Core: Fix shared LockManagers scheduler shutdown Prevent one in-memory lock manager from shutting down the shared scheduler while another manager is still active. Add a regression test covering close-on-one-manager while another remains live. Co-authored-by: Codex --- .../org/apache/iceberg/util/LockManagers.java | 78 ++++++++----- .../iceberg/util/TestInMemoryLockManager.java | 18 +++ .../apache/iceberg/util/TestLockManagers.java | 103 ++++++++++++++++++ 3 files changed, 171 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/LockManagers.java b/core/src/main/java/org/apache/iceberg/util/LockManagers.java index 96622cb57f83..cf71beaf8292 100644 --- a/core/src/main/java/org/apache/iceberg/util/LockManagers.java +++ b/core/src/main/java/org/apache/iceberg/util/LockManagers.java @@ -81,12 +81,14 @@ private static LockManager loadLockManager(String impl, Map prop public abstract static class BaseLockManager implements LockManager { private static volatile ScheduledExecutorService scheduler; + private static int activeManagers = 0; private long acquireTimeoutMs; private long acquireIntervalMs; private long heartbeatIntervalMs; private long heartbeatTimeoutMs; private int heartbeatThreads; + private boolean registered = false; public long heartbeatTimeoutMs() { return heartbeatTimeoutMs; @@ -108,24 +110,30 @@ public int heartbeatThreads() { return heartbeatThreads; } + ScheduledExecutorService newScheduler() { + return MoreExecutors.getExitingScheduledExecutorService( + (ScheduledThreadPoolExecutor) + Executors.newScheduledThreadPool( + heartbeatThreads(), + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("iceberg-lock-manager-%d") + .build())); + } + public ScheduledExecutorService scheduler() { - if (scheduler == null) { - synchronized (BaseLockManager.class) { - if (scheduler == null) { - scheduler = - MoreExecutors.getExitingScheduledExecutorService( - (ScheduledThreadPoolExecutor) - Executors.newScheduledThreadPool( - heartbeatThreads(), - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("iceberg-lock-manager-%d") - .build())); - } + synchronized (BaseLockManager.class) { + if (!registered) { + activeManagers += 1; + registered = true; } - } - return scheduler; + if (scheduler == null || scheduler.isShutdown() || scheduler.isTerminated()) { + scheduler = newScheduler(); + } + + return scheduler; + } } @Override @@ -159,15 +167,28 @@ public void initialize(Map properties) { @Override public void close() throws Exception { - if (scheduler != null) { - List tasks = scheduler.shutdownNow(); - tasks.forEach( - task -> { - if (task instanceof Future) { - ((Future) task).cancel(true); - } - }); - scheduler = null; + unregister(); + } + + protected boolean unregister() { + synchronized (BaseLockManager.class) { + if (registered) { + activeManagers -= 1; + registered = false; + } + + if (activeManagers == 0 && scheduler != null) { + List tasks = scheduler.shutdownNow(); + tasks.forEach( + task -> { + if (task instanceof Future) { + ((Future) task).cancel(true); + } + }); + scheduler = null; + } + + return activeManagers == 0; } } } @@ -277,10 +298,11 @@ public boolean release(String entityId, String ownerId) { @Override public void close() throws Exception { - HEARTBEATS.values().forEach(future -> future.cancel(false)); - HEARTBEATS.clear(); - LOCKS.clear(); - super.close(); + if (unregister()) { + HEARTBEATS.values().forEach(future -> future.cancel(false)); + HEARTBEATS.clear(); + LOCKS.clear(); + } } } diff --git a/core/src/test/java/org/apache/iceberg/util/TestInMemoryLockManager.java b/core/src/test/java/org/apache/iceberg/util/TestInMemoryLockManager.java index da7de5e2fbf8..31f99caf970a 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestInMemoryLockManager.java +++ b/core/src/test/java/org/apache/iceberg/util/TestInMemoryLockManager.java @@ -175,4 +175,22 @@ public void testAcquireMultiProcessOnlyOneSucceed() { .as("only 1 thread should have acquired the lock") .isOne(); } + + @Test + public void testClosingOneManagerDoesNotBreakAnother() throws Exception { + LockManagers.InMemoryLockManager anotherManager = + new LockManagers.InMemoryLockManager(Maps.newHashMap()); + + try { + lockManager.close(); + assertThat(anotherManager.acquire(lockEntityId, ownerId)).isTrue(); + assertThat(anotherManager.release(lockEntityId, ownerId)).isTrue(); + lockManager = anotherManager; + anotherManager = null; + } finally { + if (anotherManager != null) { + anotherManager.close(); + } + } + } } diff --git a/core/src/test/java/org/apache/iceberg/util/TestLockManagers.java b/core/src/test/java/org/apache/iceberg/util/TestLockManagers.java index c3207ae13426..2dc02b6accb2 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestLockManagers.java +++ b/core/src/test/java/org/apache/iceberg/util/TestLockManagers.java @@ -21,6 +21,10 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.LockManager; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -41,6 +45,69 @@ public void testLoadCustomLockManager() { assertThat(LockManagers.from(properties)).isInstanceOf(CustomLockManager.class); } + @Test + public void testSchedulerUsageRegistersManagerWithoutInitialize() throws Exception { + TestBaseLockManager firstManager = new TestBaseLockManager(); + + try (TestBaseLockManager secondManager = new TestBaseLockManager()) { + firstManager.scheduler(); + ScheduledExecutorService scheduler = secondManager.scheduler(); + + firstManager.close(); + + assertThat(scheduler.isShutdown()).isFalse(); + } + } + + @Test + public void testCloseWaitsForSchedulerRegistrationAndCreation() throws Exception { + CountDownLatch creatingScheduler = new CountDownLatch(1); + CountDownLatch finishCreatingScheduler = new CountDownLatch(1); + BlockingBaseLockManager manager = + new BlockingBaseLockManager(creatingScheduler, finishCreatingScheduler); + + CompletableFuture schedulerFuture = + CompletableFuture.supplyAsync(manager::scheduler); + + assertThat(creatingScheduler.await(5, TimeUnit.SECONDS)).isTrue(); + + CompletableFuture closeFuture = + CompletableFuture.runAsync( + () -> { + try { + manager.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + assertThat(closeFuture).isNotDone(); + + finishCreatingScheduler.countDown(); + + ScheduledExecutorService scheduler = schedulerFuture.get(5, TimeUnit.SECONDS); + closeFuture.get(5, TimeUnit.SECONDS); + + assertThat(scheduler.isShutdown()).isTrue(); + } + + @Test + public void testCloseDoesNotClearLocksWhileAnotherManagerIsActive() throws Exception { + LockManagers.InMemoryLockManager firstManager = + new LockManagers.InMemoryLockManager(Maps.newHashMap()); + + try (LockManagers.InMemoryLockManager secondManager = + new LockManagers.InMemoryLockManager(Maps.newHashMap())) { + firstManager.acquireOnce("entity", "owner"); + + secondManager.close(); + + assertThat(firstManager.release("entity", "owner")).isTrue(); + } finally { + firstManager.close(); + } + } + static class CustomLockManager implements LockManager { @Override @@ -59,4 +126,40 @@ public void close() throws Exception {} @Override public void initialize(Map properties) {} } + + static class TestBaseLockManager extends LockManagers.BaseLockManager { + + @Override + public boolean acquire(String entityId, String ownerId) { + return false; + } + + @Override + public boolean release(String entityId, String ownerId) { + return false; + } + } + + static class BlockingBaseLockManager extends TestBaseLockManager { + private final CountDownLatch creatingScheduler; + private final CountDownLatch finishCreatingScheduler; + + BlockingBaseLockManager( + CountDownLatch creatingScheduler, CountDownLatch finishCreatingScheduler) { + this.creatingScheduler = creatingScheduler; + this.finishCreatingScheduler = finishCreatingScheduler; + } + + @Override + ScheduledExecutorService newScheduler() { + creatingScheduler.countDown(); + try { + assertThat(finishCreatingScheduler.await(5, TimeUnit.SECONDS)).isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return super.newScheduler(); + } + } }