Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
Expand All @@ -46,6 +47,7 @@
import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -77,6 +79,7 @@ public class TableEntriesScan {
private InclusiveMetricsEvaluator lazyMetricsEvaluator = null;
private Map<String, Integer> lazyIndexOfDataFileType;
private Map<String, Integer> lazyIndexOfEntryType;
private Types.StructType lazyUnifiedPartitionType;

public static Builder builder(Table table) {
return new Builder(table);
Expand Down Expand Up @@ -340,7 +343,7 @@ private DataFile buildDataFile(StructLike fileRecord) {
if (spec.isPartitioned()) {
StructLike partition =
fileRecord.get(dataFileFieldIndex(DataFile.PARTITION_NAME), StructLike.class);
builder.withPartition(partition);
builder.withPartition(projectPartition(spec, partition));
}
return builder.build();
}
Expand Down Expand Up @@ -372,7 +375,7 @@ private DeleteFile buildDeleteFile(StructLike fileRecord, FileContent fileConten
if (spec.isPartitioned()) {
StructLike partition =
fileRecord.get(dataFileFieldIndex(DataFile.PARTITION_NAME), StructLike.class);
builder.withPartition(partition);
builder.withPartition(projectPartition(spec, partition));
}
if (fileContent == FileContent.EQUALITY_DELETES) {
builder.ofEqualityDeletes();
Expand Down Expand Up @@ -429,6 +432,20 @@ private int dataFileFieldIndex(String fieldName) {
return lazyIndexOfDataFileType.get(fieldName);
}

/**
 * Projects a file's partition tuple (written under {@code spec}) onto the table-wide
 * unified partition type, so entries originating from different partition specs all
 * expose a common partition struct layout. Fields absent from the file's spec are
 * left missing (null) in the projection.
 *
 * @param spec the partition spec the file was written with
 * @param partition the raw partition struct as stored in the manifest entry
 * @return the partition projected onto the unified partition type
 */
private StructLike projectPartition(PartitionSpec spec, StructLike partition) {
  StructProjection projection =
      StructProjection.createAllowMissing(unifiedPartitionType(), spec.partitionType());
  // StructProjection.wrap returns the projection itself, now viewing this partition.
  return projection.wrap(partition);
}

/**
 * Returns the table-wide partition type that unifies all partition specs of the table,
 * computing and caching it on first use.
 *
 * <p>NOTE(review): lazy init is not synchronized — assumes single-threaded use of this
 * scan object, consistent with the other lazy fields in this class.
 *
 * @return the unified partition struct type across all of the table's specs
 */
private Types.StructType unifiedPartitionType() {
  Types.StructType unified = lazyUnifiedPartitionType;
  if (unified == null) {
    unified = Partitioning.partitionType(table);
    lazyUnifiedPartitionType = unified;
  }
  return unified;
}

private InclusiveMetricsEvaluator metricsEvaluator() {
if (lazyMetricsEvaluator == null) {
if (dataFilter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.utils.ManifestEntryFields;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
Expand Down Expand Up @@ -186,6 +189,53 @@ public void testScanEntriesFromSequence() throws IOException {
Assert.assertEquals(1, cnt);
}

@Test
public void testScanEntriesWithPartitionSpecEvolution() throws IOException {
  // Regression test: scanning entries must work across partition spec evolution,
  // i.e. when the table's manifests contain files written under different specs.
  // base table has data files under spec 0: day(op_time)
  Table baseTable = getMixedTable().asKeyedTable().baseTable();
  PartitionSpec originalSpec = baseTable.spec();

  // evolve partition spec: add identity(id)
  baseTable.updateSpec().addField("id").commit();
  PartitionSpec newSpec = baseTable.spec();
  // Sanity check: the commit produced a distinct spec id.
  Assert.assertNotEquals(originalSpec.specId(), newSpec.specId());

  // append a data file under the new spec
  // NOTE(review): the path points at a file that is never written; this relies on the
  // scan reading only manifest metadata, not the data file itself — confirm.
  DataFile newSpecFile =
      DataFiles.builder(newSpec)
          .withPath("/tmp/test-partition-evolution/data-new-spec.parquet")
          .withFileSizeInBytes(100)
          .withRecordCount(1)
          .withPartitionPath("op_time_day=2022-01-01/id=1")
          .build();
  baseTable.newAppend().appendFile(newSpecFile).commit();

  // scan all entries - without the fix this would fail due to partition struct mismatch
  TableEntriesScan entriesScan =
      TableEntriesScan.builder(baseTable)
          .includeFileContent(
              FileContent.DATA, FileContent.POSITION_DELETES, FileContent.EQUALITY_DELETES)
          .build();

  int count = 0;
  try (CloseableIterable<IcebergFileEntry> entries = entriesScan.entries()) {
    for (IcebergFileEntry entry : entries) {
      count++;
      ContentFile<?> file = entry.getFile();
      // Every entry's partition struct should be non-null after projection onto the
      // unified partition type.
      Assert.assertNotNull(file.partition());

      // verify partition values are not null for partitioned files
      PartitionSpec fileSpec = baseTable.specs().get(file.specId());
      if (fileSpec.isPartitioned()) {
        // op_time_day should be present for all specs
        // NOTE(review): assumes op_time_day occupies position 0 in the unified
        // partition type for every spec — TODO confirm field ordering guarantee.
        Assert.assertNotNull(file.partition().get(0, Object.class));
      }
    }
  }
  // original: 4 data + 1 pos-delete + 1 new data = 6
  // NOTE(review): the "4 data + 1 pos-delete" baseline comes from fixture setup not
  // visible here (writeIntoBase / keyed-table init) — verify against the test harness.
  Assert.assertEquals(6, count);
}

private List<DataFile> writeIntoBase() throws IOException {
long transactionId = getMixedTable().asKeyedTable().beginTransaction("");
GenericBaseTaskWriter writer =
Expand Down
Loading