Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public static Path switchVolume(String path, FileType ft, List<Pair<Path,Path>>
return null;
}

public static LogEntry switchVolume(LogEntry le, List<Pair<Path,Path>> replacements) {
return switchVolumes(le, replacements);
}

protected static LogEntry switchVolumes(LogEntry le, List<Pair<Path,Path>> replacements) {
Path switchedPath = switchVolume(le.filename, FileType.WAL, replacements);
String switchedString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,44 +157,45 @@ public void run() {
boolean successful = false;

try {
server.acquireRecoveryMemory(extent);

TabletResourceManager trm = server.resourceManager.createTabletResourceManager(extent,
server.getTableConfiguration(extent));
TabletData data = new TabletData(tabletMetadata);

tablet = new Tablet(server, extent, trm, data);
// If a minor compaction starts after a tablet opens, this indicates a log recovery
// occurred. This recovered data must be minor compacted.
// There are three reasons to wait for this minor compaction to finish before placing the
// tablet in online tablets.
//
// 1) The log recovery code does not handle data written to the tablet on multiple tablet
// servers.
// 2) The log recovery code does not block if memory is full. Therefore recovering lots of
// tablets that use a lot of memory could run out of memory.
// 3) The minor compaction finish event did not make it to the logs (the file will be in
// metadata, preventing replay of compacted data)... but do not
// want a majc to wipe the file out from metadata and then have another process failure...
// this could cause duplicate data to replay.
if (tablet.getNumEntriesInMemory() > 0
&& !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) {
throw new RuntimeException("Minor compaction after recovery fails for " + extent);
}
Assignment assignment =
new Assignment(extent, server.getTabletSession(), tabletMetadata.getLast());
TabletStateStore.setLocation(server.getContext(), assignment);
try (var recoveryLock = server.acquireRecoveryMemory(tabletMetadata)) {

TabletResourceManager trm = server.resourceManager.createTabletResourceManager(extent,
server.getTableConfiguration(extent));
TabletData data = new TabletData(tabletMetadata);

tablet = new Tablet(server, extent, trm, data);
// If a minor compaction starts after a tablet opens, this indicates a log recovery
// occurred. This recovered data must be minor compacted.
// There are three reasons to wait for this minor compaction to finish before placing the
// tablet in online tablets.
//
// 1) The log recovery code does not handle data written to the tablet on multiple tablet
// servers.
// 2) The log recovery code does not block if memory is full. Therefore recovering lots of
// tablets that use a lot of memory could run out of memory.
// 3) The minor compaction finish event did not make it to the logs (the file will be in
// metadata, preventing replay of compacted data)... but do not
// want a majc to wipe the file out from metadata and then have another process failure...
// this could cause duplicate data to replay.
if (tablet.getNumEntriesInMemory() > 0
&& !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) {
throw new RuntimeException("Minor compaction after recovery fails for " + extent);
}
Assignment assignment =
new Assignment(extent, server.getTabletSession(), tabletMetadata.getLast());
TabletStateStore.setLocation(server.getContext(), assignment);

synchronized (server.openingTablets) {
synchronized (server.onlineTablets) {
server.openingTablets.remove(extent);
server.onlineTablets.put(extent, tablet);
server.openingTablets.notifyAll();
server.recentlyUnloadedCache.remove(tablet.getExtent());
synchronized (server.openingTablets) {
synchronized (server.onlineTablets) {
server.openingTablets.remove(extent);
server.onlineTablets.put(extent, tablet);
server.openingTablets.notifyAll();
server.recentlyUnloadedCache.remove(tablet.getExtent());
}
}
tablet = null; // release this reference
successful = true;
}
tablet = null; // release this reference
successful = true;
} catch (Exception e) {
log.warn("exception trying to assign tablet {} {}", extent, locationToOpen, e);

Expand All @@ -205,8 +206,6 @@ public void run() {
TableId tableId = extent.tableId();
ProblemReports.getInstance(server.getContext()).report(new ProblemReport(tableId, TABLET_LOAD,
extent.getUUID().toString(), server.getClientAddressString(), e));
} finally {
server.releaseRecoveryMemory(extent);
}

if (successful) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.accumulo.core.util.threads.ThreadPools.watchNonCriticalScheduledTask;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.security.SecureRandom;
Expand Down Expand Up @@ -89,6 +90,7 @@
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.process.thrift.ServerProcessService;
Expand Down Expand Up @@ -119,6 +121,7 @@
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeUtil;
import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
Expand Down Expand Up @@ -547,9 +550,14 @@ public void enqueueManagerMessage(ManagerMessage m) {
managerMessages.addLast(m);
}

void acquireRecoveryMemory(KeyExtent extent) {
if (!extent.isMeta()) {
private static final AutoCloseable NOOP_CLOSEABLE = () -> {};

AutoCloseable acquireRecoveryMemory(TabletMetadata tabletMetadata) {
if (tabletMetadata.getExtent().isMeta() || !needsRecovery(tabletMetadata)) {
return NOOP_CLOSEABLE;
} else {
recoveryLock.lock();
return () -> recoveryLock.unlock();
}
}

Expand All @@ -559,6 +567,36 @@ void releaseRecoveryMemory(KeyExtent extent) {
}
}

public boolean needsRecovery(TabletMetadata tabletMetadata) {
var logEntries = tabletMetadata.getLogs();

if (logEntries.isEmpty()) {
return false;
}

// This method is called prior to volumes being switched for a tablet during the load process,
// so switch volumes before calling needsRecovery()
var switchedLogEntries = new ArrayList<LogEntry>(logEntries.size());

for (LogEntry logEntry : logEntries) {
var switchedWalog = VolumeUtil.switchVolume(logEntry, context.getVolumeReplacements());
LogEntry walog;
if (switchedWalog != null) {
log.debug("Volume switched for needsRecovery {} -> {}", logEntry, switchedWalog);
walog = switchedWalog;
} else {
walog = logEntry;
}
switchedLogEntries.add(walog);
}

try {
return logger.needsRecovery(getContext(), tabletMetadata.getExtent(), switchedLogEntries);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private ServerAddress startServer(String address, TProcessor processor)
throws UnknownHostException {
@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -1243,24 +1281,21 @@ public void minorCompactionStarted(CommitSession tablet, long lastUpdateSequence

public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> logEntries,
Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
List<Path> recoveryDirs = new ArrayList<>();
List<LogEntry> sorted = new ArrayList<>(logEntries);
sorted.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp));

// Validate that recovery logs exist before attempting recovery
for (LogEntry entry : sorted) {
Path recovery = null;
Path finished = RecoveryPath.getRecoveryPath(new Path(entry.filename));
finished = SortedLogState.getFinishedMarkerPath(finished);
TabletServer.log.debug("Looking for " + finished);
if (fs.exists(finished)) {
recovery = finished.getParent();
}
if (recovery == null) {
if (!fs.exists(finished)) {
throw new IOException(
"Unable to find recovery files for extent " + extent + " logEntry: " + entry);
}
recoveryDirs.add(recovery);
}
logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver);

logger.recover(getContext(), extent, sorted, tabletFiles, mutationReceiver);
}

public int createLogId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
import org.apache.accumulo.core.iterators.IteratorAdapter;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
import org.apache.accumulo.core.spi.crypto.CryptoService;
Expand All @@ -50,6 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.Cache;
import com.google.common.collect.Iterators;

/**
Expand All @@ -64,11 +66,17 @@ public class RecoveryLogsIterator
private final Iterator<Entry<Key,Value>> iter;
private final CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY);

public RecoveryLogsIterator(ServerContext context, List<ResolvedSortedLog> recoveryLogDirs,
LogFileKey start, LogFileKey end, boolean checkFirstKey) throws IOException {
this(context, recoveryLogDirs, start, end, checkFirstKey, null, null);
}

/**
* Scans the files in each recoveryLogDir over the range [start,end].
*/
public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
LogFileKey end, boolean checkFirstKey) throws IOException {
public RecoveryLogsIterator(ServerContext context, List<ResolvedSortedLog> recoveryLogDirs,
LogFileKey start, LogFileKey end, boolean checkFirstKey, Cache<String,Long> fileLenCache,
CacheProvider cacheProvider) throws IOException {

List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
fileIters = new ArrayList<>();
Expand All @@ -78,20 +86,20 @@ public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, L
final CryptoService cryptoService = context.getCryptoFactory().getService(env,
context.getConfiguration().getAllCryptoProperties());

for (Path logDir : recoveryLogDirs) {
LOG.debug("Opening recovery log dir {}", logDir.getName());
SortedSet<Path> logFiles = getFiles(vm, logDir);
var fs = vm.getFileSystemByPath(logDir);
for (ResolvedSortedLog logDir : recoveryLogDirs) {
LOG.debug("Opening recovery log dir {}", logDir.getDir().getName());
SortedSet<Path> logFiles = logDir.getChildren();
var fs = vm.getFileSystemByPath(logDir.getDir());

// only check the first key once to prevent extra iterator creation and seeking
if (checkFirstKey && !logFiles.isEmpty()) {
validateFirstKey(context, cryptoService, fs, logFiles, logDir);
validateFirstKey(context, cryptoService, fs, logFiles, logDir.getDir(), fileLenCache,
cacheProvider);
}

for (Path log : logFiles) {
FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder()
.forFile(log.toString(), fs, fs.getConf(), cryptoService)
.withTableConfiguration(context.getConfiguration()).seekToBeginning().build();
FileSKVIterator fileIter =
openLogFile(context, log, cryptoService, fs, fileLenCache, cacheProvider);
if (range != null) {
fileIter.seek(range, Collections.emptySet(), false);
}
Expand Down Expand Up @@ -134,6 +142,23 @@ public void close() throws IOException {
}
}

FileSKVIterator openLogFile(ServerContext context, Path logFile, CryptoService cs, FileSystem fs,
Cache<String,Long> fileLenCache, CacheProvider cacheProvider) throws IOException {
var builder = FileOperations.getInstance().newReaderBuilder()
.forFile(logFile.toString(), fs, fs.getConf(), cs)
.withTableConfiguration(context.getConfiguration());

if (fileLenCache != null) {
builder = builder.withFileLenCache(fileLenCache);
}

if (cacheProvider != null) {
builder = builder.withCacheProvider(cacheProvider);
}

return builder.seekToBeginning().build();
}

/**
* Check for sorting signal files (finished/failed) and get the logs in the provided directory.
*/
Expand Down Expand Up @@ -169,10 +194,10 @@ private SortedSet<Path> getFiles(VolumeManager fs, Path directory) throws IOExce
* Check that the first entry in the WAL is OPEN. Only need to do this once.
*/
private void validateFirstKey(ServerContext context, CryptoService cs, FileSystem fs,
SortedSet<Path> logFiles, Path fullLogPath) throws IOException {
try (FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder()
.forFile(logFiles.first().toString(), fs, fs.getConf(), cs)
.withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) {
SortedSet<Path> logFiles, Path fullLogPath, Cache<String,Long> fileLenCache,
CacheProvider cacheProvider) throws IOException {
try (FileSKVIterator fileIter =
openLogFile(context, logFiles.first(), cs, fs, fileLenCache, cacheProvider)) {
Iterator<Entry<Key,Value>> iterator = new IteratorAdapter(fileIter);

if (iterator.hasNext()) {
Expand Down
Loading