From d29938644dee83906da4cf60e86ce547cec09e28 Mon Sep 17 00:00:00 2001
From: Guozhenyang
Date: Thu, 15 Jan 2026 18:57:30 +0800
Subject: [PATCH] [AMORO-4044] Return correct partition for delete files in
 Iceberg tables whose partition spec has changed

---
 .../iceberg/utils/IcebergTableUtil.java     |  33 ++--
 .../iceberg/utils/IcebergTableUtilTest.java | 149 ++++++++++++++++++
 2 files changed, 168 insertions(+), 14 deletions(-)
 create mode 100644 amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtilTest.java

diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtil.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtil.java
index 23a5b48a28..6cd291b260 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtil.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtil.java
@@ -25,12 +25,14 @@
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.utils.TableFileUtil;
-import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.ReachableFileUtil;
@@ -40,6 +42,7 @@
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,21 +144,23 @@ public static Set<DeleteFile> getDanglingDeleteFiles(Table internalTable) {
     }
 
     Set<DeleteFile> danglingDeleteFiles = new HashSet<>();
-    TableEntriesScan entriesScan =
-        TableEntriesScan.builder(internalTable)
-            .useSnapshot(snapshotId)
-            .includeFileContent(FileContent.EQUALITY_DELETES, FileContent.POSITION_DELETES)
-            .build();
-    try (CloseableIterable<IcebergFileEntry> entries = entriesScan.entries()) {
-      for (IcebergFileEntry entry : entries) {
-        ContentFile<?> file = entry.getFile();
-        String path = file.path().toString();
-        if (!deleteFilesPath.contains(path)) {
-          danglingDeleteFiles.add((DeleteFile) file);
+    List<ManifestFile> deleteManifests =
+        internalTable.snapshot(snapshotId).deleteManifests(internalTable.io());
+    for (ManifestFile manifest : deleteManifests) {
+      try (ManifestReader<DeleteFile> reader =
+          ManifestFiles.readDeleteManifest(manifest, internalTable.io(), internalTable.specs())) {
+        try (CloseableIterator<DeleteFile> iterable = reader.iterator()) {
+          while (iterable.hasNext()) {
+            DeleteFile deleteFile = iterable.next();
+            String path = deleteFile.path().toString();
+            if (!deleteFilesPath.contains(path)) {
+              danglingDeleteFiles.add(deleteFile);
+            }
+          }
         }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
-    } catch (IOException e) {
-      throw new RuntimeException("Error when fetch iceberg entries", e);
     }
 
     return danglingDeleteFiles;
diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtilTest.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtilTest.java
new file mode 100644
index 0000000000..65d3eba149
--- /dev/null
+++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtilTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.formats.iceberg.utils;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.catalog.BasicCatalogTestHelper;
+import org.apache.amoro.catalog.TableTestBase;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.table.PrimaryKeySpec;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+public class IcebergTableUtilTest extends TableTestBase {
+
+  public static final Schema TABLE_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "data", Types.StringType.get()));
+  public static final Schema EQUALITY_DELETE_ROW_SCHEMA = TABLE_SCHEMA.select("id");
+
+  public static final PrimaryKeySpec PRIMARY_KEY_SPEC =
+      PrimaryKeySpec.builderFor(TABLE_SCHEMA).addColumn("id").build();
+
+  public static final PartitionSpec SPEC_0 =
+      PartitionSpec.builderFor(TABLE_SCHEMA).bucket("id", 5, "id_bucket_5").build();
+
+  public static final PartitionSpec SPEC_1 =
+      PartitionSpec.builderFor(TABLE_SCHEMA).bucket("id", 10, "id_bucket_10").build();
+
+  public static final Map<String, String> TABLE_PROPERTIES = Maps.newHashMapWithExpectedSize(1);
+
+  static {
+    TABLE_PROPERTIES.put(TableProperties.FORMAT_VERSION, "2");
+  }
+
+  public IcebergTableUtilTest() {
+    super(
+        new BasicCatalogTestHelper(TableFormat.ICEBERG),
+        new BasicTableTestHelper(TABLE_SCHEMA, PRIMARY_KEY_SPEC, SPEC_0, TABLE_PROPERTIES));
+  }
+
+  @Test
+  public void getDanglingDeleteFiles() throws IOException {
+    Table table =
+        getIcebergCatalog()
+            .loadTable(
+                TableIdentifier.of(TableTestHelper.TEST_DB_NAME, TableTestHelper.TEST_TABLE_NAME));
+    GenericRecord record0 = GenericRecord.create(TABLE_SCHEMA);
+    record0.set(0, 0);
+    record0.set(1, "spec=0");
+    OutputFileFactory outputFileFactory0 =
+        OutputFileFactory.builderFor(table, 0, 1).format(FileFormat.PARQUET).build();
+    PartitionKey partitionKey0 = new PartitionKey(SPEC_0, TABLE_SCHEMA);
+    partitionKey0.partition(record0);
+    DataFile dataFile0 =
+        FileHelpers.writeDataFile(
+            table,
+            outputFileFactory0.newOutputFile(partitionKey0).encryptingOutputFile(),
+            partitionKey0,
+            Collections.singletonList(record0));
+    DeleteFile deleteFile0 =
+        FileHelpers.writeDeleteFile(
+            table,
+            outputFileFactory0.newOutputFile(partitionKey0).encryptingOutputFile(),
+            partitionKey0,
+            Collections.singletonList(record0),
+            EQUALITY_DELETE_ROW_SCHEMA);
+    RowDelta rowDelta0 = table.newRowDelta();
+    rowDelta0.addRows(dataFile0);
+    rowDelta0.addDeletes(deleteFile0);
+    rowDelta0.commit();
+
+    table
+        .updateSpec()
+        .removeField("id_bucket_5")
+        .addField("id_bucket_10", Expressions.bucket("id", 10))
+        .commit();
+
+    GenericRecord record1 = GenericRecord.create(TABLE_SCHEMA);
+    record1.set(0, 5);
+    record1.set(1, "spec=1");
+    OutputFileFactory outputFileFactory1 =
+        OutputFileFactory.builderFor(table, 1, 2).format(FileFormat.PARQUET).build();
+    PartitionKey partitionKey1 = new PartitionKey(SPEC_1, TABLE_SCHEMA);
+    partitionKey1.partition(record1);
+    DataFile dataFile1 =
+        FileHelpers.writeDataFile(
+            table,
+            outputFileFactory1.newOutputFile(partitionKey1).encryptingOutputFile(),
+            partitionKey1,
+            Collections.singletonList(record1));
+    DeleteFile deleteFile1 =
+        FileHelpers.writeDeleteFile(
+            table,
+            outputFileFactory1.newOutputFile(partitionKey1).encryptingOutputFile(),
+            partitionKey1,
+            Collections.singletonList(record1),
+            EQUALITY_DELETE_ROW_SCHEMA);
+    RowDelta rowDelta1 = table.newRowDelta();
+    rowDelta1.addRows(dataFile1);
+    rowDelta1.addDeletes(deleteFile1);
+    rowDelta1.commit();
+
+    Set<DeleteFile> deleteFileSet = IcebergTableUtil.getDanglingDeleteFiles(table);
+    for (DeleteFile deleteFile : deleteFileSet) {
+      StructLike partition = deleteFile.partition();
+      Assert.assertNotNull(partition.get(0, Integer.class));
+    }
+  }
+}
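
Not part of the patch, just a rough sketch of how the corrected utility might be exercised;
the DanglingDeleteInspection class and printDanglingDeletes method below are invented for
illustration. Because the delete manifests are now read with the table's full map of
partition specs (internalTable.specs()), a dangling delete file committed under an older
spec should come back with the partition tuple of the spec it was actually written with.

import org.apache.amoro.formats.iceberg.utils.IcebergTableUtil;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;

import java.util.Set;

// Illustrative helper only; not part of this patch.
public class DanglingDeleteInspection {

  // Print every dangling delete file together with the id of the partition
  // spec it was written under and its partition tuple.
  static void printDanglingDeletes(Table table) {
    Set<DeleteFile> dangling = IcebergTableUtil.getDanglingDeleteFiles(table);
    for (DeleteFile deleteFile : dangling) {
      StructLike partition = deleteFile.partition();
      System.out.printf(
          "path=%s specId=%d partition=%s%n",
          deleteFile.path(), deleteFile.specId(), partition);
    }
  }
}

The new test drives the same scenario: it commits data and equality deletes under the
5-bucket spec, updates the spec to a 10-bucket field, commits again, and then asserts that
every dangling delete file still exposes a non-null bucket value in its partition.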