
[Enhancement] Enable OpenMP batch add in HNSW build with bounded buffer memory #4

Open

sevev wants to merge 4 commits into StarRocks:v0.5-dev from sevev:openmp-batch-add

Conversation


@sevev sevev commented Apr 15, 2026

Why

HNSW build through this library is single-threaded today. FaissHnswIndexBuilder extends FaissIndexBuilder, and AddWithRowIdsAndNullFlags on the base class calls FaissIndexAddSingle per row. Faiss's hnsw_add_vertices only enters its OpenMP parallel region when the per-add() batch size exceeds 100, so every per-row call stays serial. Result: build wall-clock time is dominated by single-core graph construction even on 32-core machines.

How

  1. Switch FaissHnswIndexBuilder's parent to FaissIndexBuilderWithBuffer so HNSW input is buffered and dispatched in one batched add(N, ...) call from Flush(), engaging OpenMP.
  2. Add IndexBuilder::SetFlushThresholdRows(rows). Once the buffered row count crosses the threshold, MaybeFlushBuffer does an intermediate add() and clears the buffer, capping peak buffer memory at rows × dim × 4 bytes. Default 256K rows (~128 MiB at dim=128).

Test

dim=128, 50M rows: ingest is ~5x faster.

sevev added 4 commits April 7, 2026 13:56
Signed-off-by: sevev <qiangzh95@gmail.com>
Signed-off-by: sevev <qiangzh95@gmail.com>
…er memory

Signed-off-by: sevev <qiangzh95@gmail.com>

Copilot AI left a comment


Pull request overview

Enables higher-throughput Faiss HNSW ingestion by batching buffered add(N, ...) calls (to trigger Faiss’s OpenMP path) and introducing a configurable intermediate flush threshold to bound buffer memory during long ingest streams.

Changes:

  • Add IndexBuilder::SetFlushThresholdRows() API (default no-op) and implement it in FaissIndexBuilderWithBuffer.
  • Introduce MaybeFlushBuffer() to partially drain buffers once trained and the row threshold is reached; update Flush() to train (if needed), batch-add, and clear buffers.
  • Switch FaissHnswIndexBuilder to inherit from FaissIndexBuilderWithBuffer; add unit tests covering flush threshold behavior and idempotent flush (copy path).

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.

Summary per file:

  • test/builder/test_faiss_ivf_pq_index_builder.cc — Adds unit tests for flush-threshold behavior and Flush() idempotence.
  • tenann/builder/index_builder.h — Introduces the virtual SetFlushThresholdRows() API (default no-op).
  • tenann/builder/faiss_index_builder_with_buffer.h — Declares flush-threshold configuration + MaybeFlushBuffer() and the default threshold.
  • tenann/builder/faiss_index_builder_with_buffer.cc — Implements the threshold setter, partial flush, and revised Flush() behavior (train then batch-add, clear buffers).
  • tenann/builder/faiss_hnsw_index_builder.h — Changes the HNSW builder base class to the buffered builder to enable batched adds.


Comment on lines 172 to 176
}
}
}
MaybeFlushBuffer();
}

Copilot AI Apr 16, 2026


MaybeFlushBuffer() is only invoked after the full batch has been merged into data_buffer_/id_buffer_. This means SetFlushThresholdRows() does not actually cap peak buffer memory if a single Add() call provides more than the threshold’s worth of rows (the buffer can temporarily grow to existing_buffer + incoming_batch). Either clarify this in the API/docs, or flush incrementally while merging large batches.

EXPECT_EQ(FaissNtotal(builder->index_ref()), ntotal_after_first);
builder->Close();
}


Copilot AI Apr 16, 2026


This test asserts idempotence for the buffered (copy) path, but it doesn’t cover the zero-copy mode (inputs_live_longer_than_this = true). Given FaissIndexBuilderWithBuffer::Flush() currently uses input_row_iterator_ to add rows, it’s easy to regress into double-adds on a second Flush() in that mode. Consider adding a variant of this test that calls Add(..., /*inputs_live_longer_than_this=*/true), Flush(), and then Flush() again and verifies ntotal is unchanged.

Suggested change
// Cover the zero-copy Add path as well: a second Flush() must remain a
// no-op and not re-add rows sourced from input_row_iterator_.
TEST_F(FaissIvfPqIndexBuilderTest, FlushIsIdempotentForLiveInputs) {
  auto builder = std::make_unique<FaissIvfPqIndexBuilder>(faiss_ivf_pq_meta());
  builder->EnableCustomRowId()
      .Open()
      .Add({base_view()}, ids().data(), null_flags().data(),
           /*inputs_live_longer_than_this=*/true)
      .Flush();
  int64_t ntotal_after_first = FaissNtotal(builder->index_ref());
  EXPECT_NO_THROW(builder->Flush());
  EXPECT_EQ(FaissNtotal(builder->index_ref()), ntotal_after_first);
  builder->Close();
}

Comment on lines +93 to +96
bool with_row_ids = row_id_ != nullptr || id_buffer_.size() > 0;
if (id_buffer_.size()) {
row_id_ = id_buffer_.data();
}

Copilot AI Apr 16, 2026


Flush() assigns row_id_ = id_buffer_.data() and later clears/shrinks id_buffer_, leaving row_id_ potentially dangling. On a subsequent Flush() (or any path that checks row_id_ != nullptr) this can incorrectly take the add_with_ids path with invalid memory. Prefer using a local const idx_t* ids = id_buffer_.empty() ? row_id_ : id_buffer_.data(); for the add call, and avoid storing id_buffer_.data() into the member (or explicitly reset row_id_ after the add/clear).

Comment on lines +129 to +134
// Release the buffered rows once they have been added. This also makes
// Flush idempotent: a second call is a no-op rather than a double add.
data_buffer_.clear();
data_buffer_.shrink_to_fit();
id_buffer_.clear();
id_buffer_.shrink_to_fit();

Copilot AI Apr 16, 2026


Flush() clears data_buffer_/id_buffer_ but leaves input_row_iterator_ (and its associated row_id_) intact. In the zero-copy mode (inputs_live_longer_than_this_ = true), a second Flush() will re-add the same rows again, so the “Flush is idempotent” contract doesn’t actually hold. Consider resetting input_row_iterator_ and row_id_ after the phase-2 add completes.

Suggested change
// Release the buffered rows once they have been added. This also makes
// Flush idempotent: a second call is a no-op rather than a double add.
data_buffer_.clear();
data_buffer_.shrink_to_fit();
id_buffer_.clear();
id_buffer_.shrink_to_fit();
// Release the consumed rows once they have been added. This also makes
// Flush idempotent: a second call is a no-op rather than a double add.
data_buffer_.clear();
data_buffer_.shrink_to_fit();
id_buffer_.clear();
id_buffer_.shrink_to_fit();
input_row_iterator_ = nullptr;
row_id_ = nullptr;

Comment on lines +131 to +134
data_buffer_.clear();
data_buffer_.shrink_to_fit();
id_buffer_.clear();
id_buffer_.shrink_to_fit();

Copilot AI Apr 16, 2026


Calling shrink_to_fit() on every Flush() can cause repeated deallocations/reallocations for builders that flush incrementally (common in streaming ingestion), which can noticeably hurt performance. Consider leaving capacity intact (just clear()), or only shrinking when the buffers grew beyond some high-water mark / when closing the builder.

Suggested change
data_buffer_.clear();
data_buffer_.shrink_to_fit();
id_buffer_.clear();
id_buffer_.shrink_to_fit();
// Keep capacity so incremental flushes can reuse the buffers without
// repeated deallocation/reallocation churn.
data_buffer_.clear();
id_buffer_.clear();

Comment on lines +51 to +53
// Drain data_buffer_ / id_buffer_ into the underlying index if the buffer
// has grown past flush_threshold_bytes_ and the index is already trained.
// No-op for untrained indexes — training in Flush() needs the whole sample.

Copilot AI Apr 16, 2026


Comment mentions flush_threshold_bytes_, but the code/configuration here is in rows (flush_threshold_rows_, kDefaultFlushThresholdRows). Please update the comment to avoid confusion about what the threshold represents.

Suggested change
// Drain data_buffer_ / id_buffer_ into the underlying index if the buffer
// has grown past flush_threshold_bytes_ and the index is already trained.
// No-op for untrained indexes — training in Flush() needs the whole sample.
// Drain data_buffer_ / id_buffer_ into the underlying index if the buffered
// row count has grown past flush_threshold_rows_ and the index is already
// trained. No-op for untrained indexes — training in Flush() needs the
// whole sample.
