Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
a18ed15
WIUP
keith-turner Jan 16, 2026
c99e61b
WIP
keith-turner Jan 26, 2026
21f9e97
WIP
keith-turner Jan 26, 2026
fbc213c
WIP
keith-turner Jan 27, 2026
08f221e
WIP
keith-turner Jan 29, 2026
b58def9
WIP
keith-turner Jan 29, 2026
9db3b9d
Merge branch 'main' into dist-fate
keith-turner Feb 5, 2026
af4a3cc
WIP
keith-turner Feb 5, 2026
1bc611e
WIP
keith-turner Feb 5, 2026
9e78b23
Refactor manager split code used by fate
keith-turner Feb 5, 2026
e4c6581
Merge remote-tracking branch 'upstream/main' into dist-fate
keith-turner Feb 6, 2026
d7e16c6
WIP
keith-turner Feb 6, 2026
2089906
Merge branch 'split-cache' into dist-fate
keith-turner Feb 6, 2026
e1597f7
WIP
keith-turner Feb 6, 2026
2b11a54
WIP
keith-turner Feb 6, 2026
10dcee9
WIP
keith-turner Feb 6, 2026
25cfe5b
WIP
keith-turner Feb 6, 2026
a09414a
Merge remote-tracking branch 'upstream/main' into dist-fate
keith-turner Feb 11, 2026
905a94d
WIP
keith-turner Feb 11, 2026
1952a47
WIP
keith-turner Feb 11, 2026
3155512
Merge branch 'main' into dist-fate
keith-turner Feb 12, 2026
765d1e1
WIP
keith-turner Feb 12, 2026
0dd1470
WIP
keith-turner Feb 12, 2026
68fefef
WIP
keith-turner Feb 13, 2026
e364518
Merge branch 'main' into dist-fate
keith-turner Feb 17, 2026
a92b73c
WIP
keith-turner Feb 17, 2026
ad14d56
WIP
keith-turner Feb 18, 2026
c541ec2
WIP
keith-turner Feb 18, 2026
f06b98d
WIP
keith-turner Feb 18, 2026
5f0a04a
WIP
keith-turner Feb 18, 2026
deab135
WIP
keith-turner Feb 18, 2026
5141239
WIP
keith-turner Feb 18, 2026
903eb47
WIP
keith-turner Feb 18, 2026
3f14811
WIP
keith-turner Feb 18, 2026
4a1a090
WIP
keith-turner Feb 18, 2026
41ee0ad
Merge branch 'main' into dist-fate
keith-turner Feb 18, 2026
634ba3f
WIP
keith-turner Feb 18, 2026
e5ef7e6
WIP
keith-turner Feb 19, 2026
05cd769
WIP
keith-turner Feb 19, 2026
aa5a1ec
WIP
keith-turner Feb 19, 2026
88c13a2
Merge remote-tracking branch 'upstream/main' into dist-fate
keith-turner Feb 19, 2026
566bce8
WIP
keith-turner Feb 19, 2026
8a64f5b
WIP
keith-turner Feb 19, 2026
812741b
WIP
keith-turner Feb 19, 2026
fa05f76
Merge remote-tracking branch 'upstream/main' into dist-fate
keith-turner Feb 19, 2026
1c82a68
WIP
keith-turner Feb 19, 2026
28ca16a
WIP
keith-turner Feb 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class Constants {

public static final String ZMANAGERS = "/managers";
public static final String ZMANAGER_LOCK = ZMANAGERS + "/lock";
public static final String ZMANAGER_ASSISTANT_LOCK = ZMANAGERS + "/assistants";
public static final String ZMANAGER_GOAL_STATE = ZMANAGERS + "/goal_state";
public static final String ZMANAGER_TICK = ZMANAGERS + "/tick";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class ServerId implements Comparable<ServerId> {
* @since 4.0.0
*/
public enum Type {
MANAGER, MONITOR, GARBAGE_COLLECTOR, COMPACTOR, SCAN_SERVER, TABLET_SERVER;
MANAGER, MANAGER_ASSISTANT, MONITOR, GARBAGE_COLLECTOR, COMPACTOR, SCAN_SERVER, TABLET_SERVER;
}

private final Type type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ private static Set<String> createPersistentWatcherPaths() {
Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK,
Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES,
Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET, Constants.ZTEST_LOCK,
Constants.ZRESOURCEGROUPS)) {
Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS)) {
pathsToWatch.add(path);
}
return pathsToWatch;
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,17 @@ was changed and it now can accept multiple class names. The metrics spi was intr
"Properties in this category affect the behavior of the manager server.", "2.1.0"),
MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT,
"The port used for handling client connections on the manager.", "1.3.5"),
MANAGER_ASSISTANT_PORT("manager.assistant.port", "10000", PropertyType.PORT,
"The port used by the primary manager to assign task to all manager processes.", "4.0.0"),
MANAGER_ASSISTANT_PORTSEARCH("manager.assistant.port.search", "true", PropertyType.BOOLEAN,
"if the manager.assistant.port ports are in use, search higher ports until one is available.",
"4.0.0"),
MANAGER_ASSISTANT_MINTHREADS("manager.assistant.server.threads.minimum", "20", PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests.", "4.0.0"),
MANAGER_ASSISTANT_MINTHREADS_TIMEOUT("manager.assistant.server.threads.timeout", "0s",
PropertyType.TIMEDURATION,
"The time after which incoming request threads terminate with no work available. Zero (0) will keep the threads alive indefinitely.",
"4.0.0"),
MANAGER_TABLET_BALANCER("manager.tablet.balancer",
"org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME,
"The balancer class that accumulo will use to make tablet assignment and "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -161,16 +162,21 @@ public FateTxStore<T> reserve(FateId fateId) {
EnumSet.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS);

@Override
public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> idConsumer) {
public void runnable(Set<FatePartition> partitions, BooleanSupplier keepWaiting,
Consumer<FateIdStatus> idConsumer) {

if (partitions.isEmpty()) {
return;
}

AtomicLong seen = new AtomicLong(0);

while (keepWaiting.get() && seen.get() == 0) {
while (keepWaiting.getAsBoolean() && seen.get() == 0) {
final long beforeCount = unreservedRunnableCount.getCount();
final boolean beforeDeferredOverflow = deferredOverflow.get();

try (Stream<FateIdStatus> inProgress = getTransactions(IN_PROGRESS_SET);
Stream<FateIdStatus> other = getTransactions(OTHER_RUNNABLE_SET)) {
try (Stream<FateIdStatus> inProgress = getTransactions(partitions, IN_PROGRESS_SET);
Stream<FateIdStatus> other = getTransactions(partitions, OTHER_RUNNABLE_SET)) {
// read the in progress transaction first and then everything else in order to process those
// first
var transactions = Stream.concat(inProgress, other);
Expand Down Expand Up @@ -199,6 +205,8 @@ public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> idConsume
if (beforeCount == unreservedRunnableCount.getCount()) {
long waitTime = 5000;
synchronized (deferred) {
deferred.keySet().removeIf(
fateId -> partitions.stream().noneMatch(partition -> partition.contains(fateId)));
if (!deferred.isEmpty()) {
waitTime = deferred.values().stream()
.mapToLong(countDownTimer -> countDownTimer.timeLeft(TimeUnit.MILLISECONDS)).min()
Expand All @@ -207,8 +215,7 @@ public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> idConsume
}

if (waitTime > 0) {
unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime,
keepWaiting::get);
unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime, keepWaiting);
}
}
}
Expand Down Expand Up @@ -240,9 +247,11 @@ public ReadOnlyFateTxStore<T> read(FateId fateId) {
}

@Override
public Map<FateId,FateReservation> getActiveReservations() {
return list().filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors
.toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow()));
public Map<FateId,FateReservation> getActiveReservations(Set<FatePartition> partitions) {
try (var stream = getTransactions(partitions, EnumSet.allOf(TStatus.class))) {
return stream.filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors
.toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow()));
}
}

protected boolean isRunnable(TStatus status) {
Expand Down Expand Up @@ -289,6 +298,9 @@ protected void verifyLock(ZooUtil.LockID lockID, FateId fateId) {

protected abstract Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses);

protected abstract Stream<FateIdStatus> getTransactions(Set<FatePartition> partitions,
EnumSet<TStatus> statuses);

protected abstract TStatus _getStatus(FateId fateId);

protected abstract Optional<FateKey> getKey(FateId fateId);
Expand Down Expand Up @@ -418,7 +430,7 @@ public interface FateIdGenerator {
FateId newRandomId(FateInstanceType instanceType);
}

protected void seededTx() {
public void seeded() {
unreservedRunnableCount.increment();
}

Expand Down
Loading
Loading