Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 44 additions & 30 deletions api/src/main/java/org/apache/iceberg/PartitionStatistics.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;

/** Interface for partition statistics returned from a {@link PartitionStatisticsScan}. */
Expand Down Expand Up @@ -57,7 +58,6 @@ public interface PartitionStatistics extends StructLike {
.build();

static Schema schema(Types.StructType unifiedPartitionType, int formatVersion) {
Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
Preconditions.checkState(formatVersion > 0, "Invalid format version: %d", formatVersion);

if (formatVersion <= 2) {
Expand All @@ -68,50 +68,64 @@ static Schema schema(Types.StructType unifiedPartitionType, int formatVersion) {
}

private static Schema v2Schema(Types.StructType unifiedPartitionType) {
return new Schema(
Types.NestedField.required(
EMPTY_PARTITION_FIELD.fieldId(), EMPTY_PARTITION_FIELD.name(), unifiedPartitionType),
SPEC_ID,
DATA_RECORD_COUNT,
DATA_FILE_COUNT,
TOTAL_DATA_FILE_SIZE_IN_BYTES,
POSITION_DELETE_RECORD_COUNT,
POSITION_DELETE_FILE_COUNT,
EQUALITY_DELETE_RECORD_COUNT,
EQUALITY_DELETE_FILE_COUNT,
TOTAL_RECORD_COUNT,
LAST_UPDATED_AT,
LAST_UPDATED_SNAPSHOT_ID);
ImmutableList.Builder<Types.NestedField> fields = ImmutableList.builder();
if (!unifiedPartitionType.fields().isEmpty()) {
fields.add(
Types.NestedField.required(
EMPTY_PARTITION_FIELD.fieldId(), EMPTY_PARTITION_FIELD.name(), unifiedPartitionType));
}

fields.add(SPEC_ID);
fields.add(DATA_RECORD_COUNT);
fields.add(DATA_FILE_COUNT);
fields.add(TOTAL_DATA_FILE_SIZE_IN_BYTES);
fields.add(POSITION_DELETE_RECORD_COUNT);
fields.add(POSITION_DELETE_FILE_COUNT);
fields.add(EQUALITY_DELETE_RECORD_COUNT);
fields.add(EQUALITY_DELETE_FILE_COUNT);
fields.add(TOTAL_RECORD_COUNT);
fields.add(LAST_UPDATED_AT);
fields.add(LAST_UPDATED_SNAPSHOT_ID);
return new Schema(fields.build());
}

private static Schema v3Schema(Types.StructType unifiedPartitionType) {
return new Schema(
Types.NestedField.required(
EMPTY_PARTITION_FIELD.fieldId(), EMPTY_PARTITION_FIELD.name(), unifiedPartitionType),
SPEC_ID,
DATA_RECORD_COUNT,
DATA_FILE_COUNT,
TOTAL_DATA_FILE_SIZE_IN_BYTES,
ImmutableList.Builder<Types.NestedField> fields = ImmutableList.builder();
if (!unifiedPartitionType.fields().isEmpty()) {
fields.add(
Types.NestedField.required(
EMPTY_PARTITION_FIELD.fieldId(), EMPTY_PARTITION_FIELD.name(), unifiedPartitionType));
}

fields.add(SPEC_ID);
fields.add(DATA_RECORD_COUNT);
fields.add(DATA_FILE_COUNT);
fields.add(TOTAL_DATA_FILE_SIZE_IN_BYTES);
fields.add(
Types.NestedField.required(
POSITION_DELETE_RECORD_COUNT.fieldId(),
POSITION_DELETE_RECORD_COUNT.name(),
Types.LongType.get()),
Types.LongType.get()));
fields.add(
Types.NestedField.required(
POSITION_DELETE_FILE_COUNT.fieldId(),
POSITION_DELETE_FILE_COUNT.name(),
Types.IntegerType.get()),
Types.IntegerType.get()));
fields.add(
Types.NestedField.required(
EQUALITY_DELETE_RECORD_COUNT.fieldId(),
EQUALITY_DELETE_RECORD_COUNT.name(),
Types.LongType.get()),
Types.LongType.get()));
fields.add(
Types.NestedField.required(
EQUALITY_DELETE_FILE_COUNT.fieldId(),
EQUALITY_DELETE_FILE_COUNT.name(),
Types.IntegerType.get()),
TOTAL_RECORD_COUNT,
LAST_UPDATED_AT,
LAST_UPDATED_SNAPSHOT_ID,
DV_COUNT);
Types.IntegerType.get()));
fields.add(TOTAL_RECORD_COUNT);
fields.add(LAST_UPDATED_AT);
fields.add(LAST_UPDATED_SNAPSHOT_ID);
fields.add(DV_COUNT);
return new Schema(fields.build());
}

/* The positions of each statistics within the full schema of partition statistics. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ public class BasePartitionStatistics extends SupportsIndexProjection

private static final int STATS_COUNT = 13;

// Canonical struct listing every partition-stats column this class can hold, in the fixed
// positions the accessors rely on. Includes DV_COUNT, so this is the v3 superset; a narrower
// (e.g. v2 or unpartitioned) file schema is projected onto these positions via the
// SupportsIndexProjection constructor below. NOTE(review): EMPTY_PARTITION_FIELD is always
// present here even for unpartitioned tables — projection is expected to drop it when the
// file schema omits the partition column; confirm against SupportsIndexProjection semantics.
private static final Types.StructType BASE_TYPE =
Types.StructType.of(
EMPTY_PARTITION_FIELD,
SPEC_ID,
DATA_RECORD_COUNT,
DATA_FILE_COUNT,
TOTAL_DATA_FILE_SIZE_IN_BYTES,
POSITION_DELETE_RECORD_COUNT,
POSITION_DELETE_FILE_COUNT,
EQUALITY_DELETE_RECORD_COUNT,
EQUALITY_DELETE_FILE_COUNT,
TOTAL_RECORD_COUNT,
LAST_UPDATED_AT,
LAST_UPDATED_SNAPSHOT_ID,
DV_COUNT);

BasePartitionStatistics(StructLike partition, int specId) {
super(STATS_COUNT);

Expand All @@ -58,7 +74,7 @@ public class BasePartitionStatistics extends SupportsIndexProjection

/** Used by internal readers to instantiate this class with a projection schema. */
BasePartitionStatistics(Types.StructType projection) {
super(STATS_COUNT);
super(BASE_TYPE, projection);
}

@Override
Expand Down
139 changes: 107 additions & 32 deletions core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ private PartitionStatsHandler() {}
*/
@Deprecated
public static Schema schema(StructType unifiedPartitionType, int formatVersion) {
Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
Preconditions.checkState(
formatVersion > 0 && formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION,
"Invalid format version: %d",
Expand All @@ -197,48 +196,62 @@ public static Schema schema(StructType unifiedPartitionType, int formatVersion)
}

private static Schema v2Schema(StructType unifiedPartitionType) {
return new Schema(
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
SPEC_ID,
DATA_RECORD_COUNT,
DATA_FILE_COUNT,
TOTAL_DATA_FILE_SIZE_IN_BYTES,
POSITION_DELETE_RECORD_COUNT,
POSITION_DELETE_FILE_COUNT,
EQUALITY_DELETE_RECORD_COUNT,
EQUALITY_DELETE_FILE_COUNT,
TOTAL_RECORD_COUNT,
LAST_UPDATED_AT,
LAST_UPDATED_SNAPSHOT_ID);
List<NestedField> fields = Lists.newArrayList();
if (!unifiedPartitionType.fields().isEmpty()) {
fields.add(
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType));
}

fields.add(SPEC_ID);
fields.add(DATA_RECORD_COUNT);
fields.add(DATA_FILE_COUNT);
fields.add(TOTAL_DATA_FILE_SIZE_IN_BYTES);
fields.add(POSITION_DELETE_RECORD_COUNT);
fields.add(POSITION_DELETE_FILE_COUNT);
fields.add(EQUALITY_DELETE_RECORD_COUNT);
fields.add(EQUALITY_DELETE_FILE_COUNT);
fields.add(TOTAL_RECORD_COUNT);
fields.add(LAST_UPDATED_AT);
fields.add(LAST_UPDATED_SNAPSHOT_ID);
return new Schema(fields);
}

private static Schema v3Schema(StructType unifiedPartitionType) {
return new Schema(
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
SPEC_ID,
DATA_RECORD_COUNT,
DATA_FILE_COUNT,
TOTAL_DATA_FILE_SIZE_IN_BYTES,
List<NestedField> fields = Lists.newArrayList();
if (!unifiedPartitionType.fields().isEmpty()) {
fields.add(
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType));
}

fields.add(SPEC_ID);
fields.add(DATA_RECORD_COUNT);
fields.add(DATA_FILE_COUNT);
fields.add(TOTAL_DATA_FILE_SIZE_IN_BYTES);
fields.add(
NestedField.required(
POSITION_DELETE_RECORD_COUNT.fieldId(),
POSITION_DELETE_RECORD_COUNT.name(),
LongType.get()),
LongType.get()));
fields.add(
NestedField.required(
POSITION_DELETE_FILE_COUNT.fieldId(),
POSITION_DELETE_FILE_COUNT.name(),
IntegerType.get()),
IntegerType.get()));
fields.add(
NestedField.required(
EQUALITY_DELETE_RECORD_COUNT.fieldId(),
EQUALITY_DELETE_RECORD_COUNT.name(),
LongType.get()),
LongType.get()));
fields.add(
NestedField.required(
EQUALITY_DELETE_FILE_COUNT.fieldId(),
EQUALITY_DELETE_FILE_COUNT.name(),
IntegerType.get()),
TOTAL_RECORD_COUNT,
LAST_UPDATED_AT,
LAST_UPDATED_SNAPSHOT_ID,
DV_COUNT);
IntegerType.get()));
fields.add(TOTAL_RECORD_COUNT);
fields.add(LAST_UPDATED_AT);
fields.add(LAST_UPDATED_SNAPSHOT_ID);
fields.add(DV_COUNT);
return new Schema(fields);
}

/**
Expand Down Expand Up @@ -278,7 +291,6 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId)
throws IOException {
Preconditions.checkArgument(table != null, "Invalid table: null");
Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned");
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);

Expand Down Expand Up @@ -331,9 +343,14 @@ static PartitionStatisticsFile writePartitionStatsFile(

OutputFile outputFile = newPartitionStatsFile(table, fileFormat, snapshotId);

boolean hasPartition =
dataSchema.findField(PartitionStatistics.EMPTY_PARTITION_FIELD.fieldId()) != null;
try (FileAppender<StructLike> writer =
InternalData.write(fileFormat, outputFile).schema(dataSchema).build()) {
records.iterator().forEachRemaining(writer::add);
records
.iterator()
.forEachRemaining(
record -> writer.add(toGenericRecord(record, dataSchema, hasPartition)));
}

return ImmutableGenericPartitionStatisticsFile.builder()
Expand All @@ -343,6 +360,32 @@ static PartitionStatisticsFile writePartitionStatsFile(
.build();
}

/**
 * Copies the given partition statistics into a {@link GenericRecord} shaped like {@code
 * dataSchema}.
 *
 * <p>The write order must mirror the column order of the stats file schema: the partition
 * struct first (written only when {@code hasPartition} is true, i.e. the schema contains the
 * partition column), then the fixed stats columns, and finally {@code dv_count} only when the
 * schema contains the DV_COUNT field (presumably the v3 schema — confirm against the schema
 * builders).
 *
 * @param stats source statistics to copy
 * @param dataSchema target file schema that determines which optional columns are present
 * @param hasPartition whether {@code dataSchema} includes the partition struct column
 * @return a new record with positions populated from {@code stats}
 */
private static GenericRecord toGenericRecord(
PartitionStatistics stats, Schema dataSchema, boolean hasPartition) {
GenericRecord record = GenericRecord.create(dataSchema.asStruct());
// pos tracks the next schema position; it only advances for columns that exist in the schema.
int pos = 0;
if (hasPartition) {
record.set(pos++, stats.partition());
}

record.set(pos++, stats.specId());
record.set(pos++, stats.dataRecordCount());
record.set(pos++, stats.dataFileCount());
record.set(pos++, stats.totalDataFileSizeInBytes());
record.set(pos++, stats.positionDeleteRecordCount());
record.set(pos++, stats.positionDeleteFileCount());
record.set(pos++, stats.equalityDeleteRecordCount());
record.set(pos++, stats.equalityDeleteFileCount());
record.set(pos++, stats.totalRecords());
record.set(pos++, stats.lastUpdatedAt());
record.set(pos++, stats.lastUpdatedSnapshotId());
// dv_count is only part of schemas that declare the DV_COUNT field; skip it otherwise.
if (dataSchema.findField(PartitionStatistics.DV_COUNT.fieldId()) != null) {
record.set(pos, stats.dvCount());
}

return record;
}

/**
* Reads partition statistics from the specified {@link InputFile} using given schema.
*
Expand Down Expand Up @@ -402,7 +445,10 @@ private static Collection<PartitionStatistics> computeAndMergeStatsIncremental(
table.newPartitionStatisticsScan().useSnapshot(lastSnapshotWithStats).scan()) {
oldStats.forEach(
partitionStats ->
statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
statsMap.put(
partitionStats.specId(),
partitionStats.partition(),
copyWithIdentityMapping(partitionStats)));
} catch (Exception exception) {
throw new InvalidStatsFileException(exception);
}
Expand All @@ -426,6 +472,10 @@ private static Collection<PartitionStatistics> computeAndMergeStatsIncremental(
}

private static GenericRecord partitionDataToRecord(PartitionData data) {
if (data.getPartitionType().fields().isEmpty()) {
return null;
}

GenericRecord record = GenericRecord.create(data.getPartitionType());
for (int index = 0; index < record.size(); index++) {
record.set(index, data.get(index));
Expand Down Expand Up @@ -512,11 +562,12 @@ private static PartitionMap<PartitionStatistics> collectStatsForManifest(
PartitionUtil.coercePartition(partitionType, spec, file.partition());
StructLike key = keyTemplate.copyFor(coercedPartition);
Snapshot snapshot = table.snapshot(entry.snapshotId());
StructLike partitionValue = partitionType.fields().isEmpty() ? null : key;
PartitionStatistics stats =
statsMap.computeIfAbsent(
specId,
((PartitionData) file.partition()).copy(),
() -> new BasePartitionStatistics(key, specId));
() -> new BasePartitionStatistics(partitionValue, specId));
if (entry.isLive()) {
// Live can have both added and existing entries. Consider only added entries for
// incremental compute as existing entries was already included in previous compute.
Expand Down Expand Up @@ -742,6 +793,30 @@ private static void updateSnapshotInfo(
}
}

/**
 * Copies arbitrary {@link PartitionStatistics} into a fresh {@link BasePartitionStatistics}
 * whose values sit at the canonical {@code *_POSITION} indexes.
 *
 * <p>Stats read back from an existing stats file may be backed by a projected (schema-mapped)
 * instance whose internal positions differ from the canonical layout; copying through the
 * typed accessors normalizes them so subsequent position-based merging works. NOTE(review):
 * assumes every accessor below is safe to call on stats read from any supported format
 * version (e.g. {@code dvCount()} on v2-era stats) — confirm accessors return defaults rather
 * than throw for absent columns.
 *
 * @param stats source statistics (possibly projection-backed)
 * @return an identity-positioned copy with the same partition, spec id, and metric values
 */
private static BasePartitionStatistics copyWithIdentityMapping(PartitionStatistics stats) {
BasePartitionStatistics copy = new BasePartitionStatistics(stats.partition(), stats.specId());
copy.set(PartitionStatistics.DATA_RECORD_COUNT_POSITION, stats.dataRecordCount());
copy.set(PartitionStatistics.DATA_FILE_COUNT_POSITION, stats.dataFileCount());
copy.set(
PartitionStatistics.TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION,
stats.totalDataFileSizeInBytes());
copy.set(
PartitionStatistics.POSITION_DELETE_RECORD_COUNT_POSITION,
stats.positionDeleteRecordCount());
copy.set(
PartitionStatistics.POSITION_DELETE_FILE_COUNT_POSITION, stats.positionDeleteFileCount());
copy.set(
PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT_POSITION,
stats.equalityDeleteRecordCount());
copy.set(
PartitionStatistics.EQUALITY_DELETE_FILE_COUNT_POSITION, stats.equalityDeleteFileCount());
copy.set(PartitionStatistics.TOTAL_RECORD_COUNT_POSITION, stats.totalRecords());
copy.set(PartitionStatistics.LAST_UPDATED_AT_POSITION, stats.lastUpdatedAt());
copy.set(PartitionStatistics.LAST_UPDATED_SNAPSHOT_ID_POSITION, stats.lastUpdatedSnapshotId());
copy.set(PartitionStatistics.DV_COUNT_POSITION, stats.dvCount());
return copy;
}

private static class InvalidStatsFileException extends RuntimeException {

InvalidStatsFileException(Throwable cause) {
Expand Down
Loading
Loading