diff --git a/be/src/clucene b/be/src/clucene new file mode 160000 index 00000000000000..6046125ce7f9bb --- /dev/null +++ b/be/src/clucene @@ -0,0 +1 @@ +Subproject commit 6046125ce7f9bb8631427ff8ee1ce93e07edb421 diff --git a/be/src/format/table/iceberg_delete_file_reader_helper.cpp b/be/src/format/table/iceberg_delete_file_reader_helper.cpp index 2e7045c81ad551..752cd484e9637e 100644 --- a/be/src/format/table/iceberg_delete_file_reader_helper.cpp +++ b/be/src/format/table/iceberg_delete_file_reader_helper.cpp @@ -29,8 +29,10 @@ #include "core/block/block.h" #include "core/block/column_with_type_and_name.h" #include "core/column/column_dictionary.h" +#include "core/column/column_nullable.h" #include "core/column/column_string.h" #include "core/column/column_vector.h" +#include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_number.h" #include "core/data_type/data_type_string.h" #include "exec/common/endian.h" @@ -114,6 +116,26 @@ Status visit_position_delete_block(const Block& block, size_t read_rows, return Status::InternalError("Unsupported file_path column type in position delete block"); } +Status unnest_position_delete_column(Block* block, const std::string& column_name) { + int column_pos = block->get_position_by_name(column_name); + if (column_pos < 0) { + return Status::InternalError("Position delete block is missing required column {}", + column_name); + } + + auto& column = block->get_by_position(static_cast(column_pos)); + auto* nullable_column = check_and_get_column(column.column.get()); + if (nullable_column == nullptr) { + return Status::OK(); + } + if (nullable_column->has_null()) { + return Status::Corruption("Iceberg position delete column {} has null values", column_name); + } + column.column = nullable_column->get_nested_column_ptr(); + column.type = remove_nullable(column.type); + return Status::OK(); +} + Status init_parquet_delete_reader(ParquetReader* reader, bool* dictionary_coded) { if (reader == nullptr || dictionary_coded == nullptr) { return Status::InvalidArgument("invalid parquet delete reader arguments"); @@ -180,6 +202,12 @@ Status decode_deletion_vector_buffer(const char* buf, size_t buffer_size, } // namespace +Status unnest_iceberg_position_delete_block(Block* block) { + RETURN_IF_ERROR(unnest_position_delete_column(block, ICEBERG_FILE_PATH)); + RETURN_IF_ERROR(unnest_position_delete_column(block, ICEBERG_ROW_POS)); + return Status::OK(); +} + IcebergDeleteFileIOContext::IcebergDeleteFileIOContext(RuntimeState* state) { io_ctx.file_cache_stats = &file_cache_stats; io_ctx.file_reader_stats = &file_reader_stats; @@ -244,17 +272,17 @@ Status read_iceberg_position_delete_file(const TIcebergDeleteFileDesc& delete_fi Block block; if (dictionary_coded) { block.insert(ColumnWithTypeAndName( - ColumnDictI32::create(FieldType::OLAP_FIELD_TYPE_VARCHAR), - std::make_shared(), ICEBERG_FILE_PATH)); + make_nullable(ColumnDictI32::create(FieldType::OLAP_FIELD_TYPE_VARCHAR)), + make_nullable(std::make_shared()), ICEBERG_FILE_PATH)); } else { - block.insert(ColumnWithTypeAndName(ColumnString::create(), - std::make_shared(), - ICEBERG_FILE_PATH)); + block.insert(ColumnWithTypeAndName( + make_nullable(std::make_shared()), ICEBERG_FILE_PATH)); } - block.insert(ColumnWithTypeAndName(ColumnInt64::create(), - std::make_shared(), ICEBERG_ROW_POS)); + block.insert(ColumnWithTypeAndName(make_nullable(std::make_shared()), + ICEBERG_ROW_POS)); size_t read_rows = 0; RETURN_IF_ERROR(reader.get_next_block(&block, &read_rows, &eof)); + RETURN_IF_ERROR(unnest_iceberg_position_delete_block(&block)); RETURN_IF_ERROR(visit_position_delete_block(block, read_rows, visitor)); } return Status::OK(); diff --git a/be/src/format/table/iceberg_delete_file_reader_helper.h b/be/src/format/table/iceberg_delete_file_reader_helper.h index a6771212851ae8..863969ec8ca368 100644 --- a/be/src/format/table/iceberg_delete_file_reader_helper.h +++ b/be/src/format/table/iceberg_delete_file_reader_helper.h @@ -31,6 +31,7 @@ namespace doris { +class Block; class FileMetaCache; class RuntimeProfile; class RuntimeState; @@ -67,6 +68,8 @@ TFileRangeDesc build_iceberg_delete_file_range(const std::string& path); bool is_iceberg_deletion_vector(const TIcebergDeleteFileDesc& delete_file); +Status unnest_iceberg_position_delete_block(Block* block); + Status read_iceberg_position_delete_file(const TIcebergDeleteFileDesc& delete_file, const IcebergDeleteFileReaderOptions& options, IcebergPositionDeleteVisitor* visitor); diff --git a/be/src/format/table/iceberg_reader.cpp b/be/src/format/table/iceberg_reader.cpp index 7a74431a05851b..27a50c86ca635a 100644 --- a/be/src/format/table/iceberg_reader.cpp +++ b/be/src/format/table/iceberg_reader.cpp @@ -40,6 +40,7 @@ #include "core/column/column_string.h" #include "core/column/column_vector.h" #include "core/data_type/data_type_factory.hpp" +#include "core/data_type/data_type_nullable.h" #include "core/data_type/define_primitive_type.h" #include "core/data_type/primitive_type.h" #include "core/string_ref.h" @@ -52,6 +53,7 @@ #include "format/table/deletion_vector_reader.h" #include "format/table/iceberg/iceberg_orc_nested_column_utils.h" #include "format/table/iceberg/iceberg_parquet_nested_column_utils.h" +#include "format/table/iceberg_delete_file_reader_helper.h" #include "format/table/nested_column_access_helper.h" #include "format/table/table_schema_change_helper.h" #include "runtime/runtime_state.h" @@ -392,17 +394,26 @@ Status IcebergParquetReader::_read_position_delete_file(const TFileRangeDesc* de } DataTypePtr data_type_file_path {new DataTypeString}; DataTypePtr data_type_pos {new DataTypeInt64}; + DataTypePtr nullable_data_type_file_path = make_nullable(data_type_file_path); + DataTypePtr nullable_data_type_pos = make_nullable(data_type_pos); bool eof = false; while (!eof) { - Block block = {dictionary_coded - ? ColumnWithTypeAndName {ColumnDictI32::create( - FieldType::OLAP_FIELD_TYPE_VARCHAR), - data_type_file_path, ICEBERG_FILE_PATH} - : ColumnWithTypeAndName {data_type_file_path, ICEBERG_FILE_PATH}, - - {data_type_pos, ICEBERG_ROW_POS}}; + Block block = { + dictionary_coded + ? ColumnWithTypeAndName {ColumnDictI32::create( + FieldType::OLAP_FIELD_TYPE_VARCHAR), + nullable_data_type_file_path, ICEBERG_FILE_PATH} + : ColumnWithTypeAndName {nullable_data_type_file_path, ICEBERG_FILE_PATH}, + + {nullable_data_type_pos, ICEBERG_ROW_POS}}; + if (dictionary_coded) { + block.replace_by_position( + ICEBERG_FILE_PATH_INDEX, + make_nullable(block.get_by_position(ICEBERG_FILE_PATH_INDEX).column)); + } size_t read_rows = 0; RETURN_IF_ERROR(parquet_delete_reader.get_next_block(&block, &read_rows, &eof)); + RETURN_IF_ERROR(unnest_iceberg_position_delete_block(&block)); if (read_rows <= 0) { break; diff --git a/be/test/exec/test_data/iceberg_optional_position_delete_parquet/optional_non_null_position_delete.parquet b/be/test/exec/test_data/iceberg_optional_position_delete_parquet/optional_non_null_position_delete.parquet new file mode 100644 index 00000000000000..1cb9e9b8aebe49 Binary files /dev/null and b/be/test/exec/test_data/iceberg_optional_position_delete_parquet/optional_non_null_position_delete.parquet differ diff --git a/be/test/exec/test_data/iceberg_optional_position_delete_parquet/optional_null_position_delete.parquet b/be/test/exec/test_data/iceberg_optional_position_delete_parquet/optional_null_position_delete.parquet new file mode 100644 index 00000000000000..19b8d460ae7d61 Binary files /dev/null and b/be/test/exec/test_data/iceberg_optional_position_delete_parquet/optional_null_position_delete.parquet differ diff --git a/be/test/format/table/iceberg/iceberg_delete_file_reader_helper_test.cpp b/be/test/format/table/iceberg/iceberg_delete_file_reader_helper_test.cpp index 658d3f70d0aca2..390016b3b91f2b 100644 --- a/be/test/format/table/iceberg/iceberg_delete_file_reader_helper_test.cpp +++ b/be/test/format/table/iceberg/iceberg_delete_file_reader_helper_test.cpp @@ -34,6 +34,12 @@ namespace { constexpr const char* kMixedPositionDeleteFile = "./be/test/exec/test_data/iceberg_mixed_position_delete_parquet/" "mixed_encoding_position_delete.parquet"; +constexpr const char* kOptionalNonNullPositionDeleteFile = + "./be/test/exec/test_data/iceberg_optional_position_delete_parquet/" + "optional_non_null_position_delete.parquet"; +constexpr const char* kOptionalNullPositionDeleteFile = + "./be/test/exec/test_data/iceberg_optional_position_delete_parquet/" + "optional_null_position_delete.parquet"; constexpr const char* kTargetDataFilePath = "s3://warehouse/wh/test_db/000_target_data_file.parquet"; @@ -107,4 +113,68 @@ TEST(IcebergDeleteFileReaderHelperTest, ReadMixedEncodingParquetPositionDeleteFi EXPECT_EQ(it->second, expected_positions); } +TEST(IcebergDeleteFileReaderHelperTest, ReadOptionalParquetPositionDeleteFileWithoutNulls) { + RuntimeProfile profile("test_profile"); + RuntimeState runtime_state((TQueryOptions()), TQueryGlobals()); + FileMetaCache meta_cache(1024); + IcebergDeleteFileIOContext io_context(&runtime_state); + + TFileScanRangeParams scan_params; + scan_params.file_type = TFileType::FILE_LOCAL; + scan_params.format_type = TFileFormatType::FORMAT_PARQUET; + + TIcebergDeleteFileDesc delete_file; + delete_file.path = kOptionalNonNullPositionDeleteFile; + delete_file.file_format = TFileFormatType::FORMAT_PARQUET; + delete_file.__isset.file_format = true; + + IcebergDeleteFileReaderOptions options; + options.state = &runtime_state; + options.profile = &profile; + options.scan_params = &scan_params; + options.io_ctx = &io_context.io_ctx; + options.meta_cache = &meta_cache; + options.batch_size = 1024; + + CollectPositionDeleteVisitor visitor; + auto st = read_iceberg_position_delete_file(delete_file, options, &visitor); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(visitor.total_rows, 4); + + const auto it = visitor.delete_rows.find(kTargetDataFilePath); + ASSERT_NE(it, visitor.delete_rows.end()); + + const std::vector expected_positions = {0, 2, 4}; + EXPECT_EQ(it->second, expected_positions); +} + +TEST(IcebergDeleteFileReaderHelperTest, RejectOptionalParquetPositionDeleteFileWithNulls) { + RuntimeProfile profile("test_profile"); + RuntimeState runtime_state((TQueryOptions()), TQueryGlobals()); + FileMetaCache meta_cache(1024); + IcebergDeleteFileIOContext io_context(&runtime_state); + + TFileScanRangeParams scan_params; + scan_params.file_type = TFileType::FILE_LOCAL; + scan_params.format_type = TFileFormatType::FORMAT_PARQUET; + + TIcebergDeleteFileDesc delete_file; + delete_file.path = kOptionalNullPositionDeleteFile; + delete_file.file_format = TFileFormatType::FORMAT_PARQUET; + delete_file.__isset.file_format = true; + + IcebergDeleteFileReaderOptions options; + options.state = &runtime_state; + options.profile = &profile; + options.scan_params = &scan_params; + options.io_ctx = &io_context.io_ctx; + options.meta_cache = &meta_cache; + options.batch_size = 1024; + + CollectPositionDeleteVisitor visitor; + auto st = read_iceberg_position_delete_file(delete_file, options, &visitor); + ASSERT_FALSE(st.ok()); + EXPECT_NE(st.to_string().find("has null values"), std::string::npos) << st; +} + } // namespace doris