diff --git a/cli/src/commands/cohort_bed.rs b/cli/src/commands/cohort_bed.rs new file mode 100644 index 0000000..b836eb1 --- /dev/null +++ b/cli/src/commands/cohort_bed.rs @@ -0,0 +1,390 @@ +use std::collections::HashMap; +use std::fs::{self, File}; +use std::io::{BufWriter, Write}; +use std::path::{Path, PathBuf}; +use std::sync::Mutex; +use std::time::Instant; + +use anyhow::{bail, Context, Result}; +use rayon::prelude::*; + +use crate::genotype_reader::{detect_delimiter, RowOutcome, RowParser}; +use crate::CohortBedArgs; + +const LOOKAHEAD_LINES: usize = 2048; + +/// Build a cohort PLINK `.bed/.bim/.fam` from DDNA/Illumina genotype files, +/// reproducing `pca_qc_fast/scripts/fast_pipeline.py`'s parse -> cohort SNP +/// universe -> biallelic choice -> PLINK writer byte-for-byte, but fast (parallel +/// Rust parse) and bounded-RAM (rsids interned, no per-sample string blow-up). +/// +/// NOTE: unlike emit-long/fast-allele-freq this does NOT resolve against the +/// reference DB. It uses the genotype files' own rsid/chrom/pos and a cohort-union +/// SNP universe with cohort minor/major allele coding, exactly like fast_pipeline. +pub fn run_cohort_bed(args: CohortBedArgs) -> Result<()> { + let overall = Instant::now(); + let samples = discover_samples(&args.input)?; + if samples.is_empty() { + bail!( + "No sample subdirectories with .txt files found under {:?}", + args.input + ); + } + eprintln!("▶️ cohort-bed: {} samples", samples.len()); + + // Phase 1: parallel parse. Each sample maps its rsids to global interned ids + // via a shared interner, returning compact (id, a1, a2) rows. Interning keeps + // memory ~ (total rows x 6 bytes) + one copy of each distinct variant, instead + // of holding every rsid string per sample. + let interner = Interner::new(); + let parse_start = Instant::now(); + let per_sample: Vec> = samples + .par_iter() + .map(|(_sid, path)| parse_sample(path, &interner)) + .collect::>>()?; + eprintln!( + "🧬 cohort-bed: parsed {} samples in {:.2}s", + samples.len(), + parse_start.elapsed().as_secs_f64() + ); + + let meta = interner.into_meta(); // id -> (rsid, chrom, pos) + + // Phase 2: cohort SNP universe = first-seen rsid order across samples in + // sample order (matches fast_pipeline's pass 1). + let n_samples = samples.len(); + let mut final_idx: Vec = vec![-1; meta.len()]; + let mut universe: Vec = Vec::new(); // interned id in universe order + for sample in &per_sample { + for &(id, _, _) in sample { + if final_idx[id as usize] < 0 { + final_idx[id as usize] = universe.len() as i32; + universe.push(id); + } + } + } + let n_snps = universe.len(); + eprintln!( + "🗺️ cohort-bed: {} SNP universe x {} samples", + n_snps, n_samples + ); + + // Optional snp_info.tsv = full cohort universe in first-seen order, matching + // fast_pipeline.build_snp_universe's output (header rsid/chromosome/position). + if let Some(snp_info_path) = &args.snp_info { + let mut w = BufWriter::new( + File::create(snp_info_path).with_context(|| format!("Create {:?}", snp_info_path))?, + ); + writeln!(w, "rsid\tchromosome\tposition")?; + for &id in &universe { + let (rsid, chrom, pos) = &meta[id as usize]; + writeln!(w, "{rsid}\t{chrom}\t{pos}")?; + } + w.flush()?; + } + + // SNP-major allele matrices (contiguous per SNP for counting), 0 = missing. + let mut a1 = vec![0u8; n_snps * n_samples]; + let mut a2 = vec![0u8; n_snps * n_samples]; + for (s, sample) in per_sample.iter().enumerate() { + for &(id, c1, c2) in sample { + let snp = final_idx[id as usize] as usize; + a1[snp * n_samples + s] = c1; + a2[snp * n_samples + s] = c2; + } + } + drop(per_sample); + + // Choose biallelic/mono variants + cohort minor/major coding. + let mut keep: Vec = Vec::new(); + for snp in 0..n_snps { + let base = snp * n_samples; + let mut counts = [0i64; 4]; + for s in 0..n_samples { + let (x, y) = (a1[base + s], a2[base + s]); + if x != 0 && y != 0 { + counts[(x - 1) as usize] += 1; + counts[(y - 1) as usize] += 1; + } + } + let n_distinct = counts.iter().filter(|&&c| c > 0).count(); + if n_distinct == 0 || n_distinct > 2 { + continue; + } + // Sort base codes 1..4 by count desc, ties by base order (stable) -> A Result<()> { + if let Some(parent) = out_prefix.parent() { + if !parent.as_os_str().is_empty() { + fs::create_dir_all(parent).with_context(|| format!("Create {:?}", parent))?; + } + } + let p = |ext: &str| { + let mut s = out_prefix.as_os_str().to_owned(); + s.push(ext); + PathBuf::from(s) + }; + + // .fam + let mut fam = BufWriter::new(File::create(p(".fam"))?); + for (sid, _) in samples { + writeln!(fam, "{sid}\t{sid}\t0\t0\t0\t-9")?; + } + fam.flush()?; + + // .bim + let mut bim = BufWriter::new(File::create(p(".bim"))?); + for kv in keep { + let (rsid, chrom, pos) = &meta[universe[kv.snp] as usize]; + writeln!( + bim, + "{}\t{}\t0\t{}\t{}\t{}", + chrom_code(chrom), + rsid, + pos, + base_label(kv.minor), + base_label(kv.major) + )?; + } + bim.flush()?; + + // .bed (2-bit, 4 samples/byte, magic 6c 1b 01) + let mut bed = BufWriter::new(File::create(p(".bed"))?); + bed.write_all(&[0x6c, 0x1b, 0x01])?; + let bytes_per_var = n_samples.div_ceil(4); + let mut codes = vec![0u8; bytes_per_var * 4]; + for kv in keep { + let base = kv.snp * n_samples; + for s in 0..n_samples { + let (x, y) = (a1[base + s], a2[base + s]); + let called = x != 0 && y != 0; + let in_set = (x == kv.major || x == kv.minor) && (y == kv.major || y == kv.minor); + // dosage = count of MINOR allele + let two_bit = if !called || !in_set { + 0b01 // missing + } else { + let d = (x == kv.minor) as u8 + (y == kv.minor) as u8; + match d { + 2 => 0b00, + 1 => 0b10, + _ => 0b11, + } + }; + codes[s] = two_bit; + } + for s in n_samples..codes.len() { + codes[s] = 0; // pad slots + } + for chunk in codes.chunks(4) { + let byte = chunk[0] | (chunk[1] << 2) | (chunk[2] << 4) | (chunk[3] << 6); + bed.write_all(&[byte])?; + } + } + bed.flush()?; + Ok(()) +} + +/// Map a chromosome label to PLINK's numeric code, matching fast_pipeline's +/// `_chrom_code` (ordered replacements: XY->25, MT->26, M->26, X->23, Y->24). +fn chrom_code(value: &str) -> String { + let mut c = value.replace("chr", "").replace("CHR", ""); + c = c.replace("XY", "25"); + c = c.replace("MT", "26"); + c = c.replace('M', "26"); + c = c.replace('X', "23"); + c = c.replace('Y', "24"); + c +} + +fn base_label(code: u8) -> char { + match code { + 1 => 'A', + 2 => 'C', + 3 => 'G', + 4 => 'T', + _ => '.', + } +} + +/// Concurrent rsid interner: rsid string -> stable id, plus id -> (rsid, chrom, pos) +/// captured at first insertion (so .bim recovers the original strings). +struct Interner { + inner: Mutex<(HashMap, Vec<(String, String, i64)>)>, +} + +impl Interner { + fn new() -> Self { + Self { + inner: Mutex::new((HashMap::new(), Vec::new())), + } + } + + fn intern(&self, rsid: &str, chrom: &str, pos: i64) -> u32 { + let mut guard = self.inner.lock().expect("interner poisoned"); + if let Some(&id) = guard.0.get(rsid) { + return id; + } + let id = guard.1.len() as u32; + guard.1.push((rsid.to_string(), chrom.to_string(), pos)); + guard.0.insert(rsid.to_string(), id); + id + } + + fn into_meta(self) -> Vec<(String, String, i64)> { + self.inner.into_inner().expect("interner poisoned").1 + } +} + +/// Parse one sample file: dedup rsid keep-first, return (interned_id, a1, a2). +/// Matches `read_sample`: keep no-call rows (they define the SNP), code A/C/G/T=1..4. +fn parse_sample(path: &Path, interner: &Interner) -> Result> { + use std::io::{BufRead, BufReader}; + let file = File::open(path).with_context(|| format!("Open {:?}", path))?; + let mut reader = BufReader::new(file); + let mut buffered: Vec = Vec::new(); + let mut buf = String::new(); + while buffered.len() < LOOKAHEAD_LINES { + buf.clear(); + if reader.read_line(&mut buf)? == 0 { + break; + } + buffered.push(buf.clone()); + } + if buffered.is_empty() { + return Ok(Vec::new()); + } + let delim = detect_delimiter(&buffered); + let mut parser = RowParser::new(delim); + let mut seen: HashMap = HashMap::new(); + let mut out: Vec<(u32, u8, u8)> = Vec::new(); + + let mut handle = |parser: &mut RowParser, line: &str| -> Result<()> { + if let RowOutcome::Parsed(row) = parser.consume_line(line)? { + // variant_id = rsid, else chrom:pos (matches read_pipeline_genotypes) + let rsid = if row.rsid.is_empty() { + format!("{}:{}", row.chrom, row.pos) + } else { + row.rsid.clone() + }; + let gt = match &row.genotype { + Some(g) => g.to_ascii_uppercase(), + None => return Ok(()), // dropna genotype + }; + if seen.insert(rsid.clone(), ()).is_some() { + return Ok(()); // drop_duplicates keep first + } + let (c1, c2) = allele_codes(>); + let id = interner.intern(&rsid, &row.chrom, row.pos); + out.push((id, c1, c2)); + } + Ok(()) + }; + + for line in &buffered { + handle(&mut parser, line)?; + } + loop { + buf.clear(); + if reader.read_line(&mut buf)? == 0 { + break; + } + handle(&mut parser, &buf)?; + } + Ok(out) +} + +/// First two chars of the genotype -> allele codes (A=1,C=2,G=3,T=4, else 0). +fn allele_codes(gt: &str) -> (u8, u8) { + let bytes = gt.as_bytes(); + let code = |b: Option<&u8>| match b { + Some(b'A') => 1, + Some(b'C') => 2, + Some(b'G') => 3, + Some(b'T') => 4, + _ => 0, + }; + (code(bytes.first()), code(bytes.get(1))) +} + +/// Discover samples like fast_pipeline: sorted subdirs, first sorted *.txt in each. +fn discover_samples(data_dir: &Path) -> Result> { + let mut dirs: Vec = fs::read_dir(data_dir) + .with_context(|| format!("Read dir {:?}", data_dir))? + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .filter(|p| p.is_dir()) + .collect(); + dirs.sort(); + let mut samples = Vec::new(); + for dir in dirs { + let mut txts: Vec = fs::read_dir(&dir) + .with_context(|| format!("Read dir {:?}", dir))? + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .filter(|p| p.is_file() && p.extension().and_then(|x| x.to_str()) == Some("txt")) + .collect(); + txts.sort(); + if let Some(first) = txts.into_iter().next() { + let sid = dir + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or_default() + .to_string(); + samples.push((sid, first)); + } + } + Ok(samples) +} diff --git a/cli/src/main.rs b/cli/src/main.rs index ca52148..8941e1a 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -12,6 +12,7 @@ mod stats; mod util; use crate::commands::allele_report::run_allele_report; +use crate::commands::cohort_bed::run_cohort_bed; use crate::commands::fast_allele_freq::run_fast_allele_freq; use crate::commands::fst::run_fst; use crate::commands::genostats::run_genostats; @@ -31,6 +32,7 @@ use crate::commands::update::run_update; mod commands { pub mod allele_report; + pub mod cohort_bed; pub mod fast_allele_freq; pub mod fst; pub mod genostats; @@ -72,6 +74,8 @@ enum Commands { FastAlleleFreq(FastAlleleFreqArgs), /// Pairwise Weir & Cockerham 1984 FST matrix from merged allele freq/number. Fst(FstArgs), + /// Build a cohort PLINK .bed/.bim/.fam from genotype files (for PCA/QC). + CohortBed(CohortBedArgs), /// Inner-join per-population allele frequency TSVs into merged matrices. MergeAlleleFreq(MergeAlleleFreqArgs), /// Convert a .bvlr file into TSV for debugging. @@ -275,6 +279,19 @@ pub struct FastAlleleFreqArgs { pub max_ram_gb: f64, } +#[derive(Args, Clone)] +pub struct CohortBedArgs { + /// Data dir containing one subdirectory per sample, each with a genotype .txt. + #[arg(short = 'i', long = "input")] + pub input: PathBuf, + /// Output PLINK prefix; writes .bed/.bim/.fam. + #[arg(long)] + pub out_prefix: PathBuf, + /// Optional snp_info.tsv (full cohort SNP universe: rsid/chromosome/position). + #[arg(long)] + pub snp_info: Option, +} + #[derive(Args, Clone)] pub struct FstArgs { /// Merged allele-frequency matrix TSV (locus_key + one column per population). @@ -505,6 +522,7 @@ fn main() -> Result<()> { Commands::AggregateLong(args) => run_long_aggregate(args), Commands::FastAlleleFreq(args) => run_fast_allele_freq(args), Commands::Fst(args) => run_fst(args), + Commands::CohortBed(args) => run_cohort_bed(args), Commands::MergeAlleleFreq(args) => run_merge_allele_freq(args), Commands::DumpLong(args) => run_long_dump(args), Commands::ListMissingCache(args) => run_list_missing_cache(args),