Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 50 additions & 28 deletions core/src/main/java/org/apache/iceberg/util/LockManagers.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@ private static LockManager loadLockManager(String impl, Map<String, String> prop
public abstract static class BaseLockManager implements LockManager {

private static volatile ScheduledExecutorService scheduler;
private static int activeManagers = 0;

private long acquireTimeoutMs;
private long acquireIntervalMs;
private long heartbeatIntervalMs;
private long heartbeatTimeoutMs;
private int heartbeatThreads;
private boolean registered = false;

public long heartbeatTimeoutMs() {
return heartbeatTimeoutMs;
Expand All @@ -108,24 +110,30 @@ public int heartbeatThreads() {
return heartbeatThreads;
}

ScheduledExecutorService newScheduler() {
return MoreExecutors.getExitingScheduledExecutorService(
(ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(
heartbeatThreads(),
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("iceberg-lock-manager-%d")
.build()));
}

public ScheduledExecutorService scheduler() {
if (scheduler == null) {
synchronized (BaseLockManager.class) {
if (scheduler == null) {
scheduler =
MoreExecutors.getExitingScheduledExecutorService(
(ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(
heartbeatThreads(),
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("iceberg-lock-manager-%d")
.build()));
}
synchronized (BaseLockManager.class) {
if (!registered) {
activeManagers += 1;
registered = true;
}
}

return scheduler;
if (scheduler == null || scheduler.isShutdown() || scheduler.isTerminated()) {
scheduler = newScheduler();
}

return scheduler;
}
}

@Override
Expand Down Expand Up @@ -159,15 +167,28 @@ public void initialize(Map<String, String> properties) {

@Override
public void close() throws Exception {
if (scheduler != null) {
List<Runnable> tasks = scheduler.shutdownNow();
tasks.forEach(
task -> {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
});
scheduler = null;
unregister();
}

protected boolean unregister() {
synchronized (BaseLockManager.class) {
if (registered) {
activeManagers -= 1;
registered = false;
}

if (activeManagers == 0 && scheduler != null) {
List<Runnable> tasks = scheduler.shutdownNow();
tasks.forEach(
task -> {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
});
scheduler = null;
}

return activeManagers == 0;
}
}
}
Expand Down Expand Up @@ -277,10 +298,11 @@ public boolean release(String entityId, String ownerId) {

@Override
public void close() throws Exception {
HEARTBEATS.values().forEach(future -> future.cancel(false));
HEARTBEATS.clear();
LOCKS.clear();
super.close();
if (unregister()) {
HEARTBEATS.values().forEach(future -> future.cancel(false));
HEARTBEATS.clear();
LOCKS.clear();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,22 @@ public void testAcquireMultiProcessOnlyOneSucceed() {
.as("only 1 thread should have acquired the lock")
.isOne();
}

@Test
public void testClosingOneManagerDoesNotBreakAnother() throws Exception {
LockManagers.InMemoryLockManager anotherManager =
new LockManagers.InMemoryLockManager(Maps.newHashMap());

try {
lockManager.close();
assertThat(anotherManager.acquire(lockEntityId, ownerId)).isTrue();
assertThat(anotherManager.release(lockEntityId, ownerId)).isTrue();
Comment on lines +185 to +187
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As written, this regression test closes lockManager before any operation that would create/use the shared scheduler. That means it may not exercise the original failure mode (scheduler terminated but still referenced, causing RejectedExecutionException on scheduleAtFixedRate). To make this a real regression, trigger scheduler creation on one manager (e.g., acquire once) and ensure the other manager has registered/used the scheduler before closing the first, then verify the remaining manager can still acquire without RejectedExecutionException (or assert scheduler is not shutdown).

Suggested change
lockManager.close();
assertThat(anotherManager.acquire(lockEntityId, ownerId)).isTrue();
assertThat(anotherManager.release(lockEntityId, ownerId)).isTrue();
lockManager.initialize(
ImmutableMap.of(
CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS, "100",
CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "50",
CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "1000"));
anotherManager.initialize(
ImmutableMap.of(
CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS, "100",
CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "50",
CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "1000"));
String firstOwner = UUID.randomUUID().toString();
String secondOwner = UUID.randomUUID().toString();
String thirdOwner = UUID.randomUUID().toString();
assertThat(lockManager.acquire(lockEntityId, firstOwner)).isTrue();
assertThat(lockManager.release(lockEntityId, firstOwner)).isTrue();
assertThat(anotherManager.acquire(lockEntityId, secondOwner)).isTrue();
assertThat(anotherManager.release(lockEntityId, secondOwner)).isTrue();
lockManager.close();
assertThat(anotherManager.acquire(lockEntityId, thirdOwner)).isTrue();
assertThat(anotherManager.release(lockEntityId, thirdOwner)).isTrue();

Copilot uses AI. Check for mistakes.
lockManager = anotherManager;
anotherManager = null;
} finally {
if (anotherManager != null) {
anotherManager.close();
}
}
}
}
103 changes: 103 additions & 0 deletions core/src/test/java/org/apache/iceberg/util/TestLockManagers.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.LockManager;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand All @@ -41,6 +45,69 @@ public void testLoadCustomLockManager() {
assertThat(LockManagers.from(properties)).isInstanceOf(CustomLockManager.class);
}

@Test
public void testSchedulerUsageRegistersManagerWithoutInitialize() throws Exception {
TestBaseLockManager firstManager = new TestBaseLockManager();

try (TestBaseLockManager secondManager = new TestBaseLockManager()) {
firstManager.scheduler();
ScheduledExecutorService scheduler = secondManager.scheduler();

firstManager.close();

assertThat(scheduler.isShutdown()).isFalse();
}
}

@Test
public void testCloseWaitsForSchedulerRegistrationAndCreation() throws Exception {
CountDownLatch creatingScheduler = new CountDownLatch(1);
CountDownLatch finishCreatingScheduler = new CountDownLatch(1);
BlockingBaseLockManager manager =
new BlockingBaseLockManager(creatingScheduler, finishCreatingScheduler);

CompletableFuture<ScheduledExecutorService> schedulerFuture =
CompletableFuture.supplyAsync(manager::scheduler);

assertThat(creatingScheduler.await(5, TimeUnit.SECONDS)).isTrue();

CompletableFuture<Void> closeFuture =
CompletableFuture.runAsync(
() -> {
try {
manager.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
});

assertThat(closeFuture).isNotDone();

finishCreatingScheduler.countDown();

ScheduledExecutorService scheduler = schedulerFuture.get(5, TimeUnit.SECONDS);
closeFuture.get(5, TimeUnit.SECONDS);

assertThat(scheduler.isShutdown()).isTrue();
}

@Test
public void testCloseDoesNotClearLocksWhileAnotherManagerIsActive() throws Exception {
LockManagers.InMemoryLockManager firstManager =
new LockManagers.InMemoryLockManager(Maps.newHashMap());

try (LockManagers.InMemoryLockManager secondManager =
new LockManagers.InMemoryLockManager(Maps.newHashMap())) {
firstManager.acquireOnce("entity", "owner");

secondManager.close();

assertThat(firstManager.release("entity", "owner")).isTrue();
} finally {
firstManager.close();
}
}

static class CustomLockManager implements LockManager {

@Override
Expand All @@ -59,4 +126,40 @@ public void close() throws Exception {}
@Override
public void initialize(Map<String, String> properties) {}
}

static class TestBaseLockManager extends LockManagers.BaseLockManager {

@Override
public boolean acquire(String entityId, String ownerId) {
return false;
}

@Override
public boolean release(String entityId, String ownerId) {
return false;
}
}

static class BlockingBaseLockManager extends TestBaseLockManager {
private final CountDownLatch creatingScheduler;
private final CountDownLatch finishCreatingScheduler;

BlockingBaseLockManager(
CountDownLatch creatingScheduler, CountDownLatch finishCreatingScheduler) {
this.creatingScheduler = creatingScheduler;
this.finishCreatingScheduler = finishCreatingScheduler;
}

@Override
ScheduledExecutorService newScheduler() {
creatingScheduler.countDown();
try {
assertThat(finishCreatingScheduler.await(5, TimeUnit.SECONDS)).isTrue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

return super.newScheduler();
}
}
}
Loading