Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ private TableProperties() {}
public static final String PARQUET_COLUMN_STATS_ENABLED_PREFIX =
"write.parquet.stats-enabled.column.";

public static final String PARQUET_COLUMN_COMPRESSION_CODEC_PREFIX =
"write.parquet.compression-codec.column.";

public static final String PARQUET_COLUMN_COMPRESSION_LEVEL_PREFIX =
"write.parquet.compression-level.column.";

public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec";
public static final String AVRO_COMPRESSION_DEFAULT = "gzip";
Expand Down
2 changes: 2 additions & 0 deletions docs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
| write.parquet.compression-codec | zstd | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
| write.parquet.compression-level | null | Parquet compression level |
| write.parquet.compression-codec.column.col1 | (not set) | Per-column compression codec override for column 'col1': zstd, brotli, lz4, gzip, snappy, uncompressed |
| write.parquet.compression-level.column.col1 | (not set) | Per-column compression level override for column 'col1' |
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: 'col1' |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
| write.parquet.bloom-filter-fpp.column.col1 | 0.01 | The false positive probability for a bloom filter applied to 'col1' (must be > 0.0 and < 1.0) |
Expand Down
4 changes: 3 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ nessie = "0.107.4"
netty-buffer = "4.2.12.Final"
object-client-bundle = "3.3.2"
orc = "1.9.8"
parquet = "1.17.0"
# TODO: per-column compression support has not yet shipped in a stable parquet-java release;
# update this pin to a stable version once one containing https://github.com/apache/parquet-java/pull/3526 and https://github.com/apache/parquet-java/pull/3396 is available.
parquet = "1.18.0"
roaringbitmap = "1.6.14"
scala-collection-compat = "2.14.0"
slf4j = "2.0.17"
Expand Down
78 changes: 78 additions & 0 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_NDV_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_COMPRESSION_CODEC_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_COMPRESSION_LEVEL_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
Expand Down Expand Up @@ -354,6 +356,46 @@ private void setColumnStatsConfig(
});
}

/**
 * Applies per-column compression overrides from table properties to the Parquet writer.
 *
 * <p>For every column named by either a {@code write.parquet.compression-codec.column.*} or a
 * {@code write.parquet.compression-level.column.*} property, resolves the Iceberg column name to
 * its Parquet column path and forwards the codec and (optional) level to the supplied setters.
 * Columns that cannot be resolved against the write schema are skipped with a warning.
 *
 * @param context write context holding the table-level codec and the per-column override maps
 * @param colNameToParquetPathMap mapping from Iceberg column name to Parquet column path
 * @param withCompressionCodec setter receiving (parquet path, codec) pairs
 * @param withCompressionLevel setter receiving (parquet path, level) pairs
 * @throws IllegalArgumentException if a configured codec name or level value is invalid
 */
private void setColumnCompressionConfig(
    Context context,
    Map<String, String> colNameToParquetPathMap,
    BiConsumer<String, CompressionCodecName> withCompressionCodec,
    BiConsumer<String, Integer> withCompressionLevel) {

  Map<String, String> columnCompressionCodec = context.columnCompressionCodec();
  Map<String, String> columnCompressionLevel = context.columnCompressionLevel();

  Sets.union(columnCompressionCodec.keySet(), columnCompressionLevel.keySet())
      .forEach(
          colName -> {
            String parquetColumnPath = colNameToParquetPathMap.get(colName);
            if (parquetColumnPath == null) {
              LOG.warn("Skipping per-column compression config for missing field: {}", colName);
              return;
            }

            String codecStr = columnCompressionCodec.get(colName);
            CompressionCodecName codecName;
            if (codecStr != null) {
              // Parse first, outside the consumer call, so an IllegalArgumentException thrown
              // by the downstream builder is not misreported as an invalid property value.
              try {
                codecName = Context.toCodec(codecStr);
              } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException(
                    "Invalid compression codec for column " + colName + ": " + codecStr, e);
              }
            } else {
              // No per-column codec configured: fall back to the table-level codec so that a
              // per-column level override still has a codec to apply to.
              codecName = context.codec();
            }
            withCompressionCodec.accept(parquetColumnPath, codecName);

            String level = columnCompressionLevel.get(colName);
            if (level != null) {
              int parsedLevel;
              try {
                parsedLevel = Integer.parseInt(level);
              } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                    "Invalid compression level for column " + colName + ": " + level, e);
              }
              withCompressionLevel.accept(parquetColumnPath, parsedLevel);
            }
          });
}

@Override
public <D> FileAppender<D> build() throws IOException {
Preconditions.checkNotNull(schema, "Schema is required");
Expand Down Expand Up @@ -451,6 +493,12 @@ public <D> FileAppender<D> build() throws IOException {

setColumnStatsConfig(context, colNameToParquetPathMap, propsBuilder::withStatisticsEnabled);

setColumnCompressionConfig(
context,
colNameToParquetPathMap,
propsBuilder::withCompressionCodec,
propsBuilder::withCompressionLevel);

ParquetProperties parquetProperties = propsBuilder.build();

return new org.apache.iceberg.parquet.ParquetWriter<>(
Expand Down Expand Up @@ -493,6 +541,12 @@ public <D> FileAppender<D> build() throws IOException {
setColumnStatsConfig(
context, colNameToParquetPathMap, parquetWriteBuilder::withStatisticsEnabled);

setColumnCompressionConfig(
context,
colNameToParquetPathMap,
parquetWriteBuilder::withCompressionCodec,
parquetWriteBuilder::withCompressionLevel);

return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
}
}
Expand All @@ -511,6 +565,8 @@ static class Context {
private final Map<String, String> columnBloomFilterNdv;
private final Map<String, String> columnBloomFilterEnabled;
private final Map<String, String> columnStatsEnabled;
private final Map<String, String> columnCompressionCodec;
private final Map<String, String> columnCompressionLevel;
private final boolean dictionaryEnabled;

private Context(
Expand All @@ -527,6 +583,8 @@ private Context(
Map<String, String> columnBloomFilterNdv,
Map<String, String> columnBloomFilterEnabled,
Map<String, String> columnStatsEnabled,
Map<String, String> columnCompressionCodec,
Map<String, String> columnCompressionLevel,
boolean dictionaryEnabled) {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
Expand All @@ -541,6 +599,8 @@ private Context(
this.columnBloomFilterNdv = columnBloomFilterNdv;
this.columnBloomFilterEnabled = columnBloomFilterEnabled;
this.columnStatsEnabled = columnStatsEnabled;
this.columnCompressionCodec = columnCompressionCodec;
this.columnCompressionLevel = columnCompressionLevel;
this.dictionaryEnabled = dictionaryEnabled;
}

Expand Down Expand Up @@ -608,6 +668,12 @@ static Context dataContext(Map<String, String> config) {
Map<String, String> columnStatsEnabled =
PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX);

Map<String, String> columnCompressionCodec =
PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_COMPRESSION_CODEC_PREFIX);

Map<String, String> columnCompressionLevel =
PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_COMPRESSION_LEVEL_PREFIX);

boolean dictionaryEnabled =
PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);

Expand All @@ -625,6 +691,8 @@ static Context dataContext(Map<String, String> config) {
columnBloomFilterNdv,
columnBloomFilterEnabled,
columnStatsEnabled,
columnCompressionCodec,
columnCompressionLevel,
dictionaryEnabled);
}

Expand Down Expand Up @@ -695,6 +763,8 @@ static Context deleteContext(Map<String, String> config) {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
dictionaryEnabled);
}

Expand Down Expand Up @@ -758,6 +828,14 @@ Map<String, String> columnStatsEnabled() {
return columnStatsEnabled;
}

/**
 * Returns the per-column compression codec overrides, keyed by column name as configured via
 * {@code write.parquet.compression-codec.column.*} table properties.
 */
Map<String, String> columnCompressionCodec() {
  return this.columnCompressionCodec;
}

/**
 * Returns the per-column compression level overrides, keyed by column name as configured via
 * {@code write.parquet.compression-level.column.*} table properties.
 */
Map<String, String> columnCompressionLevel() {
  return this.columnCompressionLevel;
}

/** Returns whether Parquet dictionary encoding is enabled for this write. */
boolean dictionaryEnabled() {
  return this.dictionaryEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private final long targetRowGroupSize;
private final Map<String, String> metadata;
private final ParquetProperties props;
private final CompressionCodecFactory.BytesInputCompressor compressor;
private final CompressionCodecFactory codecFactory;
private final CompressionCodecName codec;
private final MessageType parquetSchema;
private final ParquetValueWriter<T> model;
private final MetricsConfig metricsConfig;
Expand Down Expand Up @@ -89,8 +90,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
this.targetRowGroupSize = rowGroupSize;
this.props = properties;
this.metadata = ImmutableMap.copyOf(metadata);
this.compressor =
new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
this.codecFactory = new ParquetCodecFactory(conf, props.getPageSizeThreshold());
this.codec = codec;
this.parquetSchema = parquetSchema;
this.model = (ParquetValueWriter<T>) createWriterFunc.apply(schema, parquetSchema);
this.metricsConfig = metricsConfig;
Expand Down Expand Up @@ -237,7 +238,9 @@ private void startRowGroup() {

this.pageStore =
new ColumnChunkPageWriteStore(
compressor,
codecFactory,
codec,
props,
parquetSchema,
props.getAllocator(),
this.columnIndexTruncateLength,
Expand All @@ -260,8 +263,8 @@ public void close() throws IOException {
if (writer != null) {
writer.end(metadata);
}
if (compressor != null) {
compressor.release();
if (codecFactory != null) {
codecFactory.release();
}
}
}
Expand Down
Loading
Loading