From 61c7c210858de56190c2fab8dda3aa1d1ee265e9 Mon Sep 17 00:00:00 2001
From: Jiwon Park
Date: Thu, 12 Feb 2026 16:55:04 +0900
Subject: [PATCH] [AMORO-4044] Fix partition mapping bug in TableEntriesScan
 for tables with evolved PartitionSpec

The entries metadata table returns partition values as a unified
super-struct that contains fields from all of the table's PartitionSpecs.
buildDataFile() and buildDeleteFile() passed this struct directly to
withPartition(), causing a partition mismatch after spec evolution. Fix
this by projecting the unified partition onto the spec-specific partition
type with StructProjection.

Signed-off-by: Jiwon Park
---
 .../apache/amoro/scan/TableEntriesScan.java    | 21 +++++++-
 .../amoro/scan/TestTableEntriesScan.java       | 50 +++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/scan/TableEntriesScan.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/scan/TableEntriesScan.java
index 2ae47f5b2c..0628568eae 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/scan/TableEntriesScan.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/scan/TableEntriesScan.java
@@ -37,6 +37,7 @@
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -46,6 +47,7 @@
 import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,6 +79,7 @@ public class TableEntriesScan {
   private InclusiveMetricsEvaluator lazyMetricsEvaluator = null;
   private Map<String, Integer> lazyIndexOfDataFileType;
   private Map<String, Integer> lazyIndexOfEntryType;
+  private Types.StructType lazyUnifiedPartitionType;
   public static Builder builder(Table table) {
     return new Builder(table);
   }
@@ -340,7 +343,7 @@ private DataFile buildDataFile(StructLike fileRecord) {
     if (spec.isPartitioned()) {
       StructLike partition =
           fileRecord.get(dataFileFieldIndex(DataFile.PARTITION_NAME), StructLike.class);
-      builder.withPartition(partition);
+      builder.withPartition(projectPartition(spec, partition));
     }
     return builder.build();
   }
@@ -372,7 +375,7 @@ private DeleteFile buildDeleteFile(StructLike fileRecord, FileContent fileConten
     if (spec.isPartitioned()) {
       StructLike partition = fileRecord.get(dataFileFieldIndex(DataFile.PARTITION_NAME), StructLike.class);
-      builder.withPartition(partition);
+      builder.withPartition(projectPartition(spec, partition));
     }
     if (fileContent == FileContent.EQUALITY_DELETES) {
       builder.ofEqualityDeletes();
     }
@@ -429,6 +432,20 @@ private int dataFileFieldIndex(String fieldName) {
     return lazyIndexOfDataFileType.get(fieldName);
   }
 
+  private StructLike projectPartition(PartitionSpec spec, StructLike partition) {
+    StructProjection projected =
+        StructProjection.createAllowMissing(unifiedPartitionType(), spec.partitionType());
+    projected.wrap(partition);
+    return projected;
+  }
+
+  private Types.StructType unifiedPartitionType() {
+    if (lazyUnifiedPartitionType == null) {
+      lazyUnifiedPartitionType = Partitioning.partitionType(table);
+    }
+    return lazyUnifiedPartitionType;
+  }
+
   private InclusiveMetricsEvaluator metricsEvaluator() {
     if (lazyMetricsEvaluator == null) {
       if (dataFilter != null) {
diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/scan/TestTableEntriesScan.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/scan/TestTableEntriesScan.java
index 3896c24991..f261baa88b 100644
--- a/amoro-format-iceberg/src/test/java/org/apache/amoro/scan/TestTableEntriesScan.java
+++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/scan/TestTableEntriesScan.java
@@ -25,10 +25,13 @@
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.utils.ManifestEntryFields;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.FileContent;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.Record;
@@ -186,6 +189,53 @@ public void testScanEntriesFromSequence() throws IOException {
     Assert.assertEquals(1, cnt);
   }
 
+  @Test
+  public void testScanEntriesWithPartitionSpecEvolution() throws IOException {
+    // base table has data files under spec 0: day(op_time)
+    Table baseTable = getMixedTable().asKeyedTable().baseTable();
+    PartitionSpec originalSpec = baseTable.spec();
+
+    // evolve partition spec: add identity(id)
+    baseTable.updateSpec().addField("id").commit();
+    PartitionSpec newSpec = baseTable.spec();
+    Assert.assertNotEquals(originalSpec.specId(), newSpec.specId());
+
+    // append a data file under the new spec
+    DataFile newSpecFile =
+        DataFiles.builder(newSpec)
+            .withPath("/tmp/test-partition-evolution/data-new-spec.parquet")
+            .withFileSizeInBytes(100)
+            .withRecordCount(1)
+            .withPartitionPath("op_time_day=2022-01-01/id=1")
+            .build();
+    baseTable.newAppend().appendFile(newSpecFile).commit();
+
+    // scan all entries - without the fix this would fail due to partition struct mismatch
+    TableEntriesScan entriesScan =
+        TableEntriesScan.builder(baseTable)
+            .includeFileContent(
+                FileContent.DATA, FileContent.POSITION_DELETES, FileContent.EQUALITY_DELETES)
+            .build();
+
+    int count = 0;
+    try (CloseableIterable<IcebergFileEntry> entries = entriesScan.entries()) {
+      for (IcebergFileEntry entry : entries) {
+        count++;
+        ContentFile<?> file = entry.getFile();
+        Assert.assertNotNull(file.partition());
+
+        // verify partition values are not null for partitioned files
+        PartitionSpec fileSpec = baseTable.specs().get(file.specId());
+        if (fileSpec.isPartitioned()) {
+          // op_time_day should be present for all specs
+          Assert.assertNotNull(file.partition().get(0, Object.class));
+        }
+      }
+    }
+    // original: 4 data + 1 pos-delete + 1 new data = 6
+    Assert.assertEquals(6, count);
+  }
+
   private List<Record> writeIntoBase() throws IOException {
     long transactionId = getMixedTable().asKeyedTable().beginTransaction("");
     GenericBaseTaskWriter writer =
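For readers who want the projection step in isolation, here is a minimal standalone sketch of the same idea: a partition value shaped as the table-wide unified partition struct is narrowed to the partition type of the spec that wrote the file. The class and method names below are illustrative only and are not part of the patch; the Iceberg calls (Partitioning.partitionType, StructProjection.createAllowMissing, StructProjection.wrap) are the ones the fix itself relies on.

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Partitioning;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.StructProjection;

    // Illustrative helper (not part of the patch): project a partition value read from the
    // entries metadata table onto the partition type of a single spec.
    public final class PartitionProjectionSketch {

      private PartitionProjectionSketch() {}

      public static StructLike projectToSpec(Table table, int specId, StructLike unifiedPartition) {
        // Union struct with one field per partition field across every spec the table has had.
        Types.StructType unifiedType = Partitioning.partitionType(table);
        PartitionSpec spec = table.specs().get(specId);
        // createAllowMissing tolerates spec fields that are absent from the source struct;
        // wrap() returns a StructLike view shaped like spec.partitionType(), which is the
        // shape a ContentFile builder's withPartition() expects.
        return StructProjection.createAllowMissing(unifiedType, spec.partitionType())
            .wrap(unifiedPartition);
      }
    }

createAllowMissing is the lenient variant; StructProjection.create would also work whenever every field of the spec's partition type is present in the unified struct.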