diff --git a/docs/source/user_guide/data_types.rst b/docs/source/user_guide/data_types.rst index d8c6512a..d2b125af 100644 --- a/docs/source/user_guide/data_types.rst +++ b/docs/source/user_guide/data_types.rst @@ -158,7 +158,7 @@ and `Arrow DataTypes `` diff --git a/src/paimon/common/utils/string_utils.cpp b/src/paimon/common/utils/string_utils.cpp index 88312d03..c194c778 100644 --- a/src/paimon/common/utils/string_utils.cpp +++ b/src/paimon/common/utils/string_utils.cpp @@ -138,11 +138,11 @@ Result StringUtils::StringToDate(const std::string& str) { std::istringstream ss(str); ss >> std::get_time(&timeinfo, "%Y-%m-%d"); if (ss.fail()) { - return Status::Invalid(fmt::format("failed to convert string {} to date", str)); + return Status::Invalid(fmt::format("failed to convert string '{}' to date", str)); } std::time_t time = timegm(&timeinfo); if (time == -1) { - return Status::Invalid(fmt::format("failed to convert string {} to date", str)); + return Status::Invalid(fmt::format("failed to convert string '{}' to date", str)); } static const int64_t SECONDS_PER_DAY = 86400l; // = 24 * 60 * 60 return time / SECONDS_PER_DAY; diff --git a/src/paimon/core/casting/cast_executor_test.cpp b/src/paimon/core/casting/cast_executor_test.cpp index 838d0b60..1ff9397e 100644 --- a/src/paimon/core/casting/cast_executor_test.cpp +++ b/src/paimon/core/casting/cast_executor_test.cpp @@ -837,7 +837,7 @@ TEST_F(CastExecutorTest, TestStringToBooleanCastExecutorCastLiteral) { auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::boolean()); ASSERT_TRUE( - msg.find("StringToBooleanCastExecutor cast failed: STRING cannot cast to BOOLEAN") != + msg.find("StringToBooleanCastExecutor cast failed: STRING '' cannot cast to BOOLEAN") != std::string::npos); } { @@ -847,7 +847,7 @@ TEST_F(CastExecutorTest, TestStringToBooleanCastExecutorCastLiteral) { src_data, arrow::boolean()); ASSERT_TRUE( msg.find( - "StringToBooleanCastExecutor cast failed: STRING ttrue cannot cast to BOOLEAN") != + "StringToBooleanCastExecutor cast failed: STRING 'ttrue' cannot cast to BOOLEAN") != std::string::npos); } } @@ -864,7 +864,7 @@ TEST_F(CastExecutorTest, TestStringToBooleanCastExecutorCastArray) { auto msg = CheckArrayInvalidResult(cast_executor, arrow::utf8(), arrow::boolean(), R"([""])"); ASSERT_TRUE( - msg.find("StringToBooleanCastExecutor cast failed: STRING cannot cast to BOOLEAN") != + msg.find("StringToBooleanCastExecutor cast failed: STRING '' cannot cast to BOOLEAN") != std::string::npos); } { @@ -873,7 +873,7 @@ TEST_F(CastExecutorTest, TestStringToBooleanCastExecutorCastArray) { R"(["true", "ttrue"])"); ASSERT_TRUE( msg.find( - "StringToBooleanCastExecutor cast failed: STRING ttrue cannot cast to BOOLEAN") != + "StringToBooleanCastExecutor cast failed: STRING 'ttrue' cannot cast to BOOLEAN") != std::string::npos); } } @@ -940,49 +940,49 @@ TEST_F(CastExecutorTest, TestStringToNumericPrimitiveCastExecutorCastLiteral) { auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::int8()); ASSERT_TRUE(msg.find("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast 128 from STRING to TINYINT") != std::string::npos); + "cast '128' from STRING to TINYINT") != std::string::npos); } { std::string src_data = "-129"; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::int8()); ASSERT_TRUE(msg.find("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast -129 from STRING to TINYINT") != std::string::npos); + "cast '-129' from STRING to TINYINT") != std::string::npos); } { std::string src_data = "32768"; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::int16()); ASSERT_TRUE(msg.find("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast 32768 from STRING to SMALLINT") != std::string::npos); + "cast '32768' from STRING to SMALLINT") != std::string::npos); } { std::string src_data = "-32769"; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::int16()); ASSERT_TRUE(msg.find("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast -32769 from STRING to SMALLINT") != std::string::npos); + "cast '-32769' from STRING to SMALLINT") != std::string::npos); } { std::string src_data = "2147483648"; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::int32()); ASSERT_TRUE(msg.find("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast 2147483648 from STRING to INT") != std::string::npos); + "cast '2147483648' from STRING to INT") != std::string::npos); } { std::string src_data = "-2147483649"; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::int32()); ASSERT_TRUE(msg.find("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast -2147483649 from STRING to INT") != std::string::npos); + "cast '-2147483649' from STRING to INT") != std::string::npos); } { std::string src_data = "9223372036854775808"; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::int64()); ASSERT_TRUE(msg.find("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast 9223372036854775808 from STRING to BIGINT") != + "cast '9223372036854775808' from STRING to BIGINT") != std::string::npos); } { @@ -990,7 +990,7 @@ TEST_F(CastExecutorTest, TestStringToNumericPrimitiveCastExecutorCastLiteral) { auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::int64()); ASSERT_TRUE(msg.find("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast -9223372036854775809 from STRING to BIGINT") != + "cast '-9223372036854775809' from STRING to BIGINT") != std::string::npos); } { @@ -1011,14 +1011,14 @@ TEST_F(CastExecutorTest, TestStringToNumericPrimitiveCastExecutorCastLiteral) { auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::int16()); ASSERT_TRUE(msg.find("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast from STRING to SMALLINT") != std::string::npos); + "cast '' from STRING to SMALLINT") != std::string::npos); } { std::string src_data = "abc"; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::int32()); ASSERT_TRUE(msg.find("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast abc from STRING to INT") != std::string::npos); + "cast 'abc' from STRING to INT") != std::string::npos); } } @@ -1421,7 +1421,7 @@ TEST_F(CastExecutorTest, TestStringToDateCastExecutorCastLiteral) { std::string src_data = "9223372036854775807"; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::date32()); - ASSERT_TRUE(msg.find("failed to convert string 9223372036854775807 to date") != + ASSERT_TRUE(msg.find("failed to convert string '9223372036854775807' to date") != std::string::npos); } { @@ -1429,28 +1429,30 @@ TEST_F(CastExecutorTest, TestStringToDateCastExecutorCastLiteral) { std::string src_data = "11970-01-02"; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::date32()); - ASSERT_TRUE(msg.find("failed to convert string 11970-01-02 to date") != std::string::npos); + ASSERT_TRUE(msg.find("failed to convert string '11970-01-02' to date") != + std::string::npos); } { // invalid date str std::string src_data = "-1970-01-02"; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::date32()); - ASSERT_TRUE(msg.find("failed to convert string -1970-01-02 to date") != std::string::npos); + ASSERT_TRUE(msg.find("failed to convert string '-1970-01-02' to date") != + std::string::npos); } { // invalid date str std::string src_data = ""; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::date32()); - ASSERT_TRUE(msg.find("failed to convert string to date") != std::string::npos); + ASSERT_TRUE(msg.find("failed to convert string '' to date") != std::string::npos); } { // invalid date str std::string src_data = "0x1"; auto msg = CheckLiteralInvalidResult(cast_executor, FieldType::STRING, src_data, arrow::date32()); - ASSERT_TRUE(msg.find("failed to convert string 0x1 to date") != std::string::npos); + ASSERT_TRUE(msg.find("failed to convert string '0x1' to date") != std::string::npos); } } diff --git a/src/paimon/core/casting/string_to_boolean_cast_executor.cpp b/src/paimon/core/casting/string_to_boolean_cast_executor.cpp index e8d139e6..ea8ab4dc 100644 --- a/src/paimon/core/casting/string_to_boolean_cast_executor.cpp +++ b/src/paimon/core/casting/string_to_boolean_cast_executor.cpp @@ -52,7 +52,7 @@ Result StringToBooleanCastExecutor::Cast( std::optional bool_value = StringUtils::StringToValue(value); if (bool_value == std::nullopt) { return Status::Invalid(fmt::format( - "StringToBooleanCastExecutor cast failed: STRING {} cannot cast to BOOLEAN", value)); + "StringToBooleanCastExecutor cast failed: STRING '{}' cannot cast to BOOLEAN", value)); } return Literal(bool_value.value()); } @@ -71,7 +71,7 @@ Result> StringToBooleanCastExecutor::Cast( StringUtils::StringToValue(string_array->GetString(i)); if (bool_value == std::nullopt) { return Status::Invalid(fmt::format( - "StringToBooleanCastExecutor cast failed: STRING {} cannot cast to BOOLEAN", + "StringToBooleanCastExecutor cast failed: STRING '{}' cannot cast to BOOLEAN", string_array->GetString(i))); } PAIMON_RETURN_NOT_OK_FROM_ARROW(bool_builder->Append(bool_value.value())); diff --git a/src/paimon/core/casting/string_to_numeric_primitive_cast_executor.cpp b/src/paimon/core/casting/string_to_numeric_primitive_cast_executor.cpp index a7f8fd03..45213bb0 100644 --- a/src/paimon/core/casting/string_to_numeric_primitive_cast_executor.cpp +++ b/src/paimon/core/casting/string_to_numeric_primitive_cast_executor.cpp @@ -71,7 +71,7 @@ Result StringToNumericPrimitiveCastExecutor::CastLiteral(const Literal& if (!success) { return Status::Invalid( fmt::format("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast {} from STRING to {}", + "cast '{}' from STRING to {}", value, FieldTypeUtils::FieldTypeToString(target_type))); } return Literal(out); @@ -80,7 +80,7 @@ Result StringToNumericPrimitiveCastExecutor::CastLiteral(const Literal& if (!casted_value) { return Status::Invalid( fmt::format("cast literal in StringToNumericPrimitiveCastExecutor failed: cannot " - "cast {} from STRING to {}", + "cast '{}' from STRING to {}", value, FieldTypeUtils::FieldTypeToString(target_type))); } return Literal(casted_value.value()); diff --git a/src/paimon/core/io/complete_row_tracking_fields_reader.cpp b/src/paimon/core/io/complete_row_tracking_fields_reader.cpp index 92b859ef..d9efd487 100644 --- a/src/paimon/core/io/complete_row_tracking_fields_reader.cpp +++ b/src/paimon/core/io/complete_row_tracking_fields_reader.cpp @@ -53,7 +53,7 @@ Status CompleteRowTrackingFieldsBatchReader::SetReadSchema( int32_t sequence_id_idx = arrow_schema->GetFieldIndex(SpecialFields::SequenceNumber().Name()); if (sequence_id_idx != -1 && file_schema->GetFieldIndex(SpecialFields::SequenceNumber().Name()) == -1) { - // read special fields but file not exist, remove special fields to format reader + // read special fields but file not exist, remove special fields to format reader PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(arrow_schema, arrow_schema->RemoveField(sequence_id_idx)); } ArrowSchema c_schema; diff --git a/src/paimon/core/io/row_to_arrow_array_converter.h b/src/paimon/core/io/row_to_arrow_array_converter.h index 9d8ea932..2a0764af 100644 --- a/src/paimon/core/io/row_to_arrow_array_converter.h +++ b/src/paimon/core/io/row_to_arrow_array_converter.h @@ -31,7 +31,7 @@ #include "paimon/memory/memory_pool.h" #include "paimon/reader/batch_reader.h" namespace paimon { -// convert row T to output R (R maybe BatchReader::ReadBatch or KeyValueBatch) +// convert row T to output R (R maybe BatchReader::ReadBatch or KeyValueBatch) template class RowToArrowArrayConverter { public: diff --git a/src/paimon/format/avro/CMakeLists.txt b/src/paimon/format/avro/CMakeLists.txt index f0d2c48c..dba1bb04 100644 --- a/src/paimon/format/avro/CMakeLists.txt +++ b/src/paimon/format/avro/CMakeLists.txt @@ -50,6 +50,7 @@ if(PAIMON_ENABLE_AVRO) if(PAIMON_BUILD_TESTS) add_paimon_test(avro_format_test SOURCES + avro_direct_encoder_decoder_test.cpp avro_file_batch_reader_test.cpp avro_file_format_test.cpp avro_format_writer_test.cpp diff --git a/src/paimon/format/avro/avro_direct_decoder.cpp b/src/paimon/format/avro/avro_direct_decoder.cpp index 37b016b5..b5bbd1d6 100644 --- a/src/paimon/format/avro/avro_direct_decoder.cpp +++ b/src/paimon/format/avro/avro_direct_decoder.cpp @@ -418,6 +418,7 @@ Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, const auto& branch_node = avro_node->leafAt(branch_index); if (branch_node->type() == ::avro::AVRO_NULL) { + decoder->decodeNull(); PAIMON_RETURN_NOT_OK_FROM_ARROW(array_builder->AppendNull()); return Status::OK(); } else { diff --git a/src/paimon/format/avro/avro_direct_encoder.cpp b/src/paimon/format/avro/avro_direct_encoder.cpp index c906e41f..c2c39ad9 100644 --- a/src/paimon/format/avro/avro_direct_encoder.cpp +++ b/src/paimon/format/avro/avro_direct_encoder.cpp @@ -57,7 +57,8 @@ Result ValidateUnion(const ::avro::NodePtr& union_node) { return UnionBranches{.null_index = 0, .value_index = 1, .value_node = branch_1}; } if (branch_1->type() == ::avro::AVRO_NULL && branch_0->type() != ::avro::AVRO_NULL) { - return UnionBranches{.null_index = 1, .value_index = 0, .value_node = branch_0}; + return Status::Invalid( + "Unexpected: In paimon, we expect the null branch to be the first branch in a union."); } return Status::Invalid("Union must have exactly one null branch"); } @@ -92,10 +93,6 @@ Status AvroDirectEncoder::EncodeArrowToAvro(const ::avro::NodePtr& avro_node, } switch (avro_node->type()) { - case ::avro::AVRO_NULL: - encoder->encodeNull(); - return Status::OK(); - case ::avro::AVRO_BOOL: { const auto& bool_array = arrow::internal::checked_cast(array); @@ -230,9 +227,7 @@ Status AvroDirectEncoder::EncodeArrowToAvro(const ::avro::NodePtr& avro_node, const auto& binary_array = arrow::internal::checked_cast(array); std::string_view value = binary_array.GetView(row_index); - // TODO(jinli.zjw): need to copy to ctx? - ctx->assign(value.begin(), value.end()); - encoder->encodeBytes(ctx->data(), ctx->size()); + encoder->encodeBytes(reinterpret_cast(value.data()), value.size()); return Status::OK(); } @@ -294,7 +289,7 @@ Status AvroDirectEncoder::EncodeArrowToAvro(const ::avro::NodePtr& avro_node, element_node->leaves() != 2)) { return Status::Invalid( fmt::format("Expected AVRO_RECORD for map key-value pair, got {}", - ::avro::toString(element_node->type()))); + AvroUtils::ToString(avro_node))); } const auto& map_array = @@ -366,9 +361,11 @@ Status AvroDirectEncoder::EncodeArrowToAvro(const ::avro::NodePtr& avro_node, return Status::OK(); } + case ::avro::AVRO_NULL: case ::avro::AVRO_UNION: // Already handled above - return Status::Invalid("Unexpected union handling"); + return Status::Invalid(fmt::format("Unexpected Avro type handling: {}", + ::avro::toString(avro_node->type()))); default: return Status::Invalid( fmt::format("Unsupported Avro type: {}", ::avro::toString(avro_node->type()))); diff --git a/src/paimon/format/avro/avro_direct_encoder_decoder_test.cpp b/src/paimon/format/avro/avro_direct_encoder_decoder_test.cpp new file mode 100644 index 00000000..decba368 --- /dev/null +++ b/src/paimon/format/avro/avro_direct_encoder_decoder_test.cpp @@ -0,0 +1,670 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "arrow/api.h" +#include "arrow/ipc/api.h" +#include "avro/Compiler.hh" +#include "avro/Decoder.hh" +#include "avro/Encoder.hh" +#include "avro/Stream.hh" +#include "avro/ValidSchema.hh" +#include "gtest/gtest.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/format/avro/avro_direct_decoder.h" +#include "paimon/format/avro/avro_direct_encoder.h" +#include "paimon/format/avro/avro_schema_converter.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::avro::test { + +class AvroDirectEncoderDecoderTest : public ::testing::Test { + public: + void SetUp() override {} + void TearDown() override {} + + Result> EncodeData( + const ::avro::NodePtr& avro_node, const std::shared_ptr& input_array) { + auto output_stream = ::avro::memoryOutputStream(); + auto encoder = ::avro::binaryEncoder(); + encoder->init(*output_stream); + + for (int64_t i = 0; i < input_array->length(); ++i) { + PAIMON_RETURN_NOT_OK(AvroDirectEncoder::EncodeArrowToAvro(avro_node, *input_array, i, + encoder.get(), &encode_ctx_)); + } + return output_stream; + } + + Result> DecodeWithEncodedData( + const ::avro::NodePtr& avro_node, std::unique_ptr<::avro::OutputStream>&& encoded_data, + const std::optional>& projection, int32_t expected_count, + arrow::ArrayBuilder* builder) { + auto input_stream = ::avro::memoryInputStream(*encoded_data); + auto decoder = ::avro::binaryDecoder(); + decoder->init(*input_stream); + + for (int32_t i = 0; i < expected_count; ++i) { + PAIMON_RETURN_NOT_OK(AvroDirectDecoder::DecodeAvroToBuilder( + avro_node, projection, decoder.get(), builder, &decode_ctx_)); + } + + std::shared_ptr decoded_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Finish(&decoded_array)); + EXPECT_EQ(decoded_array->length(), expected_count); + return decoded_array; + } + + void CheckResult(const std::string& schema_json, + const std::shared_ptr& input_array, + arrow::ArrayBuilder* builder) { + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + + ASSERT_OK_AND_ASSIGN(auto encoded_data, EncodeData(avro_schema.root(), input_array)); + ASSERT_OK_AND_ASSIGN( + auto decoded_array, + DecodeWithEncodedData(avro_schema.root(), std::move(encoded_data), + /*projection=*/std::nullopt, input_array->length(), builder)); + ASSERT_TRUE(decoded_array->Equals(*input_array)); + } + + Result> GetProjectedArray( + const std::shared_ptr& input_array, + const std::set& projection) { + auto struct_type = input_array->struct_type(); + arrow::FieldVector projected_fields; + projected_fields.reserve(projection.size()); + arrow::ArrayVector projected_field_arrays; + projected_field_arrays.reserve(projection.size()); + for (size_t index : projection) { + if (index >= static_cast(struct_type->num_fields())) { + return Status::Invalid( + fmt::format("Projection index {} out of range for struct with {} fields", index, + struct_type->num_fields())); + } + projected_fields.push_back(struct_type->field(index)); + projected_field_arrays.push_back(input_array->field(index)); + } + auto projected_struct_type = arrow::struct_(projected_fields); + + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + auto projected_array, + arrow::StructArray::Make(projected_field_arrays, projected_fields)); + return projected_array; + } + + void CheckResultWithProjection(const std::shared_ptr& src_array, + const std::set& projection) { + auto src_struct_array = std::dynamic_pointer_cast(src_array); + ASSERT_OK_AND_ASSIGN(auto avro_schema, + AvroSchemaConverter::ArrowSchemaToAvroSchema( + arrow::schema(src_struct_array->struct_type()->fields()))); + ASSERT_OK_AND_ASSIGN(auto encoded_data, EncodeData(avro_schema.root(), src_array)); + + ASSERT_OK_AND_ASSIGN(auto projected_array, GetProjectedArray(src_struct_array, projection)); + auto decoded_array_builder = arrow::MakeBuilder(projected_array->type()).ValueOrDie(); + ASSERT_OK_AND_ASSIGN( + auto decoded_array, + DecodeWithEncodedData(avro_schema.root(), std::move(encoded_data), projection, + src_array->length(), decoded_array_builder.get())); + ASSERT_TRUE(decoded_array->Equals(*projected_array)); + } + + protected: + AvroDirectEncoder::EncodeContext encode_ctx_; + AvroDirectDecoder::DecodeContext decode_ctx_; +}; + +TEST_F(AvroDirectEncoderDecoderTest, TestBooleanType) { + std::string schema_json = R"({"type": "boolean"})"; + arrow::BooleanBuilder builder; + ASSERT_TRUE(builder.Append(true).ok()); + ASSERT_TRUE(builder.Append(false).ok()); + ASSERT_TRUE(builder.Append(true).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestIntegerTypes) { + // Test INT8 + { + std::string schema_json = R"({"type": "int"})"; + arrow::Int8Builder builder; + ASSERT_TRUE(builder.Append(1).ok()); + ASSERT_TRUE(builder.Append(-128).ok()); + ASSERT_TRUE(builder.Append(127).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } + + // Test INT32 + { + std::string schema_json = R"({"type": "int"})"; + arrow::Int32Builder builder; + ASSERT_TRUE(builder.Append(42).ok()); + ASSERT_TRUE(builder.Append(-2147483648).ok()); + ASSERT_TRUE(builder.Append(2147483647).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } + + // Test INT64 + { + std::string schema_json = R"({"type": "long"})"; + arrow::Int64Builder builder; + ASSERT_TRUE(builder.Append(123456789L).ok()); + ASSERT_TRUE(builder.Append(-9223372036854775807L).ok()); + ASSERT_TRUE(builder.Append(9223372036854775807L).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } +} + +TEST_F(AvroDirectEncoderDecoderTest, TestFloatingPointTypes) { + // Test FLOAT + { + std::string schema_json = R"({"type": "float"})"; + arrow::FloatBuilder builder; + ASSERT_TRUE(builder.Append(3.14f).ok()); + ASSERT_TRUE(builder.Append(-2.71f).ok()); + ASSERT_TRUE(builder.Append(0.0f).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } + + // Test DOUBLE + { + std::string schema_json = R"({"type": "double"})"; + arrow::DoubleBuilder builder; + ASSERT_TRUE(builder.Append(3.141592653589793).ok()); + ASSERT_TRUE(builder.Append(-2.718281828459045).ok()); + ASSERT_TRUE(builder.Append(0.0).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } +} + +TEST_F(AvroDirectEncoderDecoderTest, TestStringType) { + std::string schema_json = R"({"type": "string"})"; + arrow::StringBuilder builder; + ASSERT_TRUE(builder.Append("hello").ok()); + ASSERT_TRUE(builder.Append("world").ok()); + ASSERT_TRUE(builder.Append("").ok()); + ASSERT_TRUE(builder.Append("测试中文").ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestBytesType) { + std::string schema_json = R"({"type": "bytes"})"; + arrow::BinaryBuilder builder; + ASSERT_TRUE(builder.Append("binary_data").ok()); + ASSERT_TRUE(builder.Append(std::string("\x00\x01\x02\x03", 4)).ok()); + ASSERT_TRUE(builder.Append("").ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestDate32Type) { + std::string schema_json = R"({"type": "int", "logicalType": "date"})"; + arrow::Date32Builder builder; + ASSERT_TRUE(builder.Append(18628).ok()); // 2021-01-01 + ASSERT_TRUE(builder.Append(0).ok()); // 1970-01-01 + ASSERT_TRUE(builder.Append(-1).ok()); // 1969-12-31 + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestTimestampType) { + // Test timestamp-millis + { + std::string schema_json = R"({"type": "long", "logicalType": "timestamp-millis"})"; + arrow::TimestampBuilder builder(arrow::timestamp(arrow::TimeUnit::MILLI), + arrow::default_memory_pool()); + ASSERT_TRUE(builder.Append(1609459200123L).ok()); // 2021-01-01 00:00:00.123 + ASSERT_TRUE(builder.Append(0L).ok()); // 1970-01-01 00:00:00 + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } + + // Test timestamp-micros + { + std::string schema_json = R"({"type": "long", "logicalType": "timestamp-micros"})"; + arrow::TimestampBuilder builder(arrow::timestamp(arrow::TimeUnit::MICRO), + arrow::default_memory_pool()); + ASSERT_TRUE(builder.Append(1609459200123123L).ok()); // 2021-01-01 00:00:00.123123 + ASSERT_TRUE(builder.Append(0L).ok()); // 1970-01-01 00:00:00 + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } +} + +TEST_F(AvroDirectEncoderDecoderTest, TestUnionType) { + // Test nullable int (union of null and int) + std::string schema_json = R"(["null", "int"])"; + arrow::Int32Builder builder; + ASSERT_TRUE(builder.Append(42).ok()); + ASSERT_TRUE(builder.AppendNull().ok()); + ASSERT_TRUE(builder.Append(100).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestRecordType) { + std::string schema_json = R"({ + "type": "record", + "name": "TestRecord", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"}, + {"name": "active", "type": "boolean"} + ] + })"; + + // Create struct array + auto int_field = arrow::field("id", arrow::int32()); + auto string_field = arrow::field("name", arrow::utf8()); + auto bool_field = arrow::field("active", arrow::boolean()); + auto struct_type = arrow::struct_({int_field, string_field, bool_field}); + + arrow::StructBuilder struct_builder( + struct_type, arrow::default_memory_pool(), + {std::make_shared(), std::make_shared(), + std::make_shared()}); + + auto int_builder = static_cast(struct_builder.field_builder(0)); + auto string_builder = static_cast(struct_builder.field_builder(1)); + auto bool_builder = static_cast(struct_builder.field_builder(2)); + + // Add first record + ASSERT_TRUE(struct_builder.Append().ok()); + ASSERT_TRUE(int_builder->Append(1).ok()); + ASSERT_TRUE(string_builder->Append("Alice").ok()); + ASSERT_TRUE(bool_builder->Append(true).ok()); + + // Add second record + ASSERT_TRUE(struct_builder.Append().ok()); + ASSERT_TRUE(int_builder->Append(2).ok()); + ASSERT_TRUE(string_builder->Append("Bob").ok()); + ASSERT_TRUE(bool_builder->Append(false).ok()); + + std::shared_ptr input_array; + ASSERT_TRUE(struct_builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &struct_builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestDecodeWithProjection) { + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int16()), + arrow::field("f3", arrow::int32()), + arrow::field("f4", arrow::int64()), + arrow::field("f5", arrow::float32()), + arrow::field("f6", arrow::float64()), + arrow::field("f7", arrow::utf8()), + arrow::field("f8", arrow::binary()), + arrow::field("f9", arrow::map(arrow::float64(), arrow::float64())), + arrow::field("f10", arrow::map(arrow::utf8(), arrow::utf8())), + arrow::field("f11", arrow::list(arrow::float32())), + arrow::field("f12", arrow::struct_({arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int64())})), + arrow::field("f13", arrow::timestamp(arrow::TimeUnit::MICRO)), + arrow::field("f14", arrow::date32()), + arrow::field("f15", arrow::decimal128(2, 2)), + arrow::field("f16", arrow::decimal128(10, 10)), + arrow::field("f17", arrow::decimal128(19, 19))}; + + std::shared_ptr src_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [true, 127, 32767, 2147483647, 9999999999999, 1234.56, 1234567890.0987654321, "aa", "qq", [[1.1,10.1],[2.2,20.2]], [["key1","val1"],["key2","val2"]], [0.1, 0.2], [true, null], "1970-01-01 00:02:03.123123", 2456, "0.22", "0.1234567890", "0.1234567890987654321"], + [false, -128, -32768, -2147483648, -9999999999999, -1234.56, -1234567890.0987654321, null, "ww", [[1.11,10.11],[2.22,20.22]], [["key11","val11"],["key22","val22"]], [-0.1, -0.2, null, 0.3, 0.4], [null, 2], "1970-01-01 00:16:39.999999", null, "-0.22", "-0.1234567890", null], + [null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null] + ])") + .ValueOrDie(); + + // no skip + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip bool + CheckResultWithProjection(src_array, + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip int + CheckResultWithProjection(src_array, {0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip long + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip float + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip double + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip string + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip binary + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip map + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip array-based map + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17}); + // skip list + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17}); + // skip struct + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 14, 15, 16, 17}); + // skip others + CheckResultWithProjection(src_array, {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}); + // skip null and union is already tested in above test cases +} + +TEST_F(AvroDirectEncoderDecoderTest, TestArrayType) { + std::string schema_json = R"({ + "type": "array", + "items": "int" + })"; + + // Create list array + arrow::ListBuilder list_builder(arrow::default_memory_pool(), + std::make_shared()); + auto int_builder = static_cast(list_builder.value_builder()); + + // First list: [1, 2, 3] + ASSERT_TRUE(list_builder.Append().ok()); + ASSERT_TRUE(int_builder->Append(1).ok()); + ASSERT_TRUE(int_builder->Append(2).ok()); + ASSERT_TRUE(int_builder->Append(3).ok()); + + // Second list: [] + ASSERT_TRUE(list_builder.Append().ok()); + + // Third list: [42] + ASSERT_TRUE(list_builder.Append().ok()); + ASSERT_TRUE(int_builder->Append(42).ok()); + + std::shared_ptr input_array; + ASSERT_TRUE(list_builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &list_builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestMapType) { + std::string schema_json = R"({ + "type": "map", + "values": "string" + })"; + + // Create map array + arrow::MapBuilder map_builder(arrow::default_memory_pool(), + std::make_shared(), + std::make_shared()); + auto key_builder = static_cast(map_builder.key_builder()); + auto value_builder = static_cast(map_builder.item_builder()); + + // First map: {"key1": "value1", "key2": "value2"} + ASSERT_TRUE(map_builder.Append().ok()); + ASSERT_TRUE(key_builder->Append("key1").ok()); + ASSERT_TRUE(value_builder->Append("value1").ok()); + ASSERT_TRUE(key_builder->Append("key2").ok()); + ASSERT_TRUE(value_builder->Append("value2").ok()); + + // Second map: {} + ASSERT_TRUE(map_builder.Append().ok()); + + // Third map: {"single": "entry"} + ASSERT_TRUE(map_builder.Append().ok()); + ASSERT_TRUE(key_builder->Append("single").ok()); + ASSERT_TRUE(value_builder->Append("entry").ok()); + + std::shared_ptr input_array; + ASSERT_TRUE(map_builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &map_builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestArrayBasedMapType) { + std::string schema_json = R"({ + "type" : "array", + "items" : { + "type" : "record", + "name" : "record_f1", + "fields" : [ { + "name" : "key", + "type" : "int" + }, { + "name" : "value", + "type" : "string" + } ] + }, + "logicalType" : "map" + })"; + + // Create map array + arrow::MapBuilder map_builder(arrow::default_memory_pool(), + std::make_shared(), + std::make_shared()); + auto key_builder = static_cast(map_builder.key_builder()); + auto value_builder = static_cast(map_builder.item_builder()); + + // First map: {111: "value1", 222: "value2"} + ASSERT_TRUE(map_builder.Append().ok()); + ASSERT_TRUE(key_builder->Append(111).ok()); + ASSERT_TRUE(value_builder->Append("value1").ok()); + ASSERT_TRUE(key_builder->Append(222).ok()); + ASSERT_TRUE(value_builder->Append("value2").ok()); + + // Second map: {} + ASSERT_TRUE(map_builder.Append().ok()); + + // Third map: {333: "entry"} + ASSERT_TRUE(map_builder.Append().ok()); + ASSERT_TRUE(key_builder->Append(333).ok()); + ASSERT_TRUE(value_builder->Append("entry").ok()); + + std::shared_ptr input_array; + ASSERT_TRUE(map_builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &map_builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestDecimalType) { + std::string schema_json = R"({ + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 2 + })"; + + // Create decimal array + auto decimal_type = arrow::decimal128(10, 2); + arrow::Decimal128Builder builder(decimal_type); + + ASSERT_TRUE(builder.Append(arrow::Decimal128("123.45")).ok()); + ASSERT_TRUE(builder.Append(arrow::Decimal128("-67.89")).ok()); + ASSERT_TRUE(builder.Append(arrow::Decimal128("0.00")).ok()); + + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestEncoderErrorCases) { + std::string schema_json = R"({"type": "int"})"; + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + auto output_stream = ::avro::memoryOutputStream(); + auto encoder = ::avro::binaryEncoder(); + encoder->init(*output_stream); + + { + // Test out of bounds row index + arrow::Int32Builder builder; + ASSERT_TRUE(builder.Append(42).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + + ASSERT_NOK_WITH_MSG(AvroDirectEncoder::EncodeArrowToAvro(avro_schema.root(), *input_array, + -1, encoder.get(), &encode_ctx_), + "Row index -1 out of bounds 1"); + ASSERT_NOK_WITH_MSG(AvroDirectEncoder::EncodeArrowToAvro(avro_schema.root(), *input_array, + 1, encoder.get(), &encode_ctx_), + "Row index 1 out of bounds 1"); + } + { + // Test null value in non-nullable field + arrow::Int32Builder nullable_builder; + ASSERT_TRUE(nullable_builder.AppendNull().ok()); + std::shared_ptr nullable_array; + ASSERT_TRUE(nullable_builder.Finish(&nullable_array).ok()); + + ASSERT_NOK_WITH_MSG( + AvroDirectEncoder::EncodeArrowToAvro(avro_schema.root(), *nullable_array, 0, + encoder.get(), &encode_ctx_), + "Null value in non-nullable field"); + } +} + +TEST_F(AvroDirectEncoderDecoderTest, TestDecoderErrorCases) { + std::string schema_json = R"(["null", "int"])"; + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + + // Test with invalid union branch index, branch index 2, but union only has 2 branches (0,1) + std::vector invalid_data = {0x04}; + auto input_stream = ::avro::memoryInputStream(invalid_data.data(), invalid_data.size()); + auto decoder = ::avro::binaryDecoder(); + decoder->init(*input_stream); + + arrow::Int32Builder builder; + ASSERT_NOK_WITH_MSG( + AvroDirectDecoder::DecodeAvroToBuilder(avro_schema.root(), std::nullopt, decoder.get(), + &builder, &decode_ctx_), + "Union branch index 2 out of range [0, 2)"); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestInvalidUnionType) { + auto run = [&](const std::string& schema_json, const std::string& error_msg) { + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + auto output_stream = ::avro::memoryOutputStream(); + auto encoder = ::avro::binaryEncoder(); + encoder->init(*output_stream); + + arrow::Int32Builder builder; + ASSERT_TRUE(builder.Append(42).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + + ASSERT_NOK_WITH_MSG(AvroDirectEncoder::EncodeArrowToAvro(avro_schema.root(), *input_array, + 0, encoder.get(), &encode_ctx_), + error_msg); + }; + // Test union with more than 2 branches + run(R"(["null", "int", "string"])", "Union must have exactly 2 branches, got 3"); + // Test union with null branch not first + run(R"(["int", "null"])", + "Unexpected: In paimon, we expect the null branch to be the first branch in a union."); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestInvalidMapType) { + std::string schema_json = R"({ + "type": "map", + "values": "string" + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + + arrow::MapBuilder map_builder(arrow::default_memory_pool(), + std::make_shared(), + std::make_shared()); + auto key_builder = static_cast(map_builder.key_builder()); + auto value_builder = static_cast(map_builder.item_builder()); + ASSERT_TRUE(map_builder.Append().ok()); + ASSERT_TRUE(key_builder->Append(1).ok()); + ASSERT_TRUE(value_builder->Append("value1").ok()); + std::shared_ptr input_array; + ASSERT_TRUE(map_builder.Finish(&input_array).ok()); + + ASSERT_NOK_WITH_MSG(EncodeData(avro_schema.root(), input_array), + "AVRO_MAP keys must be StringArray, got int32"); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestInvalidArrayBasedMapType) { + std::string schema_json = R"({ + "type" : "array", + "items" : { + "type" : "record", + "name" : "record_f1", + "fields" : [ { + "name" : "key", + "type" : "int" + }, { + "name" : "value", + "type" : "string" + }, { + "name" : "metadata", + "type" : "string" + } ] + }, + "logicalType" : "map" + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + + arrow::MapBuilder map_builder(arrow::default_memory_pool(), + std::make_shared(), + std::make_shared()); + ASSERT_TRUE(map_builder.Append().ok()); + std::shared_ptr input_array; + ASSERT_TRUE(map_builder.Finish(&input_array).ok()); + + ASSERT_NOK_WITH_MSG(EncodeData(avro_schema.root(), input_array), + "Expected AVRO_RECORD for map key-value pair"); +} + +#ifndef NDEBUG +TEST_F(AvroDirectEncoderDecoderTest, TestTypeMismatch) { + // Test string schema with int array (The type-mismatch issue should not occur, so we only + // perform type conversion checks in debug mode.) + std::string schema_json = R"({"type": "string"})"; + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + auto output_stream = ::avro::memoryOutputStream(); + auto encoder = ::avro::binaryEncoder(); + encoder->init(*output_stream); + + arrow::Int32Builder builder; + ASSERT_TRUE(builder.Append(42).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + + ASSERT_THROW(auto status = AvroDirectEncoder::EncodeArrowToAvro( + avro_schema.root(), *input_array, 0, encoder.get(), &encode_ctx_), + std::bad_cast); +} +#endif + +} // namespace paimon::avro::test diff --git a/src/paimon/format/avro/avro_format_writer_test.cpp b/src/paimon/format/avro/avro_format_writer_test.cpp index 67ea461f..e8664c4f 100644 --- a/src/paimon/format/avro/avro_format_writer_test.cpp +++ b/src/paimon/format/avro/avro_format_writer_test.cpp @@ -233,21 +233,21 @@ TEST_F(AvroFormatWriterTest, TestGetEstimateLength) { // add batch first time, 1 row AddRecordBatchOnce(format_writer, struct_type, 1, 0); - ASSERT_OK_AND_ASSIGN(bool reach_targe_size, + ASSERT_OK_AND_ASSIGN(bool reach_target_size, format_writer->ReachTargetSize(/*suggested_check=*/true, /*target_size=*/102400)); - ASSERT_FALSE(reach_targe_size); + ASSERT_FALSE(reach_target_size); // add batch second times, 9998 rows AddRecordBatchOnce(format_writer, struct_type, 9998, 1); - ASSERT_OK_AND_ASSIGN(reach_targe_size, format_writer->ReachTargetSize(/*suggested_check=*/true, - /*target_size=*/102400)); - ASSERT_FALSE(reach_targe_size); + ASSERT_OK_AND_ASSIGN(reach_target_size, format_writer->ReachTargetSize(/*suggested_check=*/true, + /*target_size=*/102400)); + ASSERT_FALSE(reach_target_size); AddRecordBatchOnce(format_writer, struct_type, 100000, 9999); - ASSERT_OK_AND_ASSIGN(reach_targe_size, format_writer->ReachTargetSize(/*suggested_check=*/true, - /*target_size=*/102400)); - ASSERT_TRUE(reach_targe_size); + ASSERT_OK_AND_ASSIGN(reach_target_size, format_writer->ReachTargetSize(/*suggested_check=*/true, + /*target_size=*/102400)); + ASSERT_TRUE(reach_target_size); ASSERT_OK(format_writer->Finish()); } diff --git a/src/paimon/format/avro/avro_input_stream_impl_test.cpp b/src/paimon/format/avro/avro_input_stream_impl_test.cpp index c530b9c9..54342e1d 100644 --- a/src/paimon/format/avro/avro_input_stream_impl_test.cpp +++ b/src/paimon/format/avro/avro_input_stream_impl_test.cpp @@ -110,6 +110,43 @@ TEST(AvroInputStreamImplTest, TestSkip) { ASSERT_FALSE(stream->next(&data, &size)); } +TEST(AvroInputStreamImplTest, TestSkipWithAvailableData) { + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::filesystem::path file_path = dir->Str() + "/file"; + std::ofstream output_file(file_path); + ASSERT_TRUE(output_file.is_open()); + std::string test_data = "abcdefghij"; + output_file << test_data; + output_file.close(); + + size_t buffer_size = 10; + std::shared_ptr fs = std::make_shared(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs->Open(file_path)); + ASSERT_OK_AND_ASSIGN(auto stream, + AvroInputStreamImpl::Create(in, buffer_size, GetDefaultPool())); + + const uint8_t* data; + size_t size; + // First, load data into buffer by calling next() + ASSERT_TRUE(stream->next(&data, &size)); + ASSERT_EQ(size, 10); + ASSERT_EQ(stream->byteCount(), 10); + + // Now backup some data to make it available in buffer + stream->backup(7); + ASSERT_EQ(stream->byteCount(), 3); + + // Skip 3 bytes from the available buffer data + stream->skip(3); + ASSERT_EQ(stream->byteCount(), 6); + + // Verify we can read the remaining 4 bytes from buffer + ASSERT_TRUE(stream->next(&data, &size)); + ASSERT_EQ(size, 4); + ASSERT_EQ(std::string(reinterpret_cast(data), size), "ghij"); + ASSERT_EQ(stream->byteCount(), 10); +} + TEST(AvroInputStreamImplTest, TestSeek) { auto dir = paimon::test::UniqueTestDirectory::Create(); std::filesystem::path file_path = dir->Str() + "/file"; diff --git a/src/paimon/format/avro/avro_stats_extractor.cpp b/src/paimon/format/avro/avro_stats_extractor.cpp index 6989979e..680c2376 100644 --- a/src/paimon/format/avro/avro_stats_extractor.cpp +++ b/src/paimon/format/avro/avro_stats_extractor.cpp @@ -80,9 +80,9 @@ Result> AvroStatsExtractor::FetchColumnStatistics( case arrow::Type::type::BOOL: return ColumnStats::CreateBooleanColumnStats(std::nullopt, std::nullopt, std::nullopt); case arrow::Type::type::INT8: - return ColumnStats::CreateTinyIntColumnStats(std::nullopt, std::nullopt, std::nullopt); case arrow::Type::type::INT16: - return ColumnStats::CreateSmallIntColumnStats(std::nullopt, std::nullopt, std::nullopt); + return Status::Invalid( + fmt::format("Unexpected: {} type cannot appear in avro files.", type->ToString())); case arrow::Type::type::INT32: return ColumnStats::CreateIntColumnStats(std::nullopt, std::nullopt, std::nullopt); case arrow::Type::type::INT64: diff --git a/src/paimon/format/lance/lance_file_batch_reader.cpp b/src/paimon/format/lance/lance_file_batch_reader.cpp index f0979196..e7da4ed6 100644 --- a/src/paimon/format/lance/lance_file_batch_reader.cpp +++ b/src/paimon/format/lance/lance_file_batch_reader.cpp @@ -29,7 +29,6 @@ LanceFileBatchReader::LanceFileBatchReader(LanceFileReader* file_reader, int32_t num_rows_(num_rows), error_message_(std::move(error_message)), file_reader_(file_reader), - metrics_(std::make_shared()) {} Result> LanceFileBatchReader::Create( diff --git a/src/paimon/format/lance/lance_format_reader_writer_test.cpp b/src/paimon/format/lance/lance_format_reader_writer_test.cpp index e76edc5a..869b62c9 100644 --- a/src/paimon/format/lance/lance_format_reader_writer_test.cpp +++ b/src/paimon/format/lance/lance_format_reader_writer_test.cpp @@ -377,18 +377,18 @@ TEST_F(LanceFileReaderWriterTest, TestReachTargetSize) { std::string file_path = dir->Str() + "/test.lance"; ASSERT_OK_AND_ASSIGN(auto writer, LanceFormatWriter::Create(file_path, schema)); - bool reach_targe_size = false; + bool reach_target_size = false; for (auto& array : src_chunk_array->chunks()) { ArrowArray c_array; ASSERT_TRUE(arrow::ExportArray(*array, &c_array).ok()); ASSERT_OK(writer->AddBatch(&c_array)); - ASSERT_OK_AND_ASSIGN(reach_targe_size, writer->ReachTargetSize(/*suggested_check=*/true, - /*target_size=*/4096)); + ASSERT_OK_AND_ASSIGN(reach_target_size, writer->ReachTargetSize(/*suggested_check=*/true, + /*target_size=*/4096)); } ASSERT_OK(writer->Flush()); ASSERT_OK(writer->Finish()); // test reach targe size - ASSERT_TRUE(reach_targe_size); + ASSERT_TRUE(reach_target_size); auto fs = std::make_shared(); ASSERT_OK_AND_ASSIGN(auto file_status, fs->GetFileStatus(file_path)); ASSERT_GT(file_status->GetLen(), 0); diff --git a/src/paimon/global_index/lucene/lucene_directory_test.cpp b/src/paimon/global_index/lucene/lucene_directory_test.cpp index 6c031ae4..10bd5364 100644 --- a/src/paimon/global_index/lucene/lucene_directory_test.cpp +++ b/src/paimon/global_index/lucene/lucene_directory_test.cpp @@ -32,7 +32,7 @@ class LuceneDirectoryTest : public ::testing::Test, public ::testing::WithParamI TEST_P(LuceneDirectoryTest, TestSimple) { int32_t read_buffer_size = GetParam(); - // write 3 files in a single concact file + // write 3 files in a single concat file std::vector data = {"helloworld", "abcdefg", "paimoncpp"}; auto dir = paimon::test::UniqueTestDirectory::Create("local"); auto lucene_directory = Lucene::FSDirectory::open(LuceneUtils::StringToWstring(dir->Str()), diff --git a/src/paimon/global_index/lumina/lumina_global_index.cpp b/src/paimon/global_index/lumina/lumina_global_index.cpp index 855b389a..6541e7ed 100644 --- a/src/paimon/global_index/lumina/lumina_global_index.cpp +++ b/src/paimon/global_index/lumina/lumina_global_index.cpp @@ -268,7 +268,7 @@ Status LuminaIndexWriter::AddBatch(::ArrowArray* arrow_array) { } if (value_array->length() != field_length * dimension_) { return Status::Invalid(fmt::format( - "invalid input array in LuminaIndexWriter, length of field array [{}] multiplied " + "invalid input array in LuminaIndexWriter, length of field array [{}] multiplied " "dimension [{}] must match length of field value array [{}]", field_length, dimension_, value_array->length())); } diff --git a/src/paimon/global_index/lumina/lumina_global_index_test.cpp b/src/paimon/global_index/lumina/lumina_global_index_test.cpp index 6422a1fd..a2eb95db 100644 --- a/src/paimon/global_index/lumina/lumina_global_index_test.cpp +++ b/src/paimon/global_index/lumina/lumina_global_index_test.cpp @@ -303,7 +303,7 @@ TEST_F(LuminaGlobalIndexTest, TestInvalidInputs) { .ValueOrDie(); ASSERT_NOK_WITH_MSG( WriteGlobalIndex(index_root, data_type_, options_, array, Range(0, 2)), - "invalid input array in LuminaIndexWriter, length of field array [2] multiplied " + "invalid input array in LuminaIndexWriter, length of field array [2] multiplied " "dimension [4] must match length of field value array [7]"); } diff --git a/test/inte/scan_inte_test.cpp b/test/inte/scan_inte_test.cpp index 9cda9248..b2ed08dd 100644 --- a/test/inte/scan_inte_test.cpp +++ b/test/inte/scan_inte_test.cpp @@ -1282,7 +1282,7 @@ TEST_F(ScanInteTest, TestScanAppendComplexDataWithSnapshot4WithPredicateFilter2) auto predicate1 = PredicateBuilder::GreaterThan(/*field_index=*/2, /*field_name=*/"f3", FieldType::DATE, Literal(FieldType::DATE, 0)); // BINARY does not have stats in manifest, min/max in value stats is null - // if row_count != null_count and min/max is null, file will not be filtered + // if row_count != null_count and min/max is null, file will not be filtered auto predicate2 = PredicateBuilder::GreaterThan( /*field_index=*/5, /*field_name=*/"f6", FieldType::BINARY, Literal(FieldType::BINARY, "zoo", 3));