diff --git a/docs/filter.md b/docs/filter.md index 7340840409..fa1fa45bac 100644 --- a/docs/filter.md +++ b/docs/filter.md @@ -25,7 +25,9 @@ The system prioritizes **Pre-Filtering** followed by an adaptive search executio *Optimized for range queries, high compression, and sequential access.* ### 2.1. Storage Architecture (Hybrid Bucket) -The database (LMDB) acts as a coarse-grained B+ Tree. +The database (LMDB) acts as a coarse-grained B+ Tree. NumericIndex opens two MDBX named databases: "numeric_forward" and "numeric_inverted". + +In numeric_inverted * **Key:** `[FieldID] + [Base_Value_32bit]`. * Floats are mapped to lexicographically ordered integers to preserve sort order. * Keys are stored in Big-Endian to support native cursor iteration. @@ -35,6 +37,13 @@ The database (LMDB) acts as a coarse-grained B+ Tree. * **Values:** Compressed as `uint16_t` deltas relative to the Key's `Base_Value`. * **IDs:** Raw `idInt` array, index-aligned with values. +In numeric_forward +* **Key:** `[field string]:[4-byte big-endian integer from values]` + * Floats are mapped to lexicographically ordered integers to preserve sort order. + * Keys are stored in Big-Endian to support native cursor iteration +* **Value + + ### 2.2. Query Execution * **Buckets Fully Inside Selection (Middle):** Use **Summary Bitmap**. Zero array access. * **Buckets Partially Overlapping (Edges):** Scan `Values` array (SIMD), use indices to fetch specific `IDs`. diff --git a/src/core/ndd.hpp b/src/core/ndd.hpp index 55f6e5bc57..f31510622b 100644 --- a/src/core/ndd.hpp +++ b/src/core/ndd.hpp @@ -13,6 +13,7 @@ #include "msgpack_ndd.hpp" #include "quant_vector.hpp" #include "wal.hpp" +#include "../utils/search_timing.hpp" #include "../quant/dispatch.hpp" #include #include @@ -26,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -1472,16 +1474,6 @@ class IndexManager { } } - std::optional> searchKNN(const std::string& index_id, - const std::vector& query, - size_t k, - const nlohmann::json& filter_array, - ndd::FilterParams params = {}, - bool include_vectors = false, - size_t ef = 0) { - return searchKNN(index_id, query, {}, {}, k, filter_array, params, include_vectors, ef); - } - std::optional> searchKNN(const std::string& index_id, const std::vector& query, @@ -1495,6 +1487,7 @@ class IndexManager { float kDenseRrfWeight = settings::DEFAULT_DENSE_RRF_WEIGHT, float kRrfRankConstant = settings::DEFAULT_RRF_RANK_CONSTANT) { + ndd::ScopedSearchTiming search_total_timer(ndd::searchTimingStats().search_total); const float kSparseRrfWeight = 1.0f - kDenseRrfWeight; try { auto entry_ptr = getIndexEntry(index_id); @@ -1522,6 +1515,8 @@ class IndexManager { // 0. Compute Filter Bitmap (Shared) std::optional active_filter_bitmap; if (!filter_array.empty()) { + ndd::ScopedSearchTiming filter_bitmap_timer( + ndd::searchTimingStats().filter_bitmap_compute); active_filter_bitmap = entry.vector_storage->filter_store_->computeFilterBitmap(filter_array); } const ndd::RoaringBitmap* filter_ptr = @@ -1579,29 +1574,60 @@ class IndexManager { if (card == 0) { // No results match filter } else if (card < params.prefilter_threshold) { - // Strategy A: Brute Force on Small Subset + ndd::ScopedSearchTiming prefilter_total_timer( + ndd::searchTimingStats().prefilter_total); + ndd::recordPrefilterCardinality(card); + + // Strategy A: Brute Force on Small Subset std::vector valid_ids; - valid_ids.reserve(card); - bitmap.iterate([](ndd::idInt id, void* ptr){ - static_cast*>(ptr)->push_back(id); - return true; - }, &valid_ids); - - // Fetch vectors - auto vector_batch = entry.vector_storage->get_vectors_batch(valid_ids); - - // Prepare subset for bruteforce search - std::vector>> vector_subset; - vector_subset.reserve(vector_batch.size()); - for(auto& [nid, vbytes] : vector_batch) { - vector_subset.emplace_back(nid, std::move(vbytes)); + { + ndd::ScopedSearchTiming bitmap_to_ids_timer( + ndd::searchTimingStats().prefilter_bitmap_to_ids); + valid_ids.reserve(card); + bitmap.iterate( + [](ndd::idInt id, void* ptr) { + static_cast*>(ptr)->push_back(id); + return true; + }, + &valid_ids); } - dense_results = hnswlib::searchKnnSubset( - query_bytes.data(), vector_subset, k, space); + { + ndd::ScopedSearchTiming direct_score_timer( + ndd::searchTimingStats().prefilter_direct_mdbx_score); + auto distance_func = space->get_dist_func(); + void* dist_func_param = space->get_dist_func_param(); + std::priority_queue> top_results; + + if(k > 0) { + entry.vector_storage->visit_vectors_by_ids( + valid_ids, + [&](ndd::idInt numeric_id, const void* vector_data) { + float distance = distance_func(query_bytes.data(), + vector_data, + dist_func_param); + + if(top_results.size() < k) { + top_results.emplace(distance, numeric_id); + } else if(distance < top_results.top().first) { + top_results.pop(); + top_results.emplace(distance, numeric_id); + } + }); + } + + dense_results.reserve(top_results.size()); + while(!top_results.empty()) { + dense_results.push_back(top_results.top()); + top_results.pop(); + } + std::reverse(dense_results.begin(), dense_results.end()); + } } else { // Strategy B: Filtered HNSW Search + ndd::ScopedSearchTiming filtered_hnsw_timer( + ndd::searchTimingStats().filtered_hnsw_search); BitMapFilterFunctor functor(bitmap); size_t effective_ef = ef > 0 ? ef : settings::DEFAULT_EF_SEARCH; @@ -2280,4 +2306,4 @@ inline std::pair IndexManager::uploadBackup(const std::string backup_store_.writeBackupJson(username, backup_db); return {true, "Backup uploaded successfully"}; -} \ No newline at end of file +} diff --git a/src/filter/category_index.hpp b/src/filter/category_index.hpp index 58ffa62c69..f898fa67c8 100644 --- a/src/filter/category_index.hpp +++ b/src/filter/category_index.hpp @@ -175,29 +175,13 @@ namespace ndd { return bitmap.contains(id); } - void add_batch(const std::string& field, - const std::string& value, - const std::vector& ids) { - if(ids.empty()) { - return; - } - std::string filter_key = format_filter_key(field, value); - ndd::RoaringBitmap bitmap = get_bitmap_internal(filter_key); - for(const auto& id : ids) { - bitmap.add(id); - } - store_bitmap_internal(filter_key, bitmap); - } - // Helper for batch operations where key is already formatted void add_batch_by_key(const std::string& key, const std::vector& ids) { if(ids.empty()) { return; } ndd::RoaringBitmap bitmap = get_bitmap_internal(key); - for(const auto& id : ids) { - bitmap.add(id); - } + bitmap.addMany(ids.size(), ids.data()); store_bitmap_internal(key, bitmap); } diff --git a/src/filter/filter.hpp b/src/filter/filter.hpp index 392fe2b404..5d581e47b4 100644 --- a/src/filter/filter.hpp +++ b/src/filter/filter.hpp @@ -399,8 +399,11 @@ class Filter { return; } - // Create a map to collect IDs for each filter - std::unordered_map> filter_to_ids; + // Create a map to collect IDs for each label filter + std::unordered_map> label_filter_to_ids; + label_filter_to_ids.reserve(id_filter_pairs.size()); + std::vector numeric_filter_entries; + numeric_filter_entries.reserve(id_filter_pairs.size()); // Group IDs by filter for(const auto& [numeric_id, filter_json] : id_filter_pairs) { @@ -417,7 +420,8 @@ class Filter { } if(type == FieldType::Unknown) { - LOG_DEBUG("Unsupported filter type for field '" << field << "'"); + /*This should ideally be an error or atleast an info log.*/ + LOG_INFO("Unsupported filter type for field '" << field << "'"); continue; } @@ -428,20 +432,19 @@ class Filter { if(value.is_string()) { std::string filter_key = format_filter_key(field, value.get()); - filter_to_ids[filter_key].push_back(numeric_id); + label_filter_to_ids[filter_key].emplace_back(numeric_id); } else if(value.is_number()) { - // Use Numeric Index for numbers uint32_t sortable_val; if(value.is_number_integer()) { sortable_val = ndd::filter::int_to_sortable(value.get()); } else { sortable_val = ndd::filter::float_to_sortable(value.get()); } - numeric_index_->put(field, numeric_id, sortable_val); + numeric_filter_entries.emplace_back(field, numeric_id, sortable_val); } else if(value.is_boolean()) { std::string filter_key = format_filter_key(field, value.get() ? "1" : "0"); - filter_to_ids[filter_key].push_back(numeric_id); + label_filter_to_ids[filter_key].emplace_back(numeric_id); } else { LOG_WARN(1203, index_id_, @@ -455,8 +458,19 @@ class Filter { } } + /** + * XXX: For transactional correctness of filter adds, all the filters + * should be added in a single transaction. + * For now, they are being added in two different transactions. + * one for numeric_index and other for labels. + */ + + if(!numeric_filter_entries.empty()) { + numeric_index_->put_batch(numeric_filter_entries); + } + // Process each filter with its batch of IDs - for(const auto& [filter_key, ids] : filter_to_ids) { + for(const auto& [filter_key, ids] : label_filter_to_ids) { add_to_filter_batch(filter_key, ids); } } @@ -471,45 +485,7 @@ class Filter { } void add_filters_from_json(ndd::idInt numeric_id, const std::string& filter_json) { - try { - auto j = nlohmann::json::parse(filter_json); - for(const auto& [field, value] : j.items()) { - FieldType type = FieldType::Unknown; - if(value.is_boolean()) { - type = FieldType::Bool; - } else if(value.is_number()) { - type = FieldType::Number; - } else if(value.is_string()) { - type = FieldType::String; - } - - if(type == FieldType::Unknown) { - LOG_DEBUG("Unsupported filter type for field '" << field << "'"); - continue; - } - - if(!register_field_type(field, type)) { - LOG_ERROR(1205, index_id_, "Type mismatch for field '" << field << "'"); - continue; - } - - if(value.is_string()) { - add_to_filter(field, value.get(), numeric_id); - } else if(value.is_number()) { - uint32_t sortable_val; - if(value.is_number_integer()) { - sortable_val = ndd::filter::int_to_sortable(value.get()); - } else { - sortable_val = ndd::filter::float_to_sortable(value.get()); - } - numeric_index_->put(field, numeric_id, sortable_val); - } else if(value.is_boolean()) { - add_to_filter(field, value.get() ? "1" : "0", numeric_id); - } - } - } catch(const std::exception& e) { - LOG_ERROR(1206, index_id_, "Error adding filters: " << e.what()); - } + add_filters_from_json_batch({{numeric_id, filter_json}}); } void remove_filters_from_json(ndd::idInt numeric_id, const std::string& filter_json) { diff --git a/src/filter/numeric_index.hpp b/src/filter/numeric_index.hpp index c002652137..960545c0ec 100644 --- a/src/filter/numeric_index.hpp +++ b/src/filter/numeric_index.hpp @@ -14,6 +14,17 @@ namespace ndd { namespace filter { + struct NumericBatchEntry { + std::string field; + ndd::idInt id; + uint32_t value; + + NumericBatchEntry(std::string field_in, ndd::idInt id_in, uint32_t value_in) : + field(std::move(field_in)), + id(id_in), + value(value_in) {} + }; + // --- Sortable Key Utilities --- inline uint32_t float_to_sortable(float f) { uint32_t i; @@ -42,6 +53,14 @@ namespace ndd { // --- Bucket Structure (Hybrid) --- struct Bucket { + /** + * XXX: Ideally this bucket should be page size + * bounded. Currently it is difficult to do that + * here because the size of summary_bitmap depends + * on the kind of userspace filter upserts and not + * the number of them. + */ + static constexpr size_t MAX_SIZE = 1024; static constexpr uint32_t MAX_DELTA = 65535; @@ -250,11 +269,23 @@ namespace ndd { } } - void put(const std::string& field, ndd::idInt id, uint32_t value) { + /** + * TODO: + * 1. comprehensive error print and return. + * If there is an error here, there should be a way to reverse + * vector add operation. + */ + void put_batch(const std::vector& entries) { + if(entries.empty()) { + return; + } + MDBX_txn* txn; mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); try { - put_internal(txn, field, id, value); + for(const auto& entry : entries) { + put_internal(txn, entry.field, entry.id, entry.value); + } mdbx_txn_commit(txn); } catch(...) { mdbx_txn_abort(txn); @@ -327,6 +358,11 @@ namespace ndd { rc = mdbx_cursor_get(cursor, &key, &data, MDBX_PREV); } } else if (rc == MDBX_NOTFOUND) { + /** + * The only possible bucket that could still contain + * value is the very last bucket in the database. + * Hence jumping there. + */ rc = mdbx_cursor_get(cursor, &key, &data, MDBX_LAST); } diff --git a/src/main.cpp b/src/main.cpp index 4654a54c20..28918be54f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -261,6 +261,7 @@ int main(int argc, char** argv) { {{"status", "ok"}, {"timestamp", (std::int64_t)std::chrono::system_clock::now().time_since_epoch().count()}}); PRINT_LOG_TIME(); + ndd::printSearchTimingStats(); ndd::printSparseSearchDebugStats(); ndd::printSparseUpdateDebugStats(); print_mdbx_stats(); diff --git a/src/storage/vector_storage.hpp b/src/storage/vector_storage.hpp index 8ca7f56ab9..e5430be930 100644 --- a/src/storage/vector_storage.hpp +++ b/src/storage/vector_storage.hpp @@ -13,6 +13,7 @@ #include #include #include +#include // Handles vector storage class VectorStore { @@ -339,6 +340,40 @@ class VectorStore { } } + template + size_t visit_vectors_by_ids(const std::vector& numeric_ids, + Visitor&& visitor) const { + if(numeric_ids.empty()) { + return 0; + } + + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_RDONLY, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error(std::string("Failed to begin transaction: ") + mdbx_strerror(rc)); + } + + size_t visited = 0; + try { + for(const auto& numeric_id : numeric_ids) { + MDBX_val key{const_cast(&numeric_id), sizeof(ndd::idInt)}; + MDBX_val data; + + rc = mdbx_get(txn, dbi_, &key, &data); + if(rc == MDBX_SUCCESS && data.iov_len == bytes_per_vector_) { + visitor(numeric_id, static_cast(data.iov_base)); + visited++; + } + } + + mdbx_txn_abort(txn); + return visited; + } catch(...) { + mdbx_txn_abort(txn); + throw; + } + } + void remove(ndd::idInt numeric_id) { MDBX_txn* txn; int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); @@ -745,6 +780,15 @@ class VectorStorage { get_vectors_batch(const std::vector& numeric_ids) const { return vector_store_->get_vectors_batch(numeric_ids); } + + template + size_t visit_vectors_by_ids(const std::vector& numeric_ids, + Visitor&& visitor) const { + return vector_store_->visit_vectors_by_ids( + numeric_ids, + std::forward(visitor)); + } + ndd::VectorMeta get_meta(ndd::idInt numeric_id) const { return meta_store_->get_meta(numeric_id); } diff --git a/src/utils/search_timing.hpp b/src/utils/search_timing.hpp new file mode 100644 index 0000000000..cad40fe5f9 --- /dev/null +++ b/src/utils/search_timing.hpp @@ -0,0 +1,142 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace ndd { + inline constexpr bool SEARCH_TIMING_ENABLED = false; + + struct SearchTimingCounter { + std::atomic calls{0}; + std::atomic total_ns{0}; + }; + + struct SearchTimingStats { + SearchTimingCounter search_total; + SearchTimingCounter filter_bitmap_compute; + SearchTimingCounter filtered_hnsw_search; + SearchTimingCounter prefilter_total; + SearchTimingCounter prefilter_bitmap_to_ids; + SearchTimingCounter prefilter_direct_mdbx_score; + SearchTimingCounter prefilter_mdbx_get; + SearchTimingCounter prefilter_distance_compute; + std::atomic prefilter_cardinality_total{0}; + std::atomic prefilter_cardinality_max{0}; + }; + + inline SearchTimingStats& searchTimingStats() { + static SearchTimingStats stats; + return stats; + } + + inline timespec searchTimingNow() { + timespec ts{}; + clock_gettime(CLOCK_MONOTONIC, &ts); + return ts; + } + + inline uint64_t searchTimingElapsedNs(const timespec& start, const timespec& end) { + const uint64_t start_ns = + static_cast(start.tv_sec) * 1'000'000'000ULL + + static_cast(start.tv_nsec); + const uint64_t end_ns = + static_cast(end.tv_sec) * 1'000'000'000ULL + + static_cast(end.tv_nsec); + return end_ns >= start_ns ? end_ns - start_ns : 0; + } + + inline void addSearchTiming(SearchTimingCounter& counter, uint64_t elapsed_ns) { + if constexpr(SEARCH_TIMING_ENABLED) { + counter.calls.fetch_add(1, std::memory_order_relaxed); + counter.total_ns.fetch_add(elapsed_ns, std::memory_order_relaxed); + } + } + + class ScopedSearchTiming { + public: + explicit ScopedSearchTiming(SearchTimingCounter& counter) : + counter_(SEARCH_TIMING_ENABLED ? &counter : nullptr) { + if constexpr(SEARCH_TIMING_ENABLED) { + start_ = searchTimingNow(); + } + } + + ~ScopedSearchTiming() { + if constexpr(SEARCH_TIMING_ENABLED) { + addSearchTiming(*counter_, + searchTimingElapsedNs(start_, searchTimingNow())); + } + } + + private: + SearchTimingCounter* counter_{nullptr}; + timespec start_{}; + }; + + inline void recordPrefilterCardinality(size_t cardinality) { + if constexpr(!SEARCH_TIMING_ENABLED) { + return; + } + SearchTimingStats& stats = searchTimingStats(); + stats.prefilter_cardinality_total.fetch_add(static_cast(cardinality), + std::memory_order_relaxed); + + uint64_t current_max = + stats.prefilter_cardinality_max.load(std::memory_order_relaxed); + const uint64_t card = static_cast(cardinality); + while(card > current_max + && !stats.prefilter_cardinality_max.compare_exchange_weak( + current_max, card, std::memory_order_relaxed)) { + } + } + + inline void printSearchTimingStats() { + if constexpr(!SEARCH_TIMING_ENABLED) { + return; + } + SearchTimingStats& stats = searchTimingStats(); + + auto print_counter = [](const char* name, SearchTimingCounter& counter) -> uint64_t { + const uint64_t calls = counter.calls.exchange(0, std::memory_order_relaxed); + const uint64_t total_ns = counter.total_ns.exchange(0, std::memory_order_relaxed); + const double total_ms = static_cast(total_ns) / 1'000'000.0; + const double avg_ms = calls ? total_ms / static_cast(calls) : 0.0; + std::cerr << name << " count: " << calls << '\n'; + std::cerr << name << " total(ms): " + << std::fixed << std::setprecision(3) << total_ms << '\n'; + std::cerr << name << " avg(ms): " + << std::fixed << std::setprecision(3) << avg_ms << '\n'; + return calls; + }; + + std::cerr << "Search timing stats since last healthcheck\n"; + print_counter("search_total", stats.search_total); + print_counter("filter_bitmap_compute", stats.filter_bitmap_compute); + print_counter("filtered_hnsw_search", stats.filtered_hnsw_search); + const uint64_t prefilter_calls = print_counter("prefilter_total", stats.prefilter_total); + print_counter("prefilter_bitmap_to_ids", stats.prefilter_bitmap_to_ids); + print_counter("prefilter_direct_mdbx_score", stats.prefilter_direct_mdbx_score); + print_counter("prefilter_mdbx_get", stats.prefilter_mdbx_get); + print_counter("prefilter_distance_compute", stats.prefilter_distance_compute); + + const uint64_t cardinality_total = + stats.prefilter_cardinality_total.exchange(0, std::memory_order_relaxed); + const uint64_t cardinality_max = + stats.prefilter_cardinality_max.exchange(0, std::memory_order_relaxed); + std::cerr << "prefilter_cardinality total: " << cardinality_total << '\n'; + std::cerr << "prefilter_cardinality max: " << cardinality_max << '\n'; + std::cerr << "prefilter_cardinality avg: " + << std::fixed << std::setprecision(3) + << (prefilter_calls + ? static_cast(cardinality_total) + / static_cast(prefilter_calls) + : 0.0) + << '\n'; + std::cerr << "=================================\n"; + } + +} // namespace ndd diff --git a/src/utils/types.hpp b/src/utils/types.hpp new file mode 100644 index 0000000000..431407b7e9 --- /dev/null +++ b/src/utils/types.hpp @@ -0,0 +1,18 @@ +#pragma once + +#include +#include +#include + +namespace ndd { + +template +struct OperationResult { + unsigned int code = 0; + std::string message; + std::optional value; + + bool ok() const { return code == 0; } +}; + +} // namespace ndd