28 commits
d3728a1
Spark shredded variant implementation
aihuaxu Aug 24, 2025
edf772f
Add heuristics to determine the shredding schema
aihuaxu Nov 2, 2025
07b1722
Simplify heuristics to most common type
aihuaxu Jan 9, 2026
bb55257
Add to 4.1
aihuaxu Jan 15, 2026
110c802
Add tie break and INT/DECIMAL promotion
aihuaxu Jan 27, 2026
ef2f92c
Merge remote-tracking branch 'apache/main' into pr-14297
nssalian Mar 11, 2026
4f00355
Wire shredding writer through WriterFunction API
nssalian Mar 13, 2026
12e2e88
Fix decimal issue, null handling, heuristics and adding more tests
nssalian Mar 13, 2026
a4d2982
Merge remote-tracking branch 'apache/main' into pr-14297
nssalian Mar 19, 2026
24a1b0c
Adding BufferedFileAppender for deferred writer init
nssalian Mar 23, 2026
53c6125
Adding VariantShreddingAnalyzer and withFileSchema support
nssalian Mar 23, 2026
50b01a4
Wiring the variant shredding write path via BufferedFileAppender
nssalian Mar 23, 2026
0171f38
Merge remote-tracking branch 'apache/main' into pr-14297
nssalian Mar 23, 2026
b598b5c
Fix checkstyle violations in SchemaInferenceVisitor and SparkFileWrit…
nssalian Mar 23, 2026
f7b6d59
Merge remote-tracking branch 'apache/main' into pr-14297
nssalian Mar 23, 2026
9df8b43
Wire variant shredding write path through FormatModel API as per PR f…
nssalian Mar 30, 2026
7dae70e
Merge remote-tracking branch 'apache/main' into pr-14297
nssalian Mar 30, 2026
f763486
Fix decimal overflow, array pruning, and buffer lifecycle in variant …
nssalian Apr 7, 2026
73e00cb
Merge remote-tracking branch 'apache/main' into pr-14297
nssalian Apr 7, 2026
6608235
Test fix and pr comment
nssalian Apr 8, 2026
c4167eb
Merge remote-tracking branch 'apache/main' into pr-14297
nssalian Apr 18, 2026
a5121ff
Fixing PR comments
nssalian Apr 18, 2026
4f104b0
Update doc for spark config
nssalian Apr 18, 2026
9c5355d
Merge remote-tracking branch 'apache/main' into pr-14297
nssalian Apr 19, 2026
8bb90a3
add co-author
nssalian Mar 30, 2026
fe9b2de
Merge branch 'spark-write-iceberg-variant' of github.com:aihuaxu/iceb…
nssalian Apr 21, 2026
4b6a9f2
Merge remote-tracking branch 'apache/main' into pr-14297
nssalian Apr 21, 2026
c63155d
Core: Move DataTestHelpers to core and use in TestBufferedFileAppender
nssalian Apr 21, 2026
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -154,6 +154,12 @@ private TableProperties() {}
"write.delete.parquet.compression-level";
public static final String PARQUET_COMPRESSION_LEVEL_DEFAULT = null;

public static final String PARQUET_VARIANT_SHRED = "write.parquet.variant.shred";
public static final boolean PARQUET_VARIANT_SHRED_DEFAULT = false;
public static final String PARQUET_VARIANT_BUFFER_SIZE =
"write.parquet.variant.inference.buffer-size";
public static final int PARQUET_VARIANT_BUFFER_SIZE_DEFAULT = 100;

public static final String PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT =
"write.parquet.row-group-check-min-record-count";
public static final String DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT =
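These constants follow the existing pattern in TableProperties, so callers would typically read them with PropertyUtil like the other Parquet write options. A minimal sketch (the `table` variable is assumed to be in scope):

```java
import java.util.Map;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.util.PropertyUtil;

Map<String, String> props = table.properties(); // table assumed in scope

// Whether variant columns should be written with shredded encoding
boolean shred =
    PropertyUtil.propertyAsBoolean(
        props,
        TableProperties.PARQUET_VARIANT_SHRED,
        TableProperties.PARQUET_VARIANT_SHRED_DEFAULT);

// How many rows to buffer while inferring the shredding schema
int bufferSize =
    PropertyUtil.propertyAsInt(
        props,
        TableProperties.PARQUET_VARIANT_BUFFER_SIZE,
        TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT);
```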
147 changes: 147 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/BufferedFileAppender.java
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
* A FileAppender that buffers the first N rows, then creates a delegate appender via a factory.
*
* <p>The factory receives the buffered rows and is responsible for creating the real appender. Row
* replay is handled internally. All subsequent {@link #add} calls delegate directly to the real
* appender.
*
* <p>If fewer than {@code bufferRowCount} rows are written before close, the factory is called with
* whatever rows were buffered. If no rows were written, the factory is not called and no file is
* created on disk. In this case, {@link #metrics()} returns {@code new Metrics(0L)} and {@link
* #length()} returns {@code 0L}.
*
* @param <D> the row type
*/
public class BufferedFileAppender<D> implements FileAppender<D> {
private final int bufferRowCount;
private final Function<List<D>, FileAppender<D>> appenderFactory;
private final UnaryOperator<D> copyFunc;
private List<D> buffer;
private FileAppender<D> delegate;
private boolean closed = false;

/**
* @param bufferRowCount number of rows to buffer before creating the delegate appender
* @param appenderFactory given the buffered rows, creates the delegate appender
*/
public BufferedFileAppender(
int bufferRowCount, Function<List<D>, FileAppender<D>> appenderFactory) {
this(bufferRowCount, appenderFactory, UnaryOperator.identity());
}
Comment on lines +55 to +58

Contributor

Do we still need this? We only create BufferedFileAppender in ParquetFormatModel now, rather than creating it in the engine as before.

Contributor

Removing it means anyone using BufferedFileAppender outside of ParquetFormatModel must always provide a copy func even when their rows aren't reused. Happy to remove it if that's OK for a future use case; I left it in to be more flexible.
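For reference, a minimal sketch (not from this PR) of how the two constructors differ; `createRealAppender` is an assumed placeholder factory:

```java
// Rows are not reused by the caller: the single-argument constructor buffers
// row references directly (identity copy).
FileAppender<Record> plain =
    new BufferedFileAppender<>(100, rows -> createRealAppender(rows));

// Rows are reused by the caller (e.g. Spark's InternalRow): pass a copy
// function so each row is snapshotted before it is buffered.
FileAppender<Record> copying =
    new BufferedFileAppender<>(100, rows -> createRealAppender(rows), Record::copy);
```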


/**
* @param bufferRowCount number of rows to buffer before creating the delegate appender
* @param appenderFactory given the buffered rows, creates the delegate appender
* @param copyFunc copies a row before buffering (needed when row objects are reused, e.g. Spark
* InternalRow)
*/
public BufferedFileAppender(
int bufferRowCount,
Function<List<D>, FileAppender<D>> appenderFactory,
UnaryOperator<D> copyFunc) {
Preconditions.checkArgument(
bufferRowCount > 0, "bufferRowCount must be > 0, got %s", bufferRowCount);
Preconditions.checkNotNull(appenderFactory, "appenderFactory must not be null");
Preconditions.checkNotNull(copyFunc, "copyFunc must not be null");
pvary marked this conversation as resolved.
this.bufferRowCount = bufferRowCount;
this.appenderFactory = appenderFactory;
this.copyFunc = copyFunc;
this.buffer = Lists.newArrayListWithCapacity(bufferRowCount);
}

@Override
public void add(D datum) {
Preconditions.checkState(!closed, "Cannot add to a closed appender");
if (delegate != null) {
delegate.add(datum);
} else {
buffer.add(copyFunc.apply(datum));
if (buffer.size() >= bufferRowCount) {
initialize();
}
}
}

@Override
public Metrics metrics() {
Preconditions.checkState(closed, "Cannot return metrics for unclosed appender");
if (delegate == null) {
return new Metrics(0L);
}

return delegate.metrics();
pvary marked this conversation as resolved.
}

@Override
public long length() {
if (delegate != null) {
return delegate.length();
}
pvary marked this conversation as resolved.

// No bytes written to disk yet; data is buffered in memory
return 0L;
Contributor

Are we sure about the 0 length?

Contributor

Since there is nothing buffered yet, 0 makes sense. Let me know if you prefer a different response.

Contributor Author

We use this length to decide if we should roll over to the next file, right? Does it cause the file to include more data when it later writes the actual data?
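To make the concern concrete, a hypothetical sketch (names assumed, not from this PR) of the length-based roll check that rolling writers typically apply:

```java
// Hypothetical roll check in a rolling writer; targetFileSizeBytes and
// rollToNewFile are assumed. While rows are only buffered, length() reports 0,
// so the roll decision is deferred until the delegate appender exists and
// starts reporting real on-disk length.
if (appender.length() >= targetFileSizeBytes) {
  rollToNewFile();
}
```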

}

@Override
public List<Long> splitOffsets() {
if (delegate != null) {
return delegate.splitOffsets();
}

return null;
pvary marked this conversation as resolved.
}

@Override
public void close() throws IOException {
Contributor

What happens on a create().close() sequence with no data written? It should be a no-op. Is this tested?

Contributor

I'll add an empty close test.

if (!closed) {
if (delegate == null && buffer != null && !buffer.isEmpty()) {
initialize();
}

if (delegate != null) {
Comment thread
pvary marked this conversation as resolved.
delegate.close();
}

this.closed = true;
this.buffer = null;
}
}

private void initialize() {
delegate = appenderFactory.apply(buffer);
Preconditions.checkState(delegate != null, "appenderFactory must not return null");
try {
buffer.forEach(delegate::add);
} finally {
buffer = null;
}
}
}
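To make the deferred-initialization contract concrete, a minimal usage sketch (not from this PR); `inferSchemaFrom` and `newAppenderFor` are assumed helpers standing in for the shredding-schema analysis and the real Parquet appender factory:

```java
// Buffer the first 100 rows so the factory can inspect a sample of the data
// before any file is created; buffered rows are replayed automatically.
FileAppender<Record> appender =
    new BufferedFileAppender<>(
        100,
        bufferedRows -> {
          // Assumed helpers: derive a file schema from the sampled rows,
          // then open the real appender with that schema.
          Schema fileSchema = inferSchemaFrom(bufferedRows);
          return newAppenderFor(fileSchema);
        },
        Record::copy); // snapshot each row in case the caller reuses it
```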
227 changes: 227 additions & 0 deletions core/src/test/java/org/apache/iceberg/io/TestBufferedFileAppender.java
@@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.data.DataTestHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.inmemory.InMemoryOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestBufferedFileAppender {

private static final Schema SCHEMA =
new Schema(
Types.NestedField.required(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()));

private InMemoryOutputFile outputFile;
private GenericRecord record;

@BeforeEach
public void before() {
this.outputFile = new InMemoryOutputFile();
this.record = GenericRecord.create(SCHEMA);
}

private Function<List<Record>, FileAppender<Record>> avroFactory(OutputFile out) {
return bufferedRows -> {
try {
return Avro.write(out)
.createWriterFunc(DataWriter::create)
.schema(SCHEMA)
.overwrite()
.build();
} catch (IOException e) {
throw new RuntimeIOException(e);
}
};
}

private BufferedFileAppender<Record> createAppender(int bufferSize) {
return new BufferedFileAppender<>(bufferSize, avroFactory(outputFile), Record::copy);
}

private Record createRecord(long id, String data) {
return record.copy(ImmutableMap.of("id", id, "data", data));
}

private List<Record> readBack() throws IOException {
try (AvroIterable<Record> reader =
Avro.read(outputFile.toInputFile())
.project(SCHEMA)
.createResolvingReader(PlannedDataReader::create)
.build()) {
return Lists.newArrayList(reader);
}
}

@Test
public void testBufferFlushesOnThreshold() throws IOException {
BufferedFileAppender<Record> appender = createAppender(3);

appender.add(createRecord(1L, "a"));
appender.add(createRecord(2L, "b"));

// delegate not yet created, length should be 0
assertThat(appender.length()).isEqualTo(0L);

appender.add(createRecord(3L, "c"));

// delegate created after 3rd row, length should be > 0
assertThat(appender.length()).isGreaterThan(0L);

appender.add(createRecord(4L, "d"));
appender.add(createRecord(5L, "e"));
appender.close();

List<Record> expected =
Lists.newArrayList(
createRecord(1L, "a"),
createRecord(2L, "b"),
createRecord(3L, "c"),
createRecord(4L, "d"),
createRecord(5L, "e"));
DataTestHelpers.assertEquals(SCHEMA.asStruct(), expected, readBack());
}

@Test
public void testCloseWithPartialBuffer() throws IOException {
BufferedFileAppender<Record> appender = createAppender(10);

appender.add(createRecord(1L, "a"));
appender.add(createRecord(2L, "b"));
appender.add(createRecord(3L, "c"));

// buffer not full yet
assertThat(appender.length()).isEqualTo(0L);

// close flushes partial buffer through factory
appender.close();

List<Record> expected =
Lists.newArrayList(createRecord(1L, "a"), createRecord(2L, "b"), createRecord(3L, "c"));
DataTestHelpers.assertEquals(SCHEMA.asStruct(), expected, readBack());
}

@Test
public void testCopyFuncIsApplied() throws IOException {
BufferedFileAppender<Record> appender = createAppender(3);

// use a single mutable record, relying on copyFunc to snapshot it
record.set(0, 1L);
record.set(1, "first");
appender.add(record);

record.set(0, 2L);
record.set(1, "second");
appender.add(record);

record.set(0, 3L);
record.set(1, "third");
appender.add(record);

appender.close();

List<Record> expected =
Lists.newArrayList(
createRecord(1L, "first"), createRecord(2L, "second"), createRecord(3L, "third"));
DataTestHelpers.assertEquals(SCHEMA.asStruct(), expected, readBack());
}

@Test
public void testMetricsAfterClose() throws IOException {
BufferedFileAppender<Record> appender = createAppender(2);

appender.add(createRecord(1L, "a"));
appender.add(createRecord(2L, "b"));
appender.add(createRecord(3L, "c"));
appender.close();

assertThat(appender.metrics()).isNotNull();
assertThat(appender.metrics().recordCount()).isEqualTo(3L);
assertThat(appender.length()).isGreaterThan(0L);
}

@Test
public void testMetricsBeforeCloseThrows() throws IOException {
try (BufferedFileAppender<Record> appender = createAppender(10)) {
assertThatThrownBy(appender::metrics)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Cannot return metrics for unclosed appender");
}
}

@Test
public void testAddAfterCloseThrows() throws IOException {
try (BufferedFileAppender<Record> appender = createAppender(10)) {
appender.add(createRecord(1L, "a"));
appender.close();

assertThatThrownBy(() -> appender.add(createRecord(2L, "b")))
.isInstanceOf(IllegalStateException.class)
.hasMessage("Cannot add to a closed appender");
}
}

@Test
public void testAddAllSpanningBuffer() throws IOException {
BufferedFileAppender<Record> appender = createAppender(2);

List<Record> records =
Lists.newArrayList(
createRecord(1L, "a"),
createRecord(2L, "b"),
createRecord(3L, "c"),
createRecord(4L, "d"));

appender.addAll(records);
appender.close();

DataTestHelpers.assertEquals(SCHEMA.asStruct(), records, readBack());
}

@Test
public void testCloseWithNoData() throws IOException {
BufferedFileAppender<Record> appender = createAppender(10);
// close immediately with no data written
appender.close();
// delegate was never created
assertThat(appender.length()).isEqualTo(0L);
assertThat(appender.metrics()).isNotNull();
assertThat(appender.metrics().recordCount()).isEqualTo(0L);
assertThat(appender.splitOffsets()).isNull();
}
}
2 changes: 2 additions & 0 deletions docs/docs/configuration.md
@@ -49,6 +49,8 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
| write.parquet.compression-codec | zstd | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
| write.parquet.compression-level | null | Parquet compression level |
| write.parquet.variant.shred | false | When true, variant columns are written with shredded Parquet encoding for improved query performance (see the example after this table) |
| write.parquet.variant.inference.buffer-size | 100 | Number of rows to buffer for schema inference when variant shredding is enabled |
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: 'col1' |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
| write.parquet.bloom-filter-fpp.column.col1 | 0.01 | The false positive probability for a bloom filter applied to 'col1' (must be > 0.0 and < 1.0) |
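Finally, to show how the new properties from this PR would be enabled in practice, a hedged example using standard Iceberg table-property DDL in Spark (the table identifier and buffer size are illustrative):

```java
// Enable variant shredding and tune the inference buffer on an existing table.
// The table name is illustrative; `spark` is an active SparkSession.
spark.sql(
    "ALTER TABLE db.events SET TBLPROPERTIES ("
        + "'write.parquet.variant.shred' = 'true',"
        + "'write.parquet.variant.inference.buffer-size' = '200')");
```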