From 2a5ef9c08adc81f047e39fba293146671dae7348 Mon Sep 17 00:00:00 2001 From: Huaxiang Sun Date: Thu, 10 Apr 2025 19:32:00 -0700 Subject: [PATCH 1/6] add table properties to disable/enable parquet column statistics --- .../org/apache/iceberg/TableProperties.java | 7 ++ .../org/apache/iceberg/parquet/Parquet.java | 101 ++++++++++++++---- .../spark/data/TestSparkParquetWriter.java | 24 +++++ 3 files changed, 113 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 605e41fe90f1..e3bf77cc93df 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -174,6 +174,13 @@ private TableProperties() {} public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column."; + public static final String PARQUET_COLUMN_STATS_ENABLED_PREFIX = + "write.parquet.stats-enabled.column."; + + public static final String DEFAULT_PARQUET_COLUMN_STATS_ENABLED = + PARQUET_COLUMN_STATS_ENABLED_PREFIX + "default"; + public static final boolean DEFAULT_PARQUET_COLUMN_STATS_ENABLED_DEFAULT = true; + public static final String AVRO_COMPRESSION = "write.avro.compression-codec"; public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec"; public static final String AVRO_COMPRESSION_DEFAULT = "gzip"; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 31f9e2a80a6b..b4a6fc13f061 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.parquet; +import static org.apache.iceberg.TableProperties.DEFAULT_PARQUET_COLUMN_STATS_ENABLED_DEFAULT; import static 
org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES; @@ -30,6 +31,7 @@ import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX; import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES; import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; @@ -306,33 +308,29 @@ private WriteBuilder createContextFunc( return this; } + // Utility method to get the column path + private String getParquetColumnPath(Map fieldIdToParquetPath, String colPath) { + Types.NestedField fieldId = schema.findField(colPath); + if (fieldId == null) { + return null; + } + + return fieldIdToParquetPath.get(fieldId.fieldId()); + } + private void setBloomFilterConfig( Context context, - MessageType parquetSchema, + Map fieldIdToParquetPath, BiConsumer withBloomFilterEnabled, BiConsumer withBloomFilterFPP) { - Map fieldIdToParquetPath = - parquetSchema.getColumns().stream() - .filter(col -> col.getPrimitiveType().getId() != null) - .collect( - Collectors.toMap( - col -> col.getPrimitiveType().getId().intValue(), - col -> String.join(".", col.getPath()))); - context .columnBloomFilterEnabled() .forEach( (colPath, isEnabled) -> { - Types.NestedField fieldId = schema.findField(colPath); - if (fieldId == null) { - LOG.warn("Skipping bloom filter config for missing field: {}", colPath); - return; - } - - String parquetColumnPath = fieldIdToParquetPath.get(fieldId.fieldId()); + String parquetColumnPath = 
getParquetColumnPath(fieldIdToParquetPath, colPath); if (parquetColumnPath == null) { - LOG.warn("Skipping bloom filter config for missing field: {}", fieldId); + LOG.warn("Skipping bloom filter config for missing field: {}", colPath); return; } @@ -344,6 +342,25 @@ private void setBloomFilterConfig( }); } + private void setColumnStatsConfig( + Context context, + Map fieldIdToParquetPath, + BiConsumer withColumnStatsEnabled) { + + context + .columnStatsEnabled() + .forEach( + (colPath, isEnabled) -> { + String parquetColumnPath = getParquetColumnPath(fieldIdToParquetPath, colPath); + if (parquetColumnPath == null) { + LOG.warn("Skipping column statistics config for missing field: {}", colPath); + return; + } + + withColumnStatsEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled)); + }); + } + @Override public FileAppender build() throws IOException { Preconditions.checkNotNull(schema, "Schema is required"); @@ -365,6 +382,7 @@ public FileAppender build() throws IOException { int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount(); int bloomFilterMaxBytes = context.bloomFilterMaxBytes(); boolean dictionaryEnabled = context.dictionaryEnabled(); + boolean defaultColumnStatsEnabled = context.defaultColumnStatsEnabled(); if (compressionLevel != null) { switch (codec) { @@ -401,6 +419,14 @@ public FileAppender build() throws IOException { Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); } + Map fieldIdToParquetPath = + type.getColumns().stream() + .filter(col -> col.getPrimitiveType().getId() != null) + .collect( + Collectors.toMap( + col -> col.getPrimitiveType().getId().intValue(), + col -> String.join(".", col.getPath()))); + if (createWriterFunc != null) { Preconditions.checkArgument( writeSupport == null, "Cannot write with both write support and Parquet value writer"); @@ -419,9 +445,15 @@ public FileAppender build() throws IOException { .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount) 
.withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount) .withMaxBloomFilterBytes(bloomFilterMaxBytes); + propsBuilder.withStatisticsEnabled(defaultColumnStatsEnabled); setBloomFilterConfig( - context, type, propsBuilder::withBloomFilterEnabled, propsBuilder::withBloomFilterFPP); + context, + fieldIdToParquetPath, + propsBuilder::withBloomFilterEnabled, + propsBuilder::withBloomFilterFPP); + + setColumnStatsConfig(context, fieldIdToParquetPath, propsBuilder::withStatisticsEnabled); ParquetProperties parquetProperties = propsBuilder.build(); @@ -453,14 +485,18 @@ public FileAppender build() throws IOException { .withPageRowCountLimit(pageRowLimit) .withDictionaryEncoding(dictionaryEnabled) .withDictionaryPageSize(dictionaryPageSize) + .withStatisticsEnabled(defaultColumnStatsEnabled) .withEncryption(fileEncryptionProperties); setBloomFilterConfig( context, - type, + fieldIdToParquetPath, parquetWriteBuilder::withBloomFilterEnabled, parquetWriteBuilder::withBloomFilterFPP); + setColumnStatsConfig( + context, fieldIdToParquetPath, parquetWriteBuilder::withStatisticsEnabled); + return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig); } } @@ -477,6 +513,9 @@ private static class Context { private final int bloomFilterMaxBytes; private final Map columnBloomFilterFpp; private final Map columnBloomFilterEnabled; + + private final Map columnStatsEnabled; + private final boolean defaultColumnStatsEnabled; private final boolean dictionaryEnabled; private Context( @@ -491,6 +530,8 @@ private Context( int bloomFilterMaxBytes, Map columnBloomFilterFpp, Map columnBloomFilterEnabled, + Map columnStatsEnabled, + boolean defaultColumnStatsEnabled, boolean dictionaryEnabled) { this.rowGroupSize = rowGroupSize; this.pageSize = pageSize; @@ -503,7 +544,9 @@ private Context( this.bloomFilterMaxBytes = bloomFilterMaxBytes; this.columnBloomFilterFpp = columnBloomFilterFpp; this.columnBloomFilterEnabled = columnBloomFilterEnabled; + this.columnStatsEnabled = 
columnStatsEnabled; this.dictionaryEnabled = dictionaryEnabled; + this.defaultColumnStatsEnabled = defaultColumnStatsEnabled; } static Context dataContext(Map config) { @@ -564,6 +607,14 @@ static Context dataContext(Map config) { Map columnBloomFilterEnabled = PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX); + Map columnStatsEnabled = + PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX); + String defaultStatsEnabledStr = columnStatsEnabled.remove("default"); + boolean defaultStatsEnabled = + (defaultStatsEnabledStr == null) + ? DEFAULT_PARQUET_COLUMN_STATS_ENABLED_DEFAULT + : Boolean.valueOf(defaultStatsEnabledStr); + boolean dictionaryEnabled = PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true); @@ -579,6 +630,8 @@ static Context dataContext(Map config) { bloomFilterMaxBytes, columnBloomFilterFpp, columnBloomFilterEnabled, + columnStatsEnabled, + defaultStatsEnabled, dictionaryEnabled); } @@ -647,6 +700,8 @@ static Context deleteContext(Map config) { PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT, ImmutableMap.of(), ImmutableMap.of(), + ImmutableMap.of(), + DEFAULT_PARQUET_COLUMN_STATS_ENABLED_DEFAULT, dictionaryEnabled); } @@ -702,6 +757,14 @@ Map columnBloomFilterEnabled() { return columnBloomFilterEnabled; } + Map columnStatsEnabled() { + return columnStatsEnabled; + } + + boolean defaultColumnStatsEnabled() { + return defaultColumnStatsEnabled; + } + boolean dictionaryEnabled() { return dictionaryEnabled; } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java index 73800d3cf3e0..f9822582d773 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java @@ -20,6 +20,7 @@ import static 
org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX; import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX; +import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; @@ -151,4 +152,27 @@ public void testFpp() throws IOException, NoSuchFieldException, IllegalAccessExc assertThat(fpp).isEqualTo(0.05); } } + + @Test + public void testColumnStatsEnabled() + throws IOException, NoSuchFieldException, IllegalAccessException { + File testFile = File.createTempFile("junit", null, temp.toFile()); + try (FileAppender writer = + Parquet.write(Files.localOutput(testFile)) + .schema(SCHEMA) + .set(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "id_long", "false") + .createWriterFunc( + msgType -> + SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType)) + .build()) { + // Using reflection to access the private 'props' field in ParquetWriter + Field propsField = writer.getClass().getDeclaredField("props"); + propsField.setAccessible(true); + ParquetProperties props = (ParquetProperties) propsField.get(writer); + MessageType parquetSchema = ParquetSchemaUtil.convert(SCHEMA, "test"); + ColumnDescriptor idlDescriptor = parquetSchema.getColumnDescription(new String[] {"id_long"}); + // Default statisticsEnabled should be true and for column id_long, it is disabled. + assertThat(props.getStatisticsEnabled(idlDescriptor)).isEqualTo(false); + } + } } From 2a60dcf4f43a16b43c7db2ced6d5b1034ba160ac Mon Sep 17 00:00:00 2001 From: Huaxiang Sun Date: Tue, 15 Apr 2025 13:09:53 -0700 Subject: [PATCH 2/6] Address review comments 1). Moved unit test to parquet module. 2). Added documentation for new table properties. 
--- docs/docs/configuration.md | 2 + .../org/apache/iceberg/parquet/Parquet.java | 4 +- .../apache/iceberg/parquet/TestParquet.java | 46 +++++++++++++++++++ .../spark/data/TestSparkParquetWriter.java | 24 ---------- 4 files changed, 50 insertions(+), 26 deletions(-) diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index c784566ef5f5..534db2097119 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -52,6 +52,8 @@ Iceberg tables support table properties to configure table behavior, like the de | write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: 'col1' | | write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset | | write.parquet.bloom-filter-fpp.column.col1 | 0.01 | The false positive probability for a bloom filter applied to 'col1' (must > 0.0 and < 1.0) | +| write.parquet.stats-enabled.column.default | true | Default flag to enable parquet column statistics for all columns in the table | +| write.parquet.stats-enabled.column.col1 | (not set) | Flag to enable parquet column statistics for column 'col1' to allow per-column tuning | | write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed | | write.avro.compression-level | null | Avro compression level | | write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes | diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index b4a6fc13f061..182d159292de 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -444,8 +444,8 @@ public FileAppender build() throws IOException { .withDictionaryPageSize(dictionaryPageSize) .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount) 
.withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount) - .withMaxBloomFilterBytes(bloomFilterMaxBytes); - propsBuilder.withStatisticsEnabled(defaultColumnStatsEnabled); + .withMaxBloomFilterBytes(bloomFilterMaxBytes) + .withStatisticsEnabled(defaultColumnStatsEnabled); setBloomFilterConfig( context, diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java index ae0a822d3464..5eabeb02d32d 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.iceberg.Files.localInput; +import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; @@ -57,6 +58,7 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.schema.MessageType; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -219,6 +221,50 @@ public void testTwoLevelList() throws IOException { assertThat(recordRead.get("topbytes")).isEqualTo(expectedBinary); } + @Test + public void testColumnStatisticsEnabled() throws Exception { + Schema schema = + new Schema( + optional(1, "int_field", IntegerType.get()), + optional(2, "string_field", Types.StringType.get())); + + File file = createTempFile(temp); + + List records = Lists.newArrayListWithCapacity(5); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + for 
(int i = 1; i <= 5; i++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("int_field", i); + record.put("string_field", "test"); + records.add(record); + } + + write( + file, + schema, + ImmutableMap.builder() + .put(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "int_field", "true") + .put(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "string_field", "false") + .buildOrThrow(), + ParquetAvroWriter::buildWriter, + records.toArray(new GenericData.Record[] {})); + + InputFile inputFile = Files.localInput(file); + + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) { + for (BlockMetaData block : reader.getFooter().getBlocks()) { + for (ColumnChunkMetaData column : block.getColumns()) { + boolean emptyStats = column.getStatistics().isEmpty(); + if (column.getPath().toDotString().equals("int_field")) { + assertThat(emptyStats).as("int_field has statistics").isEqualTo(false); + } else if (column.getPath().toDotString().equals("string_field")) { + assertThat(emptyStats).as("string_field has statistics").isEqualTo(true); + } + } + } + } + } + private Pair generateFile( Function> createWriterFunc, int desiredRecordCount, diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java index f9822582d773..73800d3cf3e0 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java @@ -20,7 +20,6 @@ import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX; import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX; -import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX; import static org.apache.iceberg.types.Types.NestedField.optional; import static 
org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; @@ -152,27 +151,4 @@ public void testFpp() throws IOException, NoSuchFieldException, IllegalAccessExc assertThat(fpp).isEqualTo(0.05); } } - - @Test - public void testColumnStatsEnabled() - throws IOException, NoSuchFieldException, IllegalAccessException { - File testFile = File.createTempFile("junit", null, temp.toFile()); - try (FileAppender writer = - Parquet.write(Files.localOutput(testFile)) - .schema(SCHEMA) - .set(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "id_long", "false") - .createWriterFunc( - msgType -> - SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType)) - .build()) { - // Using reflection to access the private 'props' field in ParquetWriter - Field propsField = writer.getClass().getDeclaredField("props"); - propsField.setAccessible(true); - ParquetProperties props = (ParquetProperties) propsField.get(writer); - MessageType parquetSchema = ParquetSchemaUtil.convert(SCHEMA, "test"); - ColumnDescriptor idlDescriptor = parquetSchema.getColumnDescription(new String[] {"id_long"}); - // Default statisticsEnabled should be true and for column id_long, it is disabled. 
- assertThat(props.getStatisticsEnabled(idlDescriptor)).isEqualTo(false); - } - } } From 33662edc7be0e5372a5a424f1502c6bacf504e46 Mon Sep 17 00:00:00 2001 From: Huaxiang Sun Date: Mon, 21 Apr 2025 23:03:11 -0700 Subject: [PATCH 3/6] Address more review comments --- .../org/apache/iceberg/TableProperties.java | 5 ++- docs/docs/configuration.md | 4 +-- .../org/apache/iceberg/parquet/Parquet.java | 31 +++++++++++-------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index e3bf77cc93df..e6578d1142b8 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -177,9 +177,8 @@ private TableProperties() {} public static final String PARQUET_COLUMN_STATS_ENABLED_PREFIX = "write.parquet.stats-enabled.column."; - public static final String DEFAULT_PARQUET_COLUMN_STATS_ENABLED = - PARQUET_COLUMN_STATS_ENABLED_PREFIX + "default"; - public static final boolean DEFAULT_PARQUET_COLUMN_STATS_ENABLED_DEFAULT = true; + public static final String PARQUET_COLUMN_STATS_ENABLED = "write.parquet.stats-enabled.default"; + public static final boolean PARQUET_COLUMN_STATS_ENABLED_DEFAULT = true; public static final String AVRO_COMPRESSION = "write.avro.compression-codec"; public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec"; diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index 534db2097119..75f19a41287c 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -52,8 +52,8 @@ Iceberg tables support table properties to configure table behavior, like the de | write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: 'col1' | | write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset | | 
write.parquet.bloom-filter-fpp.column.col1 | 0.01 | The false positive probability for a bloom filter applied to 'col1' (must > 0.0 and < 1.0) | -| write.parquet.stats-enabled.column.default | true | Default flag to enable parquet column statistics for all columns in the table | -| write.parquet.stats-enabled.column.col1 | (not set) | Flag to enable parquet column statistics for column 'col1' to allow per-column tuning | +| write.parquet.stats-enabled.default | true | Controls whether to collect parquet column statistics when not specified on column level, currently not configurable [parquet-java#3189](https://github.com/apache/parquet-java/issues/3188) | +| write.parquet.stats-enabled.column.col1 | (not set) | Controls whether to collect parquet column statistics for column 'col1'; true or false | | write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed | | write.avro.compression-level | null | Avro compression level | | write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes | diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 182d159292de..76d355d267e3 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.parquet; -import static org.apache.iceberg.TableProperties.DEFAULT_PARQUET_COLUMN_STATS_ENABLED_DEFAULT; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES; @@ -31,6 +30,8 @@ import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX; import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES; import 
static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED; +import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT; @@ -309,13 +310,19 @@ private WriteBuilder createContextFunc( } // Utility method to get the column path - private String getParquetColumnPath(Map fieldIdToParquetPath, String colPath) { + private String getParquetColumnPath( + Map fieldIdToParquetPath, String colPath, String configStr) { Types.NestedField fieldId = schema.findField(colPath); if (fieldId == null) { + LOG.warn("Skipping {} config for missing field: {}", configStr, colPath); return null; } - return fieldIdToParquetPath.get(fieldId.fieldId()); + String columnPath = fieldIdToParquetPath.get(fieldId.fieldId()); + if (columnPath == null) { + LOG.warn("Skipping {}} config for missing field: {}", configStr, fieldId); + } + return columnPath; } private void setBloomFilterConfig( @@ -328,9 +335,9 @@ private void setBloomFilterConfig( .columnBloomFilterEnabled() .forEach( (colPath, isEnabled) -> { - String parquetColumnPath = getParquetColumnPath(fieldIdToParquetPath, colPath); + String parquetColumnPath = + getParquetColumnPath(fieldIdToParquetPath, colPath, "bloom filter"); if (parquetColumnPath == null) { - LOG.warn("Skipping bloom filter config for missing field: {}", colPath); return; } @@ -351,9 +358,9 @@ private void setColumnStatsConfig( .columnStatsEnabled() .forEach( (colPath, isEnabled) -> { - String parquetColumnPath = getParquetColumnPath(fieldIdToParquetPath, colPath); + String parquetColumnPath = + getParquetColumnPath(fieldIdToParquetPath, colPath, "column statistics"); if (parquetColumnPath == null) { - 
LOG.warn("Skipping column statistics config for missing field: {}", colPath); return; } @@ -513,7 +520,6 @@ private static class Context { private final int bloomFilterMaxBytes; private final Map columnBloomFilterFpp; private final Map columnBloomFilterEnabled; - private final Map columnStatsEnabled; private final boolean defaultColumnStatsEnabled; private final boolean dictionaryEnabled; @@ -609,11 +615,10 @@ static Context dataContext(Map config) { Map columnStatsEnabled = PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX); - String defaultStatsEnabledStr = columnStatsEnabled.remove("default"); + boolean defaultStatsEnabled = - (defaultStatsEnabledStr == null) - ? DEFAULT_PARQUET_COLUMN_STATS_ENABLED_DEFAULT - : Boolean.valueOf(defaultStatsEnabledStr); + PropertyUtil.propertyAsBoolean( + config, PARQUET_COLUMN_STATS_ENABLED, PARQUET_COLUMN_STATS_ENABLED_DEFAULT); boolean dictionaryEnabled = PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true); @@ -701,7 +706,7 @@ static Context deleteContext(Map config) { ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of(), - DEFAULT_PARQUET_COLUMN_STATS_ENABLED_DEFAULT, + PARQUET_COLUMN_STATS_ENABLED_DEFAULT, dictionaryEnabled); } From 3ddd50bc7a475b9ffd938f6c8b7f8a3653ec2197 Mon Sep 17 00:00:00 2001 From: Huaxiang Sun Date: Tue, 22 Apr 2025 22:26:27 -0700 Subject: [PATCH 4/6] Address review comments about documentation --- docs/docs/configuration.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index 75f19a41287c..3ba1fd73e704 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -52,8 +52,8 @@ Iceberg tables support table properties to configure table behavior, like the de | write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: 'col1' | | write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The 
maximum number of bytes for a bloom filter bitset | | write.parquet.bloom-filter-fpp.column.col1 | 0.01 | The false positive probability for a bloom filter applied to 'col1' (must > 0.0 and < 1.0) | -| write.parquet.stats-enabled.default | true | Controls whether to collect parquet column statistics when not specified on column level, currently not configurable [parquet-java#3189](https://github.com/apache/parquet-java/issues/3188) | -| write.parquet.stats-enabled.column.col1 | (not set) | Controls whether to collect parquet column statistics for column 'col1'; true or false | +| write.parquet.stats-enabled.default | true | Controls whether to collect parquet column statistics when not specified on column level, can be configured to false with [parquet-java#3189](https://github.com/apache/parquet-java/issues/3188) | +| write.parquet.stats-enabled.column.col1 | (not set) | Controls whether to collect parquet column statistics for column 'col1' | | write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed | | write.avro.compression-level | null | Avro compression level | | write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes | From d95c09aa45485408cdf5751016138c4208e734af Mon Sep 17 00:00:00 2001 From: Huaxiang Sun Date: Mon, 5 May 2025 16:16:57 -0700 Subject: [PATCH 5/6] 1. Removed default config for column statistics, wait until the parquet release with fix is updated in Iceberg release. 2. Addressed the comments to remove fieldIdToParquetPath map, replaced with columnNameToParquetPath map. 3. Added missing code for schema update of the new parquet column statistics enable config. 
--- .../java/org/apache/iceberg/SchemaUpdate.java | 3 +- .../org/apache/iceberg/TableProperties.java | 3 - .../iceberg/TestSchemaAndMappingUpdate.java | 15 ++++ docs/docs/configuration.md | 1 - .../org/apache/iceberg/parquet/Parquet.java | 70 ++++++------------- 5 files changed, 38 insertions(+), 54 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index db02a0e96e10..8f2bfe184cab 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -516,7 +516,8 @@ private TableMetadata applyChangesToMetadata(TableMetadata metadata) { Set columnProperties = ImmutableSet.of( TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX, - TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX); + TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, + TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX); Map updatedProperties = PropertyUtil.applySchemaChanges( newMetadata.properties(), deletedColumns, renamedColumns, columnProperties); diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index e6578d1142b8..f16a0ed3da2b 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -177,9 +177,6 @@ private TableProperties() {} public static final String PARQUET_COLUMN_STATS_ENABLED_PREFIX = "write.parquet.stats-enabled.column."; - public static final String PARQUET_COLUMN_STATS_ENABLED = "write.parquet.stats-enabled.default"; - public static final boolean PARQUET_COLUMN_STATS_ENABLED_DEFAULT = true; - public static final String AVRO_COMPRESSION = "write.avro.compression-codec"; public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec"; public static final String AVRO_COMPRESSION_DEFAULT = "gzip"; diff --git 
a/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java b/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java index dc6e9e1a7f85..84f1a2ec9043 100644 --- a/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java +++ b/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX; +import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -191,6 +192,20 @@ public void testModificationWithParquetBloomConfig() { table.properties().get(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "ID")); } + @TestTemplate + public void testModificationWithParquetColumnStats() { + table.updateProperties().set(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "id", "true").commit(); + + table.updateSchema().renameColumn("id", "ID").commit(); + assertThat(table.properties()) + .containsEntry(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "ID", "true") + .doesNotContainKey(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "id"); + + table.updateSchema().deleteColumn("ID").commit(); + assertThat(table.properties()) + .doesNotContainKey(table.properties().get(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "ID")); + } + @TestTemplate public void testDeleteAndAddColumnReassign() { NameMapping mapping = MappingUtil.create(table.schema()); diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index 3ba1fd73e704..2c9f6608cb54 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -52,7 +52,6 @@ Iceberg tables support table properties to configure table behavior, like the de | write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: 'col1' | | write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The 
maximum number of bytes for a bloom filter bitset | | write.parquet.bloom-filter-fpp.column.col1 | 0.01 | The false positive probability for a bloom filter applied to 'col1' (must > 0.0 and < 1.0) | -| write.parquet.stats-enabled.default | true | Controls whether to collect parquet column statistics when not specified on column level, can be configured to false with [parquet-java#3189](https://github.com/apache/parquet-java/issues/3188) | | write.parquet.stats-enabled.column.col1 | (not set) | Controls whether to collect parquet column statistics for column 'col1' | | write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed | | write.avro.compression-level | null | Avro compression level | diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 76d355d267e3..ec5363d169c9 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -30,8 +30,6 @@ import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX; import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES; import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT; -import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED; -import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT; @@ -101,7 +99,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import 
org.apache.iceberg.types.Types; import org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.PropertyUtil; @@ -309,25 +306,9 @@ private WriteBuilder createContextFunc( return this; } - // Utility method to get the column path - private String getParquetColumnPath( - Map fieldIdToParquetPath, String colPath, String configStr) { - Types.NestedField fieldId = schema.findField(colPath); - if (fieldId == null) { - LOG.warn("Skipping {} config for missing field: {}", configStr, colPath); - return null; - } - - String columnPath = fieldIdToParquetPath.get(fieldId.fieldId()); - if (columnPath == null) { - LOG.warn("Skipping {}} config for missing field: {}", configStr, fieldId); - } - return columnPath; - } - private void setBloomFilterConfig( Context context, - Map fieldIdToParquetPath, + Map colNameToParquetPathMap, BiConsumer withBloomFilterEnabled, BiConsumer withBloomFilterFPP) { @@ -335,9 +316,9 @@ private void setBloomFilterConfig( .columnBloomFilterEnabled() .forEach( (colPath, isEnabled) -> { - String parquetColumnPath = - getParquetColumnPath(fieldIdToParquetPath, colPath, "bloom filter"); + String parquetColumnPath = colNameToParquetPathMap.get(colPath); if (parquetColumnPath == null) { + LOG.warn("Skipping bloom filter config for missing field: {}", colPath); return; } @@ -351,19 +332,18 @@ private void setBloomFilterConfig( private void setColumnStatsConfig( Context context, - Map fieldIdToParquetPath, + Map colNameToParquetPathMap, BiConsumer withColumnStatsEnabled) { context .columnStatsEnabled() .forEach( (colPath, isEnabled) -> { - String parquetColumnPath = - getParquetColumnPath(fieldIdToParquetPath, colPath, "column statistics"); + String parquetColumnPath = colNameToParquetPathMap.get(colPath); if (parquetColumnPath == null) { + LOG.warn("Skipping column statistics config for missing field: {}", colPath); return; } - withColumnStatsEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled)); }); 
} @@ -389,7 +369,6 @@ public FileAppender build() throws IOException { int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount(); int bloomFilterMaxBytes = context.bloomFilterMaxBytes(); boolean dictionaryEnabled = context.dictionaryEnabled(); - boolean defaultColumnStatsEnabled = context.defaultColumnStatsEnabled(); if (compressionLevel != null) { switch (codec) { @@ -426,12 +405,20 @@ public FileAppender build() throws IOException { Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); } - Map fieldIdToParquetPath = + Map colNameToParquetPathMap = type.getColumns().stream() .filter(col -> col.getPrimitiveType().getId() != null) + .filter( + col -> { + int fieldId = col.getPrimitiveType().getId().intValue(); + return schema.findColumnName(fieldId) != null; + }) .collect( Collectors.toMap( - col -> col.getPrimitiveType().getId().intValue(), + col -> { + int fieldId = col.getPrimitiveType().getId().intValue(); + return schema.findColumnName(fieldId); + }, col -> String.join(".", col.getPath()))); if (createWriterFunc != null) { @@ -451,16 +438,15 @@ public FileAppender build() throws IOException { .withDictionaryPageSize(dictionaryPageSize) .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount) .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount) - .withMaxBloomFilterBytes(bloomFilterMaxBytes) - .withStatisticsEnabled(defaultColumnStatsEnabled); + .withMaxBloomFilterBytes(bloomFilterMaxBytes); setBloomFilterConfig( context, - fieldIdToParquetPath, + colNameToParquetPathMap, propsBuilder::withBloomFilterEnabled, propsBuilder::withBloomFilterFPP); - setColumnStatsConfig(context, fieldIdToParquetPath, propsBuilder::withStatisticsEnabled); + setColumnStatsConfig(context, colNameToParquetPathMap, propsBuilder::withStatisticsEnabled); ParquetProperties parquetProperties = propsBuilder.build(); @@ -492,17 +478,16 @@ public FileAppender build() throws IOException { .withPageRowCountLimit(pageRowLimit) 
.withDictionaryEncoding(dictionaryEnabled) .withDictionaryPageSize(dictionaryPageSize) - .withStatisticsEnabled(defaultColumnStatsEnabled) .withEncryption(fileEncryptionProperties); setBloomFilterConfig( context, - fieldIdToParquetPath, + colNameToParquetPathMap, parquetWriteBuilder::withBloomFilterEnabled, parquetWriteBuilder::withBloomFilterFPP); setColumnStatsConfig( - context, fieldIdToParquetPath, parquetWriteBuilder::withStatisticsEnabled); + context, colNameToParquetPathMap, parquetWriteBuilder::withStatisticsEnabled); return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig); } @@ -521,7 +506,6 @@ private static class Context { private final Map columnBloomFilterFpp; private final Map columnBloomFilterEnabled; private final Map columnStatsEnabled; - private final boolean defaultColumnStatsEnabled; private final boolean dictionaryEnabled; private Context( @@ -537,7 +521,6 @@ private Context( Map columnBloomFilterFpp, Map columnBloomFilterEnabled, Map columnStatsEnabled, - boolean defaultColumnStatsEnabled, boolean dictionaryEnabled) { this.rowGroupSize = rowGroupSize; this.pageSize = pageSize; @@ -552,7 +535,6 @@ private Context( this.columnBloomFilterEnabled = columnBloomFilterEnabled; this.columnStatsEnabled = columnStatsEnabled; this.dictionaryEnabled = dictionaryEnabled; - this.defaultColumnStatsEnabled = defaultColumnStatsEnabled; } static Context dataContext(Map config) { @@ -616,10 +598,6 @@ static Context dataContext(Map config) { Map columnStatsEnabled = PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX); - boolean defaultStatsEnabled = - PropertyUtil.propertyAsBoolean( - config, PARQUET_COLUMN_STATS_ENABLED, PARQUET_COLUMN_STATS_ENABLED_DEFAULT); - boolean dictionaryEnabled = PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true); @@ -636,7 +614,6 @@ static Context dataContext(Map config) { columnBloomFilterFpp, columnBloomFilterEnabled, columnStatsEnabled, - 
defaultStatsEnabled, dictionaryEnabled); } @@ -706,7 +683,6 @@ static Context deleteContext(Map config) { ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of(), - PARQUET_COLUMN_STATS_ENABLED_DEFAULT, dictionaryEnabled); } @@ -766,10 +742,6 @@ Map columnStatsEnabled() { return columnStatsEnabled; } - boolean defaultColumnStatsEnabled() { - return defaultColumnStatsEnabled; - } - boolean dictionaryEnabled() { return dictionaryEnabled; } From e76453af923a26d6a2497e832c25c4d51a0c34d4 Mon Sep 17 00:00:00 2001 From: Huaxiang Sun Date: Sun, 11 May 2025 07:56:11 -0700 Subject: [PATCH 6/6] refactor code --- .../main/java/org/apache/iceberg/parquet/Parquet.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index ec5363d169c9..6f68fbe150ff 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -120,6 +120,7 @@ import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type.ID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -407,18 +408,14 @@ public FileAppender build() throws IOException { Map colNameToParquetPathMap = type.getColumns().stream() - .filter(col -> col.getPrimitiveType().getId() != null) .filter( col -> { - int fieldId = col.getPrimitiveType().getId().intValue(); - return schema.findColumnName(fieldId) != null; + ID id = col.getPrimitiveType().getId(); + return (id != null) && (schema.findColumnName(id.intValue()) != null); }) .collect( Collectors.toMap( - col -> { - int fieldId = col.getPrimitiveType().getId().intValue(); - return schema.findColumnName(fieldId); - }, + col -> schema.findColumnName(col.getPrimitiveType().getId().intValue()), col -> String.join(".", 
col.getPath()))); if (createWriterFunc != null) {