From a065652993eea26b26e90c7a69e8d8862e6b13f7 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 20 Feb 2026 19:11:53 +0000 Subject: [PATCH] Breaks Fate into Fate and FateClient --- .../org/apache/accumulo/core/fate/Fate.java | 146 +------------- .../apache/accumulo/core/fate/FateClient.java | 179 ++++++++++++++++++ .../accumulo/manager/FateServiceHandler.java | 50 ++--- .../org/apache/accumulo/manager/Manager.java | 36 ++-- .../coordinator/CompactionCoordinator.java | 24 +-- .../coordinator/DeadCompactionDetector.java | 20 +- .../manager/merge/FindMergeableRangeTask.java | 2 +- .../accumulo/manager/split/Splitter.java | 2 +- .../compaction/CompactionCoordinatorTest.java | 9 +- 9 files changed, 249 insertions(+), 219 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/fate/FateClient.java diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index eebe1147853..58bdc5cef66 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -19,12 +19,6 @@ package org.apache.accumulo.core.fate; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.META_DEAD_RESERVATION_CLEANER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.USER_DEAD_RESERVATION_CLEANER_POOL; @@ -36,7 +30,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ExecutorService; @@ -48,18 +41,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.fate.FateStore.FateTxStore; -import org.apache.accumulo.core.fate.FateStore.Seeder; -import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.logging.FateLogger; import org.apache.accumulo.core.manager.thrift.TFateOperation; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; -import org.apache.thrift.TApplicationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,16 +60,15 @@ */ @SuppressFBWarnings(value = "CT_CONSTRUCTOR_THROW", justification = "Constructor validation is required for proper initialization") -public class Fate { +public class Fate extends FateClient { - private static final Logger log = LoggerFactory.getLogger(Fate.class); + static final Logger log = LoggerFactory.getLogger(Fate.class); private final FateStore store; private final ScheduledFuture fatePoolsWatcherFuture; private final AtomicInteger needMoreThreadsWarnCount = new AtomicInteger(0); private final ExecutorService deadResCleanerExecutor; - private static final EnumSet FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); public static final Duration INITIAL_DELAY = Duration.ofSeconds(3); private static final Duration DEAD_RES_CLEANUP_DELAY = Duration.ofMinutes(3); public static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30); @@ -262,6 +248,7 @@ public void run() { public Fate(T environment, FateStore store, boolean runDeadResCleaner, Function,String> toLogStrFunc, AccumuloConfiguration conf, ScheduledThreadPoolExecutor genSchedExecutor) { + super(store, toLogStrFunc); this.store = FateLogger.wrap(store, toLogStrFunc, false); fatePoolsWatcherFuture = @@ -382,133 +369,6 @@ public AtomicInteger getNeedMoreThreadsWarnCount() { return needMoreThreadsWarnCount; } - // get a transaction id back to the requester before doing any work - public FateId startTransaction() { - return store.create(); - } - - public Seeder beginSeeding() { - return store.beginSeeding(); - } - - public void seedTransaction(FateOperation fateOp, FateKey fateKey, Repo repo, - boolean autoCleanUp) { - try (var seeder = store.beginSeeding()) { - @SuppressWarnings("unused") - var unused = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp); - } - } - - // start work in the transaction.. it is safe to call this - // multiple times for a transaction... but it will only seed once - public void seedTransaction(FateOperation fateOp, FateId fateId, Repo repo, - boolean autoCleanUp, String goalMessage) { - log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId, goalMessage); - store.seedTransaction(fateOp, fateId, repo, autoCleanUp); - } - - // check on the transaction - public TStatus waitForCompletion(FateId fateId) { - return store.read(fateId).waitForStatusChange(FINISHED_STATES); - } - - /** - * Attempts to cancel a running Fate transaction - * - * @param fateId fate transaction id - * @return true if transaction transitioned to a failed state or already in a completed state, - * false otherwise - */ - public boolean cancel(FateId fateId) { - for (int retries = 0; retries < 5; retries++) { - Optional> optionalTxStore = store.tryReserve(fateId); - if (optionalTxStore.isPresent()) { - var txStore = optionalTxStore.orElseThrow(); - try { - TStatus status = txStore.getStatus(); - log.info("[{}] status is: {}", store.type(), status); - if (status == NEW || status == SUBMITTED) { - txStore.setTransactionInfo(TxInfo.EXCEPTION, new TApplicationException( - TApplicationException.INTERNAL_ERROR, "Fate transaction cancelled by user")); - txStore.setStatus(FAILED_IN_PROGRESS); - log.info( - "[{}] Updated status for {} to FAILED_IN_PROGRESS because it was cancelled by user", - store.type(), fateId); - return true; - } else { - log.info("[{}] {} cancelled by user but already in progress or finished state", - store.type(), fateId); - return false; - } - } finally { - txStore.unreserve(Duration.ZERO); - } - } else { - // reserved, lets retry. - UtilWaitThread.sleep(500); - } - } - log.info("[{}] Unable to reserve transaction {} to cancel it", store.type(), fateId); - return false; - } - - // resource cleanup - public void delete(FateId fateId) { - FateTxStore txStore = store.reserve(fateId); - try { - switch (txStore.getStatus()) { - case NEW: - case SUBMITTED: - case FAILED: - case SUCCESSFUL: - txStore.delete(); - break; - case FAILED_IN_PROGRESS: - case IN_PROGRESS: - throw new IllegalStateException("Can not delete in progress transaction " + fateId); - case UNKNOWN: - // nothing to do, it does not exist - break; - } - } finally { - txStore.unreserve(Duration.ZERO); - } - } - - public String getReturn(FateId fateId) { - FateTxStore txStore = store.reserve(fateId); - try { - if (txStore.getStatus() != SUCCESSFUL) { - throw new IllegalStateException( - "Tried to get exception when transaction " + fateId + " not in successful state"); - } - return (String) txStore.getTransactionInfo(TxInfo.RETURN_VALUE); - } finally { - txStore.unreserve(Duration.ZERO); - } - } - - // get reportable failures - public Exception getException(FateId fateId) { - FateTxStore txStore = store.reserve(fateId); - try { - if (txStore.getStatus() != FAILED) { - throw new IllegalStateException( - "Tried to get exception when transaction " + fateId + " not in failed state"); - } - return (Exception) txStore.getTransactionInfo(TxInfo.EXCEPTION); - } finally { - txStore.unreserve(Duration.ZERO); - } - } - - /** - * Lists transctions for a given fate key type. - */ - public Stream list(FateKey.FateKeyType type) { - return store.list(type); - } - /** * Initiates shutdown of background threads that run fate operations and cleanup fate data and * optionally waits on them. Leaves the fate object in a state where it can still update and read diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateClient.java b/core/src/main/java/org/apache/accumulo/core/fate/FateClient.java new file mode 100644 index 00000000000..2dc472e4bd8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateClient.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN; + +import java.time.Duration; +import java.util.EnumSet; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.apache.accumulo.core.logging.FateLogger; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.thrift.TApplicationException; + +/** + * Supports initiating and checking status of fate operations. + * + */ +public class FateClient { + + private final FateStore store; + + private static final EnumSet FINISHED_STATES = + EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); + + public FateClient(FateStore store, Function,String> toLogStrFunc) { + this.store = FateLogger.wrap(store, toLogStrFunc, false); + } + + // get a transaction id back to the requester before doing any work + public FateId startTransaction() { + return store.create(); + } + + public FateStore.Seeder beginSeeding() { + return store.beginSeeding(); + } + + public void seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo repo, + boolean autoCleanUp) { + try (var seeder = store.beginSeeding()) { + @SuppressWarnings("unused") + var unused = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp); + } + } + + // start work in the transaction.. it is safe to call this + // multiple times for a transaction... but it will only seed once + public void seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo repo, + boolean autoCleanUp, String goalMessage) { + Fate.log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId, goalMessage); + store.seedTransaction(fateOp, fateId, repo, autoCleanUp); + } + + // check on the transaction + public ReadOnlyFateStore.TStatus waitForCompletion(FateId fateId) { + return store.read(fateId).waitForStatusChange(FINISHED_STATES); + } + + /** + * Attempts to cancel a running Fate transaction + * + * @param fateId fate transaction id + * @return true if transaction transitioned to a failed state or already in a completed state, + * false otherwise + */ + public boolean cancel(FateId fateId) { + for (int retries = 0; retries < 5; retries++) { + Optional> optionalTxStore = store.tryReserve(fateId); + if (optionalTxStore.isPresent()) { + var txStore = optionalTxStore.orElseThrow(); + try { + ReadOnlyFateStore.TStatus status = txStore.getStatus(); + Fate.log.info("[{}] status is: {}", store.type(), status); + if (status == NEW || status == SUBMITTED) { + txStore.setTransactionInfo(Fate.TxInfo.EXCEPTION, new TApplicationException( + TApplicationException.INTERNAL_ERROR, "Fate transaction cancelled by user")); + txStore.setStatus(FAILED_IN_PROGRESS); + Fate.log.info( + "[{}] Updated status for {} to FAILED_IN_PROGRESS because it was cancelled by user", + store.type(), fateId); + return true; + } else { + Fate.log.info("[{}] {} cancelled by user but already in progress or finished state", + store.type(), fateId); + return false; + } + } finally { + txStore.unreserve(Duration.ZERO); + } + } else { + // reserved, lets retry. + UtilWaitThread.sleep(500); + } + } + Fate.log.info("[{}] Unable to reserve transaction {} to cancel it", store.type(), fateId); + return false; + } + + // resource cleanup + public void delete(FateId fateId) { + FateStore.FateTxStore txStore = store.reserve(fateId); + try { + switch (txStore.getStatus()) { + case NEW: + case SUBMITTED: + case FAILED: + case SUCCESSFUL: + txStore.delete(); + break; + case FAILED_IN_PROGRESS: + case IN_PROGRESS: + throw new IllegalStateException("Can not delete in progress transaction " + fateId); + case UNKNOWN: + // nothing to do, it does not exist + break; + } + } finally { + txStore.unreserve(Duration.ZERO); + } + } + + public String getReturn(FateId fateId) { + FateStore.FateTxStore txStore = store.reserve(fateId); + try { + if (txStore.getStatus() != SUCCESSFUL) { + throw new IllegalStateException( + "Tried to get exception when transaction " + fateId + " not in successful state"); + } + return (String) txStore.getTransactionInfo(Fate.TxInfo.RETURN_VALUE); + } finally { + txStore.unreserve(Duration.ZERO); + } + } + + // get reportable failures + public Exception getException(FateId fateId) { + FateStore.FateTxStore txStore = store.reserve(fateId); + try { + if (txStore.getStatus() != FAILED) { + throw new IllegalStateException( + "Tried to get exception when transaction " + fateId + " not in failed state"); + } + return (Exception) txStore.getTransactionInfo(Fate.TxInfo.EXCEPTION); + } finally { + txStore.unreserve(Duration.ZERO); + } + } + + /** + * Lists transctions for a given fate key type. + */ + public Stream list(FateKey.FateKeyType type) { + return store.list(type); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index 6c31e9174b7..615ea1a0d22 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -132,7 +132,7 @@ public TFateId beginFateOperation(TInfo tinfo, TCredentials credentials, TFateIn throws ThriftSecurityException { authenticate(credentials); return new TFateId(type, - manager.fate(FateInstanceType.fromThrift(type)).startTransaction().getTxUUIDStr()); + manager.fateClient(FateInstanceType.fromThrift(type)).startTransaction().getTxUUIDStr()); } @Override @@ -157,7 +157,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat } goalMessage += "Create " + namespace + " namespace."; - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new CreateNamespace(c.getPrincipal(), namespace, options)), autoCleanup, goalMessage); break; @@ -176,7 +176,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat } goalMessage += "Rename " + oldName + " namespace to " + newName; - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new RenameNamespace(namespaceId, oldName, newName)), autoCleanup, goalMessage); break; @@ -194,7 +194,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat } goalMessage += "Delete namespace Id: " + namespaceId; - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new DeleteNamespace(namespaceId)), autoCleanup, goalMessage); break; } @@ -252,7 +252,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat goalMessage += "Create table " + tableName + " " + initialTableState + " with " + splitCount + " splits and initial tabletAvailability of " + initialTabletAvailability; - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName, timeType, options, splitsPath, splitCount, splitsDirsPath, initialTableState, // Set the default tablet to be auto-mergeable with other tablets if it is split @@ -288,7 +288,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat goalMessage += "Rename table " + oldTableName + "(" + tableId + ") to " + oldTableName; try { - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new RenameTable(namespaceId, tableId, oldTableName, newTableName)), autoCleanup, goalMessage); } catch (NamespaceNotFoundException e) { @@ -370,7 +370,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat goalMessage += " and keep offline."; } - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new CloneTable(c.getPrincipal(), srcNamespaceId, srcTableId, namespaceId, tableName, propertiesToSet, propertiesToExclude, keepOffline)), autoCleanup, goalMessage); @@ -400,7 +400,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat } goalMessage += "Delete table " + tableName + "(" + tableId + ")"; - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new PreDeleteTable(namespaceId, tableId)), autoCleanup, goalMessage); break; } @@ -427,7 +427,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat goalMessage += "Online table " + tableId; final EnumSet expectedCurrStates = EnumSet.of(TableState.ONLINE, TableState.OFFLINE); - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>( new ChangeTableState(namespaceId, tableId, tableOp, expectedCurrStates)), autoCleanup, goalMessage); @@ -456,7 +456,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat goalMessage += "Offline table " + tableId; final EnumSet expectedCurrStates = EnumSet.of(TableState.ONLINE, TableState.OFFLINE); - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>( new ChangeTableState(namespaceId, tableId, tableOp, expectedCurrStates)), autoCleanup, goalMessage); @@ -492,7 +492,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat startRowStr, endRowStr); goalMessage += "Merge table " + tableName + "(" + tableId + ") splits from " + startRowStr + " to " + endRowStr; - manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>( + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>( new TableRangeOp(MergeInfo.Operation.MERGE, namespaceId, tableId, startRow, endRow)), autoCleanup, goalMessage); break; @@ -524,7 +524,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat goalMessage += "Delete table " + tableName + "(" + tableId + ") range " + startRow + " to " + endRow; - manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>( + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>( new TableRangeOp(MergeInfo.Operation.DELETE, namespaceId, tableId, startRow, endRow)), autoCleanup, goalMessage); break; @@ -550,7 +550,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat } goalMessage += "Compact table (" + tableId + ") with config " + compactionConfig; - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new CompactRange(namespaceId, tableId, compactionConfig)), autoCleanup, goalMessage); break; @@ -574,7 +574,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat } goalMessage += "Cancel compaction of table (" + tableId + ")"; - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new CancelCompactions(namespaceId, tableId)), autoCleanup, goalMessage); break; } @@ -609,7 +609,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat } goalMessage += "Import table with new name: " + tableName + " from " + exportDirs; - manager.fate(type) + manager.fateClient(type) .seedTransaction(op, fateId, new TraceRepo<>(new ImportTable(c.getPrincipal(), tableName, exportDirs, namespaceId, keepMappings, keepOffline)), autoCleanup, goalMessage); @@ -639,7 +639,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat } goalMessage += "Export table " + tableName + "(" + tableId + ") to " + exportDir; - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new ExportTable(namespaceId, tableName, tableId, exportDir)), autoCleanup, goalMessage); break; @@ -676,7 +676,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat manager.updateBulkImportStatus(dir, BulkImportState.INITIAL); goalMessage += "Bulk import (v2) " + dir + " to " + tableName + "(" + tableId + ")"; - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new ComputeBulkRange(tableId, dir, setTime)), autoCleanup, goalMessage); break; } @@ -720,7 +720,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat goalMessage += "Set availability for table: " + tableName + "(" + tableId + ") range: " + tRange + " to: " + tabletAvailability.name(); - manager.fate(type).seedTransaction(op, fateId, + manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(new LockTable(tableId, namespaceId, tRange, tabletAvailability)), autoCleanup, goalMessage); break; @@ -794,8 +794,8 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat } goalMessage = "Splitting " + extent + " for user into " + (splits.size() + 1) + " tablets"; - manager.fate(type).seedTransaction(op, fateId, new PreSplit(extent, splits), autoCleanup, - goalMessage); + manager.fateClient(type).seedTransaction(op, fateId, new PreSplit(extent, splits), + autoCleanup, goalMessage); break; } default: @@ -847,9 +847,9 @@ public String waitForFateOperation(TInfo tinfo, TCredentials credentials, TFateI FateId fateId = FateId.fromThrift(opid); FateInstanceType type = fateId.getType(); - TStatus status = manager.fate(type).waitForCompletion(fateId); + TStatus status = manager.fateClient(type).waitForCompletion(fateId); if (status == TStatus.FAILED) { - Exception e = manager.fate(type).getException(fateId); + Exception e = manager.fateClient(type).getException(fateId); if (e instanceof ThriftTableOperationException) { throw (ThriftTableOperationException) e; } else if (e instanceof ThriftSecurityException) { @@ -861,7 +861,7 @@ public String waitForFateOperation(TInfo tinfo, TCredentials credentials, TFateI } } - String ret = manager.fate(type).getReturn(fateId); + String ret = manager.fateClient(type).getReturn(fateId); if (ret == null) { ret = ""; // thrift does not like returning null } @@ -873,7 +873,7 @@ public void finishFateOperation(TInfo tinfo, TCredentials credentials, TFateId o throws ThriftSecurityException { authenticate(credentials); FateId fateId = FateId.fromThrift(opid); - manager.fate(fateId.getType()).delete(fateId); + manager.fateClient(fateId.getType()).delete(fateId); } protected void authenticate(TCredentials credentials) throws ThriftSecurityException { @@ -987,6 +987,6 @@ public boolean cancelFateOperation(TInfo tinfo, TCredentials credentials, TFateI SecurityErrorCode.PERMISSION_DENIED); } - return manager.fate(fateId.getType()).cancel(fateId); + return manager.fateClient(fateId.getType()).cancel(fateId); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 99caaf54d81..8ecda7f3aff 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -68,6 +68,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateCleaner; +import org.apache.accumulo.core.fate.FateClient; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; @@ -294,17 +295,7 @@ public boolean stillManager() { return getManagerState() != ManagerState.STOP; } - /** - * Retrieve the Fate object, blocking until it is ready. This could cause problems if Fate - * operations are attempted to be used prior to the Manager being ready for them. If these - * operations are triggered by a client side request from a tserver or client, it should be safe - * to wait to handle those until Fate is ready, but if it occurs during an upgrade, or some other - * time in the Manager before Fate is started, that may result in a deadlock and will need to be - * fixed. - * - * @return the Fate object, only after the fate components are running and ready - */ - public Fate fate(FateInstanceType type) { + private void waitForFate() { try { // block up to 30 seconds until it's ready; if it's still not ready, introduce some logging if (!fateReadyLatch.await(30, SECONDS)) { @@ -325,7 +316,26 @@ public Fate fate(FateInstanceType type) { Thread.currentThread().interrupt(); throw new IllegalStateException("Thread was interrupted; cannot proceed"); } - return getFateRefs().get(type); + } + + /** + * Retrieve the Fate object, blocking until it is ready. This could cause problems if Fate + * operations are attempted to be used prior to the Manager being ready for them. If these + * operations are triggered by a client side request from a tserver or client, it should be safe + * to wait to handle those until Fate is ready, but if it occurs during an upgrade, or some other + * time in the Manager before Fate is started, that may result in a deadlock and will need to be + * fixed. + * + * @return the Fate object, only after the fate components are running and ready + */ + public Fate fate(FateInstanceType type) { + waitForFate(); + var fate = Objects.requireNonNull(fateRefs.get(), "fateRefs is not set yet").get(type); + return Objects.requireNonNull(fate, () -> "fate type " + type + " is not present"); + } + + public FateClient fateClient(FateInstanceType type) { + return fate(type); } static final boolean X = true; @@ -928,7 +938,7 @@ public void run() { // Start the Manager's Fate Service fateServiceHandler = new FateServiceHandler(this); managerClientHandler = new ManagerClientServiceHandler(this); - compactionCoordinator = new CompactionCoordinator(this, fateRefs); + compactionCoordinator = new CompactionCoordinator(this, this::fateClient); var processor = ThriftProcessorTypes.getManagerTProcessor(this, fateServiceHandler, compactionCoordinator.getThriftService(), managerClientHandler, getContext()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index e6c0c86f2e5..d6915b5155c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -56,8 +56,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -90,6 +90,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateClient; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; @@ -121,7 +122,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; @@ -271,7 +271,7 @@ static FailureCounts incrementSuccess(Object key, FailureCounts counts) { private final ServerContext ctx; private final AuditedSecurityOperation security; private final CompactionJobQueues jobQueues; - private final AtomicReference>> fateInstances; + private final Function> fateClients; // Exposed for tests protected final CountDownLatch shutdown = new CountDownLatch(1); @@ -291,7 +291,7 @@ static FailureCounts incrementSuccess(Object key, FailureCounts counts) { private final Set activeCompactorReservationRequest = ConcurrentHashMap.newKeySet(); public CompactionCoordinator(Manager manager, - AtomicReference>> fateInstances) { + Function> fateClients) { this.ctx = manager.getContext(); this.security = ctx.getSecurityOperation(); this.manager = Objects.requireNonNull(manager); @@ -303,7 +303,7 @@ public CompactionCoordinator(Manager manager, this.queueMetrics = new QueueMetrics(jobQueues); - this.fateInstances = fateInstances; + this.fateClients = fateClients; completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true) .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build(); @@ -326,7 +326,7 @@ public CompactionCoordinator(Manager manager, .maximumWeight(10485760L).weigher(weigher).build(); deadCompactionDetector = - new DeadCompactionDetector(this.ctx, this, ctx.getScheduledExecutor(), fateInstances); + new DeadCompactionDetector(this.ctx, this, ctx.getScheduledExecutor(), fateClients); var rootReservationPool = ThreadPools.getServerThreadPools().createExecutorService( ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT, true); @@ -789,17 +789,9 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, } // maybe fate has not started yet - var localFates = fateInstances.get(); - while (localFates == null) { - UtilWaitThread.sleep(100); - if (shutdown.getCount() == 0) { - return; - } - localFates = fateInstances.get(); - } - var extent = KeyExtent.fromThrift(textent); - var localFate = localFates.get(FateInstanceType.fromTableId(extent.tableId())); + var fateType = FateInstanceType.fromTableId(extent.tableId()); + var localFate = fateClients.apply(fateType); LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats, extent); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java index da852f1bb1a..ce04296a615 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.manager.compaction.coordinator; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -27,14 +28,14 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateClient; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; @@ -62,16 +63,16 @@ public class DeadCompactionDetector { private final ScheduledThreadPoolExecutor schedExecutor; private final ConcurrentHashMap deadCompactions; private final Set tablesWithUnreferencedTmpFiles = new HashSet<>(); - private final AtomicReference>> fateInstances; + private final Function> fateClients; public DeadCompactionDetector(ServerContext context, CompactionCoordinator coordinator, ScheduledThreadPoolExecutor stpe, - AtomicReference>> fateInstances) { + Function> fateClients) { this.context = context; this.coordinator = coordinator; this.schedExecutor = stpe; this.deadCompactions = new ConcurrentHashMap<>(); - this.fateInstances = fateInstances; + this.fateClients = fateClients; } public void addTableId(TableId tableWithUnreferencedTmpFiles) { @@ -196,13 +197,8 @@ private void detectDeadCompactions() { if (!tabletCompactions.isEmpty()) { // look for any compactions committing in fate and remove those - var fateMap = fateInstances.get(); - if (fateMap == null) { - log.warn("Fate is not present, can not look for dead compactions"); - return; - } - try (Stream keyStream = fateMap.values().stream() - .flatMap(fate -> fate.list(FateKey.FateKeyType.COMPACTION_COMMIT))) { + try (Stream keyStream = Arrays.stream(FateInstanceType.values()).map(fateClients) + .flatMap(fateClient -> fateClient.list(FateKey.FateKeyType.COMPACTION_COMMIT))) { keyStream.map(fateKey -> fateKey.getCompactionId().orElseThrow()).forEach(ecid -> { if (tabletCompactions.remove(ecid) != null) { log.debug("Ignoring compaction {} that is committing in a fate", ecid); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java index e4b39229ef1..e5e8c850725 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java @@ -158,7 +158,7 @@ void submit(MergeableRange range, FateInstanceType type, Entry t tableId, startRowStr, endRowStr); var fateKey = FateKey.forMerge(new KeyExtent(tableId, range.endRow, range.startRow)); - manager.fate(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateKey, + manager.fateClient(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateKey, new TraceRepo<>( new TableRangeOp(Operation.SYSTEM_MERGE, namespaceId, tableId, startRow, endRow)), true); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java index 1f21fde170e..93d4c1cf03e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java @@ -89,7 +89,7 @@ public void run() { private void seedSplits(FateInstanceType instanceType, Map splits) { if (!splits.isEmpty()) { - try (var seeder = manager.fate(instanceType).beginSeeding()) { + try (var seeder = manager.fateClient(instanceType).beginSeeding()) { for (KeyExtent extent : splits.values()) { @SuppressWarnings("unused") var unused = seeder.attemptToSeedTransaction(Fate.FateOperation.SYSTEM_SPLIT, diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index b121faf2b39..0800c42d2a6 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -39,7 +39,6 @@ import java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.admin.CompactionConfig; @@ -55,7 +54,6 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; -import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; @@ -79,7 +77,6 @@ import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue; import org.apache.accumulo.manager.compaction.queue.ResolvedCompactionJob; -import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.security.AuditedSecurityOperation; @@ -94,10 +91,6 @@ public class CompactionCoordinatorTest { - // Need a non-null fateInstances reference for CompactionCoordinator.compactionCompleted - private static final AtomicReference>> fateInstances = - new AtomicReference<>(Map.of()); - private static final ResourceGroupId GROUP_ID = ResourceGroupId.of("R2DQ"); private final HostAndPort tserverAddr = HostAndPort.fromParts("192.168.1.1", 9090); @@ -118,7 +111,7 @@ public class TestCoordinator extends CompactionCoordinator { private Set metadataCompactionIds = null; public TestCoordinator(Manager manager, List runningCompactions) { - super(manager, fateInstances); + super(manager, t -> null); this.runningCompactions = runningCompactions; }