Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ set(PAIMON_COMMON_SRCS
common/data/columnar/columnar_array.cpp
common/data/columnar/columnar_map.cpp
common/data/columnar/columnar_row.cpp
common/data/columnar/columnar_row_ref.cpp
common/data/decimal.cpp
common/data/internal_row.cpp
common/data/record_batch.cpp
Expand Down
6 changes: 4 additions & 2 deletions src/paimon/common/data/columnar/columnar_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/decimal.h"
#include "fmt/format.h"
#include "paimon/common/data/columnar/columnar_batch_context.h"
#include "paimon/common/data/columnar/columnar_map.h"
#include "paimon/common/data/columnar/columnar_row.h"
#include "paimon/common/data/columnar/columnar_row_ref.h"
#include "paimon/common/utils/date_time_utils.h"

namespace paimon {
Expand Down Expand Up @@ -84,7 +85,8 @@ std::shared_ptr<InternalMap> ColumnarArray::GetMap(int32_t pos) const {
std::shared_ptr<InternalRow> ColumnarArray::GetRow(int32_t pos, int32_t num_fields) const {
auto struct_array = arrow::internal::checked_cast<const arrow::StructArray*>(array_);
assert(struct_array);
return std::make_shared<ColumnarRow>(struct_array->fields(), pool_, offset_ + pos);
auto row_ctx = std::make_shared<ColumnarBatchContext>(nullptr, struct_array->fields(), pool_);
return std::make_shared<ColumnarRowRef>(std::move(row_ctx), offset_ + pos);
}

Result<std::vector<char>> ColumnarArray::ToBooleanArray() const {
Expand Down
47 changes: 47 additions & 0 deletions src/paimon/common/data/columnar/columnar_batch_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <vector>

#include "arrow/array/array_base.h"

namespace arrow {
class StructArray;
} // namespace arrow

namespace paimon {
class MemoryPool;

struct ColumnarBatchContext {
ColumnarBatchContext(const std::shared_ptr<arrow::StructArray>& struct_array_in,
const arrow::ArrayVector& field_arrays_in,
const std::shared_ptr<MemoryPool>& pool_in)
: struct_array(struct_array_in), pool(pool_in), field_arrays(field_arrays_in) {
array_ptrs.reserve(field_arrays.size());
for (const auto& array : field_arrays) {
array_ptrs.push_back(array.get());
}
}

std::shared_ptr<arrow::StructArray> struct_array;
std::shared_ptr<MemoryPool> pool;
arrow::ArrayVector field_arrays;
std::vector<const arrow::Array*> array_ptrs;
};
Copy link
Collaborator

@lxy-9602 lxy-9602 Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, the struct_array_ field in ColumnarRow was used solely as a data holder and could not be directly utilized, since it might contain fields beyond those present in array_vec_.

In the new ColumnarBatchContext, however, array_vec_holder appears to serve as the primary source of columnar data. Given that a file may contain thousands of fields, maintaining separate holders like array_vec_holder could lead to unnecessary shared_ptr increments.

Suggestion: Consider using struct_array as the canonical holder and removing array_vec_holder to simplify the design and eliminate redundancy.

} // namespace paimon
82 changes: 82 additions & 0 deletions src/paimon/common/data/columnar/columnar_row_ref.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/common/data/columnar/columnar_row_ref.h"

#include <cassert>

#include "arrow/array/array_decimal.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/array_primitive.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/decimal.h"
#include "paimon/common/data/columnar/columnar_array.h"
#include "paimon/common/data/columnar/columnar_map.h"
#include "paimon/common/utils/date_time_utils.h"

namespace paimon {
Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const {
using ArrayType = typename arrow::TypeTraits<arrow::Decimal128Type>::ArrayType;
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_ptrs[pos]);
assert(array);
arrow::Decimal128 decimal(array->GetValue(row_id_));
return Decimal(precision, scale,
static_cast<Decimal::int128_t>(decimal.high_bits()) << 64 | decimal.low_bits());
}

Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const {
using ArrayType = typename arrow::TypeTraits<arrow::TimestampType>::ArrayType;
auto array = arrow::internal::checked_cast<const ArrayType*>(ctx_->array_ptrs[pos]);
assert(array);
int64_t data = array->Value(row_id_);
auto timestamp_type =
arrow::internal::checked_pointer_cast<arrow::TimestampType>(array->type());
// for orc format, data is saved as nano, therefore, Timestamp convert should consider precision
// in arrow array rather than input precision
DateTimeUtils::TimeType time_type = DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type);
auto [milli, nano] = DateTimeUtils::TimestampConverter(
data, time_type, DateTimeUtils::TimeType::MILLISECOND, DateTimeUtils::TimeType::NANOSECOND);
return Timestamp(milli, nano);
}

std::shared_ptr<InternalRow> ColumnarRowRef::GetRow(int32_t pos, int32_t num_fields) const {
auto struct_array =
arrow::internal::checked_pointer_cast<arrow::StructArray>(ctx_->field_arrays[pos]);
assert(struct_array);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May change array_vec_holderto array_vec for consistent?

auto nested_ctx =
std::make_shared<ColumnarBatchContext>(struct_array, struct_array->fields(), ctx_->pool);
return std::make_shared<ColumnarRowRef>(std::move(nested_ctx), row_id_);
}

std::shared_ptr<InternalArray> ColumnarRowRef::GetArray(int32_t pos) const {
auto list_array = arrow::internal::checked_cast<const arrow::ListArray*>(ctx_->array_ptrs[pos]);
assert(list_array);
int32_t offset = list_array->value_offset(row_id_);
int32_t length = list_array->value_length(row_id_);
return std::make_shared<ColumnarArray>(list_array->values(), ctx_->pool, offset, length);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a ColumnarArray contains nested rows (e.g., an array of structs), should it return a ColumnarRowRef for inner row access?

}

std::shared_ptr<InternalMap> ColumnarRowRef::GetMap(int32_t pos) const {
auto map_array = arrow::internal::checked_cast<const arrow::MapArray*>(ctx_->array_ptrs[pos]);
assert(map_array);
int32_t offset = map_array->value_offset(row_id_);
int32_t length = map_array->value_length(row_id_);
return std::make_shared<ColumnarMap>(map_array->keys(), map_array->items(), ctx_->pool, offset,
length);
}

} // namespace paimon
135 changes: 135 additions & 0 deletions src/paimon/common/data/columnar/columnar_row_ref.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <string_view>

#include "fmt/format.h"
#include "paimon/common/data/binary_string.h"
#include "paimon/common/data/columnar/columnar_batch_context.h"
#include "paimon/common/data/columnar/columnar_utils.h"
#include "paimon/common/data/internal_array.h"
#include "paimon/common/data/internal_map.h"
#include "paimon/common/data/internal_row.h"
#include "paimon/common/types/row_kind.h"
#include "paimon/data/decimal.h"
#include "paimon/data/timestamp.h"
#include "paimon/result.h"

namespace paimon {
class Bytes;

/// Columnar row view which shares batch-level context to reduce per-row overhead.
class ColumnarRowRef : public InternalRow {
public:
ColumnarRowRef(std::shared_ptr<ColumnarBatchContext> ctx, int64_t row_id)
: ctx_(std::move(ctx)), row_id_(row_id) {}

Result<const RowKind*> GetRowKind() const override {
return row_kind_;
}

void SetRowKind(const RowKind* kind) override {
row_kind_ = kind;
}

int32_t GetFieldCount() const override {
return static_cast<int32_t>(ctx_->array_ptrs.size());
}

bool IsNullAt(int32_t pos) const override {
return ctx_->array_ptrs[pos]->IsNull(row_id_);
}

bool GetBoolean(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(ctx_->array_ptrs[pos],
row_id_);
}

char GetByte(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(ctx_->array_ptrs[pos],
row_id_);
}

int16_t GetShort(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(ctx_->array_ptrs[pos],
row_id_);
}

int32_t GetInt(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(ctx_->array_ptrs[pos],
row_id_);
}

int32_t GetDate(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(ctx_->array_ptrs[pos],
row_id_);
}

int64_t GetLong(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(ctx_->array_ptrs[pos],
row_id_);
}

float GetFloat(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(ctx_->array_ptrs[pos],
row_id_);
}

double GetDouble(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(ctx_->array_ptrs[pos],
row_id_);
}

BinaryString GetString(int32_t pos) const override {
auto bytes = ColumnarUtils::GetBytes<arrow::StringType>(ctx_->array_ptrs[pos], row_id_,
ctx_->pool.get());
return BinaryString::FromBytes(bytes);
}

std::string_view GetStringView(int32_t pos) const override {
return ColumnarUtils::GetView(ctx_->array_ptrs[pos], row_id_);
}

Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override;

Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;

std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
return ColumnarUtils::GetBytes<arrow::BinaryType>(ctx_->array_ptrs[pos], row_id_,
ctx_->pool.get());
}

std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override;

std::shared_ptr<InternalArray> GetArray(int32_t pos) const override;

std::shared_ptr<InternalMap> GetMap(int32_t pos) const override;

std::string ToString() const override {
return fmt::format("ColumnarRowRef, row_id {}", row_id_);
}

private:
std::shared_ptr<ColumnarBatchContext> ctx_;
const RowKind* row_kind_ = RowKind::Insert();
int64_t row_id_;
};
} // namespace paimon
27 changes: 27 additions & 0 deletions src/paimon/common/data/columnar/columnar_row_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow/array/array_nested.h"
#include "arrow/ipc/json_simple.h"
#include "gtest/gtest.h"
#include "paimon/common/data/columnar/columnar_row_ref.h"
#include "paimon/common/utils/date_time_utils.h"
#include "paimon/memory/bytes.h"
#include "paimon/memory/memory_pool.h"
Expand Down Expand Up @@ -70,6 +71,32 @@ TEST(ColumnarRowTest, TestSimple) {
ASSERT_EQ(std::string(row.GetStringView(7)), "Hello");
}

TEST(ColumnarRowRefTest, TestSimple) {
auto pool = GetDefaultPool();
std::shared_ptr<arrow::DataType> target_type =
arrow::struct_({arrow::field("f1", arrow::int32()), arrow::field("f2", arrow::utf8())});
auto f1 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([1, 2, 3])").ValueOrDie();
auto f2 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), R"(["alpha", "beta", "gamma"])")
.ValueOrDie();
auto data = arrow::StructArray::Make({f1, f2}, target_type->fields()).ValueOrDie();

auto ctx = std::make_shared<ColumnarBatchContext>(data, data->fields(), pool);
ColumnarRowRef row(ctx, 1);
ASSERT_EQ(row.GetFieldCount(), 2);
ASSERT_EQ(row.GetInt(0), 2);
ASSERT_EQ(std::string(row.GetStringView(1)), "beta");

auto row_kind = row.GetRowKind();
ASSERT_TRUE(row_kind.ok());
ASSERT_EQ(row_kind.value(), RowKind::Insert());
row.SetRowKind(RowKind::Delete());
auto updated_kind = row.GetRowKind();
ASSERT_TRUE(updated_kind.ok());
ASSERT_EQ(updated_kind.value(), RowKind::Delete());
}

TEST(ColumnarRowTest, TestComplexAndNestedType) {
auto pool = GetDefaultPool();
std::shared_ptr<arrow::DataType> target_type = arrow::struct_({
Expand Down
18 changes: 10 additions & 8 deletions src/paimon/core/io/key_value_data_file_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@
#include "arrow/type.h"
#include "arrow/util/checked_cast.h"
#include "fmt/format.h"
#include "paimon/common/data/columnar/columnar_row.h"
#include "paimon/common/data/internal_row.h"
#include "paimon/common/data/columnar/columnar_row_ref.h"
#include "paimon/common/table/special_fields.h"
#include "paimon/common/types/row_kind.h"
#include "paimon/common/utils/arrow/status_utils.h"
Expand Down Expand Up @@ -69,12 +68,11 @@ bool KeyValueDataFileRecordReader::Iterator::HasNext() const {

Result<KeyValue> KeyValueDataFileRecordReader::Iterator::Next() {
assert(HasNext());
// as key is only used in merge sort, do not hold the data in ColumnarRow
auto key = std::make_unique<ColumnarRow>(reader_->key_fields_, reader_->pool_, cursor_);
// as value is used in merge sort and projection (maybe async and multi-thread), hold the data
// in ColumnarRow
auto value = std::make_unique<ColumnarRow>(reader_->value_struct_array_, reader_->value_fields_,
reader_->pool_, cursor_);
// key is only used in merge sort; key context does not hold parent struct array
auto key = std::make_unique<ColumnarRowRef>(reader_->key_ctx_, cursor_);
// value is used in merge sort and projection (maybe async and multi-thread), so value context
// holds parent struct array to ensure data remains valid
auto value = std::make_unique<ColumnarRowRef>(reader_->value_ctx_, cursor_);
PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind,
RowKind::FromByteValue(reader_->row_kind_array_->Value(cursor_)));
int64_t sequence_number = reader_->sequence_number_array_->Value(cursor_);
Expand Down Expand Up @@ -137,6 +135,8 @@ Result<std::unique_ptr<KeyValueRecordReader::Iterator>> KeyValueDataFileRecordRe
arrow::StructArray::Make(value_fields_, value_names_));
selection_bitmap_ = std::move(bitmap);
value_fields_ = value_struct_array_->fields();
key_ctx_ = std::make_shared<ColumnarBatchContext>(nullptr, key_fields_, pool_);
value_ctx_ = std::make_shared<ColumnarBatchContext>(value_struct_array_, value_fields_, pool_);
TraverseArray(value_struct_array_);
return std::make_unique<KeyValueDataFileRecordReader::Iterator>(this);
}
Expand All @@ -148,6 +148,8 @@ void KeyValueDataFileRecordReader::Reset() {
value_struct_array_.reset();
sequence_number_array_.reset();
row_kind_array_.reset();
key_ctx_.reset();
value_ctx_.reset();
}

void KeyValueDataFileRecordReader::TraverseArray(const std::shared_ptr<arrow::Array>& array) {
Expand Down
Loading
Loading