[Enhancement] Enable OpenMP batch add in HNSW build with bounded buffer memory #4
sevev wants to merge 4 commits into StarRocks:v0.5-dev
Conversation
Pull request overview
Enables higher-throughput Faiss HNSW ingestion by batching buffered add(N, ...) calls (to trigger Faiss’s OpenMP path) and introducing a configurable intermediate flush threshold to bound buffer memory during long ingest streams.
Changes:
- Add the `IndexBuilder::SetFlushThresholdRows()` API (default no-op) and implement it in `FaissIndexBuilderWithBuffer`.
- Introduce `MaybeFlushBuffer()` to partially drain the buffers once the index is trained and the row threshold is reached; update `Flush()` to train (if needed), batch-add, and clear the buffers.
- Switch `FaissHnswIndexBuilder` to inherit from `FaissIndexBuilderWithBuffer`; add unit tests covering flush-threshold behavior and idempotent flush (copy path).
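For intuition, the buffering-plus-threshold scheme can be sketched independently of Faiss. Everything below (class and method names included) is illustrative, not the PR's actual code; the real builder hands the drained buffer to Faiss's `add(N, ...)`, which is modeled here by counting batches:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Illustrative sketch of the bounded-buffer batching idea (not the PR's code).
// Rows accumulate in a flat buffer; once flush_threshold_rows is reached the
// whole buffer is drained in one batch, which is what lets Faiss enter its
// OpenMP parallel region in the real implementation.
class BufferedAdder {
 public:
  BufferedAdder(size_t dim, size_t flush_threshold_rows)
      : dim_(dim), threshold_(flush_threshold_rows) {}

  void Add(const float* row) {
    buffer_.insert(buffer_.end(), row, row + dim_);
    MaybeFlush();
  }

  void Flush() {
    if (!buffer_.empty()) {
      batches_added_ += 1;
      rows_added_ += buffer_.size() / dim_;  // stand-in for add(N, ...)
      buffer_.clear();  // keep capacity; cf. the shrink_to_fit review note
    }
  }

  size_t rows_added() const { return rows_added_; }
  size_t batches_added() const { return batches_added_; }
  size_t buffered_rows() const { return buffer_.size() / dim_; }

 private:
  void MaybeFlush() {
    // Intermediate drain once the threshold is crossed, bounding memory.
    if (buffered_rows() >= threshold_) Flush();
  }

  size_t dim_, threshold_;
  size_t rows_added_ = 0, batches_added_ = 0;
  std::vector<float> buffer_;
};
```

With a threshold of 4 rows and dim=2, ten single-row adds produce two intermediate 4-row batches and leave 2 rows buffered for the final `Flush()`.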
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| test/builder/test_faiss_ivf_pq_index_builder.cc | Adds unit tests for flush-threshold behavior and Flush() idempotence. |
| tenann/builder/index_builder.h | Introduces the virtual SetFlushThresholdRows() API (default no-op). |
| tenann/builder/faiss_index_builder_with_buffer.h | Declares the flush-threshold configuration, MaybeFlushBuffer(), and the default threshold. |
| tenann/builder/faiss_index_builder_with_buffer.cc | Implements the threshold setter, partial flush, and revised Flush() behavior (train, then batch-add, then clear buffers). |
| tenann/builder/faiss_hnsw_index_builder.h | Changes the HNSW builder's base class to the buffered builder to enable batched adds. |
```cpp
      }
    }
  }
  MaybeFlushBuffer();
}
```
MaybeFlushBuffer() is only invoked after the full batch has been merged into data_buffer_/id_buffer_. This means SetFlushThresholdRows() does not actually cap peak buffer memory if a single Add() call provides more than the threshold’s worth of rows (the buffer can temporarily grow to existing_buffer + incoming_batch). Either clarify this in the API/docs, or flush incrementally while merging large batches.
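The incremental alternative the comment suggests could look roughly like this. The function and parameter names are hypothetical, and the flush is modeled as recording a batch size rather than calling the real `add(N, ...)`; the point is that the buffer never holds more than `threshold_rows` rows even when one incoming batch is larger than the threshold:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical sketch of "flush incrementally while merging large batches":
// copy at most `threshold_rows` worth of rows into the buffer at a time and
// drain whenever the threshold is reached, so peak buffer memory stays capped
// regardless of the incoming batch size. Not the PR's actual code.
size_t MergeWithBoundedBuffer(std::vector<float>& buffer, size_t dim,
                              size_t threshold_rows, const float* batch,
                              size_t batch_rows,
                              std::vector<size_t>& flush_sizes) {
  size_t consumed = 0;
  while (consumed < batch_rows) {
    size_t room = threshold_rows - buffer.size() / dim;
    size_t take = std::min(room, batch_rows - consumed);
    const float* src = batch + consumed * dim;
    buffer.insert(buffer.end(), src, src + take * dim);
    consumed += take;
    if (buffer.size() / dim >= threshold_rows) {
      flush_sizes.push_back(buffer.size() / dim);  // stand-in for add(N, ...)
      buffer.clear();
    }
  }
  return consumed;
}
```

With a threshold of 4 rows, merging a 10-row batch into an empty buffer yields two 4-row intermediate flushes and leaves 2 rows buffered, never exceeding the 4-row cap.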
```cpp
  EXPECT_EQ(FaissNtotal(builder->index_ref()), ntotal_after_first);
  builder->Close();
}
```
This test asserts idempotence for the buffered (copy) path, but it doesn’t cover the zero-copy mode (inputs_live_longer_than_this = true). Given FaissIndexBuilderWithBuffer::Flush() currently uses input_row_iterator_ to add rows, it’s easy to regress into double-adds on a second Flush() in that mode. Consider adding a variant of this test that calls Add(..., /*inputs_live_longer_than_this=*/true), Flush(), and then Flush() again and verifies ntotal is unchanged.
```cpp
// Cover the zero-copy Add path as well: a second Flush() must remain a
// no-op and not re-add rows sourced from input_row_iterator_.
TEST_F(FaissIvfPqIndexBuilderTest, FlushIsIdempotentForLiveInputs) {
  auto builder = std::make_unique<FaissIvfPqIndexBuilder>(faiss_ivf_pq_meta());
  builder->EnableCustomRowId()
      .Open()
      .Add({base_view()}, ids().data(), null_flags().data(),
           /*inputs_live_longer_than_this=*/true)
      .Flush();
  int64_t ntotal_after_first = FaissNtotal(builder->index_ref());
  EXPECT_NO_THROW(builder->Flush());
  EXPECT_EQ(FaissNtotal(builder->index_ref()), ntotal_after_first);
  builder->Close();
}
```
```cpp
bool with_row_ids = row_id_ != nullptr || id_buffer_.size() > 0;
if (id_buffer_.size()) {
  row_id_ = id_buffer_.data();
}
```
Flush() assigns row_id_ = id_buffer_.data() and later clears/shrinks id_buffer_, leaving row_id_ potentially dangling. On a subsequent Flush() (or any path that checks row_id_ != nullptr) this can incorrectly take the add_with_ids path with invalid memory. Prefer using a local const idx_t* ids = id_buffer_.empty() ? row_id_ : id_buffer_.data(); for the add call, and avoid storing id_buffer_.data() into the member (or explicitly reset row_id_ after the add/clear).
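The suggested pattern, reduced to its essentials: resolve the id pointer locally for the add call instead of overwriting the member, so clearing `id_buffer_` afterwards cannot leave anything dangling. The helper name `ResolveIds` is hypothetical; `row_id_` and `id_buffer_` follow the PR's member names:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

using idx_t = int64_t;

// Hypothetical helper illustrating the reviewer's suggestion: pick the id
// source for a single add call without storing id_buffer_.data() into a
// long-lived member. The caller passes the member row_id_ and the buffer;
// the result is only valid for the duration of the add call.
const idx_t* ResolveIds(const idx_t* row_id,
                        const std::vector<idx_t>& id_buffer) {
  return id_buffer.empty() ? row_id : id_buffer.data();
}
```

Because the pointer is a local, the subsequent `id_buffer_.clear()` leaves no member pointing into freed storage, and a later `row_id_ != nullptr` check cannot take the `add_with_ids` path with invalid memory.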
```cpp
// Release the buffered rows once they have been added. This also makes
// Flush idempotent: a second call is a no-op rather than a double add.
data_buffer_.clear();
data_buffer_.shrink_to_fit();
id_buffer_.clear();
id_buffer_.shrink_to_fit();
```
Flush() clears data_buffer_/id_buffer_ but leaves input_row_iterator_ (and its associated row_id_) intact. In the zero-copy mode (inputs_live_longer_than_this_ = true), a second Flush() will re-add the same rows again, so the “Flush is idempotent” contract doesn’t actually hold. Consider resetting input_row_iterator_ and row_id_ after the phase-2 add completes.
```diff
-// Release the buffered rows once they have been added. This also makes
-// Flush idempotent: a second call is a no-op rather than a double add.
-data_buffer_.clear();
-data_buffer_.shrink_to_fit();
-id_buffer_.clear();
-id_buffer_.shrink_to_fit();
+// Release the consumed rows once they have been added. This also makes
+// Flush idempotent: a second call is a no-op rather than a double add.
+data_buffer_.clear();
+data_buffer_.shrink_to_fit();
+id_buffer_.clear();
+id_buffer_.shrink_to_fit();
+input_row_iterator_ = nullptr;
+row_id_ = nullptr;
```
```cpp
data_buffer_.clear();
data_buffer_.shrink_to_fit();
id_buffer_.clear();
id_buffer_.shrink_to_fit();
```
Calling shrink_to_fit() on every Flush() can cause repeated deallocations/reallocations for builders that flush incrementally (common in streaming ingestion), which can noticeably hurt performance. Consider leaving capacity intact (just clear()), or only shrinking when the buffers grew beyond some high-water mark / when closing the builder.
```diff
-data_buffer_.clear();
-data_buffer_.shrink_to_fit();
-id_buffer_.clear();
-id_buffer_.shrink_to_fit();
+// Keep capacity so incremental flushes can reuse the buffers without
+// repeated deallocation/reallocation churn.
+data_buffer_.clear();
+id_buffer_.clear();
```
```cpp
// Drain data_buffer_ / id_buffer_ into the underlying index if the buffer
// has grown past flush_threshold_bytes_ and the index is already trained.
// No-op for untrained indexes — training in Flush() needs the whole sample.
```
Comment mentions flush_threshold_bytes_, but the code/configuration here is in rows (flush_threshold_rows_, kDefaultFlushThresholdRows). Please update the comment to avoid confusion about what the threshold represents.
```diff
-// Drain data_buffer_ / id_buffer_ into the underlying index if the buffer
-// has grown past flush_threshold_bytes_ and the index is already trained.
-// No-op for untrained indexes — training in Flush() needs the whole sample.
+// Drain data_buffer_ / id_buffer_ into the underlying index if the buffered
+// row count has grown past flush_threshold_rows_ and the index is already
+// trained. No-op for untrained indexes — training in Flush() needs the
+// whole sample.
```
Why

HNSW build through this library is single-threaded today. `FaissHnswIndexBuilder` extends `FaissIndexBuilder`, and `AddWithRowIdsAndNullFlags` on the base class calls `FaissIndexAddSingle` per row. Faiss's `hnsw_add_vertices` only enters its OpenMP parallel region when the per-`add()` batch size exceeds 100, so every per-row call is serial. The result: build wall-clock time is dominated by single-core graph construction even on 32-core machines.

How

- Switch `FaissHnswIndexBuilder`'s parent to `FaissIndexBuilderWithBuffer` so HNSW input is buffered and dispatched in one batched `add(N, ...)` call from `Flush()`, engaging OpenMP.
- Add `IndexBuilder::SetFlushThresholdRows(rows)`. Once the buffered row count crosses the threshold, `MaybeFlushBuffer` does an intermediate `add()` and clears the buffer, capping peak buffer memory at rows × dim × 4 bytes. The default is 256K rows (~128 MiB at dim=128).

Test

dim=128, 50M rows: ingest is ~5x faster.
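The peak-memory bound quoted in the How section follows directly from rows × dim × sizeof(float); a tiny sketch checks the arithmetic (the helper name is illustrative, not part of the library):

```cpp
#include <cassert>
#include <cstddef>

// Peak data-buffer memory for a given flush threshold, per the formula in the
// PR description: rows * dim * sizeof(float). At the default 256K rows and
// dim=128 this works out to 256 * 1024 * 128 * 4 bytes = 128 MiB.
constexpr size_t PeakBufferBytes(size_t rows, size_t dim) {
  return rows * dim * sizeof(float);
}
```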