feat: implement DataWriter for Iceberg data files #552
shangxinli wants to merge 3 commits into apache:main
Conversation
Force-pushed 8944a75 to a201953
src/iceberg/data/data_writer.cc (Outdated)
```cpp
ICEBERG_ASSIGN_OR_RAISE(writer_,
                        WriterFactoryRegistry::Open(options_.format, writer_options));
return {};
```
It is odd that an empty structure is always returned. Also, since this is initialization, why not do it in the ctor?

Refactored the initialization logic.
```cpp
if (closed_) {
  return InvalidArgument("Writer already closed");
}
```
I could see a case for making close idempotent. Is there any strong reason to return this error instead of, say, a no-op?
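A minimal sketch of what the idempotent variant could look like, reusing the `ICEBERG_RETURN_UNEXPECTED` macro already quoted in this thread (the PR eventually adopted this behavior, per the commit message below):

```cpp
// Sketch only: repeated calls succeed instead of returning an error.
Status Close() {
  if (closed_) {
    return {};  // no-op: already closed, treat as success
  }
  ICEBERG_RETURN_UNEXPECTED(writer_->Close());
  closed_ = true;
  return {};
}
```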
```cpp
  return InvalidArgument("Writer already closed");
}
ICEBERG_RETURN_UNEXPECTED(writer_->Close());
closed_ = true;
```
Should this class address thread safety?

Good question! I've added explicit documentation that this class is not thread-safe.

I don't think a single writer (or reader) should support thread safety, so it is fine not to add a comment like this.

@wgtmac out of curiosity, for my own knowledge: what guarantees that a single writer/reader will be using the class?

These file writers are supposed to be used by a single write task, which can for example be a unit of a table sink operator in a SQL job plan. Usually the writer is responsible for partitioned (and sometimes sorted) data chunks.

Agreed. Removed the thread safety comment from the header.
src/iceberg/test/data_writer_test.cc (Outdated)
```cpp
TEST_F(DataWriterTest, CreateWithParquetFormat) {
  DataWriterOptions options{
      .path = "test_data.parquet",
      .schema = schema_,
      .spec = partition_spec_,
      .partition = PartitionValues{},
      .format = FileFormatType::kParquet,
      .io = file_io_,
      .properties = {{"write.parquet.compression-codec", "uncompressed"}},
  };

  auto writer_result = DataWriter::Make(options);
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());
  ASSERT_NE(writer, nullptr);
}

TEST_F(DataWriterTest, CreateWithAvroFormat) {
  DataWriterOptions options{
      .path = "test_data.avro",
      .schema = schema_,
      .spec = partition_spec_,
      .partition = PartitionValues{},
      .format = FileFormatType::kAvro,
      .io = file_io_,
  };

  auto writer_result = DataWriter::Make(options);
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());
  ASSERT_NE(writer, nullptr);
}
```
nit: The two tests are quite similar; it is probably possible to use a helper function to reduce the duplication.

Consolidated the two tests using parameterized testing.
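A hedged sketch of the parameterized form using GoogleTest's `TEST_P` machinery; the `FormatCase` struct and the reuse of the `DataWriterTest` fixture are illustrative, not the PR's exact code:

```cpp
struct FormatCase {
  std::string path;
  FileFormatType format;
};

// Inherit the existing fixture so schema_, partition_spec_, and file_io_
// stay available, and add the parameter interface on top.
class DataWriterFormatTest : public DataWriterTest,
                             public ::testing::WithParamInterface<FormatCase> {};

TEST_P(DataWriterFormatTest, CreateWriter) {
  const auto& param = GetParam();
  DataWriterOptions options{
      .path = param.path,
      .schema = schema_,
      .spec = partition_spec_,
      .partition = PartitionValues{},
      .format = param.format,
      .io = file_io_,
  };
  auto writer_result = DataWriter::Make(options);
  ASSERT_THAT(writer_result, IsOk());
  ASSERT_NE(writer_result.value(), nullptr);
}

INSTANTIATE_TEST_SUITE_P(
    Formats, DataWriterFormatTest,
    ::testing::Values(FormatCase{"test_data.parquet", FileFormatType::kParquet},
                      FormatCase{"test_data.avro", FileFormatType::kAvro}));
```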
```cpp
// Check length before close
auto length_result = writer->Length();
ASSERT_THAT(length_result, IsOk());
EXPECT_GT(length_result.value(), 0);
```
nit: check the size of the data passed to the write function?
src/iceberg/data/data_writer.cc (Outdated)
```cpp
if (!writer_) {
  return InvalidArgument("Writer not initialized");
}
```
Suggested change:

```diff
- if (!writer_) {
-   return InvalidArgument("Writer not initialized");
- }
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
```

nit: this should make the code shorter.
Replaced all manual null checks with ICEBERG_PRECHECK.
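For context, a precheck-style macro along these lines would collapse the three-line guard into one. This is a hedged sketch of the general pattern only, not the actual `ICEBERG_PRECHECK` definition:

```cpp
// Sketch of the pattern; the real macro lives in the Iceberg codebase and
// may differ in details (error type, message formatting, etc.).
#define PRECHECK_SKETCH(condition, message) \
  do {                                      \
    if (!(condition)) {                     \
      return InvalidArgument(message);      \
    }                                       \
  } while (false)
```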
src/iceberg/data/data_writer.cc (Outdated)
```cpp
}

Result<FileWriter::WriteResult> Metadata() {
  if (!closed_) {
```
nit: use ICEBERG_CHECK here
src/iceberg/test/data_writer_test.cc (Outdated)
```cpp
EXPECT_GT(length.value(), 0);
}

}  // namespace
```
nit: move this closing namespace brace before the first TEST_F?
Force-pushed 90d324e to 153d763
Implements DataWriter class for writing Iceberg data files as part of issue apache#441 (task 2).

Implementation:
- Static factory method DataWriter::Make() for creating writer instances
- Support for Parquet and Avro file formats via WriterFactoryRegistry
- Complete DataFile metadata generation including partition info, column statistics, serialized bounds, and sort order ID
- Proper lifecycle management with Write/Close/Metadata methods
- Idempotent Close() - multiple calls succeed (no-op after first)
- PIMPL idiom for ABI stability
- Not thread-safe (documented)

Tests:
- 13 comprehensive unit tests including parameterized format tests
- Coverage: creation, write/close lifecycle, metadata generation, error handling, feature validation, and data size verification
- All tests passing (13/13)

Related to apache#441
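Taken together, the lifecycle described above would look roughly like this from a caller's perspective (a hedged sketch; `options` and `arrow_array` construction are elided):

```cpp
// Make -> Write -> Close -> Metadata, using the macros quoted in this PR.
ICEBERG_ASSIGN_OR_RAISE(auto writer, DataWriter::Make(options));
ICEBERG_RETURN_UNEXPECTED(writer->Write(arrow_array));      // one or more batches
ICEBERG_RETURN_UNEXPECTED(writer->Close());                 // idempotent after this PR
ICEBERG_ASSIGN_OR_RAISE(auto write_result, writer->Metadata());  // valid only post-close
```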
Force-pushed 153d763 to 147f25b
src/iceberg/data/data_writer.cc (Outdated)
```cpp
class DataWriter::Impl {
 public:
  static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
    WriterOptions writer_options;
```
nit: use aggregate initialization for writer_options

Done. Changed to use aggregate initialization for WriterOptions.
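A hedged sketch of the aggregate form; the exact WriterOptions fields are assumptions here, mirrored from the DataWriterOptions fields visible elsewhere in this PR:

```cpp
// Field names are assumed, not verified against the WriterOptions header.
WriterOptions writer_options{
    .path = options.path,
    .schema = options.schema,
    .io = options.io,
    .properties = options.properties,
};
```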
src/iceberg/data/data_writer.cc (Outdated)
```cpp
}

Status Write(ArrowArray* data) {
  ICEBERG_PRECHECK(writer_, "Writer not initialized");
```
Will this check ever fail? If not, should we remove the check or use ICEBERG_DCHECK instead? Same question for below.

Good point. Since writer_ is always initialized in Make() (the constructor is private and only called after successful writer creation), the check can never fail. Changed to ICEBERG_DCHECK for all three usages.
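A hedged sketch of the invariant the reply describes: the constructor is private and only reachable through Make() after the writer was created successfully, so writer_ can never be null and a debug-only DCHECK suffices (names simplified from the PR):

```cpp
class Impl {
 public:
  static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
    // writer_options construction elided; see the aggregate-initialization
    // sketch earlier in this thread. On failure, Make() returns early and
    // the ctor below is never reached.
    ICEBERG_ASSIGN_OR_RAISE(auto writer,
                            WriterFactoryRegistry::Open(options.format, writer_options));
    return std::unique_ptr<Impl>(new Impl(std::move(writer)));
  }

 private:
  explicit Impl(std::unique_ptr<FileWriter> writer) : writer_(std::move(writer)) {}
  std::unique_ptr<FileWriter> writer_;  // invariant: non-null after construction
};
```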
src/iceberg/data/data_writer.cc (Outdated)
```cpp
}

Result<FileWriter::WriteResult> Metadata() {
  ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
```
Suggested change:

```diff
- ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
+ ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
```

We should return an invalid-state error instead of an invalid argument in this case.
Done. Changed to ICEBERG_CHECK, which returns ValidationFailed instead of InvalidArgument. Updated the test expectation accordingly.
src/iceberg/data/data_writer.cc (Outdated)
```cpp
data_file->file_path = options_.path;
data_file->file_format = options_.format;
data_file->partition = options_.partition;
data_file->record_count = metrics.row_count.value_or(0);
```
Suggested change:

```diff
- data_file->record_count = metrics.row_count.value_or(0);
+ data_file->record_count = metrics.row_count.value_or(-1);
```

The Java impl uses -1 when the row count is unavailable.
Done. Changed to value_or(-1) to match the Java implementation.
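For reference, std::optional::value_or supplies the sentinel only when the optional is empty, so a present row count always wins:

```cpp
#include <cstdint>
#include <optional>

std::optional<int64_t> row_count;               // metrics without a row count
int64_t record_count = row_count.value_or(-1);  // -1 sentinel, matching Java
row_count = 42;
record_count = row_count.value_or(-1);          // 42: value present, sentinel unused
```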
src/iceberg/data/data_writer.cc (Outdated)
```cpp
auto split_offsets = writer_->split_offsets();

auto data_file = std::make_shared<DataFile>();
data_file->content = DataFile::Content::kData;
```
nit: use aggregate initialization

Done. Changed to use aggregate initialization for DataFile. Also used range constructors for the metrics maps (e.g. {metrics.column_sizes.begin(), metrics.column_sizes.end()}) to simplify the conversion. The bounds maps still need explicit loops due to the serialization step.
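A hedged sketch of the combined aggregate form; fields beyond those quoted in this thread (content, file_path, file_format, partition, record_count, column_sizes) and the member declaration order are assumptions:

```cpp
auto data_file = std::make_shared<DataFile>(DataFile{
    .content = DataFile::Content::kData,
    .file_path = options_.path,
    .file_format = options_.format,
    .partition = options_.partition,
    .record_count = metrics.row_count.value_or(-1),
    // Range construction converts the unordered_map metrics into the
    // ordered map that DataFile stores, replacing the explicit for-loop.
    .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
});
```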
src/iceberg/data/data_writer.cc (Outdated)
```cpp
// Convert metrics maps from unordered_map to map
for (const auto& [col_id, size] : metrics.column_sizes) {
  data_file->column_sizes[col_id] = size;
```
Do you think it makes sense to change the DataFile and Metrics classes to use std::map or std::unordered_map consistently so we don't need a for-loop here?
cc @zhjwpku

That would be a nice cleanup. For now I've simplified the conversion by using range constructors instead of explicit for-loops. Changing Metrics and DataFile to use the same map type would be a good follow-up.
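The range-constructor conversion the reply refers to is plain standard C++; a self-contained illustration:

```cpp
#include <cstdint>
#include <map>
#include <unordered_map>

int main() {
  std::unordered_map<int32_t, int64_t> column_sizes{{1, 128}, {2, 256}};
  // One-shot conversion: std::map's iterator-range constructor replaces
  // the element-by-element for-loop.
  std::map<int32_t, int64_t> ordered(column_sizes.begin(), column_sizes.end());
  return ordered.size() == 2 ? 0 : 1;
}
```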
src/iceberg/test/data_writer_test.cc (Outdated)
```cpp
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())});
```
Suggested change:

```diff
- SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
- SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())});
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeOptional(2, "name", string())});
```
Done. Changed to use the int32() and string() factory functions.
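In context, the test schema would then read roughly as follows; the surrounding Schema constructor shape is inferred from the snippet quoted above, not verified against the full header:

```cpp
// Sketch: type factory functions in place of std::make_shared<...Type>().
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
    SchemaField::MakeRequired(1, "id", int32()),
    SchemaField::MakeOptional(2, "name", string()),
});
```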
```cpp
using ::testing::HasSubstr;

class DataWriterTest : public ::testing::Test {
```
Can we try to consolidate the test cases? Each of them only tests a tiny API with repeated boilerplate for creating a writer and writing data, which may lead to a test-case explosion if more cases like this are added.

Done. Added MakeDefaultOptions() and WriteTestDataToWriter() helpers to reduce boilerplate. Consolidated related tests: merged WriteAndClose+LengthIncreasesAfterWrite, GetMetadataAfterClose+MetadataContainsColumnMetrics, and SortOrderIdPreserved+SortOrderIdNullByDefault. Reduced from 10 test cases to 7.
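A hedged sketch of what such fixture helpers might look like; the exact signatures in the PR may differ, and MakeTestArrowArray() is a hypothetical data helper:

```cpp
// Builds options from the fixture's shared members, varying only the
// per-test path and format.
DataWriterOptions DataWriterTest::MakeDefaultOptions(std::string path,
                                                     FileFormatType format) {
  return DataWriterOptions{
      .path = std::move(path),
      .schema = schema_,
      .spec = partition_spec_,
      .partition = PartitionValues{},
      .format = format,
      .io = file_io_,
  };
}

// Creates a writer and writes one batch of test data, so individual tests
// only assert on the behavior they actually cover.
std::unique_ptr<DataWriter> DataWriterTest::WriteTestDataToWriter(
    const DataWriterOptions& options) {
  auto writer_result = DataWriter::Make(options);
  EXPECT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());
  EXPECT_THAT(writer->Write(MakeTestArrowArray()), IsOk());  // hypothetical helper
  return writer;
}
```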
- Use aggregate initialization for WriterOptions and DataFile
- Change ICEBERG_PRECHECK(writer_) to ICEBERG_DCHECK (can never fail)
- Use ICEBERG_CHECK for closed state check (returns ValidationFailed)
- Use value_or(-1) for missing row count to match Java impl
- Use range constructors for metrics map conversion
- Remove unnecessary thread safety comment
- Use int32()/string() factory functions in tests
- Consolidate test cases and add helpers to reduce boilerplate
Implements DataWriter class for writing Iceberg data files as part of issue #441 (task 2).
Related to #441