Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

/**
Expand Down Expand Up @@ -135,6 +136,8 @@ public static WriterVersion fromString(String name) {
private final Map<String, String> extraMetaData;
private final ColumnProperty<Boolean> statistics;
private final ColumnProperty<Boolean> sizeStatistics;
private final ColumnProperty<CompressionCodecName> columnCodecs;
private final ColumnProperty<Integer> columnCompressionLevels;

private ParquetProperties(Builder builder) {
this.pageSizeThreshold = builder.pageSize;
Expand Down Expand Up @@ -167,6 +170,8 @@ private ParquetProperties(Builder builder) {
this.extraMetaData = builder.extraMetaData;
this.statistics = builder.statistics.build();
this.sizeStatistics = builder.sizeStatistics.build();
this.columnCodecs = builder.columnCodecs.build();
this.columnCompressionLevels = builder.columnCompressionLevels.build();
}

public static Builder builder() {
Expand Down Expand Up @@ -370,6 +375,28 @@ public boolean getSizeStatisticsEnabled(ColumnDescriptor column) {
return sizeStatisticsEnabled;
}

/**
 * Looks up the compression codec configured for a specific column. A {@code null}
 * result means no per-column codec has been set, in which case the caller is
 * expected to fall back to the job-level codec.
 *
 * @param column the descriptor of the column to look up
 * @return the codec configured for this column, or {@code null} when unset
 */
public CompressionCodecName getColumnCodec(ColumnDescriptor column) {
  final CompressionCodecName configured = columnCodecs.getValue(column);
  return configured;
}

/**
 * Looks up the compression level configured for a specific column. A {@code null}
 * result means no per-column level has been set and the codec's default level
 * should be used.
 *
 * @param column the descriptor of the column to look up
 * @return the level configured for this column, or {@code null} when unset
 */
public Integer getColumnCompressionLevel(ColumnDescriptor column) {
  final Integer configured = columnCompressionLevels.getValue(column);
  return configured;
}

@Override
public String toString() {
return "Parquet page size to " + getPageSizeThreshold() + '\n'
Expand All @@ -388,7 +415,9 @@ public String toString() {
+ "Page row count limit to " + getPageRowCountLimit() + '\n'
+ "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? "on" : "off") + '\n'
+ "Statistics enabled: " + statisticsEnabled + '\n'
+ "Size statistics enabled: " + sizeStatisticsEnabled;
+ "Size statistics enabled: " + sizeStatisticsEnabled + '\n'
+ "Per-column codecs: " + columnCodecs + '\n'
+ "Per-column compression levels: " + columnCompressionLevels;
}

public static class Builder {
Expand Down Expand Up @@ -419,6 +448,8 @@ public static class Builder {
private Map<String, String> extraMetaData = new HashMap<>();
private final ColumnProperty.Builder<Boolean> statistics;
private final ColumnProperty.Builder<Boolean> sizeStatistics;
private final ColumnProperty.Builder<CompressionCodecName> columnCodecs;
private final ColumnProperty.Builder<Integer> columnCompressionLevels;

private Builder() {
enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
Expand All @@ -436,6 +467,8 @@ private Builder() {
ColumnProperty.<Integer>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER);
statistics = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_STATISTICS_ENABLED);
sizeStatistics = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_SIZE_STATISTICS_ENABLED);
columnCodecs = ColumnProperty.<CompressionCodecName>builder().withDefaultValue(null);
columnCompressionLevels = ColumnProperty.<Integer>builder().withDefaultValue(null);
}

private Builder(ParquetProperties toCopy) {
Expand All @@ -460,6 +493,8 @@ private Builder(ParquetProperties toCopy) {
this.extraMetaData = toCopy.extraMetaData;
this.statistics = ColumnProperty.builder(toCopy.statistics);
this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics);
this.columnCodecs = ColumnProperty.builder(toCopy.columnCodecs);
this.columnCompressionLevels = ColumnProperty.builder(toCopy.columnCompressionLevels);
}

/**
Expand Down Expand Up @@ -756,6 +791,32 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) {
return this;
}

/**
 * Configures a compression codec for a single column, overriding the job-level
 * codec for that column only.
 *
 * @param columnPath the dot-separated path of the column
 * @param codec the codec to apply to this column; must not be {@code null}
 * @return this builder for method chaining
 * @throws NullPointerException if {@code codec} is {@code null}
 */
public Builder withCompressionCodec(String columnPath, CompressionCodecName codec) {
  Objects.requireNonNull(codec, "codec cannot be null");
  columnCodecs.withValue(columnPath, codec);
  return this;
}

/**
 * Configures the compression level for a single column. The meaning and valid
 * range of the level are codec-specific, e.g. ZSTD: 1-22 (default 3),
 * GZIP: 0-9 or -1 for the zlib default (equivalent to 6), BROTLI: 0-11
 * (default 1). Passing {@code null} clears a previously configured level
 * for that column.
 *
 * @param columnPath the dot-separated path of the column
 * @param level the codec-specific compression level, or {@code null} to unset
 * @return this builder for method chaining
 */
public Builder withCompressionLevel(String columnPath, Integer level) {
  columnCompressionLevels.withValue(columnPath, level);
  return this;
}

public ParquetProperties build() {
ParquetProperties properties = new ParquetProperties(this);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column;

import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.junit.Before;
import org.junit.Test;

public class TestParquetProperties {

private static final MessageType SCHEMA = MessageTypeParser.parseMessageType(
"message test { required binary col_a; required int32 col_b; required double col_c; }");

private ColumnDescriptor colA;
private ColumnDescriptor colB;
private ColumnDescriptor colC;

@Before
public void setUp() {
colA = SCHEMA.getColumns().get(0);
colB = SCHEMA.getColumns().get(1);
colC = SCHEMA.getColumns().get(2);
}

@Test
public void columnCodec_notSet_returnsNull() {
ParquetProperties props = ParquetProperties.builder().build();
assertNull(props.getColumnCodec(colA));
}

@Test
public void columnLevel_notSet_returnsNull() {
ParquetProperties props = ParquetProperties.builder().build();
assertNull(props.getColumnCompressionLevel(colA));
}

@Test
public void columnCodec_setForColumn_returnsConfiguredCodec() {
ParquetProperties props = ParquetProperties.builder()
.withCompressionCodec("col_a", ZSTD)
.build();
assertEquals(ZSTD, props.getColumnCodec(colA));
}

@Test
public void columnCodec_setMultipleTimes_lastValueWins() {
ParquetProperties props = ParquetProperties.builder()
.withCompressionCodec("col_a", ZSTD)
.withCompressionCodec("col_a", SNAPPY)
.build();
assertEquals(SNAPPY, props.getColumnCodec(colA));
}

@Test
public void columnCodec_otherColumnsUnaffected() {
ParquetProperties props = ParquetProperties.builder()
.withCompressionCodec("col_a", ZSTD)
.build();
assertNull(props.getColumnCodec(colB));
assertNull(props.getColumnCodec(colC));
}

@Test
public void columnLevel_setForColumn_returnsConfiguredLevel() {
ParquetProperties props = ParquetProperties.builder()
.withCompressionLevel("col_a", 10)
.build();
assertEquals(Integer.valueOf(10), props.getColumnCompressionLevel(colA));
}

@Test
public void columnLevel_otherColumnsUnaffected() {
ParquetProperties props = ParquetProperties.builder()
.withCompressionLevel("col_a", 10)
.build();
assertNull(props.getColumnCompressionLevel(colB));
}

@Test
public void columnCodecAndLevel_multipleColumns_eachGetsOwn() {
ParquetProperties props = ParquetProperties.builder()
.withCompressionCodec("col_a", ZSTD)
.withCompressionLevel("col_a", 10)
.withCompressionCodec("col_b", SNAPPY)
.withCompressionCodec("col_c", GZIP)
.withCompressionLevel("col_c", 5)
.build();

assertEquals(ZSTD, props.getColumnCodec(colA));
assertEquals(Integer.valueOf(10), props.getColumnCompressionLevel(colA));

assertEquals(SNAPPY, props.getColumnCodec(colB));
assertNull(props.getColumnCompressionLevel(colB));

assertEquals(GZIP, props.getColumnCodec(colC));
assertEquals(Integer.valueOf(5), props.getColumnCompressionLevel(colC));
}

@Test
public void withCompressionCodec_nullCodec_throwsNullPointerException() {
assertThrows(NullPointerException.class,
() -> ParquetProperties.builder().withCompressionCodec("col_a", null));
}

@Test
public void copyBuilder_preservesColumnCodecAndLevel() {
ParquetProperties original = ParquetProperties.builder()
.withCompressionCodec("col_a", ZSTD)
.withCompressionLevel("col_a", 7)
.withCompressionCodec("col_b", SNAPPY)
.build();

ParquetProperties copy = ParquetProperties.copy(original).build();

assertEquals(ZSTD, copy.getColumnCodec(colA));
assertEquals(Integer.valueOf(7), copy.getColumnCompressionLevel(colA));
assertEquals(SNAPPY, copy.getColumnCodec(colB));
assertNull(copy.getColumnCompressionLevel(colB));
assertNull(copy.getColumnCodec(colC));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,30 @@ public interface CompressionCodecFactory {
*/
BytesInputCompressor getCompressor(CompressionCodecName codecName);

/**
 * Returns a {@link BytesInputCompressor} instance for the specified codec name and compression level.
 * <p>
 * The compression level controls the trade-off between compression speed and ratio. The valid range
 * and meaning of the level is codec-specific:
 * <ul>
 * <li>ZSTD: 1 (fastest) to 22 (best compression), default 3</li>
 * <li>GZIP: 0 (no compression) to 9 (best compression), or -1 for the zlib default (equivalent to 6)</li>
 * <li>BROTLI: 0 (fastest) to 11 (best compression), default 1</li>
 * </ul>
 * Implementations that do not support compression levels should ignore the {@code level} parameter
 * and delegate to {@link #getCompressor(CompressionCodecName)} — which is exactly what this default
 * implementation does, so existing factory implementations remain source- and binary-compatible.
 * <p>
 * The compressor is not thread-safe, so one instance for each working thread is required.
 *
 * @param codecName the codec name which the compressor instance is to be returned
 * @param level the compression level; codec-specific, ignored if the codec does not support levels
 * @return the compressor instance for the specified codec name and level
 * @see BytesInputCompressor#release()
 */
default BytesInputCompressor getCompressor(CompressionCodecName codecName, int level) {
  // Default: drop the level and reuse the level-agnostic factory method so that
  // factories written before this method existed keep working unchanged.
  return getCompressor(codecName);
}

/**
* Returns a {@link BytesInputDecompressor} instance for the specified codec name to be used for decompressing page
* data.
Expand Down
Loading