Core: Fix shared LockManagers scheduler shutdown#15862
Core: Fix shared LockManagers scheduler shutdown#15862manuzhang wants to merge 1 commit intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Fixes lifecycle management of the shared ScheduledExecutorService used by in-memory lock managers so one manager closing doesn’t shut down the scheduler while others are still active, and adds a regression test for the scenario.
Changes:
- Add reference counting (
activeManagers) toBaseLockManagerto control when the shared scheduler is shut down - Recreate the scheduler if it exists but is already shut down/terminated
- Add a regression test ensuring one manager closing doesn’t break another
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| core/src/main/java/org/apache/iceberg/util/LockManagers.java | Adds active-manager tracking and conditional scheduler shutdown/recreation to prevent cross-manager disruption |
| core/src/test/java/org/apache/iceberg/util/TestInMemoryLockManager.java | Adds a regression test for “close one manager while another remains live” behavior |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
ac61183 to
d3ebd77
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public ScheduledExecutorService scheduler() { | ||
| if (scheduler == null) { | ||
| synchronized (BaseLockManager.class) { | ||
| if (!registered) { | ||
| activeManagers += 1; | ||
| registered = true; | ||
| } | ||
| } |
There was a problem hiding this comment.
BaseLockManager.scheduler() increments activeManagers/sets registered in one synchronized block, then initializes the shared scheduler in a separate block. This allows close() to interleave between the two and decrement activeManagers back to 0 before the scheduler is created, leaving a live scheduler with activeManagers==0 (and potentially never shut down). Consider doing registration and scheduler initialization under a single synchronized(BaseLockManager.class) block to keep the reference count consistent under concurrent scheduler()/close() calls.
| TestBaseLockManager secondManager = new TestBaseLockManager(); | ||
|
|
There was a problem hiding this comment.
This test calls BaseLockManager.scheduler() on TestBaseLockManager instances without calling initialize(). Since BaseLockManager.heartbeatThreads defaults to 0 until initialize runs, the first scheduler() call can attempt Executors.newScheduledThreadPool(0) and throw IllegalArgumentException (or become order-dependent if another test already initialized the static scheduler). Initialize at least the manager that first creates the scheduler (or set default values in BaseLockManager) so the test is deterministic.
| TestBaseLockManager secondManager = new TestBaseLockManager(); | |
| TestBaseLockManager secondManager = new TestBaseLockManager(); | |
| Map<String, String> properties = Maps.newHashMap(); | |
| firstManager.initialize(properties); | |
| secondManager.initialize(properties); |
| lockManager.close(); | ||
| assertThat(anotherManager.acquire(lockEntityId, ownerId)).isTrue(); | ||
| assertThat(anotherManager.release(lockEntityId, ownerId)).isTrue(); |
There was a problem hiding this comment.
As written, this regression test closes lockManager before any operation that would create/use the shared scheduler. That means it may not exercise the original failure mode (scheduler terminated but still referenced, causing RejectedExecutionException on scheduleAtFixedRate). To make this a real regression, trigger scheduler creation on one manager (e.g., acquire once) and ensure the other manager has registered/used the scheduler before closing the first, then verify the remaining manager can still acquire without RejectedExecutionException (or assert scheduler is not shutdown).
| lockManager.close(); | |
| assertThat(anotherManager.acquire(lockEntityId, ownerId)).isTrue(); | |
| assertThat(anotherManager.release(lockEntityId, ownerId)).isTrue(); | |
| lockManager.initialize( | |
| ImmutableMap.of( | |
| CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS, "100", | |
| CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "50", | |
| CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "1000")); | |
| anotherManager.initialize( | |
| ImmutableMap.of( | |
| CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS, "100", | |
| CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "50", | |
| CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "1000")); | |
| String firstOwner = UUID.randomUUID().toString(); | |
| String secondOwner = UUID.randomUUID().toString(); | |
| String thirdOwner = UUID.randomUUID().toString(); | |
| assertThat(lockManager.acquire(lockEntityId, firstOwner)).isTrue(); | |
| assertThat(lockManager.release(lockEntityId, firstOwner)).isTrue(); | |
| assertThat(anotherManager.acquire(lockEntityId, secondOwner)).isTrue(); | |
| assertThat(anotherManager.release(lockEntityId, secondOwner)).isTrue(); | |
| lockManager.close(); | |
| assertThat(anotherManager.acquire(lockEntityId, thirdOwner)).isTrue(); | |
| assertThat(anotherManager.release(lockEntityId, thirdOwner)).isTrue(); |
d3ebd77 to
dcd0b78
Compare
Prevent one in-memory lock manager from shutting down the shared scheduler while another manager is still active. Add a regression test covering close-on-one-manager while another remains live. Co-authored-by: Codex <codex@openai.com>
dcd0b78 to
5b3d03b
Compare
Closes #15861
Co-authored-by: @codex