diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 751e3d42a4b..14755be12bb 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -161,11 +162,11 @@ public FateTxStore reserve(FateId fateId) { EnumSet.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS); @Override - public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { + public void runnable(BooleanSupplier keepWaiting, Consumer idConsumer) { AtomicLong seen = new AtomicLong(0); - while (keepWaiting.get() && seen.get() == 0) { + while (keepWaiting.getAsBoolean() && seen.get() == 0) { final long beforeCount = unreservedRunnableCount.getCount(); final boolean beforeDeferredOverflow = deferredOverflow.get(); @@ -207,8 +208,7 @@ public void runnable(AtomicBoolean keepWaiting, Consumer idConsume } if (waitTime > 0) { - unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime, - keepWaiting::get); + unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime, keepWaiting); } } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java index bbf0bcb81ef..883146eb4e4 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java @@ -308,7 +308,7 @@ private class WorkFinder implements Runnable { public void run() { while (fate.getKeepRunning().get() && !isShutdown()) { try { - fate.getStore().runnable(fate.getKeepRunning(), fateIdStatus -> { + fate.getStore().runnable(() -> fate.getKeepRunning().get(), fateIdStatus -> { // The FateId with the fate operation 'fateOp' is workable by this FateExecutor if // 1) This FateExecutor is assigned to work on 'fateOp' ('fateOp' is in 'fateOps') // 2) The transaction was cancelled while NEW. This is an edge case that needs to be diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index e28e7936421..6756f84f5aa 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -29,6 +29,8 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.hadoop.io.DataInputBuffer; @@ -151,8 +153,7 @@ interface FateTxStore extends ReadOnlyFateTxStore { * longer interact with it. * * @param deferTime time to keep this transaction from being returned by - * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, java.util.function.Consumer)}. - * Must be non-negative. + * {@link #runnable(BooleanSupplier, Consumer)}. Must be non-negative. */ void unreserve(Duration deferTime); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index 263a9b090b9..451823ac700 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.stream.Stream; @@ -163,7 +163,7 @@ interface FateIdStatus { * is found or until the keepWaiting parameter is false. It will return once all runnable ids * found were passed to the consumer. */ - void runnable(AtomicBoolean keepWaiting, Consumer idConsumer); + void runnable(BooleanSupplier keepWaiting, Consumer idConsumer); /** * Returns true if the deferred map was cleared and if deferred executions are currently disabled diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index 331401fc6bb..8c7a956c053 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -24,7 +24,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; @@ -140,7 +140,7 @@ public Stream list(FateKey.FateKeyType type) { } @Override - public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { + public void runnable(BooleanSupplier keepWaiting, Consumer idConsumer) { store.runnable(keepWaiting, idConsumer); } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index e4d057fc108..86a4106c614 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -30,7 +30,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.stream.Stream; @@ -273,7 +273,7 @@ public Stream list(FateKey.FateKeyType type) { } @Override - public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { + public void runnable(BooleanSupplier keepWaiting, Consumer idConsumer) { throw new UnsupportedOperationException(); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java index 2198b737675..9fd3ba769d9 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java @@ -199,7 +199,7 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx try { // Run and verify all 10 transactions still exist and were not // run because of the deferral time of all the transactions - future = executor.submit(() -> store.runnable(keepRunning, + future = executor.submit(() -> store.runnable(keepRunning::get, fateIdStatus -> transactions.remove(fateIdStatus.getFateId()))); Thread.sleep(2000); assertEquals(10, transactions.size()); @@ -225,7 +225,7 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx // Run and verify all 11 transactions were processed // and removed from the store keepRunning.set(true); - future = executor.submit(() -> store.runnable(keepRunning, + future = executor.submit(() -> store.runnable(keepRunning::get, fateIdStatus -> transactions.remove(fateIdStatus.getFateId()))); Wait.waitFor(transactions::isEmpty); // Setting this flag to false should terminate the task if sleeping