diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java new file mode 100644 index 0000000000000..9b72f49c5f05c --- /dev/null +++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.integration; + +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.constant.TestConstant; +import org.apache.iotdb.db.engine.StorageEngineV2; +import org.apache.iotdb.db.engine.cache.ChunkCache; +import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; +import org.apache.iotdb.db.engine.snapshot.SnapshotLoader; +import org.apache.iotdb.db.engine.snapshot.SnapshotTaker; +import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException; +import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.integration.env.EnvFactory; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +public class IoTDBSnapshotIT { + final String SG_NAME = "root.snapshotTest"; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv().initBeforeTest(); + IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false); + } + + @After + public void tearDown() throws Exception { + EnvFactory.getEnv().cleanAfterTest(); + IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true); + } + + @Test + public void testTakeSnapshot() + throws SQLException, IllegalPathException, StorageEngineException, IOException, + DirectoryNotLegalException { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("set storage group to " + SG_NAME); + for (int i = 0; i < 10; ++i) { + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j)); + } + statement.execute("flush"); + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j + 1)); + } + statement.execute("flush"); + } + + DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0)); + File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot"); + if (snapshotDir.exists()) { + FileUtils.forceDelete(snapshotDir); + } + + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + + Assert.assertTrue(snapshotDir.exists()); + Assert.assertTrue(snapshotDir.isDirectory()); + File[] seqTsfiles = + snapshotDir.listFiles((dir, name) -> name.endsWith(".tsfile") && name.startsWith("seq")); + File[] unseqTsfiles = + snapshotDir.listFiles( + (dir, name) -> name.endsWith(".tsfile") && name.startsWith("unseq")); + File[] tsfileResources = + snapshotDir.listFiles((dir, name) -> name.endsWith(".tsfile.resource")); + Assert.assertNotNull(seqTsfiles); + Assert.assertNotNull(unseqTsfiles); + Assert.assertNotNull(tsfileResources); + Assert.assertEquals(10, seqTsfiles.length); + Assert.assertEquals(10, unseqTsfiles.length); + Assert.assertEquals(20, tsfileResources.length); + } + } + + @Test(expected = DirectoryNotLegalException.class) + public void testTakeSnapshotInNotEmptyDir() + throws SQLException, IOException, IllegalPathException, StorageEngineException, + DirectoryNotLegalException { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("set storage group to " + SG_NAME); + for (int i = 0; i < 10; ++i) { + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j)); + } + statement.execute("flush"); + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j + 1)); + } + statement.execute("flush"); + } + + DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0)); + File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot"); + if (!snapshotDir.exists()) { + snapshotDir.mkdirs(); + } + + File tmpFile = new File(snapshotDir, "test"); + tmpFile.createNewFile(); + + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + } + } + + @Test + public void testLoadSnapshot() + throws SQLException, MetadataException, StorageEngineException, DirectoryNotLegalException, + IOException { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + Map resultMap = new HashMap<>(); + statement.execute("set storage group to " + SG_NAME); + for (int i = 0; i < 10; ++i) { + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j)); + } + statement.execute("flush"); + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j + 1)); + } + statement.execute("flush"); + } + ResultSet resultSet = statement.executeQuery("select ** from root"); + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + for (int i = 0; i < 10; ++i) { + String measurment = SG_NAME + ".d" + i + ".s"; + int res = resultSet.getInt(SG_NAME + ".d" + i + ".s"); + resultMap.put(time + measurment, res); + } + } + + DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0)); + File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot"); + if (!snapshotDir.exists()) { + snapshotDir.mkdirs(); + } + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + StorageEngineV2.getInstance() + .setDataRegion( + new DataRegionId(0), + new SnapshotLoader(snapshotDir.getAbsolutePath(), SG_NAME, "0") + .loadSnapshotForStateMachine()); + + ChunkCache.getInstance().clear(); + TimeSeriesMetadataCache.getInstance().clear(); + resultSet = statement.executeQuery("select ** from root"); + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + for (int i = 0; i < 10; ++i) { + String measurment = SG_NAME + ".d" + i + ".s"; + int res = resultSet.getInt(SG_NAME + ".d" + i + ".s"); + Assert.assertEquals(resultMap.get(time + measurment).intValue(), res); + } + } + } + } + + @Test + public void testTakeAndLoadSnapshotWhenCompaction() + throws SQLException, MetadataException, StorageEngineException, InterruptedException, + DirectoryNotLegalException, IOException { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + Map resultMap = new HashMap<>(); + statement.execute("set storage group to " + SG_NAME); + for (int i = 0; i < 10; ++i) { + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j)); + } + statement.execute("flush"); + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j + 1)); + } + statement.execute("flush"); + } + + ResultSet resultSet = statement.executeQuery("select ** from root"); + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + for (int i = 0; i < 10; ++i) { + String measurment = SG_NAME + ".d" + i + ".s"; + int res = resultSet.getInt(SG_NAME + ".d" + i + ".s"); + resultMap.put(time + measurment, res); + } + } + + File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot"); + if (!snapshotDir.exists()) { + snapshotDir.mkdirs(); + } + IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true); + statement.execute("merge"); + DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0)); + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + region.abortCompaction(); + StorageEngineV2.getInstance() + .setDataRegion( + new DataRegionId(0), + new SnapshotLoader(snapshotDir.getAbsolutePath(), SG_NAME, "0") + .loadSnapshotForStateMachine()); + ChunkCache.getInstance().clear(); + TimeSeriesMetadataCache.getInstance().clear(); + resultSet = statement.executeQuery("select ** from root"); + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + for (int i = 0; i < 10; ++i) { + String measurment = SG_NAME + ".d" + i + ".s"; + int res = resultSet.getInt(SG_NAME + ".d" + i + ".s"); + Assert.assertEquals(resultMap.get(time + measurment).intValue(), res); + } + } + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index 89110c4700951..1decc0b84d379 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -20,8 +20,12 @@ package org.apache.iotdb.db.consensus.statemachine; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor; +import org.apache.iotdb.db.engine.StorageEngineV2; +import org.apache.iotdb.db.engine.snapshot.SnapshotLoader; +import org.apache.iotdb.db.engine.snapshot.SnapshotTaker; import org.apache.iotdb.db.engine.storagegroup.DataRegion; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; @@ -39,7 +43,7 @@ public class DataRegionStateMachine extends BaseStateMachine { private static final FragmentInstanceManager QUERY_INSTANCE_MANAGER = FragmentInstanceManager.getInstance(); - private final DataRegion region; + private DataRegion region; public DataRegionStateMachine(DataRegion region) { this.region = region; @@ -53,11 +57,39 @@ public void stop() {} @Override public boolean takeSnapshot(File snapshotDir) { - return false; + try { + return new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + } catch (Exception e) { + logger.error( + "Exception occurs when taking snapshot for {}-{} in {}", + region.getLogicalStorageGroupName(), + region.getDataRegionId(), + snapshotDir, + e); + return false; + } } @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public void loadSnapshot(File latestSnapshotRootDir) { + DataRegion newRegion = + new SnapshotLoader( + latestSnapshotRootDir.getAbsolutePath(), + region.getLogicalStorageGroupName(), + region.getDataRegionId()) + .loadSnapshotForStateMachine(); + if (newRegion == null) { + logger.error("Fail to load snapshot from {}", latestSnapshotRootDir); + return; + } + this.region = newRegion; + try { + StorageEngineV2.getInstance() + .setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region); + } catch (Exception e) { + logger.error("Exception occurs when replacing data region in storage engine.", e); + } + } @Override protected TSStatus write(FragmentInstance fragmentInstance) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java index 30a716512f316..94ea120d20376 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java @@ -601,6 +601,19 @@ public DataRegion getDataRegion(DataRegionId regionId) { return dataRegionMap.get(regionId); } + public void setDataRegion(DataRegionId regionId, DataRegion newRegion) { + if (dataRegionMap.containsKey(regionId)) { + DataRegion oldRegion = dataRegionMap.get(regionId); + oldRegion.syncCloseAllWorkingTsFileProcessors(); + oldRegion.abortCompaction(); + } + dataRegionMap.put(regionId, newRegion); + } + + public TsFileFlushPolicy getFileFlushPolicy() { + return fileFlushPolicy; + } + static class InstanceHolder { private static final StorageEngineV2 INSTANCE = new StorageEngineV2(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java new file mode 100644 index 0000000000000..476a6455335ed --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.snapshot; + +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.conf.directories.DirectoryManager; +import org.apache.iotdb.db.engine.StorageEngineV2; +import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class SnapshotLoader { + private Logger LOGGER = LoggerFactory.getLogger(SnapshotLoader.class); + private String storageGroupName; + private String snapshotPath; + private String dataRegionId; + + public SnapshotLoader(String snapshotPath, String storageGroupName, String dataRegionId) { + this.snapshotPath = snapshotPath; + this.storageGroupName = storageGroupName; + this.dataRegionId = dataRegionId; + } + + private DataRegion loadSnapshot() { + try { + return new DataRegion( + IoTDBDescriptor.getInstance().getConfig().getSystemDir() + + File.separator + + "storage_groups" + + File.separator + + storageGroupName, + dataRegionId, + StorageEngineV2.getInstance().getFileFlushPolicy(), + storageGroupName); + } catch (Exception e) { + LOGGER.error("Exception occurs while load snapshot from {}", snapshotPath, e); + return null; + } + } + + /** + * 1. Clear origin data 2. Move snapshot data to data dir 3. Load data region + * + * @return + */ + public DataRegion loadSnapshotForStateMachine() { + try { + deleteAllFilesInDataDirs(); + } catch (IOException e) { + return null; + } + + // move the snapshot data to data dir + String seqBaseDir = + IoTDBConstant.SEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId; + String unseqBaseDir = + IoTDBConstant.UNSEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId; + File sourceDataDir = new File(snapshotPath); + if (sourceDataDir.exists()) { + try { + createLinksFromSnapshotDirToDataDir(sourceDataDir, seqBaseDir, unseqBaseDir); + } catch (IOException | DiskSpaceInsufficientException e) { + LOGGER.error( + "Exception occurs when creating links from snapshot directory to data directory", e); + return null; + } + } + + return loadSnapshot(); + } + + private void deleteAllFilesInDataDirs() throws IOException { + String[] dataDirPaths = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); + + // delete + List timePartitions = new ArrayList<>(); + for (String dataDirPath : dataDirPaths) { + File seqDataDirForThisRegion = + new File( + dataDirPath + + File.separator + + IoTDBConstant.SEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId); + if (seqDataDirForThisRegion.exists()) { + File[] files = seqDataDirForThisRegion.listFiles(); + if (files != null) { + timePartitions.addAll(Arrays.asList(files)); + } + } + + File unseqDataDirForThisRegion = + new File( + dataDirPath + + File.separator + + IoTDBConstant.UNSEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId); + + if (unseqDataDirForThisRegion.exists()) { + File[] files = unseqDataDirForThisRegion.listFiles(); + if (files != null) { + timePartitions.addAll(Arrays.asList(files)); + } + } + } + + try { + for (File timePartition : timePartitions) { + FileUtils.forceDelete(timePartition); + } + } catch (IOException e) { + LOGGER.error( + "Exception occurs when deleting time partition directory for {}-{}", + storageGroupName, + dataRegionId, + e); + throw e; + } + } + + private void createLinksFromSnapshotDirToDataDir( + File sourceDir, String seqBaseDir, String unseqBaseDir) + throws IOException, DiskSpaceInsufficientException { + File[] files = sourceDir.listFiles(); + if (files == null) { + return; + } + for (File sourceFile : files) { + String[] fileInfo = sourceFile.getName().split(SnapshotTaker.SNAPSHOT_FILE_INFO_SEP_STR); + if (fileInfo.length != 5) { + continue; + } + boolean seq = fileInfo[0].equals("seq"); + String timePartition = fileInfo[3]; + String fileName = fileInfo[4]; + String nextDataDir = + seq + ? DirectoryManager.getInstance().getNextFolderForSequenceFile() + : DirectoryManager.getInstance().getNextFolderForUnSequenceFile(); + File baseDir = new File(nextDataDir, seq ? seqBaseDir : unseqBaseDir); + File targetDirForThisTimePartition = new File(baseDir, timePartition); + if (!targetDirForThisTimePartition.exists() && !targetDirForThisTimePartition.mkdirs()) { + throw new IOException( + String.format("Failed to make directory %s", targetDirForThisTimePartition)); + } + + File targetFile = new File(targetDirForThisTimePartition, fileName); + try { + Files.createLink(targetFile.toPath(), sourceFile.toPath()); + } catch (IOException e) { + throw new IOException( + String.format("Failed to create hard link from %s to %s", sourceFile, targetFile), e); + } + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java new file mode 100644 index 0000000000000..1842b1b83068d --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.snapshot; + +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.log.CompactionLogger; +import org.apache.iotdb.db.engine.modification.ModificationFile; +import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException; +import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.LinkedList; +import java.util.List; + +/** + * SnapshotTaker takes data snapshot for a DataRegion in one time. It does so by creating hard link + * for files or copying them. SnapshotTaker supports two different ways of snapshot: Full Snapshot + * and Incremental Snapshot. The former takes a snapshot for all files in an empty directory, and + * the latter takes a snapshot based on the snapshot that took before. + */ +public class SnapshotTaker { + private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotTaker.class); + private final DataRegion dataRegion; + public static String SNAPSHOT_FILE_INFO_SEP_STR = "_"; + + public SnapshotTaker(DataRegion dataRegion) { + this.dataRegion = dataRegion; + } + + public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnapshot) + throws DirectoryNotLegalException, IOException { + File snapshotDir = new File(snapshotDirPath); + if (snapshotDir.exists() + && snapshotDir.listFiles() != null + && snapshotDir.listFiles().length > 0) { + // the directory should be empty or not exists + throw new DirectoryNotLegalException( + String.format("%s already exists and is not empty", snapshotDirPath)); + } + + if (!snapshotDir.exists() && !snapshotDir.mkdirs()) { + throw new IOException(String.format("Failed to create directory %s", snapshotDir)); + } + + if (flushBeforeSnapshot) { + dataRegion.syncCloseAllWorkingTsFileProcessors(); + } + + List timePartitions = dataRegion.getTimePartitions(); + for (Long timePartition : timePartitions) { + List seqDataDirs = getAllDataDirOfOnePartition(true, timePartition); + + try { + createFileSnapshot(seqDataDirs, snapshotDir, true, timePartition); + } catch (IOException e) { + LOGGER.error("Fail to create snapshot", e); + File[] files = snapshotDir.listFiles(); + if (files != null) { + for (File file : files) { + if (!file.delete()) { + LOGGER.error("Failed to delete link file {} after failing to create snapshot", file); + } + } + } + return false; + } + + List unseqDataDirs = getAllDataDirOfOnePartition(false, timePartition); + + try { + createFileSnapshot(unseqDataDirs, snapshotDir, false, timePartition); + } catch (IOException e) { + LOGGER.error("Fail to create snapshot", e); + return false; + } + } + + return true; + } + + private List getAllDataDirOfOnePartition(boolean sequence, long timePartition) { + String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); + List resultDirs = new LinkedList<>(); + + for (String dataDir : dataDirs) { + resultDirs.add( + dataDir + + File.separator + + (sequence + ? IoTDBConstant.SEQUENCE_FLODER_NAME + : IoTDBConstant.UNSEQUENCE_FLODER_NAME) + + File.separator + + dataRegion.getLogicalStorageGroupName() + + File.separator + + dataRegion.getDataRegionId() + + File.separator + + timePartition + + File.separator); + } + return resultDirs; + } + + private void createFileSnapshot( + List sourceDirPaths, File targetDir, boolean sequence, long timePartition) + throws IOException { + for (String sourceDirPath : sourceDirPaths) { + File sourceDir = new File(sourceDirPath); + if (!sourceDir.exists()) { + continue; + } + // Collect TsFile, TsFileResource, Mods, CompactionMods + File[] files = + sourceDir.listFiles( + (dir, name) -> + name.endsWith(".tsfile") + || name.endsWith(TsFileResource.RESOURCE_SUFFIX) + || name.endsWith(ModificationFile.FILE_SUFFIX) + || name.endsWith(ModificationFile.COMPACTION_FILE_SUFFIX) + || name.endsWith(CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX) + || name.endsWith(CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX) + || name.endsWith(IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX) + || name.endsWith(IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX)); + if (files == null || files.length == 0) { + continue; + } + + for (File file : files) { + String newFileName = + (sequence ? "seq" : "unseq") + + SNAPSHOT_FILE_INFO_SEP_STR + + dataRegion.getLogicalStorageGroupName() + + SNAPSHOT_FILE_INFO_SEP_STR + + dataRegion.getDataRegionId() + + SNAPSHOT_FILE_INFO_SEP_STR + + timePartition + + SNAPSHOT_FILE_INFO_SEP_STR + + file.getName(); + File linkFile = new File(targetDir, newFileName); + Files.createLink(linkFile.toPath(), file.toPath()); + } + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java new file mode 100644 index 0000000000000..bd4742d9e5e33 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.snapshot.exception; + +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.rpc.TSStatusCode; + +public class DirectoryNotLegalException extends IoTDBException { + public DirectoryNotLegalException(String message) { + super(message, TSStatusCode.SNAPSHOT_DIR_NOT_LEGAL.getStatusCode()); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 1fbd2ab5e0a4f..9fbc91165539f 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -3486,6 +3486,10 @@ void call(TsFileResource oldTsFileResource, List newTsFileResour throws WriteProcessException; } + public List getTimePartitions() { + return new ArrayList<>(partitionMaxFileVersions.keySet()); + } + public String getInsertWriteLockHolder() { return insertWriteLockHolder; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java index b185838b320ae..946d8274610a1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java @@ -71,7 +71,7 @@ public static String generateNewTsFilePathWithMkdir( time, version, innerSpaceCompactionCount, crossSpaceCompactionCount); } - private static String generateTsFileDir( + public static String generateTsFileDir( boolean sequence, String logicalStorageGroup, String virtualStorageGroup, diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index f612f6732b074..bb037da449bef 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -83,6 +83,7 @@ public enum TSStatusCode { WRITE_PROCESS_ERROR(412), WRITE_PROCESS_REJECT(413), QUERY_ID_NOT_EXIST(414), + SNAPSHOT_DIR_NOT_LEGAL(415), UNSUPPORTED_INDEX_FUNC_ERROR(421), UNSUPPORTED_INDEX_TYPE_ERROR(422),