diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
index db02a0e96e10..8f2bfe184cab 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -516,7 +516,8 @@ private TableMetadata applyChangesToMetadata(TableMetadata metadata) {
     Set<String> columnProperties =
         ImmutableSet.of(
            TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX,
-            TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
+            TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX,
+            TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX);
 
     Map<String, String> updatedProperties =
         PropertyUtil.applySchemaChanges(
             newMetadata.properties(), deletedColumns, renamedColumns, columnProperties);
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 605e41fe90f1..f16a0ed3da2b 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -174,6 +174,9 @@ private TableProperties() {}
   public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX =
       "write.parquet.bloom-filter-enabled.column.";
 
+  public static final String PARQUET_COLUMN_STATS_ENABLED_PREFIX =
+      "write.parquet.stats-enabled.column.";
+
   public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
   public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec";
   public static final String AVRO_COMPRESSION_DEFAULT = "gzip";
diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java b/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java
index dc6e9e1a7f85..84f1a2ec9043 100644
--- a/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java
+++ b/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg;
 
 import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -191,6 +192,20 @@ public void testModificationWithParquetBloomConfig() {
         table.properties().get(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "ID"));
   }
 
+  @TestTemplate
+  public void testModificationWithParquetColumnStats() {
+    table.updateProperties().set(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "id", "true").commit();
+
+    table.updateSchema().renameColumn("id", "ID").commit();
+    assertThat(table.properties())
+        .containsEntry(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "ID", "true")
+        .doesNotContainKey(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "id");
+
+    table.updateSchema().deleteColumn("ID").commit();
+    assertThat(table.properties())
+        .doesNotContainKey(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "ID");
+  }
+
   @TestTemplate
   public void testDeleteAndAddColumnReassign() {
     NameMapping mapping = MappingUtil.create(table.schema());
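Taken together, the three core changes above give the new prefix the same lifecycle as the existing bloom-filter prefix: the key is set per column and follows renames and deletes. A minimal sketch of the user-facing behavior, not part of this patch; the HadoopCatalog, warehouse path, and table/column names are illustrative placeholders:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class ColumnStatsPropertyExample {
  public static void main(String[] args) {
    // Catalog, warehouse location, and identifiers below are placeholders.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "file:///tmp/warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("db", "events"));

    // Opt one column out of Parquet statistics collection.
    table
        .updateProperties()
        .set(TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX + "id", "false")
        .commit();

    // Because SchemaUpdate now treats the prefix as a column property,
    // a rename migrates the key along with the column ...
    table.updateSchema().renameColumn("id", "user_id").commit();
    // ... so table.properties() now contains
    // "write.parquet.stats-enabled.column.user_id" -> "false".
  }
}
```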
diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md
index c784566ef5f5..2c9f6608cb54 100644
--- a/docs/docs/configuration.md
+++ b/docs/docs/configuration.md
@@ -52,6 +52,7 @@ Iceberg tables support table properties to configure table behavior, like the default split size for reads:
 | write.parquet.bloom-filter-enabled.column.col1 | (not set)        | Hint to parquet to write a bloom filter for the column: 'col1'                             |
 | write.parquet.bloom-filter-max-bytes           | 1048576 (1 MB)   | The maximum number of bytes for a bloom filter bitset                                      |
 | write.parquet.bloom-filter-fpp.column.col1     | 0.01             | The false positive probability for a bloom filter applied to 'col1' (must > 0.0 and < 1.0) |
+| write.parquet.stats-enabled.column.col1        | (not set)        | Controls whether to collect parquet column statistics for column 'col1'                    |
 | write.avro.compression-codec                   | gzip             | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed             |
 | write.avro.compression-level                   | null             | Avro compression level                                                                     |
 | write.orc.stripe-size-bytes                    | 67108864 (64 MB) | Define the default ORC stripe size, in bytes                                               |
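On the write path (next file), these per-column keys are read out of the table configuration by prefix, exactly as the bloom-filter keys are. A standalone sketch of that mechanism, assuming only iceberg-core on the classpath; the property values are illustrative:

```java
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.PropertyUtil;

public class PrefixPropertyExample {
  public static void main(String[] args) {
    Map<String, String> tableProps =
        ImmutableMap.of(
            "write.parquet.stats-enabled.column.col1", "false",
            "write.parquet.compression-codec", "zstd");

    // propertiesWithPrefix keeps only the matching keys and strips the
    // prefix, producing a column-name -> value map.
    Map<String, String> columnStatsEnabled =
        PropertyUtil.propertiesWithPrefix(tableProps, "write.parquet.stats-enabled.column.");

    System.out.println(columnStatsEnabled); // {col1=false}
  }
}
```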
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 31f9e2a80a6b..6f68fbe150ff 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -30,6 +30,7 @@
 import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX;
 import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
 import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
@@ -98,7 +99,6 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.PropertyUtil;
@@ -120,6 +120,7 @@
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type.ID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -308,31 +309,17 @@ private WriteBuilder createContextFunc(
     private void setBloomFilterConfig(
         Context context,
-        MessageType parquetSchema,
+        Map<String, String> colNameToParquetPathMap,
         BiConsumer<String, Boolean> withBloomFilterEnabled,
         BiConsumer<String, Double> withBloomFilterFPP) {
 
-      Map<Integer, String> fieldIdToParquetPath =
-          parquetSchema.getColumns().stream()
-              .filter(col -> col.getPrimitiveType().getId() != null)
-              .collect(
-                  Collectors.toMap(
-                      col -> col.getPrimitiveType().getId().intValue(),
-                      col -> String.join(".", col.getPath())));
-
       context
           .columnBloomFilterEnabled()
           .forEach(
              (colPath, isEnabled) -> {
-                Types.NestedField fieldId = schema.findField(colPath);
-                if (fieldId == null) {
-                  LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
-                  return;
-                }
-
-                String parquetColumnPath = fieldIdToParquetPath.get(fieldId.fieldId());
+                String parquetColumnPath = colNameToParquetPathMap.get(colPath);
                 if (parquetColumnPath == null) {
-                  LOG.warn("Skipping bloom filter config for missing field: {}", fieldId);
+                  LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
                   return;
                 }
@@ -344,6 +331,24 @@ private void setBloomFilterConfig(
               });
     }
 
+    private void setColumnStatsConfig(
+        Context context,
+        Map<String, String> colNameToParquetPathMap,
+        BiConsumer<String, Boolean> withColumnStatsEnabled) {
+
+      context
+          .columnStatsEnabled()
+          .forEach(
+              (colPath, isEnabled) -> {
+                String parquetColumnPath = colNameToParquetPathMap.get(colPath);
+                if (parquetColumnPath == null) {
+                  LOG.warn("Skipping column statistics config for missing field: {}", colPath);
+                  return;
+                }
+                withColumnStatsEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
+              });
+    }
+
     @Override
     public <D> FileAppender<D> build() throws IOException {
       Preconditions.checkNotNull(schema, "Schema is required");
@@ -401,6 +406,18 @@ public <D> FileAppender<D> build() throws IOException {
         Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
       }
 
+      Map<String, String> colNameToParquetPathMap =
+          type.getColumns().stream()
+              .filter(
+                  col -> {
+                    ID id = col.getPrimitiveType().getId();
+                    return (id != null) && (schema.findColumnName(id.intValue()) != null);
+                  })
+              .collect(
+                  Collectors.toMap(
+                      col -> schema.findColumnName(col.getPrimitiveType().getId().intValue()),
+                      col -> String.join(".", col.getPath())));
+
       if (createWriterFunc != null) {
         Preconditions.checkArgument(
             writeSupport == null, "Cannot write with both write support and Parquet value writer");
@@ -421,7 +438,12 @@ public <D> FileAppender<D> build() throws IOException {
                 .withMaxBloomFilterBytes(bloomFilterMaxBytes);
 
         setBloomFilterConfig(
-            context, type, propsBuilder::withBloomFilterEnabled, propsBuilder::withBloomFilterFPP);
+            context,
+            colNameToParquetPathMap,
+            propsBuilder::withBloomFilterEnabled,
+            propsBuilder::withBloomFilterFPP);
+
+        setColumnStatsConfig(context, colNameToParquetPathMap, propsBuilder::withStatisticsEnabled);
 
         ParquetProperties parquetProperties = propsBuilder.build();
@@ -457,10 +479,13 @@ public <D> FileAppender<D> build() throws IOException {
         setBloomFilterConfig(
             context,
-            type,
+            colNameToParquetPathMap,
             parquetWriteBuilder::withBloomFilterEnabled,
             parquetWriteBuilder::withBloomFilterFPP);
 
+        setColumnStatsConfig(
+            context, colNameToParquetPathMap, parquetWriteBuilder::withStatisticsEnabled);
+
         return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
       }
     }
@@ -477,6 +502,7 @@ private static class Context {
     private final int bloomFilterMaxBytes;
     private final Map<String, String> columnBloomFilterFpp;
     private final Map<String, String> columnBloomFilterEnabled;
+    private final Map<String, String> columnStatsEnabled;
     private final boolean dictionaryEnabled;
 
     private Context(
@@ -491,6 +517,7 @@ private Context(
         int bloomFilterMaxBytes,
         Map<String, String> columnBloomFilterFpp,
         Map<String, String> columnBloomFilterEnabled,
+        Map<String, String> columnStatsEnabled,
         boolean dictionaryEnabled) {
       this.rowGroupSize = rowGroupSize;
       this.pageSize = pageSize;
@@ -503,6 +530,7 @@ private Context(
       this.bloomFilterMaxBytes = bloomFilterMaxBytes;
       this.columnBloomFilterFpp = columnBloomFilterFpp;
       this.columnBloomFilterEnabled = columnBloomFilterEnabled;
+      this.columnStatsEnabled = columnStatsEnabled;
       this.dictionaryEnabled = dictionaryEnabled;
     }
@@ -564,6 +592,9 @@ static Context dataContext(Map<String, String> config) {
       Map<String, String> columnBloomFilterEnabled =
           PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
 
+      Map<String, String> columnStatsEnabled =
+          PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX);
+
       boolean dictionaryEnabled =
           PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);
@@ -579,6 +610,7 @@ static Context dataContext(Map<String, String> config) {
           bloomFilterMaxBytes,
           columnBloomFilterFpp,
           columnBloomFilterEnabled,
+          columnStatsEnabled,
           dictionaryEnabled);
     }
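The name-to-path map built in build() is keyed by Iceberg column name so that both per-column configs can share one lookup; the detour through field IDs matters because Iceberg names and Parquet paths only line up reliably via the IDs carried in the file schema. A standalone sketch of the same resolution, assuming iceberg-parquet and parquet-column on the classpath; the schema and names are illustrative:

```java
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type.ID;

public class ColumnPathMapExample {
  public static void main(String[] args) {
    // Field IDs are the stable link between Iceberg columns and Parquet columns.
    Schema schema =
        new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(
                2,
                "location",
                Types.StructType.of(
                    Types.NestedField.optional(3, "lat", Types.DoubleType.get()))));

    // Iceberg's schema conversion carries the field IDs into the Parquet types.
    MessageType type = ParquetSchemaUtil.convert(schema, "table");

    // Same shape as the map built in WriteBuilder#build(): resolve each leaf
    // column's Iceberg name (e.g. "location.lat") to its Parquet dot path.
    Map<String, String> colNameToParquetPath =
        type.getColumns().stream()
            .filter(
                col -> {
                  ID fieldId = col.getPrimitiveType().getId();
                  return fieldId != null && schema.findColumnName(fieldId.intValue()) != null;
                })
            .collect(
                Collectors.toMap(
                    col -> schema.findColumnName(col.getPrimitiveType().getId().intValue()),
                    col -> String.join(".", col.getPath())));

    // Prints {id=id, location.lat=location.lat} (iteration order may vary).
    System.out.println(colNameToParquetPath);
  }
}
```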
@@ -647,6 +679,7 @@ static Context deleteContext(Map<String, String> config) {
           PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT,
           ImmutableMap.of(),
           ImmutableMap.of(),
+          ImmutableMap.of(),
           dictionaryEnabled);
     }
 
@@ -702,6 +735,10 @@ Map<String, String> columnBloomFilterEnabled() {
       return columnBloomFilterEnabled;
     }
 
+    Map<String, String> columnStatsEnabled() {
+      return columnStatsEnabled;
+    }
+
     boolean dictionaryEnabled() {
       return dictionaryEnabled;
     }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index ae0a822d3464..5eabeb02d32d 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -20,6 +20,7 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
@@ -57,6 +58,7 @@
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.schema.MessageType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -219,6 +221,50 @@ public void testTwoLevelList() throws IOException {
     assertThat(recordRead.get("topbytes")).isEqualTo(expectedBinary);
   }
 
+  @Test
+  public void testColumnStatisticsEnabled() throws Exception {
+    Schema schema =
+        new Schema(
+            optional(1, "int_field", IntegerType.get()),
+            optional(2, "string_field", Types.StringType.get()));
+
+    File file = createTempFile(temp);
+
+    List<GenericData.Record> records = Lists.newArrayListWithCapacity(5);
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    for (int i = 1; i <= 5; i++) {
+      GenericData.Record record = new GenericData.Record(avroSchema);
+      record.put("int_field", i);
+      record.put("string_field", "test");
+      records.add(record);
+    }
+
+    write(
+        file,
+        schema,
+        ImmutableMap.<String, String>builder()
+            .put(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "int_field", "true")
+            .put(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "string_field", "false")
+            .buildOrThrow(),
+        ParquetAvroWriter::buildWriter,
+        records.toArray(new GenericData.Record[] {}));
+
+    InputFile inputFile = Files.localInput(file);
+
+    try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) {
+      for (BlockMetaData block : reader.getFooter().getBlocks()) {
+        for (ColumnChunkMetaData column : block.getColumns()) {
+          boolean emptyStats = column.getStatistics().isEmpty();
+          if (column.getPath().toDotString().equals("int_field")) {
+            assertThat(emptyStats).as("int_field should have statistics").isFalse();
+          } else if (column.getPath().toDotString().equals("string_field")) {
+            assertThat(emptyStats).as("string_field should not have statistics").isTrue();
+          }
+        }
+      }
+    }
+  }
+
   private Pair<File, Long> generateFile(
       Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
       int desiredRecordCount,
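For reviewers who want to exercise the behavior outside the test harness, a minimal end-to-end sketch against the public write path; this is not part of the patch, assumes iceberg-data (for GenericParquetWriter) is on the classpath, and the file name and schema are illustrative:

```java
import java.io.File;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

public class WriteWithoutStatsExample {
  public static void main(String[] args) throws Exception {
    Schema schema =
        new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "payload", Types.StringType.get()));

    // Write to a fresh local file; the builder does not overwrite by default.
    File file = File.createTempFile("no-payload-stats", ".parquet");
    file.delete();

    // WriteBuilder#set feeds the same config map that Context#dataContext parses.
    try (FileAppender<Record> appender =
        Parquet.write(Files.localOutput(file))
            .schema(schema)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .set(TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX + "payload", "false")
            .build()) {
      Record record = GenericRecord.create(schema);
      record.setField("id", 1);
      record.setField("payload", "hello");
      appender.add(record);
    }
    // The footer should now carry statistics for "id" but not for "payload".
  }
}
```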