From 80a1f115d190398d6c38cfe0fe96323501db9d5b Mon Sep 17 00:00:00 2001 From: Madhava Jay Date: Fri, 29 May 2026 23:56:06 +1000 Subject: [PATCH] faster qc --- .github/workflows/ci.yml | 50 +++- .gitignore | 26 +- cli/Cargo.lock | 56 ++++- cli/Cargo.toml | 1 + cli/src/commands/cohort_bed.rs | 74 ++++-- cli/src/commands/project_bed.rs | 428 ++++++++++++++++++++++++++++++++ cli/src/main.rs | 27 ++ 7 files changed, 634 insertions(+), 28 deletions(-) create mode 100644 cli/src/commands/project_bed.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6692a20..c2b13cd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,12 +14,8 @@ concurrency: cancel-in-progress: true jobs: - test: - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [ubuntu-latest, macos-latest, windows-latest] - + lint: + runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v4 @@ -55,13 +51,51 @@ jobs: key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('cli/Cargo.lock') }}-v2 - name: Check formatting - if: matrix.os != 'windows-latest' run: cd cli && cargo fmt -- --check - name: Run clippy (all targets) - if: matrix.os != 'windows-latest' run: cd cli && cargo clippy --all-targets --all-features --no-deps -- -D warnings + test: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.91.0 + components: rustfmt, clippy + + - name: Show Rust versions + run: | + rustc --version + cargo --version + rustfmt --version + + - name: Cache cargo registry + uses: actions/cache@v4 + with: + path: ~/.cargo/registry + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('cli/Cargo.lock') }} + + - name: Cache cargo index + uses: actions/cache@v4 + with: + path: ~/.cargo/git + key: ${{ runner.os }}-cargo-index-${{ hashFiles('cli/Cargo.lock') }} + + - name: Cache cargo build + uses: actions/cache@v4 + with: + path: cli/target + key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('cli/Cargo.lock') }}-v2 + - name: Run tests run: cd cli && cargo test --verbose diff --git a/.gitignore b/.gitignore index e8dc76f..0775ef7 100644 --- a/.gitignore +++ b/.gitignore @@ -33,5 +33,29 @@ notebooks/data/* data/*.sqlite data/*.db !data/.gitkeep -!data/genostats.sqlitescripts/__pycache__/ +!data/genostats.sqlite data/genostats.sqlite.bak +scripts/__pycache__/ + +# Local scratch / test artifacts (regenerable, not for the repo) +out/ +test_bench/ +test_data/ +test-data/ +test_data.zip +*.bvlr +*.vcf +*.vcf.gz +*.vcf.log +*.miss.log +allele-report.html +rsid_cache.json +genostats-summary.json +missing.json +missing.tsv +*.tbi +carika* +pc0001* +genotype_grch38.txt +*_run.log +notebooks_dbsnp_download.log diff --git a/cli/Cargo.lock b/cli/Cargo.lock index 6729b7b..e0e0326 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -111,6 +111,7 @@ dependencies = [ "chrono", "clap", "csv", + "dashmap", "glob", "indicatif", "noodles", @@ -323,6 +324,20 @@ dependencies = [ "memchr", ] +[[package]] +name = "dashmap" +version = "6.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -821,6 +836,15 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.28" @@ -950,9 +974,9 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" [[package]] name = "once_cell" -version = "1.21.3" +version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] name = "once_cell_polyfill" @@ -960,6 +984,19 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link 0.2.1", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -1166,6 +1203,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + [[package]] name = "reqwest" version = "0.12.24" @@ -1296,6 +1342,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.228" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 9873a7a..671c031 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -30,5 +30,6 @@ rand = { version = "0.8", features = ["std"] } reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] } noodles = { version = "0.104.0", features = ["vcf", "tabix", "bgzf", "core"] } glob = "0.3" +dashmap = "6" [dev-dependencies] diff --git a/cli/src/commands/cohort_bed.rs b/cli/src/commands/cohort_bed.rs index b836eb1..869408d 100644 --- a/cli/src/commands/cohort_bed.rs +++ b/cli/src/commands/cohort_bed.rs @@ -2,10 +2,11 @@ use std::collections::HashMap; use std::fs::{self, File}; use std::io::{BufWriter, Write}; use std::path::{Path, PathBuf}; -use std::sync::Mutex; +use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Instant; use anyhow::{bail, Context, Result}; +use dashmap::DashMap; use rayon::prelude::*; use crate::genotype_reader::{detect_delimiter, RowOutcome, RowParser}; @@ -38,9 +39,15 @@ pub fn run_cohort_bed(args: CohortBedArgs) -> Result<()> { // of holding every rsid string per sample. let interner = Interner::new(); let parse_start = Instant::now(); + // map_init gives each rayon worker a reusable per-thread cache (rsid -> global + // id). After a worker sees the panel once, subsequent samples resolve rsids + // from this local map with zero shared-map access — so the ~1e9 lookups stop + // contending on the DashMap shards (which all samples hit identically). let per_sample: Vec> = samples .par_iter() - .map(|(_sid, path)| parse_sample(path, &interner)) + .map_init(HashMap::::new, |local, (_sid, path)| { + parse_sample(path, &interner, local) + }) .collect::>>()?; eprintln!( "🧬 cohort-bed: parsed {} samples in {:.2}s", @@ -225,8 +232,8 @@ fn write_outputs( }; codes[s] = two_bit; } - for s in n_samples..codes.len() { - codes[s] = 0; // pad slots + for code in codes.iter_mut().skip(n_samples) { + *code = 0; // pad slots } for chunk in codes.chunks(4) { let byte = chunk[0] | (chunk[1] << 2) | (chunk[2] << 4) | (chunk[3] << 6); @@ -261,36 +268,61 @@ fn base_label(code: u8) -> char { /// Concurrent rsid interner: rsid string -> stable id, plus id -> (rsid, chrom, pos) /// captured at first insertion (so .bim recovers the original strings). +type SnpMeta = (String, String, i64); + +/// Sharded `DashMap` so the parallel parse doesn't serialize on one lock. The +/// previous global `Mutex` was taken once per row (~1e9 times at 1400x692k), +/// which collapsed parallelism (~23 min at 1400 files). Sharded locks spread the +/// ~1e9 lookups (almost all hits — every sample shares the same rsids); only the +/// ~1e6 distinct rsids ever insert. struct Interner { - inner: Mutex<(HashMap, Vec<(String, String, i64)>)>, + map: DashMap, + meta: DashMap, + next: AtomicU32, } impl Interner { fn new() -> Self { Self { - inner: Mutex::new((HashMap::new(), Vec::new())), + map: DashMap::new(), + meta: DashMap::new(), + next: AtomicU32::new(0), } } fn intern(&self, rsid: &str, chrom: &str, pos: i64) -> u32 { - let mut guard = self.inner.lock().expect("interner poisoned"); - if let Some(&id) = guard.0.get(rsid) { - return id; + // Fast path: already-seen rsid -> sharded read, no exclusive lock. + if let Some(id) = self.map.get(rsid) { + return *id; } - let id = guard.1.len() as u32; - guard.1.push((rsid.to_string(), chrom.to_string(), pos)); - guard.0.insert(rsid.to_string(), id); - id + // First sighting: insert under the shard's entry lock. or_insert_with only + // runs (and bumps the id counter) if still vacant, so ids stay unique even + // if two threads race on the same new rsid. + *self.map.entry(rsid.to_string()).or_insert_with(|| { + let id = self.next.fetch_add(1, Ordering::Relaxed); + self.meta + .insert(id, (rsid.to_string(), chrom.to_string(), pos)); + id + }) } - fn into_meta(self) -> Vec<(String, String, i64)> { - self.inner.into_inner().expect("interner poisoned").1 + fn into_meta(self) -> Vec { + let n = self.next.load(Ordering::Relaxed) as usize; + let mut out: Vec = vec![(String::new(), String::new(), 0); n]; + for (id, m) in self.meta.into_iter() { + out[id as usize] = m; + } + out } } /// Parse one sample file: dedup rsid keep-first, return (interned_id, a1, a2). /// Matches `read_sample`: keep no-call rows (they define the SNP), code A/C/G/T=1..4. -fn parse_sample(path: &Path, interner: &Interner) -> Result> { +fn parse_sample( + path: &Path, + interner: &Interner, + local: &mut HashMap, +) -> Result> { use std::io::{BufRead, BufReader}; let file = File::open(path).with_context(|| format!("Open {:?}", path))?; let mut reader = BufReader::new(file); @@ -327,7 +359,15 @@ fn parse_sample(path: &Path, interner: &Interner) -> Result> return Ok(()); // drop_duplicates keep first } let (c1, c2) = allele_codes(>); - let id = interner.intern(&rsid, &row.chrom, row.pos); + // Per-thread cache first; only fall to the shared interner on a miss. + let id = match local.get(&rsid) { + Some(&id) => id, + None => { + let id = interner.intern(&rsid, &row.chrom, row.pos); + local.insert(rsid.clone(), id); + id + } + }; out.push((id, c1, c2)); } Ok(()) diff --git a/cli/src/commands/project_bed.rs b/cli/src/commands/project_bed.rs new file mode 100644 index 0000000..1e95e49 --- /dev/null +++ b/cli/src/commands/project_bed.rs @@ -0,0 +1,428 @@ +use std::collections::HashMap; +use std::fs::{self, File}; +use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::path::{Path, PathBuf}; +use std::time::Instant; + +use anyhow::{bail, Context, Result}; +use rayon::prelude::*; + +use crate::genotype_reader::{detect_delimiter, RowOutcome, RowParser}; +use crate::ProjectBedArgs; + +const LOOKAHEAD_LINES: usize = 2048; + +/// Build a gnomAD-loadings-oriented PLINK `.bed/.bim/.fam` for PCA projection, +/// reproducing `gnomad_projection_fast/fast_convert_ddna_to_plink.py --loadings-npz` +/// byte-for-byte, but with a fast parallel Rust parse. +/// +/// Unlike `cohort-bed` (cohort minor/major): the panel is the fixed loadings set, +/// A1=alt / A2=ref, samples are matched by variant key (chrom*1e11+pos), and the +/// projection-specific filters apply (gs>=min_gs with NaN->1.0, autosomes 1-22, +/// 2-char ACGT). Low-overlap samples are dropped like fast_convert. +pub fn run_project_bed(args: ProjectBedArgs) -> Result<()> { + let overall = Instant::now(); + let panel = load_panel(&args.loadings_variants)?; + eprintln!( + "📋 project-bed: {} loadings panel variants", + panel.vars.len() + ); + + let samples = discover_samples(&args.input)?; + if samples.is_empty() { + bail!( + "No sample subdirectories with .txt files found under {:?}", + args.input + ); + } + eprintln!("▶️ project-bed: {} samples", samples.len()); + + // Parallel parse — the panel map is read-only and shared, so there is no + // interner contention (each sample matches against the fixed 76k panel). + let min_gs = args.min_gs; + let parse_start = Instant::now(); + let parsed: Vec = samples + .par_iter() + .map(|(_sid, path)| parse_sample(path, &panel, min_gs)) + .collect::>>()?; + eprintln!( + "🧬 project-bed: parsed {} samples in {:.2}s", + samples.len(), + parse_start.elapsed().as_secs_f64() + ); + + let n_panel = panel.vars.len(); + let n_samples = samples.len(); + // Panel-major allele matrices (0 = missing), filled at matched positions. + let mut a1 = vec![0u8; n_panel * n_samples]; + let mut a2 = vec![0u8; n_panel * n_samples]; + let mut matched = vec![0i32; n_samples]; + for (s, hits) in parsed.iter().enumerate() { + matched[s] = hits.rows.len() as i32; + for &(idx, c1, c2) in &hits.rows { + a1[idx as usize * n_samples + s] = c1; + a2[idx as usize * n_samples + s] = c2; + } + } + drop(parsed); + + // Keep panel variants with valid ACGT ref/alt (fixed_a1>0 & fixed_a2>0). + let keep_idx: Vec = (0..n_panel) + .filter(|&i| panel.vars[i].a1_code > 0 && panel.vars[i].a2_code > 0) + .collect(); + eprintln!( + "🧮 project-bed: {} / {} panel SNPs with valid ref/alt", + keep_idx.len(), + n_panel + ); + + // Projection overlap filter: drop samples below ceil(expected * ratio). + let min_observed = + (args.expected_loadings_overlap as f64 * args.min_loadings_ratio).ceil() as i32; + let keep_sample: Vec = matched.iter().map(|&m| m >= min_observed).collect(); + let n_keep = keep_sample.iter().filter(|&&k| k).count(); + eprintln!( + "🔎 project-bed: overlap filter expected={}, ratio={:.3}, min_observed={}; keeping {}/{} samples", + args.expected_loadings_overlap, args.min_loadings_ratio, min_observed, n_keep, n_samples + ); + if n_keep == 0 { + bail!( + "no samples passed the projection overlap filter; requires >= {} observed loading positions", + min_observed + ); + } + let kept_samples: Vec = (0..n_samples).filter(|&s| keep_sample[s]).collect(); + + write_outputs( + &args, + &panel, + &keep_idx, + &a1, + &a2, + &samples, + &kept_samples, + n_samples, + )?; + eprintln!( + "✅ project-bed: done in {:.2}s", + overall.elapsed().as_secs_f64() + ); + println!( + "✅ Wrote {}.bed/.bim/.fam ({} SNPs x {} samples)", + args.out_prefix.display(), + keep_idx.len(), + n_keep + ); + Ok(()) +} + +struct PanelVar { + rsid: String, // "chrom:pos:ref:alt" + chrom: String, + pos: i64, + a1_code: u8, // alt + a2_code: u8, // ref +} + +struct Panel { + vars: Vec, + key_to_idx: HashMap, +} + +struct SampleHits { + rows: Vec<(u32, u8, u8)>, // (panel_idx, a1_code, a2_code) +} + +/// Load the loadings panel from a TSV (chrom, pos, ref, alt, [locuskey]). +/// Sorted by variant key (chrom*1e11+pos), dedup keeping the first occurrence — +/// matching fast_convert's stable-sort + np.unique(first) on the npz. +fn load_panel(path: &Path) -> Result { + let file = File::open(path).with_context(|| format!("Open {:?}", path))?; + let reader = BufReader::new(file); + // (key, original_order, chrom, pos, ref, alt) + let mut rows: Vec<(i64, usize, String, i64, String, String)> = Vec::new(); + for (i, line) in reader.lines().enumerate() { + let line = line?; + let t = line.trim_end_matches(['\n', '\r']); + if t.is_empty() { + continue; + } + let f: Vec<&str> = t.split('\t').collect(); + if f.len() < 4 { + continue; + } + // skip a header line if present + let pos: i64 = match f[1].trim().parse() { + Ok(p) => p, + Err(_) => continue, + }; + let chrom = f[0].trim().to_string(); + let chrom_int: i64 = match chrom.parse() { + Ok(c) => c, + Err(_) => continue, // loadings are autosomal ints + }; + let key = chrom_int * 100_000_000_000 + pos; + rows.push(( + key, + i, + chrom, + pos, + f[2].trim().to_string(), + f[3].trim().to_string(), + )); + } + // stable sort by key, then dedup keeping the first original-order row per key. + rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1))); + let mut vars = Vec::new(); + let mut key_to_idx = HashMap::new(); + let mut last_key: Option = None; + for (key, _ord, chrom, pos, r, a) in rows { + if last_key == Some(key) { + continue; // dedup: keep first + } + last_key = Some(key); + let a1_code = base_code(&a); // alt + let a2_code = base_code(&r); // ref + key_to_idx.insert(key, vars.len() as u32); + vars.push(PanelVar { + rsid: format!("{chrom}:{pos}:{r}:{a}"), + chrom, + pos, + a1_code, + a2_code, + }); + } + Ok(Panel { vars, key_to_idx }) +} + +/// Parse one sample: apply fast_convert's projection filters and match to the +/// panel by variant key. Dedup repeated keys keep-first. Returns matched rows. +fn parse_sample(path: &Path, panel: &Panel, min_gs: f32) -> Result { + let file = File::open(path).with_context(|| format!("Open {:?}", path))?; + let mut reader = BufReader::new(file); + let mut buffered: Vec = Vec::new(); + let mut buf = String::new(); + while buffered.len() < LOOKAHEAD_LINES { + buf.clear(); + if reader.read_line(&mut buf)? == 0 { + break; + } + buffered.push(buf.clone()); + } + if buffered.is_empty() { + return Ok(SampleHits { rows: Vec::new() }); + } + let delim = detect_delimiter(&buffered); + let mut parser = RowParser::new(delim); + let mut seen_keys: HashMap = HashMap::new(); + let mut rows: Vec<(u32, u8, u8)> = Vec::new(); + + let mut handle = |parser: &mut RowParser, line: &str| -> Result<()> { + if let RowOutcome::Parsed(row) = parser.consume_line(line)? { + // autosome 1-22 (chrom already cleaned by RowParser) + let chrom_int: i64 = match row.chrom.parse() { + Ok(c) if (1..=22).contains(&c) => c, + _ => return Ok(()), + }; + if row.pos <= 0 { + return Ok(()); + } + // gs >= min_gs; missing/unparseable gs -> 1.0 (matches fillna(1.0)) + let gs: f32 = row + .gs + .as_deref() + .and_then(|s| s.trim().parse::().ok()) + .unwrap_or(1.0); + if !matches!( + gs.partial_cmp(&min_gs), + Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal) + ) { + return Ok(()); + } + // genotype must be 2-char ACGT + let gt = match &row.genotype { + Some(g) => g.to_ascii_uppercase(), + None => return Ok(()), + }; + let (c1, c2) = match acgt_pair(>) { + Some(p) => p, + None => return Ok(()), + }; + let key = chrom_int * 100_000_000_000 + row.pos; + let Some(&idx) = panel.key_to_idx.get(&key) else { + return Ok(()); + }; + if seen_keys.insert(key, ()).is_some() { + return Ok(()); // dedup keep-first + } + rows.push((idx, c1, c2)); + } + Ok(()) + }; + + for line in &buffered { + handle(&mut parser, line)?; + } + loop { + buf.clear(); + if reader.read_line(&mut buf)? == 0 { + break; + } + handle(&mut parser, &buf)?; + } + Ok(SampleHits { rows }) +} + +/// 2-char genotype -> (a1,a2) codes, only if both chars are ACGT; else None. +fn acgt_pair(gt: &str) -> Option<(u8, u8)> { + let b = gt.as_bytes(); + if b.len() < 2 { + return None; + } + let c = |x: u8| match x { + b'A' => 1, + b'C' => 2, + b'G' => 3, + b'T' => 4, + _ => 0, + }; + let (a, d) = (c(b[0]), c(b[1])); + if a == 0 || d == 0 { + None + } else { + Some((a, d)) + } +} + +fn base_code(s: &str) -> u8 { + match s.as_bytes().first() { + Some(b'A') => 1, + Some(b'C') => 2, + Some(b'G') => 3, + Some(b'T') => 4, + _ => 0, + } +} + +fn base_label(code: u8) -> char { + match code { + 1 => 'A', + 2 => 'C', + 3 => 'G', + 4 => 'T', + _ => '.', + } +} + +#[allow(clippy::too_many_arguments)] +fn write_outputs( + args: &ProjectBedArgs, + panel: &Panel, + keep_idx: &[usize], + a1: &[u8], + a2: &[u8], + samples: &[(String, PathBuf)], + kept_samples: &[usize], + n_samples: usize, +) -> Result<()> { + let out_prefix = &args.out_prefix; + if let Some(parent) = out_prefix.parent() { + if !parent.as_os_str().is_empty() { + fs::create_dir_all(parent).with_context(|| format!("Create {:?}", parent))?; + } + } + let p = |ext: &str| { + let mut s = out_prefix.as_os_str().to_owned(); + s.push(ext); + PathBuf::from(s) + }; + + // .fam — note FID is "0" (matches fast_convert), IID = sample id. + let mut fam = BufWriter::new(File::create(p(".fam"))?); + for &s in kept_samples { + writeln!(fam, "0\t{}\t0\t0\t0\t-9", samples[s].0)?; + } + fam.flush()?; + + // .bim — variant id = "chrom:pos:ref:alt", A1=alt label, A2=ref label. + let mut bim = BufWriter::new(File::create(p(".bim"))?); + let mut bim_buf = String::new(); + for &i in keep_idx { + let v = &panel.vars[i]; + bim_buf.push_str(&format!( + "{}\t{}\t0\t{}\t{}\t{}\n", + v.chrom, + v.rsid, + v.pos, + base_label(v.a1_code), + base_label(v.a2_code) + )); + } + bim.write_all(bim_buf.as_bytes())?; + bim.flush()?; + + // .bed — 2-bit, kept variants x kept samples; dosage = count of A1(alt). + let mut bed = BufWriter::new(File::create(p(".bed"))?); + bed.write_all(&[0x6c, 0x1b, 0x01])?; + let nk = kept_samples.len(); + let bytes_per_var = nk.div_ceil(4); + let mut codes = vec![0u8; bytes_per_var * 4]; + for &i in keep_idx { + let v = &panel.vars[i]; + let base = i * n_samples; + for (j, &s) in kept_samples.iter().enumerate() { + let (x, y) = (a1[base + s], a2[base + s]); + let missing = x == 0 || y == 0; + let in_set = (x == v.a1_code || x == v.a2_code) && (y == v.a1_code || y == v.a2_code); + let two_bit = if missing || !in_set { + 0b01 + } else { + let n_a1 = (x == v.a1_code) as u8 + (y == v.a1_code) as u8; + match n_a1 { + 2 => 0b00, + 1 => 0b10, + _ => 0b11, + } + }; + codes[j] = two_bit; + } + for slot in codes.iter_mut().take(bytes_per_var * 4).skip(nk) { + *slot = 0; + } + for chunk in codes.chunks(4) { + bed.write_all(&[chunk[0] | (chunk[1] << 2) | (chunk[2] << 4) | (chunk[3] << 6)])?; + } + } + bed.flush()?; + Ok(()) +} + +/// Discover samples like fast_convert/fast_pipeline: sorted subdirs, first *.txt. +fn discover_samples(data_dir: &Path) -> Result> { + let mut dirs: Vec = fs::read_dir(data_dir) + .with_context(|| format!("Read dir {:?}", data_dir))? + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .filter(|p| p.is_dir()) + .collect(); + dirs.sort(); + let mut samples = Vec::new(); + for dir in dirs { + let mut txts: Vec = fs::read_dir(&dir) + .with_context(|| format!("Read dir {:?}", dir))? + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .filter(|p| p.is_file() && p.extension().and_then(|x| x.to_str()) == Some("txt")) + .collect(); + txts.sort(); + if let Some(first) = txts.into_iter().next() { + let sid = dir + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or_default() + .to_string(); + samples.push((sid, first)); + } + } + Ok(samples) +} diff --git a/cli/src/main.rs b/cli/src/main.rs index 8941e1a..8b20d2e 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -24,6 +24,7 @@ use crate::commands::long_aggregate::run_long_aggregate; use crate::commands::long_dump::run_long_dump; use crate::commands::long_emit::run_long_emit; use crate::commands::merge_allele_freq::run_merge_allele_freq; +use crate::commands::project_bed::run_project_bed; use crate::commands::reference_load::run_reference_load; use crate::commands::resolve_rsids::run_resolve_rsids; use crate::commands::sync_rsid_cache::run_sync_rsid_cache; @@ -44,6 +45,7 @@ mod commands { pub mod long_dump; pub mod long_emit; pub mod merge_allele_freq; + pub mod project_bed; pub mod reference_load; pub mod resolve_rsids; pub mod sync_rsid_cache; @@ -76,6 +78,8 @@ enum Commands { Fst(FstArgs), /// Build a cohort PLINK .bed/.bim/.fam from genotype files (for PCA/QC). CohortBed(CohortBedArgs), + /// Build a gnomAD-loadings-oriented PLINK bed for PCA projection. + ProjectBed(ProjectBedArgs), /// Inner-join per-population allele frequency TSVs into merged matrices. MergeAlleleFreq(MergeAlleleFreqArgs), /// Convert a .bvlr file into TSV for debugging. @@ -292,6 +296,28 @@ pub struct CohortBedArgs { pub snp_info: Option, } +#[derive(Args, Clone)] +pub struct ProjectBedArgs { + /// Data dir containing one subdirectory per sample, each with a genotype .txt. + #[arg(short = 'i', long = "input")] + pub input: PathBuf, + /// gnomAD loadings variants TSV (chrom, pos, ref, alt[, locuskey]). + #[arg(long)] + pub loadings_variants: PathBuf, + /// Output PLINK prefix; writes .bed/.bim/.fam. + #[arg(long)] + pub out_prefix: PathBuf, + /// Minimum genotype score (GS) to keep a row; missing GS counts as 1.0. + #[arg(long, default_value = "0.15")] + pub min_gs: f32, + /// Expected gnomAD loading sites on a healthy GSAv3 file. + #[arg(long, default_value = "8971")] + pub expected_loadings_overlap: usize, + /// Min fraction of expected overlap to keep a sample. + #[arg(long, default_value = "0.95")] + pub min_loadings_ratio: f64, +} + #[derive(Args, Clone)] pub struct FstArgs { /// Merged allele-frequency matrix TSV (locus_key + one column per population). @@ -523,6 +549,7 @@ fn main() -> Result<()> { Commands::FastAlleleFreq(args) => run_fast_allele_freq(args), Commands::Fst(args) => run_fst(args), Commands::CohortBed(args) => run_cohort_bed(args), + Commands::ProjectBed(args) => run_project_bed(args), Commands::MergeAlleleFreq(args) => run_merge_allele_freq(args), Commands::DumpLong(args) => run_long_dump(args), Commands::ListMissingCache(args) => run_list_missing_cache(args),