Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 44 additions & 92 deletions be/src/exec/rowid_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,36 +357,6 @@ struct SegItem {
SegmentSharedPtr segment;
};

// One read batch per segment: collects every requested row_id that falls in the
// same segment so they can be read together.
// Each row_id is stored with the index it had in the original request, so that
// the rows read for this batch can later be scattered back to the matching
// output positions.
struct DorisFormatReadBatch {
// File mapping recorded when the batch is created for a segment; it is the
// mapping handed to read_doris_format_row when the batch is read.
std::shared_ptr<FileMapping> file_mapping;
// (row_id, index_in_request) pairs for all rows in this segment.
// The reader sorts this by row_id (the pair's first member) before reading.
std::vector<std::pair<segment_v2::rowid_t, size_t>> row_ids_with_positions;
};

static void scatter_scan_blocks_to_result_block(
const std::vector<std::pair<size_t, size_t>>& row_id_block_idx,
std::vector<Block>& scan_blocks, Block& result_block) {
for (size_t column_id = 0; column_id < result_block.columns(); ++column_id) {
auto dst_col = const_cast<IColumn*>(result_block.get_by_position(column_id).column.get());

std::vector<const IColumn*> scan_src_columns;
scan_src_columns.reserve(row_id_block_idx.size());
std::vector<size_t> scan_positions;
scan_positions.reserve(row_id_block_idx.size());
for (const auto& [pos_block, block_idx] : row_id_block_idx) {
DCHECK(scan_blocks.size() > pos_block);
DCHECK(scan_blocks[pos_block].columns() > column_id);
scan_src_columns.emplace_back(
scan_blocks[pos_block].get_by_position(column_id).column.get());
scan_positions.emplace_back(block_idx);
}
dst_col->insert_from_multi_column(scan_src_columns, scan_positions);
}
}

Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
PMultiGetResponse* response) {
// read from storage engine row id by row id
Expand Down Expand Up @@ -490,8 +460,7 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
row_location.row_location.segment_id,
row_location.row_location.row_id);
for (int x = 0; x < slots.size(); ++x) {
std::vector<segment_v2::rowid_t> row_ids {
static_cast<segment_v2::rowid_t>(row_loc.ordinal_id())};
auto row_id = static_cast<segment_v2::rowid_t>(row_loc.ordinal_id());
MutableColumnPtr column = result_block.get_by_position(x).column->assume_mutable();
IteratorKey iterator_key {.tablet_id = tablet->tablet_id(),
.rowset_id = rowset_id,
Expand All @@ -506,8 +475,8 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
}
segment = iterator_item.segment;
RETURN_IF_ERROR(segment->seek_and_read_by_rowid(
full_read_schema, &slots[x], row_ids, column,
iterator_item.storage_read_options, iterator_item.iterator));
full_read_schema, &slots[x], row_id, column, iterator_item.storage_read_options,
iterator_item.iterator));
}
}
// serialize block if not empty
Expand Down Expand Up @@ -687,71 +656,35 @@ Status RowIdStorageReader::read_batch_doris_format_row(
}
}

// Phase 1: Group all row_ids by their (tablet_id, rowset_id, segment_id) key.
// Unlike the old code which only batched adjacent rows with the same file_id,
// this merges non-contiguous same-segment requests into a single batch,
// maximizing the number of rows read per seek_and_read_by_rowid call.
std::vector<DorisFormatReadBatch> scan_batches;
std::unordered_map<SegKey, size_t, HashOfSegKey> batch_idx_by_seg;
// (batch_idx, position_in_batch) for each row in the original request.
std::vector<std::pair<size_t, size_t>> row_id_block_idx(request_block_desc.row_id_size());
for (int j = 0; j < request_block_desc.row_id_size(); ++j) {
std::vector<uint32_t> row_ids;
int k = 1;
auto max_k = 0;
for (int j = 0; j < request_block_desc.row_id_size();) {
auto file_id = request_block_desc.file_id(j);
row_ids.emplace_back(request_block_desc.row_id(j));
auto file_mapping = id_file_map->get_file_mapping(file_id);
if (!file_mapping) {
return Status::InternalError(
"Backend:{} file_mapping not found, query_id: {}, file_id: {}",
BackendOptions::get_localhost(), print_id(query_id), file_id);
}

// Derive segment key and group by it — rows from the same segment are batched together
// even if they are interleaved with rows from other segments in the request.
auto [tablet_id, rowset_id, segment_id] = file_mapping->get_doris_format_info();
SegKey seg_key {.tablet_id = tablet_id, .rowset_id = rowset_id, .segment_id = segment_id};
auto [it, inserted] = batch_idx_by_seg.emplace(seg_key, scan_batches.size());
if (inserted) {
// First time seeing this segment, create a new batch for it.
scan_batches.emplace_back();
scan_batches.back().file_mapping = file_mapping;
}
// Record (row_id, original_request_index) for later sorting and scattering.
scan_batches[it->second].row_ids_with_positions.emplace_back(request_block_desc.row_id(j),
j);
}

// Phase 2: For each segment, sort row_ids ascending (required by ColumnIterator),
// deduplicate, then read all rows in a single batch call.
std::vector<Block> scan_blocks(scan_batches.size());
for (size_t batch_idx = 0; batch_idx < scan_batches.size(); ++batch_idx) {
auto& scan_batch = scan_batches[batch_idx];
auto& row_ids_with_positions = scan_batch.row_ids_with_positions;
std::sort(row_ids_with_positions.begin(), row_ids_with_positions.end(),
[](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });

// Column iterators read rowids monotonically. Deduplicate consecutive identical row_ids
// (different file_ids may map to the same row), then scatter rows back to their original
// request positions.
std::vector<uint32_t> row_ids;
row_ids.reserve(row_ids_with_positions.size());

// Also builds the scatter map: row_id_block_idx[original_request_idx] ->
// (batch_idx, deduplicated_position_in_batch).
for (const auto& [row_id, result_idx] : row_ids_with_positions) {
if (row_ids.empty() || row_ids.back() != row_id) {
row_ids.emplace_back(row_id);
for (k = 1; j + k < request_block_desc.row_id_size(); ++k) {
if (request_block_desc.file_id(j + k) == file_id) {
row_ids.emplace_back(request_block_desc.row_id(j + k));
} else {
break;
}
row_id_block_idx[result_idx] = std::make_pair(batch_idx, row_ids.size() - 1);
}

scan_blocks[batch_idx] = Block(slots, row_ids.size());
RETURN_IF_ERROR(read_doris_format_row(id_file_map, scan_batch.file_mapping, row_ids, slots,
full_read_schema, row_store_read_struct, stats,
acquire_tablet_ms, acquire_rowsets_ms,
acquire_segments_ms, lookup_row_data_ms, seg_map,
iterator_map, scan_blocks[batch_idx]));
}
RETURN_IF_ERROR(read_doris_format_row(
id_file_map, file_mapping, row_ids, slots, full_read_schema, row_store_read_struct,
stats, acquire_tablet_ms, acquire_rowsets_ms, acquire_segments_ms,
lookup_row_data_ms, seg_map, iterator_map, result_block));

scatter_scan_blocks_to_result_block(row_id_block_idx, scan_blocks, result_block);
j += k;
max_k = std::max(max_k, k);
row_ids.clear();
}

return Status::OK();
}
Expand Down Expand Up @@ -996,7 +929,24 @@ Status RowIdStorageReader::read_batch_external_row(
},
&scan_running_time));

scatter_scan_blocks_to_result_block(row_id_block_idx, scan_blocks, result_block);
// Insert the read data into result_block.
for (size_t column_id = 0; column_id < result_block.get_columns().size(); column_id++) {
// The non-const Block(result_block) is passed in read_by_rowids, but columns[i] in get_columns
// is at bottom an immutable_ptr of Cow<IColumn>, so use const_cast
auto dst_col = const_cast<IColumn*>(result_block.get_columns()[column_id].get());

std::vector<const IColumn*> scan_src_columns;
scan_src_columns.reserve(row_id_block_idx.size());
std::vector<size_t> scan_positions;
scan_positions.reserve(row_id_block_idx.size());
for (const auto& [pos_block, block_idx] : row_id_block_idx) {
DCHECK(scan_blocks.size() > pos_block);
DCHECK(scan_blocks[pos_block].get_columns().size() > column_id);
scan_src_columns.emplace_back(scan_blocks[pos_block].get_columns()[column_id].get());
scan_positions.emplace_back(block_idx);
}
dst_col->insert_from_multi_column(scan_src_columns, scan_positions);
}

// Statistical runtime profile information.
std::unique_ptr<RuntimeProfile> runtime_profile =
Expand Down Expand Up @@ -1151,9 +1101,11 @@ Status RowIdStorageReader::read_doris_format_row(
iterator_item.storage_read_options.stats = &stats;
iterator_item.storage_read_options.io_ctx.reader_type = ReaderType::READER_QUERY;
}
RETURN_IF_ERROR(segment->seek_and_read_by_rowid(
full_read_schema, &slots[x], row_ids, column,
iterator_item.storage_read_options, iterator_item.iterator));
for (auto row_id : row_ids) {
RETURN_IF_ERROR(segment->seek_and_read_by_rowid(
full_read_schema, &slots[x], row_id, column,
iterator_item.storage_read_options, iterator_item.iterator));
}
}
}
return Status::OK();
Expand Down
5 changes: 2 additions & 3 deletions be/src/service/point_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,7 @@ Status PointQueryExecutor::_lookup_row_data() {
const auto& segment = *it;
for (int cid : _reusable->missing_col_uids()) {
int pos = _reusable->get_col_uid_to_idx().at(cid);
std::vector<segment_v2::rowid_t> row_ids {
static_cast<segment_v2::rowid_t>(row_loc.row_id)};
auto row_id = static_cast<segment_v2::rowid_t>(row_loc.row_id);
MutableColumnPtr column =
_result_block->get_by_position(pos).column->assume_mutable();
std::unique_ptr<ColumnIterator> iter;
Expand All @@ -566,7 +565,7 @@ Status PointQueryExecutor::_lookup_row_data() {
storage_read_options.stats = &_read_stats;
storage_read_options.io_ctx.reader_type = ReaderType::READER_QUERY;
RETURN_IF_ERROR(segment->seek_and_read_by_rowid(*_tablet->tablet_schema(), slot,
row_ids, column,
row_id, column,
storage_read_options, iter));
if (_tablet->tablet_schema()
->column_by_uid(slot->col_unique_id())
Expand Down
18 changes: 5 additions & 13 deletions be/src/storage/segment/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <gen_cpp/olap_file.pb.h>
#include <gen_cpp/segment_v2.pb.h>

#include <algorithm>
#include <cstring>
#include <memory>
#include <sstream>
Expand Down Expand Up @@ -945,17 +944,9 @@ Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) {
}

Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot,
const std::vector<uint32_t>& row_ids,
MutableColumnPtr& result,
uint32_t row_id, MutableColumnPtr& result,
StorageReadOptions& storage_read_options,
std::unique_ptr<ColumnIterator>& iterator_hint) {
if (row_ids.empty()) {
return Status::OK();
}
DORIS_CHECK(std::is_sorted(row_ids.begin(), row_ids.end()));
DORIS_CHECK(std::adjacent_find(row_ids.begin(), row_ids.end()) == row_ids.end());
// ColumnIterator::seek_and_read expects monotonically increasing row_ids without
// duplicates for correct ordinal scanning. Enforce this contract at the entry point.
segment_v2::ColumnIteratorOptions opt {
.use_page_cache = !config::disable_storage_page_cache,
.file_reader = file_reader().get(),
Expand All @@ -965,6 +956,7 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto
&storage_read_options.stats->file_cache_stats},
};

std::vector<segment_v2::rowid_t> single_row_loc {row_id};
if (!slot->column_paths().empty()) {
// Column readers must be created here so that they exist before seek_and_read_by_rowid reads the column.
// On a segment cache miss they are created at this point, which prevents a core dump when producing the variant column result.
Expand All @@ -985,13 +977,13 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto
RETURN_IF_ERROR(iterator_hint->init(opt));
}
RETURN_IF_ERROR(
iterator_hint->read_by_rowids(row_ids.data(), row_ids.size(), file_storage_column));
iterator_hint->read_by_rowids(single_row_loc.data(), 1, file_storage_column));
ColumnPtr source_ptr;
// storage may have different type with schema, so we need to cast the column
RETURN_IF_ERROR(variant_util::cast_column(
ColumnWithTypeAndName(file_storage_column->get_ptr(), storage_type, column.name()),
slot->type(), &source_ptr));
RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0, row_ids.size()));
RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0, 1));
} else {
int index = (slot->col_unique_id() >= 0) ? schema.field_index(slot->col_unique_id())
: schema.field_index(slot->col_name());
Expand All @@ -1006,7 +998,7 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto
&storage_read_options));
RETURN_IF_ERROR(iterator_hint->init(opt));
}
RETURN_IF_ERROR(iterator_hint->read_by_rowids(row_ids.data(), row_ids.size(), result));
RETURN_IF_ERROR(iterator_hint->read_by_rowids(single_row_loc.data(), 1, result));
}
return Status::OK();
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/storage/segment/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,8 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd

Status read_key_by_rowid(uint32_t row_id, std::string* key);

// row_ids must be strictly increasing.
Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot,
const std::vector<uint32_t>& row_ids, MutableColumnPtr& result,
Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot, uint32_t row_id,
MutableColumnPtr& result,
StorageReadOptions& storage_read_options,
std::unique_ptr<ColumnIterator>& iterator_hint);

Expand Down
Loading