-
Notifications
You must be signed in to change notification settings - Fork 90
feat: implement DataWriter for Iceberg data files #552
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,20 +19,118 @@ | |
|
|
||
| #include "iceberg/data/data_writer.h" | ||
|
|
||
| #include <map> | ||
|
|
||
| #include "iceberg/file_writer.h" | ||
| #include "iceberg/manifest/manifest_entry.h" | ||
| #include "iceberg/util/macros.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| class DataWriter::Impl { | ||
| public: | ||
| static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) { | ||
| WriterOptions writer_options{ | ||
| .path = options.path, | ||
| .schema = options.schema, | ||
| .io = options.io, | ||
| .properties = WriterProperties::FromMap(options.properties), | ||
| }; | ||
|
|
||
| ICEBERG_ASSIGN_OR_RAISE(auto writer, | ||
| WriterFactoryRegistry::Open(options.format, writer_options)); | ||
|
|
||
| return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer))); | ||
| } | ||
|
|
||
| Status Write(ArrowArray* data) { | ||
| ICEBERG_DCHECK(writer_, "Writer not initialized"); | ||
| return writer_->Write(data); | ||
| } | ||
|
|
||
| Result<int64_t> Length() const { | ||
| ICEBERG_DCHECK(writer_, "Writer not initialized"); | ||
| return writer_->length(); | ||
| } | ||
|
|
||
| Status Close() { | ||
| ICEBERG_DCHECK(writer_, "Writer not initialized"); | ||
| if (closed_) { | ||
| // Idempotent: no-op if already closed | ||
| return {}; | ||
| } | ||
| ICEBERG_RETURN_UNEXPECTED(writer_->Close()); | ||
| closed_ = true; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this class address thread safety?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question! I've added explicit documentation that this class is not thread-safe:
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think a single writer (or reader) should support thread safety, so it is fine not to add a comment like this.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @wgtmac out of curiosity for my own knowledge, what guarantees that a single writer/reader will be using the class?
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These file writers are supposed to be used by a single write task, which for example can be a unit of a table sink operator in a SQL job plan. Usually the writer is responsible for partitioned (and sometimes sorted) data chunks.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. Removed the thread safety comment from the header. |
||
| return {}; | ||
| } | ||
|
|
||
| Result<FileWriter::WriteResult> Metadata() { | ||
| ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer"); | ||
|
|
||
| ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics()); | ||
| ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length()); | ||
| auto split_offsets = writer_->split_offsets(); | ||
|
|
||
| // Serialize literal bounds to binary format | ||
| std::map<int32_t, std::vector<uint8_t>> lower_bounds_map; | ||
| for (const auto& [col_id, literal] : metrics.lower_bounds) { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize()); | ||
| lower_bounds_map[col_id] = std::move(serialized); | ||
| } | ||
| std::map<int32_t, std::vector<uint8_t>> upper_bounds_map; | ||
| for (const auto& [col_id, literal] : metrics.upper_bounds) { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize()); | ||
| upper_bounds_map[col_id] = std::move(serialized); | ||
| } | ||
|
|
||
| auto data_file = std::make_shared<DataFile>(DataFile{ | ||
| .content = DataFile::Content::kData, | ||
| .file_path = options_.path, | ||
| .file_format = options_.format, | ||
| .partition = options_.partition, | ||
| .record_count = metrics.row_count.value_or(-1), | ||
| .file_size_in_bytes = length, | ||
| .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()}, | ||
| .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()}, | ||
| .null_value_counts = {metrics.null_value_counts.begin(), | ||
| metrics.null_value_counts.end()}, | ||
| .nan_value_counts = {metrics.nan_value_counts.begin(), | ||
| metrics.nan_value_counts.end()}, | ||
| .lower_bounds = std::move(lower_bounds_map), | ||
| .upper_bounds = std::move(upper_bounds_map), | ||
| .split_offsets = std::move(split_offsets), | ||
| .sort_order_id = options_.sort_order_id, | ||
| }); | ||
|
|
||
| FileWriter::WriteResult result; | ||
| result.data_files.push_back(std::move(data_file)); | ||
| return result; | ||
| } | ||
|
|
||
| private: | ||
| Impl(DataWriterOptions options, std::unique_ptr<Writer> writer) | ||
| : options_(std::move(options)), writer_(std::move(writer)) {} | ||
|
|
||
| DataWriterOptions options_; | ||
| std::unique_ptr<Writer> writer_; | ||
| bool closed_ = false; | ||
| }; | ||
|
|
||
| DataWriter::DataWriter(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {} | ||
|
|
||
| DataWriter::~DataWriter() = default; | ||
|
|
||
| Status DataWriter::Write(ArrowArray* data) { return NotImplemented(""); } | ||
| Result<std::unique_ptr<DataWriter>> DataWriter::Make(const DataWriterOptions& options) { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options)); | ||
| return std::unique_ptr<DataWriter>(new DataWriter(std::move(impl))); | ||
| } | ||
|
|
||
| Status DataWriter::Write(ArrowArray* data) { return impl_->Write(data); } | ||
|
|
||
| Result<int64_t> DataWriter::Length() const { return NotImplemented(""); } | ||
| Result<int64_t> DataWriter::Length() const { return impl_->Length(); } | ||
|
|
||
| Status DataWriter::Close() { return NotImplemented(""); } | ||
| Status DataWriter::Close() { return impl_->Close(); } | ||
|
|
||
| Result<FileWriter::WriteResult> DataWriter::Metadata() { return NotImplemented(""); } | ||
| Result<FileWriter::WriteResult> DataWriter::Metadata() { return impl_->Metadata(); } | ||
|
|
||
| } // namespace iceberg | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,251 @@ | |
| * under the License. | ||
| */ | ||
|
|
||
| #include "iceberg/data/data_writer.h" | ||
|
|
||
| #include <arrow/array.h> | ||
| #include <arrow/c/bridge.h> | ||
| #include <arrow/json/from_string.h> | ||
| #include <gmock/gmock.h> | ||
| #include <gtest/gtest.h> | ||
|
|
||
| namespace iceberg {} // namespace iceberg | ||
| #include "iceberg/arrow/arrow_fs_file_io_internal.h" | ||
| #include "iceberg/avro/avro_register.h" | ||
| #include "iceberg/file_format.h" | ||
| #include "iceberg/manifest/manifest_entry.h" | ||
| #include "iceberg/parquet/parquet_register.h" | ||
| #include "iceberg/partition_spec.h" | ||
| #include "iceberg/row/partition_values.h" | ||
| #include "iceberg/schema.h" | ||
| #include "iceberg/schema_field.h" | ||
| #include "iceberg/schema_internal.h" | ||
| #include "iceberg/test/matchers.h" | ||
| #include "iceberg/type.h" | ||
| #include "iceberg/util/macros.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| using ::testing::HasSubstr; | ||
|
|
||
| class DataWriterTest : public ::testing::Test { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we try to consolidate the test cases, since each of them only tests a tiny API with repeated boilerplate for creating the writer and writing data? This may lead to a test-case explosion if more and more cases are like this.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. Added MakeDefaultOptions() and WriteTestDataToWriter() helpers. Reduced test cases.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Added MakeDefaultOptions() and WriteTestDataToWriter() helpers. Reduced test cases. |
||
| protected: | ||
| static void SetUpTestSuite() { | ||
| parquet::RegisterAll(); | ||
| avro::RegisterAll(); | ||
| } | ||
|
|
||
| void SetUp() override { | ||
| file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); | ||
| schema_ = std::make_shared<Schema>( | ||
| std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()), | ||
| SchemaField::MakeOptional(2, "name", string())}); | ||
| partition_spec_ = PartitionSpec::Unpartitioned(); | ||
| } | ||
|
|
||
| DataWriterOptions MakeDefaultOptions( | ||
| std::optional<int32_t> sort_order_id = std::nullopt, | ||
| PartitionValues partition = PartitionValues{}) { | ||
| return DataWriterOptions{ | ||
| .path = "test_data.parquet", | ||
| .schema = schema_, | ||
| .spec = partition_spec_, | ||
| .partition = std::move(partition), | ||
| .format = FileFormatType::kParquet, | ||
| .io = file_io_, | ||
| .sort_order_id = sort_order_id, | ||
| .properties = {{"write.parquet.compression-codec", "uncompressed"}}, | ||
| }; | ||
| } | ||
|
|
||
| std::shared_ptr<::arrow::Array> CreateTestData() { | ||
| ArrowSchema arrow_c_schema; | ||
| ICEBERG_THROW_NOT_OK(ToArrowSchema(*schema_, &arrow_c_schema)); | ||
| auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); | ||
|
|
||
| return ::arrow::json::ArrayFromJSONString( | ||
| ::arrow::struct_(arrow_schema->fields()), | ||
| R"([[1, "Alice"], [2, "Bob"], [3, "Charlie"]])") | ||
| .ValueOrDie(); | ||
| } | ||
|
|
||
| void WriteTestDataToWriter(DataWriter* writer) { | ||
| auto test_data = CreateTestData(); | ||
| ArrowArray arrow_array; | ||
| ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok()); | ||
| ASSERT_THAT(writer->Write(&arrow_array), IsOk()); | ||
| } | ||
|
|
||
| std::shared_ptr<FileIO> file_io_; | ||
| std::shared_ptr<Schema> schema_; | ||
| std::shared_ptr<PartitionSpec> partition_spec_; | ||
| }; | ||
|
|
||
| class DataWriterFormatTest | ||
| : public DataWriterTest, | ||
| public ::testing::WithParamInterface<std::pair<FileFormatType, std::string>> {}; | ||
|
|
||
| TEST_P(DataWriterFormatTest, CreateWithFormat) { | ||
| auto [format, path] = GetParam(); | ||
| DataWriterOptions options{ | ||
| .path = path, | ||
| .schema = schema_, | ||
| .spec = partition_spec_, | ||
| .partition = PartitionValues{}, | ||
| .format = format, | ||
| .io = file_io_, | ||
| .properties = | ||
| format == FileFormatType::kParquet | ||
| ? std::unordered_map<std::string, | ||
| std::string>{{"write.parquet.compression-codec", | ||
| "uncompressed"}} | ||
| : std::unordered_map<std::string, std::string>{}, | ||
| }; | ||
|
|
||
| auto writer_result = DataWriter::Make(options); | ||
| ASSERT_THAT(writer_result, IsOk()); | ||
| auto writer = std::move(writer_result.value()); | ||
| ASSERT_NE(writer, nullptr); | ||
| } | ||
|
|
||
| INSTANTIATE_TEST_SUITE_P( | ||
| FormatTypes, DataWriterFormatTest, | ||
| ::testing::Values(std::make_pair(FileFormatType::kParquet, "test_data.parquet"), | ||
| std::make_pair(FileFormatType::kAvro, "test_data.avro"))); | ||
|
|
||
| TEST_F(DataWriterTest, WriteAndClose) { | ||
| auto writer_result = DataWriter::Make(MakeDefaultOptions()); | ||
| ASSERT_THAT(writer_result, IsOk()); | ||
| auto writer = std::move(writer_result.value()); | ||
|
|
||
| // Write data | ||
| WriteTestDataToWriter(writer.get()); | ||
|
|
||
| // Length should be greater than 0 after write | ||
| auto length_result = writer->Length(); | ||
| ASSERT_THAT(length_result, IsOk()); | ||
| EXPECT_GT(length_result.value(), 0); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: check the size of the data passed to the write function?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. |
||
|
|
||
| // Close | ||
| ASSERT_THAT(writer->Close(), IsOk()); | ||
| } | ||
|
|
||
| TEST_F(DataWriterTest, MetadataAfterClose) { | ||
| auto writer_result = DataWriter::Make(MakeDefaultOptions()); | ||
| ASSERT_THAT(writer_result, IsOk()); | ||
| auto writer = std::move(writer_result.value()); | ||
|
|
||
| WriteTestDataToWriter(writer.get()); | ||
| ASSERT_THAT(writer->Close(), IsOk()); | ||
|
|
||
| // Get metadata | ||
| auto metadata_result = writer->Metadata(); | ||
| ASSERT_THAT(metadata_result, IsOk()); | ||
|
|
||
| const auto& write_result = metadata_result.value(); | ||
| ASSERT_EQ(write_result.data_files.size(), 1); | ||
|
|
||
| const auto& data_file = write_result.data_files[0]; | ||
| EXPECT_EQ(data_file->content, DataFile::Content::kData); | ||
| EXPECT_EQ(data_file->file_path, "test_data.parquet"); | ||
| EXPECT_EQ(data_file->file_format, FileFormatType::kParquet); | ||
| EXPECT_GT(data_file->file_size_in_bytes, 0); | ||
|
|
||
| // Metrics availability depends on the underlying writer implementation | ||
| EXPECT_GE(data_file->column_sizes.size(), 0); | ||
| EXPECT_GE(data_file->value_counts.size(), 0); | ||
| EXPECT_GE(data_file->null_value_counts.size(), 0); | ||
| } | ||
|
|
||
| TEST_F(DataWriterTest, MetadataBeforeCloseReturnsError) { | ||
| auto writer_result = DataWriter::Make(MakeDefaultOptions()); | ||
| ASSERT_THAT(writer_result, IsOk()); | ||
| auto writer = std::move(writer_result.value()); | ||
|
|
||
| // Try to get metadata before closing | ||
| auto metadata_result = writer->Metadata(); | ||
| ASSERT_THAT(metadata_result, IsError(ErrorKind::kValidationFailed)); | ||
| EXPECT_THAT(metadata_result, | ||
| HasErrorMessage("Cannot get metadata before closing the writer")); | ||
| } | ||
|
|
||
| TEST_F(DataWriterTest, CloseIsIdempotent) { | ||
| auto writer_result = DataWriter::Make(MakeDefaultOptions()); | ||
| ASSERT_THAT(writer_result, IsOk()); | ||
| auto writer = std::move(writer_result.value()); | ||
|
|
||
| WriteTestDataToWriter(writer.get()); | ||
|
|
||
| ASSERT_THAT(writer->Close(), IsOk()); | ||
| ASSERT_THAT(writer->Close(), IsOk()); | ||
| ASSERT_THAT(writer->Close(), IsOk()); | ||
| } | ||
|
|
||
| TEST_F(DataWriterTest, SortOrderIdInMetadata) { | ||
| // Test with explicit sort order id | ||
| { | ||
| const int32_t sort_order_id = 42; | ||
| auto writer_result = DataWriter::Make(MakeDefaultOptions(sort_order_id)); | ||
| ASSERT_THAT(writer_result, IsOk()); | ||
| auto writer = std::move(writer_result.value()); | ||
|
|
||
| WriteTestDataToWriter(writer.get()); | ||
| ASSERT_THAT(writer->Close(), IsOk()); | ||
|
|
||
| auto metadata_result = writer->Metadata(); | ||
| ASSERT_THAT(metadata_result, IsOk()); | ||
| const auto& data_file = metadata_result.value().data_files[0]; | ||
| ASSERT_TRUE(data_file->sort_order_id.has_value()); | ||
| EXPECT_EQ(data_file->sort_order_id.value(), sort_order_id); | ||
| } | ||
|
|
||
| // Test without sort order id (should be nullopt) | ||
| { | ||
| auto writer_result = DataWriter::Make(MakeDefaultOptions()); | ||
| ASSERT_THAT(writer_result, IsOk()); | ||
| auto writer = std::move(writer_result.value()); | ||
|
|
||
| WriteTestDataToWriter(writer.get()); | ||
| ASSERT_THAT(writer->Close(), IsOk()); | ||
|
|
||
| auto metadata_result = writer->Metadata(); | ||
| ASSERT_THAT(metadata_result, IsOk()); | ||
| const auto& data_file = metadata_result.value().data_files[0]; | ||
| EXPECT_FALSE(data_file->sort_order_id.has_value()); | ||
| } | ||
| } | ||
|
|
||
| TEST_F(DataWriterTest, PartitionValuesPreserved) { | ||
| PartitionValues partition_values({Literal::Int(42), Literal::String("test")}); | ||
|
|
||
| auto writer_result = | ||
| DataWriter::Make(MakeDefaultOptions(std::nullopt, partition_values)); | ||
| ASSERT_THAT(writer_result, IsOk()); | ||
| auto writer = std::move(writer_result.value()); | ||
|
|
||
| WriteTestDataToWriter(writer.get()); | ||
| ASSERT_THAT(writer->Close(), IsOk()); | ||
|
|
||
| auto metadata_result = writer->Metadata(); | ||
| ASSERT_THAT(metadata_result, IsOk()); | ||
| const auto& data_file = metadata_result.value().data_files[0]; | ||
|
|
||
| EXPECT_EQ(data_file->partition.num_fields(), partition_values.num_fields()); | ||
| EXPECT_EQ(data_file->partition.num_fields(), 2); | ||
| } | ||
|
|
||
| TEST_F(DataWriterTest, WriteMultipleBatches) { | ||
| auto writer_result = DataWriter::Make(MakeDefaultOptions()); | ||
| ASSERT_THAT(writer_result, IsOk()); | ||
| auto writer = std::move(writer_result.value()); | ||
|
|
||
| WriteTestDataToWriter(writer.get()); | ||
| WriteTestDataToWriter(writer.get()); | ||
| ASSERT_THAT(writer->Close(), IsOk()); | ||
|
|
||
| auto metadata_result = writer->Metadata(); | ||
| ASSERT_THAT(metadata_result, IsOk()); | ||
| const auto& data_file = metadata_result.value().data_files[0]; | ||
| EXPECT_GT(data_file->file_size_in_bytes, 0); | ||
| } | ||
|
|
||
| } // namespace iceberg | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could see a case for making close idempotent, is there any strong reason why we want to return this error instead of no op for example?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed