Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 183 additions & 41 deletions benchmarks/beir-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,78 @@ fn parse_args() -> Config {
// ---------------------------------------------------------------------------

fn load_npy_f32(path: &str) -> (Vec<f32>, usize, usize) {
let bytes = std::fs::read(path).unwrap_or_else(|e| panic!("read npy {path}: {e}"));
assert!(bytes.len() >= 10, "npy file too short: {path}");
assert_eq!(&bytes[..6], b"\x93NUMPY", "not a numpy file: {path}");
let major = bytes[6];
let minor = bytes[7];
load_npy_f32_rows(path, None)
}

fn read_npy_header(f: &mut std::fs::File, path: &str) -> (String, usize) {
use std::io::Read;

let mut pre = [0u8; 12];
f.read_exact(&mut pre[..10])
.unwrap_or_else(|e| panic!("read npy magic {path}: {e}"));
assert_eq!(&pre[..6], b"\x93NUMPY", "not a numpy file: {path}");
let major = pre[6];
let minor = pre[7];
assert!(
major == 1 || major == 2,
"unsupported npy version {major}.{minor}: {path}",
);
let (header_len, header_start) = if major == 1 {
let hl = u16::from_le_bytes([bytes[8], bytes[9]]) as usize;
(hl, 10)
let (header_len, data_start) = if major == 1 {
(u16::from_le_bytes([pre[8], pre[9]]) as usize, 10usize)
} else {
let hl = u32::from_le_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]) as usize;
(hl, 12)
f.read_exact(&mut pre[10..12])
.unwrap_or_else(|e| panic!("read npy header length {path}: {e}"));
(
u32::from_le_bytes([pre[8], pre[9], pre[10], pre[11]]) as usize,
12usize,
)
};
let header = std::str::from_utf8(&bytes[header_start..header_start + header_len])
.expect("npy header not utf-8");
let mut hb = vec![0u8; header_len];
f.read_exact(&mut hb)
.unwrap_or_else(|e| panic!("read npy header {path}: {e}"));
let header =
String::from_utf8(hb).unwrap_or_else(|e| panic!("npy header not utf-8 {path}: {e}"));
(header, data_start + header_len)
}

fn npy_shape(header: &str, path: &str) -> Vec<usize> {
let after = &header[header.find("'shape':").expect("no shape in npy header")..];
let open = after.find('(').unwrap();
let close = after.find(')').unwrap();
let dims: Vec<usize> = after[open + 1..close]
.split(',')
.filter_map(|s| s.trim().parse::<usize>().ok())
.collect();
assert!(!dims.is_empty(), "empty npy shape in {path}");
dims
}

fn npy_payload_bytes(n: usize, dim: usize, path: &str) -> usize {
n.checked_mul(dim)
.and_then(|floats| floats.checked_mul(std::mem::size_of::<f32>()))
.unwrap_or_else(|| panic!("npy payload too large: {path}"))
}

/// Read just the npy header and return the row count (dim 0). Cheap: no payload read.
fn npy_row_count(path: &str) -> usize {
let mut f = std::fs::File::open(path).unwrap_or_else(|e| panic!("open npy {path}: {e}"));
let (header, _) = read_npy_header(&mut f, path);
npy_shape(&header, path)
.into_iter()
.next()
.expect("no row count in npy shape")
}

/// Read a 2-D LE-f32 C-order npy. When `max_rows` is `Some(m)`, only the first
/// `m` rows of the payload are read off disk (so `--max-docs` subsampling does
/// NOT pull the whole 36 GB corpus into RAM). The payload is parsed in parallel
/// directly into the output `Vec<f32>` — no intermediate full `Vec<u8>` copy, so
/// peak memory is ~1× the kept data, not 2× the whole file.
fn load_npy_f32_rows(path: &str, max_rows: Option<usize>) -> (Vec<f32>, usize, usize) {
use std::io::Read;

let mut f = std::fs::File::open(path).unwrap_or_else(|e| panic!("open npy {path}: {e}"));
let (header, data_start) = read_npy_header(&mut f, path);
assert!(
header.contains("'descr': '<f4'"),
"expected <f4 dtype in {path}: {header}",
Expand All @@ -182,28 +236,36 @@ fn load_npy_f32(path: &str) -> (Vec<f32>, usize, usize) {
header.contains("'fortran_order': False"),
"expected C order in {path}",
);
let shape_start = header.find("'shape':").expect("no shape in npy header");
let after = &header[shape_start..];
let open = after.find('(').unwrap();
let close = after.find(')').unwrap();
let dims: Vec<usize> = after[open + 1..close]
.split(',')
.filter_map(|s| s.trim().parse::<usize>().ok())
.collect();
let dims = npy_shape(&header, path);
assert_eq!(dims.len(), 2, "expected 2-D array in {path}");
let n = dims[0];
let dim = dims[1];
let data_start = header_start + header_len;
let n_floats = n * dim;
assert_eq!(
bytes.len() - data_start,
n_floats * 4,
"data length mismatch in {path}",
);
let (n_full, dim) = (dims[0], dims[1]);
let n = max_rows.map_or(n_full, |m| m.min(n_full));
let full_payload_bytes = npy_payload_bytes(n_full, dim, path);
let expected_len = (data_start as u64)
.checked_add(full_payload_bytes as u64)
.unwrap_or_else(|| panic!("npy file too large: {path}"));
let actual_len = f
.metadata()
.unwrap_or_else(|e| panic!("stat npy {path}: {e}"))
.len();
assert_eq!(actual_len, expected_len, "data length mismatch in {path}");

let n_floats = n
.checked_mul(dim)
.unwrap_or_else(|| panic!("npy payload too large: {path}"));
let mut out = vec![0.0f32; n_floats];
for (i, chunk) in bytes[data_start..].chunks_exact(4).enumerate() {
out[i] = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
}
let read_bytes = npy_payload_bytes(n, dim, path);
// SAFETY: `out` is fully initialized, `f32` has no invalid bit patterns, and
// the byte slice covers exactly the initialized backing storage.
let out_bytes =
unsafe { std::slice::from_raw_parts_mut(out.as_mut_ptr().cast::<u8>(), read_bytes) };
f.read_exact(out_bytes)
.unwrap_or_else(|e| panic!("read npy payload {path}: {e}"));

#[cfg(target_endian = "big")]
out.par_iter_mut()
.for_each(|v| *v = f32::from_bits(v.to_bits().swap_bytes()));

(out, n, dim)
}

Expand Down Expand Up @@ -712,10 +774,18 @@ fn main() {
let manifest_path = format!("{enc_dir}/embeddings.manifest.json");
let encoder_sha = sha256_file(&manifest_path);

let (corpus_full, n_corpus_full, dim) = load_npy_f32(&format!("{enc_dir}/corpus.f32.npy"));
// Load ONLY the first n_docs rows when sub-sampling (--max-docs), so the scan
// sweep never pulls the whole corpus off disk just to slice it. corpus_ids is
// truncated to match; the full row count comes from the npy header.
let n_corpus_full = npy_row_count(&format!("{enc_dir}/corpus.f32.npy"));
let n_docs = cfg.max_docs.unwrap_or(n_corpus_full).min(n_corpus_full);
let full_corpus = cfg.max_docs.is_none() || n_docs == n_corpus_full;
let (corpus_vec, n_loaded, dim) =
load_npy_f32_rows(&format!("{enc_dir}/corpus.f32.npy"), Some(n_docs));
assert_eq!(n_loaded, n_docs, "corpus load row mismatch");
let (queries, n_queries, q_dim) = load_npy_f32(&format!("{enc_dir}/queries.f32.npy"));
assert_eq!(q_dim, dim, "query dim {q_dim} != corpus dim {dim}");
validate_embeddings(&corpus_full, n_corpus_full, dim, "corpus");
validate_embeddings(&corpus_vec, n_docs, dim, "corpus");
validate_embeddings(&queries, n_queries, q_dim, "queries");

let corpus_ids_full = load_json_string_array(&format!("{enc_dir}/corpus_ids.json"));
Expand All @@ -727,10 +797,7 @@ fn main() {
);
assert_eq!(query_ids.len(), n_queries, "query_ids/embeddings mismatch");

// Sub-sample the corpus for the scaling sweep (latency-only; no nDCG).
let n_docs = cfg.max_docs.unwrap_or(n_corpus_full).min(n_corpus_full);
let full_corpus = cfg.max_docs.is_none() || n_docs == n_corpus_full;
let corpus = &corpus_full[..n_docs * dim];
let corpus = &corpus_vec[..n_docs * dim];
let corpus_ids = &corpus_ids_full[..n_docs];
let write_topk = full_corpus; // qrels-based nDCG only valid on the full corpus

Expand Down Expand Up @@ -986,9 +1053,8 @@ fn run_hnsw(
let build_seconds = t0.elapsed().as_secs_f64();
eprintln!(" build done in {build_seconds:.2}s");

// HNSW graph size is implementation-internal to hnsw_rs; the numeric field
// reports stored float-vector bytes only. Public docs label this as
// "4096 B + graph" so the side structure is not silently counted as zero.
// HNSW graph size is implementation-internal; report the stored-vector bytes
// (full float) as the index footprint, matching the dense baseline accounting.
let bytes_per_vector = dim * 4;
let index_total_mib = (n_docs * bytes_per_vector) as f64 / 1024.0 / 1024.0;
let warmup = 5.min(n_queries);
Expand Down Expand Up @@ -1596,3 +1662,79 @@ unsafe fn scan_agree_avx512(codes: &[u64], wpd: usize, d0: usize, qcode: &[u64],
*a = dim - ham;
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use std::path::PathBuf;

fn temp_npy_path(name: &str) -> PathBuf {
let mut path = std::env::temp_dir();
path.push(format!(
"ordvec-beir-bench-{name}-{}-{}.npy",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
path
}

fn write_v1_npy(path: &std::path::Path, rows: usize, dim: usize, values: &[f32]) {
assert_eq!(values.len(), rows * dim);
let mut header =
format!("{{'descr': '<f4', 'fortran_order': False, 'shape': ({rows}, {dim}), }}");
let padding = (16 - ((10 + header.len() + 1) % 16)) % 16;
header.extend(std::iter::repeat_n(' ', padding));
header.push('\n');

let mut bytes = Vec::new();
bytes.extend_from_slice(b"\x93NUMPY");
bytes.extend_from_slice(&[1, 0]);
bytes.extend_from_slice(&(header.len() as u16).to_le_bytes());
bytes.extend_from_slice(header.as_bytes());
for value in values {
bytes.extend_from_slice(&value.to_le_bytes());
}
std::fs::write(path, bytes).unwrap();
}

#[test]
fn npy_row_count_uses_shared_version_guard() {
let path = temp_npy_path("v3");
std::fs::write(&path, b"\x93NUMPY\x03\x00\x00\x00").unwrap();
let result = std::panic::catch_unwind(|| npy_row_count(path.to_str().unwrap()));
let _ = std::fs::remove_file(&path);
assert!(result.is_err());
}

#[test]
fn load_npy_f32_rows_reads_only_requested_prefix() {
let path = temp_npy_path("prefix");
write_v1_npy(&path, 3, 2, &[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]);

assert_eq!(npy_row_count(path.to_str().unwrap()), 3);
let (values, rows, dim) = load_npy_f32_rows(path.to_str().unwrap(), Some(2));

let _ = std::fs::remove_file(&path);
assert_eq!((rows, dim), (2, 2));
assert_eq!(values, vec![1.0, 2.0, 3.0, 4.0]);
}

#[test]
fn load_npy_f32_rows_rejects_trailing_payload_bytes() {
let path = temp_npy_path("trailing");
write_v1_npy(&path, 1, 1, &[1.0]);
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(&path)
.unwrap();
f.write_all(&[0]).unwrap();

let result = std::panic::catch_unwind(|| load_npy_f32_rows(path.to_str().unwrap(), None));
let _ = std::fs::remove_file(&path);
assert!(result.is_err());
}
}
Loading