diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 71991f633d97..d12a2ceb4285 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -183,6 +183,12 @@ private TableProperties() {}
   public static final String PARQUET_COLUMN_STATS_ENABLED_PREFIX =
       "write.parquet.stats-enabled.column.";
 
+  public static final String PARQUET_COLUMN_COMPRESSION_CODEC_PREFIX =
+      "write.parquet.compression-codec.column.";
+
+  public static final String PARQUET_COLUMN_COMPRESSION_LEVEL_PREFIX =
+      "write.parquet.compression-level.column.";
+
   public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
   public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec";
   public static final String AVRO_COMPRESSION_DEFAULT = "gzip";
diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md
index f12bcea6afd5..18e4aa26f12d 100644
--- a/docs/docs/configuration.md
+++ b/docs/docs/configuration.md
@@ -49,6 +49,8 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
 | write.parquet.compression-codec | zstd | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
 | write.parquet.compression-level | null | Parquet compression level |
+| write.parquet.compression-codec.column.col1 | (not set) | Per-column compression codec override for column 'col1': zstd, brotli, lz4, gzip, snappy, uncompressed |
+| write.parquet.compression-level.column.col1 | (not set) | Per-column compression level override for column 'col1' |
 | write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: 'col1' |
 | write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
 | write.parquet.bloom-filter-fpp.column.col1 | 0.01 | The false positive probability for a bloom filter applied to 'col1' (must > 0.0 and < 1.0) |
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 87ceb9012bd6..85c738be5a0f 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -80,7 +80,9 @@ nessie = "0.107.4"
 netty-buffer = "4.2.12.Final"
 object-client-bundle = "3.3.2"
 orc = "1.9.8"
-parquet = "1.17.0"
+# TODO: parquet per-column compression is not released yet; update to stable release once available.
+# Related PRs: https://github.com/apache/parquet-java/pull/3526 and https://github.com/apache/parquet-java/pull/3396
+parquet = "1.18.0"
 roaringbitmap = "1.6.14"
 scala-collection-compat = "2.14.0"
 slf4j = "2.0.17"
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 2387d52edf2f..3da82f16dc48 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -31,6 +31,8 @@
 import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_NDV_PREFIX;
 import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
 import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_COMPRESSION_CODEC_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_COMPRESSION_LEVEL_PREFIX;
 import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
@@ -354,6 +356,46 @@ private void setColumnStatsConfig(
         });
   }
 
+  private void setColumnCompressionConfig(
+      Context context,
+      Map<String, String> colNameToParquetPathMap,
+      BiConsumer<String, CompressionCodecName> withCompressionCodec,
+      BiConsumer<String, Integer> withCompressionLevel) {
+
+    Map<String, String> columnCompressionCodec = context.columnCompressionCodec();
+    Map<String, String> columnCompressionLevel = context.columnCompressionLevel();
+
+    Sets.union(columnCompressionCodec.keySet(), columnCompressionLevel.keySet())
+        .forEach(
+            colName -> {
+              String parquetColumnPath = colNameToParquetPathMap.get(colName);
+              if (parquetColumnPath == null) {
+                LOG.warn("Skipping per-column compression config for missing field: {}", colName);
+                return;
+              }
+
+              String codecStr = columnCompressionCodec.get(colName);
+              try {
+                withCompressionCodec.accept(
+                    parquetColumnPath,
+                    codecStr != null ? Context.toCodec(codecStr) : context.codec());
+              } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException(
+                    "Invalid compression codec for column " + colName + ": " + codecStr, e);
+              }
+
+              String level = columnCompressionLevel.get(colName);
+              if (level != null) {
+                try {
+                  withCompressionLevel.accept(parquetColumnPath, Integer.parseInt(level));
+                } catch (NumberFormatException e) {
+                  throw new IllegalArgumentException(
+                      "Invalid compression level for column " + colName + ": " + level, e);
+                }
+              }
+            });
+  }
+
   @Override
   public <D> FileAppender<D> build() throws IOException {
     Preconditions.checkNotNull(schema, "Schema is required");
@@ -451,6 +493,12 @@ public <D> FileAppender<D> build() throws IOException {
 
       setColumnStatsConfig(context, colNameToParquetPathMap, propsBuilder::withStatisticsEnabled);
 
+      setColumnCompressionConfig(
+          context,
+          colNameToParquetPathMap,
+          propsBuilder::withCompressionCodec,
+          propsBuilder::withCompressionLevel);
+
       ParquetProperties parquetProperties = propsBuilder.build();
 
       return new org.apache.iceberg.parquet.ParquetWriter<>(
@@ -493,6 +541,12 @@ public <D> FileAppender<D> build() throws IOException {
       setColumnStatsConfig(
           context, colNameToParquetPathMap, parquetWriteBuilder::withStatisticsEnabled);
 
+      setColumnCompressionConfig(
+          context,
+          colNameToParquetPathMap,
+          parquetWriteBuilder::withCompressionCodec,
+          parquetWriteBuilder::withCompressionLevel);
+
       return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
     }
   }
@@ -511,6 +565,8 @@ static class Context {
     private final Map<String, String> columnBloomFilterNdv;
     private final Map<String, String> columnBloomFilterEnabled;
     private final Map<String, String> columnStatsEnabled;
+    private final Map<String, String> columnCompressionCodec;
+    private final Map<String, String> columnCompressionLevel;
     private final boolean dictionaryEnabled;
 
     private Context(
@@ -527,6 +583,8 @@ private Context(
         Map<String, String> columnBloomFilterNdv,
         Map<String, String> columnBloomFilterEnabled,
         Map<String, String> columnStatsEnabled,
+        Map<String, String> columnCompressionCodec,
+        Map<String, String> columnCompressionLevel,
         boolean dictionaryEnabled) {
       this.rowGroupSize = rowGroupSize;
       this.pageSize = pageSize;
@@ -541,6 +599,8 @@ private Context(
       this.columnBloomFilterNdv = columnBloomFilterNdv;
       this.columnBloomFilterEnabled = columnBloomFilterEnabled;
       this.columnStatsEnabled = columnStatsEnabled;
+      this.columnCompressionCodec = columnCompressionCodec;
+      this.columnCompressionLevel = columnCompressionLevel;
       this.dictionaryEnabled = dictionaryEnabled;
     }
 
@@ -608,6 +668,12 @@ static Context dataContext(Map<String, String> config) {
       Map<String, String> columnStatsEnabled =
           PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX);
 
+      Map<String, String> columnCompressionCodec =
+          PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_COMPRESSION_CODEC_PREFIX);
+
+      Map<String, String> columnCompressionLevel =
+          PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_COMPRESSION_LEVEL_PREFIX);
+
       boolean dictionaryEnabled =
           PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);
 
@@ -625,6 +691,8 @@ static Context dataContext(Map<String, String> config) {
           columnBloomFilterNdv,
           columnBloomFilterEnabled,
           columnStatsEnabled,
+          columnCompressionCodec,
+          columnCompressionLevel,
           dictionaryEnabled);
     }
 
@@ -695,6 +763,8 @@ static Context deleteContext(Map<String, String> config) {
           ImmutableMap.of(),
           ImmutableMap.of(),
           ImmutableMap.of(),
+          ImmutableMap.of(),
+          ImmutableMap.of(),
           dictionaryEnabled);
     }
 
@@ -758,6 +828,14 @@ Map<String, String> columnStatsEnabled() {
       return columnStatsEnabled;
     }
 
+    Map<String, String> columnCompressionCodec() {
+      return columnCompressionCodec;
+    }
+
+    Map<String, String> columnCompressionLevel() {
+      return columnCompressionLevel;
+    }
+
     boolean dictionaryEnabled() {
       return dictionaryEnabled;
     }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 2334e75532be..a1d78619c4a6 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -50,7 +50,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private final long targetRowGroupSize;
   private final Map<String, String> metadata;
   private final ParquetProperties props;
-  private final CompressionCodecFactory.BytesInputCompressor compressor;
+  private final CompressionCodecFactory codecFactory;
+  private final CompressionCodecName codec;
   private final MessageType parquetSchema;
   private final ParquetValueWriter<T> model;
   private final MetricsConfig metricsConfig;
@@ -89,8 +90,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
     this.targetRowGroupSize = rowGroupSize;
     this.props = properties;
     this.metadata = ImmutableMap.copyOf(metadata);
-    this.compressor =
-        new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
+    this.codecFactory = new ParquetCodecFactory(conf, props.getPageSizeThreshold());
+    this.codec = codec;
     this.parquetSchema = parquetSchema;
     this.model = (ParquetValueWriter<T>) createWriterFunc.apply(schema, parquetSchema);
     this.metricsConfig = metricsConfig;
@@ -237,7 +238,9 @@ private void startRowGroup() {
 
     this.pageStore =
         new ColumnChunkPageWriteStore(
-            compressor,
+            codecFactory,
+            codec,
+            props,
             parquetSchema,
             props.getAllocator(),
             this.columnIndexTruncateLength,
@@ -260,8 +263,8 @@ public void close() throws IOException {
       if (writer != null) {
         writer.end(metadata);
       }
-      if (compressor != null) {
-        compressor.release();
+      if (codecFactory != null) {
+        codecFactory.release();
       }
     }
   }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index 5f1e0c83cc0f..479ccd8b94c1 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -20,7 +20,10 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_COMPRESSION_CODEC_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_COMPRESSION_LEVEL_PREFIX;
 import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
@@ -65,6 +68,7 @@
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.io.LocalOutputFile;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
@@ -378,4 +382,188 @@ private Pair<File, Long> generateFile(
         records.toArray(new GenericData.Record[] {}));
     return Pair.of(file, size);
   }
+
+  @Test
+  public void testGlobalCompressionCodecAppliesToAllColumns() throws Exception {
+    Schema schema =
+        new Schema(
+ optional(1, "int_field", IntegerType.get()), + optional(2, "string_field", Types.StringType.get())); + + File file = createTempFile(temp); + + List records = Lists.newArrayListWithCapacity(5); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + for (int i = 1; i <= 5; i++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("int_field", i); + record.put("string_field", "test"); + records.add(record); + } + + write( + file, + schema, + ImmutableMap.builder() + .put(PARQUET_COMPRESSION, "snappy") + .buildOrThrow(), + ParquetAvroWriter::buildWriter, + records.toArray(new GenericData.Record[] {})); + + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(Files.localInput(file)))) { + for (BlockMetaData block : reader.getFooter().getBlocks()) { + for (ColumnChunkMetaData column : block.getColumns()) { + assertThat(column.getCodec()) + .as("column %s uses global snappy", column.getPath().toDotString()) + .isEqualTo(CompressionCodecName.SNAPPY); + } + } + } + } + + @Test + public void testPerColumnCompressionCodec() throws Exception { + Schema schema = + new Schema( + optional(1, "int_field", IntegerType.get()), + optional(2, "string_field", Types.StringType.get())); + + File file = createTempFile(temp); + + List records = Lists.newArrayListWithCapacity(5); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + for (int i = 1; i <= 5; i++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("int_field", i); + record.put("string_field", "test"); + records.add(record); + } + + write( + file, + schema, + ImmutableMap.builder() + .put(PARQUET_COMPRESSION, "zstd") + .put(PARQUET_COLUMN_COMPRESSION_CODEC_PREFIX + "int_field", "snappy") + .buildOrThrow(), + ParquetAvroWriter::buildWriter, + records.toArray(new GenericData.Record[] {})); + + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(Files.localInput(file)))) { + for (BlockMetaData block : reader.getFooter().getBlocks()) { + for (ColumnChunkMetaData column : block.getColumns()) { + if (column.getPath().toDotString().equals("int_field")) { + assertThat(column.getCodec()) + .as("int_field uses snappy") + .isEqualTo(CompressionCodecName.SNAPPY); + } else if (column.getPath().toDotString().equals("string_field")) { + assertThat(column.getCodec()) + .as("string_field uses global zstd") + .isEqualTo(CompressionCodecName.ZSTD); + } + } + } + } + } + + @Test + public void testPerColumnCompressionCodecForNonExistentColumnIsIgnored() throws Exception { + Schema schema = + new Schema( + optional(1, "int_field", IntegerType.get()), + optional(2, "string_field", Types.StringType.get())); + + File file = createTempFile(temp); + + List records = Lists.newArrayListWithCapacity(5); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + for (int i = 1; i <= 5; i++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("int_field", i); + record.put("string_field", "test"); + records.add(record); + } + + write( + file, + schema, + ImmutableMap.builder() + .put(PARQUET_COMPRESSION, "snappy") + .put(PARQUET_COLUMN_COMPRESSION_CODEC_PREFIX + "non_existent_field", "zstd") + .buildOrThrow(), + ParquetAvroWriter::buildWriter, + records.toArray(new GenericData.Record[] {})); + + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(Files.localInput(file)))) { + for (BlockMetaData block : reader.getFooter().getBlocks()) { + for (ColumnChunkMetaData 
+        for (ColumnChunkMetaData column : block.getColumns()) {
+          assertThat(column.getCodec())
+              .as("column %s falls back to global snappy", column.getPath().toDotString())
+              .isEqualTo(CompressionCodecName.SNAPPY);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testInvalidCompressionLevelThrows() throws Exception {
+    Schema schema =
+        new Schema(
+            optional(1, "int_field", IntegerType.get()),
+            optional(2, "string_field", Types.StringType.get()));
+
+    List<GenericData.Record> records = Lists.newArrayListWithCapacity(1);
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    GenericData.Record record = new GenericData.Record(avroSchema);
+    record.put("int_field", 1);
+    record.put("string_field", "test");
+    records.add(record);
+
+    File file = createTempFile(temp);
+
+    assertThatThrownBy(
+            () ->
+                write(
+                    file,
+                    schema,
+                    ImmutableMap.<String, String>builder()
+                        .put(PARQUET_COLUMN_COMPRESSION_CODEC_PREFIX + "int_field", "zstd")
+                        .put(PARQUET_COLUMN_COMPRESSION_LEVEL_PREFIX + "int_field", "not-a-number")
+                        .buildOrThrow(),
+                    ParquetAvroWriter::buildWriter,
+                    records.toArray(new GenericData.Record[] {})))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("not-a-number");
+  }
+
+  @Test
+  public void testInvalidPerColumnLevelWithGlobalCodecThrows() throws Exception {
+    Schema schema =
+        new Schema(
+            optional(1, "int_field", IntegerType.get()),
+            optional(2, "string_field", Types.StringType.get()));
+
+    List<GenericData.Record> records = Lists.newArrayListWithCapacity(1);
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    GenericData.Record record = new GenericData.Record(avroSchema);
+    record.put("int_field", 1);
+    record.put("string_field", "test");
+    records.add(record);
+
+    File file = createTempFile(temp);
+
+    assertThatThrownBy(
+            () ->
+                write(
+                    file,
+                    schema,
+                    ImmutableMap.<String, String>builder()
+                        .put(PARQUET_COMPRESSION, "zstd")
+                        .put(PARQUET_COLUMN_COMPRESSION_LEVEL_PREFIX + "int_field", "not-a-number")
+                        .buildOrThrow(),
+                    ParquetAvroWriter::buildWriter,
+                    records.toArray(new GenericData.Record[] {})))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("not-a-number");
+  }
 }
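
Usage note (illustrative, not part of the diff): the new per-column settings are ordinary table properties, so any path that sets table properties can configure them. The sketch below uses the Iceberg Java API; the `table` handle and the column name `col1` are placeholders, and the level property is only meaningful for codecs that support levels (e.g. zstd, gzip).

```java
// Minimal sketch of setting the new per-column compression properties,
// assuming an existing org.apache.iceberg.Table handle; "col1" is a placeholder column name.
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class PerColumnCompressionExample {
  static void configure(Table table) {
    table
        .updateProperties()
        .set(TableProperties.PARQUET_COMPRESSION, "snappy") // table-wide default codec
        .set(TableProperties.PARQUET_COLUMN_COMPRESSION_CODEC_PREFIX + "col1", "zstd")
        .set(TableProperties.PARQUET_COLUMN_COMPRESSION_LEVEL_PREFIX + "col1", "3")
        .commit();
  }
}
```

Per setColumnCompressionConfig above, a property naming a column that is not in the schema is skipped with a warning, and a per-column level set without a per-column codec is applied against the table-wide codec.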