diff --git a/CMakeLists.txt b/CMakeLists.txt index fd17b0b..3d2e02b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,6 +21,7 @@ include(cmake/third_party.cmake) # Valid values are "generic", "avx2", "avx512", "sve". option(NSPARSE_OPT_LEVEL "" "generic") option(NSPARSE_ENABLE_PYTHON "Build Python bindings" OFF) +option(NSPARSE_BUILD_JNI "Build JNI shared library for Java integration" OFF) add_subdirectory(nsparse) @@ -29,6 +30,11 @@ if(NSPARSE_ENABLE_PYTHON) add_subdirectory(nsparse/python) endif() +# JNI bindings +if(NSPARSE_BUILD_JNI) + add_subdirectory(jni) +endif() + # CTest must be included in the top level to enable `make test` target. include(CTest) diff --git a/jni/CMakeLists.txt b/jni/CMakeLists.txt new file mode 100644 index 0000000..c01cd12 --- /dev/null +++ b/jni/CMakeLists.txt @@ -0,0 +1,26 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 + +find_package(JNI REQUIRED) + +add_library(nsparse_jni SHARED nsparse_jni.cpp) + +target_include_directories(nsparse_jni PRIVATE + ${JNI_INCLUDE_DIRS} + ${PROJECT_SOURCE_DIR} +) + +target_link_libraries(nsparse_jni PRIVATE nsparse) + +set_target_properties(nsparse_jni PROPERTIES + POSITION_INDEPENDENT_CODE ON + OUTPUT_NAME "nsparse_jni" + PREFIX "lib" +) + +# Strip debug symbols in release builds +if(CMAKE_BUILD_TYPE STREQUAL "Release") + if(NOT WIN32) + target_link_options(nsparse_jni PRIVATE -s) + endif() +endif() diff --git a/jni/nsparse_jni.cpp b/jni/nsparse_jni.cpp new file mode 100644 index 0000000..dd7acb2 --- /dev/null +++ b/jni/nsparse_jni.cpp @@ -0,0 +1,392 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include + +#ifdef __linux__ +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +#include + +#include "nsparse/index.h" +#include "nsparse/index_factory.h" +#include "nsparse/io/index_io.h" +#include "nsparse/seismic_index.h" +#include 
"nsparse/seismic_scalar_quantized_index.h" +#include "nsparse/types.h" + +namespace { + +void throw_java_exception(JNIEnv* env, const char* msg) { + jclass cls = env->FindClass("java/lang/RuntimeException"); + if (cls != nullptr) { + env->ThrowNew(cls, msg); + } +} + +nsparse::Index* to_index(jlong ptr) { + return reinterpret_cast(ptr); +} + +} // namespace + +extern "C" { + +JNIEXPORT jlong JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_createIndex( + JNIEnv* env, jclass, jint dimension, jstring description) { + try { + const char* desc = env->GetStringUTFChars(description, nullptr); + std::string desc_str(desc); + env->ReleaseStringUTFChars(description, desc); + + nsparse::Index* index = nsparse::index_factory(dimension, desc_str.c_str()); + return reinterpret_cast(index); + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + return 0; + } +} + +JNIEXPORT void JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_addVectors( + JNIEnv* env, jclass, jlong indexPtr, jint n, jintArray indptrArr, + jshortArray indicesArr, jfloatArray valuesArr) { + try { + nsparse::Index* index = to_index(indexPtr); + + jint* indptr = env->GetIntArrayElements(indptrArr, nullptr); + jshort* indices = env->GetShortArrayElements(indicesArr, nullptr); + jfloat* values = env->GetFloatArrayElements(valuesArr, nullptr); + + static_assert(sizeof(nsparse::idx_t) == sizeof(jint)); + static_assert(sizeof(nsparse::term_t) == sizeof(jshort)); + static_assert(sizeof(float) == sizeof(jfloat)); + + index->add( + static_cast(n), + reinterpret_cast(indptr), + reinterpret_cast(indices), + reinterpret_cast(values) + ); + + env->ReleaseIntArrayElements(indptrArr, indptr, JNI_ABORT); + env->ReleaseShortArrayElements(indicesArr, indices, JNI_ABORT); + env->ReleaseFloatArrayElements(valuesArr, values, JNI_ABORT); + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + } +} + +JNIEXPORT void JNICALL 
Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_addVectorsWithIds( + JNIEnv* env, jclass, jlong indexPtr, jint n, jintArray indptrArr, + jshortArray indicesArr, jfloatArray valuesArr, jintArray idsArr) { + try { + nsparse::Index* index = to_index(indexPtr); + + jint* indptr = env->GetIntArrayElements(indptrArr, nullptr); + jshort* indices = env->GetShortArrayElements(indicesArr, nullptr); + jfloat* values = env->GetFloatArrayElements(valuesArr, nullptr); + jint* ids = env->GetIntArrayElements(idsArr, nullptr); + + index->add_with_ids( + static_cast(n), + reinterpret_cast(indptr), + reinterpret_cast(indices), + reinterpret_cast(values), + reinterpret_cast(ids) + ); + + env->ReleaseIntArrayElements(indptrArr, indptr, JNI_ABORT); + env->ReleaseShortArrayElements(indicesArr, indices, JNI_ABORT); + env->ReleaseFloatArrayElements(valuesArr, values, JNI_ABORT); + env->ReleaseIntArrayElements(idsArr, ids, JNI_ABORT); + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + } +} + +JNIEXPORT void JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_reserveIndex( + JNIEnv* env, jclass, jlong indexPtr, jlong numVectors, jlong totalNnz) { + try { + to_index(indexPtr)->reserve(static_cast(numVectors), + static_cast(totalNnz)); + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + } +} + +JNIEXPORT void JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_buildIndex( + JNIEnv* env, jclass, jlong indexPtr) { + try { +#ifdef __linux__ + // Reduce mmap threshold so allocations >= 128KB use mmap instead of sbrk. + // mmap'd regions are returned to the OS immediately on free, preventing + // heap fragmentation that would otherwise retain tens of GB after build. 
+ mallopt(M_MMAP_THRESHOLD, 128 * 1024); + mallopt(M_TRIM_THRESHOLD, 128 * 1024); +#endif + to_index(indexPtr)->build(); + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + } +} + +JNIEXPORT jobject JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_search( + JNIEnv* env, jclass, jlong indexPtr, jint nQueries, jintArray indptrArr, + jshortArray indicesArr, jfloatArray valuesArr, jint k, + jfloat heapFactor, jint cut) { + try { + nsparse::Index* index = to_index(indexPtr); + + jint* indptr = env->GetIntArrayElements(indptrArr, nullptr); + jshort* indices = env->GetShortArrayElements(indicesArr, nullptr); + jfloat* values = env->GetFloatArrayElements(valuesArr, nullptr); + + int total = nQueries * k; + std::vector distances(total); + std::vector labels(total); + + nsparse::SeismicSearchParameters params(static_cast(cut), static_cast(heapFactor)); + + index->search( + static_cast(nQueries), + reinterpret_cast(indptr), + reinterpret_cast(indices), + reinterpret_cast(values), + static_cast(k), + distances.data(), + labels.data(), + ¶ms + ); + + env->ReleaseIntArrayElements(indptrArr, indptr, JNI_ABORT); + env->ReleaseShortArrayElements(indicesArr, indices, JNI_ABORT); + env->ReleaseFloatArrayElements(valuesArr, values, JNI_ABORT); + + jfloatArray distArr = env->NewFloatArray(total); + env->SetFloatArrayRegion(distArr, 0, total, distances.data()); + + jintArray labelArr = env->NewIntArray(total); + static_assert(sizeof(nsparse::idx_t) == sizeof(jint)); + env->SetIntArrayRegion(labelArr, 0, total, reinterpret_cast(labels.data())); + + jclass resultClass = env->FindClass("org/opensearch/neuralsearch/sparse/jni/SearchResult"); + jmethodID ctor = env->GetMethodID(resultClass, "", "([F[II)V"); + return env->NewObject(resultClass, ctor, distArr, labelArr, static_cast(k)); + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + return nullptr; + } +} + +JNIEXPORT jobject JNICALL 
Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_searchSQ( + JNIEnv* env, jclass, jlong indexPtr, jint nQueries, jintArray indptrArr, + jshortArray indicesArr, jfloatArray valuesArr, jint k, + jfloat heapFactor, jint cut, jfloat vmin, jfloat vmax) { + try { + nsparse::Index* index = to_index(indexPtr); + + jint* indptr = env->GetIntArrayElements(indptrArr, nullptr); + jshort* indices = env->GetShortArrayElements(indicesArr, nullptr); + jfloat* values = env->GetFloatArrayElements(valuesArr, nullptr); + + int total = nQueries * k; + std::vector distances(total); + std::vector labels(total); + + nsparse::SeismicSQSearchParameters params( + static_cast(vmin), static_cast(vmax), + static_cast(cut), static_cast(heapFactor)); + + index->search( + static_cast(nQueries), + reinterpret_cast(indptr), + reinterpret_cast(indices), + reinterpret_cast(values), + static_cast(k), + distances.data(), + labels.data(), + ¶ms + ); + + env->ReleaseIntArrayElements(indptrArr, indptr, JNI_ABORT); + env->ReleaseShortArrayElements(indicesArr, indices, JNI_ABORT); + env->ReleaseFloatArrayElements(valuesArr, values, JNI_ABORT); + + jfloatArray distArr = env->NewFloatArray(total); + env->SetFloatArrayRegion(distArr, 0, total, distances.data()); + + jintArray labelArr = env->NewIntArray(total); + env->SetIntArrayRegion(labelArr, 0, total, reinterpret_cast(labels.data())); + + jclass resultClass = env->FindClass("org/opensearch/neuralsearch/sparse/jni/SearchResult"); + jmethodID ctor = env->GetMethodID(resultClass, "", "([F[II)V"); + return env->NewObject(resultClass, ctor, distArr, labelArr, static_cast(k)); + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + return nullptr; + } +} + +JNIEXPORT void JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_buildAndSaveIndex( + JNIEnv* env, jclass, jlong indexPtr, jstring path) { + try { + const char* pathStr = env->GetStringUTFChars(path, nullptr); + std::string pathCopy(pathStr); + 
env->ReleaseStringUTFChars(path, pathStr); + + nsparse::Index* index = to_index(indexPtr); +#ifdef __linux__ + mallopt(M_MMAP_THRESHOLD, 128 * 1024); + mallopt(M_TRIM_THRESHOLD, 128 * 1024); +#endif + index->build_and_save(pathCopy.c_str()); + index->release_build_memory(); + delete index; +#ifdef __linux__ + malloc_trim(0); +#endif + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + } +} + +JNIEXPORT void JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_saveIndex( + JNIEnv* env, jclass, jlong indexPtr, jstring path) { + try { + const char* pathStr = env->GetStringUTFChars(path, nullptr); + std::string pathCopy(pathStr); + env->ReleaseStringUTFChars(path, pathStr); + + nsparse::Index* index = to_index(indexPtr); + nsparse::write_index(index, pathCopy.data()); + // Release vectors_ and clustered_inverted_lists immediately after save. + // The index will be loaded fresh from disk for search — keeping build + // data in memory wastes ~80GB and causes OOM when loadIndex runs. + index->release_build_memory(); +#ifdef __linux__ + malloc_trim(0); +#endif + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + } +} + +JNIEXPORT jlong JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_loadIndex( + JNIEnv* env, jclass, jstring path) { + try { + const char* pathStr = env->GetStringUTFChars(path, nullptr); + std::string pathCopy(pathStr); + env->ReleaseStringUTFChars(path, pathStr); + + nsparse::Index* index = nsparse::read_index(pathCopy.data()); + return reinterpret_cast(index); + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + return 0; + } +} + +JNIEXPORT void JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_deleteIndex( + JNIEnv* env, jclass, jlong indexPtr) { + try { + delete to_index(indexPtr); +#ifdef __linux__ + // Restore default mmap threshold for subsequent allocations (search path). 
+ mallopt(M_MMAP_THRESHOLD, 128 * 1024); + mallopt(M_TRIM_THRESHOLD, 128 * 1024); + malloc_trim(0); +#endif + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + } +} + +JNIEXPORT jint JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_numVectors( + JNIEnv* env, jclass, jlong indexPtr) { + try { + return static_cast(to_index(indexPtr)->num_vectors()); + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + return 0; + } +} + +JNIEXPORT jint JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_getDimension( + JNIEnv* env, jclass, jlong indexPtr) { + try { + return static_cast(to_index(indexPtr)->get_dimension()); + } catch (const std::exception& e) { + throw_java_exception(env, e.what()); + return 0; + } +} + +JNIEXPORT void JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_setNumThreads( + JNIEnv*, jclass, jint numThreads) { + omp_set_num_threads(static_cast(numThreads)); +} + +JNIEXPORT void JNICALL Java_org_opensearch_neuralsearch_sparse_jni_NsparseJni_evictPageCache( + JNIEnv* env, jclass, jstring dirPath, jstring suffix) { +#ifdef __linux__ + const char* dir = env->GetStringUTFChars(dirPath, nullptr); + const char* sfx = env->GetStringUTFChars(suffix, nullptr); + size_t sfx_len = strlen(sfx); + size_t dir_len = strlen(dir); + + FILE* maps = fopen("/proc/self/maps", "r"); + if (maps) { + char* line = nullptr; + size_t line_cap = 0; + ssize_t line_len; + while ((line_len = getline(&line, &line_cap, maps)) != -1) { + char* p = line; + int fields = 0; + while (*p && fields < 5) { + while (*p && *p != ' ') p++; + while (*p == ' ') p++; + fields++; + } + if (*p != '/') continue; + char* pathname = p; + size_t plen = strlen(pathname); + if (plen > 0 && pathname[plen - 1] == '\n') { + pathname[plen - 1] = '\0'; + plen--; + } + if (plen > dir_len + sfx_len && + strncmp(pathname, dir, dir_len) == 0 && + pathname[dir_len] == '/' && + strcmp(pathname + plen - sfx_len, sfx) == 0) { + unsigned 
long start = 0, end = 0; + if (sscanf(line, "%lx-%lx", &start, &end) == 2 && end > start) { + if (madvise(reinterpret_cast(start), end - start, MADV_DONTNEED) != 0) { + fprintf(stderr, "madvise MADV_DONTNEED failed: %s (addr=%lx, len=%lu)\n", + strerror(errno), start, end - start); + } + } + } + } + free(line); + fclose(maps); + } + + env->ReleaseStringUTFChars(dirPath, dir); + env->ReleaseStringUTFChars(suffix, sfx); +#endif +} + +} // extern "C" diff --git a/nsparse/cluster/inverted_list_clusters.cpp b/nsparse/cluster/inverted_list_clusters.cpp index 24ace98..f37265e 100644 --- a/nsparse/cluster/inverted_list_clusters.cpp +++ b/nsparse/cluster/inverted_list_clusters.cpp @@ -21,14 +21,6 @@ namespace nsparse { namespace { -/** - * @brief Generage summary sparse vector for posting lists - * - * @param vectors inverted index - * @param group_of_doc_ids a list of posting list - * @param alpha prune ratio - * @return SparseVectors - */ template SparseVectors summarize_(const SparseVectors* vectors, const std::vector& group_of_doc_ids, @@ -40,53 +32,90 @@ SparseVectors summarize_(const SparseVectors* vectors, return summarized_vectors; } const auto element_size = vectors->get_element_size(); - const auto& indptr_data = vectors->indptr_data(); - const auto& indices_data = vectors->indices_data(); - const auto& values_data = vectors->values_data(); + const auto* indptr_data = vectors->indptr_data(); + const auto* indices_data = vectors->indices_data(); + const auto* values_data = vectors->values_data(); + const size_t dimension = vectors->get_dimension(); + + // Dense buffer for max-pooling (reused across clusters) + std::vector dense_max; + if (dimension > 0) { + dense_max.resize(dimension, T{0}); + } + std::vector active_terms; + active_terms.reserve(1024); + for (size_t i = 0; i < offsets.size() - 1; ++i) { size_t n_docs = offsets[i + 1] - offsets[i]; - std::unordered_map summary_map; - float sum = 0.0F; auto doc_ids = std::span( group_of_doc_ids.data() + offsets[i], 
n_docs); - for (const auto& doc_id : doc_ids) { - int start = indptr_data[doc_id]; - int end = indptr_data[doc_id + 1]; - for (size_t j = start; j < end; ++j) { - const auto old = summary_map[indices_data[j]]; - auto& value = summary_map[indices_data[j]]; - // j is element index, need byte offset for T access - value = std::max(value, *reinterpret_cast( - values_data + j * sizeof(T))); - sum += value - old; + + std::vector sorted_docs(doc_ids.begin(), doc_ids.end()); + std::sort(sorted_docs.begin(), sorted_docs.end()); + + float sum = 0.0F; + active_terms.clear(); + std::vector> summary_vec; + + if (dimension > 0) { + for (const auto& doc_id : sorted_docs) { + offset_t start = indptr_data[doc_id]; + offset_t end = indptr_data[doc_id + 1]; + for (offset_t j = start; j < end; ++j) { + term_t term = indices_data[j]; + T val = *reinterpret_cast( + values_data + j * sizeof(T)); + T& cur = dense_max[term]; + if (cur == T{0}) { + active_terms.push_back(term); + } + if (val > cur) { + sum += static_cast(val) - + static_cast(cur); + cur = val; + } + } + } + summary_vec.reserve(active_terms.size()); + for (term_t term : active_terms) { + summary_vec.emplace_back(term, dense_max[term]); + dense_max[term] = T{0}; } + } else { + std::unordered_map summary_map; + for (const auto& doc_id : sorted_docs) { + offset_t start = indptr_data[doc_id]; + offset_t end = indptr_data[doc_id + 1]; + for (offset_t j = start; j < end; ++j) { + const auto old = summary_map[indices_data[j]]; + auto& value = summary_map[indices_data[j]]; + value = std::max(value, *reinterpret_cast( + values_data + j * sizeof(T))); + sum += value - old; + } + } + summary_vec.assign(summary_map.begin(), summary_map.end()); } - // Convert summary_map to vector of pairs - std::vector> summary_vec(summary_map.begin(), - summary_map.end()); - - // Sort by value in descending order + // Sort by value descending for alpha pruning std::ranges::sort(summary_vec, [](const auto& a, const auto& b) { return a.second > b.second; 
}); float addup = 0.0F; - for (int j = 0; j < summary_vec.size(); ++j) { - addup += summary_vec[j].second; + for (size_t j = 0; j < summary_vec.size(); ++j) { + addup += static_cast(summary_vec[j].second); if (addup / sum >= alpha) { - summary_vec.erase(summary_vec.begin() + j + 1, - summary_vec.end()); + summary_vec.resize(j + 1); break; } } - // Sort by term_t order + // Sort by term order for sparse output std::ranges::sort(summary_vec, [](const auto& a, const auto& b) { return a.first < b.first; }); - // Break into separate terms and values vectors std::vector terms; std::vector values; terms.reserve(summary_vec.size()); diff --git a/nsparse/cluster/inverted_list_clusters.h b/nsparse/cluster/inverted_list_clusters.h index f65ebd3..76fdcbb 100644 --- a/nsparse/cluster/inverted_list_clusters.h +++ b/nsparse/cluster/inverted_list_clusters.h @@ -43,6 +43,8 @@ class InvertedListClusters : public Serializable { return summaries_ == nullptr ? 0 : summaries_->num_vectors(); } + size_t total_docs() const { return docs_.size(); } + void serialize(IOWriter* writer) const override; void deserialize(IOReader* reader) override; diff --git a/nsparse/cluster/kmeans_utils.cpp b/nsparse/cluster/kmeans_utils.cpp index 7f8ebeb..06d7bf0 100644 --- a/nsparse/cluster/kmeans_utils.cpp +++ b/nsparse/cluster/kmeans_utils.cpp @@ -9,9 +9,11 @@ #include "nsparse/cluster/kmeans_utils.h" +#include #include #include #include +#include #include #include @@ -26,6 +28,146 @@ #endif namespace nsparse::detail { + +template +static void map_docs_to_clusters_sparse_invindex( + const SparseVectors* vectors, const std::vector& docs, + std::vector>& clusters) { + size_t n_clusters = clusters.size(); + size_t n_docs = docs.size(); + + const offset_t* indptr = vectors->indptr_data(); + const term_t* indices = vectors->indices_data(); + const auto* values = reinterpret_cast(vectors->values_data()); + size_t dimension = vectors->get_dimension(); + if (dimension == 0) { + term_t max_term = 0; + for (size_t 
c = 0; c < n_clusters; ++c) { + idx_t centroid_doc = clusters[c].at(0); + const offset_t start = indptr[centroid_doc]; + const offset_t end = indptr[centroid_doc + 1]; + for (offset_t j = start; j < end; ++j) { + max_term = std::max(max_term, indices[j]); + } + } + dimension = static_cast(max_term) + 1; + } + + // Build flattened inverted index: contiguous arrays with offset table + // First pass: count entries per term + std::vector term_counts(dimension, 0); + size_t total_entries = 0; + for (size_t c = 0; c < n_clusters; ++c) { + idx_t centroid_doc = clusters[c].at(0); + const offset_t start = indptr[centroid_doc]; + const offset_t end = indptr[centroid_doc + 1]; + for (offset_t j = start; j < end; ++j) { + term_counts[indices[j]]++; + total_entries++; + } + } + + // Build offset table (prefix sum) + std::vector offsets(dimension + 1); + offsets[0] = 0; + for (size_t i = 0; i < dimension; ++i) { + offsets[i + 1] = offsets[i] + term_counts[i]; + } + + // Second pass: fill flattened arrays + std::vector flat_cids(total_entries); + std::vector flat_vals(total_entries); + std::vector write_pos(dimension, 0); + for (size_t c = 0; c < n_clusters; ++c) { + idx_t centroid_doc = clusters[c].at(0); + const offset_t start = indptr[centroid_doc]; + const offset_t end = indptr[centroid_doc + 1]; + for (offset_t j = start; j < end; ++j) { + term_t term = indices[j]; + uint32_t pos = offsets[term] + write_pos[term]++; + flat_cids[pos] = static_cast(c); + float val; + if constexpr (std::is_same_v) { + val = values[j]; + } else { + val = static_cast(values[j]); + } + flat_vals[pos] = val; + } + } + + // Build sorted centroid list for binary search lookup + std::vector centroid_ids; + centroid_ids.reserve(n_clusters); + for (size_t c = 0; c < n_clusters; ++c) { + centroid_ids.push_back(clusters[c].at(0)); + } + std::sort(centroid_ids.begin(), centroid_ids.end()); + + // Sort doc IDs for cache-friendly CSR access + std::vector sorted_order(n_docs); + 
std::iota(sorted_order.begin(), sorted_order.end(), 0); + std::sort(sorted_order.begin(), sorted_order.end(), + [&docs](size_t a, size_t b) { return docs[a] < docs[b]; }); + + // Assign each doc to nearest centroid via sparse-sparse dot product + // Uses dirty-centroid tracking to avoid full zeroing and full argmax + std::vector scores(n_clusters, 0.0f); + std::vector dirty(n_clusters); + std::vector seen(n_clusters, false); + size_t n_dirty = 0; + + for (size_t si = 0; si < n_docs; ++si) { + size_t orig_idx = sorted_order[si]; + idx_t doc_id = docs[orig_idx]; + + if (std::binary_search(centroid_ids.begin(), centroid_ids.end(), + doc_id)) + continue; + + n_dirty = 0; + const offset_t start = indptr[doc_id]; + const offset_t end = indptr[doc_id + 1]; + for (offset_t j = start; j < end; ++j) { + term_t term = indices[j]; + float doc_val; + if constexpr (std::is_same_v) { + doc_val = values[j]; + } else { + doc_val = static_cast(values[j]); + } + const uint32_t inv_start = offsets[term]; + const uint32_t inv_end = offsets[term + 1]; + for (uint32_t k = inv_start; k < inv_end; ++k) { + uint16_t cid = flat_cids[k]; + if (!seen[cid]) { + seen[cid] = true; + dirty[n_dirty++] = cid; + } + scores[cid] += doc_val * flat_vals[k]; + } + } + + // Partial argmax over dirty centroids only + uint16_t best = dirty[0]; + float best_score = scores[best]; + for (size_t k = 1; k < n_dirty; ++k) { + uint16_t cid = dirty[k]; + if (scores[cid] > best_score) { + best_score = scores[cid]; + best = cid; + } + } + clusters[best].push_back(doc_id); + + // Reset only dirty entries + for (size_t k = 0; k < n_dirty; ++k) { + scores[dirty[k]] = 0.0f; + seen[dirty[k]] = false; + } + } +} + #if defined(__AVX512F__) template @@ -99,13 +241,13 @@ static void map_docs_to_clusters_avx512_impl( initialize_cluster_representatives(dense_centroids, center_dimension); dense_centroids = std::vector>(); - const idx_t* indptr = vectors->indptr_data(); + const offset_t* indptr = vectors->indptr_data(); const 
term_t* indices = vectors->indices_data(); const T* values = reinterpret_cast(vectors->values_data()); for (size_t i = 0; i < n_docs; ++i) { idx_t doc_id = docs[i]; - const idx_t start = indptr[doc_id]; + const offset_t start = indptr[doc_id]; const size_t len = indptr[doc_id + 1] - start; auto similarities = dot_product_sparse_matrix( @@ -170,42 +312,35 @@ void map_docs_to_clusters(const SparseVectors* vectors, if (n_clusters == 0 || n_docs == 0) { return; } + if (vectors->get_dimension() > 0) { + const auto element_size = vectors->get_element_size(); + if (element_size == U32) { + map_docs_to_clusters_sparse_invindex(vectors, docs, + clusters); + } else if (element_size == U16) { + map_docs_to_clusters_sparse_invindex(vectors, docs, + clusters); + } else { + map_docs_to_clusters_sparse_invindex(vectors, docs, + clusters); + } + } else { #if defined(__AVX512F__) - map_docs_to_clusters_avx512(vectors, docs, clusters); - return; + map_docs_to_clusters_avx512(vectors, docs, clusters); #else - - const idx_t* indptr = vectors->indptr_data(); - const term_t* indices = vectors->indices_data(); - const uint8_t* values = vectors->values_data(); - const auto element_size = vectors->get_element_size(); - for (size_t i = 0; i < n_docs; ++i) { - // get_dense_vector returns uint8_t buffer with element_size bytes per - // value - const auto& vec = vectors->get_dense_vector(docs[i]); - float max_similarity = std::numeric_limits::lowest(); - size_t best_cluster = 0; - bool is_centroid = false; - for (size_t j = 0; j < n_clusters; ++j) { - idx_t centroid_doc = clusters[j].at(0); - if (docs[i] == centroid_doc) { - is_centroid = true; - break; - } - const idx_t start = indptr[centroid_doc]; - const size_t len = indptr[centroid_doc + 1] - start; - float similarity = dot_product_typed_dense( - indices, values, vec.data(), start, len, element_size); - if (similarity > max_similarity) { - max_similarity = similarity; - best_cluster = j; - } - } - if (!is_centroid) { - 
clusters[best_cluster].push_back(docs[i]); + const auto element_size = vectors->get_element_size(); + if (element_size == U32) { + map_docs_to_clusters_sparse_invindex(vectors, docs, + clusters); + } else if (element_size == U16) { + map_docs_to_clusters_sparse_invindex(vectors, docs, + clusters); + } else { + map_docs_to_clusters_sparse_invindex(vectors, docs, + clusters); } - } #endif + } } } // namespace nsparse::detail \ No newline at end of file diff --git a/nsparse/id_map_index.cpp b/nsparse/id_map_index.cpp index e821423..38e8c56 100644 --- a/nsparse/id_map_index.cpp +++ b/nsparse/id_map_index.cpp @@ -12,6 +12,7 @@ #include #include "nsparse/id_selector.h" +#include "nsparse/io/file_io.h" #include "nsparse/io/index_io.h" #include "nsparse/io/io.h" #include "nsparse/utils/checks.h" @@ -24,8 +25,49 @@ void IDMapIndex::add(idx_t n, const idx_t* indptr, const term_t* indices, delegate_->add(n, indptr, indices, values); } +void IDMapIndex::reserve(size_t num_vectors, size_t total_nnz) { + delegate_->reserve(num_vectors, total_nnz); +} + void IDMapIndex::build() { delegate_->build(); } +void IDMapIndex::build_and_save(const char* path) { + FileIOWriter writer(const_cast(path)); + + // Write IDMapIndex header (fourcc + dimension) + auto id_val = fourcc(name); + writer.write(&id_val, sizeof(uint32_t), 1); + auto dim = delegate_->get_dimension(); + writer.write(&dim, sizeof(int), 1); + + // Write body via IOWriter overload + build_and_save(&writer); + + writer.close(); +} + +void IDMapIndex::build_and_save(IOWriter* writer) { + // Write delegate's header + body (streaming build) + auto delegate_id_val = fourcc(delegate_->id()); + writer->write(&delegate_id_val, sizeof(uint32_t), 1); + auto delegate_dim = delegate_->get_dimension(); + writer->write(&delegate_dim, sizeof(int), 1); + delegate_->build_and_save(writer); + + // Write internal_to_external_ map + size_t map_size = internal_to_external_.size(); + writer->write(&map_size, sizeof(size_t), 1); + if (map_size > 
0) { + writer->write(internal_to_external_.data(), sizeof(idx_t), map_size); + } +} + +void IDMapIndex::release_build_memory() { + if (delegate_) { + delegate_->release_build_memory(); + } +} + void IDMapIndex::search(idx_t n, const idx_t* indptr, const term_t* indices, const float* values, int k, float* distances, idx_t* labels, SearchParameters* search_parameters) { diff --git a/nsparse/id_map_index.h b/nsparse/id_map_index.h index 4c437c9..341a93c 100644 --- a/nsparse/id_map_index.h +++ b/nsparse/id_map_index.h @@ -83,7 +83,11 @@ class IDMapIndex : public Index, public IndexIO { void add(idx_t n, const idx_t* indptr, const term_t* indices, const float* values) override; + void reserve(size_t num_vectors, size_t total_nnz) override; void build() override; + void build_and_save(const char* path) override; + void build_and_save(IOWriter* writer) override; + void release_build_memory() override; void search(idx_t n, const idx_t* indptr, const term_t* indices, const float* values, int k, float* distances, idx_t* labels, SearchParameters* search_parameters = nullptr) override; diff --git a/nsparse/index.cpp b/nsparse/index.cpp index 306129c..484d1a2 100644 --- a/nsparse/index.cpp +++ b/nsparse/index.cpp @@ -11,6 +11,7 @@ #include +#include "nsparse/io/index_io.h" #include "nsparse/types.h" #include "nsparse/utils/checks.h" @@ -20,6 +21,16 @@ Index::Index(int dim) : dimension_(dim) {} void Index::build() { throw_not_implemented(); } +void Index::build_and_save(const char* path) { + build(); + write_index(this, const_cast(path)); +} + +void Index::build_and_save(IOWriter* writer) { + build(); + nsparse::detail::write_index(this, writer, true); +} + void Index::search(idx_t n, const idx_t* indptr, const term_t* indices, const float* values, int k, float* distances, idx_t* labels, SearchParameters* search_parameters) { diff --git a/nsparse/index.h b/nsparse/index.h index dba9003..ace2a32 100644 --- a/nsparse/index.h +++ b/nsparse/index.h @@ -18,6 +18,8 @@ namespace 
nsparse { +class IOWriter; + struct SearchParameters { virtual ~SearchParameters() = default; const IDSelector* get_id_selector() const { return id_selector; } @@ -34,7 +36,11 @@ class Index { virtual std::array id() const = 0; virtual void add(idx_t n, const idx_t* indptr, const term_t* indices, const float* values) = 0; + virtual void reserve(size_t num_vectors, size_t total_nnz) {} virtual void build(); + virtual void build_and_save(const char* path); + virtual void build_and_save(IOWriter* writer); + virtual void release_build_memory() {} virtual void search( idx_t n, const idx_t* indptr, const term_t* indices, const float* values, int k, float* distances, idx_t* labels, diff --git a/nsparse/index_factory.cpp b/nsparse/index_factory.cpp index 790cf3b..3fec141 100644 --- a/nsparse/index_factory.cpp +++ b/nsparse/index_factory.cpp @@ -101,12 +101,12 @@ Index* index_factory(int dimension, const char* description) { } if (index_type == "seismic") { - int lambda = std::stoi(get_param("lambda", "10")); - int beta = std::stoi(get_param("beta", "5")); - float alpha = std::stof(get_param("alpha", "0.5")); - return new SeismicIndex(dimension, {.lambda = lambda = lambda, - .beta = beta = beta, - .alpha = alpha = alpha}); + int lambda = std::stoi(get_param("lambda", "-1")); + int beta = std::stoi(get_param("beta", "-1")); + float alpha = std::stof(get_param("alpha", "0.4")); + return new SeismicIndex(dimension, {.lambda = lambda, + .beta = beta, + .alpha = alpha}); } if (index_type == "seismic_sq") { @@ -117,13 +117,13 @@ Index* index_factory(int dimension, const char* description) { } float vmin = std::stof(get_param("vmin", "0.0")); float vmax = std::stof(get_param("vmax", "1.0")); - int lambda = std::stoi(get_param("lambda", "10")); - int beta = std::stoi(get_param("beta", "5")); - float alpha = std::stof(get_param("alpha", "0.5")); + int lambda = std::stoi(get_param("lambda", "-1")); + int beta = std::stoi(get_param("beta", "-1")); + float alpha = 
std::stof(get_param("alpha", "0.4")); return new SeismicScalarQuantizedIndex(quantizer_type, vmin, vmax, - {.lambda = lambda = lambda, - .beta = beta = beta, - .alpha = alpha = alpha}, + {.lambda = lambda, + .beta = beta, + .alpha = alpha}, dimension); } diff --git a/nsparse/inverted_index.cpp b/nsparse/inverted_index.cpp index 7813bca..a4335ab 100644 --- a/nsparse/inverted_index.cpp +++ b/nsparse/inverted_index.cpp @@ -262,7 +262,7 @@ auto InvertedIndex::search(idx_t n, const idx_t* indptr, const term_t* indices, #pragma omp parallel for for (idx_t query_idx = 0; query_idx < n; ++query_idx) { - const idx_t start = query_indptr[query_idx]; + const offset_t start = query_indptr[query_idx]; const size_t len = query_indptr[query_idx + 1] - start; auto [distances, labels] = diff --git a/nsparse/invlists/inverted_lists.cpp b/nsparse/invlists/inverted_lists.cpp index 4aa3562..32ccbca 100644 --- a/nsparse/invlists/inverted_lists.cpp +++ b/nsparse/invlists/inverted_lists.cpp @@ -162,15 +162,15 @@ std::unique_ptr ArrayInvertedLists::build_inverted_lists( std::make_unique(n_term, element_size); size_t n_docs = vectors->num_vectors(); - const auto& indptr_data = vectors->indptr_data(); - const auto& indices_data = vectors->indices_data(); - const auto& values_data = vectors->values_data(); + const auto* indptr_data = vectors->indptr_data(); + const auto* indices_data = vectors->indices_data(); + const auto* values_data = vectors->values_data(); // inverted_lists.add_entry is thread safe for (size_t i = 0; i < n_docs; ++i) { - int start = indptr_data[i]; - int n_tokens = indptr_data[i + 1] - indptr_data[i]; - for (size_t j = start; j < start + n_tokens; ++j) { + offset_t start = indptr_data[i]; + offset_t n_tokens = indptr_data[i + 1] - indptr_data[i]; + for (offset_t j = start; j < start + n_tokens; ++j) { term_t term_id = indices_data[j]; inverted_lists->add_entry(term_id, i, values_data + j * element_size); diff --git a/nsparse/python/swignsparse_avx2.swig 
b/nsparse/python/swignsparse_avx2.swig new file mode 100644 index 0000000..fdbf04f --- /dev/null +++ b/nsparse/python/swignsparse_avx2.swig @@ -0,0 +1,133 @@ +/** + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +%module swignsparse + +%{ +#define SWIG_FILE_WITH_INIT +#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION +#include +#include "nsparse/types.h" +#include "nsparse/io/io.h" +#include "nsparse/id_selector.h" +#include "nsparse/sparse_vectors.h" +#include "nsparse/invlists/inverted_lists.h" +#include "nsparse/index.h" +#include "nsparse/brutal_index.h" +#include "nsparse/seismic_common.h" +#include "nsparse/seismic_index.h" +#include "nsparse/seismic_scalar_quantized_index.h" +#include "nsparse/id_map_index.h" +#include "nsparse/utils/scalar_quantizer.h" +#include "nsparse/cluster/random_kmeans.h" +#include "nsparse/inverted_index.h" +#include "nsparse/index_factory.h" +#include "nsparse/io/index_io.h" +%} + +%init %{ +import_array(); +%} + +%include "nsparse_typemaps.i" +%include "factory.i" +%include "nsparse/io/io.h" +%include "nsparse/types.h" +%include "nsparse/sparse_vectors.h" +%include "nsparse/invlists/inverted_lists.h" +%include "nsparse/utils/scalar_quantizer.h" + +// IDSelector support - expose the class hierarchy for Python +// Multi-argument typemap: Python passes a single int32 array, n is inferred +%typemap(in) (size_t n, const nsparse::idx_t* indices) (Py_buffer view) { + if (PyObject_GetBuffer($input, &view, PyBUF_FORMAT | PyBUF_C_CONTIGUOUS) == -1) { + SWIG_fail; + } + if (strcmp(view.format, "i") != 0) { + PyBuffer_Release(&view); + PyErr_SetString(PyExc_TypeError, "Expected int32 array for indices"); + SWIG_fail; + } + $1 = (size_t)(view.shape[0]); + $2 = (nsparse::idx_t*)view.buf; +} +%typemap(freearg) (size_t n, const nsparse::idx_t* indices) 
{ + PyBuffer_Release(&view$argnum); +} +%include "nsparse/id_selector.h" + +// IO classes needed for index serialization (must come before index.h) +// Ignore internal IO classes - not needed in Python API +%ignore nsparse::SeismicIndex::write_index; +%ignore nsparse::SeismicIndex::read_index; +%ignore nsparse::SeismicScalarQuantizedIndex::write_index; +%ignore nsparse::SeismicScalarQuantizedIndex::read_index; +%ignore nsparse::IDMapIndex::write_index; +%ignore nsparse::IDMapIndex::read_index; +%ignore nsparse::SparseVectors::serialize; +%ignore nsparse::SparseVectors::deserialize; +%ignore nsparse::InvertedListClusters::serialize; +%ignore nsparse::InvertedListClusters::deserialize; +%ignore nsparse::detail; + +// Ignore the protected std::vector-based methods +%ignore nsparse::Index::add(idx_t, std::vector&, std::vector&, std::vector&); +%ignore nsparse::Index::search(idx_t, std::vector&, std::vector&, std::vector&, int); + +// Ignore SeismicIndex's vector-based search methods, expose only pointer-based +%ignore nsparse::SeismicIndex::search(idx_t, std::vector&, std::vector&, std::vector&, int); +%ignore nsparse::SeismicIndex::search(idx_t, std::vector&, std::vector&, std::vector&, int, int, float); + +// Ignore SeismicScalarQuantizedIndex's vector-based search methods +%ignore nsparse::SeismicScalarQuantizedIndex::search(idx_t, std::vector&, std::vector&, std::vector&, int); + +// Ignore InvertedIndex's protected search method (returns pair_of_score_id_vectors_t) +%ignore nsparse::InvertedIndex::search; + +// Ignore the original search methods - we'll provide our own via %extend +%ignore nsparse::Index::search; + +// Extend Index to provide search methods +// Use separate names because both overloads have the same number of visible Python args +// (labels and distances have numinputs=0, so both would appear as 5-arg functions to Python) +%extend nsparse::Index { + // Version without SearchParameters + void search_c(idx_t n, const idx_t* indptr, const term_t* 
indices, + const float* values, int k, float* distances, idx_t* labels) { + $self->search(n, indptr, indices, values, k, distances, labels, nullptr); + } + + // Version with SearchParameters - different name to avoid overload ambiguity + void search_with_params(idx_t n, const idx_t* indptr, const term_t* indices, + const float* values, int k, float* distances, + idx_t* labels, + nsparse::SearchParameters* search_parameters) { + $self->search(n, indptr, indices, values, k, distances, labels, search_parameters); + } +} + +%include "nsparse/index.h" +%include "nsparse/brutal_index.h" +%include "nsparse/seismic_common.h" +%include "nsparse/seismic_index.h" +%include "nsparse/seismic_scalar_quantized_index.h" +%include "nsparse/inverted_index.h" +%include "nsparse/id_map_index.h" +%include "nsparse/cluster/random_kmeans.h" + +// index_factory returns a new object - Python takes ownership +// Use %factory to enable proper downcasting based on runtime type +%newobject nsparse::index_factory; +%factory(nsparse::Index* nsparse::index_factory, nsparse::BrutalIndex, nsparse::SeismicIndex, nsparse::SeismicScalarQuantizedIndex, nsparse::IDMapIndex, nsparse::InvertedIndex); +%include "nsparse/index_factory.h" + +// read_index returns a new object - Python takes ownership +%newobject nsparse::read_index; +%factory(nsparse::Index* nsparse::read_index, nsparse::BrutalIndex, nsparse::SeismicIndex, nsparse::SeismicScalarQuantizedIndex, nsparse::IDMapIndex, nsparse::InvertedIndex); +%include "nsparse/io/index_io.h" diff --git a/nsparse/python/swignsparse_avx512.swig b/nsparse/python/swignsparse_avx512.swig new file mode 100644 index 0000000..fdbf04f --- /dev/null +++ b/nsparse/python/swignsparse_avx512.swig @@ -0,0 +1,133 @@ +/** + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +%module swignsparse + +%{ +#define SWIG_FILE_WITH_INIT +#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION +#include +#include "nsparse/types.h" +#include "nsparse/io/io.h" +#include "nsparse/id_selector.h" +#include "nsparse/sparse_vectors.h" +#include "nsparse/invlists/inverted_lists.h" +#include "nsparse/index.h" +#include "nsparse/brutal_index.h" +#include "nsparse/seismic_common.h" +#include "nsparse/seismic_index.h" +#include "nsparse/seismic_scalar_quantized_index.h" +#include "nsparse/id_map_index.h" +#include "nsparse/utils/scalar_quantizer.h" +#include "nsparse/cluster/random_kmeans.h" +#include "nsparse/inverted_index.h" +#include "nsparse/index_factory.h" +#include "nsparse/io/index_io.h" +%} + +%init %{ +import_array(); +%} + +%include "nsparse_typemaps.i" +%include "factory.i" +%include "nsparse/io/io.h" +%include "nsparse/types.h" +%include "nsparse/sparse_vectors.h" +%include "nsparse/invlists/inverted_lists.h" +%include "nsparse/utils/scalar_quantizer.h" + +// IDSelector support - expose the class hierarchy for Python +// Multi-argument typemap: Python passes a single int32 array, n is inferred +%typemap(in) (size_t n, const nsparse::idx_t* indices) (Py_buffer view) { + if (PyObject_GetBuffer($input, &view, PyBUF_FORMAT | PyBUF_C_CONTIGUOUS) == -1) { + SWIG_fail; + } + if (strcmp(view.format, "i") != 0) { + PyBuffer_Release(&view); + PyErr_SetString(PyExc_TypeError, "Expected int32 array for indices"); + SWIG_fail; + } + $1 = (size_t)(view.shape[0]); + $2 = (nsparse::idx_t*)view.buf; +} +%typemap(freearg) (size_t n, const nsparse::idx_t* indices) { + PyBuffer_Release(&view$argnum); +} +%include "nsparse/id_selector.h" + +// IO classes needed for index serialization (must come before index.h) +// Ignore internal IO classes - not needed in Python API +%ignore nsparse::SeismicIndex::write_index; +%ignore nsparse::SeismicIndex::read_index; +%ignore nsparse::SeismicScalarQuantizedIndex::write_index; +%ignore 
nsparse::SeismicScalarQuantizedIndex::read_index; +%ignore nsparse::IDMapIndex::write_index; +%ignore nsparse::IDMapIndex::read_index; +%ignore nsparse::SparseVectors::serialize; +%ignore nsparse::SparseVectors::deserialize; +%ignore nsparse::InvertedListClusters::serialize; +%ignore nsparse::InvertedListClusters::deserialize; +%ignore nsparse::detail; + +// Ignore the protected std::vector-based methods +%ignore nsparse::Index::add(idx_t, std::vector&, std::vector&, std::vector&); +%ignore nsparse::Index::search(idx_t, std::vector&, std::vector&, std::vector&, int); + +// Ignore SeismicIndex's vector-based search methods, expose only pointer-based +%ignore nsparse::SeismicIndex::search(idx_t, std::vector&, std::vector&, std::vector&, int); +%ignore nsparse::SeismicIndex::search(idx_t, std::vector&, std::vector&, std::vector&, int, int, float); + +// Ignore SeismicScalarQuantizedIndex's vector-based search methods +%ignore nsparse::SeismicScalarQuantizedIndex::search(idx_t, std::vector&, std::vector&, std::vector&, int); + +// Ignore InvertedIndex's protected search method (returns pair_of_score_id_vectors_t) +%ignore nsparse::InvertedIndex::search; + +// Ignore the original search methods - we'll provide our own via %extend +%ignore nsparse::Index::search; + +// Extend Index to provide search methods +// Use separate names because both overloads have the same number of visible Python args +// (labels and distances have numinputs=0, so both would appear as 5-arg functions to Python) +%extend nsparse::Index { + // Version without SearchParameters + void search_c(idx_t n, const idx_t* indptr, const term_t* indices, + const float* values, int k, float* distances, idx_t* labels) { + $self->search(n, indptr, indices, values, k, distances, labels, nullptr); + } + + // Version with SearchParameters - different name to avoid overload ambiguity + void search_with_params(idx_t n, const idx_t* indptr, const term_t* indices, + const float* values, int k, float* distances, + 
idx_t* labels, + nsparse::SearchParameters* search_parameters) { + $self->search(n, indptr, indices, values, k, distances, labels, search_parameters); + } +} + +%include "nsparse/index.h" +%include "nsparse/brutal_index.h" +%include "nsparse/seismic_common.h" +%include "nsparse/seismic_index.h" +%include "nsparse/seismic_scalar_quantized_index.h" +%include "nsparse/inverted_index.h" +%include "nsparse/id_map_index.h" +%include "nsparse/cluster/random_kmeans.h" + +// index_factory returns a new object - Python takes ownership +// Use %factory to enable proper downcasting based on runtime type +%newobject nsparse::index_factory; +%factory(nsparse::Index* nsparse::index_factory, nsparse::BrutalIndex, nsparse::SeismicIndex, nsparse::SeismicScalarQuantizedIndex, nsparse::IDMapIndex, nsparse::InvertedIndex); +%include "nsparse/index_factory.h" + +// read_index returns a new object - Python takes ownership +%newobject nsparse::read_index; +%factory(nsparse::Index* nsparse::read_index, nsparse::BrutalIndex, nsparse::SeismicIndex, nsparse::SeismicScalarQuantizedIndex, nsparse::IDMapIndex, nsparse::InvertedIndex); +%include "nsparse/io/index_io.h" diff --git a/nsparse/python/swignsparse_sve.swig b/nsparse/python/swignsparse_sve.swig new file mode 100644 index 0000000..fdbf04f --- /dev/null +++ b/nsparse/python/swignsparse_sve.swig @@ -0,0 +1,133 @@ +/** + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +%module swignsparse + +%{ +#define SWIG_FILE_WITH_INIT +#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION +#include +#include "nsparse/types.h" +#include "nsparse/io/io.h" +#include "nsparse/id_selector.h" +#include "nsparse/sparse_vectors.h" +#include "nsparse/invlists/inverted_lists.h" +#include "nsparse/index.h" +#include "nsparse/brutal_index.h" +#include "nsparse/seismic_common.h" +#include "nsparse/seismic_index.h" +#include "nsparse/seismic_scalar_quantized_index.h" +#include "nsparse/id_map_index.h" +#include "nsparse/utils/scalar_quantizer.h" +#include "nsparse/cluster/random_kmeans.h" +#include "nsparse/inverted_index.h" +#include "nsparse/index_factory.h" +#include "nsparse/io/index_io.h" +%} + +%init %{ +import_array(); +%} + +%include "nsparse_typemaps.i" +%include "factory.i" +%include "nsparse/io/io.h" +%include "nsparse/types.h" +%include "nsparse/sparse_vectors.h" +%include "nsparse/invlists/inverted_lists.h" +%include "nsparse/utils/scalar_quantizer.h" + +// IDSelector support - expose the class hierarchy for Python +// Multi-argument typemap: Python passes a single int32 array, n is inferred +%typemap(in) (size_t n, const nsparse::idx_t* indices) (Py_buffer view) { + if (PyObject_GetBuffer($input, &view, PyBUF_FORMAT | PyBUF_C_CONTIGUOUS) == -1) { + SWIG_fail; + } + if (strcmp(view.format, "i") != 0) { + PyBuffer_Release(&view); + PyErr_SetString(PyExc_TypeError, "Expected int32 array for indices"); + SWIG_fail; + } + $1 = (size_t)(view.shape[0]); + $2 = (nsparse::idx_t*)view.buf; +} +%typemap(freearg) (size_t n, const nsparse::idx_t* indices) { + PyBuffer_Release(&view$argnum); +} +%include "nsparse/id_selector.h" + +// IO classes needed for index serialization (must come before index.h) +// Ignore internal IO classes - not needed in Python API +%ignore nsparse::SeismicIndex::write_index; +%ignore nsparse::SeismicIndex::read_index; +%ignore nsparse::SeismicScalarQuantizedIndex::write_index; +%ignore 
nsparse::SeismicScalarQuantizedIndex::read_index; +%ignore nsparse::IDMapIndex::write_index; +%ignore nsparse::IDMapIndex::read_index; +%ignore nsparse::SparseVectors::serialize; +%ignore nsparse::SparseVectors::deserialize; +%ignore nsparse::InvertedListClusters::serialize; +%ignore nsparse::InvertedListClusters::deserialize; +%ignore nsparse::detail; + +// Ignore the protected std::vector-based methods +%ignore nsparse::Index::add(idx_t, std::vector&, std::vector&, std::vector&); +%ignore nsparse::Index::search(idx_t, std::vector&, std::vector&, std::vector&, int); + +// Ignore SeismicIndex's vector-based search methods, expose only pointer-based +%ignore nsparse::SeismicIndex::search(idx_t, std::vector&, std::vector&, std::vector&, int); +%ignore nsparse::SeismicIndex::search(idx_t, std::vector&, std::vector&, std::vector&, int, int, float); + +// Ignore SeismicScalarQuantizedIndex's vector-based search methods +%ignore nsparse::SeismicScalarQuantizedIndex::search(idx_t, std::vector&, std::vector&, std::vector&, int); + +// Ignore InvertedIndex's protected search method (returns pair_of_score_id_vectors_t) +%ignore nsparse::InvertedIndex::search; + +// Ignore the original search methods - we'll provide our own via %extend +%ignore nsparse::Index::search; + +// Extend Index to provide search methods +// Use separate names because both overloads have the same number of visible Python args +// (labels and distances have numinputs=0, so both would appear as 5-arg functions to Python) +%extend nsparse::Index { + // Version without SearchParameters + void search_c(idx_t n, const idx_t* indptr, const term_t* indices, + const float* values, int k, float* distances, idx_t* labels) { + $self->search(n, indptr, indices, values, k, distances, labels, nullptr); + } + + // Version with SearchParameters - different name to avoid overload ambiguity + void search_with_params(idx_t n, const idx_t* indptr, const term_t* indices, + const float* values, int k, float* distances, + 
idx_t* labels, + nsparse::SearchParameters* search_parameters) { + $self->search(n, indptr, indices, values, k, distances, labels, search_parameters); + } +} + +%include "nsparse/index.h" +%include "nsparse/brutal_index.h" +%include "nsparse/seismic_common.h" +%include "nsparse/seismic_index.h" +%include "nsparse/seismic_scalar_quantized_index.h" +%include "nsparse/inverted_index.h" +%include "nsparse/id_map_index.h" +%include "nsparse/cluster/random_kmeans.h" + +// index_factory returns a new object - Python takes ownership +// Use %factory to enable proper downcasting based on runtime type +%newobject nsparse::index_factory; +%factory(nsparse::Index* nsparse::index_factory, nsparse::BrutalIndex, nsparse::SeismicIndex, nsparse::SeismicScalarQuantizedIndex, nsparse::IDMapIndex, nsparse::InvertedIndex); +%include "nsparse/index_factory.h" + +// read_index returns a new object - Python takes ownership +%newobject nsparse::read_index; +%factory(nsparse::Index* nsparse::read_index, nsparse::BrutalIndex, nsparse::SeismicIndex, nsparse::SeismicScalarQuantizedIndex, nsparse::IDMapIndex, nsparse::InvertedIndex); +%include "nsparse/io/index_io.h" diff --git a/nsparse/seismic_common.h b/nsparse/seismic_common.h index b73e09b..f7fd1df 100644 --- a/nsparse/seismic_common.h +++ b/nsparse/seismic_common.h @@ -10,10 +10,14 @@ #ifndef SEISMIC_COMMON_H #define SEISMIC_COMMON_H +#include +#include #include #include #include +#include + #include "nsparse/cluster/inverted_list_clusters.h" #include "nsparse/cluster/random_kmeans.h" #include "nsparse/id_selector.h" @@ -58,10 +62,10 @@ inline std::vector calculate_summary_scores( return summary_scores; } -inline float compute_similarity(idx_t doc_id, const idx_t* indptr, +inline float compute_similarity(idx_t doc_id, const offset_t* indptr, const term_t* indices, const uint8_t* values, const uint8_t* dense, size_t element_size) { - const idx_t start = indptr[doc_id]; + const offset_t start = indptr[doc_id]; const size_t len = 
indptr[doc_id + 1] - start; float score = 0.0F; if (element_size == U32) { @@ -129,30 +133,236 @@ inline int calculate_beta(int beta, int lambda) { inline std::vector build_inverted_lists_clusters( const SparseVectors* vectors, const SparseVectorsConfig& config, const SeismicClusterParameters& seismic_cluster_params) { - // build inverted index - std::unique_ptr inverted_lists = - ArrayInvertedLists::build_inverted_lists(config.dimension, - config.element_size, vectors); int lambda = calculate_lambda(seismic_cluster_params.lambda, vectors->num_vectors()); int beta = calculate_beta(seismic_cluster_params.beta, lambda); - size_t inverted_lists_size = inverted_lists->size(); - std::vector clustered_inverted_lists( - inverted_lists_size); -#pragma omp parallel for schedule(dynamic, 64) - for (int64_t idx = 0; idx < static_cast(inverted_lists_size); - ++idx) { - auto& invlist = (*inverted_lists)[idx]; - const auto& doc_ids = invlist.prune_and_keep_doc_ids(lambda); - InvertedListClusters inverted_list_clusters( - detail::RandomKMeans::train(vectors, doc_ids, beta)); - inverted_list_clusters.summarize(vectors, seismic_cluster_params.alpha); - clustered_inverted_lists[idx] = std::move(inverted_list_clusters); - invlist.clear(); + size_t dimension = config.dimension; + size_t element_size = config.element_size; + std::vector clustered_inverted_lists(dimension); + + const size_t n_docs = vectors->num_vectors(); + const auto* indptr_data = vectors->indptr_data(); + const auto* indices_data = vectors->indices_data(); + const auto* values_data = vectors->values_data(); + + // Estimate inverted list memory: NNZ * (sizeof(idx_t) + element_size) + const size_t total_nnz = indptr_data[n_docs] - indptr_data[0]; + const size_t invlist_bytes_full = + total_nnz * (sizeof(idx_t) + element_size); + + // Choose batch count to fit inverted lists within available memory. + // Reserve 4 GB for clustering overhead and OS. 
+ constexpr size_t kReserve = 4ULL * 1024 * 1024 * 1024; + size_t mem_budget = 0; +#ifdef __linux__ + // Use MemAvailable from /proc/meminfo + FILE* meminfo = fopen("/proc/meminfo", "r"); + if (meminfo) { + char line[256]; + while (fgets(line, sizeof(line), meminfo)) { + size_t kb = 0; + if (sscanf(line, "MemAvailable: %zu kB", &kb) == 1) { + mem_budget = kb * 1024; + break; + } + } + fclose(meminfo); } +#endif + if (mem_budget > kReserve) { + mem_budget -= kReserve; + } else { + mem_budget = 16ULL * 1024 * 1024 * 1024; // fallback: 16 GB + } + + size_t n_batches = (invlist_bytes_full + mem_budget - 1) / mem_budget; + if (n_batches < 1) n_batches = 1; + size_t batch_size = (dimension + n_batches - 1) / n_batches; + + fprintf(stderr, "[nsparse] build_inverted_lists: n_docs=%zu, dimension=%zu, " + "element_size=%zu, lambda=%d, beta=%d, n_batches=%zu, batch_size=%zu\n", + n_docs, dimension, element_size, lambda, beta, n_batches, batch_size); + + for (size_t batch_start = 0; batch_start < dimension; + batch_start += batch_size) { + size_t batch_end = std::min(batch_start + batch_size, dimension); + size_t this_batch = batch_end - batch_start; + + // Parallel CSR → inverted list construction + // Uses existing per-list spinlock in InvertedList for thread safety. + // With 30K+ lists and 32 threads, lock contention is negligible. 
+ auto batch_invlists = + std::make_unique(this_batch, element_size); +#pragma omp parallel for schedule(static) + for (int64_t i = 0; i < static_cast(n_docs); ++i) { + offset_t start = indptr_data[i]; + offset_t end = indptr_data[i + 1]; + for (offset_t j = start; j < end; ++j) { + size_t term_id = indices_data[j]; + if (term_id >= batch_start && term_id < batch_end) { + batch_invlists->add_entry( + static_cast(term_id - batch_start), + static_cast(i), + values_data + j * element_size); + } + } + } + + // Count non-empty lists for diagnostics + size_t non_empty = 0; + size_t total_entries = 0; + for (size_t i = 0; i < this_batch; ++i) { + size_t sz = (*batch_invlists)[i].size(); + if (sz > 0) { + non_empty++; + total_entries += sz; + } + } + fprintf(stderr, "[nsparse] batch [%zu, %zu): %zu/%zu non-empty lists, " + "%zu total entries\n", + batch_start, batch_end, non_empty, this_batch, total_entries); + + // Prune and cluster in parallel + std::atomic clustered_count{0}; +#pragma omp parallel for schedule(dynamic, 1) + for (int64_t i = 0; i < static_cast(this_batch); ++i) { + auto& invlist = (*batch_invlists)[i]; + if (invlist.size() == 0) { + continue; + } + const auto& doc_ids = invlist.prune_and_keep_doc_ids(lambda); + InvertedListClusters inverted_list_clusters( + detail::RandomKMeans::train(vectors, doc_ids, beta)); + inverted_list_clusters.summarize(vectors, + seismic_cluster_params.alpha); + clustered_inverted_lists[batch_start + i] = + std::move(inverted_list_clusters); + invlist.clear(); + clustered_count.fetch_add(1, std::memory_order_relaxed); + } + fprintf(stderr, "[nsparse] batch done: %zu lists clustered\n", + clustered_count.load()); + } + return clustered_inverted_lists; } +// Streaming variant: builds inverted list clusters batch-by-batch and serializes +// each batch's clusters immediately to the IOWriter, freeing memory after each +// batch. This avoids accumulating all 65K InvertedListClusters in RAM. 
+inline void build_and_save_inverted_lists_clusters( + const SparseVectors* vectors, const SparseVectorsConfig& config, + const SeismicClusterParameters& seismic_cluster_params, + IOWriter* io_writer) { + int lambda = + calculate_lambda(seismic_cluster_params.lambda, vectors->num_vectors()); + int beta = calculate_beta(seismic_cluster_params.beta, lambda); + size_t dimension = config.dimension; + size_t element_size = config.element_size; + + const size_t n_docs = vectors->num_vectors(); + const auto* indptr_data = vectors->indptr_data(); + const auto* indices_data = vectors->indices_data(); + const auto* values_data = vectors->values_data(); + + const size_t total_nnz = indptr_data[n_docs] - indptr_data[0]; + const size_t invlist_bytes_full = + total_nnz * (sizeof(idx_t) + element_size); + + // Cap per-batch inverted list memory to keep peak RSS under physical RAM. + // The CSR (~46 GB for float32 at 46M docs) remains throughout; each batch + // adds inverted lists + k-means temporaries (~2x the raw invlist allocation). 
+ constexpr size_t kMaxBatchBytes = 8ULL * 1024 * 1024 * 1024; // 8 GB + size_t mem_budget = std::min(kMaxBatchBytes, invlist_bytes_full / 3); + if (mem_budget == 0) mem_budget = invlist_bytes_full; + + size_t n_batches = (invlist_bytes_full + mem_budget - 1) / mem_budget; + if (n_batches < 1) n_batches = 1; + size_t batch_size = (dimension + n_batches - 1) / n_batches; + + fprintf(stderr, + "[nsparse] build_and_save_inverted_lists: n_docs=%zu, " + "dimension=%zu, element_size=%zu, lambda=%d, beta=%d, " + "n_batches=%zu, batch_size=%zu\n", + n_docs, dimension, element_size, lambda, beta, n_batches, + batch_size); + + // Write the total dimension count (number of InvertedListClusters) + io_writer->write(&dimension, sizeof(dimension), 1); + + for (size_t batch_start = 0; batch_start < dimension; + batch_start += batch_size) { + size_t batch_end = std::min(batch_start + batch_size, dimension); + size_t this_batch = batch_end - batch_start; + + auto batch_invlists = + std::make_unique(this_batch, element_size); +#pragma omp parallel for schedule(static) + for (int64_t i = 0; i < static_cast(n_docs); ++i) { + offset_t start = indptr_data[i]; + offset_t end = indptr_data[i + 1]; + for (offset_t j = start; j < end; ++j) { + size_t term_id = indices_data[j]; + if (term_id >= batch_start && term_id < batch_end) { + batch_invlists->add_entry( + static_cast(term_id - batch_start), + static_cast(i), + values_data + j * element_size); + } + } + } + + size_t non_empty = 0; + size_t total_entries = 0; + for (size_t i = 0; i < this_batch; ++i) { + size_t sz = (*batch_invlists)[i].size(); + if (sz > 0) { + non_empty++; + total_entries += sz; + } + } + fprintf(stderr, + "[nsparse] batch [%zu, %zu): %zu/%zu non-empty lists, " + "%zu total entries\n", + batch_start, batch_end, non_empty, this_batch, total_entries); + + // Prune, cluster, serialize, and immediately free each list + std::vector batch_clusters(this_batch); + std::atomic clustered_count{0}; +#pragma omp parallel for 
schedule(dynamic, 1) + for (int64_t i = 0; i < static_cast(this_batch); ++i) { + auto& invlist = (*batch_invlists)[i]; + if (invlist.size() == 0) { + continue; + } + const auto& doc_ids = invlist.prune_and_keep_doc_ids(lambda); + InvertedListClusters inverted_list_clusters( + detail::RandomKMeans::train(vectors, doc_ids, beta)); + inverted_list_clusters.summarize(vectors, + seismic_cluster_params.alpha); + batch_clusters[i] = std::move(inverted_list_clusters); + invlist.clear(); + clustered_count.fetch_add(1, std::memory_order_relaxed); + } + fprintf(stderr, "[nsparse] batch done: %zu lists clustered\n", + clustered_count.load()); + + // Free inverted lists immediately + batch_invlists.reset(); + + // Serialize this batch's clusters to disk and free + for (size_t i = 0; i < this_batch; ++i) { + batch_clusters[i].serialize(io_writer); + } + batch_clusters.clear(); + batch_clusters.shrink_to_fit(); + + fprintf(stderr, + "[nsparse] batch [%zu, %zu): serialized and freed\n", + batch_start, batch_end); + } +} + } // namespace detail } // namespace nsparse diff --git a/nsparse/seismic_index.cpp b/nsparse/seismic_index.cpp index 283c899..8d9e419 100644 --- a/nsparse/seismic_index.cpp +++ b/nsparse/seismic_index.cpp @@ -23,6 +23,8 @@ #include "nsparse/id_selector.h" #include "nsparse/index.h" #include "nsparse/invlists/inverted_lists.h" +#include "nsparse/io/file_io.h" +#include "nsparse/io/index_io.h" #include "nsparse/io/seismic_invlists_writer.h" #include "nsparse/seismic_common.h" #include "nsparse/sparse_vectors.h" @@ -66,10 +68,8 @@ void query_single_inverted_list(const SparseVectors* vectors, for (const size_t& cluster_id : cluster_order) { const auto& cluster_score = summary_scores[cluster_id]; - if (heap.full() && (cluster_score * heap_factor < heap.peek_score())) { - if (first_list) { - break; - } + if (heap.full() && + (cluster_score * heap_factor < heap.peek_score())) { continue; } const auto& docs = cluster_invlist.get_docs(cluster_id); @@ -83,7 +83,7 @@ void 
query_single_inverted_list(const SparseVectors* vectors, } if (i + kPrefetchDist1 < n_docs) { const idx_t next_doc = docs[i + kPrefetchDist1]; - const idx_t next_start = indptr[next_doc]; + const offset_t next_start = indptr[next_doc]; const size_t next_len = indptr[next_doc + 1] - next_start; detail::prefetch_vector(indices + next_start, values + next_start, next_len); @@ -95,7 +95,7 @@ void query_single_inverted_list(const SparseVectors* vectors, if (id_selector != nullptr && !id_selector->is_member(doc_id)) { continue; } - const idx_t start = indptr[doc_id]; + const offset_t start = indptr[doc_id]; const size_t len = indptr[doc_id + 1] - start; auto score = detail::dot_product_float_dense( indices + start, values + start, len, dense.data()); @@ -110,6 +110,15 @@ SeismicIndex::SeismicIndex(int dim) SeismicIndex::SeismicIndex(int dim, SeismicClusterParameters parameter) : Index(dim), cluster_parameter_(parameter) {} +void SeismicIndex::reserve(size_t num_vectors, size_t total_nnz) { + if (vectors_ == nullptr) { + vectors_ = std::unique_ptr( + new SparseVectors({.element_size = kElementSize, + .dimension = static_cast(dimension_)})); + } + vectors_->reserve(num_vectors, total_nnz); +} + void SeismicIndex::add(idx_t n, const idx_t* indptr, const term_t* indices, const float* values) { throw_if_not_positive(n); @@ -135,6 +144,42 @@ void SeismicIndex::build() { cluster_parameter_)); } +void SeismicIndex::build_and_save(const char* path) { + FileIOWriter writer(const_cast(path)); + + // Write index header (fourcc + dimension) + auto id_val = fourcc(name); + writer.write(&id_val, sizeof(uint32_t), 1); + writer.write(&dimension_, sizeof(int), 1); + + // Write body via IOWriter overload + build_and_save(&writer); + + writer.close(); +} + +void SeismicIndex::build_and_save(IOWriter* writer) { + // Write vectors + if (vectors_ == nullptr) { + empty_sparse_vectors.serialize(writer); + } else { + vectors_->serialize(writer); + } + + // Stream clusters: build batch-by-batch, 
serialize each batch immediately + detail::build_and_save_inverted_lists_clusters( + vectors_.get(), + {.element_size = kElementSize, + .dimension = static_cast(get_dimension())}, + cluster_parameter_, writer); +} + +void SeismicIndex::release_build_memory() { + vectors_.reset(); + clustered_inverted_lists.clear(); + clustered_inverted_lists.shrink_to_fit(); +} + auto SeismicIndex::search(idx_t n, const idx_t* indptr, const term_t* indices, const float* values, int k, SearchParameters* search_parameters) @@ -179,7 +224,7 @@ auto SeismicIndex::search(idx_t n, const idx_t* indptr, const term_t* indices, #pragma omp parallel for for (idx_t query_idx = 0; query_idx < n; ++query_idx) { const auto& dense = query_vectors.get_dense_vector_float(query_idx); - const idx_t start = query_indptr[query_idx]; + const offset_t start = query_indptr[query_idx]; const size_t len = query_indptr[query_idx + 1] - start; const auto& cuts = detail::top_k_tokens( query_indices + start, query_values + start, len, parameters->cut); @@ -213,7 +258,7 @@ auto SeismicIndex::single_query(const std::vector& dense, absl::flat_hash_set visited; visited.reserve(cuts.size() * 5000); detail::TopKHolder holder(k); - bool first_list = true; + bool first_list = false; for (const auto& term : cuts) { if (term >= clustered_inverted_lists.size()) [[unlikely]] { continue; diff --git a/nsparse/seismic_index.h b/nsparse/seismic_index.h index 1050380..6aa7bc7 100644 --- a/nsparse/seismic_index.h +++ b/nsparse/seismic_index.h @@ -45,7 +45,11 @@ class SeismicIndex : public Index, public IndexIO { SeismicIndex& operator=(const SeismicIndex&) = delete; const SparseVectors* get_vectors() const override; + void reserve(size_t num_vectors, size_t total_nnz) override; void build() override; + void build_and_save(const char* path) override; + void build_and_save(IOWriter* writer) override; + void release_build_memory() override; void add(idx_t n, const idx_t* indptr, const term_t* indices, const float* values) override; 
diff --git a/nsparse/seismic_scalar_quantized_index.cpp b/nsparse/seismic_scalar_quantized_index.cpp index 41b1111..c966afb 100644 --- a/nsparse/seismic_scalar_quantized_index.cpp +++ b/nsparse/seismic_scalar_quantized_index.cpp @@ -23,6 +23,8 @@ #include "nsparse/id_selector.h" #include "nsparse/index.h" #include "nsparse/invlists/inverted_lists.h" +#include "nsparse/io/file_io.h" +#include "nsparse/io/index_io.h" #include "nsparse/io/io.h" #include "nsparse/io/seismic_invlists_writer.h" #include "nsparse/seismic_common.h" @@ -68,7 +70,8 @@ void query_single_inverted_list(const SparseVectors* vectors, for (const size_t& cluster_id : cluster_order) { const auto& cluster_score = summary_scores[cluster_id]; - if (heap.full() && (cluster_score * heap_factor < heap.peek_score())) { + if (heap.full() && + (cluster_score * heap_factor < heap.peek_score())) { if (first_list) { break; } @@ -76,25 +79,16 @@ void query_single_inverted_list(const SparseVectors* vectors, } const auto& docs = cluster_invlist.get_docs(cluster_id); const size_t n_docs = docs.size(); - // Two-stage prefetch pipeline: - // Stage 1 (distance 2): prefetch indptr[docs[i+2]] so the indptr - // lookup is cached by the time we need it next iteration. - // Stage 2 (distance 1): read indptr[docs[i+1]] (now cached from - // stage 1 issued last iteration), prefetch the actual vector data. 
static constexpr size_t kPrefetchDist1 = 2; // vector data prefetch static constexpr size_t kPrefetchDist2 = 4; // indptr prefetch for (size_t i = 0; i < n_docs; ++i) { const auto& doc_id = docs[i]; - // Stage 1: prefetch indptr entry for doc at distance 2 if (i + kPrefetchDist2 < n_docs) { detail::prefetch_indptr(indptr, docs[i + kPrefetchDist2]); } - // Stage 2: prefetch vector data for next doc (indptr should - // already be cached from stage 1 issued kPrefetchDist2 - - // kPrefetchDist1 iterations ago) if (i + kPrefetchDist1 < n_docs) { const idx_t next_doc = docs[i + kPrefetchDist1]; - const idx_t next_start = indptr[next_doc]; + const offset_t next_start = indptr[next_doc]; const size_t next_len = indptr[next_doc + 1] - next_start; detail::prefetch_vector(indices + next_start, values + next_start, next_len); @@ -124,6 +118,17 @@ SeismicScalarQuantizedIndex::SeismicScalarQuantizedIndex( sq_(quantizer_type, vmin, vmax), cluster_parameter_(parameter) {} +void SeismicScalarQuantizedIndex::reserve(size_t num_vectors, + size_t total_nnz) { + const size_t element_size = sq_.bytes_per_value(); + if (vectors_ == nullptr) { + vectors_ = std::unique_ptr( + new SparseVectors({.element_size = element_size, + .dimension = static_cast(dimension_)})); + } + vectors_->reserve(num_vectors, total_nnz); +} + void SeismicScalarQuantizedIndex::add(idx_t n, const idx_t* indptr, const term_t* indices, const float* values) { @@ -175,6 +180,45 @@ void SeismicScalarQuantizedIndex::build() { cluster_parameter_)); } +void SeismicScalarQuantizedIndex::build_and_save(const char* path) { + FileIOWriter writer(const_cast(path)); + + // Write index header (fourcc + dimension) + auto id_val = fourcc(name); + writer.write(&id_val, sizeof(uint32_t), 1); + writer.write(&dimension_, sizeof(int), 1); + + // Write body via IOWriter overload + build_and_save(&writer); + + writer.close(); +} + +void SeismicScalarQuantizedIndex::build_and_save(IOWriter* writer) { + // Write SQ header + 
write_header(writer); + + // Write vectors + if (vectors_ == nullptr) { + empty_sparse_vectors.serialize(writer); + } else { + vectors_->serialize(writer); + } + + // Stream clusters: build batch-by-batch, serialize each batch immediately + detail::build_and_save_inverted_lists_clusters( + vectors_.get(), + {.element_size = sq_.bytes_per_value(), + .dimension = static_cast(get_dimension())}, + cluster_parameter_, writer); +} + +void SeismicScalarQuantizedIndex::release_build_memory() { + vectors_.reset(); + clustered_inverted_lists.clear(); + clustered_inverted_lists.shrink_to_fit(); +} + auto SeismicScalarQuantizedIndex::search(idx_t n, const idx_t* indptr, const term_t* indices, const float* values, int k, @@ -235,7 +279,7 @@ auto SeismicScalarQuantizedIndex::search(idx_t n, const idx_t* indptr, #pragma omp parallel for for (idx_t query_idx = 0; query_idx < n; ++query_idx) { const auto& dense = query_vectors.get_dense_vector(query_idx); - const idx_t start = query_indptr[query_idx]; + const offset_t start = query_indptr[query_idx]; const size_t len = query_indptr[query_idx + 1] - start; std::vector cuts; if (element_size == U16) { diff --git a/nsparse/seismic_scalar_quantized_index.h b/nsparse/seismic_scalar_quantized_index.h index 023f99c..46f5800 100644 --- a/nsparse/seismic_scalar_quantized_index.h +++ b/nsparse/seismic_scalar_quantized_index.h @@ -47,7 +47,11 @@ class SeismicScalarQuantizedIndex : public Index, public IndexIO { std::array id() const override { return name; } void add(idx_t n, const idx_t* indptr, const term_t* indices, const float* values) override; + void reserve(size_t num_vectors, size_t total_nnz) override; void build() override; + void build_and_save(const char* path) override; + void build_and_save(IOWriter* writer) override; + void release_build_memory() override; const SparseVectors* get_vectors() const override { return vectors_.get(); } const ScalarQuantizer& get_scalar_quantizer() const { return sq_; } diff --git 
a/nsparse/sparse_vectors.cpp b/nsparse/sparse_vectors.cpp index fe232f0..d586ae1 100644 --- a/nsparse/sparse_vectors.cpp +++ b/nsparse/sparse_vectors.cpp @@ -21,6 +21,12 @@ SparseVectors::SparseVectors(SparseVectorsConfig config) : config_(config) { throw_if_not_positive(config_.dimension); } +void SparseVectors::reserve(size_t num_vectors, size_t total_nnz) { + indptr_.reserve(num_vectors + 1); + indices_.reserve(total_nnz); + values_.reserve(total_nnz * config_.element_size); +} + void SparseVectors::add_vectors(const std::vector& indptr, const std::vector& indices, const std::vector& weights) { @@ -48,9 +54,9 @@ void SparseVectors::add_vectors(const idx_t* indptr, size_t indptr_size, // Append weights directly (already in uint8_t format) this->values_.insert(this->values_.end(), weights, weights + weights_size); - idx_t offset = this->indptr_.back(); + offset_t offset = this->indptr_.back(); for (size_t i = 1; i < indptr_size; ++i) { - this->indptr_.push_back(indptr[i] + offset); + this->indptr_.push_back(static_cast(indptr[i]) + offset); } } @@ -61,10 +67,8 @@ void SparseVectors::add_vector(const std::vector& indices, void SparseVectors::add_vector(const term_t* indices, size_t indices_size, const uint8_t* weights, size_t weights_size) { - // Get the current offset (where the new vector starts) - idx_t offset = this->indptr_.empty() ? 0 : this->indptr_.back(); + offset_t offset = this->indptr_.empty() ? 
0 : this->indptr_.back(); - // If this is the first vector, initialize indptr with 0 if (this->indptr_.empty()) { this->indptr_.push_back(0); } @@ -72,7 +76,7 @@ void SparseVectors::add_vector(const term_t* indices, size_t indices_size, this->indices_.insert(this->indices_.end(), indices, indices + indices_size); this->values_.insert(this->values_.end(), weights, weights + weights_size); - this->indptr_.push_back(offset + static_cast(indices_size)); + this->indptr_.push_back(offset + static_cast(indices_size)); } std::vector SparseVectors::get_dense_vector_float( @@ -81,12 +85,12 @@ std::vector SparseVectors::get_dense_vector_float( throw std::out_of_range("Vector index out of range"); } - idx_t start = indptr_[vector_idx]; - idx_t end = indptr_[vector_idx + 1]; + offset_t start = indptr_[vector_idx]; + offset_t end = indptr_[vector_idx + 1]; std::vector dense_vector( config_.dimension > 0 ? config_.dimension : indices_[end - 1] + 1, 0.0F); - for (idx_t i = start; i < end; ++i) { + for (offset_t i = start; i < end; ++i) { const uint8_t* value_ptr = values_.data() + (i * config_.element_size); if (config_.element_size == U32) { dense_vector[indices_[i]] = @@ -105,13 +109,12 @@ std::vector SparseVectors::get_dense_vector(idx_t vector_idx) const { if (vector_idx < 0 || vector_idx > static_cast(indptr_.size()) - 2) { throw std::out_of_range("Vector index out of range"); } - idx_t start = indptr_[vector_idx]; - idx_t end = indptr_[vector_idx + 1]; - size_t size = end - start; + offset_t start = indptr_[vector_idx]; + offset_t end = indptr_[vector_idx + 1]; std::vector dense_vector(config_.dimension * config_.element_size, - 0.0F); - for (idx_t i = start; i < end; ++i) { - for (idx_t j = 0; j < config_.element_size; ++j) { + 0); + for (offset_t i = start; i < end; ++i) { + for (size_t j = 0; j < config_.element_size; ++j) { dense_vector[indices_[i] * config_.element_size + j] = values_[i * config_.element_size + j]; } @@ -133,8 +136,8 @@ void 
SparseVectors::serialize(IOWriter* io_writer) const { auto element_size = get_element_size(); io_writer->write(&element_size, sizeof(size_t), 1); size_t indptr_size = vector_count + 1; - io_writer->write(const_cast(indptr_.data()), sizeof(idx_t), - indptr_size); + io_writer->write(const_cast(indptr_.data()), + sizeof(offset_t), indptr_size); size_t indices_size = indptr_[vector_count]; io_writer->write(const_cast(indices_.data()), sizeof(term_t), indices_size); @@ -156,7 +159,7 @@ void SparseVectors::deserialize(IOReader* io_reader) { size_t indptr_size = vector_count + 1; indptr_.resize(indptr_size); - io_reader->read(indptr_.data(), sizeof(idx_t), indptr_size); + io_reader->read(indptr_.data(), sizeof(offset_t), indptr_size); size_t indices_size = indptr_[vector_count]; indices_.resize(indices_size); diff --git a/nsparse/sparse_vectors.h b/nsparse/sparse_vectors.h index a9b8d8a..437fc32 100644 --- a/nsparse/sparse_vectors.h +++ b/nsparse/sparse_vectors.h @@ -27,7 +27,7 @@ struct SparseVectorsConfig { }; struct SparseVectorsData { - const idx_t* indptr_data; + const offset_t* indptr_data; const term_t* indices_data; const float* values_data; }; @@ -45,6 +45,8 @@ class SparseVectors : public Serializable { SparseVectors(SparseVectors&& other) noexcept = default; SparseVectors& operator=(SparseVectors&& other) noexcept = default; + void reserve(size_t num_vectors, size_t total_nnz); + void add_vectors(const std::vector& indptr, const std::vector& indices, const std::vector& weights); @@ -65,7 +67,7 @@ class SparseVectors : public Serializable { std::vector get_dense_vector_float(idx_t vector_idx) const; std::vector get_dense_vector(idx_t vector_idx) const; - const idx_t* indptr_data() const { return indptr_.data(); } + const offset_t* indptr_data() const { return indptr_.data(); } const term_t* indices_data() const { return indices_.data(); } const float* values_data_float() const { return reinterpret_cast(values_.data()); @@ -88,7 +90,7 @@ class SparseVectors : 
public Serializable { void deserialize(IOReader* reader) override; private: - std::vector indptr_; + std::vector indptr_; std::vector indices_; std::vector values_; SparseVectorsConfig config_; diff --git a/nsparse/types.h b/nsparse/types.h index db969f1..5963f1f 100644 --- a/nsparse/types.h +++ b/nsparse/types.h @@ -16,6 +16,7 @@ namespace nsparse { using idx_t = int32_t; +using offset_t = int64_t; using term_t = uint16_t; using weight_t = float; diff --git a/nsparse/utils/distance.h b/nsparse/utils/distance.h index d5bbf66..8fcdeec 100644 --- a/nsparse/utils/distance.h +++ b/nsparse/utils/distance.h @@ -69,8 +69,8 @@ inline auto dot_product_vectors_dense(const SparseVectors* vectors, const auto* values = vectors->values_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; const term_t* idx_ptr = indices + start; // Cast to T* at the correct byte offset diff --git a/nsparse/utils/distance_avx2.h b/nsparse/utils/distance_avx2.h index 5c7f0cb..b70f23d 100644 --- a/nsparse/utils/distance_avx2.h +++ b/nsparse/utils/distance_avx2.h @@ -432,12 +432,12 @@ inline auto dot_product_float_vectors_dense(const SparseVectors* vectors, const auto& [indptr, indices, values] = vectors->get_all_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); @@ -454,17 +454,17 @@ inline auto dot_product_uint8_vectors_dense(const SparseVectors* vectors, size_t n_vectors = vectors->num_vectors(); std::vector results(n_vectors, 0); - 
const idx_t* indptr = vectors->indptr_data(); + const offset_t* indptr = vectors->indptr_data(); const term_t* indices = vectors->indices_data(); const uint8_t* values = vectors->typed_values_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); @@ -481,17 +481,17 @@ inline auto dot_product_uint16_vectors_dense(const SparseVectors* vectors, size_t n_vectors = vectors->num_vectors(); std::vector results(n_vectors, 0); - const idx_t* indptr = vectors->indptr_data(); + const offset_t* indptr = vectors->indptr_data(); const term_t* indices = vectors->indices_data(); const uint16_t* values = vectors->typed_values_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); diff --git a/nsparse/utils/distance_avx512.h b/nsparse/utils/distance_avx512.h index 1969e23..424f4df 100644 --- a/nsparse/utils/distance_avx512.h +++ b/nsparse/utils/distance_avx512.h @@ -445,12 +445,12 @@ inline auto dot_product_float_vectors_dense(const SparseVectors* vectors, const auto& [indptr, indices, values] = vectors->get_all_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i 
+ 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); @@ -467,17 +467,17 @@ inline auto dot_product_uint8_vectors_dense(const SparseVectors* vectors, size_t n_vectors = vectors->num_vectors(); std::vector results(n_vectors, 0); - const idx_t* indptr = vectors->indptr_data(); + const offset_t* indptr = vectors->indptr_data(); const term_t* indices = vectors->indices_data(); const uint8_t* values = vectors->typed_values_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); @@ -494,17 +494,17 @@ inline auto dot_product_uint16_vectors_dense(const SparseVectors* vectors, size_t n_vectors = vectors->num_vectors(); std::vector results(n_vectors, 0); - const idx_t* indptr = vectors->indptr_data(); + const offset_t* indptr = vectors->indptr_data(); const term_t* indices = vectors->indices_data(); const uint16_t* values = vectors->typed_values_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); diff --git a/nsparse/utils/distance_neon.h b/nsparse/utils/distance_neon.h 
index 9674ff5..50a9d9b 100644 --- a/nsparse/utils/distance_neon.h +++ b/nsparse/utils/distance_neon.h @@ -457,12 +457,12 @@ inline auto dot_product_float_vectors_dense(const SparseVectors* vectors, const auto& [indptr, indices, values] = vectors->get_all_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); @@ -479,17 +479,17 @@ inline auto dot_product_uint8_vectors_dense(const SparseVectors* vectors, size_t n_vectors = vectors->num_vectors(); std::vector results(n_vectors, 0); - const idx_t* indptr = vectors->indptr_data(); + const offset_t* indptr = vectors->indptr_data(); const term_t* indices = vectors->indices_data(); const uint8_t* values = vectors->typed_values_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); @@ -506,17 +506,17 @@ inline auto dot_product_uint16_vectors_dense(const SparseVectors* vectors, size_t n_vectors = vectors->num_vectors(); std::vector results(n_vectors, 0); - const idx_t* indptr = vectors->indptr_data(); + const offset_t* indptr = vectors->indptr_data(); const term_t* indices = vectors->indices_data(); const uint16_t* values = vectors->typed_values_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const 
idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); diff --git a/nsparse/utils/distance_sve.h b/nsparse/utils/distance_sve.h index c9a9428..8d6c52a 100644 --- a/nsparse/utils/distance_sve.h +++ b/nsparse/utils/distance_sve.h @@ -371,12 +371,12 @@ inline auto dot_product_float_vectors_dense(const SparseVectors* vectors, const auto& [indptr, indices, values] = vectors->get_all_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); @@ -393,17 +393,17 @@ inline auto dot_product_uint8_vectors_dense(const SparseVectors* vectors, size_t n_vectors = vectors->num_vectors(); std::vector results(n_vectors, 0); - const idx_t* indptr = vectors->indptr_data(); + const offset_t* indptr = vectors->indptr_data(); const term_t* indices = vectors->indices_data(); const uint8_t* values = vectors->typed_values_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); @@ -420,17 +420,17 @@ 
inline auto dot_product_uint16_vectors_dense(const SparseVectors* vectors, size_t n_vectors = vectors->num_vectors(); std::vector results(n_vectors, 0); - const idx_t* indptr = vectors->indptr_data(); + const offset_t* indptr = vectors->indptr_data(); const term_t* indices = vectors->indices_data(); const uint16_t* values = vectors->typed_values_data(); for (size_t i = 0; i < n_vectors; ++i) { - const idx_t start = indptr[i]; - const idx_t end = indptr[i + 1]; + const offset_t start = indptr[i]; + const offset_t end = indptr[i + 1]; const size_t len = end - start; if (i + 1 < n_vectors) { - const idx_t next_start = indptr[i + 1]; + const offset_t next_start = indptr[i + 1]; const size_t next_len = indptr[i + 2] - next_start; prefetch_vector(indices + next_start, values + next_start, next_len); diff --git a/nsparse/utils/prefetch.h b/nsparse/utils/prefetch.h index 72b8784..ca00b15 100644 --- a/nsparse/utils/prefetch.h +++ b/nsparse/utils/prefetch.h @@ -45,7 +45,7 @@ inline void prefetch_vector(const term_t* indices, const T* values, } } -inline void prefetch_indptr(const idx_t* indptr, idx_t doc_id) { +inline void prefetch_indptr(const offset_t* indptr, idx_t doc_id) { NSPARSE_PREFETCH(&indptr[doc_id], 0, 0); } diff --git a/nsparse/utils/scalar_quantizer.h b/nsparse/utils/scalar_quantizer.h index d25e0ca..7857cbc 100644 --- a/nsparse/utils/scalar_quantizer.h +++ b/nsparse/utils/scalar_quantizer.h @@ -15,6 +15,10 @@ #include #include +#if defined(__AVX512F__) && defined(__AVX512BW__) +#include +#endif + namespace nsparse { /// Quantization type for scalar quantizer @@ -44,9 +48,7 @@ class ScalarQuantizer { /// Encode array of values void encode(const float* vals, uint8_t* codes, size_t n) const { if (qtype_ == QuantizerType::QT_8bit) { - for (size_t i = 0; i < n; i++) { - codes[i] = encode_8bit(vals[i]); - } + encode_8bit_batch(vals, codes, n); } else { auto* codes16 = reinterpret_cast(codes); for (size_t i = 0; i < n; i++) { @@ -86,6 +88,88 @@ class ScalarQuantizer 
{ } private: + void encode_8bit_batch(const float* vals, uint8_t* codes, size_t n) const { +#if defined(__AVX512F__) && defined(__AVX512BW__) + const float scale = kMax8bit / (vmax_ - vmin_); + const __m512 v_vmin = _mm512_set1_ps(vmin_); + const __m512 v_scale = _mm512_set1_ps(scale); + const __m512 v_zero = _mm512_setzero_ps(); + const __m512 v_max = _mm512_set1_ps(kMax8bit); + + size_t i = 0; + // Process 64 floats (4x16) per iteration → 64 uint8 output + for (; i + 64 <= n; i += 64) { + // Load and quantize 4 groups of 16 floats + __m512 f0 = _mm512_loadu_ps(vals + i); + __m512 f1 = _mm512_loadu_ps(vals + i + 16); + __m512 f2 = _mm512_loadu_ps(vals + i + 32); + __m512 f3 = _mm512_loadu_ps(vals + i + 48); + + // (val - vmin) * scale, clamped to [0, 255] + f0 = _mm512_min_ps(v_max, _mm512_max_ps(v_zero, + _mm512_mul_ps(_mm512_sub_ps(f0, v_vmin), v_scale))); + f1 = _mm512_min_ps(v_max, _mm512_max_ps(v_zero, + _mm512_mul_ps(_mm512_sub_ps(f1, v_vmin), v_scale))); + f2 = _mm512_min_ps(v_max, _mm512_max_ps(v_zero, + _mm512_mul_ps(_mm512_sub_ps(f2, v_vmin), v_scale))); + f3 = _mm512_min_ps(v_max, _mm512_max_ps(v_zero, + _mm512_mul_ps(_mm512_sub_ps(f3, v_vmin), v_scale))); + + // Convert to int32 with rounding + __m512i i0 = _mm512_cvt_roundps_epi32(f0, + _MM_FROUND_TO_NEAREST_INT | _MM_FROUND_NO_EXC); + __m512i i1 = _mm512_cvt_roundps_epi32(f1, + _MM_FROUND_TO_NEAREST_INT | _MM_FROUND_NO_EXC); + __m512i i2 = _mm512_cvt_roundps_epi32(f2, + _MM_FROUND_TO_NEAREST_INT | _MM_FROUND_NO_EXC); + __m512i i3 = _mm512_cvt_roundps_epi32(f3, + _MM_FROUND_TO_NEAREST_INT | _MM_FROUND_NO_EXC); + + // Pack 32-bit → 16-bit (saturating): 16+16 → 32 per pack + __m512i s01 = _mm512_packs_epi32(i0, i1); + __m512i s23 = _mm512_packs_epi32(i2, i3); + + // Pack 16-bit → 8-bit (unsigned saturating): 32+32 → 64 + __m512i bytes = _mm512_packus_epi16(s01, s23); + + // The packs interleave lanes, need permutation to restore order + // After packs_epi32(A,B): [A0..A3,B0..B3, A4..A7,B4..B7, + // 
A8..A11,B8..B11, A12..A15,B12..B15] + // After packus_epi16(AB,CD): complex lane interleaving + // Use a permute to fix the order + const __m512i perm = _mm512_set_epi32( + 15, 11, 7, 3, 14, 10, 6, 2, 13, 9, 5, 1, 12, 8, 4, 0); + bytes = _mm512_permutexvar_epi32(perm, bytes); + + _mm512_storeu_si512(codes + i, bytes); + } + + // Process remaining 16 floats at a time + for (; i + 16 <= n; i += 16) { + __m512 f = _mm512_loadu_ps(vals + i); + f = _mm512_min_ps(v_max, _mm512_max_ps(v_zero, + _mm512_mul_ps(_mm512_sub_ps(f, v_vmin), v_scale))); + __m512i iv = _mm512_cvt_roundps_epi32(f, + _MM_FROUND_TO_NEAREST_INT | _MM_FROUND_NO_EXC); + // Extract and store scalar (small tail, perf not critical) + alignas(64) int32_t tmp[16]; + _mm512_store_si512(tmp, iv); + for (int j = 0; j < 16; ++j) { + codes[i + j] = static_cast(tmp[j]); + } + } + + // Scalar tail + for (; i < n; i++) { + codes[i] = encode_8bit(vals[i]); + } +#else + for (size_t i = 0; i < n; i++) { + codes[i] = encode_8bit(vals[i]); + } +#endif + } + /// Encode a single float value to 8-bit [[nodiscard]] uint8_t encode_8bit(float val) const { float scaled = (val - vmin_) * (kMax8bit / (vmax_ - vmin_)); diff --git a/tests/id_map_index_test.cpp b/tests/id_map_index_test.cpp index 7b02f13..b6d3044 100644 --- a/tests/id_map_index_test.cpp +++ b/tests/id_map_index_test.cpp @@ -14,7 +14,10 @@ #include #include "nsparse/id_selector.h" +#include "nsparse/io/buffered_io.h" +#include "nsparse/io/index_io.h" #include "nsparse/seismic_index.h" +#include "nsparse/seismic_scalar_quantized_index.h" #include "nsparse/types.h" namespace { @@ -308,3 +311,50 @@ TEST_F(IDMapIndexTest, search_with_not_id_selector) { EXPECT_TRUE(label == 100 || label == 300 || label == -1); } } + +TEST(IDMapBuildAndSave, produces_loadable_index_with_sq_delegate) { + auto* sq = new nsparse::SeismicScalarQuantizedIndex( + nsparse::QuantizerType::QT_8bit, 0.0F, 1.0F, + {.lambda = 10, .beta = 2, .alpha = 0.5F}, 5); + auto* idmap = new 
nsparse::IDMapIndex(sq); + + std::vector indptr = {0, 2, 4, 6}; + std::vector indices = {0, 1, 2, 3, 0, 4}; + std::vector values = {1.0F, 0.5F, 0.8F, 0.6F, 0.9F, 0.7F}; + std::vector ids = {100, 200, 300}; + + idmap->add_with_ids(3, indptr.data(), indices.data(), values.data(), + ids.data()); + + // Streaming build_and_save (prepend IDMapIndex header) + nsparse::BufferedIOWriter writer; + auto id_val = nsparse::fourcc(nsparse::IDMapIndex::name); + int dim = 5; + writer.write(&id_val, sizeof(uint32_t), 1); + writer.write(&dim, sizeof(int), 1); + idmap->build_and_save(&writer); + + nsparse::BufferedIOReader reader(writer.data()); + nsparse::Index* loaded = nsparse::read_index(&reader); + + ASSERT_NE(loaded, nullptr); + EXPECT_EQ(loaded->get_vectors()->num_vectors(), 3); + + // Search should return external IDs + std::vector query_indptr = {0, 2}; + std::vector query_indices = {0, 1}; + std::vector query_values = {1.0F, 0.8F}; + std::vector labels(3, -1); + std::vector distances(3, -1.0F); + + nsparse::SeismicSearchParameters params(5, 1000.0F); + loaded->search(1, query_indptr.data(), query_indices.data(), + query_values.data(), 3, distances.data(), labels.data(), + &params); + + // Should get external IDs back + EXPECT_TRUE(labels[0] == 100 || labels[0] == 200 || labels[0] == 300); + + delete loaded; + delete idmap; +} diff --git a/tests/seismic_common_test.cpp b/tests/seismic_common_test.cpp index 3b0162d..1da0e29 100644 --- a/tests/seismic_common_test.cpp +++ b/tests/seismic_common_test.cpp @@ -117,7 +117,7 @@ TEST(CalculateSummaryScores, multiple_vectors) { TEST(ComputeSimilarity, float_element_size) { // 2 docs: doc0 has indices {1}, values {2.0f}; doc1 has indices {0,2}, // values {1.0f, 3.0f} - std::vector indptr = {0, 1, 3}; + std::vector indptr = {0, 1, 3}; std::vector indices = {1, 0, 2}; // Values stored as raw bytes @@ -142,7 +142,7 @@ TEST(ComputeSimilarity, float_element_size) { } TEST(ComputeSimilarity, uint16_element_size) { - std::vector indptr = {0, 2}; 
+ std::vector indptr = {0, 2}; std::vector indices = {0, 1}; std::vector values(2 * sizeof(uint16_t)); @@ -160,7 +160,7 @@ TEST(ComputeSimilarity, uint16_element_size) { } TEST(ComputeSimilarity, uint8_element_size) { - std::vector indptr = {0, 3}; + std::vector indptr = {0, 3}; std::vector indices = {0, 1, 2}; std::vector values = {2, 3, 4}; std::vector dense = {10, 20, 30}; @@ -172,7 +172,7 @@ TEST(ComputeSimilarity, uint8_element_size) { } TEST(ComputeSimilarity, empty_doc) { - std::vector indptr = {0, 0}; + std::vector indptr = {0, 0}; std::vector indices = {}; std::vector values = {}; std::vector dense = {1, 2, 3}; @@ -276,5 +276,42 @@ TEST(ShouldRunExactMatch, array_selector_also_enumerable) { EXPECT_TRUE(should_run_exact_match(&selector, 5, nullptr)); } +// ---- build_inverted_lists_clusters dimension boundary test ---- + +TEST(BuildInvertedListsClusters, dimension_65536_no_overflow) { + // Regression test: when dimension=65536, batch_end=65536 must not be + // cast to term_t (uint16_t) which would overflow to 0. 
+ SparseVectorsConfig cfg{.element_size = U8, .dimension = 65536}; + SparseVectors vectors(cfg); + + // 10 vectors, each with 5 entries at high term IDs (near 65535) + std::vector indptr = {0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50}; + std::vector indices; + std::vector values; + for (int doc = 0; doc < 10; ++doc) { + for (int j = 0; j < 5; ++j) { + indices.push_back(static_cast(65530 + j)); + values.push_back(static_cast(100 + j)); + } + } + vectors.add_vectors(indptr.data(), indptr.size(), indices.data(), + indices.size(), values.data(), values.size()); + ASSERT_EQ(vectors.num_vectors(), 10); + + SeismicClusterParameters params = {.lambda = 10, .beta = 2, .alpha = 0.4F}; + auto result = build_inverted_lists_clusters(&vectors, cfg, params); + ASSERT_EQ(result.size(), 65536); + + // Terms 65530..65534 should have non-empty clusters + size_t non_empty = 0; + for (size_t i = 65530; i < 65535; ++i) { + if (result[i].cluster_size() > 0) { + non_empty++; + } + } + EXPECT_GT(non_empty, 0) + << "Expected non-empty inverted lists for high term IDs"; +} + } // namespace detail } // namespace nsparse diff --git a/tests/seismic_scalar_quantized_index_test.cpp b/tests/seismic_scalar_quantized_index_test.cpp index acd70d7..9c641aa 100644 --- a/tests/seismic_scalar_quantized_index_test.cpp +++ b/tests/seismic_scalar_quantized_index_test.cpp @@ -818,5 +818,93 @@ TEST(SeismicSQIndexSearch, search_with_id_selector_filters_results) { EXPECT_EQ(labels[2], -1); } +// ============== build_and_save tests ============== + +TEST(SeismicSQBuildAndSave, produces_loadable_index) { + TestableSeismicSQIndex original(QuantizerType::QT_8bit, 0.0F, 1.0F, 10, 2, + 0.5F, 4); + + original.add_docs({{{0, 1.0F}, {1, 0.5F}}, + {{0, 0.8F}, {2, 0.6F}}, + {{1, 0.9F}, {3, 0.7F}}}); + + // build_and_save(IOWriter*) writes body only; prepend header for read_index + BufferedIOWriter writer; + auto id_val = fourcc(SeismicScalarQuantizedIndex::name); + int dim = 4; + writer.write(&id_val, sizeof(uint32_t), 1); + 
writer.write(&dim, sizeof(int), 1); + original.build_and_save(&writer); + + BufferedIOReader reader(writer.data()); + Index* loaded = read_index(&reader); + + ASSERT_NE(loaded, nullptr); + EXPECT_EQ(loaded->get_dimension(), 4); + EXPECT_EQ(loaded->get_vectors()->num_vectors(), 3); + + delete loaded; +} + +TEST(SeismicSQBuildAndSave, search_matches_regular_build) { + TestableSeismicSQIndex streaming(QuantizerType::QT_8bit, 0.0F, 1.0F, 10, 2, + 0.5F, 4); + TestableSeismicSQIndex regular(QuantizerType::QT_8bit, 0.0F, 1.0F, 10, 2, + 0.5F, 4); + + std::vector> docs = { + {{0, 1.0F}, {1, 0.5F}}, + {{0, 0.8F}, {2, 0.6F}}, + {{1, 0.9F}, {3, 0.7F}}}; + + streaming.add_docs(docs); + regular.add_docs(docs); + + // Regular path: build + write + regular.build(); + BufferedIOWriter regular_writer; + write_index(&regular, &regular_writer); + + // Streaming path: build_and_save (prepend header) + BufferedIOWriter streaming_writer; + auto id_val = fourcc(SeismicScalarQuantizedIndex::name); + int dim = 4; + streaming_writer.write(&id_val, sizeof(uint32_t), 1); + streaming_writer.write(&dim, sizeof(int), 1); + streaming.build_and_save(&streaming_writer); + + // Load both + BufferedIOReader regular_reader(regular_writer.data()); + Index* regular_loaded = read_index(&regular_reader); + + BufferedIOReader streaming_reader(streaming_writer.data()); + Index* streaming_loaded = read_index(&streaming_reader); + + // Search both with same query + std::vector query_indptr = {0, 2}; + std::vector query_indices = {0, 1}; + std::vector query_values = {1.0F, 0.8F}; + + std::vector labels_regular(3, -1); + std::vector distances_regular(3, -1.0F); + std::vector labels_streaming(3, -1); + std::vector distances_streaming(3, -1.0F); + + SeismicSearchParameters params(5, 1000.0F); + regular_loaded->search(1, query_indptr.data(), query_indices.data(), + query_values.data(), 3, distances_regular.data(), + labels_regular.data(), &params); + streaming_loaded->search(1, query_indptr.data(), query_indices.data(), + 
query_values.data(), 3, distances_streaming.data(), + labels_streaming.data(), &params); + + EXPECT_EQ(labels_regular[0], labels_streaming[0]); + EXPECT_EQ(labels_regular[1], labels_streaming[1]); + EXPECT_EQ(labels_regular[2], labels_streaming[2]); + + delete regular_loaded; + delete streaming_loaded; +} + } // namespace } // namespace nsparse diff --git a/tests/sparse_vectors_test.cpp b/tests/sparse_vectors_test.cpp index 8c2aba9..7f7464e 100644 --- a/tests/sparse_vectors_test.cpp +++ b/tests/sparse_vectors_test.cpp @@ -284,7 +284,7 @@ TEST(SparseVectors, indptr_data) { reinterpret_cast(values.data()), values.size() * sizeof(float)); - const nsparse::idx_t* indptr = vectors.indptr_data(); + const nsparse::offset_t* indptr = vectors.indptr_data(); ASSERT_EQ(indptr[0], 0); ASSERT_EQ(indptr[1], 2); }