From 9b10eb4bbcdfa22ae14bf237783bf4757d5e1798 Mon Sep 17 00:00:00 2001 From: mengnalin Date: Fri, 20 Mar 2026 12:37:50 -0700 Subject: [PATCH 1/6] Add level-aware getCompressor default method to CompressionCodecFactory --- .../compression/CompressionCodecFactory.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java index 561dcb899c..187c74a0ed 100644 --- a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java +++ b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java @@ -46,6 +46,30 @@ public interface CompressionCodecFactory { */ BytesInputCompressor getCompressor(CompressionCodecName codecName); + /** + * Returns a {@link BytesInputCompressor} instance for the specified codec name and compression level. + *
<p>
+   * The compression level controls the trade-off between compression speed and compression ratio.
+   * The valid range and meaning of the level are codec-specific:
+   * <ul>
+   *   <li>ZSTD: 1 (fastest) to 22 (best compression)</li>
+   *   <li>GZIP: 0 (no compression) to 9 (best compression), or -1 for the codec default</li>
+   *   <li>BROTLI: 0 (fastest) to 11 (best compression)</li>
+   * </ul>
+   * <p>
+ * Implementations that do not support compression levels should ignore the {@code level} parameter + * and delegate to {@link #getCompressor(CompressionCodecName)}. + *
<p>
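+   * A typical call, with an illustrative codec and level (and a {@code factory} obtained elsewhere):
+   * <pre>{@code
+   * BytesInputCompressor compressor = factory.getCompressor(CompressionCodecName.ZSTD, 7);
+   * BytesInput compressed = compressor.compress(pageBytes);
+   * }</pre>
+   * <p>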
+ * The compressor is not thread-safe, so one instance for each working thread is required. + * + * @param codecName the codec name which the compressor instance is to be returned + * @param level the compression level; codec-specific, ignored if the codec does not support levels + * @return the compressor instance for the specified codec name and level + * @see BytesInputCompressor#release() + */ + default BytesInputCompressor getCompressor(CompressionCodecName codecName, int level) { + return getCompressor(codecName); + } + /** * Returns a {@link BytesInputDecompressor} instance for the specified codec name to be used for decompressing page * data. From 979144285a48d8b5a913f5d1b21d233a98a988dc Mon Sep 17 00:00:00 2001 From: mengnalin Date: Mon, 23 Mar 2026 13:43:15 -0700 Subject: [PATCH 2/6] Add level-aware getCompressor override to CodecFactory --- .../apache/parquet/hadoop/CodecFactory.java | 136 +++++++++++++++++- .../hadoop/TestDirectCodecFactory.java | 99 +++++++++++++ 2 files changed, 228 insertions(+), 7 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index eee5fa6083..8a56ac8ef5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -29,9 +29,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor; import org.apache.hadoop.util.ReflectionUtils; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferAllocator; @@ -45,10 +48,15 @@ public class CodecFactory implements CompressionCodecFactory { + private static final Logger LOG = LoggerFactory.getLogger(CodecFactory.class); + protected static final Map CODEC_BY_NAME = Collections.synchronizedMap(new HashMap()); - private final Map compressors = new HashMap<>(); + static final String GZIP_COMPRESS_LEVEL = "zlib.compress.level"; + static final String BROTLI_COMPRESS_QUALITY = "compression.brotli.quality"; + + private final Map compressors = new HashMap<>(); private final Map decompressors = new HashMap<>(); protected final ParquetConfiguration conf; @@ -250,10 +258,22 @@ public CompressionCodecName getCodecName() { @Override public BytesCompressor getCompressor(CompressionCodecName codecName) { - BytesCompressor comp = compressors.get(codecName); + String key = cacheKey(codecName); + BytesCompressor comp = compressors.get(key); if (comp == null) { comp = createCompressor(codecName); - compressors.put(codecName, comp); + compressors.put(key, comp); + } + return comp; + } + + @Override + public BytesCompressor getCompressor(CompressionCodecName codecName, int level) { + String key = cacheKey(codecName, level); + BytesCompressor comp = compressors.get(key); + if (comp == null) { + comp = createCompressorAtLevel(codecName, level); + compressors.put(key, comp); } return comp; } @@ -269,8 +289,7 @@ public BytesDecompressor getDecompressor(CompressionCodecName codecName) { } protected BytesCompressor createCompressor(CompressionCodecName codecName) { - CompressionCodec codec = getCodec(codecName); - return 
codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + return compressorForCodec(codecName, getCodec(codecName)); } protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { @@ -278,6 +297,104 @@ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec); } + private BytesCompressor createCompressorAtLevel(CompressionCodecName codecName, int level) { + return compressorForCodec(codecName, getCodecAtLevel(codecName, level)); + } + + private BytesCompressor compressorForCodec(CompressionCodecName codecName, CompressionCodec codec) { + return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + } + + private static void validateZstdLevel(int level) { + if (level < 1 || level > 22) { + throw new BadConfigurationException("Unsupported ZSTD compression level: " + level + + ". Valid range is 1 (fastest) to 22 (best compression)."); + } + } + + private static void validateBrotliLevel(int level) { + if (level < 0 || level > 11) { + throw new BadConfigurationException("Unsupported Brotli compression level: " + level + + ". Valid range is 0 (fastest) to 11 (best compression)."); + } + } + + private static void validateGzipLevel(int level) { + if (level != -1 && (level < 0 || level > 9)) { + throw new BadConfigurationException("Unsupported GZIP compression level: " + level + + ". Valid range is 0 (no compression) to 9 (best compression), or -1 for default."); + } + } + + private static ZlibCompressor.CompressionLevel zlibCompressionLevel(int level) { + switch (level) { + case -1: return ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION; + case 0: return ZlibCompressor.CompressionLevel.NO_COMPRESSION; + case 1: return ZlibCompressor.CompressionLevel.BEST_SPEED; + case 2: return ZlibCompressor.CompressionLevel.TWO; + case 3: return ZlibCompressor.CompressionLevel.THREE; + case 4: return ZlibCompressor.CompressionLevel.FOUR; + case 5: return ZlibCompressor.CompressionLevel.FIVE; + case 6: return ZlibCompressor.CompressionLevel.SIX; + case 7: return ZlibCompressor.CompressionLevel.SEVEN; + case 8: return ZlibCompressor.CompressionLevel.EIGHT; + case 9: return ZlibCompressor.CompressionLevel.BEST_COMPRESSION; + default: throw new BadConfigurationException("Unsupported GZIP compression level: " + level + + ". Valid range is 0 (no compression) to 9 (best compression), or -1 for default."); + } + } + + /** + * Returns a {@link CompressionCodec} instance configured at the specified compression level. + * A level-specific {@link Configuration} snapshot is built so that the codec is initialized + * with the requested level rather than the global configuration value. + * For codecs that do not support levels the method falls back to {@link #getCodec(CompressionCodecName)}. + */ + private CompressionCodec getCodecAtLevel(CompressionCodecName codecName, int level) { + String codecClassName = codecName.getHadoopCompressionCodecClassName(); + if (codecClassName == null) { + return null; + } + String key = cacheKey(codecName, level); + CompressionCodec codec = CODEC_BY_NAME.get(key); + if (codec != null) { + return codec; + } + // Build a Configuration snapshot with the level explicitly set for this codec. 
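+    // The snapshot keeps the requested level scoped to this codec instance: the shared conf,
+    // and any codecs already cached at other levels, are left untouched.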
+ Configuration levelConf = new Configuration(ConfigurationUtil.createHadoopConfiguration(conf)); + switch (codecName) { + case ZSTD: + validateZstdLevel(level); + levelConf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, level); + break; + case GZIP: + validateGzipLevel(level); + levelConf.setEnum(GZIP_COMPRESS_LEVEL, zlibCompressionLevel(level)); + break; + case BROTLI: + validateBrotliLevel(level); + levelConf.setInt(BROTLI_COMPRESS_QUALITY, level); + break; + default: + // Codec does not support levels; fall back to the default codec instance. + LOG.warn("Compression level {} is not supported for codec {} and will be ignored.", level, codecName); + return getCodec(codecName); + } + try { + Class codecClass; + try { + codecClass = Class.forName(codecClassName); + } catch (ClassNotFoundException e) { + codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName); + } + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, levelConf); + CODEC_BY_NAME.put(key, codec); + return codec; + } catch (ClassNotFoundException e) { + throw new BadConfigurationException("Class " + codecClassName + " was not found", e); + } + } + /** * @param codecName the requested codec * @return the corresponding hadoop codec. null if UNCOMPRESSED @@ -314,10 +431,10 @@ private String cacheKey(CompressionCodecName codecName) { String level = null; switch (codecName) { case GZIP: - level = conf.get("zlib.compress.level"); + level = conf.get(GZIP_COMPRESS_LEVEL); break; case BROTLI: - level = conf.get("compression.brotli.quality"); + level = conf.get(BROTLI_COMPRESS_QUALITY); break; case ZSTD: level = conf.get("parquet.compression.codec.zstd.level"); @@ -329,6 +446,11 @@ private String cacheKey(CompressionCodecName codecName) { return level == null ? codecClass : codecClass + ":" + level; } + private String cacheKey(CompressionCodecName codecName, int level) { + String codecClass = codecName.getHadoopCompressionCodecClassName(); + return (codecClass == null ? 
codecName.name() : codecClass) + ":" + level; + } + @Override public void release() { for (BytesCompressor compressor : compressors.values()) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index c78ee09ecc..3d4b7db880 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -18,12 +18,17 @@ package org.apache.parquet.hadoop; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.BROTLI; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4_RAW; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZO; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.Random; import java.util.Set; @@ -285,4 +290,98 @@ public void cachingKeysZstd() { Assert.assertEquals(codec_2_1, codec_2_2); Assert.assertNotEquals(codec_2_1, codec_5_1); } + + @Test + public void levelAwareCompressor_sameLevel_returnsCachedInstance() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor c1 = factory.getCompressor(ZSTD, 3); + BytesInputCompressor c2 = factory.getCompressor(ZSTD, 3); + Assert.assertSame("Same codec+level should return the cached instance", c1, c2); + factory.release(); + } + + @Test + public void levelAwareCompressor_differentLevels_returnsDifferentInstances() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor c1 = factory.getCompressor(ZSTD, 1); + BytesInputCompressor c3 = factory.getCompressor(ZSTD, 3); + Assert.assertNotSame("Different levels should return different compressor instances", c1, c3); + factory.release(); + } + + @Test + public void levelAwareCompressor_levelCacheIsolatedFromNoLevelCache() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor noLevel = factory.getCompressor(ZSTD); + BytesInputCompressor withLevel = factory.getCompressor(ZSTD, 3); + Assert.assertNotSame( + "Level-aware and no-level compressors should use separate cache entries", noLevel, withLevel); + factory.release(); + } + + @Test + public void levelAwareCompressor_uncompressed_returnsNoOpCompressor() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor comp = factory.getCompressor(UNCOMPRESSED, 5); + Assert.assertSame(CodecFactory.NO_OP_COMPRESSOR, comp); + factory.release(); + } + + @Test + public void levelAwareCompressor_snappy_ignoresLevel() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor comp = factory.getCompressor(SNAPPY, 99); + Assert.assertNotNull(comp); + Assert.assertEquals(SNAPPY, comp.getCodecName()); + factory.release(); + } + + @Test + public void levelAwareCompressor_gzip_invalidLevel_throwsBadConfigurationException() { + CodecFactory factory = new CodecFactory(new Configuration(), 
pageSize); + try { + BadConfigurationException ex = Assert.assertThrows(BadConfigurationException.class, + () -> factory.getCompressor(GZIP, 99)); + Assert.assertTrue(ex.getMessage().contains("99")); + } finally { + factory.release(); + } + } + + @Test + public void levelAwareCompressor_gzip_validBoundaryLevels_noException() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + for (int level : new int[] {-1, 0, 1, 9}) { + BytesInputCompressor comp = factory.getCompressor(GZIP, level); + Assert.assertNotNull("Compressor should not be null for GZIP level " + level, comp); + Assert.assertEquals("Codec name should be GZIP for level " + level, GZIP, comp.getCodecName()); + } + factory.release(); + } + + @Test + public void levelAwareCompressor_zstd_roundTrip() throws IOException { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + byte[] original = "hello parquet per-column compression".getBytes(StandardCharsets.UTF_8); + BytesInputDecompressor decompressor = factory.getDecompressor(ZSTD); + for (int level : new int[] {1, 3, 10, 22}) { + BytesInput compressed = factory.getCompressor(ZSTD, level).compress(BytesInput.from(original)); + byte[] result = decompressor.decompress(compressed, original.length).toByteArray(); + Assert.assertArrayEquals("Round-trip failed at ZSTD level " + level, original, result); + } + factory.release(); + } + + @Test + public void levelAwareCompressor_gzip_roundTrip() throws IOException { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + byte[] original = "hello parquet per-column compression".getBytes(StandardCharsets.UTF_8); + BytesInputDecompressor decompressor = factory.getDecompressor(GZIP); + for (int level : new int[] {1, 5, 9}) { + BytesInput compressed = factory.getCompressor(GZIP, level).compress(BytesInput.from(original)); + byte[] result = decompressor.decompress(compressed, original.length).toByteArray(); + Assert.assertArrayEquals("Round-trip failed at GZIP level " + level, original, result); + } + factory.release(); + } } From 1c1053325c739a53f7f4db327ffae39f4d876909 Mon Sep 17 00:00:00 2001 From: mengnalin Date: Mon, 23 Mar 2026 13:57:58 -0700 Subject: [PATCH 3/6] Add per-column compression codec and level to ParquetProperties --- .../parquet/column/ParquetProperties.java | 63 +++++++- .../parquet/column/TestParquetProperties.java | 145 ++++++++++++++++++ 2 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f29214b458..b87697ee66 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -39,6 +39,7 @@ import org.apache.parquet.column.values.factory.ValuesWriterFactory; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; /** @@ -135,6 +136,8 @@ public static WriterVersion fromString(String name) { private final Map extraMetaData; private final ColumnProperty statistics; private final ColumnProperty sizeStatistics; + private final ColumnProperty columnCodecs; + private final 
ColumnProperty columnCompressionLevels; private ParquetProperties(Builder builder) { this.pageSizeThreshold = builder.pageSize; @@ -167,6 +170,8 @@ private ParquetProperties(Builder builder) { this.extraMetaData = builder.extraMetaData; this.statistics = builder.statistics.build(); this.sizeStatistics = builder.sizeStatistics.build(); + this.columnCodecs = builder.columnCodecs.build(); + this.columnCompressionLevels = builder.columnCompressionLevels.build(); } public static Builder builder() { @@ -370,6 +375,28 @@ public boolean getSizeStatisticsEnabled(ColumnDescriptor column) { return sizeStatisticsEnabled; } + /** + * Returns the compression codec configured for the given column, or {@code null} if no + * column-specific codec has been set (the caller should fall back to the job-level codec). + * + * @param column the column descriptor + * @return the per-column codec, or {@code null} if not set + */ + public CompressionCodecName getColumnCodec(ColumnDescriptor column) { + return columnCodecs.getValue(column); + } + + /** + * Returns the compression level configured for the given column, or {@code null} if no + * column-specific level has been set. + * + * @param column the column descriptor + * @return the per-column compression level, or {@code null} if not set + */ + public Integer getColumnCompressionLevel(ColumnDescriptor column) { + return columnCompressionLevels.getValue(column); + } + @Override public String toString() { return "Parquet page size to " + getPageSizeThreshold() + '\n' @@ -388,7 +415,9 @@ public String toString() { + "Page row count limit to " + getPageRowCountLimit() + '\n' + "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? "on" : "off") + '\n' + "Statistics enabled: " + statisticsEnabled + '\n' - + "Size statistics enabled: " + sizeStatisticsEnabled; + + "Size statistics enabled: " + sizeStatisticsEnabled + '\n' + + "Per-column codecs: " + columnCodecs + '\n' + + "Per-column compression levels: " + columnCompressionLevels; } public static class Builder { @@ -419,6 +448,8 @@ public static class Builder { private Map extraMetaData = new HashMap<>(); private final ColumnProperty.Builder statistics; private final ColumnProperty.Builder sizeStatistics; + private final ColumnProperty.Builder columnCodecs; + private final ColumnProperty.Builder columnCompressionLevels; private Builder() { enableDict = ColumnProperty.builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED); @@ -436,6 +467,8 @@ private Builder() { ColumnProperty.builder().withDefaultValue(DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER); statistics = ColumnProperty.builder().withDefaultValue(DEFAULT_STATISTICS_ENABLED); sizeStatistics = ColumnProperty.builder().withDefaultValue(DEFAULT_SIZE_STATISTICS_ENABLED); + columnCodecs = ColumnProperty.builder().withDefaultValue(null); + columnCompressionLevels = ColumnProperty.builder().withDefaultValue(null); } private Builder(ParquetProperties toCopy) { @@ -460,6 +493,8 @@ private Builder(ParquetProperties toCopy) { this.extraMetaData = toCopy.extraMetaData; this.statistics = ColumnProperty.builder(toCopy.statistics); this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics); + this.columnCodecs = ColumnProperty.builder(toCopy.columnCodecs); + this.columnCompressionLevels = ColumnProperty.builder(toCopy.columnCompressionLevels); } /** @@ -756,6 +791,32 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) { return this; } + /** + * Set the compression codec for the specified column. 
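+   * <p>
+   * For example (the column name is illustrative):
+   * <pre>{@code
+   * ParquetProperties props = ParquetProperties.builder()
+   *     .withCompressionCodec("col_a", CompressionCodecName.ZSTD)
+   *     .withCompressionLevel("col_a", 10)
+   *     .build();
+   * }</pre>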
+ * + * @param columnPath the path of the column (dot-string) + * @param codec the compression codec to use for this column + * @return this builder for method chaining + */ + public Builder withCompressionCodec(String columnPath, CompressionCodecName codec) { + this.columnCodecs.withValue(columnPath, Objects.requireNonNull(codec, "codec cannot be null")); + return this; + } + + /** + * Set the compression level for the specified column. + * The valid range is codec-specific (e.g. ZSTD: 1–22 (default 3), GZIP: 0–9 or -1 (default 6), BROTLI: 0–11 (default 1)). + * Pass {@code null} to unset a previously configured level for that column. + * + * @param columnPath the path of the column (dot-string) + * @param level the compression level, or {@code null} to unset + * @return this builder for method chaining + */ + public Builder withCompressionLevel(String columnPath, Integer level) { + this.columnCompressionLevels.withValue(columnPath, level); + return this; + } + public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(this); // we pass a constructed but uninitialized factory to ParquetProperties above as currently diff --git a/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java b/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java new file mode 100644 index 0000000000..be6b518559 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column; + +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; + +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Before; +import org.junit.Test; + +public class TestParquetProperties { + + private static final MessageType SCHEMA = MessageTypeParser.parseMessageType( + "message test { required binary col_a; required int32 col_b; required double col_c; }"); + + private ColumnDescriptor colA; + private ColumnDescriptor colB; + private ColumnDescriptor colC; + + @Before + public void setUp() { + colA = SCHEMA.getColumns().get(0); + colB = SCHEMA.getColumns().get(1); + colC = SCHEMA.getColumns().get(2); + } + + @Test + public void columnCodec_notSet_returnsNull() { + ParquetProperties props = ParquetProperties.builder().build(); + assertNull(props.getColumnCodec(colA)); + } + + @Test + public void columnLevel_notSet_returnsNull() { + ParquetProperties props = ParquetProperties.builder().build(); + assertNull(props.getColumnCompressionLevel(colA)); + } + + @Test + public void columnCodec_setForColumn_returnsConfiguredCodec() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .build(); + assertEquals(ZSTD, props.getColumnCodec(colA)); + } + + @Test + public void columnCodec_setMultipleTimes_lastValueWins() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionCodec("col_a", SNAPPY) + .build(); + assertEquals(SNAPPY, props.getColumnCodec(colA)); + } + + @Test + public void columnCodec_otherColumnsUnaffected() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .build(); + assertNull(props.getColumnCodec(colB)); + assertNull(props.getColumnCodec(colC)); + } + + @Test + public void columnLevel_setForColumn_returnsConfiguredLevel() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionLevel("col_a", 10) + .build(); + assertEquals(Integer.valueOf(10), props.getColumnCompressionLevel(colA)); + } + + @Test + public void columnLevel_otherColumnsUnaffected() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionLevel("col_a", 10) + .build(); + assertNull(props.getColumnCompressionLevel(colB)); + } + + @Test + public void columnCodecAndLevel_multipleColumns_eachGetsOwn() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 10) + .withCompressionCodec("col_b", SNAPPY) + .withCompressionCodec("col_c", GZIP) + .withCompressionLevel("col_c", 5) + .build(); + + assertEquals(ZSTD, props.getColumnCodec(colA)); + assertEquals(Integer.valueOf(10), props.getColumnCompressionLevel(colA)); + + assertEquals(SNAPPY, props.getColumnCodec(colB)); + assertNull(props.getColumnCompressionLevel(colB)); + + assertEquals(GZIP, props.getColumnCodec(colC)); + assertEquals(Integer.valueOf(5), props.getColumnCompressionLevel(colC)); + } + + @Test + public void withCompressionCodec_nullCodec_throwsNullPointerException() { + assertThrows(NullPointerException.class, + () -> ParquetProperties.builder().withCompressionCodec("col_a", null)); + } + + @Test 
+ public void copyBuilder_preservesColumnCodecAndLevel() { + ParquetProperties original = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 7) + .withCompressionCodec("col_b", SNAPPY) + .build(); + + ParquetProperties copy = ParquetProperties.copy(original).build(); + + assertEquals(ZSTD, copy.getColumnCodec(colA)); + assertEquals(Integer.valueOf(7), copy.getColumnCompressionLevel(colA)); + assertEquals(SNAPPY, copy.getColumnCodec(colB)); + assertNull(copy.getColumnCompressionLevel(colB)); + assertNull(copy.getColumnCodec(colC)); + } +} From 1a2ccd9a85968610cfefb44dde429846e0bc4d16 Mon Sep 17 00:00:00 2001 From: mengnalin Date: Mon, 23 Mar 2026 21:14:24 -0700 Subject: [PATCH 4/6] Add per-column codec resolution to ColumnChunkPageWriteStore --- .../hadoop/ColumnChunkPageWriteStore.java | 87 ++++++++--- .../hadoop/TestColumnChunkPageWriteStore.java | 138 ++++++++++++++++++ 2 files changed, 208 insertions(+), 17 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index d9e6ea0990..f46efc1f4d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.function.Function; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -44,6 +45,7 @@ import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; +import org.apache.parquet.compression.CompressionCodecFactory; import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; import org.apache.parquet.crypto.AesCipher; import org.apache.parquet.crypto.InternalColumnEncryptionSetup; @@ -53,6 +55,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; import org.apache.parquet.io.ParquetEncodingException; @@ -577,21 +580,7 @@ public ColumnChunkPageWriteStore( int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) { this.schema = schema; - for (ColumnDescriptor path : schema.getColumns()) { - writers.put( - path, - new ColumnChunkPageWriter( - path, - compressor, - allocator, - columnIndexTruncateLength, - pageWriteChecksumEnabled, - null, - null, - null, - -1, - -1)); - } + initWriters(col -> compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, null, -1); } @Deprecated @@ -622,13 +611,77 @@ public ColumnChunkPageWriteStore( InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) { this.schema = schema; + initWriters(col -> compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, fileEncryptor, rowGroupOrdinal); + } + + public ColumnChunkPageWriteStore( + CompressionCodecFactory codecFactory, + CompressionCodecName defaultCodec, + ParquetProperties parquetProperties, + MessageType schema, + ByteBufferAllocator 
allocator, + int columnIndexTruncateLength, + boolean pageWriteChecksumEnabled) { + this.schema = schema; + initWriters( + col -> resolveCompressor(col, codecFactory, defaultCodec, parquetProperties), + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + null, + -1); + } + + public ColumnChunkPageWriteStore( + CompressionCodecFactory codecFactory, + CompressionCodecName defaultCodec, + ParquetProperties parquetProperties, + MessageType schema, + ByteBufferAllocator allocator, + int columnIndexTruncateLength, + boolean pageWriteChecksumEnabled, + InternalFileEncryptor fileEncryptor, + int rowGroupOrdinal) { + this.schema = schema; + initWriters( + col -> resolveCompressor(col, codecFactory, defaultCodec, parquetProperties), + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + fileEncryptor, + rowGroupOrdinal); + } + + private static BytesInputCompressor resolveCompressor( + ColumnDescriptor column, + CompressionCodecFactory codecFactory, + CompressionCodecName defaultCodec, + ParquetProperties parquetProperties) { + CompressionCodecName columnCodec = parquetProperties.getColumnCodec(column); + CompressionCodecName codec = columnCodec != null ? columnCodec : defaultCodec; + Integer level = parquetProperties.getColumnCompressionLevel(column); + if (level != null && columnCodec == null) { + LOG.warn("Column '{}': compression level {} set without a per-column codec; " + + "applying level to the default codec ({}).", + ColumnPath.get(column.getPath()), level, defaultCodec); + } + return level != null ? codecFactory.getCompressor(codec, level) : codecFactory.getCompressor(codec); + } + + private void initWriters( + Function compressorFn, + ByteBufferAllocator allocator, + int columnIndexTruncateLength, + boolean pageWriteChecksumEnabled, + InternalFileEncryptor fileEncryptor, + int rowGroupOrdinal) { if (null == fileEncryptor) { for (ColumnDescriptor path : schema.getColumns()) { writers.put( path, new ColumnChunkPageWriter( path, - compressor, + compressorFn.apply(path), allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, @@ -660,7 +713,7 @@ public ColumnChunkPageWriteStore( path, new ColumnChunkPageWriter( path, - compressor, + compressorFn.apply(path), allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index a17cf678f5..f43b0ff11e 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -22,7 +22,9 @@ import static org.apache.parquet.column.Encoding.RLE; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; import static org.apache.parquet.schema.OriginalType.UTF8; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; @@ -32,6 +34,8 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -42,6 +46,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -63,6 +68,7 @@ import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; import org.apache.parquet.hadoop.ParquetFileWriter.Mode; +import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -319,4 +325,136 @@ public void testColumnOrderV1() throws IOException { private BytesInputCompressor compressor(CompressionCodecName codec) { return new CodecFactory(conf, pageSize).getCompressor(codec); } + + @Test + public void perColumnCodec_defaultUsedWhenNotSet() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = ParquetProperties.builder().build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, props); + + assertEquals(SNAPPY, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnCodec_overridesDefaultForOneColumn() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, props); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnCodec_allColumnsOverridden() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionCodec("col_b", GZIP) + .build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, props); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(GZIP, codecs.get("col_b")); + } + + @Test + public void perColumnLevel_withCodec_roundTrip() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 10) + .build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, props); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnLevel_invalidZstdLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 23) + .build(); + + BadConfigurationException ex = assertThrows(BadConfigurationException.class, + () -> 
writeAndReadCodecs(schema, SNAPPY, props)); + assertTrue(ex.getMessage().contains("23")); + } + + @Test + public void perColumnLevel_invalidGzipLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", GZIP) + .withCompressionLevel("col_a", 10) + .build(); + + BadConfigurationException ex = assertThrows(BadConfigurationException.class, + () -> writeAndReadCodecs(schema, SNAPPY, props)); + assertTrue(ex.getMessage().contains("10")); + } + + private Map writeAndReadCodecs( + MessageType schema, CompressionCodecName defaultCodec, ParquetProperties props) throws Exception { + Path file = new Path("target/test/TestColumnChunkPageWriteStore/perColumnCodec.parquet"); + FileSystem fs = file.getFileSystem(conf); + fs.delete(file, false); + fs.mkdirs(file.getParent()); + + allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()); + CodecFactory codecFactory = new CodecFactory(conf, pageSize); + + OutputFileForTesting outputFile = new OutputFileForTesting(file, conf); + ParquetFileWriter writer = new ParquetFileWriter( + outputFile, + schema, + Mode.CREATE, + ParquetWriter.DEFAULT_BLOCK_SIZE, + ParquetWriter.MAX_PADDING_SIZE_DEFAULT, + null, + ParquetProperties.builder().withAllocator(allocator).build()); + writer.start(); + writer.startBlock(1); + + try (ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore( + codecFactory, + defaultCodec, + props, + schema, + allocator, + Integer.MAX_VALUE, + ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED)) { + for (ColumnDescriptor col : schema.getColumns()) { + Statistics stats = Statistics.getBuilderForReading(col.getPrimitiveType()).build(); + store.getPageWriter(col).writePage(BytesInput.fromInt(42), 1, 1, stats, RLE, RLE, PLAIN); + } + store.flushToFileWriter(writer); + } + + writer.endBlock(); + writer.end(new HashMap<>()); + + ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER); + Map result = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + result.put(col.getPath().toDotString(), col.getCodec()); + } + return result; + } } From 71da18937f9114e9254ada18de70668edcdf266e Mon Sep 17 00:00:00 2001 From: mengnalin Date: Mon, 23 Mar 2026 22:40:49 -0700 Subject: [PATCH 5/6] Thread CompressionCodecFactory through record writer stack --- .../hadoop/InternalParquetRecordWriter.java | 19 +- .../parquet/hadoop/ParquetRecordWriter.java | 33 ++- .../apache/parquet/hadoop/ParquetWriter.java | 29 ++- .../parquet/hadoop/TestParquetWriter.java | 213 ++++++++++++++++++ 4 files changed, 283 insertions(+), 11 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index dd51d1ef09..8aae03d522 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -28,10 +28,11 @@ import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; -import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; +import org.apache.parquet.compression.CompressionCodecFactory; import 
org.apache.parquet.crypto.InternalFileEncryptor; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.ColumnIOFactory; import org.apache.parquet.io.MessageColumnIO; @@ -51,7 +52,8 @@ class InternalParquetRecordWriter { private long rowGroupSizeThreshold; private final int rowGroupRecordCountThreshold; private long nextRowGroupSize; - private final BytesInputCompressor compressor; + private final CompressionCodecFactory codecFactory; + private final CompressionCodecName defaultCodec; private final boolean validating; private final ParquetProperties props; @@ -76,7 +78,8 @@ class InternalParquetRecordWriter { * @param schema the schema of the records * @param extraMetaData extra meta data to write in the footer of the file * @param rowGroupSize the size of a block in the file (this will be approximate) - * @param compressor the codec used to compress + * @param codecFactory factory used to create per-column compressors + * @param defaultCodec the default codec to use when no per-column codec is configured */ public InternalParquetRecordWriter( ParquetFileWriter parquetFileWriter, @@ -84,7 +87,8 @@ public InternalParquetRecordWriter( MessageType schema, Map extraMetaData, long rowGroupSize, - BytesInputCompressor compressor, + CompressionCodecFactory codecFactory, + CompressionCodecName defaultCodec, boolean validating, ParquetProperties props) { this.parquetFileWriter = parquetFileWriter; @@ -94,7 +98,8 @@ public InternalParquetRecordWriter( this.rowGroupSizeThreshold = rowGroupSize; this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit(); this.nextRowGroupSize = rowGroupSizeThreshold; - this.compressor = compressor; + this.codecFactory = codecFactory; + this.defaultCodec = defaultCodec; this.validating = validating; this.props = props; this.fileEncryptor = parquetFileWriter.getEncryptor(); @@ -109,7 +114,9 @@ public ParquetMetadata getFooter() { private void initStore() { ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore( - compressor, + codecFactory, + defaultCodec, + props, schema, props.getAllocator(), props.getColumnIndexTruncateLength(), diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java index 51528b10be..acf6ca3161 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java @@ -26,7 +26,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.compression.CompressionCodecFactory; import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -104,7 +106,8 @@ public ParquetRecordWriter( .withWriterVersion(writerVersion) .build(); internalWriter = new InternalParquetRecordWriter( - w, writeSupport, schema, extraMetaData, blockSize, compressor, validating, 
props); + w, writeSupport, schema, extraMetaData, blockSize, + singleCompressorFactory(compressor), compressor.getCodecName(), validating, props); this.memoryManager = null; this.codecFactory = null; } @@ -173,7 +176,8 @@ public ParquetRecordWriter( .withWriterVersion(writerVersion) .build(); internalWriter = new InternalParquetRecordWriter( - w, writeSupport, schema, extraMetaData, blockSize, compressor, validating, props); + w, writeSupport, schema, extraMetaData, blockSize, + singleCompressorFactory(compressor), compressor.getCodecName(), validating, props); this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null"); memoryManager.addWriter(internalWriter, blockSize); this.codecFactory = null; @@ -207,13 +211,36 @@ public ParquetRecordWriter( schema, extraMetaData, blockSize, - codecFactory.getCompressor(codec), + codecFactory, + codec, validating, props); this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null"); memoryManager.addWriter(internalWriter, blockSize); } + private static CompressionCodecFactory singleCompressorFactory(BytesInputCompressor compressor) { + return new CompressionCodecFactory() { + @Override + public BytesInputCompressor getCompressor(CompressionCodecName codecName) { + if (codecName != compressor.getCodecName()) { + throw new IllegalArgumentException( + "Per-column codec overrides are not supported by this writer. " + + "Requested: " + codecName + ", configured: " + compressor.getCodecName()); + } + return compressor; + } + + @Override + public BytesInputDecompressor getDecompressor(CompressionCodecName codecName) { + throw new UnsupportedOperationException("Decompression is not supported by this factory"); + } + + @Override + public void release() {} + }; + } + /** * {@inheritDoc} */ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 8eb5f7f17b..b791a26bf2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -395,7 +395,6 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport fileWriter.start(); this.codecFactory = codecFactory; - CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName); final Map extraMetadata; if (encodingProps.getExtraMetaData() == null @@ -418,7 +417,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport } this.writer = new InternalParquetRecordWriter( - fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps); + fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, codecFactory, compressionCodecName, validating, encodingProps); } public void write(T object) throws IOException { @@ -569,6 +568,32 @@ public SELF withCompressionCodec(CompressionCodecName codecName) { return self(); } + /** + * Override the compression codec for a specific column. + * + * @param columnPath dot-string path of the column (e.g. {@code "my_col"}) + * @param codecName the codec to use for that column + * @return this builder for method chaining. + */ + public SELF withCompressionCodec(String columnPath, CompressionCodecName codecName) { + encodingPropsBuilder.withCompressionCodec(columnPath, codecName); + return self(); + } + + /** + * Set the compression level for a specific column. 
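+     * <p>
+     * A typical use, overriding both the codec and its level for a single column (the builder,
+     * path, and column name are illustrative):
+     * <pre>{@code
+     * ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+     *     .withCompressionCodec(CompressionCodecName.SNAPPY)        // file-wide default
+     *     .withCompressionCodec("col_a", CompressionCodecName.ZSTD) // override for col_a
+     *     .withCompressionLevel("col_a", 10)                        // ZSTD level for col_a
+     *     .build();
+     * }</pre>
+     * <p>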
+ * The valid range is codec-specific (e.g. ZSTD: 1–22 (default 3), GZIP: 0–9 or -1 (default 6), BROTLI: 0–11 (default 1)). + * Pass {@code null} to unset a previously configured level for that column. + * + * @param columnPath dot-string path of the column (e.g. {@code "my_col"}) + * @param level codec-specific compression level, or {@code null} to unset + * @return this builder for method chaining. + */ + public SELF withCompressionLevel(String columnPath, Integer level) { + encodingPropsBuilder.withCompressionLevel(columnPath, level); + return self(); + } + /** * Set the {@link CompressionCodecFactory codec factory} used by the * constructed writer. diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index a7888b58d8..58492ca76c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -18,6 +18,9 @@ */ package org.apache.parquet.hadoop; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY; import static org.apache.parquet.column.Encoding.PLAIN; import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY; @@ -81,6 +84,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.hadoop.util.HadoopOutputFile; @@ -821,6 +825,215 @@ public void testParquetWriterBuilderCanNotConfigurePathAndFile() throws IOExcept ExampleParquetWriter.builder(path).withFile(outputFile).build()); } + @Test + public void perColumnCodec_overridesDefaultForOneColumn() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(SNAPPY) + .withCompressionCodec("col_a", ZSTD) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "hello").append("col_b", 1)); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + Map codecs = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + codecs.put(col.getPath().toDotString(), col.getCodec()); + } + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnCodec_defaultUsedWhenNoOverride() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + 
.withType(schema) + .withCompressionCodec(GZIP) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "hello").append("col_b", 1)); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + assertEquals( + "Column " + col.getPath().toDotString() + " should use default codec", + GZIP, + col.getCodec()); + } + } + + @Test + public void perColumnLevel_dataRoundTrips() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(SNAPPY) + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 1) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "hello").append("col_b", 42)); + } + + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path).build()) { + Group group = reader.read(); + assertEquals("hello", group.getBinary("col_a", 0).toStringUsingUTF8()); + assertEquals(42, group.getInteger("col_b", 0)); + assertNull(reader.read()); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + Map codecs = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + codecs.put(col.getPath().toDotString(), col.getCodec()); + } + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnLevel_invalidZstdLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + // ZSTD only supports levels 1-22; level 23 is invalid + Assert.assertThrows(BadConfigurationException.class, () -> { + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 23) + .build()) { + // exception expected before first write + } + }); + } + + @Test + public void perColumnLevel_invalidLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + // GZIP only supports levels -1 (default) through 9; level 10 is invalid + Assert.assertThrows(BadConfigurationException.class, () -> { + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec("col_a", GZIP) + .withCompressionLevel("col_a", 10) + .build()) { + // exception expected before first write + } + }); + } + + @Test + public void perColumnLevel_differentLevelsPerColumn_dataRoundTrips() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(BINARY).as(stringType()).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new 
Path(file.getAbsolutePath()); + + // Both columns use ZSTD (from default), but at different levels. + // This exercises the level-only override path in resolveCompressor(). + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(ZSTD) + .withCompressionLevel("col_a", 1) + .withCompressionLevel("col_b", 10) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "fast").append("col_b", "best")); + } + + // Both columns must report ZSTD in the footer + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + assertEquals( + "Column " + col.getPath().toDotString() + " should use ZSTD", + ZSTD, + col.getCodec()); + } + + // Data must survive the round-trip at both levels + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path).build()) { + Group group = reader.read(); + assertEquals("fast", group.getBinary("col_a", 0).toStringUsingUTF8()); + assertEquals("best", group.getBinary("col_b", 0).toStringUsingUTF8()); + assertNull(reader.read()); + } + } + + @Test + public void perColumnCodec_allColumnsOverridden() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(SNAPPY) + .withCompressionCodec("col_a", ZSTD) + .withCompressionCodec("col_b", GZIP) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "hello").append("col_b", 1)); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + Map codecs = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + codecs.put(col.getPath().toDotString(), col.getCodec()); + } + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(GZIP, codecs.get("col_b")); + } + @Test public void testNoFlushAfterException() throws Exception { final File testDir = temp.newFile(); From f3e18ed143dd6434174e159ebc8e8c7502a8db4d Mon Sep 17 00:00:00 2001 From: mengnalin Date: Tue, 24 Mar 2026 11:22:42 -0700 Subject: [PATCH 6/6] Wire per-column compression into ParquetOutputFormat --- .../parquet/hadoop/ParquetOutputFormat.java | 17 +++ .../parquet/hadoop/TestParquetWriter.java | 138 ++++++++++++++++++ 2 files changed, 155 insertions(+) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 868ae634c1..4db288f455 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -163,6 +163,7 @@ public static enum JobSummaryLevel { public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled"; public static final String SIZE_STATISTICS_ENABLED = "parquet.size.statistics.enabled"; + public static final String COLUMN_COMPRESSION_LEVEL_PREFIX = "parquet.compression.level"; public static JobSummaryLevel 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 868ae634c1..4db288f455 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -163,6 +163,7 @@ public static enum JobSummaryLevel {
 public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
 public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled";
 public static final String SIZE_STATISTICS_ENABLED = "parquet.size.statistics.enabled";
+ public static final String COLUMN_COMPRESSION_LEVEL_PREFIX = "parquet.compression.level";

 public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
 String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -222,6 +223,14 @@ public static void setCompression(Job job, CompressionCodecName compression) {
 getConfiguration(job).set(COMPRESSION, compression.name());
 }

+ public static void setColumnCompression(Job job, String columnPath, CompressionCodecName codec) {
+ getConfiguration(job).set(COMPRESSION + '#' + columnPath, codec.name());
+ }
+
+ public static void setColumnCompressionLevel(Job job, String columnPath, int level) {
+ getConfiguration(job).setInt(COLUMN_COMPRESSION_LEVEL_PREFIX + '#' + columnPath, level);
+ }
+
 public static void setEnableDictionary(Job job, boolean enableDictionary) {
 getConfiguration(job).setBoolean(ENABLE_DICTIONARY, enableDictionary);
 }
@@ -546,6 +555,14 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp
 STATISTICS_ENABLED,
 key -> conf.getBoolean(key, ParquetProperties.DEFAULT_STATISTICS_ENABLED),
 propsBuilder::withStatisticsEnabled)
+ .withColumnConfig(
+ COMPRESSION,
+ key -> CompressionCodecName.fromConf(conf.get(key)),
+ propsBuilder::withCompressionCodec)
+ .withColumnConfig(
+ COLUMN_COMPRESSION_LEVEL_PREFIX,
+ key -> conf.getInt(key, -1),
+ propsBuilder::withCompressionLevel)
 .parseConfig(conf);

 ParquetProperties props = propsBuilder.build();
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 58492ca76c..78edac8a94 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -53,6 +53,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import net.openhft.hashing.LongHashFunction;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -1071,4 +1073,140 @@ public void testNoFlushAfterException() throws Exception {
 FileSystem fs = file.getFileSystem(conf);
 assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0);
 }
+
+ @Test
+ public void outputFormat_setColumnCompression_overridesDefaultForOneColumn() throws Exception {
+ MessageType schema = Types.buildMessage()
+ .required(BINARY).as(stringType()).named("col_a")
+ .required(INT32).named("col_b")
+ .named("test");
+ File file = temp.newFile();
+ file.delete();
+ Path path = new Path(file.getAbsolutePath());
+ Job job = Job.getInstance();
+ ParquetOutputFormat.setColumnCompression(job, "col_a", ZSTD);
+
+ Map<String, CompressionCodecName> codecs = writeAndReadCodecsViaOutputFormat(schema, SNAPPY, job, path);
+
+ assertEquals(ZSTD, codecs.get("col_a"));
+ assertEquals(SNAPPY, codecs.get("col_b"));
+ }
+
+ @Test
+ public void outputFormat_setColumnCompression_allColumnsOverridden() throws Exception {
+ MessageType schema = Types.buildMessage()
+ .required(BINARY).as(stringType()).named("col_a")
+ .required(INT32).named("col_b")
+ .named("test");
+ File file = temp.newFile();
+ file.delete();
+ Path path = new Path(file.getAbsolutePath());
+ Job job = Job.getInstance();
+ ParquetOutputFormat.setColumnCompression(job, "col_a", ZSTD);
+ ParquetOutputFormat.setColumnCompression(job, "col_b", GZIP);
+
+ Map<String, CompressionCodecName> codecs = writeAndReadCodecsViaOutputFormat(schema, SNAPPY, job, path);
+
+ assertEquals(ZSTD, codecs.get("col_a"));
+ assertEquals(GZIP, codecs.get("col_b"));
+ }
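The setters are thin wrappers over plain Configuration keys, so the jobs above could equally be configured directly. A sketch, assuming COMPRESSION keeps its long-standing value "parquet.compression" (the per-column keys follow the '#' layout shown in setColumnCompression and setColumnCompressionLevel):

    Configuration conf = new Configuration();
    conf.set("parquet.compression", "SNAPPY");         // file-wide default (COMPRESSION)
    conf.set("parquet.compression#col_a", "ZSTD");     // per-column codec override
    conf.setInt("parquet.compression.level#col_a", 3); // per-column level override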
+
+ @Test
+ public void outputFormat_setColumnCompression_defaultUsedWhenNotSet() throws Exception {
+ MessageType schema = Types.buildMessage()
+ .required(BINARY).as(stringType()).named("col_a")
+ .required(INT32).named("col_b")
+ .named("test");
+ File file = temp.newFile();
+ file.delete();
+ Path path = new Path(file.getAbsolutePath());
+ Job job = Job.getInstance();
+
+ Map<String, CompressionCodecName> codecs = writeAndReadCodecsViaOutputFormat(schema, GZIP, job, path);
+
+ assertEquals(GZIP, codecs.get("col_a"));
+ assertEquals(GZIP, codecs.get("col_b"));
+ }
+
+ @Test
+ public void outputFormat_setColumnCompressionLevel_withCodec_dataRoundTrips() throws Exception {
+ MessageType schema = Types.buildMessage()
+ .required(BINARY).as(stringType()).named("col_a")
+ .required(INT32).named("col_b")
+ .named("test");
+ File file = temp.newFile();
+ file.delete();
+ Path path = new Path(file.getAbsolutePath());
+ Job job = Job.getInstance();
+ ParquetOutputFormat.setColumnCompression(job, "col_a", ZSTD);
+ ParquetOutputFormat.setColumnCompressionLevel(job, "col_a", 1);
+
+ Map<String, CompressionCodecName> codecs = writeAndReadCodecsViaOutputFormat(schema, SNAPPY, job, path);
+ assertEquals(ZSTD, codecs.get("col_a"));
+ assertEquals(SNAPPY, codecs.get("col_b"));
+
+ try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).build()) {
+ Group group = reader.read();
+ assertEquals("hello", group.getBinary("col_a", 0).toStringUsingUTF8());
+ assertEquals(42, group.getInteger("col_b", 0));
+ assertNull(reader.read());
+ }
+ }
+
+ @Test
+ public void outputFormat_setColumnCompressionLevel_differentLevelsPerColumn() throws Exception {
+ MessageType schema = Types.buildMessage()
+ .required(BINARY).as(stringType()).named("col_a")
+ .required(BINARY).as(stringType()).named("col_b")
+ .named("test");
+ File file = temp.newFile();
+ file.delete();
+ Path path = new Path(file.getAbsolutePath());
+ Job job = Job.getInstance();
+ ParquetOutputFormat.setColumnCompressionLevel(job, "col_a", 1);
+ ParquetOutputFormat.setColumnCompressionLevel(job, "col_b", 10);
+
+ Map<String, CompressionCodecName> codecs = writeAndReadCodecsViaOutputFormat(schema, ZSTD, job, path);
+ assertEquals(ZSTD, codecs.get("col_a"));
+ assertEquals(ZSTD, codecs.get("col_b"));
+
+ try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).build()) {
+ Group group = reader.read();
+ assertEquals("hello", group.getBinary("col_a", 0).toStringUsingUTF8());
+ assertEquals("fast", group.getBinary("col_b", 0).toStringUsingUTF8());
+ assertNull(reader.read());
+ }
+ }
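Note that the footer records only the codec, never the level, which is why the level tests above assert a data round-trip rather than reading the level back. The only level-dependent observable is compressed size; a sketch of how one might compare it, left out of the tests because one-row payloads compress to near-identical sizes at any level:

    // footer as obtained from readFooter(...) above; requires java.util.List.
    List<ColumnChunkMetaData> cols = footer.getBlocks().get(0).getColumns();
    long sizeAtLevel1 = cols.get(0).getTotalSize();  // col_a, written at level 1
    long sizeAtLevel10 = cols.get(1).getTotalSize(); // col_b, written at level 10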
"hello" : "fast"); break; + case INT32: group.append(name, 42); break; + default: break; + } + } + + ParquetOutputFormat outputFormat = new ParquetOutputFormat<>(new GroupWriteSupport()); + RecordWriter writer = outputFormat.getRecordWriter(conf, file, defaultCodec); + writer.write(null, group); + writer.close(null); + + ParquetMetadata footer = readFooter(conf, file, NO_FILTER); + Map result = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + result.put(col.getPath().toDotString(), col.getCodec()); + } + return result; + } }