Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@ concurrency:
cancel-in-progress: true

jobs:
test:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]

lint:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down Expand Up @@ -55,13 +51,51 @@ jobs:
key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('cli/Cargo.lock') }}-v2

- name: Check formatting
if: matrix.os != 'windows-latest'
run: cd cli && cargo fmt -- --check

- name: Run clippy (all targets)
if: matrix.os != 'windows-latest'
run: cd cli && cargo clippy --all-targets --all-features --no-deps -- -D warnings

test:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Setup Rust
uses: dtolnay/rust-toolchain@stable
with:
toolchain: 1.91.0
components: rustfmt, clippy

- name: Show Rust versions
run: |
rustc --version
cargo --version
rustfmt --version

- name: Cache cargo registry
uses: actions/cache@v4
with:
path: ~/.cargo/registry
key: ${{ runner.os }}-cargo-registry-${{ hashFiles('cli/Cargo.lock') }}

- name: Cache cargo index
uses: actions/cache@v4
with:
path: ~/.cargo/git
key: ${{ runner.os }}-cargo-index-${{ hashFiles('cli/Cargo.lock') }}

- name: Cache cargo build
uses: actions/cache@v4
with:
path: cli/target
key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('cli/Cargo.lock') }}-v2

- name: Run tests
run: cd cli && cargo test --verbose

Expand Down
26 changes: 25 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,29 @@ notebooks/data/*
data/*.sqlite
data/*.db
!data/.gitkeep
!data/genostats.sqlitescripts/__pycache__/
!data/genostats.sqlite
data/genostats.sqlite.bak
scripts/__pycache__/

# Local scratch / test artifacts (regenerable, not for the repo)
out/
test_bench/
test_data/
test-data/
test_data.zip
*.bvlr
*.vcf
*.vcf.gz
*.vcf.log
*.miss.log
allele-report.html
rsid_cache.json
genostats-summary.json
missing.json
missing.tsv
*.tbi
carika*
pc0001*
genotype_grch38.txt
*_run.log
notebooks_dbsnp_download.log
56 changes: 54 additions & 2 deletions cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ rand = { version = "0.8", features = ["std"] }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] }
noodles = { version = "0.104.0", features = ["vcf", "tabix", "bgzf", "core"] }
glob = "0.3"
dashmap = "6"

[dev-dependencies]
74 changes: 57 additions & 17 deletions cli/src/commands/cohort_bed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use std::collections::HashMap;
use std::fs::{self, File};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Instant;

use anyhow::{bail, Context, Result};
use dashmap::DashMap;
use rayon::prelude::*;

use crate::genotype_reader::{detect_delimiter, RowOutcome, RowParser};
Expand Down Expand Up @@ -38,9 +39,15 @@ pub fn run_cohort_bed(args: CohortBedArgs) -> Result<()> {
// of holding every rsid string per sample.
let interner = Interner::new();
let parse_start = Instant::now();
// map_init gives each rayon worker a reusable per-thread cache (rsid -> global
// id). After a worker sees the panel once, subsequent samples resolve rsids
// from this local map with zero shared-map access — so the ~1e9 lookups stop
// contending on the DashMap shards (which all samples hit identically).
let per_sample: Vec<Vec<(u32, u8, u8)>> = samples
.par_iter()
.map(|(_sid, path)| parse_sample(path, &interner))
.map_init(HashMap::<String, u32>::new, |local, (_sid, path)| {
parse_sample(path, &interner, local)
})
.collect::<Result<Vec<_>>>()?;
eprintln!(
"🧬 cohort-bed: parsed {} samples in {:.2}s",
Expand Down Expand Up @@ -225,8 +232,8 @@ fn write_outputs(
};
codes[s] = two_bit;
}
for s in n_samples..codes.len() {
codes[s] = 0; // pad slots
for code in codes.iter_mut().skip(n_samples) {
*code = 0; // pad slots
}
for chunk in codes.chunks(4) {
let byte = chunk[0] | (chunk[1] << 2) | (chunk[2] << 4) | (chunk[3] << 6);
Expand Down Expand Up @@ -261,36 +268,61 @@ fn base_label(code: u8) -> char {

/// Concurrent rsid interner: rsid string -> stable id, plus id -> (rsid, chrom, pos)
/// captured at first insertion (so .bim recovers the original strings).
type SnpMeta = (String, String, i64);

/// Sharded `DashMap` so the parallel parse doesn't serialize on one lock. The
/// previous global `Mutex` was taken once per row (~1e9 times at 1400x692k),
/// which collapsed parallelism (~23 min at 1400 files). Sharded locks spread the
/// ~1e9 lookups (almost all hits — every sample shares the same rsids); only the
/// ~1e6 distinct rsids ever insert.
struct Interner {
inner: Mutex<(HashMap<String, u32>, Vec<(String, String, i64)>)>,
map: DashMap<String, u32>,
meta: DashMap<u32, SnpMeta>,
next: AtomicU32,
}

impl Interner {
fn new() -> Self {
Self {
inner: Mutex::new((HashMap::new(), Vec::new())),
map: DashMap::new(),
meta: DashMap::new(),
next: AtomicU32::new(0),
}
}

fn intern(&self, rsid: &str, chrom: &str, pos: i64) -> u32 {
let mut guard = self.inner.lock().expect("interner poisoned");
if let Some(&id) = guard.0.get(rsid) {
return id;
// Fast path: already-seen rsid -> sharded read, no exclusive lock.
if let Some(id) = self.map.get(rsid) {
return *id;
}
let id = guard.1.len() as u32;
guard.1.push((rsid.to_string(), chrom.to_string(), pos));
guard.0.insert(rsid.to_string(), id);
id
// First sighting: insert under the shard's entry lock. or_insert_with only
// runs (and bumps the id counter) if still vacant, so ids stay unique even
// if two threads race on the same new rsid.
*self.map.entry(rsid.to_string()).or_insert_with(|| {
let id = self.next.fetch_add(1, Ordering::Relaxed);
self.meta
.insert(id, (rsid.to_string(), chrom.to_string(), pos));
id
})
}

fn into_meta(self) -> Vec<(String, String, i64)> {
self.inner.into_inner().expect("interner poisoned").1
fn into_meta(self) -> Vec<SnpMeta> {
let n = self.next.load(Ordering::Relaxed) as usize;
let mut out: Vec<SnpMeta> = vec![(String::new(), String::new(), 0); n];
for (id, m) in self.meta.into_iter() {
out[id as usize] = m;
}
out
}
}

/// Parse one sample file: dedup rsid keep-first, return (interned_id, a1, a2).
/// Matches `read_sample`: keep no-call rows (they define the SNP), code A/C/G/T=1..4.
fn parse_sample(path: &Path, interner: &Interner) -> Result<Vec<(u32, u8, u8)>> {
fn parse_sample(
path: &Path,
interner: &Interner,
local: &mut HashMap<String, u32>,
) -> Result<Vec<(u32, u8, u8)>> {
use std::io::{BufRead, BufReader};
let file = File::open(path).with_context(|| format!("Open {:?}", path))?;
let mut reader = BufReader::new(file);
Expand Down Expand Up @@ -327,7 +359,15 @@ fn parse_sample(path: &Path, interner: &Interner) -> Result<Vec<(u32, u8, u8)>>
return Ok(()); // drop_duplicates keep first
}
let (c1, c2) = allele_codes(&gt);
let id = interner.intern(&rsid, &row.chrom, row.pos);
// Per-thread cache first; only fall to the shared interner on a miss.
let id = match local.get(&rsid) {
Some(&id) => id,
None => {
let id = interner.intern(&rsid, &row.chrom, row.pos);
local.insert(rsid.clone(), id);
id
}
};
out.push((id, c1, c2));
}
Ok(())
Expand Down
Loading
Loading