Spark: Support writing shredded variant in Iceberg-Spark #14297
New file: BufferedFileAppender.java (org.apache.iceberg.io, +147 lines)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.io;

import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
 * A FileAppender that buffers the first N rows, then creates a delegate appender via a factory.
 *
 * <p>The factory receives the buffered rows and is responsible for creating the real appender. Row
 * replay is handled internally. All subsequent {@link #add} calls delegate directly to the real
 * appender.
 *
 * <p>If fewer than {@code bufferRowCount} rows are written before close, the factory is called
 * with whatever rows were buffered. If no rows were written, the factory is not called and no file
 * is created on disk. In this case, {@link #metrics()} returns {@code new Metrics(0L)} and {@link
 * #length()} returns {@code 0L}.
 *
 * @param <D> the row type
 */
public class BufferedFileAppender<D> implements FileAppender<D> {
  private final int bufferRowCount;
  private final Function<List<D>, FileAppender<D>> appenderFactory;
  private final UnaryOperator<D> copyFunc;
  private List<D> buffer;
  private FileAppender<D> delegate;
  private boolean closed = false;

  /**
   * @param bufferRowCount number of rows to buffer before creating the delegate appender
   * @param appenderFactory given the buffered rows, creates the delegate appender
   */
  public BufferedFileAppender(
      int bufferRowCount, Function<List<D>, FileAppender<D>> appenderFactory) {
    this(bufferRowCount, appenderFactory, UnaryOperator.identity());
  }

  /**
   * @param bufferRowCount number of rows to buffer before creating the delegate appender
   * @param appenderFactory given the buffered rows, creates the delegate appender
   * @param copyFunc copies a row before buffering (needed when row objects are reused, e.g. Spark
   *     InternalRow)
   */
  public BufferedFileAppender(
      int bufferRowCount,
      Function<List<D>, FileAppender<D>> appenderFactory,
      UnaryOperator<D> copyFunc) {
    Preconditions.checkArgument(
        bufferRowCount > 0, "bufferRowCount must be > 0, got %s", bufferRowCount);
    Preconditions.checkNotNull(appenderFactory, "appenderFactory must not be null");
    Preconditions.checkNotNull(copyFunc, "copyFunc must not be null");
    this.bufferRowCount = bufferRowCount;
    this.appenderFactory = appenderFactory;
    this.copyFunc = copyFunc;
    this.buffer = Lists.newArrayListWithCapacity(bufferRowCount);
  }

  @Override
  public void add(D datum) {
    Preconditions.checkState(!closed, "Cannot add to a closed appender");
    if (delegate != null) {
      delegate.add(datum);
    } else {
      buffer.add(copyFunc.apply(datum));
      if (buffer.size() >= bufferRowCount) {
        initialize();
      }
    }
  }

  @Override
  public Metrics metrics() {
    Preconditions.checkState(closed, "Cannot return metrics for unclosed appender");
    if (delegate == null) {
      return new Metrics(0L);
    }

    return delegate.metrics();
  }

  @Override
  public long length() {
    if (delegate != null) {
      return delegate.length();
    }

    // No bytes written to disk yet; data is buffered in memory
    return 0L;
Review thread on length():
Contributor: Are we sure about the 0 length?
Contributor: since there is nothing buffered yet, 0 makes sense. Let me know if you prefer a different response.
Contributor (author): We use this length to decide if we should roll over to the next file, right?
  }
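For context on the thread above: rolling writers typically compare length() against a target file size to decide when to start a new file. A minimal sketch of that pattern, with hypothetical names (targetFileSizeBytes, newAppender) that are not part of this PR:

// Sketch of a typical rollover check; names here are placeholders
if (appender.length() >= targetFileSizeBytes) {
  appender.close();
  appender = newAppender(); // roll to the next file
}

While rows are still buffered in memory, length() returns 0, so a check like this never triggers a roll until the delegate exists and real bytes have been written.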
  @Override
  public List<Long> splitOffsets() {
    if (delegate != null) {
      return delegate.splitOffsets();
    }

    return null;
  }

  @Override
  public void close() throws IOException {
Review thread on close():
Contributor: what happens on a create().close() sequence with no data written? It should be a no-op. Is this tested?
Contributor: I'll add an empty close test.
    if (!closed) {
      if (delegate == null && buffer != null && !buffer.isEmpty()) {
        initialize();
      }

      if (delegate != null) {
        delegate.close();
      }

      this.closed = true;
      this.buffer = null;
    }
  }

  private void initialize() {
    delegate = appenderFactory.apply(buffer);
    Preconditions.checkState(delegate != null, "appenderFactory must not return null");
    try {
      buffer.forEach(delegate::add);
    } finally {
      buffer = null;
    }
  }
}
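For orientation, here is a sketch of how a caller might use this class. The factory receives the buffered sample and can inspect it before building the real appender, which is the point of the buffering: in this PR's context, sampling rows to choose a Parquet variant shredding schema before the file is opened. The helpers inferShreddingSchema and newParquetAppender below are hypothetical placeholders, not APIs from this PR:

// A sketch, not from this PR: buffer a 1,000-row sample, analyze it, then
// open the real appender. inferShreddingSchema and newParquetAppender stand
// in for whatever analysis and writer construction the caller performs.
FileAppender<Record> appender =
    new BufferedFileAppender<>(
        1000,
        bufferedRows -> newParquetAppender(inferShreddingSchema(bufferedRows)),
        Record::copy); // snapshot each row in case the caller reuses row objects

// The first 999 add() calls are buffered; the 1,000th fills the buffer and
// triggers the factory, which replays the sample into the delegate before
// subsequent add() calls go to the delegate directly.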
New file: TestBufferedFileAppender.java (org.apache.iceberg.io, +227 lines)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.io;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.data.DataTestHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.inmemory.InMemoryOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestBufferedFileAppender {

  private static final Schema SCHEMA =
      new Schema(
          Types.NestedField.required(1, "id", Types.LongType.get()),
          Types.NestedField.optional(2, "data", Types.StringType.get()));

  private InMemoryOutputFile outputFile;
  private GenericRecord record;

  @BeforeEach
  public void before() {
    this.outputFile = new InMemoryOutputFile();
    this.record = GenericRecord.create(SCHEMA);
  }

  private Function<List<Record>, FileAppender<Record>> avroFactory(OutputFile out) {
    return bufferedRows -> {
      try {
        return Avro.write(out)
            .createWriterFunc(DataWriter::create)
            .schema(SCHEMA)
            .overwrite()
            .build();
      } catch (IOException e) {
        throw new RuntimeIOException(e);
      }
    };
  }

  private BufferedFileAppender<Record> createAppender(int bufferSize) {
    return new BufferedFileAppender<>(bufferSize, avroFactory(outputFile), Record::copy);
  }

  private Record createRecord(long id, String data) {
    return record.copy(ImmutableMap.of("id", id, "data", data));
  }

  private List<Record> readBack() throws IOException {
    try (AvroIterable<Record> reader =
        Avro.read(outputFile.toInputFile())
            .project(SCHEMA)
            .createResolvingReader(PlannedDataReader::create)
            .build()) {
      return Lists.newArrayList(reader);
    }
  }

  @Test
  public void testBufferFlushesOnThreshold() throws IOException {
    BufferedFileAppender<Record> appender = createAppender(3);

    appender.add(createRecord(1L, "a"));
    appender.add(createRecord(2L, "b"));

    // delegate not yet created, length should be 0
    assertThat(appender.length()).isEqualTo(0L);

    appender.add(createRecord(3L, "c"));

    // delegate created after 3rd row, length should be > 0
    assertThat(appender.length()).isGreaterThan(0L);

    appender.add(createRecord(4L, "d"));
    appender.add(createRecord(5L, "e"));
    appender.close();

    List<Record> expected =
        Lists.newArrayList(
            createRecord(1L, "a"),
            createRecord(2L, "b"),
            createRecord(3L, "c"),
            createRecord(4L, "d"),
            createRecord(5L, "e"));
    DataTestHelpers.assertEquals(SCHEMA.asStruct(), expected, readBack());
  }

  @Test
  public void testCloseWithPartialBuffer() throws IOException {
    BufferedFileAppender<Record> appender = createAppender(10);

    appender.add(createRecord(1L, "a"));
    appender.add(createRecord(2L, "b"));
    appender.add(createRecord(3L, "c"));

    // buffer not full yet
    assertThat(appender.length()).isEqualTo(0L);

    // close flushes partial buffer through factory
    appender.close();

    List<Record> expected =
        Lists.newArrayList(createRecord(1L, "a"), createRecord(2L, "b"), createRecord(3L, "c"));
    DataTestHelpers.assertEquals(SCHEMA.asStruct(), expected, readBack());
  }

  @Test
  public void testCopyFuncIsApplied() throws IOException {
    BufferedFileAppender<Record> appender = createAppender(3);

    // use a single mutable record, relying on copyFunc to snapshot it
    record.set(0, 1L);
    record.set(1, "first");
    appender.add(record);

    record.set(0, 2L);
    record.set(1, "second");
    appender.add(record);

    record.set(0, 3L);
    record.set(1, "third");
    appender.add(record);

    appender.close();

    List<Record> expected =
        Lists.newArrayList(
            createRecord(1L, "first"), createRecord(2L, "second"), createRecord(3L, "third"));
    DataTestHelpers.assertEquals(SCHEMA.asStruct(), expected, readBack());
  }

  @Test
  public void testMetricsAfterClose() throws IOException {
    BufferedFileAppender<Record> appender = createAppender(2);

    appender.add(createRecord(1L, "a"));
    appender.add(createRecord(2L, "b"));
    appender.add(createRecord(3L, "c"));
    appender.close();

    assertThat(appender.metrics()).isNotNull();
    assertThat(appender.metrics().recordCount()).isEqualTo(3L);
    assertThat(appender.length()).isGreaterThan(0L);
  }

  @Test
  public void testMetricsBeforeCloseThrows() throws IOException {
    try (BufferedFileAppender<Record> appender = createAppender(10)) {
      assertThatThrownBy(appender::metrics)
          .isInstanceOf(IllegalStateException.class)
          .hasMessage("Cannot return metrics for unclosed appender");
    }
  }

  @Test
  public void testAddAfterCloseThrows() throws IOException {
    try (BufferedFileAppender<Record> appender = createAppender(10)) {
      appender.add(createRecord(1L, "a"));
      appender.close();

      assertThatThrownBy(() -> appender.add(createRecord(2L, "b")))
          .isInstanceOf(IllegalStateException.class)
          .hasMessage("Cannot add to a closed appender");
    }
  }

  @Test
  public void testAddAllSpanningBuffer() throws IOException {
    BufferedFileAppender<Record> appender = createAppender(2);

    List<Record> records =
        Lists.newArrayList(
            createRecord(1L, "a"),
            createRecord(2L, "b"),
            createRecord(3L, "c"),
            createRecord(4L, "d"));

    appender.addAll(records);
    appender.close();

    DataTestHelpers.assertEquals(SCHEMA.asStruct(), records, readBack());
  }

  @Test
  public void testCloseWithNoData() throws IOException {
    BufferedFileAppender<Record> appender = createAppender(10);
    // close immediately with no data written
    appender.close();
    // delegate was never created
    assertThat(appender.length()).isEqualTo(0L);
    assertThat(appender.metrics()).isNotNull();
    assertThat(appender.metrics().recordCount()).isEqualTo(0L);
    assertThat(appender.splitOffsets()).isNull();
  }
}
Review thread on the constructor without copyFunc:
Contributor: Do we still need this? As we only create BufferedFileAppender in ParquetFormatModel, rather than creating it in the engine as before.
Reply: Removing it means anyone using BufferedFileAppender outside of ParquetFormatModel must always provide a copy func even when their rows aren't reused. Happy to remove if that's ok for a future use case. I left it in there to be more flexible.
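A concrete illustration of why the copy function matters: Spark reuses InternalRow instances across calls, so buffering a bare reference would let later rows overwrite earlier entries in the sample. A sketch of how a Spark caller might construct the appender (InternalRow.copy() is a real Spark API; bufferRowCount and factory are assumed to be in scope, not names from this PR):

// Sketch, not from this PR: snapshot each reused InternalRow before buffering.
// org.apache.spark.sql.catalyst.InternalRow#copy returns an independent copy.
BufferedFileAppender<InternalRow> appender =
    new BufferedFileAppender<>(bufferRowCount, factory, InternalRow::copy);

Callers whose row objects are immutable (or never reused) can use the two-argument constructor, which defaults copyFunc to UnaryOperator.identity().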