Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ private IoTDBConstant() {}
public static final String DATA_FOLDER_NAME = "data";
public static final String SEQUENCE_FLODER_NAME = "sequence";
public static final String UNSEQUENCE_FLODER_NAME = "unsequence";
public static final String COMPACTION_BACKUP_FOLDER_NAME = "compaction_back";
public static final String FILE_NAME_SEPARATOR = "-";
public static final String UPGRADE_FOLDER_NAME = "upgrade";
public static final String CONSENSUS_FOLDER_NAME = "consensus";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** CompactionMergeTaskPoolManager provides a ThreadPool to queue and run all compaction tasks. */
public class CompactionTaskManager implements IService {
Expand Down Expand Up @@ -79,6 +80,7 @@ public class CompactionTaskManager implements IService {

private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private volatile boolean init = false;
public static final AtomicLong compactionId = new AtomicLong(0);

public static CompactionTaskManager getInstance() {
return INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.iotdb.db.engine.compaction;

import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
Expand All @@ -34,9 +36,12 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -256,4 +261,121 @@ public static void updatePlanIndexes(
}
}
}

public static void takeSnapshot(
TsFileManager tsFileManager, String storageGroupName, long timePartition, long compactionId)
throws IOException {
File targetBaseDir =
new File(
IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0]
+ File.separator
+ IoTDBConstant.COMPACTION_BACKUP_FOLDER_NAME
+ File.separator
+ compactionId
+ File.separator
+ storageGroupName
+ File.separator
+ timePartition);
if (!targetBaseDir.exists() && !targetBaseDir.mkdirs()) {
throw new IOException("Failed to create " + targetBaseDir.getAbsolutePath());
}
tsFileManager.readLock();
try {
List<TsFileResource> sequenceFiles =
tsFileManager.getSequenceListByTimePartition(timePartition);
for (TsFileResource resource : sequenceFiles) {
File targetFile =
new File(
targetBaseDir
+ File.separator
+ IoTDBConstant.SEQUENCE_FLODER_NAME
+ File.separator
+ resource.getTsFile().getName());
File targetResource =
new File(
targetBaseDir
+ File.separator
+ IoTDBConstant.SEQUENCE_FLODER_NAME
+ File.separator
+ resource.getTsFile().getName()
+ TsFileResource.RESOURCE_SUFFIX);
if (!targetFile.getParentFile().exists()) {
targetFile.getParentFile().mkdirs();
}
Files.createLink(targetFile.toPath(), resource.getTsFile().toPath());
Files.createLink(
targetResource.toPath(),
new File(resource.getTsFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)
.toPath());
}

List<TsFileResource> unsequenceFiles =
tsFileManager.getUnsequenceListByTimePartition(timePartition);
for (TsFileResource resource : unsequenceFiles) {
File targetFile =
new File(
targetBaseDir
+ File.separator
+ IoTDBConstant.UNSEQUENCE_FLODER_NAME
+ File.separator
+ resource.getTsFile().getName());
File targetResource =
new File(
targetBaseDir
+ File.separator
+ IoTDBConstant.UNSEQUENCE_FLODER_NAME
+ File.separator
+ resource.getTsFile().getName()
+ TsFileResource.RESOURCE_SUFFIX);
if (!targetFile.getParentFile().exists()) {
targetFile.getParentFile().mkdirs();
}
Files.createLink(targetFile.toPath(), resource.getTsFile().toPath());
Files.createLink(
targetResource.toPath(),
new File(resource.getTsFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)
.toPath());
}
} finally {
tsFileManager.readUnlock();
}
}

public static boolean validateTsFileResources(
TsFileManager manager, String storageGroupName, long timePartition, long compactionId) {
List<TsFileResource> resources =
manager.getSequenceListByTimePartition(timePartition).getArrayList();
resources.sort(
(f1, f2) ->
Long.compareUnsigned(
Long.parseLong(f1.getTsFile().getName().split("-")[0]),
Long.parseLong(f2.getTsFile().getName().split("-")[0])));
Map<String, Long> lastEndTimeMap = new HashMap<>();
TsFileResource prevTsFileResource = null;
for (TsFileResource resource : resources) {
Set<String> devices = resource.getDevices();
for (String device : devices) {
long currentStartTime = resource.getStartTime(device);
long currentEndTime = resource.getEndTime(device);
long lastEndTime = lastEndTimeMap.computeIfAbsent(device, x -> Long.MIN_VALUE);
if (lastEndTime >= currentStartTime) {
logger.error(
"{} [TaskId: {}] Device {} is overlapped between {} and {}, end time in {} is {}, start time in {} is {}",
storageGroupName,
compactionId,
device,
prevTsFileResource,
resource,
prevTsFileResource,
lastEndTime,
resource,
currentStartTime);
return false;
}
lastEndTimeMap.put(device, currentEndTime);
}
prevTsFileResource = resource;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ protected void doCompaction() {
}

LOGGER.info(
"{}-{} [Compaction] CrossSpaceCompaction task starts with {} seq files and {} unsequence files. Sequence files : {}, unsequence files : {} . Sequence files size is {} MB, unsequence file size is {} MB, total size is {} MB",
"{}-{} [Compaction] [Task-id: {}] CrossSpaceCompaction task starts with {} seq files and {} unsequence files. Sequence files : {}, unsequence files : {} . Sequence files size is {} MB, unsequence file size is {} MB, total size is {} MB",
storageGroupName,
dataRegionId,
compactionId,
selectedSequenceFiles.size(),
selectedUnsequenceFiles.size(),
selectedSequenceFiles,
Expand All @@ -151,6 +152,9 @@ protected void doCompaction() {
// restart recovery
compactionLogger.close();

CompactionUtils.takeSnapshot(
tsFileManager, storageGroupName + "-" + dataRegionId, timePartition, compactionId);

performer.setSourceFiles(selectedSequenceFiles, selectedUnsequenceFiles);
performer.setTargetFiles(targetTsfileResourceList);
performer.setSummary(summary);
Expand All @@ -169,6 +173,20 @@ protected void doCompaction() {
timePartition,
true);

if (!CompactionUtils.validateTsFileResources(
tsFileManager, storageGroupName, timePartition, compactionId)) {
LOGGER.error(
"{}-{} [Compaction] [Task-id: {}] Failed to pass the check of resources. "
+ "Source sequence files is {}, unsequence files is {}, target files is {}. Terminate the system.",
storageGroupName,
dataRegionId,
compactionId,
selectedSequenceFiles,
selectedUnsequenceFiles,
targetTsfileResourceList);
System.exit(-1);
}

releaseReadAndLockWrite(selectedSequenceFiles);
releaseReadAndLockWrite(selectedUnsequenceFiles);

Expand Down Expand Up @@ -205,22 +223,28 @@ protected void doCompaction() {
}
long costTime = (System.currentTimeMillis() - startTime) / 1000;
LOGGER.info(
"{}-{} [Compaction] CrossSpaceCompaction task finishes successfully, time cost is {} s, compaction speed is {} MB/s",
"{}-{} [Compaction] [Task-id: {}] CrossSpaceCompaction task finishes successfully, time cost is {} s, compaction speed is {} MB/s",
storageGroupName,
dataRegionId,
compactionId,
costTime,
(selectedSeqFileSize + selectedUnseqFileSize) / 1024 / 1024 / costTime);
}
} catch (Throwable throwable) {
// catch throwable to handle OOM errors
if (!(throwable instanceof InterruptedException)) {
LOGGER.error(
"{}-{} [Compaction] Meet errors in cross space compaction.",
"{}-{} [Compaction] [Task-id: {}] Meet errors in cross space compaction.",
storageGroupName,
dataRegionId,
compactionId,
throwable);
} else {
LOGGER.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId);
LOGGER.warn(
"{}-{} [Compaction] [Task-id: {}] Compaction interrupted",
storageGroupName,
dataRegionId,
compactionId);
// clean the interrupted flag
Thread.interrupted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ protected void doCompaction() {
// get resource of target file
String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent();
LOGGER.info(
"{}-{} [Compaction] InnerSpaceCompaction task starts with {} files",
"{}-{} [Compaction] [Task-id: {}] InnerSpaceCompaction task starts with {} files",
storageGroupName,
dataRegionId,
compactionId,
selectedTsFileResourceList.size());
try {
targetTsFileResource =
Expand All @@ -133,11 +134,15 @@ protected void doCompaction() {
"{}-{} [InnerSpaceCompactionTask] Close the logger", storageGroupName, dataRegionId);
compactionLogger.close();
LOGGER.info(
"{}-{} [Compaction] compaction with {}",
"{}-{} [Compaction] [Task-id: {}] compaction with {}",
storageGroupName,
dataRegionId,
compactionId,
selectedTsFileResourceList);

      // take a snapshot of all TsFiles in this time partition
CompactionUtils.takeSnapshot(
tsFileManager, storageGroupName + "-" + dataRegionId, timePartition, compactionId);
// carry out the compaction
performer.setSourceFiles(selectedTsFileResourceList);
// As elements in targetFiles may be removed in ReadPointCompactionPerformer, we should use a
Expand All @@ -149,15 +154,18 @@ protected void doCompaction() {
CompactionUtils.moveTargetFile(targetTsFileList, true, storageGroupName + "-" + dataRegionId);

LOGGER.info(
"{}-{} [InnerSpaceCompactionTask] start to rename mods file",
"{}-{} [InnerSpaceCompactionTask] [Task-id: {}] start to rename mods file",
storageGroupName,
dataRegionId);
dataRegionId,
compactionId);
CompactionUtils.combineModsInInnerCompaction(
selectedTsFileResourceList, targetTsFileResource);

if (Thread.currentThread().isInterrupted() || summary.isCancel()) {
throw new InterruptedException(
String.format("%s-%s [Compaction] abort", storageGroupName, dataRegionId));
String.format(
"%s-%s [Compaction] [Task-id: %d] abort",
storageGroupName, dataRegionId, compactionId));
}

// replace the old files with new file, the new is in same position as the old
Expand All @@ -177,10 +185,24 @@ protected void doCompaction() {
false);
}

if (!CompactionUtils.validateTsFileResources(
tsFileManager, storageGroupName, timePartition, compactionId)) {
LOGGER.error(
"{}-{} [Compaction] [Task-id: {}] Failed to pass the check of resources. "
+ "Source files is {}, target file is {}. Terminate the system.",
storageGroupName,
dataRegionId,
compactionId,
selectedTsFileResourceList,
targetTsFileList);
System.exit(-1);
}

LOGGER.info(
"{}-{} [Compaction] Compacted target files, try to get the write lock of source files",
"{}-{} [Compaction] [Task-id: {}] Compacted target files, try to get the write lock of source files",
storageGroupName,
dataRegionId);
dataRegionId,
compactionId);

// release the read lock of all source files, and get the write lock of them to delete them
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
Expand All @@ -201,9 +223,10 @@ protected void doCompaction() {
}

LOGGER.info(
"{}-{} [Compaction] compaction finish, start to delete old files",
"{}-{} [Compaction] [Task-id: {}] compaction finish, start to delete old files",
storageGroupName,
dataRegionId);
dataRegionId,
compactionId);
// delete the old files
long totalSizeOfDeletedFile = 0L;
for (TsFileResource resource : selectedTsFileResourceList) {
Expand Down Expand Up @@ -237,10 +260,11 @@ protected void doCompaction() {

double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
LOGGER.info(
"{}-{} [Compaction] InnerSpaceCompaction task finishes successfully, target file is {},"
"{}-{} [Compaction] [Task-id: {}] InnerSpaceCompaction task finishes successfully, target file is {},"
+ "time cost is {} s, compaction speed is {} MB/s",
storageGroupName,
dataRegionId,
compactionId,
targetTsFileResource.getTsFile().getName(),
costTime,
((double) selectedFileSize) / 1024.0d / 1024.0d / costTime);
Expand All @@ -252,13 +276,18 @@ protected void doCompaction() {
// catch throwable to handle OOM errors
if (!(throwable instanceof InterruptedException)) {
LOGGER.error(
"{}-{} [Compaction] Meet errors in inner space compaction.",
"{}-{} [Compaction] [Task-id: {}] Meet errors in inner space compaction.",
storageGroupName,
dataRegionId,
compactionId,
throwable);
} else {
// clean the interrupt flag
LOGGER.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId);
LOGGER.warn(
"{}-{} [Compaction] [Task-id: {}] Compaction interrupted",
storageGroupName,
dataRegionId,
compactionId);
Thread.interrupted();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public abstract class AbstractCompactionTask {
protected int hashCode = -1;
protected CompactionTaskSummary summary = new CompactionTaskSummary();
protected long serialId;
protected long compactionId;

public AbstractCompactionTask(
String storageGroupName,
Expand All @@ -62,6 +63,7 @@ public AbstractCompactionTask(
this.tsFileManager = tsFileManager;
this.currentTaskNum = currentTaskNum;
this.serialId = serialId;
this.compactionId = CompactionTaskManager.compactionId.getAndIncrement();
}

public abstract void setSourceFilesToCompactionCandidate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ void moveTo(File targetDir) {

@Override
public String toString() {
return String.format("file is %s, status: %s", file.toString(), status);
return String.format("%s ", file.toString());
}

@Override
Expand Down