Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/clucene
Submodule clucene added at 604612
42 changes: 35 additions & 7 deletions be/src/format/table/iceberg_delete_file_reader_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
#include "core/block/block.h"
#include "core/block/column_with_type_and_name.h"
#include "core/column/column_dictionary.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_string.h"
#include "exec/common/endian.h"
Expand Down Expand Up @@ -114,6 +116,26 @@ Status visit_position_delete_block(const Block& block, size_t read_rows,
return Status::InternalError("Unsupported file_path column type in position delete block");
}

// Replaces the named column of a position delete block with its non-nullable
// nested column when the column is wrapped in ColumnNullable.
//
// Returns:
//   - InternalError when `column_name` cannot be found in `block`;
//   - Corruption when the nullable column reports has_null();
//   - OK otherwise. A column that is not a ColumnNullable is left unchanged.
Status unnest_position_delete_column(Block* block, const std::string& column_name) {
    const int position = block->get_position_by_name(column_name);
    if (position < 0) {
        return Status::InternalError("Position delete block is missing required column {}",
                                     column_name);
    }

    auto& entry = block->get_by_position(static_cast<size_t>(position));
    auto* wrapped = check_and_get_column<ColumnNullable>(entry.column.get());
    if (wrapped != nullptr) {
        if (wrapped->has_null()) {
            return Status::Corruption("Iceberg position delete column {} has null values",
                                      column_name);
        }
        // Keep the column and its declared type in sync: both lose the
        // nullable wrapper together.
        entry.column = wrapped->get_nested_column_ptr();
        entry.type = remove_nullable(entry.type);
    }
    return Status::OK();
}

Status init_parquet_delete_reader(ParquetReader* reader, bool* dictionary_coded) {
if (reader == nullptr || dictionary_coded == nullptr) {
return Status::InvalidArgument("invalid parquet delete reader arguments");
Expand Down Expand Up @@ -180,6 +202,12 @@ Status decode_deletion_vector_buffer(const char* buf, size_t buffer_size,

} // namespace

// Strips the nullable wrapper from both position delete columns of `block`:
// first ICEBERG_FILE_PATH, then ICEBERG_ROW_POS. The first failing status is
// returned; the row-position column is not touched if the file-path column
// fails.
Status unnest_iceberg_position_delete_block(Block* block) {
    RETURN_IF_ERROR(unnest_position_delete_column(block, ICEBERG_FILE_PATH));
    // The status of the last step is the status of the whole operation.
    return unnest_position_delete_column(block, ICEBERG_ROW_POS);
}

IcebergDeleteFileIOContext::IcebergDeleteFileIOContext(RuntimeState* state) {
io_ctx.file_cache_stats = &file_cache_stats;
io_ctx.file_reader_stats = &file_reader_stats;
Expand Down Expand Up @@ -244,17 +272,17 @@ Status read_iceberg_position_delete_file(const TIcebergDeleteFileDesc& delete_fi
Block block;
if (dictionary_coded) {
block.insert(ColumnWithTypeAndName(
ColumnDictI32::create(FieldType::OLAP_FIELD_TYPE_VARCHAR),
std::make_shared<DataTypeString>(), ICEBERG_FILE_PATH));
make_nullable(ColumnDictI32::create(FieldType::OLAP_FIELD_TYPE_VARCHAR)),
make_nullable(std::make_shared<DataTypeString>()), ICEBERG_FILE_PATH));
} else {
block.insert(ColumnWithTypeAndName(ColumnString::create(),
std::make_shared<DataTypeString>(),
ICEBERG_FILE_PATH));
block.insert(ColumnWithTypeAndName(
make_nullable(std::make_shared<DataTypeString>()), ICEBERG_FILE_PATH));
}
block.insert(ColumnWithTypeAndName(ColumnInt64::create(),
std::make_shared<DataTypeInt64>(), ICEBERG_ROW_POS));
block.insert(ColumnWithTypeAndName(make_nullable(std::make_shared<DataTypeInt64>()),
ICEBERG_ROW_POS));
size_t read_rows = 0;
RETURN_IF_ERROR(reader.get_next_block(&block, &read_rows, &eof));
RETURN_IF_ERROR(unnest_iceberg_position_delete_block(&block));
RETURN_IF_ERROR(visit_position_delete_block(block, read_rows, visitor));
}
return Status::OK();
Expand Down
3 changes: 3 additions & 0 deletions be/src/format/table/iceberg_delete_file_reader_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

namespace doris {

class Block;
class FileMetaCache;
class RuntimeProfile;
class RuntimeState;
Expand Down Expand Up @@ -67,6 +68,8 @@ TFileRangeDesc build_iceberg_delete_file_range(const std::string& path);

bool is_iceberg_deletion_vector(const TIcebergDeleteFileDesc& delete_file);

// Removes the nullable wrapper from the ICEBERG_FILE_PATH and ICEBERG_ROW_POS
// columns of `block`, updating both the column and its data type.
// Returns InternalError if either column is missing from the block and
// Corruption if a nullable column contains null values. Columns that are not
// nullable are left unchanged.
Status unnest_iceberg_position_delete_block(Block* block);

Status read_iceberg_position_delete_file(const TIcebergDeleteFileDesc& delete_file,
const IcebergDeleteFileReaderOptions& options,
IcebergPositionDeleteVisitor* visitor);
Expand Down
25 changes: 18 additions & 7 deletions be/src/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "core/column/column_string.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type_factory.hpp"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type/primitive_type.h"
#include "core/string_ref.h"
Expand All @@ -52,6 +53,7 @@
#include "format/table/deletion_vector_reader.h"
#include "format/table/iceberg/iceberg_orc_nested_column_utils.h"
#include "format/table/iceberg/iceberg_parquet_nested_column_utils.h"
#include "format/table/iceberg_delete_file_reader_helper.h"
#include "format/table/nested_column_access_helper.h"
#include "format/table/table_schema_change_helper.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -392,17 +394,26 @@ Status IcebergParquetReader::_read_position_delete_file(const TFileRangeDesc* de
}
DataTypePtr data_type_file_path {new DataTypeString};
DataTypePtr data_type_pos {new DataTypeInt64};
DataTypePtr nullable_data_type_file_path = make_nullable(data_type_file_path);
DataTypePtr nullable_data_type_pos = make_nullable(data_type_pos);
bool eof = false;
while (!eof) {
Block block = {dictionary_coded
? ColumnWithTypeAndName {ColumnDictI32::create(
FieldType::OLAP_FIELD_TYPE_VARCHAR),
data_type_file_path, ICEBERG_FILE_PATH}
: ColumnWithTypeAndName {data_type_file_path, ICEBERG_FILE_PATH},

{data_type_pos, ICEBERG_ROW_POS}};
Block block = {
dictionary_coded
? ColumnWithTypeAndName {ColumnDictI32::create(
FieldType::OLAP_FIELD_TYPE_VARCHAR),
nullable_data_type_file_path, ICEBERG_FILE_PATH}
: ColumnWithTypeAndName {nullable_data_type_file_path, ICEBERG_FILE_PATH},

{nullable_data_type_pos, ICEBERG_ROW_POS}};
if (dictionary_coded) {
block.replace_by_position(
ICEBERG_FILE_PATH_INDEX,
make_nullable(block.get_by_position(ICEBERG_FILE_PATH_INDEX).column));
}
size_t read_rows = 0;
RETURN_IF_ERROR(parquet_delete_reader.get_next_block(&block, &read_rows, &eof));
RETURN_IF_ERROR(unnest_iceberg_position_delete_block(&block));

if (read_rows <= 0) {
break;
Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ namespace {
// Parquet position delete fixture.
// NOTE(review): the test that reads it is outside this view — presumably the
// mixed-encoding position delete test; confirm.
constexpr const char* kMixedPositionDeleteFile =
"./be/test/exec/test_data/iceberg_mixed_position_delete_parquet/"
"mixed_encoding_position_delete.parquet";
// Fixture read by ReadOptionalParquetPositionDeleteFileWithoutNulls, which
// expects the read to succeed.
constexpr const char* kOptionalNonNullPositionDeleteFile =
"./be/test/exec/test_data/iceberg_optional_position_delete_parquet/"
"optional_non_null_position_delete.parquet";
// Fixture read by RejectOptionalParquetPositionDeleteFileWithNulls, which
// expects the read to fail with a "has null values" error.
constexpr const char* kOptionalNullPositionDeleteFile =
"./be/test/exec/test_data/iceberg_optional_position_delete_parquet/"
"optional_null_position_delete.parquet";
// Data file path used as the lookup key into the visitor's collected
// delete rows.
constexpr const char* kTargetDataFilePath =
"s3://warehouse/wh/test_db/000_target_data_file.parquet";

Expand Down Expand Up @@ -107,4 +113,68 @@ TEST(IcebergDeleteFileReaderHelperTest, ReadMixedEncodingParquetPositionDeleteFi
EXPECT_EQ(it->second, expected_positions);
}

// Reads the kOptionalNonNullPositionDeleteFile fixture through
// read_iceberg_position_delete_file and checks that the read succeeds, that
// four rows are visited in total, and that positions {0, 2, 4} are collected
// for kTargetDataFilePath.
TEST(IcebergDeleteFileReaderHelperTest, ReadOptionalParquetPositionDeleteFileWithoutNulls) {
    // Describe the delete file and how it should be scanned.
    TIcebergDeleteFileDesc delete_desc;
    delete_desc.path = kOptionalNonNullPositionDeleteFile;
    delete_desc.file_format = TFileFormatType::FORMAT_PARQUET;
    delete_desc.__isset.file_format = true;

    TFileScanRangeParams local_parquet_params;
    local_parquet_params.format_type = TFileFormatType::FORMAT_PARQUET;
    local_parquet_params.file_type = TFileType::FILE_LOCAL;

    // Runtime objects the reader options point at.
    RuntimeProfile test_profile("test_profile");
    RuntimeState state((TQueryOptions()), TQueryGlobals());
    FileMetaCache file_meta_cache(1024);
    IcebergDeleteFileIOContext io_holder(&state);

    IcebergDeleteFileReaderOptions reader_options;
    reader_options.batch_size = 1024;
    reader_options.meta_cache = &file_meta_cache;
    reader_options.io_ctx = &io_holder.io_ctx;
    reader_options.scan_params = &local_parquet_params;
    reader_options.profile = &test_profile;
    reader_options.state = &state;

    CollectPositionDeleteVisitor collector;
    auto status = read_iceberg_position_delete_file(delete_desc, reader_options, &collector);
    ASSERT_TRUE(status.ok()) << status;
    ASSERT_EQ(collector.total_rows, 4);

    const auto target_rows = collector.delete_rows.find(kTargetDataFilePath);
    ASSERT_NE(target_rows, collector.delete_rows.end());

    const std::vector<int64_t> wanted_positions {0, 2, 4};
    EXPECT_EQ(target_rows->second, wanted_positions);
}

// Reads the kOptionalNullPositionDeleteFile fixture through
// read_iceberg_position_delete_file and checks that the read fails with a
// status whose text contains "has null values".
TEST(IcebergDeleteFileReaderHelperTest, RejectOptionalParquetPositionDeleteFileWithNulls) {
    // Describe the delete file and how it should be scanned.
    TIcebergDeleteFileDesc delete_desc;
    delete_desc.path = kOptionalNullPositionDeleteFile;
    delete_desc.file_format = TFileFormatType::FORMAT_PARQUET;
    delete_desc.__isset.file_format = true;

    TFileScanRangeParams local_parquet_params;
    local_parquet_params.format_type = TFileFormatType::FORMAT_PARQUET;
    local_parquet_params.file_type = TFileType::FILE_LOCAL;

    // Runtime objects the reader options point at.
    RuntimeProfile test_profile("test_profile");
    RuntimeState state((TQueryOptions()), TQueryGlobals());
    FileMetaCache file_meta_cache(1024);
    IcebergDeleteFileIOContext io_holder(&state);

    IcebergDeleteFileReaderOptions reader_options;
    reader_options.batch_size = 1024;
    reader_options.meta_cache = &file_meta_cache;
    reader_options.io_ctx = &io_holder.io_ctx;
    reader_options.scan_params = &local_parquet_params;
    reader_options.profile = &test_profile;
    reader_options.state = &state;

    CollectPositionDeleteVisitor collector;
    auto status = read_iceberg_position_delete_file(delete_desc, reader_options, &collector);
    ASSERT_FALSE(status.ok());

    // The failure must come from the null check, not from some unrelated error.
    const auto marker_pos = status.to_string().find("has null values");
    EXPECT_NE(marker_pos, std::string::npos) << status;
}

} // namespace doris
Loading