diff --git a/LICENSE b/LICENSE index 606e7d4d..05727035 100644 --- a/LICENSE +++ b/LICENSE @@ -268,6 +268,7 @@ This product includes code from Apache Iceberg C++. * src/paimon/format/avro/avro_direct_decoder.h * src/paimon/format/avro/avro_direct_encoder.cpp * src/paimon/format/avro/avro_direct_encoder.h +* Avro input stream in src/paimon/format/avro/avro_direct_decoder.cpp Copyright: 2024-2025 The Apache Software Foundation. Home page: https://iceberg.apache.org/ diff --git a/src/paimon/core/utils/duration.h b/src/paimon/core/utils/duration.h index d5b64dbb..5f7b9bbd 100644 --- a/src/paimon/core/utils/duration.h +++ b/src/paimon/core/utils/duration.h @@ -32,9 +32,9 @@ class Duration { } uint64_t Reset() { - uint64_t dura = Get(); + uint64_t duration = Get(); start_ = std::chrono::high_resolution_clock::now(); - return dura; + return duration; } private: diff --git a/src/paimon/format/avro/avro_direct_encoder.cpp b/src/paimon/format/avro/avro_direct_encoder.cpp index c2c39ad9..20c63fea 100644 --- a/src/paimon/format/avro/avro_direct_encoder.cpp +++ b/src/paimon/format/avro/avro_direct_encoder.cpp @@ -289,7 +289,7 @@ Status AvroDirectEncoder::EncodeArrowToAvro(const ::avro::NodePtr& avro_node, element_node->leaves() != 2)) { return Status::Invalid( fmt::format("Expected AVRO_RECORD for map key-value pair, got {}", - AvroUtils::ToString(avro_node))); + AvroUtils::ToString(element_node))); } const auto& map_array = diff --git a/src/paimon/format/avro/avro_input_stream_impl.cpp b/src/paimon/format/avro/avro_input_stream_impl.cpp index 67e1df3b..30c26528 100644 --- a/src/paimon/format/avro/avro_input_stream_impl.cpp +++ b/src/paimon/format/avro/avro_input_stream_impl.cpp @@ -14,9 +14,13 @@ * limitations under the License. */ +// Adapted from Apache Iceberg C++ +// https://github.com/apache/iceberg-cpp/blob/main/src/iceberg/avro/avro_stream_internal.cc + #include "paimon/format/avro/avro_input_stream_impl.h" #include +#include #include #include @@ -39,82 +43,85 @@ AvroInputStreamImpl::AvroInputStreamImpl(const std::shared_ptr& pool) : pool_(pool), + in_(input_stream), buffer_size_(buffer_size), total_length_(total_length), - buffer_(reinterpret_cast(pool_->Malloc(buffer_size))), - in_(input_stream), - byte_count_(0), - next_(buffer_), - available_(0) {} + buffer_(reinterpret_cast(pool_->Malloc(buffer_size))) {} AvroInputStreamImpl::~AvroInputStreamImpl() { pool_->Free(buffer_, buffer_size_); } -bool AvroInputStreamImpl::next(const uint8_t** data, size_t* size) { - if (available_ == 0 && !fill()) { - return false; +bool AvroInputStreamImpl::next(const uint8_t** data, size_t* len) { + // Return all unconsumed data in the buffer + if (buffer_pos_ < available_bytes_) { + *data = buffer_ + buffer_pos_; + *len = available_bytes_ - buffer_pos_; + byte_count_ += available_bytes_ - buffer_pos_; + buffer_pos_ = available_bytes_; + return true; + } + + // Read from the input stream when the buffer is empty + uint64_t remaining = total_length_ - stream_pos_; + if (remaining == 0) { + return false; // eof + } + auto read_length = + in_->Read(reinterpret_cast(buffer_), std::min(buffer_size_, remaining)); + if (!read_length.ok()) { + throw ::avro::Exception("Read failed: {}", read_length.status().ToString()); } - *data = next_; - *size = available_; - next_ += available_; - byte_count_ += available_; - available_ = 0; + available_bytes_ = read_length.value(); + stream_pos_ += available_bytes_; + buffer_pos_ = 0; + + // Return the whole buffer + *data = buffer_; + *len = available_bytes_; + byte_count_ += available_bytes_; + buffer_pos_ = available_bytes_; + return true; } void AvroInputStreamImpl::backup(size_t len) { - next_ -= len; - available_ += len; + if (len > buffer_pos_) { + throw ::avro::Exception("Cannot backup {} bytes, only {} bytes available", len, + buffer_pos_); + } + + buffer_pos_ -= len; byte_count_ -= len; } void AvroInputStreamImpl::skip(size_t len) { - while (len > 0) { - if (available_ == 0) { - auto s = in_->Seek(len, paimon::FS_SEEK_CUR); - if (!s.ok()) { - throw ::avro::Exception(s.ToString()); - } - byte_count_ += len; - total_read_len_ += len; - return; - } - size_t n = std::min(available_, len); - available_ -= n; - next_ += n; - len -= n; - byte_count_ += n; + // The range to skip is within the buffer + if (buffer_pos_ + len <= available_bytes_) { + buffer_pos_ += len; + byte_count_ += len; + return; } + seek(byte_count_ + len); } -void AvroInputStreamImpl::seek(int64_t position) { - auto s = in_->Seek(position - byte_count_ - available_, paimon::FS_SEEK_CUR); - if (!s.ok()) { - throw ::avro::Exception(s.ToString()); - } - byte_count_ = position; - total_read_len_ = position; - available_ = 0; +size_t AvroInputStreamImpl::byteCount() const { + return byte_count_; } -bool AvroInputStreamImpl::fill() { - if (static_cast(total_read_len_) >= total_length_) { - // eof - return false; - } - Result actual_len = in_->Read(reinterpret_cast(buffer_), - std::min(buffer_size_, total_length_ - total_read_len_)); - if (!actual_len.ok()) { - throw ::avro::Exception(actual_len.status().ToString()); +void AvroInputStreamImpl::seek(int64_t position) { + if (static_cast(position) > total_length_) { + throw ::avro::Exception("Cannot seek to {}, total length is {}", position, total_length_); } - total_read_len_ += actual_len.value(); - if (actual_len.value() != 0) { - next_ = buffer_; - available_ = actual_len.value(); - return true; + auto status = in_->Seek(position, SeekOrigin::FS_SEEK_SET); + if (!status.ok()) { + throw ::avro::Exception("Failed to seek to {}, got {}", position, status.ToString()); } - return false; + + stream_pos_ = position; + buffer_pos_ = 0; + available_bytes_ = 0; + byte_count_ = position; } } // namespace paimon::avro diff --git a/src/paimon/format/avro/avro_input_stream_impl.h b/src/paimon/format/avro/avro_input_stream_impl.h index 73bd6247..01474b17 100644 --- a/src/paimon/format/avro/avro_input_stream_impl.h +++ b/src/paimon/format/avro/avro_input_stream_impl.h @@ -39,29 +39,26 @@ class AvroInputStreamImpl : public ::avro::SeekableInputStream { ~AvroInputStreamImpl() override; - bool next(const uint8_t** data, size_t* size) override; + bool next(const uint8_t** data, size_t* len) override; void backup(size_t len) override; void skip(size_t len) override; - size_t byteCount() const override { - return byte_count_; - } + size_t byteCount() const override; void seek(int64_t position) override; private: AvroInputStreamImpl(const std::shared_ptr& input_stream, size_t buffer_size, const uint64_t length, const std::shared_ptr& pool); - bool fill(); std::shared_ptr pool_; + std::shared_ptr in_; const size_t buffer_size_; const uint64_t total_length_; uint8_t* const buffer_; - std::shared_ptr in_; - size_t byte_count_; - uint8_t* next_; - size_t available_; - int32_t total_read_len_ = 0; + size_t byte_count_ = 0; // bytes position in the avro input stream + size_t stream_pos_ = 0; // current position in the paimon input stream + size_t buffer_pos_ = 0; // next position to read in the buffer + size_t available_bytes_ = 0; // bytes available in the buffer }; } // namespace paimon::avro diff --git a/src/paimon/format/avro/avro_input_stream_impl_test.cpp b/src/paimon/format/avro/avro_input_stream_impl_test.cpp index 54342e1d..58b54006 100644 --- a/src/paimon/format/avro/avro_input_stream_impl_test.cpp +++ b/src/paimon/format/avro/avro_input_stream_impl_test.cpp @@ -105,9 +105,21 @@ TEST(AvroInputStreamImplTest, TestSkip) { ASSERT_TRUE(stream->next(&data, &size)); ASSERT_EQ(size, 5); ASSERT_EQ(std::string(reinterpret_cast(data), size), "fghij"); - stream->skip(5); - ASSERT_EQ(stream->byteCount(), 15); - ASSERT_FALSE(stream->next(&data, &size)); + ASSERT_THROW(stream->skip(5), ::avro::Exception); // already eof, cannot skip more + ASSERT_EQ(stream->byteCount(), 10); + ASSERT_FALSE(stream->next(&data, &size)); // reach eof + + ASSERT_THROW(stream->backup(7), ::avro::Exception); // buffer item is 5, cannot backup 7 + stream->backup(4); + ASSERT_EQ(stream->byteCount(), 6); + stream->skip(2); // skip 2 bytes from the available buffer data + ASSERT_EQ(stream->byteCount(), 8); + + // verify we can read the remaining 2 bytes from buffer + ASSERT_TRUE(stream->next(&data, &size)); + ASSERT_EQ(size, 2); + ASSERT_EQ(std::string(reinterpret_cast(data), size), "ij"); + ASSERT_EQ(stream->byteCount(), 10); } TEST(AvroInputStreamImplTest, TestSkipWithAvailableData) { @@ -169,6 +181,10 @@ TEST(AvroInputStreamImplTest, TestSeek) { ASSERT_EQ(size, 5); ASSERT_EQ(std::string(reinterpret_cast(data), size), "abcde"); stream->seek(2); + ASSERT_EQ(stream->byteCount(), 2); + + // after seek, buffer will be cleared, cannot backup + ASSERT_THROW(stream->backup(2), ::avro::Exception); ASSERT_TRUE(stream->next(&data, &size)); ASSERT_EQ(std::string(reinterpret_cast(data), size), "cdefg"); ASSERT_EQ(stream->byteCount(), 7); diff --git a/src/paimon/format/lance/lance_format_reader_writer_test.cpp b/src/paimon/format/lance/lance_format_reader_writer_test.cpp index 869b62c9..06408ab0 100644 --- a/src/paimon/format/lance/lance_format_reader_writer_test.cpp +++ b/src/paimon/format/lance/lance_format_reader_writer_test.cpp @@ -387,7 +387,7 @@ TEST_F(LanceFileReaderWriterTest, TestReachTargetSize) { } ASSERT_OK(writer->Flush()); ASSERT_OK(writer->Finish()); - // test reach targe size + // test reach target size ASSERT_TRUE(reach_target_size); auto fs = std::make_shared(); ASSERT_OK_AND_ASSIGN(auto file_status, fs->GetFileStatus(file_path)); diff --git a/src/paimon/global_index/lucene/lucene_global_index_test.cpp b/src/paimon/global_index/lucene/lucene_global_index_test.cpp index 1665ff02..9a7c78d7 100644 --- a/src/paimon/global_index/lucene/lucene_global_index_test.cpp +++ b/src/paimon/global_index/lucene/lucene_global_index_test.cpp @@ -306,7 +306,7 @@ TEST_P(LuceneGlobalIndexTest, TestSimpleChinese) { ["最近开源了一个新项目叫qianwen(全角字符),功能类似之前的 Qianwen,是一个面向 AI 应用的智能助手。它不仅支持 Machine Learning 和 NLP 技术,还提供了可扩展的开发框架,便于开发者构建自己的智能助手系统。"], ["我们在测试 qianwen-core v1.2 和 ai-engine-alpha 中的 bug,重点优化了 qianwen 的响应速度和稳定性。本次更新增强了核心模块的功能,提升了智能助手的开发效率,并修复了与 NLP 模块相关的多个问题。"], ["AI 助手开发中常用的技术包括 Speech Recognition、Natural Language Processing 和 Recommendation System。我们使用 TensorFlow 和 PyTorch 构建模型,开发了多个智能助手原型,支持语音交互和上下文理解功能,是当前热门的人工智能发展应用方向。"], -["新一代的 AI 助手代号为「千问」,内部命名为 QianwenX-2024,计划在 next quarter 发布。QianwenX 将集成更强的 multimodal 能力,支持图像和文本联合处理,进一步提升智能助手的理解能力和交互体验,是未来智能助手的重要发展方向。"] +["新一代的 AI 助手代号为「千问」,内部命名为 QianwenX-2024,计划在 next quarter 发布。QianwenX 将集成更强的 multimodel 能力,支持图像和文本联合处理,进一步提升智能助手的理解能力和交互体验,是未来智能助手的重要发展方向。"] ])") .ValueOrDie();