diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 5c2d0796..ae61f4ae 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -29,6 +29,7 @@ set(PAIMON_COMMON_SRCS common/data/columnar/columnar_array.cpp common/data/columnar/columnar_map.cpp common/data/columnar/columnar_row.cpp + common/data/columnar/columnar_row_ref.cpp common/data/decimal.cpp common/data/internal_row.cpp common/data/record_batch.cpp diff --git a/src/paimon/common/data/columnar/columnar_array.cpp b/src/paimon/common/data/columnar/columnar_array.cpp index f7ffe5f0..a845a919 100644 --- a/src/paimon/common/data/columnar/columnar_array.cpp +++ b/src/paimon/common/data/columnar/columnar_array.cpp @@ -26,8 +26,9 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/decimal.h" #include "fmt/format.h" +#include "paimon/common/data/columnar/columnar_batch_context.h" #include "paimon/common/data/columnar/columnar_map.h" -#include "paimon/common/data/columnar/columnar_row.h" +#include "paimon/common/data/columnar/columnar_row_ref.h" #include "paimon/common/utils/date_time_utils.h" namespace paimon { @@ -84,7 +85,8 @@ std::shared_ptr ColumnarArray::GetMap(int32_t pos) const { std::shared_ptr ColumnarArray::GetRow(int32_t pos, int32_t num_fields) const { auto struct_array = arrow::internal::checked_cast(array_); assert(struct_array); - return std::make_shared(struct_array->fields(), pool_, offset_ + pos); + auto row_ctx = std::make_shared(nullptr, struct_array->fields(), pool_); + return std::make_shared(std::move(row_ctx), offset_ + pos); } Result> ColumnarArray::ToBooleanArray() const { diff --git a/src/paimon/common/data/columnar/columnar_batch_context.h b/src/paimon/common/data/columnar/columnar_batch_context.h new file mode 100644 index 00000000..553227bb --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_batch_context.h @@ -0,0 +1,47 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "arrow/array/array_base.h" + +namespace arrow { +class StructArray; +} // namespace arrow + +namespace paimon { +class MemoryPool; + +struct ColumnarBatchContext { + ColumnarBatchContext(const std::shared_ptr& struct_array_in, + const arrow::ArrayVector& field_arrays_in, + const std::shared_ptr& pool_in) + : struct_array(struct_array_in), pool(pool_in), field_arrays(field_arrays_in) { + array_ptrs.reserve(field_arrays.size()); + for (const auto& array : field_arrays) { + array_ptrs.push_back(array.get()); + } + } + + std::shared_ptr struct_array; + std::shared_ptr pool; + arrow::ArrayVector field_arrays; + std::vector array_ptrs; +}; +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_row_ref.cpp b/src/paimon/common/data/columnar/columnar_row_ref.cpp new file mode 100644 index 00000000..27060127 --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_row_ref.cpp @@ -0,0 +1,82 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/data/columnar/columnar_row_ref.h" + +#include + +#include "arrow/array/array_decimal.h" +#include "arrow/array/array_nested.h" +#include "arrow/array/array_primitive.h" +#include "arrow/type_traits.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/decimal.h" +#include "paimon/common/data/columnar/columnar_array.h" +#include "paimon/common/data/columnar/columnar_map.h" +#include "paimon/common/utils/date_time_utils.h" + +namespace paimon { +Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const { + using ArrayType = typename arrow::TypeTraits::ArrayType; + auto array = arrow::internal::checked_cast(ctx_->array_ptrs[pos]); + assert(array); + arrow::Decimal128 decimal(array->GetValue(row_id_)); + return Decimal(precision, scale, + static_cast(decimal.high_bits()) << 64 | decimal.low_bits()); +} + +Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const { + using ArrayType = typename arrow::TypeTraits::ArrayType; + auto array = arrow::internal::checked_cast(ctx_->array_ptrs[pos]); + assert(array); + int64_t data = array->Value(row_id_); + auto timestamp_type = + arrow::internal::checked_pointer_cast(array->type()); + // for orc format, data is saved as nano, therefore, Timestamp convert should consider precision + // in arrow array rather than input precision + DateTimeUtils::TimeType time_type = DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type); + auto [milli, nano] = DateTimeUtils::TimestampConverter( + data, time_type, DateTimeUtils::TimeType::MILLISECOND, DateTimeUtils::TimeType::NANOSECOND); + return Timestamp(milli, nano); +} + +std::shared_ptr ColumnarRowRef::GetRow(int32_t pos, int32_t num_fields) const { + auto struct_array = + arrow::internal::checked_pointer_cast(ctx_->field_arrays[pos]); + assert(struct_array); + auto nested_ctx = + std::make_shared(struct_array, struct_array->fields(), ctx_->pool); + return std::make_shared(std::move(nested_ctx), row_id_); +} + +std::shared_ptr ColumnarRowRef::GetArray(int32_t pos) const { + auto list_array = arrow::internal::checked_cast(ctx_->array_ptrs[pos]); + assert(list_array); + int32_t offset = list_array->value_offset(row_id_); + int32_t length = list_array->value_length(row_id_); + return std::make_shared(list_array->values(), ctx_->pool, offset, length); +} + +std::shared_ptr ColumnarRowRef::GetMap(int32_t pos) const { + auto map_array = arrow::internal::checked_cast(ctx_->array_ptrs[pos]); + assert(map_array); + int32_t offset = map_array->value_offset(row_id_); + int32_t length = map_array->value_length(row_id_); + return std::make_shared(map_array->keys(), map_array->items(), ctx_->pool, offset, + length); +} + +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_row_ref.h b/src/paimon/common/data/columnar/columnar_row_ref.h new file mode 100644 index 00000000..315ef6a2 --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_row_ref.h @@ -0,0 +1,135 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/data/columnar/columnar_batch_context.h" +#include "paimon/common/data/columnar/columnar_utils.h" +#include "paimon/common/data/internal_array.h" +#include "paimon/common/data/internal_map.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/result.h" + +namespace paimon { +class Bytes; + +/// Columnar row view which shares batch-level context to reduce per-row overhead. +class ColumnarRowRef : public InternalRow { + public: + ColumnarRowRef(std::shared_ptr ctx, int64_t row_id) + : ctx_(std::move(ctx)), row_id_(row_id) {} + + Result GetRowKind() const override { + return row_kind_; + } + + void SetRowKind(const RowKind* kind) override { + row_kind_ = kind; + } + + int32_t GetFieldCount() const override { + return static_cast(ctx_->array_ptrs.size()); + } + + bool IsNullAt(int32_t pos) const override { + return ctx_->array_ptrs[pos]->IsNull(row_id_); + } + + bool GetBoolean(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + row_id_); + } + + char GetByte(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + row_id_); + } + + int16_t GetShort(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + row_id_); + } + + int32_t GetInt(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + row_id_); + } + + int32_t GetDate(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + row_id_); + } + + int64_t GetLong(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + row_id_); + } + + float GetFloat(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + row_id_); + } + + double GetDouble(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_ptrs[pos], + row_id_); + } + + BinaryString GetString(int32_t pos) const override { + auto bytes = ColumnarUtils::GetBytes(ctx_->array_ptrs[pos], row_id_, + ctx_->pool.get()); + return BinaryString::FromBytes(bytes); + } + + std::string_view GetStringView(int32_t pos) const override { + return ColumnarUtils::GetView(ctx_->array_ptrs[pos], row_id_); + } + + Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override; + + Timestamp GetTimestamp(int32_t pos, int32_t precision) const override; + + std::shared_ptr GetBinary(int32_t pos) const override { + return ColumnarUtils::GetBytes(ctx_->array_ptrs[pos], row_id_, + ctx_->pool.get()); + } + + std::shared_ptr GetRow(int32_t pos, int32_t num_fields) const override; + + std::shared_ptr GetArray(int32_t pos) const override; + + std::shared_ptr GetMap(int32_t pos) const override; + + std::string ToString() const override { + return fmt::format("ColumnarRowRef, row_id {}", row_id_); + } + + private: + std::shared_ptr ctx_; + const RowKind* row_kind_ = RowKind::Insert(); + int64_t row_id_; +}; +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_row_test.cpp b/src/paimon/common/data/columnar/columnar_row_test.cpp index a8103906..da56e0a7 100644 --- a/src/paimon/common/data/columnar/columnar_row_test.cpp +++ b/src/paimon/common/data/columnar/columnar_row_test.cpp @@ -24,6 +24,7 @@ #include "arrow/array/array_nested.h" #include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" +#include "paimon/common/data/columnar/columnar_row_ref.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/memory/bytes.h" #include "paimon/memory/memory_pool.h" @@ -70,6 +71,32 @@ TEST(ColumnarRowTest, TestSimple) { ASSERT_EQ(std::string(row.GetStringView(7)), "Hello"); } +TEST(ColumnarRowRefTest, TestSimple) { + auto pool = GetDefaultPool(); + std::shared_ptr target_type = + arrow::struct_({arrow::field("f1", arrow::int32()), arrow::field("f2", arrow::utf8())}); + auto f1 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([1, 2, 3])").ValueOrDie(); + auto f2 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), R"(["alpha", "beta", "gamma"])") + .ValueOrDie(); + auto data = arrow::StructArray::Make({f1, f2}, target_type->fields()).ValueOrDie(); + + auto ctx = std::make_shared(data, data->fields(), pool); + ColumnarRowRef row(ctx, 1); + ASSERT_EQ(row.GetFieldCount(), 2); + ASSERT_EQ(row.GetInt(0), 2); + ASSERT_EQ(std::string(row.GetStringView(1)), "beta"); + + auto row_kind = row.GetRowKind(); + ASSERT_TRUE(row_kind.ok()); + ASSERT_EQ(row_kind.value(), RowKind::Insert()); + row.SetRowKind(RowKind::Delete()); + auto updated_kind = row.GetRowKind(); + ASSERT_TRUE(updated_kind.ok()); + ASSERT_EQ(updated_kind.value(), RowKind::Delete()); +} + TEST(ColumnarRowTest, TestComplexAndNestedType) { auto pool = GetDefaultPool(); std::shared_ptr target_type = arrow::struct_({ diff --git a/src/paimon/core/io/key_value_data_file_record_reader.cpp b/src/paimon/core/io/key_value_data_file_record_reader.cpp index 5e7797a2..556f259e 100644 --- a/src/paimon/core/io/key_value_data_file_record_reader.cpp +++ b/src/paimon/core/io/key_value_data_file_record_reader.cpp @@ -28,8 +28,7 @@ #include "arrow/type.h" #include "arrow/util/checked_cast.h" #include "fmt/format.h" -#include "paimon/common/data/columnar/columnar_row.h" -#include "paimon/common/data/internal_row.h" +#include "paimon/common/data/columnar/columnar_row_ref.h" #include "paimon/common/table/special_fields.h" #include "paimon/common/types/row_kind.h" #include "paimon/common/utils/arrow/status_utils.h" @@ -69,12 +68,11 @@ bool KeyValueDataFileRecordReader::Iterator::HasNext() const { Result KeyValueDataFileRecordReader::Iterator::Next() { assert(HasNext()); - // as key is only used in merge sort, do not hold the data in ColumnarRow - auto key = std::make_unique(reader_->key_fields_, reader_->pool_, cursor_); - // as value is used in merge sort and projection (maybe async and multi-thread), hold the data - // in ColumnarRow - auto value = std::make_unique(reader_->value_struct_array_, reader_->value_fields_, - reader_->pool_, cursor_); + // key is only used in merge sort; key context does not hold parent struct array + auto key = std::make_unique(reader_->key_ctx_, cursor_); + // value is used in merge sort and projection (maybe async and multi-thread), so value context + // holds parent struct array to ensure data remains valid + auto value = std::make_unique(reader_->value_ctx_, cursor_); PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind, RowKind::FromByteValue(reader_->row_kind_array_->Value(cursor_))); int64_t sequence_number = reader_->sequence_number_array_->Value(cursor_); @@ -137,6 +135,8 @@ Result> KeyValueDataFileRecordRe arrow::StructArray::Make(value_fields_, value_names_)); selection_bitmap_ = std::move(bitmap); value_fields_ = value_struct_array_->fields(); + key_ctx_ = std::make_shared(nullptr, key_fields_, pool_); + value_ctx_ = std::make_shared(value_struct_array_, value_fields_, pool_); TraverseArray(value_struct_array_); return std::make_unique(this); } @@ -148,6 +148,8 @@ void KeyValueDataFileRecordReader::Reset() { value_struct_array_.reset(); sequence_number_array_.reset(); row_kind_array_.reset(); + key_ctx_.reset(); + value_ctx_.reset(); } void KeyValueDataFileRecordReader::TraverseArray(const std::shared_ptr& array) { diff --git a/src/paimon/core/io/key_value_data_file_record_reader.h b/src/paimon/core/io/key_value_data_file_record_reader.h index dbbc4853..d950e25d 100644 --- a/src/paimon/core/io/key_value_data_file_record_reader.h +++ b/src/paimon/core/io/key_value_data_file_record_reader.h @@ -22,7 +22,6 @@ #include #include "arrow/type_fwd.h" -#include "paimon/common/data/columnar/columnar_row.h" #include "paimon/core/io/key_value_record_reader.h" #include "paimon/core/key_value.h" #include "paimon/reader/batch_reader.h" @@ -42,6 +41,7 @@ class NumericArray; namespace paimon { class MemoryPool; class Metrics; +struct ColumnarBatchContext; // Convert the arrow array of data file into a KeyValue object iterator (parsing SEQUENCE_NUMBER and // VALUE_KIND columns) @@ -95,5 +95,7 @@ class KeyValueDataFileRecordReader : public KeyValueRecordReader { arrow::ArrayVector value_fields_; std::shared_ptr> sequence_number_array_; std::shared_ptr> row_kind_array_; + std::shared_ptr key_ctx_; + std::shared_ptr value_ctx_; }; } // namespace paimon