3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -516,7 +516,8 @@ private TableMetadata applyChangesToMetadata(TableMetadata metadata) {
Set<String> columnProperties =
ImmutableSet.of(
TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX,
TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX,
TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX);
Map<String, String> updatedProperties =
PropertyUtil.applySchemaChanges(
newMetadata.properties(), deletedColumns, renamedColumns, columnProperties);
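With the stats prefix added to `columnProperties`, column renames and deletes now cascade to `write.parquet.stats-enabled.column.*` keys the same way they already do for the bloom-filter keys. A toy sketch of the intended cascade using plain maps (illustration only, not the actual `PropertyUtil.applySchemaChanges` implementation):

import java.util.HashMap;
import java.util.Map;

class StatsKeyRenameSketch {
  public static void main(String[] args) {
    String prefix = "write.parquet.stats-enabled.column.";
    Map<String, String> props = new HashMap<>();
    props.put(prefix + "id", "true");

    // Renaming column "id" to "ID" carries the per-column key along.
    String value = props.remove(prefix + "id");
    if (value != null) {
      props.put(prefix + "ID", value);
    }

    // Deleting column "ID" then drops the key entirely.
    props.remove(prefix + "ID");

    System.out.println(props); // prints {}
  }
}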
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -174,6 +174,9 @@ private TableProperties() {}
public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX =
"write.parquet.bloom-filter-enabled.column.";

public static final String PARQUET_COLUMN_STATS_ENABLED_PREFIX =
"write.parquet.stats-enabled.column.";

public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec";
public static final String AVRO_COMPRESSION_DEFAULT = "gzip";
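The new constant follows the same per-column convention as the bloom-filter prefix: appending a column name yields the full property key. A minimal usage sketch, assuming an already-loaded `org.apache.iceberg.Table` handle (`updateProperties()` is the standard Iceberg API for setting table properties):

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class DisableColumnStats {
  // Turns off Parquet column statistics for "col1" only; other columns
  // keep the default behavior.
  static void disableStatsForCol1(Table table) {
    table
        .updateProperties()
        .set(TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX + "col1", "false")
        .commit();
  }
}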
@@ -19,6 +19,7 @@
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -191,6 +192,20 @@ public void testModificationWithParquetBloomConfig() {
table.properties().get(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "ID"));
}

@TestTemplate
public void testModificationWithParquetColumnStats() {
table.updateProperties().set(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "id", "true").commit();

table.updateSchema().renameColumn("id", "ID").commit();
assertThat(table.properties())
.containsEntry(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "ID", "true")
.doesNotContainKey(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "id");

table.updateSchema().deleteColumn("ID").commit();
assertThat(table.properties())
.doesNotContainKey(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "ID");
}

@TestTemplate
public void testDeleteAndAddColumnReassign() {
NameMapping mapping = MappingUtil.create(table.schema());
1 change: 1 addition & 0 deletions docs/docs/configuration.md
@@ -52,6 +52,7 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: 'col1' |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
| write.parquet.bloom-filter-fpp.column.col1 | 0.01 | The false positive probability for a bloom filter applied to 'col1' (must be > 0.0 and < 1.0) |
| write.parquet.stats-enabled.column.col1 | (not set) | Controls whether to collect parquet column statistics for column 'col1' |
| write.avro.compression-codec | gzip | Avro compression codec: gzip (deflate with level 9), zstd, snappy, uncompressed |
| write.avro.compression-level | null | Avro compression level |
| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
77 changes: 57 additions & 20 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -30,6 +30,7 @@
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
@@ -98,7 +99,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.PropertyUtil;
@@ -120,6 +120,7 @@
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type.ID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -308,31 +309,17 @@ private WriteBuilder createContextFunc(

private void setBloomFilterConfig(
Context context,
MessageType parquetSchema,
Map<String, String> colNameToParquetPathMap,
BiConsumer<String, Boolean> withBloomFilterEnabled,
BiConsumer<String, Double> withBloomFilterFPP) {

Map<Integer, String> fieldIdToParquetPath =
parquetSchema.getColumns().stream()
.filter(col -> col.getPrimitiveType().getId() != null)
.collect(
Collectors.toMap(
col -> col.getPrimitiveType().getId().intValue(),
col -> String.join(".", col.getPath())));

context
.columnBloomFilterEnabled()
.forEach(
(colPath, isEnabled) -> {
Types.NestedField fieldId = schema.findField(colPath);
if (fieldId == null) {
LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
return;
}

String parquetColumnPath = fieldIdToParquetPath.get(fieldId.fieldId());
String parquetColumnPath = colNameToParquetPathMap.get(colPath);
if (parquetColumnPath == null) {
LOG.warn("Skipping bloom filter config for missing field: {}", fieldId);
LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
return;
}

@@ -344,6 +331,24 @@
});
}

private void setColumnStatsConfig(
Context context,
Map<String, String> colNameToParquetPathMap,
BiConsumer<String, Boolean> withColumnStatsEnabled) {

context
.columnStatsEnabled()
.forEach(
(colPath, isEnabled) -> {
String parquetColumnPath = colNameToParquetPathMap.get(colPath);
if (parquetColumnPath == null) {
LOG.warn("Skipping column statistics config for missing field: {}", colPath);
return;
}
withColumnStatsEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
});
}

@Override
public <D> FileAppender<D> build() throws IOException {
Preconditions.checkNotNull(schema, "Schema is required");
@@ -401,6 +406,18 @@ public <D> FileAppender<D> build() throws IOException {
Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
}

Map<String, String> colNameToParquetPathMap =
type.getColumns().stream()
.filter(
col -> {
ID id = col.getPrimitiveType().getId();
return (id != null) && (schema.findColumnName(id.intValue()) != null);
})
.collect(
Collectors.toMap(
col -> schema.findColumnName(col.getPrimitiveType().getId().intValue()),
col -> String.join(".", col.getPath())));

if (createWriterFunc != null) {
Preconditions.checkArgument(
writeSupport == null, "Cannot write with both write support and Parquet value writer");
@@ -421,7 +438,12 @@
.withMaxBloomFilterBytes(bloomFilterMaxBytes);

setBloomFilterConfig(
context, type, propsBuilder::withBloomFilterEnabled, propsBuilder::withBloomFilterFPP);
context,
colNameToParquetPathMap,
propsBuilder::withBloomFilterEnabled,
propsBuilder::withBloomFilterFPP);

setColumnStatsConfig(context, colNameToParquetPathMap, propsBuilder::withStatisticsEnabled);

ParquetProperties parquetProperties = propsBuilder.build();

@@ -457,10 +479,13 @@ public <D> FileAppender<D> build() throws IOException {

setBloomFilterConfig(
context,
type,
colNameToParquetPathMap,
parquetWriteBuilder::withBloomFilterEnabled,
parquetWriteBuilder::withBloomFilterFPP);

setColumnStatsConfig(
context, colNameToParquetPathMap, parquetWriteBuilder::withStatisticsEnabled);

return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
}
}
@@ -477,6 +502,7 @@ private static class Context {
private final int bloomFilterMaxBytes;
private final Map<String, String> columnBloomFilterFpp;
private final Map<String, String> columnBloomFilterEnabled;
private final Map<String, String> columnStatsEnabled;
private final boolean dictionaryEnabled;

private Context(
@@ -491,6 +517,7 @@ private Context(
int bloomFilterMaxBytes,
Map<String, String> columnBloomFilterFpp,
Map<String, String> columnBloomFilterEnabled,
Map<String, String> columnStatsEnabled,
boolean dictionaryEnabled) {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
@@ -503,6 +530,7 @@ private Context(
this.bloomFilterMaxBytes = bloomFilterMaxBytes;
this.columnBloomFilterFpp = columnBloomFilterFpp;
this.columnBloomFilterEnabled = columnBloomFilterEnabled;
this.columnStatsEnabled = columnStatsEnabled;
this.dictionaryEnabled = dictionaryEnabled;
}

@@ -564,6 +592,9 @@ static Context dataContext(Map<String, String> config) {
Map<String, String> columnBloomFilterEnabled =
PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);

Map<String, String> columnStatsEnabled =
PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX);

boolean dictionaryEnabled =
PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);

@@ -579,6 +610,7 @@ static Context dataContext(Map<String, String> config) {
bloomFilterMaxBytes,
columnBloomFilterFpp,
columnBloomFilterEnabled,
columnStatsEnabled,
dictionaryEnabled);
}

@@ -647,6 +679,7 @@ static Context deleteContext(Map<String, String> config) {
PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT,
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
dictionaryEnabled);
}

@@ -702,6 +735,10 @@ Map<String, String> columnBloomFilterEnabled() {
return columnBloomFilterEnabled;
}

Map<String, String> columnStatsEnabled() {
return columnStatsEnabled;
}

boolean dictionaryEnabled() {
return dictionaryEnabled;
}
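For intuition about the shared `colNameToParquetPathMap`, here is a self-contained sketch of the same construction applied to an invented two-level schema; it resolves Iceberg column names (including nested ones) to dotted Parquet paths. `ParquetSchemaUtil.convert` and `Schema.findColumnName` are existing Iceberg APIs; the schema itself is made up for illustration:

import static org.apache.iceberg.types.Types.NestedField.optional;

import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.MessageType;

class NameToPathSketch {
  public static void main(String[] args) {
    // Invented schema: a top-level int and a nested struct.
    Schema schema =
        new Schema(
            optional(1, "id", Types.IntegerType.get()),
            optional(
                2,
                "location",
                Types.StructType.of(
                    optional(3, "lat", Types.DoubleType.get()),
                    optional(4, "lon", Types.DoubleType.get()))));

    MessageType type = ParquetSchemaUtil.convert(schema, "table");

    // Same construction as in build(): Iceberg column name -> dotted Parquet path.
    Map<String, String> colNameToParquetPath =
        type.getColumns().stream()
            .filter(col -> col.getPrimitiveType().getId() != null)
            .collect(
                Collectors.toMap(
                    col -> schema.findColumnName(col.getPrimitiveType().getId().intValue()),
                    col -> String.join(".", col.getPath())));

    // Expected: {id=id, location.lat=location.lat, location.lon=location.lon}
    System.out.println(colNameToParquetPath);
  }
}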
46 changes: 46 additions & 0 deletions parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -20,6 +20,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.iceberg.Files.localInput;
import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
@@ -57,6 +58,7 @@
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -219,6 +221,50 @@ public void testTwoLevelList() throws IOException {
assertThat(recordRead.get("topbytes")).isEqualTo(expectedBinary);
}

@Test
public void testColumnStatisticsEnabled() throws Exception {
Schema schema =
new Schema(
optional(1, "int_field", IntegerType.get()),
optional(2, "string_field", Types.StringType.get()));

File file = createTempFile(temp);

List<GenericData.Record> records = Lists.newArrayListWithCapacity(5);
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
for (int i = 1; i <= 5; i++) {
GenericData.Record record = new GenericData.Record(avroSchema);
record.put("int_field", i);
record.put("string_field", "test");
records.add(record);
}

write(
file,
schema,
ImmutableMap.<String, String>builder()
.put(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "int_field", "true")
.put(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "string_field", "false")
.buildOrThrow(),
ParquetAvroWriter::buildWriter,
records.toArray(new GenericData.Record[] {}));

InputFile inputFile = Files.localInput(file);

try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) {
for (BlockMetaData block : reader.getFooter().getBlocks()) {
for (ColumnChunkMetaData column : block.getColumns()) {
boolean emptyStats = column.getStatistics().isEmpty();
if (column.getPath().toDotString().equals("int_field")) {
assertThat(emptyStats).as("int_field has statistics").isEqualTo(false);
} else if (column.getPath().toDotString().equals("string_field")) {
assertThat(emptyStats).as("string_field has statistics").isEqualTo(true);
}
}
}
}
}

private Pair<File, Long> generateFile(
Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
int desiredRecordCount,
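As a companion to the assertion above, a hedged sketch of reading the actual min/max values back out of the footer once statistics are enabled; `genericGetMin`/`genericGetMax` and `getNumNulls` are part of parquet-mr's `Statistics` API, and the file path argument is a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

class PrintColumnStats {
  public static void main(String[] args) throws Exception {
    // args[0]: path to a Parquet file (placeholder; any local file works).
    HadoopInputFile in = HadoopInputFile.fromPath(new Path(args[0]), new Configuration());
    try (ParquetFileReader reader = ParquetFileReader.open(in)) {
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        for (ColumnChunkMetaData column : block.getColumns()) {
          // Columns with stats disabled report empty statistics here.
          if (!column.getStatistics().isEmpty()) {
            System.out.printf(
                "%s: min=%s max=%s nulls=%d%n",
                column.getPath().toDotString(),
                column.getStatistics().genericGetMin(),
                column.getStatistics().genericGetMax(),
                column.getStatistics().getNumNulls());
          }
        }
      }
    }
  }
}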