From 62d42cfc9ba5caa0d4cf1602b96244a28743e9f4 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 22 May 2026 19:42:48 +0800 Subject: [PATCH] [fix](be) Revert row id read batching ### What problem does this PR solve? Issue Number: None Related PR: #63436 Problem Summary: Revert the row id fetch batching change from #63436 because recent profiling shows the row id read path spending significant CPU in nullable null-map RLE decoding and bit readers after the batching change. The reverted change groups sparse row ids by segment and reads them in larger sorted batches, which can make FileColumnIterator::read_by_rowids advance through large null-map ranges for sparse nullable columns. Restore the previous per-row and adjacent-file batching behavior while the sparse nullable row id access pattern is investigated. ### Release note None ### Check List (For Author) - Test: Manual test - build-support/check-format.sh be/src/exec/rowid_fetcher.cpp be/src/service/point_query_executor.cpp be/src/storage/segment/segment.cpp be/src/storage/segment/segment.h - ninja -C be/build_Release src/exec/CMakeFiles/Exec.dir/rowid_fetcher.cpp.o src/service/CMakeFiles/Service.dir/point_query_executor.cpp.o src/storage/CMakeFiles/Storage.dir/segment/segment.cpp.o - Behavior changed: Yes. Restore the row id fetch behavior before #63436. - Does this need documentation: No Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- be/src/exec/rowid_fetcher.cpp | 136 ++++++++---------------- be/src/service/point_query_executor.cpp | 5 +- be/src/storage/segment/segment.cpp | 18 +--- be/src/storage/segment/segment.h | 5 +- 4 files changed, 53 insertions(+), 111 deletions(-) diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index 27c66197541f5e..f97bce17a8c6a4 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -357,36 +357,6 @@ struct SegItem { SegmentSharedPtr segment; }; -// Groups all row_ids belonging to the same segment for batched reading. -// Position index tracks where each row_id originated in the original request, -// so results can be scattered back to the correct output positions. -struct DorisFormatReadBatch { - std::shared_ptr file_mapping; - // (row_id, index_in_request) pairs for all rows in this segment. - std::vector> row_ids_with_positions; -}; - -static void scatter_scan_blocks_to_result_block( - const std::vector>& row_id_block_idx, - std::vector& scan_blocks, Block& result_block) { - for (size_t column_id = 0; column_id < result_block.columns(); ++column_id) { - auto dst_col = const_cast(result_block.get_by_position(column_id).column.get()); - - std::vector scan_src_columns; - scan_src_columns.reserve(row_id_block_idx.size()); - std::vector scan_positions; - scan_positions.reserve(row_id_block_idx.size()); - for (const auto& [pos_block, block_idx] : row_id_block_idx) { - DCHECK(scan_blocks.size() > pos_block); - DCHECK(scan_blocks[pos_block].columns() > column_id); - scan_src_columns.emplace_back( - scan_blocks[pos_block].get_by_position(column_id).column.get()); - scan_positions.emplace_back(block_idx); - } - dst_col->insert_from_multi_column(scan_src_columns, scan_positions); - } -} - Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request, PMultiGetResponse* response) { // read from storage engine row id by row id @@ -490,8 +460,7 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request, row_location.row_location.segment_id, row_location.row_location.row_id); for (int x = 0; x < slots.size(); ++x) { - std::vector row_ids { - static_cast(row_loc.ordinal_id())}; + auto row_id = static_cast(row_loc.ordinal_id()); MutableColumnPtr column = result_block.get_by_position(x).column->assume_mutable(); IteratorKey iterator_key {.tablet_id = tablet->tablet_id(), .rowset_id = rowset_id, @@ -506,8 +475,8 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request, } segment = iterator_item.segment; RETURN_IF_ERROR(segment->seek_and_read_by_rowid( - full_read_schema, &slots[x], row_ids, column, - iterator_item.storage_read_options, iterator_item.iterator)); + full_read_schema, &slots[x], row_id, column, iterator_item.storage_read_options, + iterator_item.iterator)); } } // serialize block if not empty @@ -687,71 +656,35 @@ Status RowIdStorageReader::read_batch_doris_format_row( } } - // Phase 1: Group all row_ids by their (tablet_id, rowset_id, segment_id) key. - // Unlike the old code which only batched adjacent rows with the same file_id, - // this merges non-contiguous same-segment requests into a single batch, - // maximizing the number of rows read per seek_and_read_by_rowid call. - std::vector scan_batches; - std::unordered_map batch_idx_by_seg; - // (batch_idx, position_in_batch) for each row in the original request. - std::vector> row_id_block_idx(request_block_desc.row_id_size()); - for (int j = 0; j < request_block_desc.row_id_size(); ++j) { + std::vector row_ids; + int k = 1; + auto max_k = 0; + for (int j = 0; j < request_block_desc.row_id_size();) { auto file_id = request_block_desc.file_id(j); + row_ids.emplace_back(request_block_desc.row_id(j)); auto file_mapping = id_file_map->get_file_mapping(file_id); if (!file_mapping) { return Status::InternalError( "Backend:{} file_mapping not found, query_id: {}, file_id: {}", BackendOptions::get_localhost(), print_id(query_id), file_id); } - - // Derive segment key and group by it — rows from the same segment are batched together - // even if they are interleaved with rows from other segments in the request. - auto [tablet_id, rowset_id, segment_id] = file_mapping->get_doris_format_info(); - SegKey seg_key {.tablet_id = tablet_id, .rowset_id = rowset_id, .segment_id = segment_id}; - auto [it, inserted] = batch_idx_by_seg.emplace(seg_key, scan_batches.size()); - if (inserted) { - // First time seeing this segment, create a new batch for it. - scan_batches.emplace_back(); - scan_batches.back().file_mapping = file_mapping; - } - // Record (row_id, original_request_index) for later sorting and scattering. - scan_batches[it->second].row_ids_with_positions.emplace_back(request_block_desc.row_id(j), - j); - } - - // Phase 2: For each segment, sort row_ids ascending (required by ColumnIterator), - // deduplicate, then read all rows in a single batch call. - std::vector scan_blocks(scan_batches.size()); - for (size_t batch_idx = 0; batch_idx < scan_batches.size(); ++batch_idx) { - auto& scan_batch = scan_batches[batch_idx]; - auto& row_ids_with_positions = scan_batch.row_ids_with_positions; - std::sort(row_ids_with_positions.begin(), row_ids_with_positions.end(), - [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); - - // Column iterators read rowids monotonically. Deduplicate consecutive identical row_ids - // (different file_ids may map to the same row), then scatter rows back to their original - // request positions. - std::vector row_ids; - row_ids.reserve(row_ids_with_positions.size()); - - // Also builds the scatter map: row_id_block_idx[original_request_idx] -> - // (batch_idx, deduplicated_position_in_batch). - for (const auto& [row_id, result_idx] : row_ids_with_positions) { - if (row_ids.empty() || row_ids.back() != row_id) { - row_ids.emplace_back(row_id); + for (k = 1; j + k < request_block_desc.row_id_size(); ++k) { + if (request_block_desc.file_id(j + k) == file_id) { + row_ids.emplace_back(request_block_desc.row_id(j + k)); + } else { + break; } - row_id_block_idx[result_idx] = std::make_pair(batch_idx, row_ids.size() - 1); } - scan_blocks[batch_idx] = Block(slots, row_ids.size()); - RETURN_IF_ERROR(read_doris_format_row(id_file_map, scan_batch.file_mapping, row_ids, slots, - full_read_schema, row_store_read_struct, stats, - acquire_tablet_ms, acquire_rowsets_ms, - acquire_segments_ms, lookup_row_data_ms, seg_map, - iterator_map, scan_blocks[batch_idx])); - } + RETURN_IF_ERROR(read_doris_format_row( + id_file_map, file_mapping, row_ids, slots, full_read_schema, row_store_read_struct, + stats, acquire_tablet_ms, acquire_rowsets_ms, acquire_segments_ms, + lookup_row_data_ms, seg_map, iterator_map, result_block)); - scatter_scan_blocks_to_result_block(row_id_block_idx, scan_blocks, result_block); + j += k; + max_k = std::max(max_k, k); + row_ids.clear(); + } return Status::OK(); } @@ -996,7 +929,24 @@ Status RowIdStorageReader::read_batch_external_row( }, &scan_running_time)); - scatter_scan_blocks_to_result_block(row_id_block_idx, scan_blocks, result_block); + // Insert the read data into result_block. + for (size_t column_id = 0; column_id < result_block.get_columns().size(); column_id++) { + // The non-const Block(result_block) is passed in read_by_rowids, but columns[i] in get_columns + // is at bottom an immutable_ptr of Cow, so use const_cast + auto dst_col = const_cast(result_block.get_columns()[column_id].get()); + + std::vector scan_src_columns; + scan_src_columns.reserve(row_id_block_idx.size()); + std::vector scan_positions; + scan_positions.reserve(row_id_block_idx.size()); + for (const auto& [pos_block, block_idx] : row_id_block_idx) { + DCHECK(scan_blocks.size() > pos_block); + DCHECK(scan_blocks[pos_block].get_columns().size() > column_id); + scan_src_columns.emplace_back(scan_blocks[pos_block].get_columns()[column_id].get()); + scan_positions.emplace_back(block_idx); + } + dst_col->insert_from_multi_column(scan_src_columns, scan_positions); + } // Statistical runtime profile information. std::unique_ptr runtime_profile = @@ -1151,9 +1101,11 @@ Status RowIdStorageReader::read_doris_format_row( iterator_item.storage_read_options.stats = &stats; iterator_item.storage_read_options.io_ctx.reader_type = ReaderType::READER_QUERY; } - RETURN_IF_ERROR(segment->seek_and_read_by_rowid( - full_read_schema, &slots[x], row_ids, column, - iterator_item.storage_read_options, iterator_item.iterator)); + for (auto row_id : row_ids) { + RETURN_IF_ERROR(segment->seek_and_read_by_rowid( + full_read_schema, &slots[x], row_id, column, + iterator_item.storage_read_options, iterator_item.iterator)); + } } } return Status::OK(); diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index af34a3fe1d4cfc..441284a251b3c8 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -556,8 +556,7 @@ Status PointQueryExecutor::_lookup_row_data() { const auto& segment = *it; for (int cid : _reusable->missing_col_uids()) { int pos = _reusable->get_col_uid_to_idx().at(cid); - std::vector row_ids { - static_cast(row_loc.row_id)}; + auto row_id = static_cast(row_loc.row_id); MutableColumnPtr column = _result_block->get_by_position(pos).column->assume_mutable(); std::unique_ptr iter; @@ -566,7 +565,7 @@ Status PointQueryExecutor::_lookup_row_data() { storage_read_options.stats = &_read_stats; storage_read_options.io_ctx.reader_type = ReaderType::READER_QUERY; RETURN_IF_ERROR(segment->seek_and_read_by_rowid(*_tablet->tablet_schema(), slot, - row_ids, column, + row_id, column, storage_read_options, iter)); if (_tablet->tablet_schema() ->column_by_uid(slot->col_unique_id()) diff --git a/be/src/storage/segment/segment.cpp b/be/src/storage/segment/segment.cpp index 5c3c3f74202ea1..1c5578c1388146 100644 --- a/be/src/storage/segment/segment.cpp +++ b/be/src/storage/segment/segment.cpp @@ -23,7 +23,6 @@ #include #include -#include #include #include #include @@ -945,17 +944,9 @@ Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) { } Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot, - const std::vector& row_ids, - MutableColumnPtr& result, + uint32_t row_id, MutableColumnPtr& result, StorageReadOptions& storage_read_options, std::unique_ptr& iterator_hint) { - if (row_ids.empty()) { - return Status::OK(); - } - DORIS_CHECK(std::is_sorted(row_ids.begin(), row_ids.end())); - DORIS_CHECK(std::adjacent_find(row_ids.begin(), row_ids.end()) == row_ids.end()); - // ColumnIterator::seek_and_read expects monotonically increasing row_ids without - // duplicates for correct ordinal scanning. Enforce this contract at the entry point. segment_v2::ColumnIteratorOptions opt { .use_page_cache = !config::disable_storage_page_cache, .file_reader = file_reader().get(), @@ -965,6 +956,7 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto &storage_read_options.stats->file_cache_stats}, }; + std::vector single_row_loc {row_id}; if (!slot->column_paths().empty()) { // here need create column readers to make sure column reader is created before seek_and_read_by_rowid // if segment cache miss, column reader will be created to make sure the variant column result not coredump @@ -985,13 +977,13 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto RETURN_IF_ERROR(iterator_hint->init(opt)); } RETURN_IF_ERROR( - iterator_hint->read_by_rowids(row_ids.data(), row_ids.size(), file_storage_column)); + iterator_hint->read_by_rowids(single_row_loc.data(), 1, file_storage_column)); ColumnPtr source_ptr; // storage may have different type with schema, so we need to cast the column RETURN_IF_ERROR(variant_util::cast_column( ColumnWithTypeAndName(file_storage_column->get_ptr(), storage_type, column.name()), slot->type(), &source_ptr)); - RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0, row_ids.size())); + RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0, 1)); } else { int index = (slot->col_unique_id() >= 0) ? schema.field_index(slot->col_unique_id()) : schema.field_index(slot->col_name()); @@ -1006,7 +998,7 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto &storage_read_options)); RETURN_IF_ERROR(iterator_hint->init(opt)); } - RETURN_IF_ERROR(iterator_hint->read_by_rowids(row_ids.data(), row_ids.size(), result)); + RETURN_IF_ERROR(iterator_hint->read_by_rowids(single_row_loc.data(), 1, result)); } return Status::OK(); } diff --git a/be/src/storage/segment/segment.h b/be/src/storage/segment/segment.h index 465d20343bf192..f18be6c093ade8 100644 --- a/be/src/storage/segment/segment.h +++ b/be/src/storage/segment/segment.h @@ -144,9 +144,8 @@ class Segment : public std::enable_shared_from_this, public MetadataAdd Status read_key_by_rowid(uint32_t row_id, std::string* key); - // row_ids must be strictly increasing. - Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot, - const std::vector& row_ids, MutableColumnPtr& result, + Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot, uint32_t row_id, + MutableColumnPtr& result, StorageReadOptions& storage_read_options, std::unique_ptr& iterator_hint);