From 1086229eb0edf99922d8646cfc333fa6f4ca7db8 Mon Sep 17 00:00:00 2001 From: Madhava Jay Date: Fri, 29 May 2026 21:35:06 +1000 Subject: [PATCH 1/3] making allele freq faster --- cli/Cargo.lock | 2 +- cli/Cargo.toml | 2 +- cli/src/commands/fast_allele_freq.rs | 319 ++++++++++++++++++++++++++ cli/src/commands/fst.rs | 172 ++++++++++++++ cli/src/commands/long_emit.rs | 319 ++++++++++++++++++++------ cli/src/commands/merge_allele_freq.rs | 197 ++++++++++++++++ cli/src/main.rs | 82 +++++++ scripts/bench_fast_allele_freq.sh | 57 +++++ 8 files changed, 1076 insertions(+), 74 deletions(-) create mode 100644 cli/src/commands/fast_allele_freq.rs create mode 100644 cli/src/commands/fst.rs create mode 100644 cli/src/commands/merge_allele_freq.rs create mode 100755 scripts/bench_fast_allele_freq.sh diff --git a/cli/Cargo.lock b/cli/Cargo.lock index 54de682..2f855d1 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -105,7 +105,7 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "biosynth" -version = "0.1.27" +version = "0.1.28" dependencies = [ "anyhow", "chrono", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 848fd9e..e87f65e 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "biosynth" -version = "0.1.27" +version = "0.1.28" edition = "2021" rust-version = "1.91" authors = ["Madhava Jay "] diff --git a/cli/src/commands/fast_allele_freq.rs b/cli/src/commands/fast_allele_freq.rs new file mode 100644 index 0000000..af8245c --- /dev/null +++ b/cli/src/commands/fast_allele_freq.rs @@ -0,0 +1,319 @@ +use std::collections::BTreeMap; +use std::fs::File; +use std::io::{BufWriter, Write}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Instant; + +use anyhow::{bail, Context, Result}; +use rayon::prelude::*; +use rayon::ThreadPoolBuilder; + +use crate::commands::long_emit::{ + collect_merged_long_rows, default_participant, summarize_warning_counts, MissingRefLogger, + ReferenceResolver, SharedReference, +}; +use crate::download::ensure_reference_db; +use crate::long_rows::LongRow; +use crate::stats::StatsStore; +use crate::FastAlleleFreqArgs; + +/// Per-locus accumulator. Mirrors the running counters in +/// `long_aggregate::merge_chunks` exactly so the emitted TSV is byte-for-byte +/// identical to `emit-long -> aggregate-long --threads 1`. +struct Accum { + rsid: String, + /// Rank (participant sort order) of the participant whose non-empty rsid is + /// currently kept. `u32::MAX` until a non-empty rsid is seen. The smallest + /// rank wins, reproducing aggregate's "first non-empty rsid by participant". + rsid_rank: u32, + allele_count: i64, + n_obs: i64, + num_homo: i64, + num_hetero: i64, +} + +type LociMap = BTreeMap; +type Counts = BTreeMap; + +pub fn run_fast_allele_freq(args: FastAlleleFreqArgs) -> Result<()> { + let overall_start = Instant::now(); + + let sqlite_path = ensure_reference_db(Some(&args.sqlite), args.force_download)?; + let store = StatsStore::connect_read_only(&sqlite_path)?; + // Preload the whole reference into memory once. Lets worker threads resolve + // rsid/position with lock-free reads instead of contending on SQLite. + let preload_start = Instant::now(); + let shared = Arc::new(SharedReference::load(&store)?); + eprintln!( + "📚 fast-allele-freq: reference preloaded in {:.2}s", + preload_start.elapsed().as_secs_f64() + ); + + let files = collect_input_files(&args.inputs)?; + if files.is_empty() { + bail!("No input genotype files found under the provided --input paths"); + } + // Sort by participant id. aggregate-long --threads 1 merges rows sorted by + // (locus_key, participant_id), so the rsid kept per locus is from the + // alphabetically-first participant with a non-empty rsid. The rank we assign + // here = that participant order, and the rank-min rule in `fold_row` + // reproduces the choice regardless of parallel parse order. + let mut tasks: Vec<(String, PathBuf)> = files + .into_iter() + .map(|path| (default_participant(&path), path)) + .collect(); + tasks.sort_by(|a, b| a.0.cmp(&b.0)); + let tasks: Vec<(u32, String, PathBuf)> = tasks + .into_iter() + .enumerate() + .map(|(rank, (pid, path))| (rank as u32, pid, path)) + .collect(); + + // Per-row TSV log can't be written safely from many threads -> single thread. + let force_single = args.missing_ref_log.is_some(); + let threads = if force_single { 1 } else { resolve_threads(args.threads) }; + eprintln!( + "▶️ fast-allele-freq: {} input file(s), threads={}", + tasks.len(), + threads + ); + + let (loci, counts) = if threads <= 1 { + run_sequential(&tasks, &shared, &args)? + } else { + let pool = ThreadPoolBuilder::new() + .num_threads(threads) + .build() + .context("build fast-allele-freq thread pool")?; + pool.install(|| run_parallel(&tasks, &shared))? + }; + + write_allele_freq(&args.allele_freq_tsv, &loci)?; + + if args.warn_detail != crate::WarnDetail::None { + let summary = summarize_warning_counts(&counts); + if !summary.is_empty() { + eprintln!("⚠️ fast-allele-freq warnings: {summary}"); + } + } + eprintln!( + "✅ fast-allele-freq: {} files, {} loci, {:.2}s", + tasks.len(), + loci.len(), + overall_start.elapsed().as_secs_f64() + ); + println!( + "✅ Wrote allele frequencies to {}", + args.allele_freq_tsv.display() + ); + Ok(()) +} + +fn run_sequential( + tasks: &[(u32, String, PathBuf)], + shared: &Arc, + args: &FastAlleleFreqArgs, +) -> Result<(LociMap, Counts)> { + let mut resolver = ReferenceResolver::shared(shared.clone()); + let mut logger = MissingRefLogger::new(args.missing_ref_log.as_deref(), args.warn_detail)?; + let mut loci: LociMap = BTreeMap::new(); + for (idx, (rank, participant, path)) in tasks.iter().enumerate() { + match collect_merged_long_rows(path, participant, &mut resolver, &mut logger) { + Ok((rows, _stats)) => { + for row in rows { + fold_row(&mut loci, row, *rank); + } + } + Err(err) => eprintln!("⚠️ skipping {}: {err:#}", path.display()), + } + if (idx + 1) % 100 == 0 { + eprintln!( + "🧮 fast-allele-freq: {} files, {} loci so far", + idx + 1, + loci.len() + ); + } + } + Ok((loci, logger.counts().clone())) +} + +fn run_parallel( + tasks: &[(u32, String, PathBuf)], + shared: &Arc, +) -> Result<(LociMap, Counts)> { + // Each thread folds its share of files into a local map + counts, then maps + // are merged. Counts are order-independent; rsid uses the rank-min rule so the + // merge is deterministic and matches the single-threaded result. + let result = tasks + .par_iter() + .fold( + || ThreadState::new(shared.clone()), + |mut st, (rank, participant, path)| { + match collect_merged_long_rows(path, participant, &mut st.resolver, &mut st.logger) + { + Ok((rows, _stats)) => { + for row in rows { + fold_row(&mut st.loci, row, *rank); + } + } + Err(err) => eprintln!("⚠️ skipping {}: {err:#}", path.display()), + } + st + }, + ) + .map(|st| (st.loci, st.logger.counts().clone())) + .reduce( + || (BTreeMap::new(), BTreeMap::new()), + |(mut m1, mut c1), (m2, c2)| { + merge_loci(&mut m1, m2); + merge_counts(&mut c1, c2); + (m1, c1) + }, + ); + Ok(result) +} + +struct ThreadState { + resolver: ReferenceResolver, + logger: MissingRefLogger, + loci: LociMap, +} + +impl ThreadState { + fn new(shared: Arc) -> Self { + // Lock-free in-memory resolver per thread (cheap Arc clone). No per-row + // TSV in parallel mode (counts only); WarnDetail::None keeps worker + // threads from interleaving per-row stderr. Construction cannot fail. + Self { + resolver: ReferenceResolver::shared(shared), + logger: MissingRefLogger::new(None, crate::WarnDetail::None) + .expect("count-only logger has no file handle and cannot fail"), + loci: BTreeMap::new(), + } + } +} + +fn fold_row(loci: &mut LociMap, row: LongRow, rank: u32) { + let entry = loci.entry(row.locus_key).or_insert_with(|| Accum { + rsid: String::new(), + rsid_rank: u32::MAX, + allele_count: 0, + n_obs: 0, + num_homo: 0, + num_hetero: 0, + }); + if !row.rsid.is_empty() && rank < entry.rsid_rank { + entry.rsid = row.rsid; + entry.rsid_rank = rank; + } + if row.dosage != -1 { + entry.allele_count += row.dosage as i64; + entry.n_obs += 1; + if row.dosage == 2 { + entry.num_homo += 1; + } else if row.dosage == 1 { + entry.num_hetero += 1; + } + } +} + +fn merge_loci(into: &mut LociMap, from: LociMap) { + for (locus, b) in from { + match into.get_mut(&locus) { + Some(a) => { + a.allele_count += b.allele_count; + a.n_obs += b.n_obs; + a.num_homo += b.num_homo; + a.num_hetero += b.num_hetero; + if b.rsid_rank < a.rsid_rank { + a.rsid = b.rsid; + a.rsid_rank = b.rsid_rank; + } + } + None => { + into.insert(locus, b); + } + } + } +} + +fn merge_counts(into: &mut Counts, from: Counts) { + for (code, n) in from { + *into.entry(code).or_insert(0) += n; + } +} + +fn resolve_threads(requested: usize) -> usize { + if requested > 0 { + return requested; + } + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + .max(1) +} + +fn write_allele_freq(path: &Path, loci: &LociMap) -> Result<()> { + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() { + std::fs::create_dir_all(parent) + .with_context(|| format!("Create directory {:?}", parent))?; + } + } + let mut writer = + BufWriter::new(File::create(path).with_context(|| format!("Create {:?}", path))?); + writeln!( + writer, + "locus_key\tallele_count\tallele_number\tnum_homo\tnum_hetero\tallele_freq\trsid" + )?; + for (locus, acc) in loci { + let allele_number = 2 * acc.n_obs; + let allele_freq = if allele_number > 0 { + acc.allele_count as f64 / allele_number as f64 + } else { + 0.0 + }; + writeln!( + writer, + "{locus}\t{}\t{allele_number}\t{}\t{}\t{allele_freq:.6}\t{}", + acc.allele_count, acc.num_homo, acc.num_hetero, acc.rsid + )?; + } + writer.flush()?; + Ok(()) +} + +fn collect_input_files(inputs: &[PathBuf]) -> Result> { + let mut files = Vec::new(); + for input in inputs { + if input.is_file() { + files.push(input.clone()); + } else if input.is_dir() { + // follow_links: flow 04 stages genotype files as symlinks; without + // this they'd be reported as symlink (not file) and skipped. + for entry in walkdir::WalkDir::new(input) + .follow_links(true) + .into_iter() + .filter_map(|e| e.ok()) + { + if entry.file_type().is_file() { + let path = entry.path(); + let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); + if name.starts_with('.') { + continue; + } + if path.extension().and_then(|e| e.to_str()) == Some("bvlr") { + continue; + } + files.push(path.to_path_buf()); + } + } + } else { + eprintln!("⚠️ Input path {:?} is not a file or directory", input); + } + } + files.sort(); + files.dedup(); + Ok(files) +} diff --git a/cli/src/commands/fst.rs b/cli/src/commands/fst.rs new file mode 100644 index 0000000..a83e7a8 --- /dev/null +++ b/cli/src/commands/fst.rs @@ -0,0 +1,172 @@ +use std::collections::HashMap; +use std::fs::File; +use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::path::Path; + +use anyhow::{bail, Context, Result}; + +use crate::FstArgs; + +/// Pairwise Weir & Cockerham 1984 FST (ratio-of-averages), reproducing +/// `04_population_level/fst_islands/scripts/02_compute_fst.py`. Reads the merged +/// allele-frequency and allele-number matrices and writes a population x +/// population matrix, `%.6f`, matching the Python output byte-for-byte. +pub fn run_fst(args: FstArgs) -> Result<()> { + let (pops, freq) = read_matrix(&args.merged_freq)?; + let (pops_n, number) = read_matrix(&args.merged_number)?; + if pops != pops_n { + bail!( + "population columns differ between freq ({:?}) and number ({:?})", + pops, + pops_n + ); + } + let p = pops.len(); + if p < 2 { + bail!("need at least 2 populations, got {}", p); + } + + // Align rows by locus_key (pandas join semantics); iterate loci present in both. + let mut fst = vec![vec![0.0f64; p]; p]; + for i in 0..p { + for j in (i + 1)..p { + let (mut a_sum, mut abc_sum) = (0.0f64, 0.0f64); + for (locus, fr) in &freq { + let Some(nr) = number.get(locus) else { + continue; + }; + let (p1, n1, p2, n2) = (fr[i], nr[i], fr[j], nr[j]); + if let Some((a, b, c)) = wc84(p1, n1, p2, n2) { + a_sum += a; + abc_sum += a + b + c; + } + } + let value = if abc_sum != 0.0 { + a_sum / abc_sum + } else { + f64::NAN + }; + fst[i][j] = value; + fst[j][i] = value; + } + } + + write_matrix(&args.output, &pops, &fst)?; + eprintln!( + "✅ fst: {} populations, {} loci -> {}", + p, + freq.len(), + args.output.display() + ); + println!("✅ Wrote FST matrix to {}", args.output.display()); + Ok(()) +} + +/// WC84 per-SNP (a, b, c) components for a pair. Returns None when any component +/// is non-finite (skipped from the genome-wide ratio, matching the numpy mask). +fn wc84(p1: f64, n1: f64, p2: f64, n2: f64) -> Option<(f64, f64, f64)> { + let r = 2.0; + let n_total = n1 + n2; + if n_total == 0.0 { + return None; + } + let n_bar = n_total / r; + let p_bar = (n1 * p1 + n2 * p2) / n_total; + let nc = (n_total - (n1 * n1 + n2 * n2) / n_total) / (r - 1.0); + let s2 = (n1 * (p1 - p_bar).powi(2) + n2 * (p2 - p_bar).powi(2)) / ((r - 1.0) * n_bar); + let h1 = if n1 > 1.0 { + 2.0 * n1 * p1 * (1.0 - p1) / (2.0 * n1 - 1.0) + } else { + f64::NAN + }; + let h2 = if n2 > 1.0 { + 2.0 * n2 * p2 * (1.0 - p2) / (2.0 * n2 - 1.0) + } else { + f64::NAN + }; + let h_bar = (h1 + h2) / r; + let inner = p_bar * (1.0 - p_bar) - ((r - 1.0) / r) * s2 - h_bar / 4.0; + let a = (n_bar / nc) * (s2 - inner / (n_bar - 1.0)); + let b = (n_bar / (n_bar - 1.0)) + * (p_bar * (1.0 - p_bar) - ((r - 1.0) / r) * s2 - (2.0 * n_bar - 1.0) * h_bar / (4.0 * n_bar)); + let c = h_bar / 2.0; + if a.is_finite() && b.is_finite() && c.is_finite() { + Some((a, b, c)) + } else { + None + } +} + +/// Read a `locus_keypop1...` matrix. Returns (population names, map +/// locus_key -> per-population values). Empty / `nan` cells parse to NaN. +fn read_matrix(path: &Path) -> Result<(Vec, HashMap>)> { + let file = File::open(path).with_context(|| format!("Open {:?}", path))?; + let mut reader = BufReader::new(file); + let mut header = String::new(); + if reader.read_line(&mut header)? == 0 { + bail!("{:?} is empty", path); + } + let pops: Vec = header + .trim_end() + .split('\t') + .skip(1) + .map(|s| s.to_string()) + .collect(); + let mut rows: HashMap> = HashMap::new(); + let mut line = String::new(); + loop { + line.clear(); + if reader.read_line(&mut line)? == 0 { + break; + } + let trimmed = line.trim_end_matches(['\n', '\r']); + if trimmed.is_empty() { + continue; + } + let mut fields = trimmed.split('\t'); + let locus = match fields.next() { + Some(l) => l.to_string(), + None => continue, + }; + let values: Vec = fields + .map(|v| { + let v = v.trim(); + if v.is_empty() || v.eq_ignore_ascii_case("nan") { + f64::NAN + } else { + v.parse::().unwrap_or(f64::NAN) + } + }) + .collect(); + if values.len() == pops.len() { + rows.insert(locus, values); + } + } + Ok((pops, rows)) +} + +fn write_matrix(path: &Path, pops: &[String], fst: &[Vec]) -> Result<()> { + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() { + std::fs::create_dir_all(parent) + .with_context(|| format!("Create directory {:?}", parent))?; + } + } + let mut writer = + BufWriter::new(File::create(path).with_context(|| format!("Create {:?}", path))?); + // Header: empty first cell (unnamed index) then population names. + writeln!(writer, "\t{}", pops.join("\t"))?; + for (i, pop) in pops.iter().enumerate() { + write!(writer, "{pop}")?; + for value in &fst[i] { + if value.is_nan() { + write!(writer, "\t")?; + } else { + write!(writer, "\t{value:.6}")?; + } + } + writeln!(writer)?; + } + writer.flush()?; + Ok(()) +} diff --git a/cli/src/commands/long_emit.rs b/cli/src/commands/long_emit.rs index 85f98c7..ed71112 100644 --- a/cli/src/commands/long_emit.rs +++ b/cli/src/commands/long_emit.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeSet, HashMap}; use std::fs::File; use std::io::{BufRead, BufReader, BufWriter, Read, Write}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::time::Instant; use anyhow::{bail, Context, Result}; @@ -14,7 +15,7 @@ use crate::long_rows::{ }; use crate::rsid_cache::normalize_rsid; use crate::stats::{ReferenceVariant, StatsStore}; -use crate::EmitLongArgs; +use crate::{EmitLongArgs, WarnDetail}; use rusqlite::OptionalExtension; const LOOKAHEAD_LINES: usize = 2048; @@ -65,7 +66,8 @@ pub fn run_long_emit(args: EmitLongArgs) -> Result<()> { .participant .clone() .unwrap_or_else(|| default_participant(input)); - let mut missing_logger = MissingRefLogger::new(args.missing_ref_log.as_deref())?; + let mut missing_logger = + MissingRefLogger::new(args.missing_ref_log.as_deref(), args.warn_detail)?; let stats = match emit_from_genotype( input, &output_path, @@ -89,6 +91,12 @@ pub fn run_long_emit(args: EmitLongArgs) -> Result<()> { "✅ emit-long (genotype): {} parsed, {} rows, {} missing-ref, {} missing-gt", stats.rows_parsed, stats.rows_emitted, stats.missing_reference, stats.missing_gt ); + if args.warn_detail == WarnDetail::Summary { + let summary = summarize_warning_counts(missing_logger.counts()); + if !summary.is_empty() { + eprintln!("⚠️ emit-long warnings [{}]: {}", input.display(), summary); + } + } eprintln!( "⏱️ emit-long: {} took {:.2}s", input.display(), @@ -110,12 +118,12 @@ pub fn run_long_emit(args: EmitLongArgs) -> Result<()> { Ok(()) } -struct EmitStats { +pub(crate) struct EmitStats { variants_seen: u64, rows_emitted: u64, missing_gt: u64, - rows_parsed: u64, - missing_reference: u64, + pub(crate) rows_parsed: u64, + pub(crate) missing_reference: u64, } fn emit_from_vcf( @@ -223,6 +231,25 @@ fn emit_from_genotype( resolver: &mut ReferenceResolver, missing_logger: &mut MissingRefLogger, ) -> Result { + let (pending_rows, mut stats) = + parse_genotype_file_to_pending(input, participant, resolver, missing_logger)?; + let mut writer = LongRowWriter::new(BufWriter::new( + File::create(output).with_context(|| format!("Create {:?}", output))?, + )); + write_merged_long_rows(&mut writer, pending_rows, &mut stats)?; + writer.flush()?; + Ok(stats) +} + +/// Parse a genotype file into per-(locus,rsid,participant) pending rows, reusing +/// the exact emit-long parse/resolve/dosage path. Shared by the on-disk `.bvlr` +/// emitter and the in-memory fused aggregator. +fn parse_genotype_file_to_pending( + input: &Path, + participant: &str, + resolver: &mut ReferenceResolver, + missing_logger: &mut MissingRefLogger, +) -> Result<(Vec, EmitStats)> { let input_file = File::open(input).with_context(|| format!("Open {:?}", input))?; let mut reader = BufReader::new(input_file); let mut buffered_lines: Vec = Vec::new(); @@ -279,12 +306,22 @@ fn emit_from_genotype( consume_genotype_line(&mut ctx, &buffer, line_number)?; } - let mut writer = LongRowWriter::new(BufWriter::new( - File::create(output).with_context(|| format!("Create {:?}", output))?, - )); - write_merged_long_rows(&mut writer, pending_rows, &mut stats)?; - writer.flush()?; - Ok(stats) + Ok((pending_rows, stats)) +} + +/// Parse a genotype file and return the same merged `LongRow`s that would be +/// written to a `.bvlr`, but in memory. Lets callers aggregate without the +/// intermediate file. Output is identical to the rows in the `.bvlr`. +pub(crate) fn collect_merged_long_rows( + input: &Path, + participant: &str, + resolver: &mut ReferenceResolver, + missing_logger: &mut MissingRefLogger, +) -> Result<(Vec, EmitStats)> { + let (pending_rows, mut stats) = + parse_genotype_file_to_pending(input, participant, resolver, missing_logger)?; + let rows = merge_pending_rows(pending_rows, &mut stats); + Ok((rows, stats)) } struct GenotypeEmitContext<'a> { @@ -311,13 +348,14 @@ fn consume_genotype_line( let outcome = match ctx.parser.consume_line(line) { Ok(outcome) => outcome, Err(err) => { - ctx.missing_logger - .log_issue(ctx.input, line_number, "parse_error", "", line)?; - eprintln!( - "WARNING: {}:{}: parse_error: {err:#}", - ctx.input.display(), - line_number - ); + ctx.missing_logger.log_issue_detail( + ctx.input, + line_number, + "parse_error", + "", + line, + Some(&format!("{err:#}")), + )?; return Ok(()); } }; @@ -332,19 +370,14 @@ fn consume_genotype_line( match ctx.resolver.resolve_rsid(&rsid_label) { Ok(reference) => reference, Err(err) => { - ctx.missing_logger.log_issue( + ctx.missing_logger.log_issue_detail( ctx.input, line_number, "rsid_resolve_error", &rsid_label, line, + Some(&format!("{err:#}")), )?; - eprintln!( - "WARNING: {}:{}: rsid_resolve_error {}: {err:#}", - ctx.input.display(), - line_number, - rsid_label - ); None } } @@ -359,20 +392,14 @@ fn consume_genotype_line( } Ok(None) => {} Err(err) => { - ctx.missing_logger.log_issue( + ctx.missing_logger.log_issue_detail( ctx.input, line_number, "position_resolve_error", &format!("{}:{}", row.chrom, row.pos), line, + Some(&format!("{err:#}")), )?; - eprintln!( - "WARNING: {}:{}: position_resolve_error {}:{}: {err:#}", - ctx.input.display(), - line_number, - row.chrom, - row.pos - ); } } } @@ -457,6 +484,16 @@ fn write_merged_long_rows( pending_rows: Vec, stats: &mut EmitStats, ) -> Result<()> { + for row in merge_pending_rows(pending_rows, stats) { + writer.write_row(&row)?; + } + Ok(()) +} + +/// Collapse pending rows by (locus_key, rsid, participant), preserving first-seen +/// order, applying duplicate-probe selection. Increments `rows_emitted` per merged +/// row so on-disk and in-memory paths report identical stats. +fn merge_pending_rows(pending_rows: Vec, stats: &mut EmitStats) -> Vec { let mut order: Vec = Vec::new(); let mut groups: HashMap> = HashMap::new(); for pending in pending_rows { @@ -466,13 +503,13 @@ fn write_merged_long_rows( groups.entry(pending.key).or_default().push(pending.row); } + let mut out = Vec::with_capacity(order.len()); for key in order { let members = groups.remove(&key).expect("group present"); - let selected = select_duplicate_probe_row(&members); - writer.write_row(&selected)?; + out.push(select_duplicate_probe_row(&members)); stats.rows_emitted += 1; } - Ok(()) + out } fn select_duplicate_probe_row(rows: &[LongRow]) -> LongRow { @@ -497,12 +534,14 @@ fn clean_rsid_label(value: &str) -> String { normalize_rsid(value) } -struct MissingRefLogger { +pub(crate) struct MissingRefLogger { writer: Option>, + detail: WarnDetail, + counts: std::collections::BTreeMap, } impl MissingRefLogger { - fn new(path: Option<&Path>) -> Result { + pub(crate) fn new(path: Option<&Path>, detail: WarnDetail) -> Result { let writer = if let Some(path) = path { if let Some(parent) = path.parent() { if !parent.as_os_str().is_empty() { @@ -519,7 +558,16 @@ impl MissingRefLogger { } else { None }; - Ok(Self { writer }) + Ok(Self { + writer, + detail, + counts: std::collections::BTreeMap::new(), + }) + } + + /// Coalesced warning counts keyed by code (for end-of-file/run summaries). + pub(crate) fn counts(&self) -> &std::collections::BTreeMap { + &self.counts } fn log_issue( @@ -529,6 +577,22 @@ impl MissingRefLogger { code: &str, label: &str, raw_line: &str, + ) -> Result<()> { + self.log_issue_detail(input, line_number, code, label, raw_line, None) + } + + /// Record a tolerated row-level issue. The full per-row TSV (when + /// `--missing-ref-log` is set) is unchanged; human-facing stderr is now + /// coalesced: only `WarnDetail::Full` prints per row. Counts are always kept + /// so callers can print a one-line summary per file/run. + fn log_issue_detail( + &mut self, + input: &Path, + line_number: u64, + code: &str, + label: &str, + raw_line: &str, + detail_msg: Option<&str>, ) -> Result<()> { let trimmed = raw_line.trim_end(); if let Some(writer) = self.writer.as_mut() { @@ -541,22 +605,42 @@ impl MissingRefLogger { code, trimmed )?; - } else if !matches!( - code, - "parse_error" | "rsid_resolve_error" | "position_resolve_error" - ) { - eprintln!( - "WARNING: {}:{}: {} {}", - input.display(), - line_number, - code, - label - ); + } + *self.counts.entry(code.to_string()).or_insert(0) += 1; + if self.detail == WarnDetail::Full { + match detail_msg { + Some(msg) => eprintln!( + "WARNING: {}:{}: {} {} {}", + input.display(), + line_number, + code, + label, + msg + ), + None => eprintln!( + "WARNING: {}:{}: {} {}", + input.display(), + line_number, + code, + label + ), + } } Ok(()) } } +/// Render coalesced warning counts as `code=count, code=count` (sorted by code). +pub(crate) fn summarize_warning_counts( + counts: &std::collections::BTreeMap, +) -> String { + counts + .iter() + .map(|(code, n)| format!("{code}={n}")) + .collect::>() + .join(", ") +} + fn resolve_output_path(input: &Path, output: Option<&PathBuf>) -> Result { if let Some(out) = output { return Ok(out.clone()); @@ -574,7 +658,7 @@ fn resolve_output_path(input: &Path, output: Option<&PathBuf>) -> Result String { +pub(crate) fn default_participant(input: &Path) -> String { input .file_stem() .and_then(|stem| stem.to_str()) @@ -587,14 +671,103 @@ fn is_gz(path: &Path) -> bool { path.extension().and_then(|ext| ext.to_str()) == Some("gz") } -struct ReferenceResolver { +/// Reference resolver with two backends that return identical results: +/// - `Sqlite`: per-connection, lazily cached. Used by `emit-long` (one file at a +/// time) where a single connection is fine. +/// - `Shared`: the whole reference preloaded into in-memory maps, shared via +/// `Arc` across threads. Lock-free reads, so parallel parsing actually scales +/// (the SQLite backend serializes/contends under many threads). +pub(crate) enum ReferenceResolver { + Sqlite(SqliteResolver), + Shared(Arc), +} + +impl ReferenceResolver { + pub(crate) fn new(store: &StatsStore) -> Result { + Ok(ReferenceResolver::Sqlite(SqliteResolver::new(store)?)) + } + + pub(crate) fn shared(reference: Arc) -> Self { + ReferenceResolver::Shared(reference) + } + + fn resolve_rsid(&mut self, rsid: &str) -> Result> { + match self { + ReferenceResolver::Sqlite(r) => r.resolve_rsid(rsid), + ReferenceResolver::Shared(r) => Ok(r.resolve_rsid(rsid)), + } + } + + fn resolve_position(&mut self, chrom: &str, pos: i64) -> Result> { + match self { + ReferenceResolver::Sqlite(r) => r.resolve_position(chrom, pos), + ReferenceResolver::Shared(r) => Ok(r.resolve_position(chrom, pos)), + } + } +} + +/// Reference preloaded into memory once, then shared read-only across threads. +/// Built from the same tables the SQLite resolver queries, with the same +/// precedence (user overrides win) and the same position-ambiguity rule. +pub(crate) struct SharedReference { + by_rsid: HashMap, + by_pos: HashMap<(String, i64), Option>, +} + +impl SharedReference { + pub(crate) fn load(store: &StatsStore) -> Result { + let mut by_rsid: HashMap = HashMap::new(); + // all_references_with_overrides() = user rows UNION base rows not in user, + // matching SqliteResolver's user-then-base lookup order. + for variant in store.all_references_with_overrides()? { + by_rsid.insert(variant.rsid, variant); + } + let mut by_pos: HashMap<(String, i64), Option> = HashMap::new(); + for variant in store.all_non_rsids()? { + let key = (normalize_chrom(&variant.chromosome), variant.position); + match by_pos.get(&key) { + Some(None) => {} // already ambiguous + Some(Some(existing)) => { + if existing.rsid != variant.rsid + || existing.reference != variant.reference + || existing.alternates != variant.alternates + { + by_pos.insert(key, None); + } + } + None => { + by_pos.insert(key, Some(variant)); + } + } + } + Ok(Self { by_rsid, by_pos }) + } + + fn resolve_rsid(&self, rsid: &str) -> Option { + let rsid_norm = normalize_rsid(rsid); + let rsid_int = rsid_norm.trim_start_matches("rs").parse::().ok()?; + self.by_rsid.get(&rsid_int).cloned() + } + + fn resolve_position(&self, chrom: &str, pos: i64) -> Option { + if pos <= 0 { + return None; + } + self.by_pos + .get(&(normalize_chrom(chrom), pos)) + .cloned() + .flatten() + } +} + +pub(crate) struct SqliteResolver { conn: rusqlite::Connection, rsid_cache: HashMap>, position_cache: HashMap<(String, i64), Option>, } -impl ReferenceResolver { - fn new(store: &StatsStore) -> Result { +impl SqliteResolver { + pub(crate) fn new(store: &StatsStore) -> Result { let conn = store.open_connection()?; Ok(Self { conn, @@ -761,30 +934,32 @@ mod tests { ('AMBIG_2', 125, '3', 300, 'A', 'T', 'test');", ) .unwrap(); - ReferenceResolver { + ReferenceResolver::Sqlite(SqliteResolver { conn, rsid_cache: HashMap::new(), position_cache: HashMap::new(), - } + }) } fn full_test_resolver() -> ReferenceResolver { let resolver = test_resolver(); - resolver - .conn - .execute_batch( - "INSERT INTO rsid_reference - (rsid, chromosome, position, reference, alternates) - VALUES - (1, '1', 100, 'A', 'G'), - (2, '1', 200, 'C', 'T'), - (3, '2', 300, 'A', 'C'); - INSERT INTO grch38_non_rsids - (snp_name, rsid, chromosome, position, reference, alternates, source) - VALUES - ('NONRS_CNV_A', 3, '2', 300, 'A', 'C', 'test');", - ) - .unwrap(); + if let ReferenceResolver::Sqlite(inner) = &resolver { + inner + .conn + .execute_batch( + "INSERT INTO rsid_reference + (rsid, chromosome, position, reference, alternates) + VALUES + (1, '1', 100, 'A', 'G'), + (2, '1', 200, 'C', 'T'), + (3, '2', 300, 'A', 'C'); + INSERT INTO grch38_non_rsids + (snp_name, rsid, chromosome, position, reference, alternates, source) + VALUES + ('NONRS_CNV_A', 3, '2', 300, 'A', 'C', 'test');", + ) + .unwrap(); + } resolver } @@ -809,7 +984,7 @@ mod tests { participant: &str, warning_log: &Path, ) { - let mut logger = MissingRefLogger::new(Some(warning_log)).unwrap(); + let mut logger = MissingRefLogger::new(Some(warning_log), WarnDetail::Full).unwrap(); let stats = emit_from_genotype(input, output, participant, resolver, &mut logger).unwrap(); assert!( stats.rows_emitted > 0, diff --git a/cli/src/commands/merge_allele_freq.rs b/cli/src/commands/merge_allele_freq.rs new file mode 100644 index 0000000..5e6175e --- /dev/null +++ b/cli/src/commands/merge_allele_freq.rs @@ -0,0 +1,197 @@ +use std::collections::HashMap; +use std::fs::File; +use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::path::{Path, PathBuf}; + +use anyhow::{bail, Context, Result}; + +use crate::MergeAlleleFreqArgs; + +/// Inner-join per-population allele-frequency TSVs on `locus_key`, reproducing +/// `04_population_level/fst_islands/scripts/01_load_merge.py`: +/// - `allele_number == 0` -> freq/number become missing (dropped), +/// - keep only loci present and non-missing in EVERY population, +/// - row order follows the first population's file order (pandas left-join), +/// - rsid is carried from the first population. +/// +/// Output is content-equivalent to the Python merge and is consumed directly by +/// `bvs fst` (verified end-to-end). Float text may differ from pandas' repr; the +/// values are identical. +pub fn run_merge_allele_freq(args: MergeAlleleFreqArgs) -> Result<()> { + if args.populations.len() < 2 { + bail!("provide at least 2 --population LABEL=PATH entries"); + } + let pops = parse_populations(&args.populations)?; + + // First population defines row order + rsid. + let (order, mut rsid, mut freq_cols, mut num_cols) = { + let rows = read_country(&pops[0].1)?; + let mut order = Vec::with_capacity(rows.len()); + let mut rsid: HashMap = HashMap::new(); + let mut f0: HashMap = HashMap::new(); + let mut n0: HashMap = HashMap::new(); + for r in rows { + if !f0.contains_key(&r.locus) { + order.push(r.locus.clone()); + } + rsid.insert(r.locus.clone(), r.rsid); + f0.insert(r.locus.clone(), r.freq); + n0.insert(r.locus.clone(), r.number); + } + (order, rsid, vec![f0], vec![n0]) + }; + + // Remaining populations as lookup maps. + for (_, path) in &pops[1..] { + let rows = read_country(path)?; + let mut f: HashMap = HashMap::new(); + let mut n: HashMap = HashMap::new(); + for r in rows { + f.insert(r.locus.clone(), r.freq); + n.insert(r.locus, r.number); + } + freq_cols.push(f); + num_cols.push(n); + } + + let labels: Vec<&str> = pops.iter().map(|(l, _)| l.as_str()).collect(); + let mut out_freq = create(&args.merged_freq)?; + let mut out_number = create(&args.merged_number)?; + let mut out_annotated = match &args.merged_annotated { + Some(p) => Some(create(p)?), + None => None, + }; + + writeln!(out_freq, "locus_key\t{}", labels.join("\t"))?; + writeln!(out_number, "locus_key\t{}", labels.join("\t"))?; + if let Some(w) = out_annotated.as_mut() { + writeln!(w, "locus_key\trsid\t{}", labels.join("\t"))?; + } + + let mut kept = 0usize; + 'locus: for locus in &order { + let mut freqs = Vec::with_capacity(labels.len()); + let mut nums = Vec::with_capacity(labels.len()); + for k in 0..labels.len() { + // inner join: present in every population + let (Some(&f), Some(&n)) = (freq_cols[k].get(locus), num_cols[k].get(locus)) else { + continue 'locus; + }; + // dropna: any missing (incl. allele_number==0 -> NaN) drops the locus + if f.is_nan() || n.is_nan() { + continue 'locus; + } + freqs.push(f); + nums.push(n); + } + write!(out_freq, "{locus}")?; + for f in &freqs { + write!(out_freq, "\t{f}")?; + } + writeln!(out_freq)?; + write!(out_number, "{locus}")?; + for n in &nums { + write!(out_number, "\t{n}")?; + } + writeln!(out_number)?; + if let Some(w) = out_annotated.as_mut() { + let rs = rsid.get(locus).map(|s| s.as_str()).unwrap_or(""); + write!(w, "{locus}\t{rs}")?; + for f in &freqs { + write!(w, "\t{f}")?; + } + writeln!(w)?; + } + kept += 1; + } + + out_freq.flush()?; + out_number.flush()?; + if let Some(w) = out_annotated.as_mut() { + w.flush()?; + } + rsid.clear(); + + eprintln!( + "✅ merge-allele-freq: {} populations, {} loci kept (all-population non-missing)", + labels.len(), + kept + ); + println!("✅ Wrote merged matrices ({} loci)", kept); + Ok(()) +} + +struct CountryRow { + locus: String, + freq: f64, + number: f64, + rsid: String, +} + +/// Parse one per-population allele_freq TSV +/// (locus_key, allele_count, allele_number, num_homo, num_hetero, allele_freq, rsid). +/// allele_number == 0 -> freq and number set to NaN (missing). +fn read_country(path: &Path) -> Result> { + let file = File::open(path).with_context(|| format!("Open {:?}", path))?; + let mut reader = BufReader::new(file); + let mut header = String::new(); + if reader.read_line(&mut header)? == 0 { + bail!("{:?} is empty", path); + } + let mut out = Vec::new(); + let mut line = String::new(); + loop { + line.clear(); + if reader.read_line(&mut line)? == 0 { + break; + } + let trimmed = line.trim_end_matches(['\n', '\r']); + if trimmed.is_empty() { + continue; + } + let f: Vec<&str> = trimmed.split('\t').collect(); + if f.len() < 7 { + continue; + } + let number: f64 = f[2].trim().parse().unwrap_or(f64::NAN); + let freq: f64 = f[5].trim().parse().unwrap_or(f64::NAN); + let (freq, number) = if number == 0.0 { + (f64::NAN, f64::NAN) + } else { + (freq, number) + }; + out.push(CountryRow { + locus: f[0].to_string(), + freq, + number, + rsid: f[6].to_string(), + }); + } + Ok(out) +} + +fn parse_populations(entries: &[String]) -> Result> { + let mut out = Vec::with_capacity(entries.len()); + for e in entries { + let (label, path) = e + .split_once('=') + .with_context(|| format!("--population must be LABEL=PATH, got {e:?}"))?; + if label.is_empty() || path.is_empty() { + bail!("--population must be LABEL=PATH, got {e:?}"); + } + out.push((label.to_string(), PathBuf::from(path))); + } + Ok(out) +} + +fn create(path: &Path) -> Result> { + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() { + std::fs::create_dir_all(parent) + .with_context(|| format!("Create directory {:?}", parent))?; + } + } + Ok(BufWriter::new( + File::create(path).with_context(|| format!("Create {:?}", path))?, + )) +} diff --git a/cli/src/main.rs b/cli/src/main.rs index 8f15f69..fe57010 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -12,7 +12,10 @@ mod stats; mod util; use crate::commands::allele_report::run_allele_report; +use crate::commands::fast_allele_freq::run_fast_allele_freq; +use crate::commands::fst::run_fst; use crate::commands::genostats::run_genostats; +use crate::commands::merge_allele_freq::run_merge_allele_freq; use crate::commands::genotype_to_vcf::run_genotype_to_vcf; use crate::commands::list_missing_cache::run_list_missing_cache; use crate::commands::list_missing_rsids::run_list_missing_rsids; @@ -28,7 +31,10 @@ use crate::commands::update::run_update; mod commands { pub mod allele_report; + pub mod fast_allele_freq; + pub mod fst; pub mod genostats; + pub mod merge_allele_freq; pub mod genotype_to_vcf; pub mod list_missing_cache; pub mod list_missing_rsids; @@ -62,6 +68,12 @@ enum Commands { EmitLong(EmitLongArgs), /// Aggregate long-row records into matrix and allele frequency TSVs. AggregateLong(AggregateLongArgs), + /// Fused parse+aggregate: genotype files -> allele frequency TSV, no .bvlr. + FastAlleleFreq(FastAlleleFreqArgs), + /// Pairwise Weir & Cockerham 1984 FST matrix from merged allele freq/number. + Fst(FstArgs), + /// Inner-join per-population allele frequency TSVs into merged matrices. + MergeAlleleFreq(MergeAlleleFreqArgs), /// Convert a .bvlr file into TSV for debugging. DumpLong(DumpLongArgs), /// List rsids missing from the database based on a cache file. @@ -197,6 +209,9 @@ pub struct EmitLongArgs { /// Optional log file for missing reference rows (appends). #[arg(long)] pub missing_ref_log: Option, + /// Warning verbosity: none, summary (default), or full (per-row). + #[arg(long, value_enum, default_value_t = WarnDetail::Summary)] + pub warn_detail: WarnDetail, } #[derive(Args, Clone)] @@ -230,6 +245,60 @@ pub struct AggregateLongArgs { pub threads: usize, } +#[derive(Args, Clone)] +pub struct FastAlleleFreqArgs { + /// Input genotype file(s) or directories (scanned recursively). + #[arg(short = 'i', long = "input")] + pub inputs: Vec, + /// Path to the SQLite database containing rsid_reference data. + #[arg(long, default_value = "data/genostats.sqlite")] + pub sqlite: PathBuf, + /// Force re-download of the reference database. + #[arg(long, action = ArgAction::SetTrue)] + pub force_download: bool, + /// Output allele frequency TSV path. + #[arg(long)] + pub allele_freq_tsv: PathBuf, + /// Optional log file for missing reference rows (per-row TSV; forces single thread). + #[arg(long)] + pub missing_ref_log: Option, + /// Warning verbosity: none, summary (default), or full (per-row). + #[arg(long, value_enum, default_value_t = WarnDetail::Summary)] + pub warn_detail: WarnDetail, + /// Worker threads for parsing (0 = auto/all cores). + #[arg(long, default_value = "0")] + pub threads: usize, +} + +#[derive(Args, Clone)] +pub struct FstArgs { + /// Merged allele-frequency matrix TSV (locus_key + one column per population). + #[arg(long)] + pub merged_freq: PathBuf, + /// Merged allele-number matrix TSV (same shape as --merged-freq). + #[arg(long)] + pub merged_number: PathBuf, + /// Output population x population FST matrix TSV. + #[arg(long)] + pub output: PathBuf, +} + +#[derive(Args, Clone)] +pub struct MergeAlleleFreqArgs { + /// Population inputs as LABEL=PATH (repeatable; column order preserved). + #[arg(long = "population")] + pub populations: Vec, + /// Output merged allele-frequency matrix TSV. + #[arg(long)] + pub merged_freq: PathBuf, + /// Output merged allele-number matrix TSV. + #[arg(long)] + pub merged_number: PathBuf, + /// Optional rsid-annotated merged frequency matrix TSV. + #[arg(long)] + pub merged_annotated: Option, +} + #[derive(Args, Clone)] pub struct DumpLongArgs { /// Input .bvlr file to dump. @@ -313,6 +382,16 @@ pub struct SyncRsidCacheArgs { pub apply: bool, } +#[derive(ValueEnum, Clone, Copy, Debug, PartialEq, Eq)] +pub enum WarnDetail { + /// Suppress per-row warnings entirely. + None, + /// Print one coalesced count-per-code summary per file/run (default). + Summary, + /// Print every warning row (verbose; for debugging). + Full, +} + #[derive(ValueEnum, Clone, Copy, Debug, PartialEq, Eq)] pub enum SyntheticFormat { /// Dynamic DNA (DDNA) plus-strand genotype text (default). @@ -419,6 +498,9 @@ fn main() -> Result<()> { Commands::GenotypeToVcf(args) => run_genotype_to_vcf(args), Commands::EmitLong(args) => run_long_emit(args), Commands::AggregateLong(args) => run_long_aggregate(args), + Commands::FastAlleleFreq(args) => run_fast_allele_freq(args), + Commands::Fst(args) => run_fst(args), + Commands::MergeAlleleFreq(args) => run_merge_allele_freq(args), Commands::DumpLong(args) => run_long_dump(args), Commands::ListMissingCache(args) => run_list_missing_cache(args), Commands::ListMissingRsids(args) => run_list_missing_rsids(args), diff --git a/scripts/bench_fast_allele_freq.sh b/scripts/bench_fast_allele_freq.sh new file mode 100755 index 0000000..371bc59 --- /dev/null +++ b/scripts/bench_fast_allele_freq.sh @@ -0,0 +1,57 @@ +#!/usr/bin/env bash +# Reproduce the Stage 1 parity + benchmark: +# OLD: bvs emit-long (per file) -> bvs aggregate-long --threads 1 +# NEW: bvs fast-allele-freq (fused parse+aggregate, no .bvlr) +# Asserts the two allele_freq.tsv are byte-for-byte identical, reports timing/disk. +# +# Usage: scripts/bench_fast_allele_freq.sh [N_FILES] [LIMIT] +set -euo pipefail +cd "$(dirname "$0")/.." + +N="${1:-100}" +LIMIT="${2:-20000}" +BVS=./cli/target/release/bvs +DB=data/genostats.sqlite +BENCH=test_bench + +cargo build --release --manifest-path cli/Cargo.toml >/dev/null + +rm -rf "$BENCH"; mkdir -p "$BENCH/ddna" "$BENCH/illumina" "$BENCH/bvlr" + +echo "## generate $N DDNA + $N Illumina (biallelic, limit $LIMIT)" +$BVS synthetic --format ddna --biallelic --seed 1 --count "$N" --limit "$LIMIT" \ + --output "$BENCH/ddna/ddna_{index}.txt" --sqlite "$DB" >/dev/null +$BVS synthetic --format illumina --clean-illumina-rsids --biallelic --seed 1 --count "$N" --limit "$LIMIT" \ + --output "$BENCH/illumina/ill_{index}.txt" --sqlite "$DB" >/dev/null + +echo "## OLD: emit $((2*N)) -> .bvlr" +t0=$(date +%s) +for f in "$BENCH"/ddna/*.txt "$BENCH"/illumina/*.txt; do + $BVS emit-long --input "$f" --output "$BENCH/bvlr/$(basename "$f" .txt).bvlr" --sqlite "$DB" >/dev/null 2>>"$BENCH/emit.log" +done +t1=$(date +%s) +echo "## OLD: aggregate-long --threads 1" +$BVS aggregate-long --input "$BENCH/bvlr" --allele-freq-tsv "$BENCH/old_allele_freq.tsv" --threads 1 >/dev/null 2>>"$BENCH/agg.log" +t2=$(date +%s) + +echo "## NEW: fast-allele-freq" +t3=$(date +%s) +$BVS fast-allele-freq -i "$BENCH/ddna" -i "$BENCH/illumina" --sqlite "$DB" \ + --allele-freq-tsv "$BENCH/new_allele_freq.tsv" >/dev/null 2>>"$BENCH/new.log" +t4=$(date +%s) + +echo +echo "=== PARITY ===" +if cmp -s "$BENCH/old_allele_freq.tsv" "$BENCH/new_allele_freq.tsv"; then + echo "PASS: byte-for-byte identical ($(( $(wc -l <"$BENCH/old_allele_freq.tsv") - 1 )) loci)" + echo " sha256 $(shasum -a 256 "$BENCH/new_allele_freq.tsv" | cut -d' ' -f1)" +else + echo "FAIL: outputs differ"; diff "$BENCH/old_allele_freq.tsv" "$BENCH/new_allele_freq.tsv" | head; exit 1 +fi + +echo +echo "=== BENCHMARK ($((2*N)) files) ===" +printf "OLD emit: %ss\n" "$((t1-t0))" +printf "OLD aggregate: %ss\n" "$((t2-t1))" +printf "OLD total: %ss intermediate .bvlr: %s\n" "$((t2-t0))" "$(du -sh "$BENCH/bvlr" | cut -f1)" +printf "NEW total: %ss intermediate: 0\n" "$((t4-t3))" From 183b8eff433afb6ab82f808884aedd2da92e6644 Mon Sep 17 00:00:00 2001 From: Madhava Jay Date: Fri, 29 May 2026 21:41:50 +1000 Subject: [PATCH 2/3] lint --- cli/src/commands/fast_allele_freq.rs | 6 +++++- cli/src/commands/fst.rs | 4 +++- cli/src/commands/long_emit.rs | 4 +--- cli/src/main.rs | 4 ++-- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/cli/src/commands/fast_allele_freq.rs b/cli/src/commands/fast_allele_freq.rs index af8245c..080d24d 100644 --- a/cli/src/commands/fast_allele_freq.rs +++ b/cli/src/commands/fast_allele_freq.rs @@ -72,7 +72,11 @@ pub fn run_fast_allele_freq(args: FastAlleleFreqArgs) -> Result<()> { // Per-row TSV log can't be written safely from many threads -> single thread. let force_single = args.missing_ref_log.is_some(); - let threads = if force_single { 1 } else { resolve_threads(args.threads) }; + let threads = if force_single { + 1 + } else { + resolve_threads(args.threads) + }; eprintln!( "▶️ fast-allele-freq: {} input file(s), threads={}", tasks.len(), diff --git a/cli/src/commands/fst.rs b/cli/src/commands/fst.rs index a83e7a8..314af46 100644 --- a/cli/src/commands/fst.rs +++ b/cli/src/commands/fst.rs @@ -88,7 +88,9 @@ fn wc84(p1: f64, n1: f64, p2: f64, n2: f64) -> Option<(f64, f64, f64)> { let inner = p_bar * (1.0 - p_bar) - ((r - 1.0) / r) * s2 - h_bar / 4.0; let a = (n_bar / nc) * (s2 - inner / (n_bar - 1.0)); let b = (n_bar / (n_bar - 1.0)) - * (p_bar * (1.0 - p_bar) - ((r - 1.0) / r) * s2 - (2.0 * n_bar - 1.0) * h_bar / (4.0 * n_bar)); + * (p_bar * (1.0 - p_bar) + - ((r - 1.0) / r) * s2 + - (2.0 * n_bar - 1.0) * h_bar / (4.0 * n_bar)); let c = h_bar / 2.0; if a.is_finite() && b.is_finite() && c.is_finite() { Some((a, b, c)) diff --git a/cli/src/commands/long_emit.rs b/cli/src/commands/long_emit.rs index ed71112..39eb110 100644 --- a/cli/src/commands/long_emit.rs +++ b/cli/src/commands/long_emit.rs @@ -631,9 +631,7 @@ impl MissingRefLogger { } /// Render coalesced warning counts as `code=count, code=count` (sorted by code). -pub(crate) fn summarize_warning_counts( - counts: &std::collections::BTreeMap, -) -> String { +pub(crate) fn summarize_warning_counts(counts: &std::collections::BTreeMap) -> String { counts .iter() .map(|(code, n)| format!("{code}={n}")) diff --git a/cli/src/main.rs b/cli/src/main.rs index fe57010..e884f25 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -15,7 +15,6 @@ use crate::commands::allele_report::run_allele_report; use crate::commands::fast_allele_freq::run_fast_allele_freq; use crate::commands::fst::run_fst; use crate::commands::genostats::run_genostats; -use crate::commands::merge_allele_freq::run_merge_allele_freq; use crate::commands::genotype_to_vcf::run_genotype_to_vcf; use crate::commands::list_missing_cache::run_list_missing_cache; use crate::commands::list_missing_rsids::run_list_missing_rsids; @@ -23,6 +22,7 @@ use crate::commands::load_non_rsids::run_load_non_rsids; use crate::commands::long_aggregate::run_long_aggregate; use crate::commands::long_dump::run_long_dump; use crate::commands::long_emit::run_long_emit; +use crate::commands::merge_allele_freq::run_merge_allele_freq; use crate::commands::reference_load::run_reference_load; use crate::commands::resolve_rsids::run_resolve_rsids; use crate::commands::sync_rsid_cache::run_sync_rsid_cache; @@ -34,7 +34,6 @@ mod commands { pub mod fast_allele_freq; pub mod fst; pub mod genostats; - pub mod merge_allele_freq; pub mod genotype_to_vcf; pub mod list_missing_cache; pub mod list_missing_rsids; @@ -42,6 +41,7 @@ mod commands { pub mod long_aggregate; pub mod long_dump; pub mod long_emit; + pub mod merge_allele_freq; pub mod reference_load; pub mod resolve_rsids; pub mod sync_rsid_cache; From af1d333264e5f59ae10b7ddba71e34afbb261429 Mon Sep 17 00:00:00 2001 From: Madhava Jay Date: Fri, 29 May 2026 21:42:16 +1000 Subject: [PATCH 3/3] lint --- cli/src/commands/fst.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cli/src/commands/fst.rs b/cli/src/commands/fst.rs index 314af46..d544c19 100644 --- a/cli/src/commands/fst.rs +++ b/cli/src/commands/fst.rs @@ -7,6 +7,9 @@ use anyhow::{bail, Context, Result}; use crate::FstArgs; +type PopulationNames = Vec; +type MatrixRows = HashMap>; + /// Pairwise Weir & Cockerham 1984 FST (ratio-of-averages), reproducing /// `04_population_level/fst_islands/scripts/02_compute_fst.py`. Reads the merged /// allele-frequency and allele-number matrices and writes a population x @@ -101,7 +104,7 @@ fn wc84(p1: f64, n1: f64, p2: f64, n2: f64) -> Option<(f64, f64, f64)> { /// Read a `locus_keypop1...` matrix. Returns (population names, map /// locus_key -> per-population values). Empty / `nan` cells parse to NaN. -fn read_matrix(path: &Path) -> Result<(Vec, HashMap>)> { +fn read_matrix(path: &Path) -> Result<(PopulationNames, MatrixRows)> { let file = File::open(path).with_context(|| format!("Open {:?}", path))?; let mut reader = BufReader::new(file); let mut header = String::new();