From cddfdeb4d15e469f6a9aaf99d9e897d5d555cfdc Mon Sep 17 00:00:00 2001 From: kgeg401 Date: Sat, 14 Feb 2026 13:15:22 -0500 Subject: [PATCH 1/2] fix(segment): reuse vector index base during compaction --- src/db/index/segment/segment_helper.cc | 145 ++++++++++-------- tests/db/index/segment/segment_helper_test.cc | 117 +++++++++++++- 2 files changed, 196 insertions(+), 66 deletions(-) diff --git a/src/db/index/segment/segment_helper.cc b/src/db/index/segment/segment_helper.cc index 45b2dfb3..a31d7088 100644 --- a/src/db/index/segment/segment_helper.cc +++ b/src/db/index/segment/segment_helper.cc @@ -609,38 +609,87 @@ Status SegmentHelper::ReduceVectorIndex( auto vector_index_params = std::dynamic_pointer_cast(field->index_params()); - auto vector_block_id = block_id_generator(); - if (vector_index_params->quantize_type() == QuantizeType::UNDEFINED) { - auto vector_index_path = FileHelper::MakeVectorIndexPath( - output_segment_path, field->name(), vector_block_id); + auto build_merge_options = [&]() { + vector_column_params::MergeOptions merge_options; + if (concurrency == 0) { + merge_options.pool = GlobalResource::Instance().optimize_thread_pool(); + } else { + merge_options.write_concurrency = concurrency; + } + return merge_options; + }; - // only create original vector indexer - auto vector_indexer = - std::make_shared(vector_index_path, *field); - s = vector_indexer->Open({true, true}); - CHECK_RETURN_STATUS(s); + auto can_reuse_first_indexer = [&](auto &&fetch_indexers) { + if (filter != nullptr || input_segments.size() <= 1) { + return false; + } + for (const auto &input_segment : input_segments) { + if (fetch_indexers(input_segment).size() != 1) { + return false; + } + } + return true; + }; + auto collect_merge_indexers = [&](auto &&fetch_indexers, + size_t start_index) { std::vector merge_indexers; - for (auto &input_segment : input_segments) { - // merge_indexers should be ordered put - auto to_merge_indexers = - input_segment->get_vector_indexer(field->name()); + for (size_t i = start_index; i < input_segments.size(); ++i) { + auto to_merge_indexers = fetch_indexers(input_segments[i]); merge_indexers.insert(merge_indexers.end(), to_merge_indexers.begin(), to_merge_indexers.end()); } + return merge_indexers; + }; - vector_column_params::MergeOptions merge_options; - if (concurrency == 0) { - merge_options.pool = GlobalResource::Instance().optimize_thread_pool(); - } else { - merge_options.write_concurrency = concurrency; + auto merge_with_optional_reuse = [&](const std::string &output_index_path, + const FieldSchema &index_field, + auto &&fetch_indexers) -> Status { + auto vector_indexer = + std::make_shared(output_index_path, index_field); + auto merge_options = build_merge_options(); + bool reused_base_index = false; + + if (can_reuse_first_indexer(fetch_indexers)) { + auto first_indexer = fetch_indexers(input_segments.front())[0]; + if (FileHelper::CopyFile(first_indexer->index_file_path(), + output_index_path)) { + // Reuse first segment index as merge base to avoid rebuilding it. + s = vector_indexer->Open({true, false}); + CHECK_RETURN_STATUS(s); + + auto merge_indexers = collect_merge_indexers(fetch_indexers, 1); + s = vector_indexer->Merge(merge_indexers, filter, merge_options); + CHECK_RETURN_STATUS(s); + reused_base_index = true; + } } - s = vector_indexer->Merge(merge_indexers, filter, merge_options); - CHECK_RETURN_STATUS(s); + if (!reused_base_index) { + s = vector_indexer->Open({true, true}); + CHECK_RETURN_STATUS(s); + + auto merge_indexers = collect_merge_indexers(fetch_indexers, 0); + s = vector_indexer->Merge(merge_indexers, filter, merge_options); + CHECK_RETURN_STATUS(s); + } s = vector_indexer->Flush(); CHECK_RETURN_STATUS(s); + return Status::OK(); + }; + + auto vector_block_id = block_id_generator(); + if (vector_index_params->quantize_type() == QuantizeType::UNDEFINED) { + auto vector_index_path = FileHelper::MakeVectorIndexPath( + output_segment_path, field->name(), vector_block_id); + + s = merge_with_optional_reuse( + vector_index_path, *field, + [&](const Segment::Ptr &input_segment) { + return input_segment->get_vector_indexer(field->name()); + }); + CHECK_RETURN_STATUS(s); BlockMeta new_block_meta; new_block_meta.set_id(vector_block_id); @@ -659,32 +708,11 @@ Status SegmentHelper::ReduceVectorIndex( field_without_quantize->set_index_params( MakeDefaultVectorIndexParams(vector_index_params->metric_type())); - // create flat index - auto vector_indexer = std::make_shared( - vector_index_path, *field_without_quantize); - s = vector_indexer->Open({true, true}); - CHECK_RETURN_STATUS(s); - - std::vector merge_indexers; - for (auto &input_segment : input_segments) { - // merge_indexers should be ordered put - auto to_merge_indexers = - input_segment->get_vector_indexer(field->name()); - merge_indexers.insert(merge_indexers.end(), to_merge_indexers.begin(), - to_merge_indexers.end()); - } - - vector_column_params::MergeOptions merge_options; - if (concurrency == 0) { - merge_options.pool = GlobalResource::Instance().optimize_thread_pool(); - } else { - merge_options.write_concurrency = concurrency; - } - - s = vector_indexer->Merge(merge_indexers, filter, merge_options); - CHECK_RETURN_STATUS(s); - - s = vector_indexer->Flush(); + s = merge_with_optional_reuse( + vector_index_path, *field_without_quantize, + [&](const Segment::Ptr &input_segment) { + return input_segment->get_vector_indexer(field->name()); + }); CHECK_RETURN_STATUS(s); BlockMeta new_block_meta; @@ -699,24 +727,11 @@ Status SegmentHelper::ReduceVectorIndex( auto vector_quan_index_path = FileHelper::MakeQuantizeVectorIndexPath( output_segment_path, field->name(), vector_quan_block_id); - auto vector_indexer_quantize = - std::make_shared(vector_quan_index_path, *field); - s = vector_indexer_quantize->Open({true, true}); - CHECK_RETURN_STATUS(s); - - merge_indexers.clear(); - for (auto &input_segment : input_segments) { - // merge_indexers should be ordered put - auto to_merge_indexers = - input_segment->get_quant_vector_indexer(field->name()); - merge_indexers.insert(merge_indexers.end(), to_merge_indexers.begin(), - to_merge_indexers.end()); - } - - s = vector_indexer_quantize->Merge(merge_indexers, filter, merge_options); - CHECK_RETURN_STATUS(s); - - s = vector_indexer_quantize->Flush(); + s = merge_with_optional_reuse( + vector_quan_index_path, *field, + [&](const Segment::Ptr &input_segment) { + return input_segment->get_quant_vector_indexer(field->name()); + }); CHECK_RETURN_STATUS(s); new_block_meta.set_id(vector_quan_block_id); @@ -827,4 +842,4 @@ Status SegmentHelper::ExecuteDropScalarIndexTask(DropScalarIndexTask &task) { &task.output_scalar_indexer_); } -} // namespace zvec \ No newline at end of file +} // namespace zvec diff --git a/tests/db/index/segment/segment_helper_test.cc b/tests/db/index/segment/segment_helper_test.cc index 03f1c377..d724569b 100644 --- a/tests/db/index/segment/segment_helper_test.cc +++ b/tests/db/index/segment/segment_helper_test.cc @@ -691,4 +691,119 @@ TEST_F(SegmentHelperTest, CreateVectorIndexTask_SingleField) { << output_segment_meta->to_string_formatted() << std::endl; ASSERT_EQ(output_segment_meta->id(), 0); ASSERT_FALSE(output_segment_meta->writing_forward_block().has_value()); -} \ No newline at end of file +} + +TEST_F(SegmentHelperTest, CompactTask_VectorIndexThreeSegmentsRegression) { + auto schema = test::TestHelper::CreateSchemaWithVectorIndex(); + + Version version; + version.set_schema(*schema); + auto version_manager_tmp = VersionManager::Create(col_path, version); + if (!version_manager_tmp.has_value()) { + throw std::runtime_error("Failed to create version manager"); + } + + auto version_manager = version_manager_tmp.value(); + bool forward_use_parquet = false; + auto seg_options = + SegmentOptions{false, !forward_use_parquet, DEFAULT_MAX_BUFFER_SIZE}; + + auto seg1 = test::TestHelper::CreateSegmentWithDoc( + GetColPath(), *schema, 0, 0, id_map, delete_store, version_manager, + seg_options, 0, 300); + auto seg2 = test::TestHelper::CreateSegmentWithDoc( + GetColPath(), *schema, 1, 300, id_map, delete_store, version_manager, + seg_options, 300, 300); + auto seg3 = test::TestHelper::CreateSegmentWithDoc( + GetColPath(), *schema, 2, 600, id_map, delete_store, version_manager, + seg_options, 600, 300); + ASSERT_TRUE(seg1 != nullptr); + ASSERT_TRUE(seg2 != nullptr); + ASSERT_TRUE(seg3 != nullptr); + ASSERT_TRUE(seg1->flush().ok()); + ASSERT_TRUE(seg2->flush().ok()); + ASSERT_TRUE(seg3->flush().ok()); + + CompactTask task(GetColPath(), schema, {seg1, seg2, seg3}, 3, nullptr, + forward_use_parquet, 1); + auto segment_task = SegmentTask::CreateComapctTask(task); + ASSERT_TRUE(segment_task != nullptr); + + auto status = SegmentHelper::Execute(segment_task); + ASSERT_TRUE(status.ok()); + + auto compact_task = std::get(segment_task->task_info()); + ASSERT_TRUE(compact_task.output_segment_meta_ != nullptr); + + auto tmp_segment_path = FileHelper::MakeTempSegmentPath(GetColPath(), 3); + auto new_segment_path = FileHelper::MakeSegmentPath(GetColPath(), 3); + ASSERT_TRUE(FileHelper::MoveDirectory(tmp_segment_path, new_segment_path)); + + seg_options.read_only_ = true; + version_manager->set_enable_mmap(!forward_use_parquet); + auto output_segment_ret = + Segment::Open(GetColPath(), *schema, *compact_task.output_segment_meta_, + id_map, delete_store, version_manager, seg_options); + ASSERT_TRUE(output_segment_ret.has_value()); + auto output_segment = std::move(output_segment_ret.value()); + + ASSERT_EQ(output_segment->doc_count(), 900); + ASSERT_NE(output_segment->Fetch(0), nullptr); + ASSERT_NE(output_segment->Fetch(899), nullptr); +} + +TEST_F(SegmentHelperTest, CompactTask_FilterMultiSegmentsRegression) { + auto schema = test::TestHelper::CreateSchemaWithVectorIndex(); + + Version version; + version.set_schema(*schema); + auto version_manager_tmp = VersionManager::Create(col_path, version); + if (!version_manager_tmp.has_value()) { + throw std::runtime_error("Failed to create version manager"); + } + + auto version_manager = version_manager_tmp.value(); + bool forward_use_parquet = false; + auto seg_options = + SegmentOptions{false, !forward_use_parquet, DEFAULT_MAX_BUFFER_SIZE}; + + auto seg1 = test::TestHelper::CreateSegmentWithDoc( + GetColPath(), *schema, 0, 0, id_map, delete_store, version_manager, + seg_options, 0, 400); + auto seg2 = test::TestHelper::CreateSegmentWithDoc( + GetColPath(), *schema, 1, 400, id_map, delete_store, version_manager, + seg_options, 400, 400); + ASSERT_TRUE(seg1 != nullptr); + ASSERT_TRUE(seg2 != nullptr); + ASSERT_TRUE(seg1->flush().ok()); + ASSERT_TRUE(seg2->flush().ok()); + + auto filter = std::make_shared([](uint64_t id) -> bool { + return id < 100 || (id >= 400 && id < 450); + }); + + CompactTask task(GetColPath(), schema, {seg1, seg2}, 2, filter, + forward_use_parquet, 1); + auto segment_task = SegmentTask::CreateComapctTask(task); + ASSERT_TRUE(segment_task != nullptr); + + auto status = SegmentHelper::Execute(segment_task); + ASSERT_TRUE(status.ok()); + + auto compact_task = std::get(segment_task->task_info()); + ASSERT_TRUE(compact_task.output_segment_meta_ != nullptr); + + auto tmp_segment_path = FileHelper::MakeTempSegmentPath(GetColPath(), 2); + auto new_segment_path = FileHelper::MakeSegmentPath(GetColPath(), 2); + ASSERT_TRUE(FileHelper::MoveDirectory(tmp_segment_path, new_segment_path)); + + seg_options.read_only_ = true; + version_manager->set_enable_mmap(!forward_use_parquet); + auto output_segment_ret = + Segment::Open(GetColPath(), *schema, *compact_task.output_segment_meta_, + id_map, delete_store, version_manager, seg_options); + ASSERT_TRUE(output_segment_ret.has_value()); + auto output_segment = std::move(output_segment_ret.value()); + + ASSERT_EQ(output_segment->doc_count(), 650); +} From 810ae693a02357dd522c38d1bf5577c1e77175d1 Mon Sep 17 00:00:00 2001 From: kgeg401 Date: Sat, 14 Feb 2026 20:52:22 -0500 Subject: [PATCH 2/2] style: align merge_with_optional_reuse call formatting --- src/db/index/segment/segment_helper.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/db/index/segment/segment_helper.cc b/src/db/index/segment/segment_helper.cc index a31d7088..48aa79de 100644 --- a/src/db/index/segment/segment_helper.cc +++ b/src/db/index/segment/segment_helper.cc @@ -684,9 +684,8 @@ Status SegmentHelper::ReduceVectorIndex( auto vector_index_path = FileHelper::MakeVectorIndexPath( output_segment_path, field->name(), vector_block_id); - s = merge_with_optional_reuse( - vector_index_path, *field, - [&](const Segment::Ptr &input_segment) { + s = merge_with_optional_reuse(vector_index_path, *field, + [&](const Segment::Ptr &input_segment) { return input_segment->get_vector_indexer(field->name()); }); CHECK_RETURN_STATUS(s);