From c7867da4090b2f97a4ad0f908edeb6ca5a89e5fa Mon Sep 17 00:00:00 2001 From: root Date: Mon, 30 Mar 2026 17:13:12 +0000 Subject: [PATCH 1/8] vb-11: add efSearch, vb-12: fix creation of CAGRA grid tests --- generate-combinations.py | 191 +++------ .../benchmarks/BenchmarkConfiguration.java | 21 +- .../cuvs/benchmarks/LuceneCuvsBenchmarks.java | 370 +++++++++++------- 3 files changed, 297 insertions(+), 285 deletions(-) diff --git a/generate-combinations.py b/generate-combinations.py index fbb93dc..3f4c77a 100644 --- a/generate-combinations.py +++ b/generate-combinations.py @@ -1,4 +1,3 @@ - import itertools import argparse import sys @@ -42,7 +41,11 @@ invariants["vectorDimension"] = dataset_info["vector_dimension"] print("sweep: " + sweep) for param, value in sweeps[sweep].get("common-params", {}).items(): - if not isinstance(value, list): + # efSearch is always passed through as a list — Java handles the iteration + if param == 'efSearch': + # Ensure it's always a list + invariants[param] = value if isinstance(value, list) else [value] + elif not isinstance(value, list): invariants[param] = value else: variants[param] = value @@ -54,7 +57,10 @@ for param, value in algorithms[algo].items(): if param not in ["params"]: - if not isinstance(value, list): + # efSearch is always passed through as a list — Java handles the iteration + if param == 'efSearch': + algo_invariants[param] = value if isinstance(value, list) else [value] + elif not isinstance(value, list): algo_invariants[param] = value else: algo_variants[param] = value @@ -62,138 +68,53 @@ # Generate all combination of variants. For each combination, generate a hashed ID, and a file with the # name pattern as --.json. The file should contain the invariants as is, and the variants as the current combination. 
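# For orientation, a minimal sketch of the expansion this block performs
# (illustrative values only, not taken from any real sweep config):
#
#   >>> import itertools
#   >>> variants = {"cagraGraphDegree": [32, 64], "hnswMaxConn": [16, 32]}
#   >>> [dict(zip(variants, c)) for c in itertools.product(*variants.values())]
#   [{'cagraGraphDegree': 32, 'hnswMaxConn': 16},
#    {'cagraGraphDegree': 32, 'hnswMaxConn': 32},
#    {'cagraGraphDegree': 64, 'hnswMaxConn': 16},
#    {'cagraGraphDegree': 64, 'hnswMaxConn': 32}]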
if algo_variants: - # Separate efSearch from other variants if it exists - efSearch_values = None - other_variant_keys = [] - other_variant_values = [] - - for key, value in algo_variants.items(): - if key == 'efSearch': - efSearch_values = value - else: - other_variant_keys.append(key) - other_variant_values.append(value) - - # Generate combinations with efSearch at the beginning (innermost loop) - if efSearch_values and other_variant_keys: - # Generate combinations of other parameters first - for other_combination in itertools.product(*other_variant_values): - other_variants = dict(zip(other_variant_keys, other_combination)) - # Then iterate through efSearch values - for ef_index, ef_value in enumerate(efSearch_values): - current_variants = other_variants.copy() - current_variants['efSearch'] = ef_value - - # Skip if cagraIntermediateDegree < cagraGraphDegree - if 'cagraIntermediateDegree' in current_variants and 'cagraGraphDegree' in current_variants: - if current_variants['cagraIntermediateDegree'] < current_variants['cagraGraphDegree']: - print(f"\t\tSkipping combination: cagraIntermediateDegree ({current_variants['cagraIntermediateDegree']}) < cagraGraphDegree ({current_variants['cagraGraphDegree']})") - continue - - # Skip if hnswMaxConn > hnswBeamWidth - if 'hnswMaxConn' in current_variants and 'hnswBeamWidth' in current_variants: - if current_variants['hnswMaxConn'] > current_variants['hnswBeamWidth']: - print(f"\t\tSkipping combination: hnswMaxConn ({current_variants['hnswMaxConn']}) > hnswBeamWidth ({current_variants['hnswBeamWidth']})") - continue - - # Generate hash only from other_variants (excluding efSearch) - base_hash = hashlib.md5(json.dumps(other_variants, sort_keys=True).encode()).hexdigest()[:8] - hash_id = f"{base_hash}-ef{ef_value}" - - config = algo_invariants.copy() - config.update(current_variants) - - # For multiple efSearch combinations: subsequent ones skip indexing - if len(efSearch_values) > 1 and ef_index > 0: - config['skipIndexing'] = True - - # Set cleanIndexDirectory based on position - if ef_index == 0: - config['cleanIndexDirectory'] = False - elif ef_index == len(efSearch_values) - 1: - config['cleanIndexDirectory'] = True - else: - config['cleanIndexDirectory'] = False - - # Use base_hash for index directory paths - if 'hnswIndexDirPath' in config: - config['hnswIndexDirPath'] = f"hnswIndex-{base_hash}" - if 'cuvsIndexDirPath' in config: - config['cuvsIndexDirPath'] = f"cuvsIndex-{base_hash}" - - filename = f"{algo}-{hash_id}.json" - sweep_dir = f"{args.configs_dir}/{sweep}" - filepath = f"{sweep_dir}/{filename}" - os.makedirs(sweep_dir, exist_ok=True) - with open(filepath, 'w') as f: - json.dump(config, f, indent=2) - print(f"\tGenerated config file: {filepath}") - elif efSearch_values: - # Only efSearch values, no other variants - for ef_index, ef_value in enumerate(efSearch_values): - current_variants = {'efSearch': ef_value} - # Generate hash from empty dict since no other variants exist - base_hash = hashlib.md5(json.dumps({}, sort_keys=True).encode()).hexdigest()[:8] - hash_id = f"{base_hash}-ef{ef_value}" - - config = algo_invariants.copy() - config.update(current_variants) - - # For multiple efSearch combinations: subsequent ones skip indexing - if len(efSearch_values) > 1 and ef_index > 0: - config['skipIndexing'] = True - - # Set cleanIndexDirectory based on position - if ef_index == 0: - config['cleanIndexDirectory'] = False - elif ef_index == len(efSearch_values) - 1: - config['cleanIndexDirectory'] = True - else: - 
config['cleanIndexDirectory'] = False - - # Use base_hash for index directory paths - if 'hnswIndexDirPath' in config: - config['hnswIndexDirPath'] = f"hnswIndex-{base_hash}" - if 'cuvsIndexDirPath' in config: - config['cuvsIndexDirPath'] = f"cuvsIndex-{base_hash}" - - filename = f"{algo}-{hash_id}.json" - sweep_dir = f"{args.configs_dir}/{sweep}" - filepath = f"{sweep_dir}/{filename}" - os.makedirs(sweep_dir, exist_ok=True) - with open(filepath, 'w') as f: - json.dump(config, f, indent=2) - print(f"\tGenerated config file: {filepath}") - else: - # No efSearch, use original logic - variant_keys = list(algo_variants.keys()) - variant_values = list(algo_variants.values()) - for combination in itertools.product(*variant_values): - current_variants = dict(zip(variant_keys, combination)) - - # Skip if cagraIntermediateDegree < cagraGraphDegree - if 'cagraIntermediateDegree' in current_variants and 'cagraGraphDegree' in current_variants: - if current_variants['cagraIntermediateDegree'] < current_variants['cagraGraphDegree']: - print(f"\t\tSkipping combination: cagraIntermediateDegree ({current_variants['cagraIntermediateDegree']}) < cagraGraphDegree ({current_variants['cagraGraphDegree']})") - continue - - # Skip if hnswMaxConn > hnswBeamWidth - if 'hnswMaxConn' in current_variants and 'hnswBeamWidth' in current_variants: - if current_variants['hnswMaxConn'] > current_variants['hnswBeamWidth']: - print(f"\t\tSkipping combination: hnswMaxConn ({current_variants['hnswMaxConn']}) > hnswBeamWidth ({current_variants['hnswBeamWidth']})") - continue - - hash_id = hashlib.md5(json.dumps(current_variants, sort_keys=True).encode()).hexdigest()[:8] - - config = algo_invariants.copy() - config.update(current_variants) - filename = f"{algo}-{hash_id}.json" - sweep_dir = f"{args.configs_dir}/{sweep}" - filepath = f"{sweep_dir}/{filename}" - os.makedirs(sweep_dir, exist_ok=True) - with open(filepath, 'w') as f: - json.dump(config, f, indent=2) - print(f"\tGenerated config file: {filepath}") + variant_keys = list(algo_variants.keys()) + variant_values = list(algo_variants.values()) + for combination in itertools.product(*variant_values): + current_variants = dict(zip(variant_keys, combination)) + + hash_id = hashlib.md5(json.dumps(current_variants, sort_keys=True).encode()).hexdigest()[:8] + + config = algo_invariants.copy() + config.update(current_variants) + + # Skip if cagraIntermediateGraphDegree < cagraGraphDegree + # (CAGRA silently clamps graphDegree down to intermediateGraphDegree, + # which produces duplicate test runs) + if config.get('cagraIntermediateGraphDegree', float('inf')) < config.get('cagraGraphDegree', 0): + print(f"\t\tSkipping combination: cagraIntermediateGraphDegree ({config['cagraIntermediateGraphDegree']}) < cagraGraphDegree ({config['cagraGraphDegree']})") + continue + + # Skip if hnswMaxConn > hnswBeamWidth + if config.get('hnswMaxConn', 0) > config.get('hnswBeamWidth', float('inf')): + print(f"\t\tSkipping combination: hnswMaxConn ({config['hnswMaxConn']}) > hnswBeamWidth ({config['hnswBeamWidth']})") + continue + + # Set indexDirPath based on hash + config['indexDirPath'] = f"index-{hash_id}" + + filename = f"{algo}-{hash_id}.json" + sweep_dir = f"{args.configs_dir}/{sweep}" + filepath = f"{sweep_dir}/{filename}" + os.makedirs(sweep_dir, exist_ok=True) + with open(filepath, 'w') as f: + json.dump(config, f, indent=2) + print(f"\tGenerated config file: {filepath}") + else: + # No variants at all, just generate a single config + hash_id = hashlib.md5(json.dumps(algo_invariants, 
sort_keys=True).encode()).hexdigest()[:8] + config = algo_invariants.copy() + + # Set indexDirPath based on hash + config['indexDirPath'] = f"index-{hash_id}" + + filename = f"{algo}-{hash_id}.json" + sweep_dir = f"{args.configs_dir}/{sweep}" + filepath = f"{sweep_dir}/{filename}" + os.makedirs(sweep_dir, exist_ok=True) + with open(filepath, 'w') as f: + json.dump(config, f, indent=2) + print(f"\tGenerated config file: {filepath}") print("----------------------")
diff --git a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/BenchmarkConfiguration.java b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/BenchmarkConfiguration.java index 1d97838..35e208a 100644 --- a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/BenchmarkConfiguration.java +++ b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/BenchmarkConfiguration.java @@ -5,6 +5,7 @@ import com.nvidia.cuvs.CagraIndexParams.CudaDataType; import com.nvidia.cuvs.CagraIndexParams.CuvsDistanceType; import com.searchscale.lucene.cuvs.benchmarks.LuceneCuvsBenchmarks.Codex; +import java.util.List; public class BenchmarkConfiguration { @@ -47,7 +48,7 @@ public class BenchmarkConfiguration { public int cagraITopK; public int cagraSearchWidth; public int cagraHnswLayers; // layers in CAGRA->HNSW conversion - public int efSearch; + public List<Integer> efSearch; // e.g. [64] or [64, 128, 256] public CagraGraphBuildAlgo cagraGraphBuildAlgo; // CAGRA IVF_PQ parameters @@ -89,11 +90,22 @@ public boolean isCagraHNSWScalar() { return Codex.CAGRA_HNSW_SCALAR.equals(algoToRun); } - public int getEffectiveEfSearch() { - if (efSearch > 0) { + /** + * Returns the list of efSearch values to use during search. + * + *
<p>
If {@code efSearch} is set in the config JSON (e.g. [64, 128, 256]), + * those values are returned directly. Otherwise, falls back to a single-element + * list containing a default derived from topK. + * + *
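<p>For example (illustrative values, not prescriptive), a config entry + * {@code "efSearch": [64, 128, 256]} yields three search runs against the same build. + + *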
<p>
The benchmark runner iterates over these values and runs search once per value + * against the same index — no rebuild is needed. + */ + public List getEfSearchValues() { + if (efSearch != null && !efSearch.isEmpty()) { return efSearch; } - return Math.max(topK, (int) Math.ceil(topK * 1.5)); + // Default: 1.5x topK, but at least topK + return List.of(Math.max(topK, (int) Math.ceil(topK * 1.5))); } public String prettyString() { @@ -128,6 +140,7 @@ public String prettyString() { sb.append("Enable TieredMerge: ").append(enableTieredMerge).append('\n'); sb.append("Num HNSW merge threads: ").append(hnswMergeThreads).append('\n'); sb.append("enableIndexWriterInfoStream: ").append(enableIndexWriterInfoStream).append('\n'); + sb.append("efSearch: ").append(getEfSearchValues()).append('\n'); sb.append("------- algo parameters ------\n"); if (isLucene()) { diff --git a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java index 4798c7e..b06b140 100644 --- a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java +++ b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java @@ -296,43 +296,87 @@ public static void main(String[] args) throws Throwable { } } + // Resolve the list of efSearch values to iterate over. + // The index is built once above; we search it once per efSearch value. + List efSearchValues = config.getEfSearchValues(); + log.info( + "Will run search with {} efSearch value(s): {}", efSearchValues.size(), efSearchValues); + + // Read ground truth once (shared across all efSearch runs) + List groundTruth = Util.readGroundTruthFile(config.groundTruthFile); + Directory indexDir = MMapDirectory.open(Path.of(config.indexDirPath)); log.info("Index directory is: {} (using memory-mapped files)", indexDir); - log.info("Querying documents using {} ...", config.algoToRun); - // Always use standard Lucene search since we always create Lucene HNSW indexes - search( - indexDir, - config, - metrics, - queryResults, - Util.readGroundTruthFile(config.groundTruthFile)); - - Util.calculateRecallAccuracy(queryResults, metrics, config.algoToRun); - - String resultsJson = - Util.newObjectMapper() - .writerWithDefaultPrettyPrinter() - .writeValueAsString(Map.of("configuration", config, "metrics", metrics)); - - if (config.saveResultsOnDisk) { - // Use the resultsDirectory directly if provided - String resultsDir = config.resultsDirectory != null ? config.resultsDirectory : "results"; - File results = new File(resultsDir); - if (!results.exists()) { - results.mkdirs(); - } - // Save results.json directly to the specified directory - FileUtils.write( - new File(results.toString() + "/results.json"), resultsJson, Charset.forName("UTF-8")); + // Snapshot indexing-only metrics before the loop so that each efSearch run + // starts from the same base and doesn't inherit results from prior runs. 
+ Map indexingMetrics = new LinkedHashMap<>(metrics); + + for (int efSearch : efSearchValues) { + log.info("--- Running search with efSearch={} ---", efSearch); + + // Fresh collections for this efSearch run + List efSearchQueryResults = + Collections.synchronizedList(new ArrayList()); + Map efSearchMetrics = new LinkedHashMap(); + + // Copy over indexing metrics (only) so they appear in every result file + efSearchMetrics.putAll(indexingMetrics); + efSearchMetrics.put("efSearch", efSearch); - // Save CSV with neighbors data - Util.writeCSV(queryResults, results.toString() + "/neighbors.csv"); + log.info("Querying documents using {} with efSearch={} ...", config.algoToRun, efSearch); + search(indexDir, config, efSearchMetrics, efSearchQueryResults, groundTruth, efSearch); - log.info("Results saved to directory: {}", resultsDir); + Util.calculateRecallAccuracy(efSearchQueryResults, efSearchMetrics, config.algoToRun); + + String resultsJson = + Util.newObjectMapper() + .writerWithDefaultPrettyPrinter() + .writeValueAsString(Map.of("configuration", config, "metrics", efSearchMetrics)); + + if (config.saveResultsOnDisk) { + // Use the resultsDirectory directly if provided + String resultsDir = config.resultsDirectory != null ? config.resultsDirectory : "results"; + + // When there are multiple efSearch values, create a subdirectory per value + if (efSearchValues.size() > 1) { + resultsDir = resultsDir + "/efSearch_" + efSearch; + } + + File results = new File(resultsDir); + if (!results.exists()) { + results.mkdirs(); + } + + // Save results.json directly to the specified directory + FileUtils.write( + new File(results.toString() + "/results.json"), + resultsJson, + Charset.forName("UTF-8")); + + // Save CSV with neighbors data + Util.writeCSV(efSearchQueryResults, results.toString() + "/neighbors.csv"); + + log.info("Results for efSearch={} saved to directory: {}", efSearch, resultsDir); + } + + log.info( + "\n-----\nMetrics for efSearch={}: {}\n{}\n-----", + efSearch, + efSearchMetrics, + resultsJson); + + // Accumulate per-efSearch metrics into the top-level metrics map + for (Map.Entry entry : efSearchMetrics.entrySet()) { + if (efSearchValues.size() > 1) { + metrics.put("efSearch_" + efSearch + "/" + entry.getKey(), entry.getValue()); + } else { + metrics.put(entry.getKey(), entry.getValue()); + } + } } - log.info("\n-----\nOverall metrics: " + metrics + "\nMetrics: \n" + resultsJson + "\n-----"); + log.info("\n-----\nOverall metrics: {}\n-----", metrics); // Close the index directory before cleaning indexDir.close(); @@ -430,12 +474,23 @@ private static void indexDocuments( writer.close(); } + /** + * Runs search queries against the given index directory using the specified efSearch value. 
+ * + * @param directory the Lucene index directory to search + * @param config benchmark configuration + * @param metrics map to populate with search performance metrics + * @param queryResults list to populate with per-query results + * @param groundTruth ground truth neighbor lists for recall calculation + * @param efSearch the efSearch (number of candidates) to use for this search run + */ private static void search( Directory directory, BenchmarkConfiguration config, Map metrics, List queryResults, - List groundTruth) { + List groundTruth, + int efSearch) { DB db = null; try (IndexReader indexReader = DirectoryReader.open(directory)) { @@ -481,123 +536,130 @@ private static void search( for (int t = 0; t < config.queryThreads; t++) { pool.submit( () -> { - while (queryId.getAndIncrement() <= config.numQueriesToRun) { - int currentQueryId = queryId.get(); - KnnFloatVectorQuery query; - - if (config.algoToRun.equals(Codex.CAGRA_SEARCH)) { - int effectiveEfSearch = config.getEffectiveEfSearch(); - query = - new GPUKnnFloatVectorQuery( - config.vectorColName, - queries.get(currentQueryId), - effectiveEfSearch, - null, - config.cagraITopK, - config.cagraSearchWidth); - } else { - int effectiveEfSearch = config.getEffectiveEfSearch(); - query = - new KnnFloatVectorQuery( - config.vectorColName, queries.get(currentQueryId), effectiveEfSearch); + while (true) { + int currentQueryId = queryId.getAndIncrement(); + if (currentQueryId >= config.numQueriesToRun) { + break; } - - TopDocs topDocs; - long searchStartTime = System.nanoTime(); try { - int effectiveEfSearch = config.getEffectiveEfSearch(); - TopScoreDocCollectorManager collectorManager = - new TopScoreDocCollectorManager( - effectiveEfSearch, null, Integer.MAX_VALUE, true); - topDocs = indexSearcher.search(query, collectorManager); - } catch (IOException e) { - throw new RuntimeException("Problem during executing a query: ", e); - } - double searchTimeTakenMs = - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchStartTime); - // log.info("End to end search took: " + searchTimeTakenMs); - if (currentQueryId > config.numWarmUpQueries) { - queryLatencies.put(queryId.get(), searchTimeTakenMs); - } - int finishedCount = queriesFinished.incrementAndGet(); - - // Log progress every 1000 queries - if (finishedCount % 1000 == 0 || finishedCount == config.numQueriesToRun) { - log.info( - "Done querying " - + finishedCount - + " out of " - + config.numQueriesToRun - + " queries."); - } - - ScoreDoc[] hits = topDocs.scoreDocs; - List neighbors = new ArrayList<>(); - List scores = new ArrayList<>(); + KnnFloatVectorQuery query; + + if (config.algoToRun.equals(Codex.CAGRA_SEARCH)) { + query = + new GPUKnnFloatVectorQuery( + config.vectorColName, + queries.get(currentQueryId), + efSearch, + null, + config.cagraITopK, + config.cagraSearchWidth); + } else { + query = + new KnnFloatVectorQuery( + config.vectorColName, queries.get(currentQueryId), efSearch); + } - // Debug: Log search results for first query - if (queryId.get() == 0) { - log.info( - "Debug: First query returned " - + hits.length - + " hits (ef-search candidates)"); - log.info( - "Debug: Will select top " - + config.topK - + " from " - + hits.length - + " candidates"); - } - int numResultsToTake = Math.min(config.topK, hits.length); - long retrievalStartTime = System.nanoTime(); - for (int i = 0; i < numResultsToTake; i++) { - ScoreDoc hit = hits[i]; + TopDocs topDocs; + long searchStartTime = System.nanoTime(); try { - Document d = indexReader.storedFields().document(hit.doc); - 
neighbors.add(Integer.parseInt(d.get("id"))); + TopScoreDocCollectorManager collectorManager = + new TopScoreDocCollectorManager(efSearch, null, Integer.MAX_VALUE, true); + topDocs = indexSearcher.search(query, collectorManager); } catch (IOException e) { - e.printStackTrace(); + throw new RuntimeException("Problem during executing a query: ", e); + } + double searchTimeTakenMs = + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchStartTime); + if (currentQueryId > config.numWarmUpQueries) { + queryLatencies.put(queryId.get(), searchTimeTakenMs); + } + int finishedCount = queriesFinished.incrementAndGet(); + + // Log progress every 1000 queries + if (finishedCount % 1000 == 0 || finishedCount == config.numQueriesToRun) { + log.info( + "Done querying " + + finishedCount + + " out of " + + config.numQueriesToRun + + " queries."); } - scores.add(hit.score); - } - double retrievalTimeTakenMs = - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - retrievalStartTime); - if (currentQueryId > config.numWarmUpQueries) { - retrievalLatencies.put(queryId.get(), retrievalTimeTakenMs); - } - // Debug: Log results for all queries - log.debug( - "Query " - + currentQueryId - + " - First 5 neighbors: " - + neighbors.subList(0, Math.min(5, neighbors.size()))); - log.debug( - "Query " - + currentQueryId - + " - First 5 distances: " - + scores.subList(0, Math.min(5, scores.size()))); - int[] expectedNeighbors = groundTruth.get(currentQueryId); - log.debug( - "Query " - + currentQueryId - + " - Expected neighbors: " - + java.util.Arrays.toString( - java.util.Arrays.copyOf( - expectedNeighbors, Math.min(5, expectedNeighbors.length)))); - - if (currentQueryId > config.numWarmUpQueries) { - QueryResult result = - new QueryResult( - config.algoToRun.toString(), - currentQueryId, - neighbors, - groundTruth.get(currentQueryId), - scores, - searchTimeTakenMs); - queryResults.add(result); - } else { - log.info("Skipping warmup query: {}", currentQueryId); + ScoreDoc[] hits = topDocs.scoreDocs; + List neighbors = new ArrayList<>(); + List scores = new ArrayList<>(); + + // Debug: Log search results for first query + if (queryId.get() == 0) { + log.info( + "Debug: First query returned " + + hits.length + + " hits (ef-search candidates)"); + log.info( + "Debug: Will select top " + + config.topK + + " from " + + hits.length + + " candidates"); + } + int numResultsToTake = Math.min(config.topK, hits.length); + long retrievalStartTime = System.nanoTime(); + for (int i = 0; i < numResultsToTake; i++) { + ScoreDoc hit = hits[i]; + try { + Document d = indexReader.storedFields().document(hit.doc); + neighbors.add(Integer.parseInt(d.get("id"))); + } catch (IOException e) { + e.printStackTrace(); + } + scores.add(hit.score); + } + double retrievalTimeTakenMs = + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - retrievalStartTime); + if (currentQueryId > config.numWarmUpQueries) { + retrievalLatencies.put(queryId.get(), retrievalTimeTakenMs); + } + + // Debug: Log results for all queries + log.debug( + "Query " + + currentQueryId + + " - First 5 neighbors: " + + neighbors.subList(0, Math.min(5, neighbors.size()))); + log.debug( + "Query " + + currentQueryId + + " - First 5 distances: " + + scores.subList(0, Math.min(5, scores.size()))); + int[] expectedNeighbors = groundTruth.get(currentQueryId); + log.debug( + "Query " + + currentQueryId + + " - Expected neighbors: " + + java.util.Arrays.toString( + java.util.Arrays.copyOf( + expectedNeighbors, Math.min(5, expectedNeighbors.length)))); + + if (currentQueryId > 
config.numWarmUpQueries) { + QueryResult result = + new QueryResult( + config.algoToRun.toString(), + currentQueryId, + neighbors, + groundTruth.get(currentQueryId), + scores, + searchTimeTakenMs); + queryResults.add(result); + } else { + log.info("Skipping warmup query: {}", currentQueryId); + } + } catch (Exception e) { + log.error( + "Exception during query {}: {} - {}", + currentQueryId, + e.getClass().getSimpleName(), + e.getMessage(), + e); } } }); @@ -611,17 +673,33 @@ private static void search( metrics.put(config.algoToRun + "-query-time", (endTime - startTime)); metrics.put( config.algoToRun + "-query-throughput", - (config.numQueriesToRun / ((endTime - startTime) / 1000.0))); + (endTime - startTime) > 0 + ? (config.numQueriesToRun / ((endTime - startTime) / 1000.0)) + : 0.0); double avgLatency = - new ArrayList<>(queryLatencies.values()).stream().reduce(0.0, Double::sum) - / queryLatencies.size(); + queryLatencies.isEmpty() + ? 0.0 + : new ArrayList<>(queryLatencies.values()).stream().reduce(0.0, Double::sum) + / queryLatencies.size(); double avgRetLatency = - new ArrayList<>(retrievalLatencies.values()).stream().reduce(0.0, Double::sum) - / retrievalLatencies.size(); + retrievalLatencies.isEmpty() + ? 0.0 + : new ArrayList<>(retrievalLatencies.values()).stream().reduce(0.0, Double::sum) + / retrievalLatencies.size(); metrics.put(config.algoToRun + "-mean-latency", avgLatency); metrics.put(config.algoToRun + "-mean-retrieval-latency", avgRetLatency); + // Log warning if no queries completed successfully + if (queryLatencies.isEmpty()) { + log.error( + "WARNING: Zero queries completed successfully! " + + "Check the query thread exception logs above for the root cause. " + + "queriesFinished={}, queryResults.size={}", + queriesFinished.get(), + queryResults.size()); + } + int segmentCount = indexReader.leaves().size(); metrics.put(config.algoToRun + "-segment-count", segmentCount); From 2d1884b9f2993d00f273c178a7724bd8419d9842 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Mon, 30 Mar 2026 18:25:42 +0000 Subject: [PATCH 2/8] vb-10: fix typo in build-index benchmarks plot --- convert_to_nvidia_format.py | 17 +++++++++++------ run_pareto_analysis.sh | 29 +++++++++++++++++++---------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/convert_to_nvidia_format.py b/convert_to_nvidia_format.py index 54f3931..d3caa2f 100755 --- a/convert_to_nvidia_format.py +++ b/convert_to_nvidia_format.py @@ -7,10 +7,14 @@ from typing import List, Dict, Optional, Tuple -def create_index_name(config: Dict) -> str: - """Create index name from configuration parameters""" +def create_index_name(config: Dict, metrics: Dict) -> str: + """Create index name from configuration and metrics. + + efSearch is read from metrics (where it's a scalar int per search run) + rather than config (where it's now a list of values to sweep). 
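+    Example (illustrative; the exact name format is elided in this hunk): a run whose
+    metrics carry {'efSearch': 128} gets 128 encoded into its index name, keeping
+    results from different efSearch runs distinguishable.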
+ """ algorithm = config.get('algoToRun', 'UNKNOWN') - ef_search = config.get('efSearch', 0) + ef_search = metrics.get('efSearch', 0) if algorithm in ['LUCENE_HNSW', 'hnsw']: beam_width = config.get('hnswBeamWidth', 0) @@ -38,7 +42,7 @@ def convert_results_to_nvidia_format(results_json_path: str, output_dir: str, da elif algorithm in ['hnsw', 'LUCENE_HNSW']: algorithm = 'LUCENE_HNSW' - index_name = create_index_name(config) + index_name = create_index_name(config, metrics) recall_key = next((key for key in metrics.keys() if 'recall-accuracy' in key.lower()), None) if not recall_key: @@ -97,12 +101,13 @@ def convert_results_to_nvidia_format(results_json_path: str, output_dir: str, da if build_time_key: build_time_ms = float(metrics[build_time_key]) + build_time_s = build_time_ms / 1000.0 build_benchmark = { "name": f"{algorithm}/{index_name}", - "real_time": build_time_ms, + "real_time": build_time_s, "iterations": 1, - "time_unit": "ms", + "time_unit": "s", "run_name": "run_1", "run_type": "iteration", "repetitions": 1, diff --git a/run_pareto_analysis.sh b/run_pareto_analysis.sh index 33020a8..1466617 100755 --- a/run_pareto_analysis.sh +++ b/run_pareto_analysis.sh @@ -81,9 +81,14 @@ import csv import json import glob -def create_index_name_from_config(config): +def create_index_name_from_results(config, metrics): + \"\"\"Create index name from config and metrics. + + efSearch is read from metrics (scalar int per search run) + rather than config (now a list of values to sweep). + \"\"\" algorithm = config.get('algoToRun', 'UNKNOWN') - ef_search = config.get('efSearch', 0) + ef_search = metrics.get('efSearch', 0) if algorithm in ['LUCENE_HNSW', 'hnsw']: beam_width = config.get('hnswBeamWidth', 0) @@ -144,13 +149,17 @@ for algorithm, pareto_indices in pareto_runs_by_algo.items(): index_to_dir = {} for benchmark_dir in benchmark_dirs: - results_json_path = os.path.join(benchmark_dir, 'results.json') - if os.path.exists(results_json_path): + # Walk into subdirectories to find efSearch_* results + for root, dirs, files in os.walk(benchmark_dir): + if 'results.json' not in files: + continue + results_json_path = os.path.join(root, 'results.json') try: with open(results_json_path, 'r') as f: results_data = json.load(f) config = results_data['configuration'] + metrics = results_data['metrics'] algo_to_run = config.get('algoToRun') algorithm_match = False @@ -160,11 +169,11 @@ for algorithm, pareto_indices in pareto_runs_by_algo.items(): algorithm_match = True if algorithm_match: - index_name = create_index_name_from_config(config) + index_name = create_index_name_from_results(config, metrics) if index_name not in index_to_dir: - index_to_dir[index_name] = benchmark_dir + index_to_dir[index_name] = root except Exception as e: - print(f' Error processing {benchmark_dir}: {e}') + print(f' Error processing {root}: {e}') print(f'Mapped {len(index_to_dir)} configurations') @@ -172,8 +181,8 @@ for algorithm, pareto_indices in pareto_runs_by_algo.items(): unmatched = 0 for index_name, pareto_run in pareto_indices.items(): if index_name in index_to_dir: - benchmark_dir = index_to_dir[index_name] - is_pareto_file = os.path.join(benchmark_dir, 'is_pareto') + result_dir = index_to_dir[index_name] + is_pareto_file = os.path.join(result_dir, 'is_pareto') with open(is_pareto_file, 'w') as f: f.write(f'Pareto optimal run\\n') @@ -217,4 +226,4 @@ echo "" echo "Final output:" echo "- Pareto optimal runs marked with is_pareto files" echo "- Plots: ${OUTPUT_DIR}/plots/" -echo "- No intermediate files (completely 
cleaned up)" \ No newline at end of file +echo "- No intermediate files (completely cleaned up)" From d4ad340dd81a0990155130076549c9e9e883dbcc Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Tue, 7 Apr 2026 17:50:56 +0000 Subject: [PATCH 3/8] fix vb-11/12 mistake in LuceneEuvsBenchmarks --- .../lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java index b06b140..d62d05f 100644 --- a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java +++ b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java @@ -571,7 +571,7 @@ private static void search( double searchTimeTakenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchStartTime); if (currentQueryId > config.numWarmUpQueries) { - queryLatencies.put(queryId.get(), searchTimeTakenMs); + queryLatencies.put(currentQueryId, searchTimeTakenMs); } int finishedCount = queriesFinished.incrementAndGet(); @@ -617,7 +617,7 @@ private static void search( double retrievalTimeTakenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - retrievalStartTime); if (currentQueryId > config.numWarmUpQueries) { - retrievalLatencies.put(queryId.get(), retrievalTimeTakenMs); + retrievalLatencies.put(currentQueryId, retrievalTimeTakenMs); } // Debug: Log results for all queries From f6eb9320524519d22da520c2f9342158e40c88e8 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Wed, 29 Apr 2026 01:53:04 +0000 Subject: [PATCH 4/8] vb-15a: fix corner-case timing in generate-combinations.py --- generate-combinations.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/generate-combinations.py b/generate-combinations.py index 3f4c77a..98300b3 100644 --- a/generate-combinations.py +++ b/generate-combinations.py @@ -73,8 +73,6 @@ for combination in itertools.product(*variant_values): current_variants = dict(zip(variant_keys, combination)) - hash_id = hashlib.md5(json.dumps(current_variants, sort_keys=True).encode()).hexdigest()[:8] - config = algo_invariants.copy() config.update(current_variants) @@ -91,6 +89,8 @@ continue # Set indexDirPath based on hash + hash_input = {k: v for k, v in config.items() if k != 'indexDirPath'} + hash_id = hashlib.md5(json.dumps(hash_input, sort_keys=True, default=str).encode()).hexdigest()[:8] config['indexDirPath'] = f"index-{hash_id}" filename = f"{algo}-{hash_id}.json" From 07fea3864b62ddfc1a44802a4b872cdc5cb0b09f Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Wed, 29 Apr 2026 01:55:05 +0000 Subject: [PATCH 5/8] vb-15b: allocate/reserve dynamic java heap-space --- run_sweep.sh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/run_sweep.sh b/run_sweep.sh index 8d768fb..b512d41 100755 --- a/run_sweep.sh +++ b/run_sweep.sh @@ -1,5 +1,11 @@ #!/bin/bash +export MAVEN_OPTS="-Xmx80g -Xms8g \ + -XX:+UseG1GC \ + -XX:MinHeapFreeRatio=5 \ + -XX:MaxHeapFreeRatio=15 \ + -XX:G1PeriodicGCInterval=5000" + # Parse command-line arguments while getopts ":-:" opt; do case $OPTARG in From 85ed22a76a3c92deac44dd2d3d5cf55bd764673f Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Wed, 29 Apr 2026 01:59:08 +0000 Subject: [PATCH 6/8] vb-15c: preserve intermediate paraeto data --- run_pareto_analysis.sh | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/run_pareto_analysis.sh b/run_pareto_analysis.sh index 1466617..d37d815 
100755 --- a/run_pareto_analysis.sh +++ b/run_pareto_analysis.sh @@ -220,10 +220,11 @@ ls -la "${OUTPUT_DIR}/plots"/*.png echo "" echo "Cleaning up intermediate files..." -rm -rf "${INTERMEDIATE_DIR}" -echo "Intermediate files cleaned up!" +# rm -rf "${INTERMEDIATE_DIR}" +# echo "Intermediate files cleaned up!" +echo "Intermediate files left intact!" echo "" echo "Final output:" echo "- Pareto optimal runs marked with is_pareto files" echo "- Plots: ${OUTPUT_DIR}/plots/" -echo "- No intermediate files (completely cleaned up)" +echo "- Yes intermediate files are still present ;)"
From 7121c37e2383188392d5f5b0142a38cc4cc09f63 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Wed, 29 Apr 2026 02:02:10 +0000 Subject: [PATCH 7/8] vb-15d: - added forceMerge-versions of nLists and nProbes since those often need to be scaled differently depending on segment size - logic to support avoiding Java heap-space OOMs - fix nanosecond to millisecond conversion truncation error --- .../benchmarks/BenchmarkConfiguration.java | 36 ++ .../cuvs/benchmarks/LuceneCuvsBenchmarks.java | 380 ++++++++++-------- 2 files changed, 258 insertions(+), 158 deletions(-)
diff --git a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/BenchmarkConfiguration.java b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/BenchmarkConfiguration.java index 35e208a..54b5c99 100644 --- a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/BenchmarkConfiguration.java +++ b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/BenchmarkConfiguration.java @@ -70,6 +70,32 @@ public class BenchmarkConfiguration { public int cuVSIvfPqSearchParamsNProbes = 20; public double cuVSIvfPqSearchParamsPreferredShmemCarveout = 1.0; + // ── IVF-PQ overrides for the forceMerge phase ────────────────────────────── + /** + * nLists to use when building the CAGRA graph during {@code forceMerge}. + * + *
<p>
During flush, segments are small so {@link #cuVSIvfPqIndexParamsNLists} is + * sized accordingly. A force-merged segment can be 4-8x larger; using the flush + * value here leaves too many vectors per cluster, degrading graph quality. + + *
<p>
Per the NVIDIA cuVS documentation, {@code n_rows / n_lists} should fall in + * the range 1,000–10,000. Set this to match your merged segment size, e.g. + * 10 M vectors / 5,000 nLists ≈ 2,000 vectors/cluster. + * + *
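<p>For instance (illustrative numbers), pairing {@code "cuVSIvfPqIndexParamsNLists": 1000} + * for ~1-2 M-vector flush segments with {@code "cuVSIvfPqIndexParamsForceMergeNLists": 5000} + * for a ~10 M-vector merged segment keeps both phases near the recommended + * vectors-per-cluster range. + + *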
<p>
{@code 0} (default) = use {@link #cuVSIvfPqIndexParamsNLists} unchanged. + */ + public int cuVSIvfPqIndexParamsForceMergeNLists = 0; + + /** + * nProbes to use when searching the IVF-PQ index during {@code forceMerge}. + * + *
<p>
With a larger nLists at merge time, nProbes should be scaled proportionally + * to maintain the same recall during CAGRA graph construction. + * + *
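<p>For example (illustrative): 20 probes over 1,024 lists searches roughly 2% of the + * clusters; after merging with 5,120 lists, ~100 probes preserves that same fraction. + + *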
<p>
{@code 0} (default) = use {@link #cuVSIvfPqSearchParamsNProbes} unchanged. + */ + public int cuVSIvfPqSearchParamsForceMergeNProbes = 0; + public boolean isLucene() { return Codex.LUCENE_HNSW.equals(algoToRun); } @@ -141,6 +167,16 @@ public String prettyString() { sb.append("Num HNSW merge threads: ").append(hnswMergeThreads).append('\n'); sb.append("enableIndexWriterInfoStream: ").append(enableIndexWriterInfoStream).append('\n'); sb.append("efSearch: ").append(getEfSearchValues()).append('\n'); + sb.append("nLists (flush): ").append(cuVSIvfPqIndexParamsNLists).append('\n'); + sb.append("nLists (forceMerge): ") + .append(cuVSIvfPqIndexParamsForceMergeNLists) + .append(cuVSIvfPqIndexParamsForceMergeNLists == 0 ? " (same as flush)" : "") + .append('\n'); + sb.append("nProbes (flush): ").append(cuVSIvfPqSearchParamsNProbes).append('\n'); + sb.append("nProbes (forceMerge): ") + .append(cuVSIvfPqSearchParamsForceMergeNProbes) + .append(cuVSIvfPqSearchParamsForceMergeNProbes == 0 ? " (same as flush)" : "") + .append('\n'); sb.append("------- algo parameters ------\n"); if (isLucene()) { diff --git a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java index d62d05f..a7cb615 100644 --- a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java +++ b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java @@ -88,22 +88,16 @@ public enum Codex { */ private static void setPerThreadRAMLimit(IndexWriterConfig config, int limitMB) { try { - // First, try to find the field in IndexWriterConfig java.lang.reflect.Field field = null; Class clazz = config.getClass(); - - // Try to find the field in the class hierarchy while (clazz != null && field == null) { try { field = clazz.getDeclaredField("perThreadHardLimitMB"); } catch (NoSuchFieldException e) { - // Try superclass clazz = clazz.getSuperclass(); } } - if (field == null) { - // If not found in IndexWriterConfig, try LiveIndexWriterConfig clazz = config.getClass().getSuperclass(); while (clazz != null && field == null) { try { @@ -113,7 +107,6 @@ private static void setPerThreadRAMLimit(IndexWriterConfig config, int limitMB) } } } - if (field != null) { field.setAccessible(true); field.setInt(config, limitMB); @@ -126,6 +119,67 @@ private static void setPerThreadRAMLimit(IndexWriterConfig config, int limitMB) } } + // ── Writer-config helpers ────────────────────────────────────────────────── + + /** + * Builds an {@link IndexWriterConfig} for the flush phase. + * Uses {@link NoMergePolicy} unless {@code enableTieredMerge} is set, + * in which case background auto-merges are allowed during indexing. + * forceMerge is never triggered here — that is the responsibility of + * the separate merge writer created in {@code main()}. 
+ */ + private static IndexWriterConfig buildFlushWriterConfig( + BenchmarkConfiguration config, Codec codec) { + IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer()); + iwc.setCodec(codec); + iwc.setMaxBufferedDocs(config.flushFreq); + iwc.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH); + if (config.enableTieredMerge) { + iwc.setMergePolicy(new TieredMergePolicy()); + } else { + iwc.setMergePolicy(NoMergePolicy.INSTANCE); + } + setPerThreadRAMLimit(iwc, 10240); // 10 GB per thread + if (config.enableIndexWriterInfoStream) { + iwc.setInfoStream(new PrintStreamInfoStream(System.out)); + } + log.info( + "Configured flush writer — MaxBufferedDocs: {}, RAMBufferSizeMB: {}, PerThreadRAMLimit: {}" + + " MB", + iwc.getMaxBufferedDocs(), + iwc.getRAMBufferSizeMB(), + iwc.getRAMPerThreadHardLimitMB()); + return iwc; + } + + /** + * Builds an {@link IndexWriterConfig} for the forceMerge phase. + * Always uses a {@link TieredMergePolicy} tuned so that the policy ceiling + * never silently caps a single-segment merge of a large index. + */ + private static IndexWriterConfig buildMergeWriterConfig( + BenchmarkConfiguration config, Codec mergeCodec) { + IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer()); + iwc.setCodec(mergeCodec); + TieredMergePolicy tmp = new TieredMergePolicy(); + if (config.forceMerge == 1) { + // With 10M x 1536-dim vectors the full index is ~60+ GiB. + // Raise the ceiling above the full index size so the policy never + // silently caps forceMerge(1) at ~6 segments. + tmp.setMaxMergedSegmentMB(150 * 1024); // 150 GiB in MB + tmp.setSegmentsPerTier(2); + tmp.setMaxMergeAtOnce(20); + } + iwc.setMergePolicy(tmp); + setPerThreadRAMLimit(iwc, 10240); + if (config.enableIndexWriterInfoStream) { + iwc.setInfoStream(new PrintStreamInfoStream(System.out)); + } + return iwc; + } + + // ── Main ────────────────────────────────────────────────────────────────── + public static void main(String[] args) throws Throwable { if (args.length < 1 || args.length > 3) { @@ -136,15 +190,13 @@ public static void main(String[] args) throws Throwable { BenchmarkConfiguration config = Util.newObjectMapper().readValue(new File(args[0]), BenchmarkConfiguration.class); - // Override benchmarkID if provided as command line argument if (args.length >= 2) { config.benchmarkID = args[1]; } - - // Override resultsDirectory if provided as command line argument if (args.length >= 3) { config.resultsDirectory = args[2]; } + Map metrics = new LinkedHashMap(); List queryResults = Collections.synchronizedList(new ArrayList()); config.debugPrintArguments(); @@ -160,21 +212,17 @@ public static void main(String[] args) throws Throwable { long parseStartTime = System.currentTimeMillis(); - // Check if dataset is .fvecs or .fbin format and handle it directly if (config.datasetFile.contains("fvecs") || config.datasetFile.contains("fbin")) { log.info("Detected .fvecs or .fbin file format. 
Loading directly without MapDB..."); - if (config.loadVectorsInMemory) { log.info("Loading all vectors in memory (loadVectorsInMemory is enabled)"); long start = System.currentTimeMillis(); List loadedVectors = new ArrayList(); - if (config.datasetFile.contains("fbin")) { FBIvecsReader.readFbin(config.datasetFile, config.numDocs, loadedVectors); } else { FBIvecsReader.readFvecs(config.datasetFile, config.numDocs, loadedVectors); } - vectorProvider = new MemoryVectorProvider(loadedVectors); log.info( "Time taken to load {} vectors in-memory: {} ms", @@ -184,13 +232,10 @@ public static void main(String[] args) throws Throwable { log.info("Creating streaming vector provider (loadVectorsInMemory is disabled)"); vectorProvider = new StreamingVectorProvider(config.datasetFile, config.numDocs); } - titles.add(config.vectorColName); } else { - // Use MapDB for non-.fvecs files (CSV, bvecs, etc.) DB db; IndexTreeList vectors; - if (new File(datasetMapdbFile).exists() == false) { log.info("Mapdb file not found for dataset. Preparing one ..."); db = DBMaker.fileDB(datasetMapdbFile).make(); @@ -207,7 +252,6 @@ public static void main(String[] args) throws Throwable { vectors = db.indexTreeList("vectors", SERIALIZER.FLOAT_ARRAY).createOrOpen(); log.info("{} vectors available from the mapdb file", vectors.size()); } - if (config.loadVectorsInMemory) { log.info( "Mapdb loaded. Now loading all vectors in memory (loadVectorsInMemory is enabled)"); @@ -230,65 +274,77 @@ public static void main(String[] args) throws Throwable { (System.currentTimeMillis() - parseStartTime)); try { - // [2] Benchmarking setup + // [2] Benchmarking setup — indexing if (!config.skipIndexing) { - IndexWriter writer; - - // HNSW Writer: - IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new StandardAnalyzer()); - indexWriterConfig.setCodec(getCodec(config)); - indexWriterConfig.setMaxBufferedDocs(config.flushFreq); - indexWriterConfig.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH); - - if (config.forceMerge > 0 || config.enableTieredMerge) { - indexWriterConfig.setMergePolicy(new TieredMergePolicy()); - } else { - indexWriterConfig.setMergePolicy(NoMergePolicy.INSTANCE); - } - // Use reflection to bypass the 2048MB per-thread limit and set it to 10GB - setPerThreadRAMLimit(indexWriterConfig, 10240); // 10GB per thread - log.info( - "Configured HNSW writer - MaxBufferedDocs: {}, RAMBufferSizeMB: {}, PerThreadRAMLimit:" - + " {} MB", - config.flushFreq, - indexWriterConfig.getRAMBufferSizeMB(), - indexWriterConfig.getRAMPerThreadHardLimitMB()); + // ── Phase 1: flush (no forceMerge) ──────────────────────────────────── + // Build the flush writer with the flush-time codec. The merge policy is + // NoMergePolicy (or TieredMergePolicy if enableTieredMerge), but forceMerge + // is never called on this writer. That happens in Phase 2 below, using a + // separate writer that carries the merge-time nLists/nProbes. 
+ Codec flushCodec = getCodec(config, false); + IndexWriterConfig flushWriterConfig = buildFlushWriterConfig(config, flushCodec); + Directory indexDirectory; if (!config.createIndexInMemory) { - Path hnswIndex = Path.of(config.indexDirPath); - writer = new IndexWriter(FSDirectory.open(hnswIndex), indexWriterConfig); + indexDirectory = FSDirectory.open(Path.of(config.indexDirPath)); } else { - writer = new IndexWriter(new ByteBuffersDirectory(), indexWriterConfig); - } - - if (config.enableIndexWriterInfoStream) { - indexWriterConfig.setInfoStream(new PrintStreamInfoStream(System.out)); + indexDirectory = new ByteBuffersDirectory(); } - var formatName = writer.getConfig().getCodec().knnVectorsFormat().getName(); - + IndexWriter flushWriter = new IndexWriter(indexDirectory, flushWriterConfig); + var formatName = flushWriter.getConfig().getCodec().knnVectorsFormat().getName(); log.info("Indexing documents using {} ...", formatName); + long indexStartTime = System.currentTimeMillis(); - indexDocuments(writer, config, titles, vectorProvider); + indexDocuments(flushWriter, config, titles, vectorProvider); long indexTimeTaken = System.currentTimeMillis() - indexStartTime; - metrics.put(config.algoToRun + "-indexing-time", indexTimeTaken); - log.info("Time taken for index building (end to end): {} ms", indexTimeTaken); + // ── Phase 2: forceMerge with merge-time codec ────────────────────────── + // Opens a fresh IndexWriter on the same directory so the codec — and + // therefore nLists/nProbes — can differ from the flush writer above. + // Only runs when forceMerge > 0. + if (config.forceMerge > 0) { + int effectiveNLists = + config.cuVSIvfPqIndexParamsForceMergeNLists > 0 + ? config.cuVSIvfPqIndexParamsForceMergeNLists + : config.cuVSIvfPqIndexParamsNLists; + int effectiveNProbes = + config.cuVSIvfPqSearchParamsForceMergeNProbes > 0 + ? config.cuVSIvfPqSearchParamsForceMergeNProbes + : config.cuVSIvfPqSearchParamsNProbes; + log.info( + "Starting forceMerge phase — target segments: {}, nLists: {} (flush was: {})," + + " nProbes: {} (flush was: {})", + config.forceMerge, + effectiveNLists, + config.cuVSIvfPqIndexParamsNLists, + effectiveNProbes, + config.cuVSIvfPqSearchParamsNProbes); + + Codec mergeCodec = getCodec(config, true); + IndexWriterConfig mergeWriterConfig = buildMergeWriterConfig(config, mergeCodec); + IndexWriter mergeWriter = new IndexWriter(indexDirectory, mergeWriterConfig); + long mergeStart = System.currentTimeMillis(); + mergeWriter.forceMerge(config.forceMerge); + mergeWriter.commit(); + mergeWriter.close(); + log.info("forceMerge completed in {} ms", (System.currentTimeMillis() - mergeStart)); + } + + // ── Index size reporting ─────────────────────────────────────────────── try { - if (writer.getDirectory() instanceof FSDirectory) { + if (indexDirectory instanceof FSDirectory) { Path indexPath = Paths.get(config.indexDirPath); long directorySize; try (var stream = Files.walk(indexPath, FileVisitOption.FOLLOW_LINKS)) { directorySize = stream.filter(p -> p.toFile().isFile()).mapToLong(p -> p.toFile().length()).sum(); } - double directorySizeGB = directorySize / 1_073_741_824.0; metrics.put(config.algoToRun + "-index-size", directorySizeGB); - log.info("Size of {}: {} GB", indexPath.toString(), directorySizeGB); } } catch (IOException e) { @@ -296,31 +352,23 @@ public static void main(String[] args) throws Throwable { } } - // Resolve the list of efSearch values to iterate over. - // The index is built once above; we search it once per efSearch value. 
+ // [3] Search — one run per efSearch value List efSearchValues = config.getEfSearchValues(); log.info( "Will run search with {} efSearch value(s): {}", efSearchValues.size(), efSearchValues); - // Read ground truth once (shared across all efSearch runs) List groundTruth = Util.readGroundTruthFile(config.groundTruthFile); - Directory indexDir = MMapDirectory.open(Path.of(config.indexDirPath)); log.info("Index directory is: {} (using memory-mapped files)", indexDir); - // Snapshot indexing-only metrics before the loop so that each efSearch run - // starts from the same base and doesn't inherit results from prior runs. Map indexingMetrics = new LinkedHashMap<>(metrics); for (int efSearch : efSearchValues) { log.info("--- Running search with efSearch={} ---", efSearch); - // Fresh collections for this efSearch run List efSearchQueryResults = Collections.synchronizedList(new ArrayList()); Map efSearchMetrics = new LinkedHashMap(); - - // Copy over indexing metrics (only) so they appear in every result file efSearchMetrics.putAll(indexingMetrics); efSearchMetrics.put("efSearch", efSearch); @@ -335,28 +383,19 @@ public static void main(String[] args) throws Throwable { .writeValueAsString(Map.of("configuration", config, "metrics", efSearchMetrics)); if (config.saveResultsOnDisk) { - // Use the resultsDirectory directly if provided String resultsDir = config.resultsDirectory != null ? config.resultsDirectory : "results"; - - // When there are multiple efSearch values, create a subdirectory per value if (efSearchValues.size() > 1) { resultsDir = resultsDir + "/efSearch_" + efSearch; } - File results = new File(resultsDir); if (!results.exists()) { results.mkdirs(); } - - // Save results.json directly to the specified directory FileUtils.write( new File(results.toString() + "/results.json"), resultsJson, Charset.forName("UTF-8")); - - // Save CSV with neighbors data Util.writeCSV(efSearchQueryResults, results.toString() + "/neighbors.csv"); - log.info("Results for efSearch={} saved to directory: {}", efSearch, resultsDir); } @@ -366,7 +405,6 @@ public static void main(String[] args) throws Throwable { efSearchMetrics, resultsJson); - // Accumulate per-efSearch metrics into the top-level metrics map for (Map.Entry entry : efSearchMetrics.entrySet()) { if (efSearchValues.size() > 1) { metrics.put("efSearch_" + efSearch + "/" + entry.getKey(), entry.getValue()); @@ -377,14 +415,10 @@ public static void main(String[] args) throws Throwable { } log.info("\n-----\nOverall metrics: {}\n-----", metrics); - - // Close the index directory before cleaning indexDir.close(); - // Clean index directory after benchmarks complete if requested if (config.cleanIndexDirectory && !config.createIndexInMemory) { Path indexPath = Path.of(config.indexDirPath); - if (indexPath != null) { try { log.info("Cleaning index directory: {}", indexPath); @@ -400,15 +434,24 @@ public static void main(String[] args) throws Throwable { vectorProvider.close(); } } + if (executorService != null && !executorService.isShutdown()) { executorService.shutdown(); executorService.close(); - // Need the following as a temporary fix for now as the process gets stuck in case of - // LUCENE_HNSW when using multiple merge threads + // Temporary fix: prevents process getting stuck on LUCENE_HNSW with multiple merge threads System.exit(0); } } + // ── Indexing (flush only — forceMerge intentionally removed) ────────────── + + /** + * Indexes all documents into the given writer, commits, and closes it. + * + *
<p>
forceMerge is intentionally not called here. It is handled in + * {@code main()} by a separate {@link IndexWriter} that carries different + * IVF-PQ parameters (nLists / nProbes) via the merge codec. + */ private static void indexDocuments( IndexWriter writer, BenchmarkConfiguration config, @@ -432,7 +475,7 @@ private static void indexDocuments( while (true) { int id = numDocsIndexed.getAndIncrement(); if (id >= numDocsToIndex) { - break; // done + break; } float[] vector; try { @@ -451,7 +494,6 @@ private static void indexDocuments( (id + 1), writer.getPendingNumDocs()); } - // Log when we expect a flush if ((id + 1) == config.flushFreq || (id + 1) == 2 * config.flushFreq) { log.info("Expected flush point reached at {} documents", (id + 1)); } @@ -464,16 +506,13 @@ private static void indexDocuments( pool.shutdown(); pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - if (config.forceMerge > 0) { - log.info("Force merge is enabled, force merging into " + config.forceMerge + " segments"); - writer.forceMerge(config.forceMerge); - } - log.info("Calling commit."); writer.commit(); writer.close(); } + // ── Search ──────────────────────────────────────────────────────────────── + /** * Runs search queries against the given index directory using the specified efSearch value. * @@ -503,7 +542,6 @@ private static void search( log.info("No mapdb file found for queries. Reading source files to build one ..."); db = DBMaker.fileDB(queryMapdbFile).make(); queries = db.indexTreeList("vectors", SERIALIZER.FLOAT_ARRAY).createOrOpen(); - if (config.queryFile.endsWith(".csv")) { for (String line : FileUtils.readFileToString(new File(config.queryFile), "UTF-8").split("\n")) { @@ -543,7 +581,6 @@ private static void search( } try { KnnFloatVectorQuery query; - if (config.algoToRun.equals(Codex.CAGRA_SEARCH)) { query = new GPUKnnFloatVectorQuery( @@ -568,14 +605,12 @@ private static void search( } catch (IOException e) { throw new RuntimeException("Problem during executing a query: ", e); } - double searchTimeTakenMs = - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchStartTime); + double searchTimeTakenMs = (System.nanoTime() - searchStartTime) / 1_000_000.0; + if (currentQueryId > config.numWarmUpQueries) { queryLatencies.put(currentQueryId, searchTimeTakenMs); } int finishedCount = queriesFinished.incrementAndGet(); - - // Log progress every 1000 queries if (finishedCount % 1000 == 0 || finishedCount == config.numQueriesToRun) { log.info( "Done querying " @@ -589,7 +624,6 @@ private static void search( List neighbors = new ArrayList<>(); List scores = new ArrayList<>(); - // Debug: Log search results for first query if (queryId.get() == 0) { log.info( "Debug: First query returned " @@ -615,12 +649,11 @@ private static void search( scores.add(hit.score); } double retrievalTimeTakenMs = - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - retrievalStartTime); + (System.nanoTime() - retrievalStartTime) / 1_000_000.0; if (currentQueryId > config.numWarmUpQueries) { retrievalLatencies.put(currentQueryId, retrievalTimeTakenMs); } - // Debug: Log results for all queries log.debug( "Query " + currentQueryId @@ -667,7 +700,6 @@ private static void search( pool.shutdown(); pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - long endTime = System.currentTimeMillis(); metrics.put(config.algoToRun + "-query-time", (endTime - startTime)); @@ -686,11 +718,9 @@ private static void search( ? 
0.0 : new ArrayList<>(retrievalLatencies.values()).stream().reduce(0.0, Double::sum) / retrievalLatencies.size(); - metrics.put(config.algoToRun + "-mean-latency", avgLatency); metrics.put(config.algoToRun + "-mean-retrieval-latency", avgRetLatency); - // Log warning if no queries completed successfully if (queryLatencies.isEmpty()) { log.error( "WARNING: Zero queries completed successfully! " @@ -713,7 +743,19 @@ private static void search( } } - private static Codec getCodec(BenchmarkConfiguration config) throws Exception { + // ── Codec factory ───────────────────────────────────────────────────────── + + /** + * Builds the Lucene {@link Codec} for the given phase. + * + * @param config benchmark configuration + * @param forMerge {@code true} → substitute + * {@link BenchmarkConfiguration#cuVSIvfPqIndexParamsForceMergeNLists} and + * {@link BenchmarkConfiguration#cuVSIvfPqSearchParamsForceMergeNProbes} + * when non-zero (merge-time codec); + * {@code false} → always use the flush-time values. + */ + private static Codec getCodec(BenchmarkConfiguration config, boolean forMerge) throws Exception { if (config.algoToRun.equals(Codex.LUCENE_HNSW)) { log.info("<<< Using Lucene101Codec >>>"); return new Lucene101Codec(Mode.BEST_SPEED) { @@ -734,76 +776,98 @@ public KnnVectorsFormat getKnnVectorsFormatForField(String field) { return new HighDimensionKnnVectorsFormat(knnFormat, config.vectorDimension); } }; - } else { + } - CuVSIvfPqIndexParams ciip = - new CuVSIvfPqIndexParams.Builder() - .withAddDataOnBuild(config.cuVSIvfPqIndexParamsAddDataOnBuild) - .withCodebookKind(config.cuVSIvfPqIndexParamsCodebookKind) - .withConservativeMemoryAllocation( - config.cuVSIvfPqIndexParamsConservativeMemoryAllocation) - .withForceRandomRotation(config.cuVSIvfPqIndexParamsForceRandomRotation) - .withKmeansNIters(config.cuVSIvfPqIndexParamsKmeansNIters) - .withKmeansTrainsetFraction(config.cuVSIvfPqIndexParamsKmeansTrainsetFraction) - .withMaxTrainPointsPerPqCode(config.cuVSIvfPqIndexParamsMaxTrainPointsPerPqCode) - .withMetric(config.cuVSIvfPqIndexParamsMetric) - .withMetricArg(config.cuVSIvfPqIndexParamsMetricArg) - .withNLists(config.cuVSIvfPqIndexParamsNLists) - .withPqBits(config.cuVSIvfPqIndexParamsPqBits) - .withPqDim(config.cuVSIvfPqIndexParamsPqDim) - .build(); + // ── Resolve effective nLists / nProbes ───────────────────────────────── + int effectiveNLists = + (forMerge && config.cuVSIvfPqIndexParamsForceMergeNLists > 0) + ? config.cuVSIvfPqIndexParamsForceMergeNLists + : config.cuVSIvfPqIndexParamsNLists; - CuVSIvfPqSearchParams cisp = - new CuVSIvfPqSearchParams.Builder() - .withInternalDistanceDtype(config.cuVSIvfPqSearchParamsInternalDistanceDtype) - .withLutDtype(config.cuVSIvfPqSearchParamsLutDtype) - .withNProbes(config.cuVSIvfPqSearchParamsNProbes) - .withPreferredShmemCarveout(config.cuVSIvfPqSearchParamsPreferredShmemCarveout) - .build(); + int effectiveNProbes = + (forMerge && config.cuVSIvfPqSearchParamsForceMergeNProbes > 0) + ? 
config.cuVSIvfPqSearchParamsForceMergeNProbes + : config.cuVSIvfPqSearchParamsNProbes; - CuVSIvfPqParams cip = - new CuVSIvfPqParams.Builder() - .withCuVSIvfPqIndexParams(ciip) - .withCuVSIvfPqSearchParams(cisp) - .withRefinementRate(config.cuVSIvfPqParamsRefinementRate) - .build(); + if (forMerge) { + log.info( + "<<< Merge codec — nLists: {} (flush: {}), nProbes: {} (flush: {}) >>>", + effectiveNLists, + config.cuVSIvfPqIndexParamsNLists, + effectiveNProbes, + config.cuVSIvfPqSearchParamsNProbes); + } - AcceleratedHNSWParams params = - new AcceleratedHNSWParams.Builder() + CuVSIvfPqIndexParams ciip = + new CuVSIvfPqIndexParams.Builder() + .withAddDataOnBuild(config.cuVSIvfPqIndexParamsAddDataOnBuild) + .withCodebookKind(config.cuVSIvfPqIndexParamsCodebookKind) + .withConservativeMemoryAllocation( + config.cuVSIvfPqIndexParamsConservativeMemoryAllocation) + .withForceRandomRotation(config.cuVSIvfPqIndexParamsForceRandomRotation) + .withKmeansNIters(config.cuVSIvfPqIndexParamsKmeansNIters) + .withKmeansTrainsetFraction(config.cuVSIvfPqIndexParamsKmeansTrainsetFraction) + .withMaxTrainPointsPerPqCode(config.cuVSIvfPqIndexParamsMaxTrainPointsPerPqCode) + .withMetric(config.cuVSIvfPqIndexParamsMetric) + .withMetricArg(config.cuVSIvfPqIndexParamsMetricArg) + .withNLists(effectiveNLists) // ← flush or merge value + .withPqBits(config.cuVSIvfPqIndexParamsPqBits) + .withPqDim(config.cuVSIvfPqIndexParamsPqDim) + .build(); + + CuVSIvfPqSearchParams cisp = + new CuVSIvfPqSearchParams.Builder() + .withInternalDistanceDtype(config.cuVSIvfPqSearchParamsInternalDistanceDtype) + .withLutDtype(config.cuVSIvfPqSearchParamsLutDtype) + .withNProbes(effectiveNProbes) // ← flush or merge value + .withPreferredShmemCarveout(config.cuVSIvfPqSearchParamsPreferredShmemCarveout) + .build(); + + CuVSIvfPqParams cip = + new CuVSIvfPqParams.Builder() + .withCuVSIvfPqIndexParams(ciip) + .withCuVSIvfPqSearchParams(cisp) + .withRefinementRate(config.cuVSIvfPqParamsRefinementRate) + .build(); + + AcceleratedHNSWParams params = + new AcceleratedHNSWParams.Builder() + .withWriterThreads(config.cuvsWriterThreads) + .withIntermediateGraphDegree(config.cagraIntermediateGraphDegree) + .withGraphDegree(config.cagraGraphDegree) + .withHNSWLayer(config.cagraHnswLayers) + .withMaxConn(config.hnswMaxConn) + .withBeamWidth(config.hnswBeamWidth) + .withCagraGraphBuildAlgo(config.cagraGraphBuildAlgo) + .withCuVSIvfPqParams(cip) + .build(); + + if (config.algoToRun.equals(Codex.CAGRA_HNSW)) { + log.info("<<< Using Lucene101AcceleratedHNSWCodec >>>"); + return new Lucene101AcceleratedHNSWCodec(params); + } else if (config.algoToRun.equals(Codex.CAGRA_SEARCH)) { + log.info("<<< Using CuVS2510GPUSearchCodec >>>"); + GPUSearchParams gpuParams = + new GPUSearchParams.Builder() + .withCagraGraphBuildAlgo(config.cagraGraphBuildAlgo) .withWriterThreads(config.cuvsWriterThreads) .withIntermediateGraphDegree(config.cagraIntermediateGraphDegree) .withGraphDegree(config.cagraGraphDegree) - .withHNSWLayer(config.cagraHnswLayers) - .withMaxConn(config.hnswMaxConn) - .withBeamWidth(config.hnswBeamWidth) - .withCagraGraphBuildAlgo(config.cagraGraphBuildAlgo) - .withCuVSIvfPqParams(cip) .build(); - - if (config.algoToRun.equals(Codex.CAGRA_HNSW)) { - log.info("<<< Using Lucene101AcceleratedHNSWCodec >>>"); - return new Lucene101AcceleratedHNSWCodec(params); - } else if (config.algoToRun.equals(Codex.CAGRA_SEARCH)) { - log.info("<<< Using CuVS2510GPUSearchCodec >>>"); - GPUSearchParams gpuParams = - new GPUSearchParams.Builder() - 
.withCagraGraphBuildAlgo(config.cagraGraphBuildAlgo) - .withWriterThreads(config.cuvsWriterThreads) - .withIntermediateGraphDegree(config.cagraIntermediateGraphDegree) - .withGraphDegree(config.cagraGraphDegree) - .build(); - return new CuVS2510GPUSearchCodec(gpuParams); - } else if (config.algoToRun.equals(Codex.CAGRA_HNSW_BINARY)) { - log.info("<<< Using LuceneAcceleratedHNSWBinaryQuantizedCodec >>>"); - return new LuceneAcceleratedHNSWBinaryQuantizedCodec(params); - } else if (config.algoToRun.equals(Codex.CAGRA_HNSW_SCALAR)) { - log.info("<<< Using LuceneAcceleratedHNSWScalarQuantizedCodec >>>"); - return new LuceneAcceleratedHNSWScalarQuantizedCodec(params); - } + return new CuVS2510GPUSearchCodec(gpuParams); + } else if (config.algoToRun.equals(Codex.CAGRA_HNSW_BINARY)) { + log.info("<<< Using LuceneAcceleratedHNSWBinaryQuantizedCodec >>>"); + return new LuceneAcceleratedHNSWBinaryQuantizedCodec(params); + } else if (config.algoToRun.equals(Codex.CAGRA_HNSW_SCALAR)) { + log.info("<<< Using LuceneAcceleratedHNSWScalarQuantizedCodec >>>"); + return new LuceneAcceleratedHNSWScalarQuantizedCodec(params); } + return null; } + // ── Inner helpers ───────────────────────────────────────────────────────── + private static class HighDimensionKnnVectorsFormat extends KnnVectorsFormat { private final KnnVectorsFormat knnFormat; private final int maxDimensions; From a909c2805aefc2efd161cf67ab1696da62c240b7 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Wed, 29 Apr 2026 02:10:27 +0000 Subject: [PATCH 8/8] vb-15e: update pom.xml to point to 26.06.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 565c840..c213223 100644 --- a/pom.xml +++ b/pom.xml @@ -35,7 +35,7 @@ com.nvidia.cuvs.lucene cuvs-lucene - 26.04.0 + 26.06.0 com.nvidia.cuvs