Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 79 additions & 65 deletions src/db/index/segment/segment_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -609,38 +609,86 @@ Status SegmentHelper::ReduceVectorIndex(
auto vector_index_params =
std::dynamic_pointer_cast<VectorIndexParams>(field->index_params());

auto vector_block_id = block_id_generator();
if (vector_index_params->quantize_type() == QuantizeType::UNDEFINED) {
auto vector_index_path = FileHelper::MakeVectorIndexPath(
output_segment_path, field->name(), vector_block_id);
// Builds the MergeOptions used for every vector-index merge in this function.
// When `concurrency` is 0 the merge is handed the optimize thread pool obtained
// from GlobalResource; otherwise `write_concurrency` is set to the requested
// value and no pool is assigned here.
auto build_merge_options = [&]() {
vector_column_params::MergeOptions merge_options;
if (concurrency == 0) {
merge_options.pool = GlobalResource::Instance().optimize_thread_pool();
} else {
merge_options.write_concurrency = concurrency;
}
return merge_options;
};

// only create original vector indexer
auto vector_indexer =
std::make_shared<VectorColumnIndexer>(vector_index_path, *field);
s = vector_indexer->Open({true, true});
CHECK_RETURN_STATUS(s);
// Decides whether the first input segment's index may be used as the merge
// base. `fetch_indexers` is a callable that maps an input segment to its list
// of indexers. Returns true only when all of the following hold:
//   * no filter is supplied (`filter == nullptr`),
//   * there is more than one input segment,
//   * `fetch_indexers` yields exactly one indexer for every input segment.
auto can_reuse_first_indexer = [&](auto &&fetch_indexers) {
// A filter, or a single (or empty) input set, rules out reuse.
if (filter != nullptr || input_segments.size() <= 1) {
return false;
}
// Every segment, not only the first, must expose exactly one indexer.
for (const auto &input_segment : input_segments) {
if (fetch_indexers(input_segment).size() != 1) {
return false;
}
}
return true;
};

auto collect_merge_indexers = [&](auto &&fetch_indexers,
size_t start_index) {
std::vector<VectorColumnIndexer::Ptr> merge_indexers;
for (auto &input_segment : input_segments) {
// merge_indexers should be ordered put
auto to_merge_indexers =
input_segment->get_vector_indexer(field->name());
for (size_t i = start_index; i < input_segments.size(); ++i) {
auto to_merge_indexers = fetch_indexers(input_segments[i]);
merge_indexers.insert(merge_indexers.end(), to_merge_indexers.begin(),
to_merge_indexers.end());
}
return merge_indexers;
};

vector_column_params::MergeOptions merge_options;
if (concurrency == 0) {
merge_options.pool = GlobalResource::Instance().optimize_thread_pool();
} else {
merge_options.write_concurrency = concurrency;
auto merge_with_optional_reuse = [&](const std::string &output_index_path,
const FieldSchema &index_field,
auto &&fetch_indexers) -> Status {
auto vector_indexer =
std::make_shared<VectorColumnIndexer>(output_index_path, index_field);
auto merge_options = build_merge_options();
bool reused_base_index = false;

if (can_reuse_first_indexer(fetch_indexers)) {
auto first_indexer = fetch_indexers(input_segments.front())[0];
if (FileHelper::CopyFile(first_indexer->index_file_path(),
output_index_path)) {
// Reuse first segment index as merge base to avoid rebuilding it.
s = vector_indexer->Open({true, false});
CHECK_RETURN_STATUS(s);

auto merge_indexers = collect_merge_indexers(fetch_indexers, 1);
s = vector_indexer->Merge(merge_indexers, filter, merge_options);
CHECK_RETURN_STATUS(s);
reused_base_index = true;
}
}

s = vector_indexer->Merge(merge_indexers, filter, merge_options);
CHECK_RETURN_STATUS(s);
if (!reused_base_index) {
s = vector_indexer->Open({true, true});
CHECK_RETURN_STATUS(s);

auto merge_indexers = collect_merge_indexers(fetch_indexers, 0);
s = vector_indexer->Merge(merge_indexers, filter, merge_options);
CHECK_RETURN_STATUS(s);
}

s = vector_indexer->Flush();
CHECK_RETURN_STATUS(s);
return Status::OK();
};

auto vector_block_id = block_id_generator();
if (vector_index_params->quantize_type() == QuantizeType::UNDEFINED) {
auto vector_index_path = FileHelper::MakeVectorIndexPath(
output_segment_path, field->name(), vector_block_id);

s = merge_with_optional_reuse(vector_index_path, *field,
[&](const Segment::Ptr &input_segment) {
return input_segment->get_vector_indexer(field->name());
});
CHECK_RETURN_STATUS(s);

BlockMeta new_block_meta;
new_block_meta.set_id(vector_block_id);
Expand All @@ -659,32 +707,11 @@ Status SegmentHelper::ReduceVectorIndex(
field_without_quantize->set_index_params(
MakeDefaultVectorIndexParams(vector_index_params->metric_type()));

// create flat index
auto vector_indexer = std::make_shared<VectorColumnIndexer>(
vector_index_path, *field_without_quantize);
s = vector_indexer->Open({true, true});
CHECK_RETURN_STATUS(s);

std::vector<VectorColumnIndexer::Ptr> merge_indexers;
for (auto &input_segment : input_segments) {
// merge_indexers should be ordered put
auto to_merge_indexers =
input_segment->get_vector_indexer(field->name());
merge_indexers.insert(merge_indexers.end(), to_merge_indexers.begin(),
to_merge_indexers.end());
}

vector_column_params::MergeOptions merge_options;
if (concurrency == 0) {
merge_options.pool = GlobalResource::Instance().optimize_thread_pool();
} else {
merge_options.write_concurrency = concurrency;
}

s = vector_indexer->Merge(merge_indexers, filter, merge_options);
CHECK_RETURN_STATUS(s);

s = vector_indexer->Flush();
s = merge_with_optional_reuse(
vector_index_path, *field_without_quantize,
[&](const Segment::Ptr &input_segment) {
return input_segment->get_vector_indexer(field->name());
});
CHECK_RETURN_STATUS(s);

BlockMeta new_block_meta;
Expand All @@ -699,24 +726,11 @@ Status SegmentHelper::ReduceVectorIndex(
auto vector_quan_index_path = FileHelper::MakeQuantizeVectorIndexPath(
output_segment_path, field->name(), vector_quan_block_id);

auto vector_indexer_quantize =
std::make_shared<VectorColumnIndexer>(vector_quan_index_path, *field);
s = vector_indexer_quantize->Open({true, true});
CHECK_RETURN_STATUS(s);

merge_indexers.clear();
for (auto &input_segment : input_segments) {
// merge_indexers should be ordered put
auto to_merge_indexers =
input_segment->get_quant_vector_indexer(field->name());
merge_indexers.insert(merge_indexers.end(), to_merge_indexers.begin(),
to_merge_indexers.end());
}

s = vector_indexer_quantize->Merge(merge_indexers, filter, merge_options);
CHECK_RETURN_STATUS(s);

s = vector_indexer_quantize->Flush();
s = merge_with_optional_reuse(
vector_quan_index_path, *field,
[&](const Segment::Ptr &input_segment) {
return input_segment->get_quant_vector_indexer(field->name());
});
CHECK_RETURN_STATUS(s);

new_block_meta.set_id(vector_quan_block_id);
Expand Down Expand Up @@ -827,4 +841,4 @@ Status SegmentHelper::ExecuteDropScalarIndexTask(DropScalarIndexTask &task) {
&task.output_scalar_indexer_);
}

} // namespace zvec
} // namespace zvec
117 changes: 116 additions & 1 deletion tests/db/index/segment/segment_helper_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -691,4 +691,119 @@ TEST_F(SegmentHelperTest, CreateVectorIndexTask_SingleField) {
<< output_segment_meta->to_string_formatted() << std::endl;
ASSERT_EQ(output_segment_meta->id(), 0);
ASSERT_FALSE(output_segment_meta->writing_forward_block().has_value());
}
}

// Regression test: compacts three flushed segments (300 documents each, built
// from the schema returned by CreateSchemaWithVectorIndex) without a filter,
// then reopens the compacted segment and checks that it holds all 900
// documents and that the first and last of them can be fetched.
TEST_F(SegmentHelperTest, CompactTask_VectorIndexThreeSegmentsRegression) {
auto schema = test::TestHelper::CreateSchemaWithVectorIndex();

Version version;
version.set_schema(*schema);
// NOTE(review): this call uses `col_path` while the rest of the test uses
// GetColPath() -- presumably the same location; confirm against the fixture.
auto version_manager_tmp = VersionManager::Create(col_path, version);
if (!version_manager_tmp.has_value()) {
throw std::runtime_error("Failed to create version manager");
}

auto version_manager = version_manager_tmp.value();
bool forward_use_parquet = false;
auto seg_options =
SegmentOptions{false, !forward_use_parquet, DEFAULT_MAX_BUFFER_SIZE};

// Three input segments. The numeric arguments are presumably
// (segment id, min doc id, ..., start doc id, doc count) -- TODO confirm
// against TestHelper::CreateSegmentWithDoc.
auto seg1 = test::TestHelper::CreateSegmentWithDoc(
GetColPath(), *schema, 0, 0, id_map, delete_store, version_manager,
seg_options, 0, 300);
auto seg2 = test::TestHelper::CreateSegmentWithDoc(
GetColPath(), *schema, 1, 300, id_map, delete_store, version_manager,
seg_options, 300, 300);
auto seg3 = test::TestHelper::CreateSegmentWithDoc(
GetColPath(), *schema, 2, 600, id_map, delete_store, version_manager,
seg_options, 600, 300);
ASSERT_TRUE(seg1 != nullptr);
ASSERT_TRUE(seg2 != nullptr);
ASSERT_TRUE(seg3 != nullptr);
// Flush every input segment before handing it to the compact task.
ASSERT_TRUE(seg1->flush().ok());
ASSERT_TRUE(seg2->flush().ok());
ASSERT_TRUE(seg3->flush().ok());

// nullptr is passed in the filter position. The literal 3 matches the id
// used for the output segment paths below; the trailing 1 is presumably the
// concurrency -- TODO confirm against the CompactTask constructor.
CompactTask task(GetColPath(), schema, {seg1, seg2, seg3}, 3, nullptr,
forward_use_parquet, 1);
auto segment_task = SegmentTask::CreateComapctTask(task);
ASSERT_TRUE(segment_task != nullptr);

auto status = SegmentHelper::Execute(segment_task);
ASSERT_TRUE(status.ok());

// Read the task back out of the variant to get the output segment meta.
auto compact_task = std::get<CompactTask>(segment_task->task_info());
ASSERT_TRUE(compact_task.output_segment_meta_ != nullptr);

// Move the output from the temp segment path to the regular segment path so
// that it can be opened like any other segment.
auto tmp_segment_path = FileHelper::MakeTempSegmentPath(GetColPath(), 3);
auto new_segment_path = FileHelper::MakeSegmentPath(GetColPath(), 3);
ASSERT_TRUE(FileHelper::MoveDirectory(tmp_segment_path, new_segment_path));

// Reopen the compacted segment read-only.
seg_options.read_only_ = true;
version_manager->set_enable_mmap(!forward_use_parquet);
auto output_segment_ret =
Segment::Open(GetColPath(), *schema, *compact_task.output_segment_meta_,
id_map, delete_store, version_manager, seg_options);
ASSERT_TRUE(output_segment_ret.has_value());
auto output_segment = std::move(output_segment_ret.value());

// 3 segments x 300 documents; also probe the first and last document.
ASSERT_EQ(output_segment->doc_count(), 900);
ASSERT_NE(output_segment->Fetch(0), nullptr);
ASSERT_NE(output_segment->Fetch(899), nullptr);
}

// Regression test: compacts two flushed segments (400 documents each) with an
// index filter supplied, then reopens the compacted segment and checks the
// resulting document count.
TEST_F(SegmentHelperTest, CompactTask_FilterMultiSegmentsRegression) {
auto schema = test::TestHelper::CreateSchemaWithVectorIndex();

Version version;
version.set_schema(*schema);
// NOTE(review): this call uses `col_path` while the rest of the test uses
// GetColPath() -- presumably the same location; confirm against the fixture.
auto version_manager_tmp = VersionManager::Create(col_path, version);
if (!version_manager_tmp.has_value()) {
throw std::runtime_error("Failed to create version manager");
}

auto version_manager = version_manager_tmp.value();
bool forward_use_parquet = false;
auto seg_options =
SegmentOptions{false, !forward_use_parquet, DEFAULT_MAX_BUFFER_SIZE};

// Two input segments. The numeric arguments are presumably
// (segment id, min doc id, ..., start doc id, doc count) -- TODO confirm
// against TestHelper::CreateSegmentWithDoc.
auto seg1 = test::TestHelper::CreateSegmentWithDoc(
GetColPath(), *schema, 0, 0, id_map, delete_store, version_manager,
seg_options, 0, 400);
auto seg2 = test::TestHelper::CreateSegmentWithDoc(
GetColPath(), *schema, 1, 400, id_map, delete_store, version_manager,
seg_options, 400, 400);
ASSERT_TRUE(seg1 != nullptr);
ASSERT_TRUE(seg2 != nullptr);
// Flush both input segments before handing them to the compact task.
ASSERT_TRUE(seg1->flush().ok());
ASSERT_TRUE(seg2->flush().ok());

// The predicate is true for ids [0, 100) and [400, 450), i.e. 150 ids.
// The expected count below (650 = 800 - 150) suggests that matching ids are
// dropped during compaction -- confirm against EasyIndexFilter semantics.
auto filter = std::make_shared<EasyIndexFilter>([](uint64_t id) -> bool {
return id < 100 || (id >= 400 && id < 450);
});

// The literal 2 matches the id used for the output segment paths below; the
// trailing 1 is presumably the concurrency -- TODO confirm against the
// CompactTask constructor.
CompactTask task(GetColPath(), schema, {seg1, seg2}, 2, filter,
forward_use_parquet, 1);
auto segment_task = SegmentTask::CreateComapctTask(task);
ASSERT_TRUE(segment_task != nullptr);

auto status = SegmentHelper::Execute(segment_task);
ASSERT_TRUE(status.ok());

// Read the task back out of the variant to get the output segment meta.
auto compact_task = std::get<CompactTask>(segment_task->task_info());
ASSERT_TRUE(compact_task.output_segment_meta_ != nullptr);

// Move the output from the temp segment path to the regular segment path so
// that it can be opened like any other segment.
auto tmp_segment_path = FileHelper::MakeTempSegmentPath(GetColPath(), 2);
auto new_segment_path = FileHelper::MakeSegmentPath(GetColPath(), 2);
ASSERT_TRUE(FileHelper::MoveDirectory(tmp_segment_path, new_segment_path));

// Reopen the compacted segment read-only.
seg_options.read_only_ = true;
version_manager->set_enable_mmap(!forward_use_parquet);
auto output_segment_ret =
Segment::Open(GetColPath(), *schema, *compact_task.output_segment_meta_,
id_map, delete_store, version_manager, seg_options);
ASSERT_TRUE(output_segment_ret.has_value());
auto output_segment = std::move(output_segment_ret.value());

// 800 input documents minus the 150 ids matched by the filter predicate.
ASSERT_EQ(output_segment->doc_count(), 650);
}