Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class Constants {

public static final String ZMANAGERS = "/managers";
public static final String ZMANAGER_LOCK = ZMANAGERS + "/lock";
public static final String ZMANAGER_ASSISTANT_LOCK = ZMANAGERS + "/assistants";
public static final String ZMANAGER_GOAL_STATE = ZMANAGERS + "/goal_state";
public static final String ZMANAGER_TICK = ZMANAGERS + "/tick";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class ServerId implements Comparable<ServerId> {
* @since 4.0.0
*/
public enum Type {
MANAGER, MONITOR, GARBAGE_COLLECTOR, COMPACTOR, SCAN_SERVER, TABLET_SERVER;
MANAGER, MANAGER_ASSISTANT, MONITOR, GARBAGE_COLLECTOR, COMPACTOR, SCAN_SERVER, TABLET_SERVER;
}

private final Type type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ private static Set<String> createPersistentWatcherPaths() {
Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK,
Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES,
Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET, Constants.ZTEST_LOCK,
Constants.ZRESOURCEGROUPS)) {
Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS)) {
pathsToWatch.add(path);
}
return pathsToWatch;
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,17 @@ was changed and it now can accept multiple class names. The metrics spi was intr
"Properties in this category affect the behavior of the manager server.", "2.1.0"),
MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT,
"The port used for handling client connections on the manager.", "1.3.5"),
// Port and request-thread settings for the manager assistant server.
MANAGER_ASSISTANT_PORT("manager.assistant.port", "10000", PropertyType.PORT,
    "The port used by the primary manager to assign tasks to all manager processes.", "4.0.0"),
MANAGER_ASSISTANT_PORTSEARCH("manager.assistant.port.search", "true", PropertyType.BOOLEAN,
    "if the manager.assistant.port ports are in use, search higher ports until one is available.",
    "4.0.0"),
MANAGER_ASSISTANT_MINTHREADS("manager.assistant.server.threads.minimum", "20", PropertyType.COUNT,
    "The minimum number of threads to use to handle incoming requests.", "4.0.0"),
MANAGER_ASSISTANT_MINTHREADS_TIMEOUT("manager.assistant.server.threads.timeout", "0s",
    PropertyType.TIMEDURATION,
    "The time after which incoming request threads terminate with no work available. Zero (0) will keep the threads alive indefinitely.",
    "4.0.0"),
MANAGER_TABLET_BALANCER("manager.tablet.balancer",
"org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME,
"The balancer class that accumulo will use to make tablet assignment and "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,21 @@ public FateTxStore<T> reserve(FateId fateId) {
EnumSet.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS);

@Override
public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsumer) {
public void runnable(Set<FatePartition> partitions, BooleanSupplier keepWaiting,
Consumer<FateIdStatus> idConsumer) {

if (partitions.isEmpty()) {
return;
}

AtomicLong seen = new AtomicLong(0);

while (keepWaiting.getAsBoolean() && seen.get() == 0) {
final long beforeCount = unreservedRunnableCount.getCount();
final boolean beforeDeferredOverflow = deferredOverflow.get();

try (Stream<FateIdStatus> inProgress = getTransactions(IN_PROGRESS_SET);
Stream<FateIdStatus> other = getTransactions(OTHER_RUNNABLE_SET)) {
try (Stream<FateIdStatus> inProgress = getTransactions(partitions, IN_PROGRESS_SET);
Stream<FateIdStatus> other = getTransactions(partitions, OTHER_RUNNABLE_SET)) {
// read the in progress transaction first and then everything else in order to process those
// first
var transactions = Stream.concat(inProgress, other);
Expand Down Expand Up @@ -200,6 +205,8 @@ public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsu
if (beforeCount == unreservedRunnableCount.getCount()) {
long waitTime = 5000;
synchronized (deferred) {
deferred.keySet().removeIf(
fateId -> partitions.stream().noneMatch(partition -> partition.contains(fateId)));
if (!deferred.isEmpty()) {
waitTime = deferred.values().stream()
.mapToLong(countDownTimer -> countDownTimer.timeLeft(TimeUnit.MILLISECONDS)).min()
Expand Down Expand Up @@ -240,9 +247,11 @@ public ReadOnlyFateTxStore<T> read(FateId fateId) {
}

@Override
public Map<FateId,FateReservation> getActiveReservations() {
return list().filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors
.toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow()));
public Map<FateId,FateReservation> getActiveReservations(Set<FatePartition> partitions) {
try (var stream = getTransactions(partitions, EnumSet.allOf(TStatus.class))) {
return stream.filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors
.toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow()));
}
}

protected boolean isRunnable(TStatus status) {
Expand Down Expand Up @@ -289,6 +298,9 @@ protected void verifyLock(ZooUtil.LockID lockID, FateId fateId) {

protected abstract Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses);

protected abstract Stream<FateIdStatus> getTransactions(Set<FatePartition> partitions,
EnumSet<TStatus> statuses);

protected abstract TStatus _getStatus(FateId fateId);

protected abstract Optional<FateKey> getKey(FateId fateId);
Expand Down Expand Up @@ -418,7 +430,8 @@ public interface FateIdGenerator {
FateId newRandomId(FateInstanceType instanceType);
}

protected void seededTx() {
@Override
public void seeded() {
unreservedRunnableCount.increment();
}

Expand Down
48 changes: 45 additions & 3 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
Expand All @@ -51,6 +52,8 @@
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.gson.JsonParser;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand All @@ -76,6 +79,7 @@ public class Fate<T> extends FateClient<T> {
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
// Visible for FlakyFate test object
protected final Set<FateExecutor<T>> fateExecutors = new HashSet<>();
private Set<FatePartition> currentPartitions = Set.of();

public enum TxInfo {
FATE_OP, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
Expand Down Expand Up @@ -208,8 +212,10 @@ public void run() {
fe -> fe.getFateOps().equals(fateOps) && fe.getName().equals(fateExecutorName))) {
log.debug("[{}] Adding FateExecutor for {} with {} threads", store.type(), fateOps,
poolSize);
fateExecutors.add(
new FateExecutor<>(Fate.this, environment, fateOps, poolSize, fateExecutorName));
var fateExecutor =
new FateExecutor<>(Fate.this, environment, fateOps, poolSize, fateExecutorName);
fateExecutors.add(fateExecutor);
fateExecutor.setPartitions(currentPartitions);
}
}
}
Expand All @@ -233,7 +239,11 @@ private class DeadReservationCleaner implements Runnable {
@Override
public void run() {
if (keepRunning.get()) {
store.deleteDeadReservations();
Set<FatePartition> partitions;
synchronized (fateExecutors) {
partitions = currentPartitions;
}
store.deleteDeadReservations(partitions);
}
}
}
Expand Down Expand Up @@ -369,6 +379,17 @@ public AtomicInteger getNeedMoreThreadsWarnCount() {
return needMoreThreadsWarnCount;
}

/**
 * Handles a notification that seeding happened in the given partitions. The notification is
 * passed on to the store only when at least one of the given partitions is currently assigned to
 * this fate instance; otherwise it is ignored.
 *
 * @param partitions the partitions the seeding notification refers to
 */
public void seeded(Set<FatePartition> partitions) {
  final boolean overlapsAssigned;
  // currentPartitions is read while holding the same monitor setPartitions holds when replacing it
  synchronized (fateExecutors) {
    overlapsAssigned = !Sets.intersection(currentPartitions, partitions).isEmpty();
  }

  if (overlapsAssigned) {
    log.trace("Notified of seeding for {}", partitions);
    store.seeded();
  }
}

/**
* Initiates shutdown of background threads that run fate operations and cleanup fate data and
* optionally waits on them. Leaves the fate object in a state where it can still update and read
Expand Down Expand Up @@ -432,6 +453,27 @@ public void close() {
store.close();
}

/**
 * Returns the set of partitions currently assigned to this fate instance.
 *
 * @return the current partition set, read while holding the {@code fateExecutors} monitor
 */
public Set<FatePartition> getPartitions() {
  final Set<FatePartition> snapshot;
  synchronized (fateExecutors) {
    snapshot = currentPartitions;
  }
  return snapshot;
}

/**
 * Replaces the partitions assigned to this fate instance and pushes the new assignment to every
 * existing fate executor.
 *
 * @param partitions the new partitions; the start and end of each must have the same type as
 *        this instance's store
 * @return the partition set that was assigned before this call
 * @throws NullPointerException if {@code partitions} is null
 * @throws IllegalArgumentException if any partition's start or end type differs from the store
 *         type
 */
public Set<FatePartition> setPartitions(Set<FatePartition> partitions) {
  Objects.requireNonNull(partitions);

  final boolean anyTypeMismatch = partitions.stream().anyMatch(
      fp -> fp.start().getType() != store.type() || fp.end().getType() != store.type());
  Preconditions.checkArgument(!anyTypeMismatch, "type mismatch type:%s partitions:%s",
      store.type(), partitions);

  synchronized (fateExecutors) {
    final Set<FatePartition> previous = currentPartitions;
    // store an immutable copy so later changes to the caller's set are not observed
    currentPartitions = Set.copyOf(partitions);
    for (FateExecutor<T> executor : fateExecutors) {
      executor.setPartitions(currentPartitions);
    }
    return previous;
  }
}

private boolean anyFateExecutorIsAlive() {
synchronized (fateExecutors) {
return fateExecutors.stream().anyMatch(FateExecutor::isAlive);
Expand Down
29 changes: 28 additions & 1 deletion core/src/main/java/org/apache/accumulo/core/fate/FateClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.time.Duration;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

Expand All @@ -46,8 +49,11 @@ public class FateClient<T> {
private static final EnumSet<ReadOnlyFateStore.TStatus> FINISHED_STATES =
EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);

// Callback handed the id of each transaction seeded through this client; starts as a no-op. The
// reference itself is never reassigned (only its contents are swapped), so the field is final.
private final AtomicReference<Consumer<FateId>> seedingConsumer =
    new AtomicReference<>(fid -> {});

/**
 * Creates a client that operates on the given store, wrapped by {@code FateLogger.wrap}.
 *
 * @param store the fate store this client reads from and writes to
 * @param toLogStrFunc converts a repo to the string passed to the logging wrapper
 */
public FateClient(FateStore<T> store, Function<Repo<T>,String> toLogStrFunc) {
  this.store = FateLogger.wrap(store, toLogStrFunc, false);
}

// get a transaction id back to the requester before doing any work
Expand All @@ -56,7 +62,23 @@ public FateId startTransaction() {
}

public FateStore.Seeder<T> beginSeeding() {
return store.beginSeeding();
var seeder = store.beginSeeding();
return new FateStore.Seeder<T>() {
@Override
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(Fate.FateOperation fateOp,
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
var cfuture = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp);
return cfuture.thenApply(optional -> {
optional.ifPresent(seedingConsumer.get());
return optional;
});
}

@Override
public void close() {
seeder.close();
}
};
}

public void seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
Expand All @@ -73,6 +95,7 @@ public void seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T> re
boolean autoCleanUp, String goalMessage) {
Fate.log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId, goalMessage);
store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
seedingConsumer.get().accept(fateId);
}

// check on the transaction
Expand Down Expand Up @@ -176,4 +199,8 @@ public Exception getException(FateId fateId) {
public Stream<FateKey> list(FateKey.FateKeyType type) {
return store.list(type);
}

/**
 * Sets the callback that is handed the id of each transaction seeded through this client.
 *
 * @param seedingConsumer the callback to invoke after seeding; must not be null
 * @throws NullPointerException if {@code seedingConsumer} is null
 */
public void setSeedingConsumer(Consumer<FateId> seedingConsumer) {
  // Fail fast: a null consumer would otherwise only blow up later, after a transaction has
  // already been written to the store.
  this.seedingConsumer
      .set(java.util.Objects.requireNonNull(seedingConsumer, "seedingConsumer"));
}
}
18 changes: 16 additions & 2 deletions core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -43,6 +44,8 @@
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.conf.Property;
Expand Down Expand Up @@ -81,6 +84,7 @@ public class FateExecutor<T> {
private final Set<Fate.FateOperation> fateOps;
private final ConcurrentLinkedQueue<Integer> idleCountHistory = new ConcurrentLinkedQueue<>();
private final FateExecutorMetrics<T> fateExecutorMetrics;
private final AtomicReference<Set<FatePartition>> partitions = new AtomicReference<>(Set.of());

public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> fateOps, int poolSize,
String name) {
Expand Down Expand Up @@ -298,6 +302,11 @@ protected ConcurrentLinkedQueue<Integer> getIdleCountHistory() {
return idleCountHistory;
}

/**
 * Replaces the partitions this executor looks for work in.
 *
 * @param partitions the new partitions; an immutable copy is stored
 * @throws NullPointerException if {@code partitions} is null
 */
public void setPartitions(Set<FatePartition> partitions) {
  // The work finder compares the stored set by reference to notice that the partitions changed.
  final Set<FatePartition> snapshot = Set.copyOf(Objects.requireNonNull(partitions));
  this.partitions.set(snapshot);
}

/**
* A single thread that finds transactions to work on and queues them up. Do not want each worker
* thread going to the store and looking for work as it would place more load on the store.
Expand All @@ -308,7 +317,12 @@ private class WorkFinder implements Runnable {
public void run() {
while (fate.getKeepRunning().get() && !isShutdown()) {
try {
fate.getStore().runnable(() -> fate.getKeepRunning().get(), fateIdStatus -> {
var localPartitions = partitions.get();
// if the set of partitions changes, we should stop looking for work w/ the old set of
// partitions
BooleanSupplier keepRunning =
() -> fate.getKeepRunning().get() && localPartitions == partitions.get();
fate.getStore().runnable(localPartitions, keepRunning, fateIdStatus -> {
// The FateId with the fate operation 'fateOp' is workable by this FateExecutor if
// 1) This FateExecutor is assigned to work on 'fateOp' ('fateOp' is in 'fateOps')
// 2) The transaction was cancelled while NEW. This is an edge case that needs to be
Expand All @@ -319,7 +333,7 @@ public void run() {
var fateOp = fateIdStatus.getFateOperation().orElse(null);
if ((fateOp != null && fateOps.contains(fateOp))
|| txCancelledWhileNew(status, fateOp)) {
while (fate.getKeepRunning().get() && !isShutdown()) {
while (keepRunning.getAsBoolean() && !isShutdown()) {
try {
// The reason for calling transfer instead of queueing is avoid rescanning the
// storage layer and adding the same thing over and over. For example if all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.accumulo.core.metrics.Metric;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;

public class FateExecutorMetrics<T> implements MetricsProducer {
public class FateExecutorMetrics<T> {
private static final Logger log = LoggerFactory.getLogger(FateExecutorMetrics.class);
private final FateInstanceType type;
private final String poolName;
Expand All @@ -49,7 +48,6 @@ protected FateExecutorMetrics(FateInstanceType type, String poolName,
this.idleWorkerCount = idleWorkerCount;
}

@Override
public void registerMetrics(MeterRegistry registry) {
// noop if already registered or cleared
if (state == State.UNREGISTERED) {
Expand Down
Loading