From 687f70554e18fe2df6f411ed981e8c9676beec57 Mon Sep 17 00:00:00 2001 From: andrew mcdonald Date: Mon, 4 May 2026 10:30:03 -0500 Subject: [PATCH 1/3] Back porting improves tablet load times for tablet w/ walogs and no data #4873 This commit makes two major changes. First, it changes log recovery to use block caches. Second, it checks whether a tablet has any data in walogs before acquiring the recovery lock. These two changes together really speed up loading tablets that have no data in walogs. These changes introduce an extra opening of the walogs to see if the recovery lock needs to be acquired. Using the block caches for this extra opening should avoid any extra cost. The block caches also help in the case where many tablets with the same walogs are assigned to a tablet server. In some simple tests, an 8x speedup in tablet load times was observed. Anytime a tablet has an unclean shutdown it will have the walogs of the dead tserver assigned to it even if it had no data in those walogs. These changes make loading tablets in that situation much faster. 
{"fundingSource": "41201", "team": "FED.ICGSA.OPS.MOE", "fshGit": "dummy-lo", "fshDocker": "sha256:20cf0045"} --- .../accumulo/tserver/AssignmentHandler.java | 73 +++++++++---------- .../apache/accumulo/tserver/TabletServer.java | 25 ++++++- .../tserver/log/RecoveryLogsIterator.java | 42 ++++++++--- .../tserver/log/SortedLogRecovery.java | 27 +++++-- .../tserver/log/TabletServerLogger.java | 39 +++++++++- .../tserver/log/SortedLogRecoveryTest.java | 19 ++++- 6 files changed, 170 insertions(+), 55 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java index e6154e1392e..a0612b8891b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java @@ -157,44 +157,45 @@ public void run() { boolean successful = false; try { - server.acquireRecoveryMemory(extent); - - TabletResourceManager trm = server.resourceManager.createTabletResourceManager(extent, - server.getTableConfiguration(extent)); - TabletData data = new TabletData(tabletMetadata); - - tablet = new Tablet(server, extent, trm, data); - // If a minor compaction starts after a tablet opens, this indicates a log recovery - // occurred. This recovered data must be minor compacted. - // There are three reasons to wait for this minor compaction to finish before placing the - // tablet in online tablets. - // - // 1) The log recovery code does not handle data written to the tablet on multiple tablet - // servers. - // 2) The log recovery code does not block if memory is full. Therefore recovering lots of - // tablets that use a lot of memory could run out of memory. - // 3) The minor compaction finish event did not make it to the logs (the file will be in - // metadata, preventing replay of compacted data)... 
but do not - // want a majc to wipe the file out from metadata and then have another process failure... - // this could cause duplicate data to replay. - if (tablet.getNumEntriesInMemory() > 0 - && !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) { - throw new RuntimeException("Minor compaction after recovery fails for " + extent); - } - Assignment assignment = - new Assignment(extent, server.getTabletSession(), tabletMetadata.getLast()); - TabletStateStore.setLocation(server.getContext(), assignment); + try (var recoveryLock = server.acquireRecoveryMemory(tabletMetadata)) { + + TabletResourceManager trm = server.resourceManager.createTabletResourceManager(extent, + server.getTableConfiguration(extent)); + TabletData data = new TabletData(tabletMetadata); + + tablet = new Tablet(server, extent, trm, data); + // If a minor compaction starts after a tablet opens, this indicates a log recovery + // occurred. This recovered data must be minor compacted. + // There are three reasons to wait for this minor compaction to finish before placing the + // tablet in online tablets. + // + // 1) The log recovery code does not handle data written to the tablet on multiple tablet + // servers. + // 2) The log recovery code does not block if memory is full. Therefore recovering lots of + // tablets that use a lot of memory could run out of memory. + // 3) The minor compaction finish event did not make it to the logs (the file will be in + // metadata, preventing replay of compacted data)... but do not + // want a majc to wipe the file out from metadata and then have another process failure... + // this could cause duplicate data to replay. 
+ if (tablet.getNumEntriesInMemory() > 0 + && !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) { + throw new RuntimeException("Minor compaction after recovery fails for " + extent); + } + Assignment assignment = + new Assignment(extent, server.getTabletSession(), tabletMetadata.getLast()); + TabletStateStore.setLocation(server.getContext(), assignment); - synchronized (server.openingTablets) { - synchronized (server.onlineTablets) { - server.openingTablets.remove(extent); - server.onlineTablets.put(extent, tablet); - server.openingTablets.notifyAll(); - server.recentlyUnloadedCache.remove(tablet.getExtent()); + synchronized (server.openingTablets) { + synchronized (server.onlineTablets) { + server.openingTablets.remove(extent); + server.onlineTablets.put(extent, tablet); + server.openingTablets.notifyAll(); + server.recentlyUnloadedCache.remove(tablet.getExtent()); + } } + tablet = null; // release this reference + successful = true; } - tablet = null; // release this reference - successful = true; } catch (Exception e) { log.warn("exception trying to assign tablet {} {}", extent, locationToOpen, e); @@ -205,8 +206,6 @@ public void run() { TableId tableId = extent.tableId(); ProblemReports.getInstance(server.getContext()).report(new ProblemReport(tableId, TABLET_LOAD, extent.getUUID().toString(), server.getClientAddressString(), e)); - } finally { - server.releaseRecoveryMemory(extent); } if (successful) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 25ea5276729..507c4dec8b2 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -29,6 +29,7 @@ import static org.apache.accumulo.core.util.threads.ThreadPools.watchNonCriticalScheduledTask; import java.io.IOException; +import java.io.UncheckedIOException; import 
java.lang.management.ManagementFactory; import java.net.UnknownHostException; import java.security.SecureRandom; @@ -89,6 +90,7 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.process.thrift.ServerProcessService; @@ -547,9 +549,14 @@ public void enqueueManagerMessage(ManagerMessage m) { managerMessages.addLast(m); } - void acquireRecoveryMemory(KeyExtent extent) { - if (!extent.isMeta()) { + private static final AutoCloseable NOOP_CLOSEABLE = () -> {}; + + AutoCloseable acquireRecoveryMemory(TabletMetadata tabletMetadata) { + if (tabletMetadata.getExtent().isMeta() || !needsRecovery(tabletMetadata)) { + return NOOP_CLOSEABLE; + } else { recoveryLock.lock(); + return () -> recoveryLock.unlock(); } } @@ -559,6 +566,20 @@ void releaseRecoveryMemory(KeyExtent extent) { } } + public boolean needsRecovery(TabletMetadata tabletMetadata) { + var logEntries = tabletMetadata.getLogs(); + + if (logEntries.isEmpty()) { + return false; + } + + try { + return logger.needsRecovery(getContext(), tabletMetadata.getExtent(), logEntries); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private ServerAddress startServer(String address, TProcessor processor) throws UnknownHostException { @SuppressWarnings("deprecation") diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java index f2a1a5c990d..98f927bb700 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java @@ 
-35,6 +35,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; import org.apache.accumulo.core.iterators.IteratorAdapter; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; import org.apache.accumulo.core.spi.crypto.CryptoService; @@ -50,6 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; import com.google.common.collect.Iterators; /** @@ -64,11 +66,17 @@ public class RecoveryLogsIterator private final Iterator> iter; private final CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY); + public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, LogFileKey start, + LogFileKey end, boolean checkFirstKey) throws IOException { + this(context, recoveryLogDirs, start, end, checkFirstKey, null, null); + } + /** * Scans the files in each recoveryLogDir over the range [start,end]. 
*/ public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, LogFileKey start, - LogFileKey end, boolean checkFirstKey) throws IOException { + LogFileKey end, boolean checkFirstKey, Cache fileLenCache, + CacheProvider cacheProvider) throws IOException { List>> iterators = new ArrayList<>(recoveryLogDirs.size()); fileIters = new ArrayList<>(); @@ -85,13 +93,12 @@ public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, L // only check the first key once to prevent extra iterator creation and seeking if (checkFirstKey && !logFiles.isEmpty()) { - validateFirstKey(context, cryptoService, fs, logFiles, logDir); + validateFirstKey(context, cryptoService, fs, logFiles, logDir, fileLenCache, cacheProvider); } for (Path log : logFiles) { - FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder() - .forFile(log.toString(), fs, fs.getConf(), cryptoService) - .withTableConfiguration(context.getConfiguration()).seekToBeginning().build(); + FileSKVIterator fileIter = + openLogFile(context, log, cryptoService, fs, fileLenCache, cacheProvider); if (range != null) { fileIter.seek(range, Collections.emptySet(), false); } @@ -134,6 +141,23 @@ public void close() throws IOException { } } + FileSKVIterator openLogFile(ServerContext context, Path logFile, CryptoService cs, FileSystem fs, + Cache fileLenCache, CacheProvider cacheProvider) throws IOException { + var builder = FileOperations.getInstance().newReaderBuilder() + .forFile(logFile.toString(), fs, fs.getConf(), cs) + .withTableConfiguration(context.getConfiguration()); + + if (fileLenCache != null) { + builder = builder.withFileLenCache(fileLenCache); + } + + if (cacheProvider != null) { + builder = builder.withCacheProvider(cacheProvider); + } + + return builder.seekToBeginning().build(); + } + /** * Check for sorting signal files (finished/failed) and get the logs in the provided directory. 
*/ @@ -169,10 +193,10 @@ private SortedSet getFiles(VolumeManager fs, Path directory) throws IOExce * Check that the first entry in the WAL is OPEN. Only need to do this once. */ private void validateFirstKey(ServerContext context, CryptoService cs, FileSystem fs, - SortedSet logFiles, Path fullLogPath) throws IOException { - try (FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder() - .forFile(logFiles.first().toString(), fs, fs.getConf(), cs) - .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) { + SortedSet logFiles, Path fullLogPath, Cache fileLenCache, + CacheProvider cacheProvider) throws IOException { + try (FileSKVIterator fileIter = + openLogFile(context, logFiles.first(), cs, fs, fileLenCache, cacheProvider)) { Iterator> iterator = new IteratorAdapter(fileIter); if (iterator.hasNext()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java index 40e8496ba79..99470c21565 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java @@ -42,6 +42,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.tserver.logger.LogEvents; @@ -51,6 +52,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; import com.google.common.collect.Collections2; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -65,8 +67,21 @@ public class SortedLogRecovery { private final ServerContext context; - public SortedLogRecovery(ServerContext context) { + 
private final CacheProvider cacheProvider; + + private final Cache fileLenCache; + + public SortedLogRecovery(ServerContext context, Cache fileLenCache, + CacheProvider cacheProvider) { this.context = context; + this.cacheProvider = cacheProvider; + this.fileLenCache = fileLenCache; + } + + public boolean needsRecovery(KeyExtent extent, List recoveryDirs) throws IOException { + Entry> maxEntry = findLogsThatDefineTablet(extent, recoveryDirs); + int tabletId = maxEntry.getKey(); + return tabletId != -1; } static LogFileKey maxKey(LogEvents event) { @@ -104,7 +119,7 @@ private int findMaxTabletId(KeyExtent extent, List recoveryLogDirs) throws int tabletId = -1; try (var rli = new RecoveryLogsIterator(context, recoveryLogDirs, minKey(DEFINE_TABLET), - maxKey(DEFINE_TABLET), true)) { + maxKey(DEFINE_TABLET), true, fileLenCache, cacheProvider)) { KeyExtent alternative = extent; if (extent.isRootTablet()) { @@ -207,8 +222,9 @@ private long findRecoverySeq(List recoveryLogs, Set tabletFiles, i long lastFinish = 0; long recoverySeq = 0; - try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, recoveryLogs, - minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId), false)) { + try (RecoveryLogsIterator rli = + new RecoveryLogsIterator(context, recoveryLogs, minKey(COMPACTION_START, tabletId), + maxKey(COMPACTION_START, tabletId), false, fileLenCache, cacheProvider)) { DeduplicatingIterator ddi = new DeduplicatingIterator(rli); @@ -265,7 +281,8 @@ private void playbackMutations(List recoveryLogs, MutationReceiver mr, int LogFileKey end = maxKey(MUTATION, tabletId); - try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, false)) { + try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, false, fileLenCache, + cacheProvider)) { while (rli.hasNext()) { Entry entry = rli.next(); LogFileKey logFileKey = entry.getKey(); diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 5cc5c26e56d..21d34b755f9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -40,7 +41,12 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; +import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; +import org.apache.accumulo.core.logging.LoggingBlockCache; import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.spi.cache.CacheType; +import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.Retry.RetryFactory; @@ -51,6 +57,7 @@ import org.apache.accumulo.server.util.ReplicationTableUtil; import org.apache.accumulo.tserver.TabletMutations; import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.TabletServerResourceManager; import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation; import org.apache.accumulo.tserver.log.DfsLogger.ServerResources; import org.apache.accumulo.tserver.tablet.CommitSession; @@ -553,10 +560,40 @@ public long minorCompactionStarted(final CommitSession commitSession, final long return seq; } + private List resolve(Collection walogs) { + List sortedLogs = new ArrayList<>(walogs.size()); + for (var logEntry : walogs) { + sortedLogs.add(new 
Path(logEntry.filename)); + } + return sortedLogs; + } + + private CacheProvider createCacheProvider(TabletServerResourceManager resourceMgr) { + return new BasicCacheProvider( + LoggingBlockCache.wrap(CacheType.INDEX, resourceMgr.getIndexCache()), + LoggingBlockCache.wrap(CacheType.DATA, resourceMgr.getDataCache())); + } + + public boolean needsRecovery(ServerContext context, KeyExtent extent, Collection walogs) + throws IOException { + try { + var resourceMgr = tserver.getResourceManager(); + var cacheProvider = createCacheProvider(resourceMgr); + SortedLogRecovery recovery = + new SortedLogRecovery(context, resourceMgr.getFileLenCache(), cacheProvider); + return recovery.needsRecovery(extent, resolve(walogs)); + } catch (Exception e) { + throw new IOException(e); + } + } + public void recover(ServerContext context, KeyExtent extent, List recoveryDirs, Set tabletFiles, MutationReceiver mr) throws IOException { try { - SortedLogRecovery recovery = new SortedLogRecovery(context); + var resourceMgr = tserver.getResourceManager(); + var cacheProvider = createCacheProvider(resourceMgr); + SortedLogRecovery recovery = + new SortedLogRecovery(context, resourceMgr.getFileLenCache(), cacheProvider); recovery.recover(extent, recoveryDirs, tabletFiles, mr); } catch (Exception e) { throw new IOException(e); diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java index 1e4a79c71c6..9450e9018e9 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java @@ -51,10 +51,15 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; +import 
org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCache; +import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; import org.apache.accumulo.core.file.rfile.bcfile.Compression; import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm; import org.apache.accumulo.core.file.rfile.bcfile.Utils; import org.apache.accumulo.core.file.streams.SeekableDataInputStream; +import org.apache.accumulo.core.spi.cache.CacheType; import org.apache.accumulo.core.spi.crypto.CryptoServiceFactory; import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory; import org.apache.accumulo.core.util.Pair; @@ -75,6 +80,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input") @@ -165,6 +173,15 @@ private List recover(Map logs, KeyExtent extent) th private List recover(Map logs, Set files, KeyExtent extent, int bufferSize) throws IOException { + CacheProvider cacheProvider = new BasicCacheProvider( + new TinyLfuBlockCache( + BlockCacheConfiguration.forTabletServer(DefaultConfiguration.getInstance()), + CacheType.INDEX), + new TinyLfuBlockCache( + BlockCacheConfiguration.forTabletServer(DefaultConfiguration.getInstance()), + CacheType.DATA)); + Cache fileLenCache = CacheBuilder.newBuilder().build(); + final String workdir = new File(tempDir, testName()).getAbsolutePath(); try (var fs = VolumeManagerImpl.getLocalForTesting(workdir)) { CryptoServiceFactory cryptoFactory = new GenericCryptoServiceFactory(); @@ -198,7 +215,7 @@ private List recover(Map logs, Set files, K dirs.add(new Path(destPath)); } // Recover - SortedLogRecovery recovery = new SortedLogRecovery(context); + SortedLogRecovery recovery = new 
SortedLogRecovery(context, fileLenCache, cacheProvider); CaptureMutations capture = new CaptureMutations(); recovery.recover(extent, dirs, files, capture); verify(context); From 9bfc2c8d8c04b9e0ac4f149b0a5bed968c817fbb Mon Sep 17 00:00:00 2001 From: andrew mcdonald Date: Tue, 5 May 2026 09:17:34 -0500 Subject: [PATCH 2/3] Back porting switches volumes prior to recovery check (#4889) In #4873 a check was added to inspect walogs during tablet load to see if they had any data for the tablet. This check happens prior to volume replacement that also runs during tablet load. Therefore if volume replacement is needed for the walogs then this check will fail because it can not find the files and the tablet will fail to load. To fix this problem modified the new check to switch volumes if needed prior to running the check. {"fundingSource": "41201", "team": "FED.ICGSA.OPS.MOE", "fshGit": "dummy-lo", "fshDocker": "sha256:20cf0045"} --- .../apache/accumulo/server/fs/VolumeUtil.java | 4 ++++ .../apache/accumulo/tserver/TabletServer.java | 19 ++++++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index 35e1fbe1bc1..7fd7b98bf19 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@ -89,6 +89,10 @@ public static Path switchVolume(String path, FileType ft, List> return null; } + public static LogEntry switchVolume(LogEntry le, List> replacements) { + return switchVolumes(le, replacements); + } + protected static LogEntry switchVolumes(LogEntry le, List> replacements) { Path switchedPath = switchVolume(le.filename, FileType.WAL, replacements); String switchedString; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 507c4dec8b2..162e94bbe4c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -121,6 +121,7 @@ import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.log.SortedLogState; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; @@ -573,8 +574,24 @@ public boolean needsRecovery(TabletMetadata tabletMetadata) { return false; } + // This method is called prior to volumes being switched for a tablet during the load process, + // so switch volumes before calling needsRecovery() + var switchedLogEntries = new ArrayList(logEntries.size()); + + for (LogEntry logEntry : logEntries) { + var switchedWalog = VolumeUtil.switchVolume(logEntry, context.getVolumeReplacements()); + LogEntry walog; + if (switchedWalog != null) { + log.debug("Volume switched for needsRecovery {} -> {}", logEntry, switchedWalog); + walog = switchedWalog; + } else { + walog = logEntry; + } + switchedLogEntries.add(walog); + } + try { - return logger.needsRecovery(getContext(), tabletMetadata.getExtent(), logEntries); + return logger.needsRecovery(getContext(), tabletMetadata.getExtent(), switchedLogEntries); } catch (IOException e) { throw new UncheckedIOException(e); } From 8333a39e87b10571485256de6ffc45f2f7871878 Mon Sep 17 00:00:00 2001 From: andrew mcdonald Date: Tue, 5 May 2026 10:31:55 -0500 Subject: [PATCH 3/3] Back porting Avoids listing the sorted logs dir multiple times during log recovery. (#4874) The log recovery code would list the sorted walog files multiple times during recovery. 
These changes modify the code to only list the files once. Also the listing is cached for a short period of time to improve the case of multiple tablet referencing the same walogs. This along with #4873 should result in much less traffic to the namenode when an entire accumulo cluster shutsdown and needs to recover. {"fundingSource": "41201", "team": "FED.ICGSA.OPS.MOE", "fshGit": "dummy-lo", "fshDocker": "sha256:20cf0045"} --- .../apache/accumulo/tserver/TabletServer.java | 13 +- .../tserver/log/RecoveryLogsIterator.java | 19 +-- .../tserver/log/ResolvedSortedLog.java | 151 ++++++++++++++++++ .../tserver/log/SortedLogRecovery.java | 42 ++--- .../tserver/log/TabletServerLogger.java | 25 ++- .../accumulo/tserver/logger/LogReader.java | 6 +- .../tserver/log/RecoveryLogsIteratorTest.java | 25 ++- .../tserver/log/SortedLogRecoveryTest.java | 7 +- 8 files changed, 237 insertions(+), 51 deletions(-) create mode 100644 server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 162e94bbe4c..9d901e4c771 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -1281,24 +1281,21 @@ public void minorCompactionStarted(CommitSession tablet, long lastUpdateSequence public void recover(VolumeManager fs, KeyExtent extent, List logEntries, Set tabletFiles, MutationReceiver mutationReceiver) throws IOException { - List recoveryDirs = new ArrayList<>(); List sorted = new ArrayList<>(logEntries); sorted.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp)); + + // Validate that recovery logs exist before attempting recovery for (LogEntry entry : sorted) { - Path recovery = null; Path finished = RecoveryPath.getRecoveryPath(new Path(entry.filename)); finished = 
SortedLogState.getFinishedMarkerPath(finished); TabletServer.log.debug("Looking for " + finished); - if (fs.exists(finished)) { - recovery = finished.getParent(); - } - if (recovery == null) { + if (!fs.exists(finished)) { throw new IOException( "Unable to find recovery files for extent " + extent + " logEntry: " + entry); } - recoveryDirs.add(recovery); } - logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver); + + logger.recover(getContext(), extent, sorted, tabletFiles, mutationReceiver); } public int createLogId() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java index 98f927bb700..32809d1caa6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java @@ -66,16 +66,16 @@ public class RecoveryLogsIterator private final Iterator> iter; private final CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY); - public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, LogFileKey start, - LogFileKey end, boolean checkFirstKey) throws IOException { + public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, + LogFileKey start, LogFileKey end, boolean checkFirstKey) throws IOException { this(context, recoveryLogDirs, start, end, checkFirstKey, null, null); } /** * Scans the files in each recoveryLogDir over the range [start,end]. 
*/ - public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, LogFileKey start, - LogFileKey end, boolean checkFirstKey, Cache fileLenCache, + public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, + LogFileKey start, LogFileKey end, boolean checkFirstKey, Cache fileLenCache, CacheProvider cacheProvider) throws IOException { List>> iterators = new ArrayList<>(recoveryLogDirs.size()); @@ -86,14 +86,15 @@ public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, L final CryptoService cryptoService = context.getCryptoFactory().getService(env, context.getConfiguration().getAllCryptoProperties()); - for (Path logDir : recoveryLogDirs) { - LOG.debug("Opening recovery log dir {}", logDir.getName()); - SortedSet logFiles = getFiles(vm, logDir); - var fs = vm.getFileSystemByPath(logDir); + for (ResolvedSortedLog logDir : recoveryLogDirs) { + LOG.debug("Opening recovery log dir {}", logDir.getDir().getName()); + SortedSet logFiles = logDir.getChildren(); + var fs = vm.getFileSystemByPath(logDir.getDir()); // only check the first key once to prevent extra iterator creation and seeking if (checkFirstKey && !logFiles.isEmpty()) { - validateFirstKey(context, cryptoService, fs, logFiles, logDir, fileLenCache, cacheProvider); + validateFirstKey(context, cryptoService, fs, logFiles, logDir.getDir(), fileLenCache, + cacheProvider); } for (Path log : logFiles) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java new file mode 100644 index 00000000000..06f23cd120f --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.tserver.log; + +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.log.SortedLogState; +import org.apache.accumulo.server.manager.recovery.RecoveryPath; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Write ahead logs have two paths in DFS. There is the path of the original unsorted walog and the + * path of the sorted walog. The purpose of this class is to convert the unsorted wal path to a + * sorted wal path and validate the sorted dir exists and is finished. + */ +public class ResolvedSortedLog { + + private final SortedSet children; + private final LogEntry origin; + private final Path sortedLogDir; + + private ResolvedSortedLog(LogEntry origin, Path sortedLogDir, SortedSet children) { + this.origin = origin; + this.sortedLogDir = sortedLogDir; + this.children = Collections.unmodifiableSortedSet(children); + } + + /** + * @return the unsorted walog path from which this was created. 
+ */ + public LogEntry getOrigin() { + return origin; + } + + /** + * @return the path of the directory in which sorted logs are stored + */ + public Path getDir() { + return sortedLogDir; + } + + /** + * @return When an unsorted walog is sorted the sorted data is stored in one or more rfiles, this + * returns the paths of those rfiles. + */ + public SortedSet getChildren() { + return children; + } + + @Override + public String toString() { + return sortedLogDir.toString(); + } + + /** + * For a given path of an unsorted walog check to see if the corresponding sorted log dir exists + * and is finished. If it is return an immutable object containing information about the sorted + * walogs. + */ + public static ResolvedSortedLog resolve(LogEntry logEntry, VolumeManager fs) throws IOException { + + // convert the path of an unsorted log to the expected path for the corresponding sorted log + // dir + Path sortedLogPath = RecoveryPath.getRecoveryPath(new Path(logEntry.filename)); + + boolean foundFinish = false; + // Path::getName compares the last component of each Path value. In this case, the last + // component should + // always have the format 'part-r-XXXXX.rf', where XXXXX are one-up values. 
+ SortedSet logFiles = new TreeSet<>(Comparator.comparing(Path::getName)); + for (FileStatus child : fs.listStatus(sortedLogPath)) { + if (child.getPath().getName().startsWith("_")) { + continue; + } + if (SortedLogState.isFinished(child.getPath().getName())) { + foundFinish = true; + continue; + } + if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) { + continue; + } + FileSystem ns = fs.getFileSystemByPath(child.getPath()); + Path fullLogPath = ns.makeQualified(child.getPath()); + logFiles.add(fullLogPath); + } + if (!foundFinish) { + throw new IOException("Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + + sortedLogPath + " for walog " + logEntry.filename); + } + + return new ResolvedSortedLog(logEntry, sortedLogPath, logFiles); + } + + /** + * Create a ResolvedSortedLog directly from a sorted log directory path. This is useful for + * diagnostic tools that operate directly on sorted recovery logs without going through the normal + * recovery flow with LogEntry objects. 
+ */ + public static ResolvedSortedLog fromSortedLogDir(Path sortedLogDir, VolumeManager fs) + throws IOException { + boolean foundFinish = false; + SortedSet logFiles = new TreeSet<>(Comparator.comparing(Path::getName)); + for (FileStatus child : fs.listStatus(sortedLogDir)) { + if (child.getPath().getName().startsWith("_")) { + continue; + } + if (SortedLogState.isFinished(child.getPath().getName())) { + foundFinish = true; + continue; + } + if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) { + continue; + } + FileSystem ns = fs.getFileSystemByPath(child.getPath()); + Path fullLogPath = ns.makeQualified(child.getPath()); + logFiles.add(fullLogPath); + } + if (!foundFinish) { + throw new IOException( + "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + sortedLogDir); + } + + // Create a dummy LogEntry for the origin (used only for diagnostics) + LogEntry dummyOrigin = new LogEntry(null, 0, sortedLogDir.toString()); + return new ResolvedSortedLog(dummyOrigin, sortedLogDir, logFiles); + } +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java index 99470c21565..71210de1203 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java @@ -78,8 +78,10 @@ public SortedLogRecovery(ServerContext context, Cache fileLenCache, this.fileLenCache = fileLenCache; } - public boolean needsRecovery(KeyExtent extent, List recoveryDirs) throws IOException { - Entry> maxEntry = findLogsThatDefineTablet(extent, recoveryDirs); + public boolean needsRecovery(KeyExtent extent, List recoveryDirs) + throws IOException { + Entry> maxEntry = + findLogsThatDefineTablet(extent, recoveryDirs); int tabletId = maxEntry.getKey(); return tabletId != -1; } @@ -115,7 +117,8 @@ static LogFileKey 
minKey(LogEvents event, int tabletId) { return key; } - private int findMaxTabletId(KeyExtent extent, List recoveryLogDirs) throws IOException { + private int findMaxTabletId(KeyExtent extent, List recoveryLogDirs) + throws IOException { int tabletId = -1; try (var rli = new RecoveryLogsIterator(context, recoveryLogDirs, minKey(DEFINE_TABLET), @@ -155,18 +158,18 @@ private int findMaxTabletId(KeyExtent extent, List recoveryLogDirs) throws * @return The maximum tablet ID observed AND the list of logs that contained the maximum tablet * ID. */ - private Entry> findLogsThatDefineTablet(KeyExtent extent, - List recoveryDirs) throws IOException { - Map> logsThatDefineTablet = new HashMap<>(); + private Entry> findLogsThatDefineTablet(KeyExtent extent, + List recoveryDirs) throws IOException { + Map> logsThatDefineTablet = new HashMap<>(); - for (Path walDir : recoveryDirs) { + for (ResolvedSortedLog walDir : recoveryDirs) { int tabletId = findMaxTabletId(extent, Collections.singletonList(walDir)); if (tabletId == -1) { - log.debug("Did not find tablet {} in recovery log {}", extent, walDir.getName()); + log.debug("Did not find tablet {} in recovery log {}", extent, walDir.getDir().getName()); } else { logsThatDefineTablet.computeIfAbsent(tabletId, k -> new ArrayList<>()).add(walDir); log.debug("Found tablet {} with id {} in recovery log {}", extent, tabletId, - walDir.getName()); + walDir.getDir().getName()); } } @@ -211,8 +214,8 @@ public Entry next() { } - private long findRecoverySeq(List recoveryLogs, Set tabletFiles, int tabletId) - throws IOException { + private long findRecoverySeq(List recoveryLogs, Set tabletFiles, + int tabletId) throws IOException { HashSet suffixes = new HashSet<>(); for (String path : tabletFiles) { suffixes.add(getPathSuffix(path)); @@ -274,8 +277,8 @@ private long findRecoverySeq(List recoveryLogs, Set tabletFiles, i return recoverySeq; } - private void playbackMutations(List recoveryLogs, MutationReceiver mr, int tabletId, - long 
recoverySeq) throws IOException { + private void playbackMutations(List recoveryLogs, MutationReceiver mr, + int tabletId, long recoverySeq) throws IOException { LogFileKey start = minKey(MUTATION, tabletId); start.setSeq(recoverySeq); @@ -303,20 +306,21 @@ private void playbackMutations(List recoveryLogs, MutationReceiver mr, int } } - Collection asNames(List recoveryLogs) { - return Collections2.transform(recoveryLogs, Path::getName); + Collection asNames(List recoveryLogs) { + return Collections2.transform(recoveryLogs, rl -> rl.getDir().getName()); } - public void recover(KeyExtent extent, List recoveryDirs, Set tabletFiles, - MutationReceiver mr) throws IOException { + public void recover(KeyExtent extent, List recoveryDirs, + Set tabletFiles, MutationReceiver mr) throws IOException { - Entry> maxEntry = findLogsThatDefineTablet(extent, recoveryDirs); + Entry> maxEntry = + findLogsThatDefineTablet(extent, recoveryDirs); // A tablet may leave a tserver and then come back, in which case it would have a different and // higher tablet id. Only want to consider events in the log related to the last time the tablet // was loaded. 
int tabletId = maxEntry.getKey(); - List logsThatDefineTablet = maxEntry.getValue(); + List logsThatDefineTablet = maxEntry.getValue(); if (tabletId == -1) { log.info("Tablet {} is not defined in recovery logs {} ", extent, asNames(recoveryDirs)); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 21d34b755f9..c0084f84105 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -65,6 +65,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + /** * Central logging facility for the TServerInfo. * @@ -83,6 +86,9 @@ public class TabletServerLogger { private final TabletServer tserver; + // Cache for resolved sorted logs to avoid repeated expensive I/O operations + private final Cache sortedLogCache; + // The current logger private DfsLogger currentLog = null; private final SynchronousQueue nextLog = new SynchronousQueue<>(); @@ -164,6 +170,8 @@ public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCou this.createRetry = null; this.writeRetryFactory = writeRetryFactory; this.maxAge = maxAge; + this.sortedLogCache = + CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.SECONDS).maximumSize(1000).build(); } private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException { @@ -560,10 +568,17 @@ public long minorCompactionStarted(final CommitSession commitSession, final long return seq; } - private List resolve(Collection walogs) { - List sortedLogs = new ArrayList<>(walogs.size()); + private List resolve(Collection walogs) throws IOException { + List sortedLogs = new ArrayList<>(walogs.size()); + VolumeManager fs = tserver.getContext().getVolumeManager(); for (var 
logEntry : walogs) { - sortedLogs.add(new Path(logEntry.filename)); + try { + ResolvedSortedLog resolvedLog = + sortedLogCache.get(logEntry, () -> ResolvedSortedLog.resolve(logEntry, fs)); + sortedLogs.add(resolvedLog); + } catch (Exception e) { + throw new IOException("Failed to resolve sorted log for " + logEntry.filename, e); + } } return sortedLogs; } @@ -587,14 +602,14 @@ public boolean needsRecovery(ServerContext context, KeyExtent extent, Collection } } - public void recover(ServerContext context, KeyExtent extent, List recoveryDirs, + public void recover(ServerContext context, KeyExtent extent, Collection walogs, Set tabletFiles, MutationReceiver mr) throws IOException { try { var resourceMgr = tserver.getResourceManager(); var cacheProvider = createCacheProvider(resourceMgr); SortedLogRecovery recovery = new SortedLogRecovery(context, resourceMgr.getFileLenCache(), cacheProvider); - recovery.recover(extent, recoveryDirs, tabletFiles, mr); + recovery.recover(extent, resolve(walogs), tabletFiles, mr); } catch (Exception e) { throw new IOException(e); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java index 0d9ea4fd142..cb4d7eca42e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java @@ -48,6 +48,7 @@ import org.apache.accumulo.tserver.log.DfsLogger; import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException; import org.apache.accumulo.tserver.log.RecoveryLogsIterator; +import org.apache.accumulo.tserver.log.ResolvedSortedLog; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -173,8 +174,9 @@ public void execute(String[] args) throws Exception { } else { // read the log entries in a sorted RFile. 
This has to be a directory that contains the // finished file. - try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), null, - null, false)) { + try (var rli = new RecoveryLogsIterator(context, + Collections.singletonList(ResolvedSortedLog.fromSortedLogDir(path, fs)), null, null, + false)) { while (rli.hasNext()) { Entry entry = rli.next(); printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds, diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java index 75d933db372..b6473fd494f 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java @@ -132,7 +132,8 @@ public void testSimpleRLI() throws IOException { createRecoveryDir(logs, dirs, true); - try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, false)) { + try (RecoveryLogsIterator rli = + new RecoveryLogsIterator(context, pathsToResolvedLogs(dirs), null, null, false)) { while (rli.hasNext()) { Entry entry = rli.next(); assertEquals(1, entry.getKey().getTabletId(), "TabletId does not match"); @@ -159,7 +160,7 @@ public void testFinishMarker() throws IOException { createRecoveryDir(logs, dirs, false); assertThrows(IOException.class, - () -> new RecoveryLogsIterator(context, dirs, null, null, false), + () -> new RecoveryLogsIterator(context, pathsToResolvedLogs(dirs), null, null, false), "Finish marker should not be found"); } @@ -168,9 +169,10 @@ public void testSingleFile() throws IOException { String destPath = workDir + "/test.rf"; fs.create(new Path(destPath)); - assertThrows( - IOException.class, () -> new RecoveryLogsIterator(context, - Collections.singletonList(new Path(destPath)), null, null, false), + assertThrows(IOException.class, + () -> new 
RecoveryLogsIterator(context, + Collections.singletonList(ResolvedSortedLog.fromSortedLogDir(new Path(destPath), fs)), + null, null, false), "Finish marker should not be found for a single file."); } @@ -192,7 +194,7 @@ public void testCheckFirstKeyFailed() throws IOException { createRecoveryDir(logs, dirs, true); assertThrows(IllegalStateException.class, - () -> new RecoveryLogsIterator(context, dirs, null, null, true), + () -> new RecoveryLogsIterator(context, pathsToResolvedLogs(dirs), null, null, true), "First log entry is not OPEN so exception should be thrown."); } @@ -219,7 +221,8 @@ public void testCheckFirstKeyPass() throws IOException { createRecoveryDir(logs, dirs, true); - try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, true)) { + try (RecoveryLogsIterator rli = + new RecoveryLogsIterator(context, pathsToResolvedLogs(dirs), null, null, true)) { while (rli.hasNext()) { Entry entry = rli.next(); assertNotNull(entry.getKey()); @@ -248,4 +251,12 @@ private void createRecoveryDir(Map logs, ArrayList dirs dirs.add(new Path(destPath)); } } + + private List pathsToResolvedLogs(List dirs) throws IOException { + List resolvedLogs = new ArrayList<>(); + for (Path dir : dirs) { + resolvedLogs.add(ResolvedSortedLog.fromSortedLogDir(dir, fs)); + } + return resolvedLogs; + } } diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java index 9450e9018e9..592c9c0f946 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java @@ -217,7 +217,12 @@ private List recover(Map logs, Set files, K // Recover SortedLogRecovery recovery = new SortedLogRecovery(context, fileLenCache, cacheProvider); CaptureMutations capture = new CaptureMutations(); - recovery.recover(extent, dirs, 
files, capture); + // Convert Path objects to ResolvedSortedLog objects for recovery + List resolvedLogs = new ArrayList<>(); + for (Path dir : dirs) { + resolvedLogs.add(ResolvedSortedLog.fromSortedLogDir(dir, fs)); + } + recovery.recover(extent, resolvedLogs, files, capture); verify(context); return capture.result; }