From 1d7b1a6cc775fe042bf62cba51a3f95b40ad2f98 Mon Sep 17 00:00:00 2001 From: Madhava Jay Date: Fri, 29 May 2026 21:56:59 +1000 Subject: [PATCH 1/2] more fixes --- .github/workflows/release.yml | 10 ++--- cli/src/commands/fast_allele_freq.rs | 65 ++++++++++++++++++++++------ cli/src/commands/long_emit.rs | 5 +++ cli/src/main.rs | 5 +++ cli/src/util.rs | 51 ++++++++++++++++++++++ 5 files changed, 117 insertions(+), 19 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 9312785..616feca 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -220,7 +220,7 @@ jobs: name: Publish to crates.io runs-on: ubuntu-latest needs: [build] - if: needs.build.result == 'success' + if: always() && needs.build.result == 'success' steps: - uses: actions/checkout@v4 with: @@ -239,7 +239,7 @@ jobs: name: Create GitHub Release runs-on: ubuntu-latest needs: [build, publish-crate, publish-docker-manifest] - if: needs.build.result == 'success' + if: always() && needs.build.result == 'success' && needs.publish-docker-manifest.result == 'success' steps: - uses: actions/checkout@v4 with: @@ -294,8 +294,8 @@ jobs: publish-docker: name: Publish Docker image (${{ matrix.platform }}) runs-on: ${{ matrix.runner }} - needs: [build, publish-crate] - if: needs.build.result == 'success' + needs: [build] + if: always() && needs.build.result == 'success' strategy: matrix: include: @@ -351,7 +351,7 @@ jobs: name: Publish Docker manifest runs-on: ubuntu-latest needs: [publish-docker] - if: needs.publish-docker.result == 'success' + if: always() && needs.publish-docker.result == 'success' steps: - uses: actions/checkout@v4 with: diff --git a/cli/src/commands/fast_allele_freq.rs b/cli/src/commands/fast_allele_freq.rs index 080d24d..9420101 100644 --- a/cli/src/commands/fast_allele_freq.rs +++ b/cli/src/commands/fast_allele_freq.rs @@ -72,11 +72,20 @@ pub fn run_fast_allele_freq(args: FastAlleleFreqArgs) -> Result<()> { // Per-row TSV log can't be written safely from many threads -> single thread. let force_single = args.missing_ref_log.is_some(); - let threads = if force_single { + let requested = if force_single { 1 } else { resolve_threads(args.threads) }; + // Peak RAM ~= threads x one full-panel accumulator. Cap threads so it fits the + // budget; each worker's map fills toward the whole panel regardless of file count. + let threads = cap_threads_for_ram(requested, args.max_ram_gb, shared.reference_count()); + if threads < requested { + eprintln!( + "🧠 fast-allele-freq: capping threads {requested} -> {threads} to stay under {:.0} GB", + args.max_ram_gb + ); + } eprintln!( "▶️ fast-allele-freq: {} input file(s), threads={}", tasks.len(), @@ -90,7 +99,7 @@ pub fn run_fast_allele_freq(args: FastAlleleFreqArgs) -> Result<()> { .num_threads(threads) .build() .context("build fast-allele-freq thread pool")?; - pool.install(|| run_parallel(&tasks, &shared))? + pool.install(|| run_parallel(&tasks, &shared, threads))? }; write_allele_freq(&args.allele_freq_tsv, &loci)?; @@ -145,15 +154,22 @@ fn run_sequential( fn run_parallel( tasks: &[(u32, String, PathBuf)], shared: &Arc, + threads: usize, ) -> Result<(LociMap, Counts)> { - // Each thread folds its share of files into a local map + counts, then maps - // are merged. Counts are order-independent; rsid uses the rank-min rule so the - // merge is deterministic and matches the single-threaded result. - let result = tasks - .par_iter() - .fold( - || ThreadState::new(shared.clone()), - |mut st, (rank, participant, path)| { + // Split into exactly `threads` contiguous chunks so there are exactly `threads` + // accumulators (one full-panel map each). Using par_iter().fold() instead lets + // rayon create many more accumulators than threads, so peak RAM grew with file + // count (e.g. 32 GB at 1400 files). Chunking pins peak RAM to threads x panel. + // Each chunk folds its files into a local map + counts; maps are merged. Counts + // are order-independent; rsid uses the rank-min rule so the merge is + // deterministic and matches the single-threaded result. + let chunk_size = tasks.len().div_ceil(threads.max(1)).max(1); + let chunks: Vec<&[(u32, String, PathBuf)]> = tasks.chunks(chunk_size).collect(); + let result = chunks + .into_par_iter() + .map(|chunk| { + let mut st = ThreadState::new(shared.clone()); + for (rank, participant, path) in chunk { match collect_merged_long_rows(path, participant, &mut st.resolver, &mut st.logger) { Ok((rows, _stats)) => { @@ -163,10 +179,9 @@ fn run_parallel( } Err(err) => eprintln!("⚠️ skipping {}: {err:#}", path.display()), } - st - }, - ) - .map(|st| (st.loci, st.logger.counts().clone())) + } + (st.loci, st.logger.counts().clone()) + }) .reduce( || (BTreeMap::new(), BTreeMap::new()), |(mut m1, mut c1), (m2, c2)| { @@ -178,6 +193,28 @@ fn run_parallel( Ok(result) } +/// Cap worker threads so peak RAM (~threads x one full-panel accumulator) fits the +/// budget. Estimate per-thread bytes from the reference rsid count (output loci are +/// ~1.5x that; ~1600 B/rsid covers the map entry + transient row buffers). +/// +/// `max_ram_gb <= 0` => auto: 80% of detected available RAM (honors container cgroup +/// limits). If RAM can't be detected, fall back to the requested thread count. +fn cap_threads_for_ram(requested: usize, max_ram_gb: f64, ref_count: usize) -> usize { + let budget = if max_ram_gb > 0.0 { + max_ram_gb * 1e9 + } else { + match crate::util::available_memory_bytes() { + Some(bytes) => bytes as f64 * 0.8, + None => return requested.max(1), + } + }; + let per_thread = (ref_count as f64 * 1600.0).max(1.0); + // Keep ~0.5 GB headroom for the final merged map + I/O buffers. + let usable = (budget - 0.5e9).max(per_thread); + let cap = (usable / per_thread).floor() as usize; + requested.min(cap.max(1)).max(1) +} + struct ThreadState { resolver: ReferenceResolver, logger: MissingRefLogger, diff --git a/cli/src/commands/long_emit.rs b/cli/src/commands/long_emit.rs index 39eb110..10b80b8 100644 --- a/cli/src/commands/long_emit.rs +++ b/cli/src/commands/long_emit.rs @@ -741,6 +741,11 @@ impl SharedReference { Ok(Self { by_rsid, by_pos }) } + /// Number of reference rsids (proxy for output-map size when budgeting RAM). + pub(crate) fn reference_count(&self) -> usize { + self.by_rsid.len() + } + fn resolve_rsid(&self, rsid: &str) -> Option { let rsid_norm = normalize_rsid(rsid); let rsid_int = rsid_norm.trim_start_matches("rs").parse::().ok()?; diff --git a/cli/src/main.rs b/cli/src/main.rs index e884f25..ca52148 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -268,6 +268,11 @@ pub struct FastAlleleFreqArgs { /// Worker threads for parsing (0 = auto/all cores). #[arg(long, default_value = "0")] pub threads: usize, + /// Soft cap on memory (GB): reduces threads to fit. 0 = auto (80% of detected + /// available RAM, honoring container cgroup limits). Peak RAM is roughly + /// threads x panel size; this bounds it. Set a number to hard-cap (e.g. 18). + #[arg(long, default_value = "0")] + pub max_ram_gb: f64, } #[derive(Args, Clone)] diff --git a/cli/src/util.rs b/cli/src/util.rs index d6213ee..d81dac1 100644 --- a/cli/src/util.rs +++ b/cli/src/util.rs @@ -52,3 +52,54 @@ fn is_candidate_file(path: &Path) -> bool { } true } + +/// Best-effort available memory in bytes, used to auto-size RAM budgets. +/// Honors container limits (cgroups v2/v1) first, then host memory +/// (`/proc/meminfo` on Linux, `sysctl hw.memsize` on macOS). None if unknown. +pub fn available_memory_bytes() -> Option { + // cgroups v2 (container memory limit) + if let Ok(value) = fs::read_to_string("/sys/fs/cgroup/memory.max") { + let trimmed = value.trim(); + if trimmed != "max" { + if let Ok(bytes) = trimmed.parse::() { + if bytes > 0 { + return Some(bytes); + } + } + } + } + // cgroups v1 + if let Ok(value) = fs::read_to_string("/sys/fs/cgroup/memory/memory.limit_in_bytes") { + if let Ok(bytes) = value.trim().parse::() { + // v1 uses a huge sentinel when unlimited; ignore implausibly large values. + if bytes > 0 && bytes < (1u64 << 62) { + return Some(bytes); + } + } + } + // Linux host: MemAvailable + if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") { + for line in meminfo.lines() { + if let Some(rest) = line.strip_prefix("MemAvailable:") { + if let Some(kb) = rest.split_whitespace().next().and_then(|v| v.parse::().ok()) + { + return Some(kb * 1024); + } + } + } + } + // macOS: total physical memory via sysctl + if let Ok(output) = std::process::Command::new("sysctl") + .args(["-n", "hw.memsize"]) + .output() + { + if let Ok(text) = String::from_utf8(output.stdout) { + if let Ok(bytes) = text.trim().parse::() { + if bytes > 0 { + return Some(bytes); + } + } + } + } + None +} From 00203f29853b239fe705a95dd501e380e179e79d Mon Sep 17 00:00:00 2001 From: Madhava Jay Date: Fri, 29 May 2026 21:58:04 +1000 Subject: [PATCH 2/2] fix --- cli/src/util.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cli/src/util.rs b/cli/src/util.rs index d81dac1..b27b1c1 100644 --- a/cli/src/util.rs +++ b/cli/src/util.rs @@ -81,7 +81,10 @@ pub fn available_memory_bytes() -> Option { if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") { for line in meminfo.lines() { if let Some(rest) = line.strip_prefix("MemAvailable:") { - if let Some(kb) = rest.split_whitespace().next().and_then(|v| v.parse::().ok()) + if let Some(kb) = rest + .split_whitespace() + .next() + .and_then(|v| v.parse::().ok()) { return Some(kb * 1024); }