Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion docs/filter.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ The system prioritizes **Pre-Filtering** followed by an adaptive search executio
*Optimized for range queries, high compression, and sequential access.*

### 2.1. Storage Architecture (Hybrid Bucket)
The database (LMDB) acts as a coarse-grained B+ Tree.
The database (MDBX) acts as a coarse-grained B+ Tree. NumericIndex opens two MDBX named databases: "numeric_forward" and "numeric_inverted".

In the `numeric_inverted` database:
* **Key:** `[FieldID] + [Base_Value_32bit]`.
* Floats are mapped to lexicographically ordered integers to preserve sort order.
* Keys are stored in Big-Endian to support native cursor iteration.
Expand All @@ -35,6 +37,13 @@ The database (LMDB) acts as a coarse-grained B+ Tree.
* **Values:** Compressed as `uint16_t` deltas relative to the Key's `Base_Value`.
* **IDs:** Raw `idInt` array, index-aligned with values.

In the `numeric_forward` database:
* **Key:** `[field string]:[4-byte big-endian integer from values]`
* Floats are mapped to lexicographically ordered integers to preserve sort order.
* Keys are stored in Big-Endian to support native cursor iteration.
* **Value:** *(TODO: document the stored value format for the forward index)*


### 2.2. Query Execution
* **Buckets Fully Inside Selection (Middle):** Use **Summary Bitmap**. Zero array access.
* **Buckets Partially Overlapping (Edges):** Scan `Values` array (SIMD), use indices to fetch specific `IDs`.
Expand Down
82 changes: 54 additions & 28 deletions src/core/ndd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "msgpack_ndd.hpp"
#include "quant_vector.hpp"
#include "wal.hpp"
#include "../utils/search_timing.hpp"
#include "../quant/dispatch.hpp"
#include <memory>
#include <deque>
Expand All @@ -26,6 +27,7 @@
#include <thread>
#include <atomic>
#include <optional>
#include <queue>
#include <random>
#include <type_traits>
#include <future>
Expand Down Expand Up @@ -1472,16 +1474,6 @@ class IndexManager {
}
}

// Convenience overload: dense-only KNN search with no sparse query terms.
// Forwards to the full searchKNN, passing empty sparse indices/values.
// @param index_id        identifier of the target index
// @param query           dense query vector
// @param k               number of nearest neighbours to return
// @param filter_array    JSON filter clauses (empty array disables filtering)
// @param params          filter execution tuning parameters
// @param include_vectors presumably attaches stored vectors to results — confirm in full overload
// @param ef              HNSW ef override; 0 falls back to settings::DEFAULT_EF_SEARCH
std::optional<std::vector<ndd::VectorResult>> searchKNN(const std::string& index_id,
const std::vector<float>& query,
size_t k,
const nlohmann::json& filter_array,
ndd::FilterParams params = {},
bool include_vectors = false,
size_t ef = 0) {
return searchKNN(index_id, query, {}, {}, k, filter_array, params, include_vectors, ef);
}

std::optional<std::vector<ndd::VectorResult>>
searchKNN(const std::string& index_id,
const std::vector<float>& query,
Expand All @@ -1495,6 +1487,7 @@ class IndexManager {
float kDenseRrfWeight = settings::DEFAULT_DENSE_RRF_WEIGHT,
float kRrfRankConstant = settings::DEFAULT_RRF_RANK_CONSTANT)
{
ndd::ScopedSearchTiming search_total_timer(ndd::searchTimingStats().search_total);
const float kSparseRrfWeight = 1.0f - kDenseRrfWeight;
try {
auto entry_ptr = getIndexEntry(index_id);
Expand Down Expand Up @@ -1522,6 +1515,8 @@ class IndexManager {
// 0. Compute Filter Bitmap (Shared)
std::optional<ndd::RoaringBitmap> active_filter_bitmap;
if (!filter_array.empty()) {
ndd::ScopedSearchTiming filter_bitmap_timer(
ndd::searchTimingStats().filter_bitmap_compute);
active_filter_bitmap = entry.vector_storage->filter_store_->computeFilterBitmap(filter_array);
}
const ndd::RoaringBitmap* filter_ptr =
Expand Down Expand Up @@ -1579,29 +1574,60 @@ class IndexManager {
if (card == 0) {
// No results match filter
} else if (card < params.prefilter_threshold) {
// Strategy A: Brute Force on Small Subset
ndd::ScopedSearchTiming prefilter_total_timer(
ndd::searchTimingStats().prefilter_total);
ndd::recordPrefilterCardinality(card);

// Strategy A: Brute Force on Small Subset
std::vector<ndd::idInt> valid_ids;
valid_ids.reserve(card);
bitmap.iterate([](ndd::idInt id, void* ptr){
static_cast<std::vector<ndd::idInt>*>(ptr)->push_back(id);
return true;
}, &valid_ids);

// Fetch vectors
auto vector_batch = entry.vector_storage->get_vectors_batch(valid_ids);

// Prepare subset for bruteforce search
std::vector<std::pair<idInt, std::vector<uint8_t>>> vector_subset;
vector_subset.reserve(vector_batch.size());
for(auto& [nid, vbytes] : vector_batch) {
vector_subset.emplace_back(nid, std::move(vbytes));
{
ndd::ScopedSearchTiming bitmap_to_ids_timer(
ndd::searchTimingStats().prefilter_bitmap_to_ids);
valid_ids.reserve(card);
bitmap.iterate(
[](ndd::idInt id, void* ptr) {
static_cast<std::vector<ndd::idInt>*>(ptr)->push_back(id);
return true;
},
&valid_ids);
}

dense_results = hnswlib::searchKnnSubset<float>(
query_bytes.data(), vector_subset, k, space);
{
ndd::ScopedSearchTiming direct_score_timer(
ndd::searchTimingStats().prefilter_direct_mdbx_score);
auto distance_func = space->get_dist_func();
void* dist_func_param = space->get_dist_func_param();
std::priority_queue<std::pair<float, ndd::idInt>> top_results;

if(k > 0) {
entry.vector_storage->visit_vectors_by_ids(
valid_ids,
[&](ndd::idInt numeric_id, const void* vector_data) {
float distance = distance_func(query_bytes.data(),
vector_data,
dist_func_param);

if(top_results.size() < k) {
top_results.emplace(distance, numeric_id);
} else if(distance < top_results.top().first) {
top_results.pop();
top_results.emplace(distance, numeric_id);
}
});
}

dense_results.reserve(top_results.size());
while(!top_results.empty()) {
dense_results.push_back(top_results.top());
top_results.pop();
}
std::reverse(dense_results.begin(), dense_results.end());
}

} else {
// Strategy B: Filtered HNSW Search
ndd::ScopedSearchTiming filtered_hnsw_timer(
ndd::searchTimingStats().filtered_hnsw_search);
BitMapFilterFunctor functor(bitmap);
size_t effective_ef = ef > 0 ? ef : settings::DEFAULT_EF_SEARCH;

Expand Down Expand Up @@ -2280,4 +2306,4 @@ inline std::pair<bool, std::string> IndexManager::uploadBackup(const std::string
backup_store_.writeBackupJson(username, backup_db);

return {true, "Backup uploaded successfully"};
}
}
18 changes: 1 addition & 17 deletions src/filter/category_index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,29 +175,13 @@ namespace ndd {
return bitmap.contains(id);
}

// Associates every id in `ids` with the (field, value) label filter:
// loads the filter's bitmap, adds all ids, and stores it back.
// No-op for an empty id list, avoiding a pointless read-modify-write cycle.
void add_batch(const std::string& field,
const std::string& value,
const std::vector<ndd::idInt>& ids) {
if(ids.empty()) {
return;
}
// One bitmap per formatted "field:value" key.
std::string filter_key = format_filter_key(field, value);
ndd::RoaringBitmap bitmap = get_bitmap_internal(filter_key);
for(const auto& id : ids) {
bitmap.add(id);
}
store_bitmap_internal(filter_key, bitmap);
}

// Helper for batch operations where key is already formatted
void add_batch_by_key(const std::string& key, const std::vector<ndd::idInt>& ids) {
if(ids.empty()) {
return;
}
ndd::RoaringBitmap bitmap = get_bitmap_internal(key);
for(const auto& id : ids) {
bitmap.add(id);
}
bitmap.addMany(ids.size(), ids.data());
store_bitmap_internal(key, bitmap);
}

Expand Down
70 changes: 23 additions & 47 deletions src/filter/filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,11 @@ class Filter {
return;
}

// Create a map to collect IDs for each filter
std::unordered_map<std::string, std::vector<ndd::idInt>> filter_to_ids;
// Create a map to collect IDs for each label filter
std::unordered_map<std::string, std::vector<ndd::idInt>> label_filter_to_ids;
label_filter_to_ids.reserve(id_filter_pairs.size());
std::vector<ndd::filter::NumericBatchEntry> numeric_filter_entries;
numeric_filter_entries.reserve(id_filter_pairs.size());

// Group IDs by filter
for(const auto& [numeric_id, filter_json] : id_filter_pairs) {
Expand All @@ -417,7 +420,8 @@ class Filter {
}

if(type == FieldType::Unknown) {
LOG_DEBUG("Unsupported filter type for field '" << field << "'");
/* This should ideally be an error or at least an info log. */
LOG_INFO("Unsupported filter type for field '" << field << "'");
continue;
}

Expand All @@ -428,20 +432,19 @@ class Filter {

if(value.is_string()) {
std::string filter_key = format_filter_key(field, value.get<std::string>());
filter_to_ids[filter_key].push_back(numeric_id);
label_filter_to_ids[filter_key].emplace_back(numeric_id);
} else if(value.is_number()) {
// Use Numeric Index for numbers
uint32_t sortable_val;
if(value.is_number_integer()) {
sortable_val = ndd::filter::int_to_sortable(value.get<int>());
} else {
sortable_val = ndd::filter::float_to_sortable(value.get<float>());
}
numeric_index_->put(field, numeric_id, sortable_val);
numeric_filter_entries.emplace_back(field, numeric_id, sortable_val);
} else if(value.is_boolean()) {
std::string filter_key =
format_filter_key(field, value.get<bool>() ? "1" : "0");
filter_to_ids[filter_key].push_back(numeric_id);
label_filter_to_ids[filter_key].emplace_back(numeric_id);
} else {
LOG_WARN(1203,
index_id_,
Expand All @@ -455,8 +458,19 @@ class Filter {
}
}

/**
* XXX: For transactional correctness of filter adds, all the filters
* should be added in a single transaction.
* For now, they are added in two separate transactions:
* one for the numeric index and the other for labels.
*/

if(!numeric_filter_entries.empty()) {
numeric_index_->put_batch(numeric_filter_entries);
}

// Process each filter with its batch of IDs
for(const auto& [filter_key, ids] : filter_to_ids) {
for(const auto& [filter_key, ids] : label_filter_to_ids) {
add_to_filter_batch(filter_key, ids);
}
}
Expand All @@ -471,45 +485,7 @@ class Filter {
}

void add_filters_from_json(ndd::idInt numeric_id, const std::string& filter_json) {
try {
auto j = nlohmann::json::parse(filter_json);
for(const auto& [field, value] : j.items()) {
FieldType type = FieldType::Unknown;
if(value.is_boolean()) {
type = FieldType::Bool;
} else if(value.is_number()) {
type = FieldType::Number;
} else if(value.is_string()) {
type = FieldType::String;
}

if(type == FieldType::Unknown) {
LOG_DEBUG("Unsupported filter type for field '" << field << "'");
continue;
}

if(!register_field_type(field, type)) {
LOG_ERROR(1205, index_id_, "Type mismatch for field '" << field << "'");
continue;
}

if(value.is_string()) {
add_to_filter(field, value.get<std::string>(), numeric_id);
} else if(value.is_number()) {
uint32_t sortable_val;
if(value.is_number_integer()) {
sortable_val = ndd::filter::int_to_sortable(value.get<int>());
} else {
sortable_val = ndd::filter::float_to_sortable(value.get<float>());
}
numeric_index_->put(field, numeric_id, sortable_val);
} else if(value.is_boolean()) {
add_to_filter(field, value.get<bool>() ? "1" : "0", numeric_id);
}
}
} catch(const std::exception& e) {
LOG_ERROR(1206, index_id_, "Error adding filters: " << e.what());
}
add_filters_from_json_batch({{numeric_id, filter_json}});
}

void remove_filters_from_json(ndd::idInt numeric_id, const std::string& filter_json) {
Expand Down
40 changes: 38 additions & 2 deletions src/filter/numeric_index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@
namespace ndd {
namespace filter {

// One pending numeric-index upsert: (field, id, sortable value).
// Batches of these are applied in a single MDBX transaction via put_batch.
struct NumericBatchEntry {
std::string field;   // field name the numeric value belongs to
ndd::idInt id;       // document/vector numeric id
uint32_t value;      // sortable-encoded value (see float_to_sortable / int_to_sortable)

// Takes the field by value and moves it in to avoid an extra copy.
NumericBatchEntry(std::string field_in, ndd::idInt id_in, uint32_t value_in) :
field(std::move(field_in)),
id(id_in),
value(value_in) {}
};

// --- Sortable Key Utilities ---
inline uint32_t float_to_sortable(float f) {
uint32_t i;
Expand Down Expand Up @@ -42,6 +53,14 @@ namespace ndd {

// --- Bucket Structure (Hybrid) ---
struct Bucket {
/**
* XXX: Ideally this bucket should be page-size
* bounded. Currently it is difficult to do that
* here because the size of summary_bitmap depends
* on the kind of userspace filter upserts, not
* on the number of them.
*/

static constexpr size_t MAX_SIZE = 1024;
static constexpr uint32_t MAX_DELTA = 65535;

Expand Down Expand Up @@ -250,11 +269,23 @@ namespace ndd {
}
}

void put(const std::string& field, ndd::idInt id, uint32_t value) {
/**
* TODO:
* 1. comprehensive error print and return.
* If there is an error here, there should be a way to reverse
* vector add operation.
*/
void put_batch(const std::vector<NumericBatchEntry>& entries) {
if(entries.empty()) {
return;
}

MDBX_txn* txn;
mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn);
try {
put_internal(txn, field, id, value);
for(const auto& entry : entries) {
put_internal(txn, entry.field, entry.id, entry.value);
}
mdbx_txn_commit(txn);
} catch(...) {
mdbx_txn_abort(txn);
Expand Down Expand Up @@ -327,6 +358,11 @@ namespace ndd {
rc = mdbx_cursor_get(cursor, &key, &data, MDBX_PREV);
}
} else if (rc == MDBX_NOTFOUND) {
/**
* The only possible bucket that could still contain
* the value is the very last bucket in the database.
* Hence we jump there.
*/
rc = mdbx_cursor_get(cursor, &key, &data, MDBX_LAST);
}

Expand Down
1 change: 1 addition & 0 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ int main(int argc, char** argv) {
{{"status", "ok"},
{"timestamp", (std::int64_t)std::chrono::system_clock::now().time_since_epoch().count()}});
PRINT_LOG_TIME();
ndd::printSearchTimingStats();
ndd::printSparseSearchDebugStats();
ndd::printSparseUpdateDebugStats();
print_mdbx_stats();
Expand Down
Loading
Loading