Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ This product includes code from Apache Iceberg C++.
* src/paimon/format/avro/avro_direct_decoder.h
* src/paimon/format/avro/avro_direct_encoder.cpp
* src/paimon/format/avro/avro_direct_encoder.h
* Avro input stream in src/paimon/format/avro/avro_direct_decoder.cpp

Copyright: 2024-2025 The Apache Software Foundation.
Home page: https://iceberg.apache.org/
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/core/utils/duration.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ class Duration {
}

uint64_t Reset() {
uint64_t dura = Get();
uint64_t duration = Get();
start_ = std::chrono::high_resolution_clock::now();
return dura;
return duration;
}

private:
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/format/avro/avro_direct_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ Status AvroDirectEncoder::EncodeArrowToAvro(const ::avro::NodePtr& avro_node,
element_node->leaves() != 2)) {
return Status::Invalid(
fmt::format("Expected AVRO_RECORD for map key-value pair, got {}",
AvroUtils::ToString(avro_node)));
AvroUtils::ToString(element_node)));
}

const auto& map_array =
Expand Down
113 changes: 60 additions & 53 deletions src/paimon/format/avro/avro_input_stream_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
* limitations under the License.
*/

// Adapted from Apache Iceberg C++
// https://github.com/apache/iceberg-cpp/blob/main/src/iceberg/avro/avro_stream_internal.cc

#include "paimon/format/avro/avro_input_stream_impl.h"

#include <algorithm>
#include <memory>
#include <string>
#include <utility>

Expand All @@ -39,82 +43,85 @@ AvroInputStreamImpl::AvroInputStreamImpl(const std::shared_ptr<paimon::InputStre
size_t buffer_size, const uint64_t total_length,
const std::shared_ptr<MemoryPool>& pool)
: pool_(pool),
in_(input_stream),
buffer_size_(buffer_size),
total_length_(total_length),
buffer_(reinterpret_cast<uint8_t*>(pool_->Malloc(buffer_size))),
in_(input_stream),
byte_count_(0),
next_(buffer_),
available_(0) {}
buffer_(reinterpret_cast<uint8_t*>(pool_->Malloc(buffer_size))) {}

AvroInputStreamImpl::~AvroInputStreamImpl() {
pool_->Free(buffer_, buffer_size_);
}

bool AvroInputStreamImpl::next(const uint8_t** data, size_t* size) {
if (available_ == 0 && !fill()) {
return false;
bool AvroInputStreamImpl::next(const uint8_t** data, size_t* len) {
// Return all unconsumed data in the buffer
if (buffer_pos_ < available_bytes_) {
*data = buffer_ + buffer_pos_;
*len = available_bytes_ - buffer_pos_;
byte_count_ += available_bytes_ - buffer_pos_;
buffer_pos_ = available_bytes_;
return true;
}

// Read from the input stream when the buffer is empty
uint64_t remaining = total_length_ - stream_pos_;
if (remaining == 0) {
return false; // eof
}
auto read_length =
in_->Read(reinterpret_cast<char*>(buffer_), std::min(buffer_size_, remaining));
if (!read_length.ok()) {
throw ::avro::Exception("Read failed: {}", read_length.status().ToString());
}
*data = next_;
*size = available_;
next_ += available_;
byte_count_ += available_;
available_ = 0;
available_bytes_ = read_length.value();
stream_pos_ += available_bytes_;
buffer_pos_ = 0;

// Return the whole buffer
*data = buffer_;
*len = available_bytes_;
byte_count_ += available_bytes_;
buffer_pos_ = available_bytes_;

return true;
}

void AvroInputStreamImpl::backup(size_t len) {
next_ -= len;
available_ += len;
if (len > buffer_pos_) {
throw ::avro::Exception("Cannot backup {} bytes, only {} bytes available", len,
buffer_pos_);
}

buffer_pos_ -= len;
byte_count_ -= len;
}

void AvroInputStreamImpl::skip(size_t len) {
while (len > 0) {
if (available_ == 0) {
auto s = in_->Seek(len, paimon::FS_SEEK_CUR);
if (!s.ok()) {
throw ::avro::Exception(s.ToString());
}
byte_count_ += len;
total_read_len_ += len;
return;
}
size_t n = std::min(available_, len);
available_ -= n;
next_ += n;
len -= n;
byte_count_ += n;
// The range to skip is within the buffer
if (buffer_pos_ + len <= available_bytes_) {
buffer_pos_ += len;
byte_count_ += len;
return;
}
seek(byte_count_ + len);
}

void AvroInputStreamImpl::seek(int64_t position) {
auto s = in_->Seek(position - byte_count_ - available_, paimon::FS_SEEK_CUR);
if (!s.ok()) {
throw ::avro::Exception(s.ToString());
}
byte_count_ = position;
total_read_len_ = position;
available_ = 0;
size_t AvroInputStreamImpl::byteCount() const {
return byte_count_;
}

bool AvroInputStreamImpl::fill() {
if (static_cast<uint64_t>(total_read_len_) >= total_length_) {
// eof
return false;
}
Result<int32_t> actual_len = in_->Read(reinterpret_cast<char*>(buffer_),
std::min(buffer_size_, total_length_ - total_read_len_));
if (!actual_len.ok()) {
throw ::avro::Exception(actual_len.status().ToString());
void AvroInputStreamImpl::seek(int64_t position) {
if (static_cast<uint64_t>(position) > total_length_) {
throw ::avro::Exception("Cannot seek to {}, total length is {}", position, total_length_);
}
total_read_len_ += actual_len.value();
if (actual_len.value() != 0) {
next_ = buffer_;
available_ = actual_len.value();
return true;
auto status = in_->Seek(position, SeekOrigin::FS_SEEK_SET);
if (!status.ok()) {
throw ::avro::Exception("Failed to seek to {}, got {}", position, status.ToString());
}
return false;

stream_pos_ = position;
buffer_pos_ = 0;
available_bytes_ = 0;
byte_count_ = position;
}

} // namespace paimon::avro
17 changes: 7 additions & 10 deletions src/paimon/format/avro/avro_input_stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,26 @@ class AvroInputStreamImpl : public ::avro::SeekableInputStream {

~AvroInputStreamImpl() override;

bool next(const uint8_t** data, size_t* size) override;
bool next(const uint8_t** data, size_t* len) override;
void backup(size_t len) override;
void skip(size_t len) override;
size_t byteCount() const override {
return byte_count_;
}
size_t byteCount() const override;
void seek(int64_t position) override;

private:
AvroInputStreamImpl(const std::shared_ptr<paimon::InputStream>& input_stream,
size_t buffer_size, const uint64_t length,
const std::shared_ptr<MemoryPool>& pool);
bool fill();

std::shared_ptr<MemoryPool> pool_;
std::shared_ptr<paimon::InputStream> in_;
const size_t buffer_size_;
const uint64_t total_length_;
uint8_t* const buffer_;
std::shared_ptr<paimon::InputStream> in_;
size_t byte_count_;
uint8_t* next_;
size_t available_;
int32_t total_read_len_ = 0;
size_t byte_count_ = 0; // bytes position in the avro input stream
size_t stream_pos_ = 0; // current position in the paimon input stream
size_t buffer_pos_ = 0; // next position to read in the buffer
size_t available_bytes_ = 0; // bytes available in the buffer
};

} // namespace paimon::avro
22 changes: 19 additions & 3 deletions src/paimon/format/avro/avro_input_stream_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,21 @@ TEST(AvroInputStreamImplTest, TestSkip) {
ASSERT_TRUE(stream->next(&data, &size));
ASSERT_EQ(size, 5);
ASSERT_EQ(std::string(reinterpret_cast<const char*>(data), size), "fghij");
stream->skip(5);
ASSERT_EQ(stream->byteCount(), 15);
ASSERT_FALSE(stream->next(&data, &size));
ASSERT_THROW(stream->skip(5), ::avro::Exception); // already eof, cannot skip more
ASSERT_EQ(stream->byteCount(), 10);
ASSERT_FALSE(stream->next(&data, &size)); // reach eof

ASSERT_THROW(stream->backup(7), ::avro::Exception); // buffer item is 5, cannot backup 7
stream->backup(4);
ASSERT_EQ(stream->byteCount(), 6);
stream->skip(2); // skip 2 bytes from the available buffer data
ASSERT_EQ(stream->byteCount(), 8);

// verify we can read the remaining 2 bytes from buffer
ASSERT_TRUE(stream->next(&data, &size));
ASSERT_EQ(size, 2);
ASSERT_EQ(std::string(reinterpret_cast<const char*>(data), size), "ij");
ASSERT_EQ(stream->byteCount(), 10);
}

TEST(AvroInputStreamImplTest, TestSkipWithAvailableData) {
Expand Down Expand Up @@ -169,6 +181,10 @@ TEST(AvroInputStreamImplTest, TestSeek) {
ASSERT_EQ(size, 5);
ASSERT_EQ(std::string(reinterpret_cast<const char*>(data), size), "abcde");
stream->seek(2);
ASSERT_EQ(stream->byteCount(), 2);

// after seek, buffer will be cleared, cannot backup
ASSERT_THROW(stream->backup(2), ::avro::Exception);
ASSERT_TRUE(stream->next(&data, &size));
ASSERT_EQ(std::string(reinterpret_cast<const char*>(data), size), "cdefg");
ASSERT_EQ(stream->byteCount(), 7);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ TEST_F(LanceFileReaderWriterTest, TestReachTargetSize) {
}
ASSERT_OK(writer->Flush());
ASSERT_OK(writer->Finish());
// test reach targe size
// test reach target size
ASSERT_TRUE(reach_target_size);
auto fs = std::make_shared<LocalFileSystem>();
ASSERT_OK_AND_ASSIGN(auto file_status, fs->GetFileStatus(file_path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ TEST_P(LuceneGlobalIndexTest, TestSimpleChinese) {
["最近开源了一个新项目叫qianwen(全角字符),功能类似之前的 Qianwen,是一个面向 AI 应用的智能助手。它不仅支持 Machine Learning 和 NLP 技术,还提供了可扩展的开发框架,便于开发者构建自己的智能助手系统。"],
["我们在测试 qianwen-core v1.2 和 ai-engine-alpha 中的 bug,重点优化了 qianwen 的响应速度和稳定性。本次更新增强了核心模块的功能,提升了智能助手的开发效率,并修复了与 NLP 模块相关的多个问题。"],
["AI 助手开发中常用的技术包括 Speech Recognition、Natural Language Processing 和 Recommendation System。我们使用 TensorFlow 和 PyTorch 构建模型,开发了多个智能助手原型,支持语音交互和上下文理解功能,是当前热门的人工智能发展应用方向。"],
["新一代的 AI 助手代号为「千问」,内部命名为 QianwenX-2024,计划在 next quarter 发布。QianwenX 将集成更强的 multimodal 能力,支持图像和文本联合处理,进一步提升智能助手的理解能力和交互体验,是未来智能助手的重要发展方向。"]
["新一代的 AI 助手代号为「千问」,内部命名为 QianwenX-2024,计划在 next quarter 发布。QianwenX 将集成更强的 multimodel 能力,支持图像和文本联合处理,进一步提升智能助手的理解能力和交互体验,是未来智能助手的重要发展方向。"]
])")
.ValueOrDie();

Expand Down
Loading