From d02e6b2b8a2adff0a6a30b58bc5a59d50bffdfc4 Mon Sep 17 00:00:00 2001 From: daidai Date: Fri, 22 May 2026 18:21:51 +0800 Subject: [PATCH] [fix](be) Handle optional Iceberg position delete columns without nulls --- be/src/clucene | 1 + .../iceberg_delete_file_reader_helper.cpp | 42 +++++++++-- .../table/iceberg_delete_file_reader_helper.h | 3 + be/src/format/table/iceberg_reader.cpp | 25 +++++-- .../optional_non_null_position_delete.parquet | Bin 0 -> 1020 bytes .../optional_null_position_delete.parquet | Bin 0 -> 994 bytes ...iceberg_delete_file_reader_helper_test.cpp | 70 ++++++++++++++++++ 7 files changed, 127 insertions(+), 14 deletions(-) create mode 160000 be/src/clucene create mode 100644 be/test/exec/test_data/iceberg_optional_position_delete_parquet/optional_non_null_position_delete.parquet create mode 100644 be/test/exec/test_data/iceberg_optional_position_delete_parquet/optional_null_position_delete.parquet diff --git a/be/src/clucene b/be/src/clucene new file mode 160000 index 00000000000000..6046125ce7f9bb --- /dev/null +++ b/be/src/clucene @@ -0,0 +1 @@ +Subproject commit 6046125ce7f9bb8631427ff8ee1ce93e07edb421 diff --git a/be/src/format/table/iceberg_delete_file_reader_helper.cpp b/be/src/format/table/iceberg_delete_file_reader_helper.cpp index 2e7045c81ad551..752cd484e9637e 100644 --- a/be/src/format/table/iceberg_delete_file_reader_helper.cpp +++ b/be/src/format/table/iceberg_delete_file_reader_helper.cpp @@ -29,8 +29,10 @@ #include "core/block/block.h" #include "core/block/column_with_type_and_name.h" #include "core/column/column_dictionary.h" +#include "core/column/column_nullable.h" #include "core/column/column_string.h" #include "core/column/column_vector.h" +#include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_number.h" #include "core/data_type/data_type_string.h" #include "exec/common/endian.h" @@ -114,6 +116,26 @@ Status visit_position_delete_block(const Block& block, size_t read_rows, return Status::InternalError("Unsupported file_path column type in position delete block"); } +Status unnest_position_delete_column(Block* block, const std::string& column_name) { + int column_pos = block->get_position_by_name(column_name); + if (column_pos < 0) { + return Status::InternalError("Position delete block is missing required column {}", + column_name); + } + + auto& column = block->get_by_position(static_cast(column_pos)); + auto* nullable_column = check_and_get_column(column.column.get()); + if (nullable_column == nullptr) { + return Status::OK(); + } + if (nullable_column->has_null()) { + return Status::Corruption("Iceberg position delete column {} has null values", column_name); + } + column.column = nullable_column->get_nested_column_ptr(); + column.type = remove_nullable(column.type); + return Status::OK(); +} + Status init_parquet_delete_reader(ParquetReader* reader, bool* dictionary_coded) { if (reader == nullptr || dictionary_coded == nullptr) { return Status::InvalidArgument("invalid parquet delete reader arguments"); @@ -180,6 +202,12 @@ Status decode_deletion_vector_buffer(const char* buf, size_t buffer_size, } // namespace +Status unnest_iceberg_position_delete_block(Block* block) { + RETURN_IF_ERROR(unnest_position_delete_column(block, ICEBERG_FILE_PATH)); + RETURN_IF_ERROR(unnest_position_delete_column(block, ICEBERG_ROW_POS)); + return Status::OK(); +} + IcebergDeleteFileIOContext::IcebergDeleteFileIOContext(RuntimeState* state) { io_ctx.file_cache_stats = &file_cache_stats; io_ctx.file_reader_stats = &file_reader_stats; @@ -244,17 +272,17 @@ Status read_iceberg_position_delete_file(const TIcebergDeleteFileDesc& delete_fi Block block; if (dictionary_coded) { block.insert(ColumnWithTypeAndName( - ColumnDictI32::create(FieldType::OLAP_FIELD_TYPE_VARCHAR), - std::make_shared(), ICEBERG_FILE_PATH)); + make_nullable(ColumnDictI32::create(FieldType::OLAP_FIELD_TYPE_VARCHAR)), + make_nullable(std::make_shared()), ICEBERG_FILE_PATH)); } else { - block.insert(ColumnWithTypeAndName(ColumnString::create(), - std::make_shared(), - ICEBERG_FILE_PATH)); + block.insert(ColumnWithTypeAndName( + make_nullable(std::make_shared()), ICEBERG_FILE_PATH)); } - block.insert(ColumnWithTypeAndName(ColumnInt64::create(), - std::make_shared(), ICEBERG_ROW_POS)); + block.insert(ColumnWithTypeAndName(make_nullable(std::make_shared()), + ICEBERG_ROW_POS)); size_t read_rows = 0; RETURN_IF_ERROR(reader.get_next_block(&block, &read_rows, &eof)); + RETURN_IF_ERROR(unnest_iceberg_position_delete_block(&block)); RETURN_IF_ERROR(visit_position_delete_block(block, read_rows, visitor)); } return Status::OK(); diff --git a/be/src/format/table/iceberg_delete_file_reader_helper.h b/be/src/format/table/iceberg_delete_file_reader_helper.h index a6771212851ae8..863969ec8ca368 100644 --- a/be/src/format/table/iceberg_delete_file_reader_helper.h +++ b/be/src/format/table/iceberg_delete_file_reader_helper.h @@ -31,6 +31,7 @@ namespace doris { +class Block; class FileMetaCache; class RuntimeProfile; class RuntimeState; @@ -67,6 +68,8 @@ TFileRangeDesc build_iceberg_delete_file_range(const std::string& path); bool is_iceberg_deletion_vector(const TIcebergDeleteFileDesc& delete_file); +Status unnest_iceberg_position_delete_block(Block* block); + Status read_iceberg_position_delete_file(const TIcebergDeleteFileDesc& delete_file, const IcebergDeleteFileReaderOptions& options, IcebergPositionDeleteVisitor* visitor); diff --git a/be/src/format/table/iceberg_reader.cpp b/be/src/format/table/iceberg_reader.cpp index 7a74431a05851b..27a50c86ca635a 100644 --- a/be/src/format/table/iceberg_reader.cpp +++ b/be/src/format/table/iceberg_reader.cpp @@ -40,6 +40,7 @@ #include "core/column/column_string.h" #include "core/column/column_vector.h" #include "core/data_type/data_type_factory.hpp" +#include "core/data_type/data_type_nullable.h" #include "core/data_type/define_primitive_type.h" #include "core/data_type/primitive_type.h" #include "core/string_ref.h" @@ -52,6 +53,7 @@ #include "format/table/deletion_vector_reader.h" #include "format/table/iceberg/iceberg_orc_nested_column_utils.h" #include "format/table/iceberg/iceberg_parquet_nested_column_utils.h" +#include "format/table/iceberg_delete_file_reader_helper.h" #include "format/table/nested_column_access_helper.h" #include "format/table/table_schema_change_helper.h" #include "runtime/runtime_state.h" @@ -392,17 +394,26 @@ Status IcebergParquetReader::_read_position_delete_file(const TFileRangeDesc* de } DataTypePtr data_type_file_path {new DataTypeString}; DataTypePtr data_type_pos {new DataTypeInt64}; + DataTypePtr nullable_data_type_file_path = make_nullable(data_type_file_path); + DataTypePtr nullable_data_type_pos = make_nullable(data_type_pos); bool eof = false; while (!eof) { - Block block = {dictionary_coded - ? ColumnWithTypeAndName {ColumnDictI32::create( - FieldType::OLAP_FIELD_TYPE_VARCHAR), - data_type_file_path, ICEBERG_FILE_PATH} - : ColumnWithTypeAndName {data_type_file_path, ICEBERG_FILE_PATH}, - - {data_type_pos, ICEBERG_ROW_POS}}; + Block block = { + dictionary_coded + ? ColumnWithTypeAndName {ColumnDictI32::create( + FieldType::OLAP_FIELD_TYPE_VARCHAR), + nullable_data_type_file_path, ICEBERG_FILE_PATH} + : ColumnWithTypeAndName {nullable_data_type_file_path, ICEBERG_FILE_PATH}, + + {nullable_data_type_pos, ICEBERG_ROW_POS}}; + if (dictionary_coded) { + block.replace_by_position( + ICEBERG_FILE_PATH_INDEX, + make_nullable(block.get_by_position(ICEBERG_FILE_PATH_INDEX).column)); + } size_t read_rows = 0; RETURN_IF_ERROR(parquet_delete_reader.get_next_block(&block, &read_rows, &eof)); + RETURN_IF_ERROR(unnest_iceberg_position_delete_block(&block)); if (read_rows <= 0) { break; diff --git a/be/test/exec/test_data/iceberg_optional_position_delete_parquet/optional_non_null_position_delete.parquet b/be/test/exec/test_data/iceberg_optional_position_delete_parquet/optional_non_null_position_delete.parquet new file mode 100644 index 0000000000000000000000000000000000000000..1cb9e9b8aebe4905248847ca5feef729aa95079d GIT binary patch literal 1020 zcmc&z&1w@-6h8Cga4RW<;=RnkEV8geg)}p1P1KNrcVZjF6q`U5+zgXAO$utpnW+PP z1lKNH`UG9O^eJ4r5E1bSTzbwVqlIe0m3MLe&v(Cbxc5Mgnq22!75-KkpaE+D>9;nZ z-0rw;9wyOjev(FRK6A4u&4Q<6*Yms}3zKPtUYLc!(ebmW5r@gklPGJQcED~1^K2F+ zfqw@$@HH;#obf6*xM8MHB%SU^WvD58Cho4;wBtgJBEvIGQd{2_|jtJfFzsoEFdy47ugX{3``^gQG`@XESs zuF%SYILv12FtD+O>loGIc?yDAwpDk)<`t_2M%6l3%vHg}J6(L#oDYh_{=+h@I=vCD z(t@g}y5dZ;49fx93X#^Aq-mCd?sr9eR-7~4`J~zZu*C}(-6(GnuMNQ*Cf;8G%Qh== zGJbTsMjc_H=TXy;p{+}Q!Mw)}yclQ`ZClVqOPUPVcIJbzK!YBs!vXVvkg N(PMjq&+`EP{ZE2$=s<)1dqUh0|#d8235d?UYNt|?EHN5?dFL>d+)1PC)n!~rkn2*l39uzi zX*ykft9v^n>y~vyea+QL8=9sA5pm_lh!#Fst55z)IDzGNzE)!uu^M#-fP$^Rcqv-< z5t?S`QrP`Hu*`hEh%X{3KsGmZYmG-UO`~G52_uKw3Gy20G6yAW$8LMz$fmsqX3M^j zEOjN+2Sa@{y!V>d_#Z>G`3gbO0D*2&hrT+uP21CXI!szzljgHjbU$nAi{||>yibPn z4@3N;&t2*Qg1V@0LJ6;--rfP*v6?uUJbBiu$7_Ape3tvjGOje@W5g1}1P>7t93+)D z3=xmm8X!L6mG-NZc5H@KocxsehCm;XaWFXGeOR)tOg3t$u$=84F&@<=oKoRc;gd>_ zF&uMzh&acG>>}`AAmfvYZ(X#aPW;#$K2K-Er>FDbVGwa09qnc*;{HBZ?)6XGo~P+{ hlx54(y^6E^Z28i?x6=taf!(H;#*ghCUDh%E?w|El;&cE2 literal 0 HcmV?d00001 diff --git a/be/test/format/table/iceberg/iceberg_delete_file_reader_helper_test.cpp b/be/test/format/table/iceberg/iceberg_delete_file_reader_helper_test.cpp index 658d3f70d0aca2..390016b3b91f2b 100644 --- a/be/test/format/table/iceberg/iceberg_delete_file_reader_helper_test.cpp +++ b/be/test/format/table/iceberg/iceberg_delete_file_reader_helper_test.cpp @@ -34,6 +34,12 @@ namespace { constexpr const char* kMixedPositionDeleteFile = "./be/test/exec/test_data/iceberg_mixed_position_delete_parquet/" "mixed_encoding_position_delete.parquet"; +constexpr const char* kOptionalNonNullPositionDeleteFile = + "./be/test/exec/test_data/iceberg_optional_position_delete_parquet/" + "optional_non_null_position_delete.parquet"; +constexpr const char* kOptionalNullPositionDeleteFile = + "./be/test/exec/test_data/iceberg_optional_position_delete_parquet/" + "optional_null_position_delete.parquet"; constexpr const char* kTargetDataFilePath = "s3://warehouse/wh/test_db/000_target_data_file.parquet"; @@ -107,4 +113,68 @@ TEST(IcebergDeleteFileReaderHelperTest, ReadMixedEncodingParquetPositionDeleteFi EXPECT_EQ(it->second, expected_positions); } +TEST(IcebergDeleteFileReaderHelperTest, ReadOptionalParquetPositionDeleteFileWithoutNulls) { + RuntimeProfile profile("test_profile"); + RuntimeState runtime_state((TQueryOptions()), TQueryGlobals()); + FileMetaCache meta_cache(1024); + IcebergDeleteFileIOContext io_context(&runtime_state); + + TFileScanRangeParams scan_params; + scan_params.file_type = TFileType::FILE_LOCAL; + scan_params.format_type = TFileFormatType::FORMAT_PARQUET; + + TIcebergDeleteFileDesc delete_file; + delete_file.path = kOptionalNonNullPositionDeleteFile; + delete_file.file_format = TFileFormatType::FORMAT_PARQUET; + delete_file.__isset.file_format = true; + + IcebergDeleteFileReaderOptions options; + options.state = &runtime_state; + options.profile = &profile; + options.scan_params = &scan_params; + options.io_ctx = &io_context.io_ctx; + options.meta_cache = &meta_cache; + options.batch_size = 1024; + + CollectPositionDeleteVisitor visitor; + auto st = read_iceberg_position_delete_file(delete_file, options, &visitor); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(visitor.total_rows, 4); + + const auto it = visitor.delete_rows.find(kTargetDataFilePath); + ASSERT_NE(it, visitor.delete_rows.end()); + + const std::vector expected_positions = {0, 2, 4}; + EXPECT_EQ(it->second, expected_positions); +} + +TEST(IcebergDeleteFileReaderHelperTest, RejectOptionalParquetPositionDeleteFileWithNulls) { + RuntimeProfile profile("test_profile"); + RuntimeState runtime_state((TQueryOptions()), TQueryGlobals()); + FileMetaCache meta_cache(1024); + IcebergDeleteFileIOContext io_context(&runtime_state); + + TFileScanRangeParams scan_params; + scan_params.file_type = TFileType::FILE_LOCAL; + scan_params.format_type = TFileFormatType::FORMAT_PARQUET; + + TIcebergDeleteFileDesc delete_file; + delete_file.path = kOptionalNullPositionDeleteFile; + delete_file.file_format = TFileFormatType::FORMAT_PARQUET; + delete_file.__isset.file_format = true; + + IcebergDeleteFileReaderOptions options; + options.state = &runtime_state; + options.profile = &profile; + options.scan_params = &scan_params; + options.io_ctx = &io_context.io_ctx; + options.meta_cache = &meta_cache; + options.batch_size = 1024; + + CollectPositionDeleteVisitor visitor; + auto st = read_iceberg_position_delete_file(delete_file, options, &visitor); + ASSERT_FALSE(st.ok()); + EXPECT_NE(st.to_string().find("has null values"), std::string::npos) << st; +} + } // namespace doris