diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index 35e1fbe1bc1..7fd7b98bf19 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@ -89,6 +89,10 @@ public static Path switchVolume(String path, FileType ft, List> return null; } + public static LogEntry switchVolume(LogEntry le, List> replacements) { + return switchVolumes(le, replacements); + } + protected static LogEntry switchVolumes(LogEntry le, List> replacements) { Path switchedPath = switchVolume(le.filename, FileType.WAL, replacements); String switchedString; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java index e6154e1392e..a0612b8891b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java @@ -157,44 +157,45 @@ public void run() { boolean successful = false; try { - server.acquireRecoveryMemory(extent); - - TabletResourceManager trm = server.resourceManager.createTabletResourceManager(extent, - server.getTableConfiguration(extent)); - TabletData data = new TabletData(tabletMetadata); - - tablet = new Tablet(server, extent, trm, data); - // If a minor compaction starts after a tablet opens, this indicates a log recovery - // occurred. This recovered data must be minor compacted. - // There are three reasons to wait for this minor compaction to finish before placing the - // tablet in online tablets. - // - // 1) The log recovery code does not handle data written to the tablet on multiple tablet - // servers. - // 2) The log recovery code does not block if memory is full. Therefore recovering lots of - // tablets that use a lot of memory could run out of memory. - // 3) The minor compaction finish event did not make it to the logs (the file will be in - // metadata, preventing replay of compacted data)... but do not - // want a majc to wipe the file out from metadata and then have another process failure... - // this could cause duplicate data to replay. - if (tablet.getNumEntriesInMemory() > 0 - && !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) { - throw new RuntimeException("Minor compaction after recovery fails for " + extent); - } - Assignment assignment = - new Assignment(extent, server.getTabletSession(), tabletMetadata.getLast()); - TabletStateStore.setLocation(server.getContext(), assignment); + try (var recoveryLock = server.acquireRecoveryMemory(tabletMetadata)) { + + TabletResourceManager trm = server.resourceManager.createTabletResourceManager(extent, + server.getTableConfiguration(extent)); + TabletData data = new TabletData(tabletMetadata); + + tablet = new Tablet(server, extent, trm, data); + // If a minor compaction starts after a tablet opens, this indicates a log recovery + // occurred. This recovered data must be minor compacted. + // There are three reasons to wait for this minor compaction to finish before placing the + // tablet in online tablets. + // + // 1) The log recovery code does not handle data written to the tablet on multiple tablet + // servers. + // 2) The log recovery code does not block if memory is full. Therefore recovering lots of + // tablets that use a lot of memory could run out of memory. + // 3) The minor compaction finish event did not make it to the logs (the file will be in + // metadata, preventing replay of compacted data)... but do not + // want a majc to wipe the file out from metadata and then have another process failure... + // this could cause duplicate data to replay. + if (tablet.getNumEntriesInMemory() > 0 + && !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) { + throw new RuntimeException("Minor compaction after recovery fails for " + extent); + } + Assignment assignment = + new Assignment(extent, server.getTabletSession(), tabletMetadata.getLast()); + TabletStateStore.setLocation(server.getContext(), assignment); - synchronized (server.openingTablets) { - synchronized (server.onlineTablets) { - server.openingTablets.remove(extent); - server.onlineTablets.put(extent, tablet); - server.openingTablets.notifyAll(); - server.recentlyUnloadedCache.remove(tablet.getExtent()); + synchronized (server.openingTablets) { + synchronized (server.onlineTablets) { + server.openingTablets.remove(extent); + server.onlineTablets.put(extent, tablet); + server.openingTablets.notifyAll(); + server.recentlyUnloadedCache.remove(tablet.getExtent()); + } } + tablet = null; // release this reference + successful = true; } - tablet = null; // release this reference - successful = true; } catch (Exception e) { log.warn("exception trying to assign tablet {} {}", extent, locationToOpen, e); @@ -205,8 +206,6 @@ public void run() { TableId tableId = extent.tableId(); ProblemReports.getInstance(server.getContext()).report(new ProblemReport(tableId, TABLET_LOAD, extent.getUUID().toString(), server.getClientAddressString(), e)); - } finally { - server.releaseRecoveryMemory(extent); } if (successful) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 25ea5276729..9d901e4c771 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -29,6 +29,7 @@ import static org.apache.accumulo.core.util.threads.ThreadPools.watchNonCriticalScheduledTask; import java.io.IOException; +import java.io.UncheckedIOException; import java.lang.management.ManagementFactory; import java.net.UnknownHostException; import java.security.SecureRandom; @@ -89,6 +90,7 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.process.thrift.ServerProcessService; @@ -119,6 +121,7 @@ import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.log.SortedLogState; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; @@ -547,9 +550,14 @@ public void enqueueManagerMessage(ManagerMessage m) { managerMessages.addLast(m); } - void acquireRecoveryMemory(KeyExtent extent) { - if (!extent.isMeta()) { + private static final AutoCloseable NOOP_CLOSEABLE = () -> {}; + + AutoCloseable acquireRecoveryMemory(TabletMetadata tabletMetadata) { + if (tabletMetadata.getExtent().isMeta() || !needsRecovery(tabletMetadata)) { + return NOOP_CLOSEABLE; + } else { recoveryLock.lock(); + return () -> recoveryLock.unlock(); } } @@ -559,6 +567,36 @@ void releaseRecoveryMemory(KeyExtent extent) { } } + public boolean needsRecovery(TabletMetadata tabletMetadata) { + var logEntries = tabletMetadata.getLogs(); + + if (logEntries.isEmpty()) { + return false; + } + + // This method is called prior to volumes being switched for a tablet during the load process, + // so switch volumes before calling needsRecovery() + var switchedLogEntries = new ArrayList(logEntries.size()); + + for (LogEntry logEntry : logEntries) { + var switchedWalog = VolumeUtil.switchVolume(logEntry, context.getVolumeReplacements()); + LogEntry walog; + if (switchedWalog != null) { + log.debug("Volume switched for needsRecovery {} -> {}", logEntry, switchedWalog); + walog = switchedWalog; + } else { + walog = logEntry; + } + switchedLogEntries.add(walog); + } + + try { + return logger.needsRecovery(getContext(), tabletMetadata.getExtent(), switchedLogEntries); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private ServerAddress startServer(String address, TProcessor processor) throws UnknownHostException { @SuppressWarnings("deprecation") @@ -1243,24 +1281,21 @@ public void minorCompactionStarted(CommitSession tablet, long lastUpdateSequence public void recover(VolumeManager fs, KeyExtent extent, List logEntries, Set tabletFiles, MutationReceiver mutationReceiver) throws IOException { - List recoveryDirs = new ArrayList<>(); List sorted = new ArrayList<>(logEntries); sorted.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp)); + + // Validate that recovery logs exist before attempting recovery for (LogEntry entry : sorted) { - Path recovery = null; Path finished = RecoveryPath.getRecoveryPath(new Path(entry.filename)); finished = SortedLogState.getFinishedMarkerPath(finished); TabletServer.log.debug("Looking for " + finished); - if (fs.exists(finished)) { - recovery = finished.getParent(); - } - if (recovery == null) { + if (!fs.exists(finished)) { throw new IOException( "Unable to find recovery files for extent " + extent + " logEntry: " + entry); } - recoveryDirs.add(recovery); } - logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver); + + logger.recover(getContext(), extent, sorted, tabletFiles, mutationReceiver); } public int createLogId() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java index f2a1a5c990d..32809d1caa6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; import org.apache.accumulo.core.iterators.IteratorAdapter; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; import org.apache.accumulo.core.spi.crypto.CryptoService; @@ -50,6 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; import com.google.common.collect.Iterators; /** @@ -64,11 +66,17 @@ public class RecoveryLogsIterator private final Iterator> iter; private final CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY); + public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, + LogFileKey start, LogFileKey end, boolean checkFirstKey) throws IOException { + this(context, recoveryLogDirs, start, end, checkFirstKey, null, null); + } + /** * Scans the files in each recoveryLogDir over the range [start,end]. */ - public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, LogFileKey start, - LogFileKey end, boolean checkFirstKey) throws IOException { + public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, + LogFileKey start, LogFileKey end, boolean checkFirstKey, Cache fileLenCache, + CacheProvider cacheProvider) throws IOException { List>> iterators = new ArrayList<>(recoveryLogDirs.size()); fileIters = new ArrayList<>(); @@ -78,20 +86,20 @@ public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, L final CryptoService cryptoService = context.getCryptoFactory().getService(env, context.getConfiguration().getAllCryptoProperties()); - for (Path logDir : recoveryLogDirs) { - LOG.debug("Opening recovery log dir {}", logDir.getName()); - SortedSet logFiles = getFiles(vm, logDir); - var fs = vm.getFileSystemByPath(logDir); + for (ResolvedSortedLog logDir : recoveryLogDirs) { + LOG.debug("Opening recovery log dir {}", logDir.getDir().getName()); + SortedSet logFiles = logDir.getChildren(); + var fs = vm.getFileSystemByPath(logDir.getDir()); // only check the first key once to prevent extra iterator creation and seeking if (checkFirstKey && !logFiles.isEmpty()) { - validateFirstKey(context, cryptoService, fs, logFiles, logDir); + validateFirstKey(context, cryptoService, fs, logFiles, logDir.getDir(), fileLenCache, + cacheProvider); } for (Path log : logFiles) { - FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder() - .forFile(log.toString(), fs, fs.getConf(), cryptoService) - .withTableConfiguration(context.getConfiguration()).seekToBeginning().build(); + FileSKVIterator fileIter = + openLogFile(context, log, cryptoService, fs, fileLenCache, cacheProvider); if (range != null) { fileIter.seek(range, Collections.emptySet(), false); } @@ -134,6 +142,23 @@ public void close() throws IOException { } } + FileSKVIterator openLogFile(ServerContext context, Path logFile, CryptoService cs, FileSystem fs, + Cache fileLenCache, CacheProvider cacheProvider) throws IOException { + var builder = FileOperations.getInstance().newReaderBuilder() + .forFile(logFile.toString(), fs, fs.getConf(), cs) + .withTableConfiguration(context.getConfiguration()); + + if (fileLenCache != null) { + builder = builder.withFileLenCache(fileLenCache); + } + + if (cacheProvider != null) { + builder = builder.withCacheProvider(cacheProvider); + } + + return builder.seekToBeginning().build(); + } + /** * Check for sorting signal files (finished/failed) and get the logs in the provided directory. */ @@ -169,10 +194,10 @@ private SortedSet getFiles(VolumeManager fs, Path directory) throws IOExce * Check that the first entry in the WAL is OPEN. Only need to do this once. */ private void validateFirstKey(ServerContext context, CryptoService cs, FileSystem fs, - SortedSet logFiles, Path fullLogPath) throws IOException { - try (FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder() - .forFile(logFiles.first().toString(), fs, fs.getConf(), cs) - .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) { + SortedSet logFiles, Path fullLogPath, Cache fileLenCache, + CacheProvider cacheProvider) throws IOException { + try (FileSKVIterator fileIter = + openLogFile(context, logFiles.first(), cs, fs, fileLenCache, cacheProvider)) { Iterator> iterator = new IteratorAdapter(fileIter); if (iterator.hasNext()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java new file mode 100644 index 00000000000..06f23cd120f --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.tserver.log; + +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.log.SortedLogState; +import org.apache.accumulo.server.manager.recovery.RecoveryPath; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Write ahead logs have two paths in DFS. There is the path of the original unsorted walog and the + * path of the sorted walog. The purpose of this class is to convert the unsorted wal path to a + * sorted wal path and validate the sorted dir exists and is finished. + */ +public class ResolvedSortedLog { + + private final SortedSet children; + private final LogEntry origin; + private final Path sortedLogDir; + + private ResolvedSortedLog(LogEntry origin, Path sortedLogDir, SortedSet children) { + this.origin = origin; + this.sortedLogDir = sortedLogDir; + this.children = Collections.unmodifiableSortedSet(children); + } + + /** + * @return the unsorted walog path from which this was created. + */ + public LogEntry getOrigin() { + return origin; + } + + /** + * @return the path of the directory in which sorted logs are stored + */ + public Path getDir() { + return sortedLogDir; + } + + /** + * @return When an unsorted walog is sorted the sorted data is stored in one or more rfiles, this + * returns the paths of those rfiles. + */ + public SortedSet getChildren() { + return children; + } + + @Override + public String toString() { + return sortedLogDir.toString(); + } + + /** + * For a given path of an unsorted walog check to see if the corresponding sorted log dir exists + * and is finished. If it is return an immutable object containing information about the sorted + * walogs. + */ + public static ResolvedSortedLog resolve(LogEntry logEntry, VolumeManager fs) throws IOException { + + // convert the path of an unsorted log to the expected path for the corresponding sorted log + // dir + Path sortedLogPath = RecoveryPath.getRecoveryPath(new Path(logEntry.filename)); + + boolean foundFinish = false; + // Path::getName compares the last component of each Path value. In this case, the last + // component should + // always have the format 'part-r-XXXXX.rf', where XXXXX are one-up values. + SortedSet logFiles = new TreeSet<>(Comparator.comparing(Path::getName)); + for (FileStatus child : fs.listStatus(sortedLogPath)) { + if (child.getPath().getName().startsWith("_")) { + continue; + } + if (SortedLogState.isFinished(child.getPath().getName())) { + foundFinish = true; + continue; + } + if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) { + continue; + } + FileSystem ns = fs.getFileSystemByPath(child.getPath()); + Path fullLogPath = ns.makeQualified(child.getPath()); + logFiles.add(fullLogPath); + } + if (!foundFinish) { + throw new IOException("Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + + sortedLogPath + " for walog " + logEntry.filename); + } + + return new ResolvedSortedLog(logEntry, sortedLogPath, logFiles); + } + + /** + * Create a ResolvedSortedLog directly from a sorted log directory path. This is useful for + * diagnostic tools that operate directly on sorted recovery logs without going through the normal + * recovery flow with LogEntry objects. + */ + public static ResolvedSortedLog fromSortedLogDir(Path sortedLogDir, VolumeManager fs) + throws IOException { + boolean foundFinish = false; + SortedSet logFiles = new TreeSet<>(Comparator.comparing(Path::getName)); + for (FileStatus child : fs.listStatus(sortedLogDir)) { + if (child.getPath().getName().startsWith("_")) { + continue; + } + if (SortedLogState.isFinished(child.getPath().getName())) { + foundFinish = true; + continue; + } + if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) { + continue; + } + FileSystem ns = fs.getFileSystemByPath(child.getPath()); + Path fullLogPath = ns.makeQualified(child.getPath()); + logFiles.add(fullLogPath); + } + if (!foundFinish) { + throw new IOException( + "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + sortedLogDir); + } + + // Create a dummy LogEntry for the origin (used only for diagnostics) + LogEntry dummyOrigin = new LogEntry(null, 0, sortedLogDir.toString()); + return new ResolvedSortedLog(dummyOrigin, sortedLogDir, logFiles); + } +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java index 40e8496ba79..71210de1203 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java @@ -42,6 +42,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.tserver.logger.LogEvents; @@ -51,6 +52,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; import com.google.common.collect.Collections2; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -65,8 +67,23 @@ public class SortedLogRecovery { private final ServerContext context; - public SortedLogRecovery(ServerContext context) { + private final CacheProvider cacheProvider; + + private final Cache fileLenCache; + + public SortedLogRecovery(ServerContext context, Cache fileLenCache, + CacheProvider cacheProvider) { this.context = context; + this.cacheProvider = cacheProvider; + this.fileLenCache = fileLenCache; + } + + public boolean needsRecovery(KeyExtent extent, List recoveryDirs) + throws IOException { + Entry> maxEntry = + findLogsThatDefineTablet(extent, recoveryDirs); + int tabletId = maxEntry.getKey(); + return tabletId != -1; } static LogFileKey maxKey(LogEvents event) { @@ -100,11 +117,12 @@ static LogFileKey minKey(LogEvents event, int tabletId) { return key; } - private int findMaxTabletId(KeyExtent extent, List recoveryLogDirs) throws IOException { + private int findMaxTabletId(KeyExtent extent, List recoveryLogDirs) + throws IOException { int tabletId = -1; try (var rli = new RecoveryLogsIterator(context, recoveryLogDirs, minKey(DEFINE_TABLET), - maxKey(DEFINE_TABLET), true)) { + maxKey(DEFINE_TABLET), true, fileLenCache, cacheProvider)) { KeyExtent alternative = extent; if (extent.isRootTablet()) { @@ -140,18 +158,18 @@ private int findMaxTabletId(KeyExtent extent, List recoveryLogDirs) throws * @return The maximum tablet ID observed AND the list of logs that contained the maximum tablet * ID. */ - private Entry> findLogsThatDefineTablet(KeyExtent extent, - List recoveryDirs) throws IOException { - Map> logsThatDefineTablet = new HashMap<>(); + private Entry> findLogsThatDefineTablet(KeyExtent extent, + List recoveryDirs) throws IOException { + Map> logsThatDefineTablet = new HashMap<>(); - for (Path walDir : recoveryDirs) { + for (ResolvedSortedLog walDir : recoveryDirs) { int tabletId = findMaxTabletId(extent, Collections.singletonList(walDir)); if (tabletId == -1) { - log.debug("Did not find tablet {} in recovery log {}", extent, walDir.getName()); + log.debug("Did not find tablet {} in recovery log {}", extent, walDir.getDir().getName()); } else { logsThatDefineTablet.computeIfAbsent(tabletId, k -> new ArrayList<>()).add(walDir); log.debug("Found tablet {} with id {} in recovery log {}", extent, tabletId, - walDir.getName()); + walDir.getDir().getName()); } } @@ -196,8 +214,8 @@ public Entry next() { } - private long findRecoverySeq(List recoveryLogs, Set tabletFiles, int tabletId) - throws IOException { + private long findRecoverySeq(List recoveryLogs, Set tabletFiles, + int tabletId) throws IOException { HashSet suffixes = new HashSet<>(); for (String path : tabletFiles) { suffixes.add(getPathSuffix(path)); @@ -207,8 +225,9 @@ private long findRecoverySeq(List recoveryLogs, Set tabletFiles, i long lastFinish = 0; long recoverySeq = 0; - try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, recoveryLogs, - minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId), false)) { + try (RecoveryLogsIterator rli = + new RecoveryLogsIterator(context, recoveryLogs, minKey(COMPACTION_START, tabletId), + maxKey(COMPACTION_START, tabletId), false, fileLenCache, cacheProvider)) { DeduplicatingIterator ddi = new DeduplicatingIterator(rli); @@ -258,14 +277,15 @@ private long findRecoverySeq(List recoveryLogs, Set tabletFiles, i return recoverySeq; } - private void playbackMutations(List recoveryLogs, MutationReceiver mr, int tabletId, - long recoverySeq) throws IOException { + private void playbackMutations(List recoveryLogs, MutationReceiver mr, + int tabletId, long recoverySeq) throws IOException { LogFileKey start = minKey(MUTATION, tabletId); start.setSeq(recoverySeq); LogFileKey end = maxKey(MUTATION, tabletId); - try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, false)) { + try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, false, fileLenCache, + cacheProvider)) { while (rli.hasNext()) { Entry entry = rli.next(); LogFileKey logFileKey = entry.getKey(); @@ -286,20 +306,21 @@ private void playbackMutations(List recoveryLogs, MutationReceiver mr, int } } - Collection asNames(List recoveryLogs) { - return Collections2.transform(recoveryLogs, Path::getName); + Collection asNames(List recoveryLogs) { + return Collections2.transform(recoveryLogs, rl -> rl.getDir().getName()); } - public void recover(KeyExtent extent, List recoveryDirs, Set tabletFiles, - MutationReceiver mr) throws IOException { + public void recover(KeyExtent extent, List recoveryDirs, + Set tabletFiles, MutationReceiver mr) throws IOException { - Entry> maxEntry = findLogsThatDefineTablet(extent, recoveryDirs); + Entry> maxEntry = + findLogsThatDefineTablet(extent, recoveryDirs); // A tablet may leave a tserver and then come back, in which case it would have a different and // higher tablet id. Only want to consider events in the log related to the last time the tablet // was loaded. int tabletId = maxEntry.getKey(); - List logsThatDefineTablet = maxEntry.getValue(); + List logsThatDefineTablet = maxEntry.getValue(); if (tabletId == -1) { log.info("Tablet {} is not defined in recovery logs {} ", extent, asNames(recoveryDirs)); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 5cc5c26e56d..c0084f84105 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -40,7 +41,12 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; +import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; +import org.apache.accumulo.core.logging.LoggingBlockCache; import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.spi.cache.CacheType; +import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.Retry.RetryFactory; @@ -51,6 +57,7 @@ import org.apache.accumulo.server.util.ReplicationTableUtil; import org.apache.accumulo.tserver.TabletMutations; import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.TabletServerResourceManager; import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation; import org.apache.accumulo.tserver.log.DfsLogger.ServerResources; import org.apache.accumulo.tserver.tablet.CommitSession; @@ -58,6 +65,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + /** * Central logging facility for the TServerInfo. * @@ -76,6 +86,9 @@ public class TabletServerLogger { private final TabletServer tserver; + // Cache for resolved sorted logs to avoid repeated expensive I/O operations + private final Cache sortedLogCache; + // The current logger private DfsLogger currentLog = null; private final SynchronousQueue nextLog = new SynchronousQueue<>(); @@ -157,6 +170,8 @@ public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCou this.createRetry = null; this.writeRetryFactory = writeRetryFactory; this.maxAge = maxAge; + this.sortedLogCache = + CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.SECONDS).maximumSize(1000).build(); } private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException { @@ -553,11 +568,48 @@ public long minorCompactionStarted(final CommitSession commitSession, final long return seq; } - public void recover(ServerContext context, KeyExtent extent, List recoveryDirs, + private List resolve(Collection walogs) throws IOException { + List sortedLogs = new ArrayList<>(walogs.size()); + VolumeManager fs = tserver.getContext().getVolumeManager(); + for (var logEntry : walogs) { + try { + ResolvedSortedLog resolvedLog = + sortedLogCache.get(logEntry, () -> ResolvedSortedLog.resolve(logEntry, fs)); + sortedLogs.add(resolvedLog); + } catch (Exception e) { + throw new IOException("Failed to resolve sorted log for " + logEntry.filename, e); + } + } + return sortedLogs; + } + + private CacheProvider createCacheProvider(TabletServerResourceManager resourceMgr) { + return new BasicCacheProvider( + LoggingBlockCache.wrap(CacheType.INDEX, resourceMgr.getIndexCache()), + LoggingBlockCache.wrap(CacheType.DATA, resourceMgr.getDataCache())); + } + + public boolean needsRecovery(ServerContext context, KeyExtent extent, Collection walogs) + throws IOException { + try { + var resourceMgr = tserver.getResourceManager(); + var cacheProvider = createCacheProvider(resourceMgr); + SortedLogRecovery recovery = + new SortedLogRecovery(context, resourceMgr.getFileLenCache(), cacheProvider); + return recovery.needsRecovery(extent, resolve(walogs)); + } catch (Exception e) { + throw new IOException(e); + } + } + + public void recover(ServerContext context, KeyExtent extent, Collection walogs, Set tabletFiles, MutationReceiver mr) throws IOException { try { - SortedLogRecovery recovery = new SortedLogRecovery(context); - recovery.recover(extent, recoveryDirs, tabletFiles, mr); + var resourceMgr = tserver.getResourceManager(); + var cacheProvider = createCacheProvider(resourceMgr); + SortedLogRecovery recovery = + new SortedLogRecovery(context, resourceMgr.getFileLenCache(), cacheProvider); + recovery.recover(extent, resolve(walogs), tabletFiles, mr); } catch (Exception e) { throw new IOException(e); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java index 0d9ea4fd142..cb4d7eca42e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java @@ -48,6 +48,7 @@ import org.apache.accumulo.tserver.log.DfsLogger; import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException; import org.apache.accumulo.tserver.log.RecoveryLogsIterator; +import org.apache.accumulo.tserver.log.ResolvedSortedLog; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -173,8 +174,9 @@ public void execute(String[] args) throws Exception { } else { // read the log entries in a sorted RFile. This has to be a directory that contains the // finished file. - try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), null, - null, false)) { + try (var rli = new RecoveryLogsIterator(context, + Collections.singletonList(ResolvedSortedLog.fromSortedLogDir(path, fs)), null, null, + false)) { while (rli.hasNext()) { Entry entry = rli.next(); printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds, diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java index 75d933db372..b6473fd494f 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java @@ -132,7 +132,8 @@ public void testSimpleRLI() throws IOException { createRecoveryDir(logs, dirs, true); - try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, false)) { + try (RecoveryLogsIterator rli = + new RecoveryLogsIterator(context, pathsToResolvedLogs(dirs), null, null, false)) { while (rli.hasNext()) { Entry entry = rli.next(); assertEquals(1, entry.getKey().getTabletId(), "TabletId does not match"); @@ -159,7 +160,7 @@ public void testFinishMarker() throws IOException { createRecoveryDir(logs, dirs, false); assertThrows(IOException.class, - () -> new RecoveryLogsIterator(context, dirs, null, null, false), + () -> new RecoveryLogsIterator(context, pathsToResolvedLogs(dirs), null, null, false), "Finish marker should not be found"); } @@ -168,9 +169,10 @@ public void testSingleFile() throws IOException { String destPath = workDir + "/test.rf"; fs.create(new Path(destPath)); - assertThrows( - IOException.class, () -> new RecoveryLogsIterator(context, - Collections.singletonList(new Path(destPath)), null, null, false), + assertThrows(IOException.class, + () -> new RecoveryLogsIterator(context, + Collections.singletonList(ResolvedSortedLog.fromSortedLogDir(new Path(destPath), fs)), + null, null, false), "Finish marker should not be found for a single file."); } @@ -192,7 +194,7 @@ public void testCheckFirstKeyFailed() throws IOException { createRecoveryDir(logs, dirs, true); assertThrows(IllegalStateException.class, - () -> new RecoveryLogsIterator(context, dirs, null, null, true), + () -> new RecoveryLogsIterator(context, pathsToResolvedLogs(dirs), null, null, true), "First log entry is not OPEN so exception should be thrown."); } @@ -219,7 +221,8 @@ public void testCheckFirstKeyPass() throws IOException { createRecoveryDir(logs, dirs, true); - try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, true)) { + try (RecoveryLogsIterator rli = + new RecoveryLogsIterator(context, pathsToResolvedLogs(dirs), null, null, true)) { while (rli.hasNext()) { Entry entry = rli.next(); assertNotNull(entry.getKey()); @@ -248,4 +251,12 @@ private void createRecoveryDir(Map logs, ArrayList dirs dirs.add(new Path(destPath)); } } + + private List pathsToResolvedLogs(List dirs) throws IOException { + List resolvedLogs = new ArrayList<>(); + for (Path dir : dirs) { + resolvedLogs.add(ResolvedSortedLog.fromSortedLogDir(dir, fs)); + } + return resolvedLogs; + } } diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java index 1e4a79c71c6..592c9c0f946 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java @@ -51,10 +51,15 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCache; +import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; +import org.apache.accumulo.core.file.blockfile.impl.CacheProvider; import org.apache.accumulo.core.file.rfile.bcfile.Compression; import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm; import org.apache.accumulo.core.file.rfile.bcfile.Utils; import org.apache.accumulo.core.file.streams.SeekableDataInputStream; +import org.apache.accumulo.core.spi.cache.CacheType; import org.apache.accumulo.core.spi.crypto.CryptoServiceFactory; import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory; import org.apache.accumulo.core.util.Pair; @@ -75,6 +80,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input") @@ -165,6 +173,15 @@ private List recover(Map logs, KeyExtent extent) th private List recover(Map logs, Set files, KeyExtent extent, int bufferSize) throws IOException { + CacheProvider cacheProvider = new BasicCacheProvider( + new TinyLfuBlockCache( + BlockCacheConfiguration.forTabletServer(DefaultConfiguration.getInstance()), + CacheType.INDEX), + new TinyLfuBlockCache( + BlockCacheConfiguration.forTabletServer(DefaultConfiguration.getInstance()), + CacheType.DATA)); + Cache fileLenCache = CacheBuilder.newBuilder().build(); + final String workdir = new File(tempDir, testName()).getAbsolutePath(); try (var fs = VolumeManagerImpl.getLocalForTesting(workdir)) { CryptoServiceFactory cryptoFactory = new GenericCryptoServiceFactory(); @@ -198,9 +215,14 @@ private List recover(Map logs, Set files, K dirs.add(new Path(destPath)); } // Recover - SortedLogRecovery recovery = new SortedLogRecovery(context); + SortedLogRecovery recovery = new SortedLogRecovery(context, fileLenCache, cacheProvider); CaptureMutations capture = new CaptureMutations(); - recovery.recover(extent, dirs, files, capture); + // Convert Path objects to ResolvedSortedLog objects for recovery + List resolvedLogs = new ArrayList<>(); + for (Path dir : dirs) { + resolvedLogs.add(ResolvedSortedLog.fromSortedLogDir(dir, fs)); + } + recovery.recover(extent, resolvedLogs, files, capture); verify(context); return capture.result; }