Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ jobs:
name: Publish to crates.io
runs-on: ubuntu-latest
needs: [build]
if: needs.build.result == 'success'
if: always() && needs.build.result == 'success'
steps:
- uses: actions/checkout@v4
with:
Expand All @@ -239,7 +239,7 @@ jobs:
name: Create GitHub Release
runs-on: ubuntu-latest
needs: [build, publish-crate, publish-docker-manifest]
if: needs.build.result == 'success'
if: always() && needs.build.result == 'success' && needs.publish-docker-manifest.result == 'success'
steps:
- uses: actions/checkout@v4
with:
Expand Down Expand Up @@ -294,8 +294,8 @@ jobs:
publish-docker:
name: Publish Docker image (${{ matrix.platform }})
runs-on: ${{ matrix.runner }}
needs: [build, publish-crate]
if: needs.build.result == 'success'
needs: [build]
if: always() && needs.build.result == 'success'
strategy:
matrix:
include:
Expand Down Expand Up @@ -351,7 +351,7 @@ jobs:
name: Publish Docker manifest
runs-on: ubuntu-latest
needs: [publish-docker]
if: needs.publish-docker.result == 'success'
if: always() && needs.publish-docker.result == 'success'
steps:
- uses: actions/checkout@v4
with:
Expand Down
65 changes: 51 additions & 14 deletions cli/src/commands/fast_allele_freq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,20 @@ pub fn run_fast_allele_freq(args: FastAlleleFreqArgs) -> Result<()> {

// Per-row TSV log can't be written safely from many threads -> single thread.
let force_single = args.missing_ref_log.is_some();
let threads = if force_single {
let requested = if force_single {
1
} else {
resolve_threads(args.threads)
};
// Peak RAM ~= threads x one full-panel accumulator. Cap threads so it fits the
// budget; each worker's map fills toward the whole panel regardless of file count.
let threads = cap_threads_for_ram(requested, args.max_ram_gb, shared.reference_count());
if threads < requested {
eprintln!(
"🧠 fast-allele-freq: capping threads {requested} -> {threads} to stay under {:.0} GB",
args.max_ram_gb
);
}
eprintln!(
"▶️ fast-allele-freq: {} input file(s), threads={}",
tasks.len(),
Expand All @@ -90,7 +99,7 @@ pub fn run_fast_allele_freq(args: FastAlleleFreqArgs) -> Result<()> {
.num_threads(threads)
.build()
.context("build fast-allele-freq thread pool")?;
pool.install(|| run_parallel(&tasks, &shared))?
pool.install(|| run_parallel(&tasks, &shared, threads))?
};

write_allele_freq(&args.allele_freq_tsv, &loci)?;
Expand Down Expand Up @@ -145,15 +154,22 @@ fn run_sequential(
fn run_parallel(
tasks: &[(u32, String, PathBuf)],
shared: &Arc<SharedReference>,
threads: usize,
) -> Result<(LociMap, Counts)> {
// Each thread folds its share of files into a local map + counts, then maps
// are merged. Counts are order-independent; rsid uses the rank-min rule so the
// merge is deterministic and matches the single-threaded result.
let result = tasks
.par_iter()
.fold(
|| ThreadState::new(shared.clone()),
|mut st, (rank, participant, path)| {
// Split into exactly `threads` contiguous chunks so there are exactly `threads`
// accumulators (one full-panel map each). Using par_iter().fold() instead lets
// rayon create many more accumulators than threads, so peak RAM grew with file
// count (e.g. 32 GB at 1400 files). Chunking pins peak RAM to threads x panel.
// Each chunk folds its files into a local map + counts; maps are merged. Counts
// are order-independent; rsid uses the rank-min rule so the merge is
// deterministic and matches the single-threaded result.
let chunk_size = tasks.len().div_ceil(threads.max(1)).max(1);
let chunks: Vec<&[(u32, String, PathBuf)]> = tasks.chunks(chunk_size).collect();
let result = chunks
.into_par_iter()
.map(|chunk| {
let mut st = ThreadState::new(shared.clone());
for (rank, participant, path) in chunk {
match collect_merged_long_rows(path, participant, &mut st.resolver, &mut st.logger)
{
Ok((rows, _stats)) => {
Expand All @@ -163,10 +179,9 @@ fn run_parallel(
}
Err(err) => eprintln!("⚠️ skipping {}: {err:#}", path.display()),
}
st
},
)
.map(|st| (st.loci, st.logger.counts().clone()))
}
(st.loci, st.logger.counts().clone())
})
.reduce(
|| (BTreeMap::new(), BTreeMap::new()),
|(mut m1, mut c1), (m2, c2)| {
Expand All @@ -178,6 +193,28 @@ fn run_parallel(
Ok(result)
}

/// Limit the worker-thread count so the estimated peak RAM stays inside a budget.
///
/// Peak usage is modelled as `threads x one full-panel accumulator`. One accumulator is
/// estimated at ~1600 bytes per reference rsid (output loci are ~1.5x the reference
/// count; the figure covers the map entry plus transient row buffers).
///
/// * `requested` - thread count the caller would like to use.
/// * `max_ram_gb` - budget in GB (1 GB = 1e9 bytes). Any value that is not `> 0`
///   selects auto mode: 80% of what `crate::util::available_memory_bytes()` reports.
/// * `ref_count` - number of reference rsids, used to size one accumulator.
///
/// Returns at least 1 and never more than `max(requested, 1)`. If auto mode cannot
/// detect memory, the requested count is returned as-is (raised to 1 if it was 0).
fn cap_threads_for_ram(requested: usize, max_ram_gb: f64, ref_count: usize) -> usize {
    const BYTES_PER_GB: f64 = 1e9;
    const AUTO_FRACTION: f64 = 0.8;
    const BYTES_PER_RSID: f64 = 1600.0;
    // Reserved for the final merged map + I/O buffers.
    const HEADROOM_BYTES: f64 = 0.5e9;

    let budget_bytes = if max_ram_gb > 0.0 {
        max_ram_gb * BYTES_PER_GB
    } else if let Some(detected) = crate::util::available_memory_bytes() {
        detected as f64 * AUTO_FRACTION
    } else {
        // Nothing to budget against: leave the caller's choice alone.
        return requested.max(1);
    };

    let accumulator_bytes = (ref_count as f64 * BYTES_PER_RSID).max(1.0);
    // Never let the usable budget drop below one accumulator, so the quotient is >= 1.
    let usable_bytes = (budget_bytes - HEADROOM_BYTES).max(accumulator_bytes);
    // Float-to-int `as` saturates, so an infinite budget simply yields `usize::MAX`.
    let affordable = (usable_bytes / accumulator_bytes).floor() as usize;

    requested.clamp(1, affordable.max(1))
}

struct ThreadState {
resolver: ReferenceResolver,
logger: MissingRefLogger,
Expand Down
5 changes: 5 additions & 0 deletions cli/src/commands/long_emit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,11 @@ impl SharedReference {
Ok(Self { by_rsid, by_pos })
}

/// Number of reference rsids (proxy for output-map size when budgeting RAM).
pub(crate) fn reference_count(&self) -> usize {
self.by_rsid.len()
}

fn resolve_rsid(&self, rsid: &str) -> Option<ReferenceVariant> {
let rsid_norm = normalize_rsid(rsid);
let rsid_int = rsid_norm.trim_start_matches("rs").parse::<i64>().ok()?;
Expand Down
5 changes: 5 additions & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ pub struct FastAlleleFreqArgs {
/// Worker threads for parsing (0 = auto/all cores).
#[arg(long, default_value = "0")]
pub threads: usize,
/// Soft cap on memory (GB): reduces threads to fit. 0 = auto (80% of detected
/// available RAM, honoring container cgroup limits). Peak RAM is roughly
/// threads x panel size; this bounds it. Set a number to hard-cap (e.g. 18).
#[arg(long, default_value = "0")]
pub max_ram_gb: f64,
}

#[derive(Args, Clone)]
Expand Down
54 changes: 54 additions & 0 deletions cli/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,57 @@ fn is_candidate_file(path: &Path) -> bool {
}
true
}

/// Best-effort memory figure in bytes, used to auto-size RAM budgets.
///
/// Sources are probed in order and the first usable value wins:
/// 1. cgroups v2 container limit (`/sys/fs/cgroup/memory.max`; the literal `max`
///    means "unlimited" and is skipped),
/// 2. cgroups v1 container limit (`/sys/fs/cgroup/memory/memory.limit_in_bytes`),
/// 3. `MemAvailable` from `/proc/meminfo` (Linux host),
/// 4. `sysctl -n hw.memsize` (macOS total physical memory).
///
/// Returns `None` when no source yields a usable number.
pub fn available_memory_bytes() -> Option<u64> {
    /// Parse a decimal byte count; non-numeric text (e.g. `max`) and zero give `None`.
    fn positive_bytes(text: &str) -> Option<u64> {
        text.trim().parse::<u64>().ok().filter(|&bytes| bytes > 0)
    }

    // cgroups v2 (container memory limit)
    if let Some(limit) = fs::read_to_string("/sys/fs/cgroup/memory.max")
        .ok()
        .as_deref()
        .and_then(positive_bytes)
    {
        return Some(limit);
    }

    // cgroups v1
    if let Some(limit) = fs::read_to_string("/sys/fs/cgroup/memory/memory.limit_in_bytes")
        .ok()
        .as_deref()
        .and_then(positive_bytes)
        // v1 uses a huge sentinel when unlimited; ignore implausibly large values.
        .filter(|&bytes| bytes < (1u64 << 62))
    {
        return Some(limit);
    }

    // Linux host: MemAvailable (reported in kB)
    if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") {
        let available_kb = meminfo.lines().find_map(|line| {
            line.strip_prefix("MemAvailable:")?
                .split_whitespace()
                .next()?
                .parse::<u64>()
                .ok()
        });
        if let Some(kb) = available_kb {
            // Saturate rather than overflow on an absurdly large kB value.
            return Some(kb.saturating_mul(1024));
        }
    }

    // macOS: total physical memory via sysctl
    let output = std::process::Command::new("sysctl")
        .args(["-n", "hw.memsize"])
        .output()
        .ok()?;
    String::from_utf8(output.stdout)
        .ok()
        .as_deref()
        .and_then(positive_bytes)
}
Loading