From 286027092c5620bbe02ed9d0bbab96091e79d62c Mon Sep 17 00:00:00 2001 From: Madhava Jay Date: Fri, 29 May 2026 11:07:12 +1000 Subject: [PATCH] illumina emit long --- cli/src/commands/long_emit.rs | 547 +++++++++++++++++++++++++++++++--- cli/src/genotype_reader.rs | 145 ++++++--- 2 files changed, 607 insertions(+), 85 deletions(-) diff --git a/cli/src/commands/long_emit.rs b/cli/src/commands/long_emit.rs index 4234ab6..85f98c7 100644 --- a/cli/src/commands/long_emit.rs +++ b/cli/src/commands/long_emit.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::fs::File; use std::io::{BufRead, BufReader, BufWriter, Read, Write}; use std::path::{Path, PathBuf}; @@ -57,6 +57,7 @@ pub fn run_long_emit(args: EmitLongArgs) -> Result<()> { let store = StatsStore::connect_read_only(&sqlite_path)?; let mut resolver = ReferenceResolver::new(&store)?; + let mut successful_files = 0usize; for input in &args.inputs { let start = Instant::now(); let output_path = resolve_output_path(input, args.output.as_ref())?; @@ -65,13 +66,25 @@ pub fn run_long_emit(args: EmitLongArgs) -> Result<()> { .clone() .unwrap_or_else(|| default_participant(input)); let mut missing_logger = MissingRefLogger::new(args.missing_ref_log.as_deref())?; - let stats = emit_from_genotype( + let stats = match emit_from_genotype( input, &output_path, &participant, &mut resolver, &mut missing_logger, - )?; + ) { + Ok(stats) => stats, + Err(err) if args.inputs.len() > 1 => { + eprintln!( + "WARNING: skipping file {}: emit-long failed: {err:#}", + input.display() + ); + let _ = std::fs::remove_file(&output_path); + continue; + } + Err(err) => return Err(err), + }; + successful_files += 1; eprintln!( "✅ emit-long (genotype): {} parsed, {} rows, {} missing-ref, {} missing-gt", stats.rows_parsed, stats.rows_emitted, stats.missing_reference, stats.missing_gt @@ -87,6 +100,9 @@ pub fn run_long_emit(args: EmitLongArgs) -> Result<()> { participant ); } + if successful_files == 0 { + bail!("No input files produced usable long rows"); + } eprintln!( "⏱️ emit-long: total elapsed {:.2}s", overall_start.elapsed().as_secs_f64() @@ -227,9 +243,6 @@ fn emit_from_genotype( let delimiter = detect_delimiter(&buffered_lines); let mut parser = RowParser::new(delimiter); - let mut writer = LongRowWriter::new(BufWriter::new( - File::create(output).with_context(|| format!("Create {:?}", output))?, - )); let mut stats = EmitStats { variants_seen: 0, rows_emitted: 0, @@ -237,16 +250,17 @@ fn emit_from_genotype( rows_parsed: 0, missing_reference: 0, }; + let mut pending_rows = Vec::new(); let mut line_number: u64 = 0; let mut ctx = GenotypeEmitContext { parser: &mut parser, - writer: &mut writer, participant, resolver, stats: &mut stats, missing_logger, input, + pending_rows: &mut pending_rows, }; for line in &buffered_lines { @@ -265,18 +279,28 @@ fn emit_from_genotype( consume_genotype_line(&mut ctx, &buffer, line_number)?; } + let mut writer = LongRowWriter::new(BufWriter::new( + File::create(output).with_context(|| format!("Create {:?}", output))?, + )); + write_merged_long_rows(&mut writer, pending_rows, &mut stats)?; writer.flush()?; Ok(stats) } struct GenotypeEmitContext<'a> { parser: &'a mut RowParser, - writer: &'a mut LongRowWriter>, participant: &'a str, resolver: &'a mut ReferenceResolver, stats: &'a mut EmitStats, missing_logger: &'a mut MissingRefLogger, input: &'a Path, + pending_rows: &'a mut Vec, +} + +#[derive(Clone)] +struct PendingLongRow { + key: String, + row: LongRow, } fn consume_genotype_line( @@ -284,67 +308,195 @@ fn consume_genotype_line( line: &str, line_number: u64, ) -> Result<()> { - match ctx.parser.consume_line(line)? { + let outcome = match ctx.parser.consume_line(line) { + Ok(outcome) => outcome, + Err(err) => { + ctx.missing_logger + .log_issue(ctx.input, line_number, "parse_error", "", line)?; + eprintln!( + "WARNING: {}:{}: parse_error: {err:#}", + ctx.input.display(), + line_number + ); + return Ok(()); + } + }; + + match outcome { RowOutcome::Parsed(row) => { ctx.stats.rows_parsed += 1; - let rsid_label = row.rsid.trim(); - if rsid_label.is_empty() { - return Ok(()); + let mut rsid_label = clean_rsid_label(&row.rsid); + let mut reference = if rsid_label.is_empty() { + None + } else { + match ctx.resolver.resolve_rsid(&rsid_label) { + Ok(reference) => reference, + Err(err) => { + ctx.missing_logger.log_issue( + ctx.input, + line_number, + "rsid_resolve_error", + &rsid_label, + line, + )?; + eprintln!( + "WARNING: {}:{}: rsid_resolve_error {}: {err:#}", + ctx.input.display(), + line_number, + rsid_label + ); + None + } + } + }; + if reference.is_none() { + match ctx.resolver.resolve_position(&row.chrom, row.pos) { + Ok(Some(position_reference)) => { + if rsid_label.is_empty() { + rsid_label = format!("rs{}", position_reference.rsid); + } + reference = Some(position_reference); + } + Ok(None) => {} + Err(err) => { + ctx.missing_logger.log_issue( + ctx.input, + line_number, + "position_resolve_error", + &format!("{}:{}", row.chrom, row.pos), + line, + )?; + eprintln!( + "WARNING: {}:{}: position_resolve_error {}:{}: {err:#}", + ctx.input.display(), + line_number, + row.chrom, + row.pos + ); + } + } } - let reference = match ctx.resolver.resolve(rsid_label)? { + let reference = match reference { Some(reference) => reference, None => { ctx.stats.missing_reference += 1; - ctx.missing_logger - .log(ctx.input, line_number, rsid_label, line)?; + ctx.missing_logger.log_issue( + ctx.input, + line_number, + "missing_ref", + &rsid_label, + line, + )?; return Ok(()); } }; let reference_base = normalize_sequence(&reference.reference); let alternates = parse_alternates(&reference.alternates); if !is_snp(&reference_base, &alternates) { + ctx.missing_logger.log_issue( + ctx.input, + line_number, + "non_snp_reference", + &rsid_label, + line, + )?; return Ok(()); } let dosages = parse_genotype_dosages(row.genotype.as_deref(), &reference_base, &alternates); if let Some(dosages) = dosages { for (alt, dosage) in alternates.iter().zip(dosages.iter()) { - ctx.writer.write_row(&LongRow { - locus_key: locus_key( - &reference.chromosome, - reference.position, - &reference_base, - alt, - ), - rsid: rsid_label.to_string(), - participant_id: ctx.participant.to_string(), - dosage: *dosage, - })?; - ctx.stats.rows_emitted += 1; + push_pending_row(ctx, &reference, &reference_base, alt, &rsid_label, *dosage); } } else { ctx.stats.missing_gt += 1; for alt in &alternates { - ctx.writer.write_row(&LongRow { - locus_key: locus_key( - &reference.chromosome, - reference.position, - &reference_base, - alt, - ), - rsid: rsid_label.to_string(), - participant_id: ctx.participant.to_string(), - dosage: -1, - })?; - ctx.stats.rows_emitted += 1; + push_pending_row(ctx, &reference, &reference_base, alt, &rsid_label, -1); } } } - RowOutcome::Skipped | RowOutcome::Ignored => {} + RowOutcome::Skipped => { + ctx.missing_logger.log_issue( + ctx.input, + line_number, + "skipped_unusable_row", + "", + line, + )?; + } + RowOutcome::Ignored => {} } Ok(()) } +fn push_pending_row( + ctx: &mut GenotypeEmitContext<'_>, + reference: &ReferenceVariant, + reference_base: &str, + alt: &str, + rsid_label: &str, + dosage: i8, +) { + let row = LongRow { + locus_key: locus_key( + &reference.chromosome, + reference.position, + reference_base, + alt, + ), + rsid: rsid_label.to_string(), + participant_id: ctx.participant.to_string(), + dosage, + }; + let key = format!("{}\t{}\t{}", row.locus_key, row.rsid, row.participant_id); + ctx.pending_rows.push(PendingLongRow { key, row }); +} + +fn write_merged_long_rows( + writer: &mut LongRowWriter>, + pending_rows: Vec, + stats: &mut EmitStats, +) -> Result<()> { + let mut order: Vec = Vec::new(); + let mut groups: HashMap> = HashMap::new(); + for pending in pending_rows { + if !groups.contains_key(&pending.key) { + order.push(pending.key.clone()); + } + groups.entry(pending.key).or_default().push(pending.row); + } + + for key in order { + let members = groups.remove(&key).expect("group present"); + let selected = select_duplicate_probe_row(&members); + writer.write_row(&selected)?; + stats.rows_emitted += 1; + } + Ok(()) +} + +fn select_duplicate_probe_row(rows: &[LongRow]) -> LongRow { + let calls: Vec<&LongRow> = rows.iter().filter(|row| row.dosage >= 0).collect(); + if calls.is_empty() { + return rows[0].clone(); + } + let distinct: BTreeSet = calls.iter().map(|row| row.dosage).collect(); + if distinct.len() == 1 { + return calls[0].clone(); + } + let mut conflict = rows[0].clone(); + conflict.dosage = -1; + conflict +} + +fn clean_rsid_label(value: &str) -> String { + let value = value.trim(); + if value.is_empty() || matches!(value, "." | "-") { + return String::new(); + } + normalize_rsid(value) +} + struct MissingRefLogger { writer: Option>, } @@ -370,17 +522,36 @@ impl MissingRefLogger { Ok(Self { writer }) } - fn log(&mut self, input: &Path, line_number: u64, rsid: &str, raw_line: &str) -> Result<()> { + fn log_issue( + &mut self, + input: &Path, + line_number: u64, + code: &str, + label: &str, + raw_line: &str, + ) -> Result<()> { + let trimmed = raw_line.trim_end(); if let Some(writer) = self.writer.as_mut() { - let trimmed = raw_line.trim_end(); writeln!( writer, - "{}\t{}\t{}\tmissing_ref\t{}", + "{}\t{}\t{}\t{}\t{}", input.display(), line_number, - rsid, + label, + code, trimmed )?; + } else if !matches!( + code, + "parse_error" | "rsid_resolve_error" | "position_resolve_error" + ) { + eprintln!( + "WARNING: {}:{}: {} {}", + input.display(), + line_number, + code, + label + ); } Ok(()) } @@ -418,7 +589,8 @@ fn is_gz(path: &Path) -> bool { struct ReferenceResolver { conn: rusqlite::Connection, - cache: HashMap>, + rsid_cache: HashMap>, + position_cache: HashMap<(String, i64), Option>, } impl ReferenceResolver { @@ -426,17 +598,18 @@ impl ReferenceResolver { let conn = store.open_connection()?; Ok(Self { conn, - cache: HashMap::new(), + rsid_cache: HashMap::new(), + position_cache: HashMap::new(), }) } - fn resolve(&mut self, rsid: &str) -> Result> { + fn resolve_rsid(&mut self, rsid: &str) -> Result> { let rsid_norm = normalize_rsid(rsid); let rsid_int = match rsid_norm.trim_start_matches("rs").parse::() { Ok(value) => value, Err(_) => return Ok(None), }; - if let Some(cached) = self.cache.get(&rsid_int) { + if let Some(cached) = self.rsid_cache.get(&rsid_int) { return Ok(cached.clone()); } @@ -476,7 +649,283 @@ impl ReferenceResolver { .optional()?; } - self.cache.insert(rsid_int, row.clone()); + self.rsid_cache.insert(rsid_int, row.clone()); Ok(row) } + + fn resolve_position(&mut self, chrom: &str, pos: i64) -> Result> { + if pos <= 0 { + return Ok(None); + } + let chrom = normalize_chrom(chrom); + let key = (chrom, pos); + if let Some(cached) = self.position_cache.get(&key) { + return Ok(cached.clone()); + } + + let mut stmt = self.conn.prepare_cached( + "SELECT rsid, chromosome, position, reference, alternates + FROM grch38_non_rsids + WHERE chromosome = ?1 AND position = ?2 + ORDER BY rsid", + )?; + let mut rows = stmt.query((&key.0, key.1))?; + let mut resolved: Option = None; + let mut ambiguous = false; + while let Some(row) = rows.next()? { + let candidate = ReferenceVariant { + rsid: row.get(0)?, + chromosome: row.get(1)?, + position: row.get(2)?, + reference: row.get(3)?, + alternates: row.get(4)?, + }; + if let Some(existing) = resolved.as_ref() { + if existing.rsid != candidate.rsid + || existing.reference != candidate.reference + || existing.alternates != candidate.alternates + { + ambiguous = true; + break; + } + } else { + resolved = Some(candidate); + } + } + let row = if ambiguous { None } else { resolved }; + + self.position_cache.insert(key, row.clone()); + Ok(row) + } +} + +fn normalize_chrom(value: &str) -> String { + let chrom = value.trim(); + chrom + .strip_prefix("chr") + .or_else(|| chrom.strip_prefix("CHR")) + .unwrap_or(chrom) + .to_ascii_uppercase() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::commands::long_aggregate::run_long_aggregate; + use crate::AggregateLongArgs; + use std::fs; + use std::io::Read; + use std::time::{SystemTime, UNIX_EPOCH}; + + fn row(dosage: i8) -> LongRow { + LongRow { + locus_key: "1-100-A-G".to_string(), + rsid: "rs1".to_string(), + participant_id: "P1".to_string(), + dosage, + } + } + + fn test_resolver() -> ReferenceResolver { + let conn = rusqlite::Connection::open_in_memory().unwrap(); + conn.execute_batch( + "CREATE TABLE rsid_reference_user ( + rsid INTEGER PRIMARY KEY, + chromosome TEXT NOT NULL, + position INTEGER NOT NULL, + reference TEXT NOT NULL, + alternates TEXT NOT NULL + ); + CREATE TABLE rsid_reference ( + rsid INTEGER PRIMARY KEY, + chromosome TEXT NOT NULL, + position INTEGER NOT NULL, + reference TEXT NOT NULL, + alternates TEXT NOT NULL + ); + CREATE TABLE grch38_non_rsids ( + snp_name TEXT PRIMARY KEY, + rsid INTEGER NOT NULL, + chromosome TEXT NOT NULL, + position INTEGER NOT NULL, + reference TEXT NOT NULL, + alternates TEXT NOT NULL, + source TEXT NOT NULL, + note TEXT + ); + INSERT INTO grch38_non_rsids + (snp_name, rsid, chromosome, position, reference, alternates, source) + VALUES + ('CNV_1', 123, '2', 200, 'A', 'G', 'test'), + ('AMBIG_1', 124, '3', 300, 'A', 'G', 'test'), + ('AMBIG_2', 125, '3', 300, 'A', 'T', 'test');", + ) + .unwrap(); + ReferenceResolver { + conn, + rsid_cache: HashMap::new(), + position_cache: HashMap::new(), + } + } + + fn full_test_resolver() -> ReferenceResolver { + let resolver = test_resolver(); + resolver + .conn + .execute_batch( + "INSERT INTO rsid_reference + (rsid, chromosome, position, reference, alternates) + VALUES + (1, '1', 100, 'A', 'G'), + (2, '1', 200, 'C', 'T'), + (3, '2', 300, 'A', 'C'); + INSERT INTO grch38_non_rsids + (snp_name, rsid, chromosome, position, reference, alternates, source) + VALUES + ('NONRS_CNV_A', 3, '2', 300, 'A', 'C', 'test');", + ) + .unwrap(); + resolver + } + + fn unique_test_dir(label: &str) -> PathBuf { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(); + let dir = std::env::temp_dir().join(format!("bvs-{label}-{nanos}")); + fs::create_dir_all(&dir).unwrap(); + dir + } + + fn write_text(path: &Path, contents: &str) { + fs::write(path, contents).unwrap(); + } + + fn emit_test_file( + resolver: &mut ReferenceResolver, + input: &Path, + output: &Path, + participant: &str, + warning_log: &Path, + ) { + let mut logger = MissingRefLogger::new(Some(warning_log)).unwrap(); + let stats = emit_from_genotype(input, output, participant, resolver, &mut logger).unwrap(); + assert!( + stats.rows_emitted > 0, + "expected emitted rows for {}", + participant + ); + } + + #[test] + fn duplicate_probe_merge_prefers_concordant_call_over_no_call() { + let selected = select_duplicate_probe_row(&[row(-1), row(1), row(1)]); + assert_eq!(selected.dosage, 1); + } + + #[test] + fn duplicate_probe_merge_conflict_becomes_no_call() { + let selected = select_duplicate_probe_row(&[row(0), row(2)]); + assert_eq!(selected.dosage, -1); + } + + #[test] + fn resolver_uses_position_fallback_for_non_rsid_illumina_probes() { + let mut resolver = test_resolver(); + let reference = resolver.resolve_position("chr2", 200).unwrap().unwrap(); + assert_eq!(reference.rsid, 123); + assert_eq!(reference.reference, "A"); + assert_eq!(reference.alternates, "G"); + } + + #[test] + fn resolver_does_not_choose_ambiguous_position_fallback() { + let mut resolver = test_resolver(); + assert!(resolver.resolve_position("3", 300).unwrap().is_none()); + } + + #[test] + fn emit_long_then_aggregate_counts_ddna_and_noisy_illumina_rsids() { + let tmp = unique_test_dir("emit-aggregate"); + let warn = tmp.join("warnings.tsv"); + let ddna1 = tmp.join("ddna1.txt"); + let ddna2 = tmp.join("ddna2.txt"); + let illumina_sectionless = tmp.join("illumina_sectionless.txt"); + let illumina_sections = tmp.join("illumina_sections.txt"); + + write_text( + &ddna1, + "rs1\t1\t100\tAG\t.\t.\t.\nrs2\t1\t200\tTT\t.\t.\t.\nrs404\t1\t404\tAA\t.\t.\t.\n", + ); + write_text( + &ddna2, + "rs1\tchr1\t100\tAA\t.\t.\t.\nrs2\t1\t200\tCT\t.\t.\t.\n", + ); + write_text( + &illumina_sectionless, + "SNP Name\tSNP\tChr\tPosition\tAllele1 - Plus\tAllele2 - Plus\n\ + BOT-rs1\t[A/G]\tchr1\t100\tG\tG\n\ + rs1_ilmndup1\t[A/G]\t1\t100\t-\t-\n\ + malformed-short-row\n\ + NONRS_CNV_A\t[A/C]\tchr2\t300\tA\tC\n\ + UNMAPPED_VENDOR_PROBE\t.\t0\t0\tA\tA\n", + ); + write_text( + &illumina_sections, + "[Header]\nGSGT Version\t2.0.5\n[Data]\n\ + SNP Name\tSNP\tChr\tPosition\tAllele1 - Plus\tAllele2 - Plus\n\ + rs1_ilmndup2\t[A/G]\t1\t100\tA\tG\n\ + BOT2-rs2\t[C/T]\t1\t200\tC\tC\n\ + rs2_ilmndup_conflict\t[C/T]\t1\t200\tT\tT\n\ + NONRS_CNV_A\t[A/C]\t2\t300\tC\tC\n", + ); + + let mut resolver = full_test_resolver(); + let inputs = [ + (&ddna1, tmp.join("P_DDNA1.bvlr"), "P_DDNA1"), + (&ddna2, tmp.join("P_DDNA2.bvlr"), "P_DDNA2"), + (&illumina_sectionless, tmp.join("P_ILL1.bvlr"), "P_ILL1"), + (&illumina_sections, tmp.join("P_ILL2.bvlr"), "P_ILL2"), + ]; + for (input, output, participant) in &inputs { + emit_test_file(&mut resolver, input, output, participant, &warn); + } + + let args = AggregateLongArgs { + inputs: inputs.iter().map(|(_, output, _)| output.clone()).collect(), + input_list: None, + input_glob: None, + matrix_tsv: Some(tmp.join("matrix.tsv")), + allele_freq_tsv: tmp.join("allele.tsv"), + tmp_dir: Some(tmp.join("tmp")), + chunk_records: 2, + threads: 1, + max_ram_percent: 80, + }; + run_long_aggregate(args).unwrap(); + + let mut matrix = String::new(); + File::open(tmp.join("matrix.tsv")) + .unwrap() + .read_to_string(&mut matrix) + .unwrap(); + assert!(matrix.contains("locus_key\trsid\tP_DDNA1\tP_DDNA2\tP_ILL1\tP_ILL2")); + assert!(matrix.contains("1-100-A-G\trs1\t1\t0\t2\t1")); + assert!(matrix.contains("1-200-C-T\trs2\t2\t1\t-1\t-1")); + assert!(matrix.contains("2-300-A-C\trs3\t-1\t-1\t1\t2")); + + let mut allele = String::new(); + File::open(tmp.join("allele.tsv")) + .unwrap() + .read_to_string(&mut allele) + .unwrap(); + assert!(allele.contains("1-100-A-G\t4\t8\t1\t2\t0.500000\trs1")); + assert!(allele.contains("1-200-C-T\t3\t4\t1\t1\t0.750000\trs2")); + assert!(allele.contains("2-300-A-C\t3\t4\t1\t1\t0.750000\trs3")); + + let warnings = fs::read_to_string(&warn).unwrap(); + assert!(warnings.contains("missing_ref")); + assert!(warnings.contains("skipped_unusable_row")); + } } diff --git a/cli/src/genotype_reader.rs b/cli/src/genotype_reader.rs index 8767457..483f897 100644 --- a/cli/src/genotype_reader.rs +++ b/cli/src/genotype_reader.rs @@ -4,7 +4,7 @@ use anyhow::Result; const COMMENT_PREFIXES: [&str; 2] = ["#", "//"]; -const RSID_ALIASES: &[&str] = &["rsid", "name", "snp", "marker", "id", "snpid"]; +const RSID_ALIASES: &[&str] = &["rsid", "name", "snp", "marker", "id", "snpid", "snpname"]; const CHROM_ALIASES: &[&str] = &["chromosome", "chr", "chrom"]; const POSITION_ALIASES: &[&str] = &[ "position", @@ -182,40 +182,8 @@ impl RowParser { row_map.insert(normalize_name(&header[idx]), strip_inline_comment(&value)); } - if self.carigenetics { - // SNP Name -> rsid via rs\d+ extraction; non-rs probes keep an - // empty rsid and resolve downstream by (chrom,pos). - let snp_name = row_map - .get("snpname") - .map(|s| s.as_str()) - .unwrap_or_default(); - let rsid = extract_rs(snp_name).unwrap_or_default(); - let chrom = match row_map.get("chr").or_else(|| row_map.get("chromosome")) { - Some(v) if !v.is_empty() && v != "0" => v.clone(), - _ => return Ok(RowOutcome::Skipped), - }; - let pos = match row_map.get("position").and_then(|v| v.parse::().ok()) { - Some(p) if p > 0 => p, - _ => return Ok(RowOutcome::Skipped), - }; - let a1 = row_map.get("allele1plus").map(|s| s.as_str()).unwrap_or(""); - let a2 = row_map.get("allele2plus").map(|s| s.as_str()).unwrap_or(""); - let genotype = if a1.is_empty() && a2.is_empty() { - None - } else if a1 == "-" || a2 == "-" { - Some("--".to_string()) - } else { - Some(format!("{}{}", a1, a2)) - }; - return Ok(RowOutcome::Parsed(GenotypeRow { - rsid, - chrom, - pos, - genotype, - gs: None, - baf: None, - lrr: None, - })); + if self.carigenetics || looks_like_illumina_row(&row_map) { + return Ok(parse_illumina_row(&row_map).unwrap_or(RowOutcome::Skipped)); } let rsid = match self.lookup(&row_map, "rsid") { @@ -223,7 +191,7 @@ impl RowParser { _ => return Ok(RowOutcome::Skipped), }; let chrom = match self.lookup(&row_map, "chromosome") { - Some(value) if !value.is_empty() => value, + Some(value) if !value.is_empty() => clean_chrom(&value), _ => return Ok(RowOutcome::Skipped), }; let pos = match self @@ -369,6 +337,68 @@ fn split_csv_line(line: &str) -> Vec { fields } +fn looks_like_illumina_row(row_map: &HashMap) -> bool { + row_map.contains_key("snpname") + && row_map + .get("chr") + .or_else(|| row_map.get("chromosome")) + .is_some() + && row_map.get("position").is_some() + && (row_map.get("allele1plus").is_some() || row_map.get("allele2plus").is_some()) +} + +fn parse_illumina_row(row_map: &HashMap) -> Option { + // SNP Name -> rsid via rs\d+ extraction; non-rs probes keep an empty rsid + // and resolve downstream by (chrom,pos). + let snp_name = row_map + .get("snpname") + .map(|s| s.as_str()) + .unwrap_or_default(); + let rsid = extract_rs(snp_name).unwrap_or_default(); + let chrom = match row_map.get("chr").or_else(|| row_map.get("chromosome")) { + Some(v) if !v.is_empty() => clean_chrom(v), + _ => return Some(RowOutcome::Skipped), + }; + if chrom == "0" { + return Some(RowOutcome::Skipped); + } + let pos = match row_map.get("position").and_then(|v| v.parse::().ok()) { + Some(p) if p > 0 => p, + _ => return Some(RowOutcome::Skipped), + }; + let a1 = row_map.get("allele1plus").map(|s| s.as_str()).unwrap_or(""); + let a2 = row_map.get("allele2plus").map(|s| s.as_str()).unwrap_or(""); + let genotype = if a1.is_empty() && a2.is_empty() { + None + } else if is_no_call_allele(a1) || is_no_call_allele(a2) { + Some("--".to_string()) + } else { + Some(format!("{}{}", a1.trim(), a2.trim())) + }; + Some(RowOutcome::Parsed(GenotypeRow { + rsid, + chrom, + pos, + genotype, + gs: None, + baf: None, + lrr: None, + })) +} + +fn clean_chrom(value: &str) -> String { + let chrom = value.trim(); + chrom + .strip_prefix("chr") + .or_else(|| chrom.strip_prefix("CHR")) + .unwrap_or(chrom) + .to_ascii_uppercase() +} + +fn is_no_call_allele(value: &str) -> bool { + matches!(value.trim(), "" | "-" | "." | "N" | "n" | "0") +} + fn normalize_name(name: &str) -> String { name.chars() .filter(|c| !matches!(c, ' ' | '\t' | '-' | '_')) @@ -390,3 +420,46 @@ fn strip_inline_comment(value: &str) -> String { fn strip_bom(value: &str) -> &str { value.strip_prefix('\u{feff}').unwrap_or(value) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_sectionless_illumina_rows() { + let mut parser = RowParser::new(Delimiter::Tab); + assert!(matches!( + parser + .consume_line("SNP Name\tSNP\tChr\tPosition\tAllele1 - Plus\tAllele2 - Plus") + .unwrap(), + RowOutcome::Ignored + )); + let row = match parser + .consume_line("BOT-rs1135675\t[A/G]\tchr1\t100\tA\tG") + .unwrap() + { + RowOutcome::Parsed(row) => row, + _ => panic!("expected parsed row"), + }; + assert_eq!(row.rsid, "rs1135675"); + assert_eq!(row.chrom, "1"); + assert_eq!(row.pos, 100); + assert_eq!(row.genotype.as_deref(), Some("AG")); + } + + #[test] + fn keeps_illumina_non_rsid_probe_resolvable_by_position() { + let mut parser = RowParser::new(Delimiter::Tab); + parser + .consume_line("SNP Name\tSNP\tChr\tPosition\tAllele1 - Plus\tAllele2 - Plus") + .unwrap(); + let row = match parser.consume_line("CNV_123\t.\t2\t200\t-\tA").unwrap() { + RowOutcome::Parsed(row) => row, + _ => panic!("expected parsed row"), + }; + assert_eq!(row.rsid, ""); + assert_eq!(row.chrom, "2"); + assert_eq!(row.pos, 200); + assert_eq!(row.genotype.as_deref(), Some("--")); + } +}