Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -161,11 +162,11 @@ public FateTxStore<T> reserve(FateId fateId) {
EnumSet.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS);

@Override
public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> idConsumer) {
public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsumer) {

AtomicLong seen = new AtomicLong(0);

while (keepWaiting.get() && seen.get() == 0) {
while (keepWaiting.getAsBoolean() && seen.get() == 0) {
final long beforeCount = unreservedRunnableCount.getCount();
final boolean beforeDeferredOverflow = deferredOverflow.get();

Expand Down Expand Up @@ -207,8 +208,7 @@ public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> idConsume
}

if (waitTime > 0) {
unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime,
keepWaiting::get);
unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime, keepWaiting);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private class WorkFinder implements Runnable {
public void run() {
while (fate.getKeepRunning().get() && !isShutdown()) {
try {
fate.getStore().runnable(fate.getKeepRunning(), fateIdStatus -> {
fate.getStore().runnable(() -> fate.getKeepRunning().get(), fateIdStatus -> {
// The FateId with the fate operation 'fateOp' is workable by this FateExecutor if
// 1) This FateExecutor is assigned to work on 'fateOp' ('fateOp' is in 'fateOps')
// 2) The transaction was cancelled while NEW. This is an edge case that needs to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.hadoop.io.DataInputBuffer;
Expand Down Expand Up @@ -151,8 +153,7 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
* longer interact with it.
*
* @param deferTime time to keep this transaction from being returned by
* {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, java.util.function.Consumer)}.
* Must be non-negative.
* {@link #runnable(BooleanSupplier, Consumer)}. Must be non-negative.
*/
void unreserve(Duration deferTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;

Expand Down Expand Up @@ -163,7 +163,7 @@ interface FateIdStatus {
* is found or until the keepWaiting parameter is false. It will return once all runnable ids
* found were passed to the consumer.
*/
void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> idConsumer);
void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsumer);

/**
* Returns true if the deferred map was cleared and if deferred executions are currently disabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
Expand Down Expand Up @@ -140,7 +140,7 @@ public Stream<FateKey> list(FateKey.FateKeyType type) {
}

@Override
public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> idConsumer) {
public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsumer) {
store.runnable(keepWaiting, idConsumer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;

Expand Down Expand Up @@ -273,7 +273,7 @@ public Stream<FateKey> list(FateKey.FateKeyType type) {
}

@Override
public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> idConsumer) {
public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsumer) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext sctx
try {
// Run and verify all 10 transactions still exist and were not
// run because of the deferral time of all the transactions
future = executor.submit(() -> store.runnable(keepRunning,
future = executor.submit(() -> store.runnable(keepRunning::get,
fateIdStatus -> transactions.remove(fateIdStatus.getFateId())));
Thread.sleep(2000);
assertEquals(10, transactions.size());
Expand All @@ -225,7 +225,7 @@ protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext sctx
// Run and verify all 11 transactions were processed
// and removed from the store
keepRunning.set(true);
future = executor.submit(() -> store.runnable(keepRunning,
future = executor.submit(() -> store.runnable(keepRunning::get,
fateIdStatus -> transactions.remove(fateIdStatus.getFateId())));
Wait.waitFor(transactions::isEmpty);
// Setting this flag to false should terminate the task if sleeping
Expand Down