diff --git a/.gitignore b/.gitignore index 79bdd61..53fbaf4 100644 --- a/.gitignore +++ b/.gitignore @@ -34,6 +34,9 @@ fastp *.out *.app +# Python +__pycache__/ + # Editor Config .vscode diff --git a/benchmark/README.md b/benchmark/README.md new file mode 100644 index 0000000..3908dbe --- /dev/null +++ b/benchmark/README.md @@ -0,0 +1,114 @@ +# fastp Benchmark Suite + +End-to-end benchmark for fastp: measures throughput, peak memory, and verifies output correctness across I/O format combinations and feature modes. + +## Quick Start + +```bash +# Run all I/O format modes with generated data (10M pairs) +python3 -m benchmark --opt ./fastp + +# Compare optimized build against baseline +python3 -m benchmark --opt ./fastp_opt --orig ./fastp_baseline + +# Use your own FASTQ files +python3 -m benchmark --r1 /data/sample_R1.fq.gz --r2 /data/sample_R2.fq.gz --opt ./fastp +``` + +## Modes + +### I/O Format Modes (default: `all`) + +Test every input/output compression combination: + +| Mode | Type | Input | Output | +|------------|------|--------|--------| +| fq-fq | PE | .fq | .fq | +| fq-gz | PE | .fq | .fq.gz | +| gz-fq | PE | .fq.gz | .fq | +| gz-gz | PE | .fq.gz | .fq.gz | +| se-fq-fq | SE | .fq | .fq | +| se-fq-gz | SE | .fq | .fq.gz | +| se-gz-fq | SE | .fq.gz | .fq | +| se-gz-gz | SE | .fq.gz | .fq.gz | +| stdin-stdout| SE | stdin | stdout | + +### Feature Modes (`all-feat`) + +Exercise specific fastp processing paths: + +| Mode | fastp Flags | Data | Tests | +|------------|-------------------------------------------|-------------|------------------------------------| +| merge | `--merge --correction` | overlapping | PE merge + base correction (#676) | +| correction | `--correction` | overlapping | overlap analysis + base correction | +| dedup-pe | `--dedup` | random | duplicate detection (PE) | +| dedup-se | `--dedup` | random | duplicate detection (SE) | +| qualcut-pe | `--cut_front --cut_tail -W 4 -M 20` | random | sliding window quality cut (PE) | +| qualcut-se | 
`--cut_front --cut_tail -W 4 -M 20` | random | sliding window quality cut (SE) | +| ht-pe | (default) `-w 32` | random | high-thread concurrency stress | + +### Mode Groups + +| Alias | Expands To | +|------------|------------------------------------------------| +| all | all 9 I/O format modes (default) | +| all-pe | PE format modes only | +| all-se | SE format modes + stdin-stdout | +| all-feat | all 7 feature modes | +| everything | all I/O + all feature modes (16 total) | + +```bash +python3 -m benchmark --mode all-feat --opt ./fastp +python3 -m benchmark --mode merge,correction --opt ./fastp --orig ./fastp_v110 +python3 -m benchmark --mode everything --opt ./fastp +``` + +## Options + +``` +--opt PATH Optimized binary path (default: /tmp/fastp_opt) +--orig PATH Baseline binary for comparison (optional) +--r1 PATH Custom R1 input (.fq or .fq.gz), skips data generation +--r2 PATH Custom R2 input (.fq or .fq.gz), required for PE modes +--mode MODE Comma-separated mode list (default: all) +--pairs N Read pairs to generate (default: 10,000,000) +--threads N Worker threads, 0 = auto (default: 0) +--runs N Repeat count, reports median (default: 3) +--seed N RNG seed for data generation (default: 2026) +--json PATH Load saved JSON results and print summary table +--merge A B Merge two opt-only result JSONs into comparison table +``` + +## Test Data + +When `--r1`/`--r2` are not provided, the benchmark generates synthetic data: + +- **Random PE data** (10M pairs x 150bp): independent R1/R2 with realistic quality profiles. Used for I/O format modes, dedup, qualcut, and high-thread tests. +- **Overlapping PE data** (10M pairs x 150bp): R1 and R2 derived from the same fragment (insert ~220bp, overlap ~80bp). R2 has injected mismatches at low quality (Q10-14) to trigger base correction. Used for merge and correction modes. + +Generated data is cached in `/tmp/fastp_benchmark/data/` and reused across runs. Delete the directory to regenerate. 
+ +When custom files are provided: +- Format conversion is automatic (decompress .gz to .fq or compress .fq to .gz as needed) +- Merge/correction modes use the custom data directly (real sequencing data has natural overlap) + +## Output + +- Per-run timing and peak RSS printed live +- Markdown summary table at the end +- JSON results saved to `/tmp/fastp_benchmark/results.json` +- MD5 verification: when baseline is provided, checks that optimized output matches baseline byte-for-byte + +## Module Structure + +``` +benchmark/ +├── __main__.py # python3 -m benchmark entry point +├── run.py # CLI argument parsing and orchestration +├── modes.py # mode definitions, constants, path config +├── datagen.py # synthetic FASTQ data generation +├── sysinfo.py # hardware/OS info collection +├── runner.py # benchmark execution (build_cmd, run, measure) +├── verify.py # output MD5 verification +└── report.py # summary table formatting +``` diff --git a/benchmark/__init__.py b/benchmark/__init__.py new file mode 100644 index 0000000..067ca38 --- /dev/null +++ b/benchmark/__init__.py @@ -0,0 +1 @@ +"""fastp end-to-end benchmark suite.""" diff --git a/benchmark/__main__.py b/benchmark/__main__.py new file mode 100644 index 0000000..55dfcf5 --- /dev/null +++ b/benchmark/__main__.py @@ -0,0 +1,4 @@ +"""Allow running as: python3 -m benchmark [OPTIONS]""" +from .run import main + +main() diff --git a/benchmark/datagen.py b/benchmark/datagen.py new file mode 100644 index 0000000..1ba4c8b --- /dev/null +++ b/benchmark/datagen.py @@ -0,0 +1,134 @@ +"""FASTQ test data generation for benchmarking.""" +import gzip +import random +import time +from pathlib import Path + + +def generate_data(r1_gz: Path, r2_gz: Path, num_pairs: int, seed: int = 2026): + """Generate random paired-end FASTQ reads (independent R1/R2).""" + READ_LEN = 150 + BATCH = 100_000 + BASES = b"ACGT" + QUAL_POOL = 512 + rng = random.Random(seed) + + pool = [] + for _ in range(QUAL_POOL): + q = bytearray(READ_LEN) + for j in 
range(READ_LEN): + if j < 5 or j > READ_LEN - 10: + q[j] = rng.randint(20, 35) + 33 + else: + q[j] = rng.randint(30, 40) + 33 + pool.append(bytes(q)) + + t0 = time.time() + written = 0 + with gzip.open(r1_gz, "wb", compresslevel=1) as f1, \ + gzip.open(r2_gz, "wb", compresslevel=1) as f2: + while written < num_pairs: + n = min(BATCH, num_pairs - written) + b1 = bytearray() + b2 = bytearray() + for i in range(n): + rid = written + i + 1 + name = f"@SIM:BENCH:1:{1101 + rid // 10000000}:{rid % 50000}:{(rid * 7) % 50000}".encode() + s1 = bytes(rng.choices(BASES, k=READ_LEN)) + s2 = bytes(rng.choices(BASES, k=READ_LEN)) + q1 = pool[rng.randint(0, QUAL_POOL - 1)] + q2 = pool[rng.randint(0, QUAL_POOL - 1)] + b1 += name + b" 1:N:0:ATCG\n" + s1 + b"\n+\n" + q1 + b"\n" + b2 += name + b" 2:N:0:ATCG\n" + s2 + b"\n+\n" + q2 + b"\n" + f1.write(bytes(b1)) + f2.write(bytes(b2)) + written += n + if written % 1_000_000 == 0: + e = time.time() - t0 + eta = (num_pairs - written) / (written / e) + print(f" {100 * written / num_pairs:5.1f}% {written / 1e6:.0f}M pairs" + f" {written / e / 1e6:.2f}M/s ETA {eta:.0f}s", flush=True) + + print(f" Generated in {time.time() - t0:.1f}s") + + +def generate_merge_data(r1_gz: Path, r2_gz: Path, num_pairs: int, seed: int = 2026): + """Generate overlapping PE reads for merge/correction testing. + + Each pair derives from a random fragment with insert size ~220bp (sd=30). + R2 is the reverse complement of the fragment's 3' end. Mismatches are + injected in the overlap region with low quality on R2 to trigger base + correction (R1 Q30+ vs R2 Q10-14 at mismatch positions). 
+ """ + READ_LEN = 150 + BATCH = 100_000 + BASES = b"ACGT" + COMP = bytes.maketrans(b"ACGT", b"TGCA") + QUAL_POOL = 512 + INSERT_MEAN = 220 + INSERT_SD = 30 + MIN_INSERT = READ_LEN + 30 # need >= 30bp overlap for fastp + MAX_INSERT = 2 * READ_LEN - 1 + MISMATCH_RATE = 0.015 + + rng = random.Random(seed) + + pool = [] + for _ in range(QUAL_POOL): + q = bytearray(READ_LEN) + for j in range(READ_LEN): + if j < 5 or j > READ_LEN - 10: + q[j] = rng.randint(25, 35) + 33 + else: + q[j] = rng.randint(30, 40) + 33 + pool.append(bytes(q)) + + def revcomp(seq: bytes) -> bytes: + return seq.translate(COMP)[::-1] + + t0 = time.time() + written = 0 + with gzip.open(r1_gz, "wb", compresslevel=1) as f1, \ + gzip.open(r2_gz, "wb", compresslevel=1) as f2: + while written < num_pairs: + n = min(BATCH, num_pairs - written) + b1 = bytearray() + b2 = bytearray() + for i in range(n): + rid = written + i + 1 + name = f"@SIM:MERGE:1:{1101 + rid // 10000000}:{rid % 50000}:{(rid * 7) % 50000}".encode() + + # Sample insert size from truncated normal distribution + insert = int(rng.gauss(INSERT_MEAN, INSERT_SD)) + insert = max(MIN_INSERT, min(MAX_INSERT, insert)) + + # Generate fragment and derive reads + frag = bytearray(rng.choices(BASES, k=insert)) + s1 = bytes(frag[:READ_LEN]) + s2 = bytearray(revcomp(bytes(frag[insert - READ_LEN:]))) + + q1 = bytearray(pool[rng.randint(0, QUAL_POOL - 1)]) + q2 = bytearray(pool[rng.randint(0, QUAL_POOL - 1)]) + + # Inject mismatches in R2 overlap with low quality to trigger correction + overlap_len = 2 * READ_LEN - insert + n_mm = max(1, int(overlap_len * MISMATCH_RATE)) + n_mm = min(n_mm, min(4, int(overlap_len * 0.19))) + overlap_start = READ_LEN - overlap_len + for p in rng.sample(range(overlap_start, READ_LEN), n_mm): + alts = [b for b in BASES if b != s2[p]] + s2[p] = rng.choice(alts) + q2[p] = rng.randint(10 + 33, 14 + 33) # Q10-14 + + b1 += name + b" 1:N:0:ATCG\n" + s1 + b"\n+\n" + bytes(q1) + b"\n" + b2 += name + b" 2:N:0:ATCG\n" + bytes(s2) + 
b"\n+\n" + bytes(q2) + b"\n" + f1.write(bytes(b1)) + f2.write(bytes(b2)) + written += n + if written % 1_000_000 == 0: + e = time.time() - t0 + eta = (num_pairs - written) / (written / e) + print(f" {100 * written / num_pairs:5.1f}% {written / 1e6:.0f}M pairs" + f" {written / e / 1e6:.2f}M/s ETA {eta:.0f}s", flush=True) + + print(f" Generated in {time.time() - t0:.1f}s") diff --git a/benchmark/modes.py b/benchmark/modes.py new file mode 100644 index 0000000..4dfd60d --- /dev/null +++ b/benchmark/modes.py @@ -0,0 +1,82 @@ +"""Mode definitions, constants, and path configuration.""" +import os +from pathlib import Path + +# ── Paths ──────────────────────────────────────────────────────────────────── + +BENCH_DIR = Path("/tmp/fastp_benchmark") +DATA_DIR = BENCH_DIR / "data" +OUT_DIR = BENCH_DIR / "output" +RESULTS_JSON = BENCH_DIR / "results.json" + +CPU_COUNT = os.cpu_count() or 4 + +# ── Mode definitions ───────────────────────────────────────────────────────── + +PE_MODES = ["fq-fq", "fq-gz", "gz-fq", "gz-gz"] +SE_MODES = ["se-fq-fq", "se-fq-gz", "se-gz-fq", "se-gz-gz"] +ALL_MODES = PE_MODES + SE_MODES + ["stdin-stdout"] + +# Feature test modes: exercise specific fastp features beyond default filtering +FEAT_MODES = [ + "merge", "correction", "dedup-pe", "dedup-se", + "qualcut-pe", "qualcut-se", "ht-pe", +] + +FEAT_DEFS = { + "merge": {"base": "pe", "in_fmt": "gz", "out_fmt": "gz", "data": "merge", + "extra": ["--merge", "--correction"]}, + "correction": {"base": "pe", "in_fmt": "gz", "out_fmt": "gz", "data": "merge", + "extra": ["--correction"]}, + "dedup-pe": {"base": "pe", "in_fmt": "gz", "out_fmt": "gz", + "extra": ["--dedup"]}, + "dedup-se": {"base": "se", "in_fmt": "gz", "out_fmt": "gz", + "extra": ["--dedup"]}, + "qualcut-pe": {"base": "pe", "in_fmt": "gz", "out_fmt": "gz", + "extra": ["--cut_front", "--cut_tail", "-W", "4", "-M", "20"]}, + "qualcut-se": {"base": "se", "in_fmt": "gz", "out_fmt": "gz", + "extra": ["--cut_front", "--cut_tail", "-W", "4", "-M", 
"20"]}, + "ht-pe": {"base": "pe", "in_fmt": "gz", "out_fmt": "gz", + "extra": [], "threads": 32}, +} + +MODE_ALIASES = { + "all-pe": PE_MODES, + "all-se": SE_MODES + ["stdin-stdout"], + "all-feat": FEAT_MODES, + "everything": ALL_MODES + FEAT_MODES, +} + + +def parse_mode(mode: str) -> tuple[str, str, str]: + """Parse mode string -> (type, in_fmt, out_fmt). + + Returns ("stdin", "", "") for stdin-stdout, + ("se", "fq"|"gz", "fq"|"gz") for se-* modes, + ("pe", "fq"|"gz", "fq"|"gz") for bare fq-fq etc. + Feature modes (merge, correction, etc.) resolve via FEAT_DEFS. + """ + if mode in FEAT_DEFS: + d = FEAT_DEFS[mode] + return (d["base"], d["in_fmt"], d["out_fmt"]) + if mode == "stdin-stdout": + return ("stdin", "", "") + if mode.startswith("se-"): + parts = mode[3:].split("-") + return ("se", parts[0], parts[1]) + parts = mode.split("-") + return ("pe", parts[0], parts[1]) + + +def auto_threads(mode: str) -> int: + """Calculate worker threads, reserving reader/writer for each mode. + + PE: 2 readers (L+R) + 2 writers (L+R) = reserve 4 + SE: 1 reader + 1 writer = reserve 2 + Feature modes may override thread count (e.g. ht-pe forces 32). + """ + if mode in FEAT_DEFS and "threads" in FEAT_DEFS[mode]: + return FEAT_DEFS[mode]["threads"] + mtype = parse_mode(mode)[0] + reserved = 4 if mtype == "pe" else 2 + return max(1, CPU_COUNT - reserved) diff --git a/benchmark/report.py b/benchmark/report.py new file mode 100644 index 0000000..ef2d2b6 --- /dev/null +++ b/benchmark/report.py @@ -0,0 +1,167 @@ +"""Summary table generation.""" +import subprocess + +from .modes import parse_mode, auto_threads + + +def get_version(binary: str) -> str: + """Extract version string from fastp binary.""" + try: + r = subprocess.run([binary, "--version"], capture_output=True, text=True, timeout=5) + out = (r.stderr + r.stdout).strip() + for line in out.splitlines(): + if "fastp" in line.lower(): + return line.strip().replace("fastp ", "") + return out.splitlines()[0] if out else "?" 
+ except Exception: + return "?" + + +def print_summary(report: dict): + """Print flat markdown table from full report dict.""" + cfg = report["config"] + results = report["modes"] + num_pairs = cfg["pairs"] + + si = report.get("system", {}) + has_baseline = cfg.get("orig") is not None + orig_ver = get_version(cfg["orig"]) if has_baseline else None + opt_ver = get_version(cfg["opt"]) + + from .sysinfo import format_cores + print() + print("## fastp Benchmark") + print() + cores_str = format_cores(si) + if "cpu" in si: + print(f"- **CPU:** {si['cpu']} **Cores:** {cores_str}") + else: + print(f"- **Cores:** {cores_str}") + mem_parts = [] + if "mem_total_gb" in si: + mem_parts.append(f"{si['mem_total_gb']}GB") + if "mem_type" in si: + mem_parts.append(si["mem_type"]) + if "mem_avail_gb" in si: + mem_parts.append(f"{si['mem_avail_gb']}GB avail") + if mem_parts: + print(f"- **Mem:** {', '.join(mem_parts)}") + if "load_avg" in si: + la = si["load_avg"] + print(f"- **Load:** {la[0]} (1m) {la[1]} (5m) {la[2]} (15m)") + disk = si.get("disk", {}) + if disk: + disk_parts = [disk.get("model", "?"), disk.get("size", "")] + proto = disk.get("protocol", "") + iface = disk.get("interface", "") + link = disk.get("link_speed", "") + if proto: + conn = proto + if iface and iface != proto: + conn += f"/{iface}" + if link: + conn += f" {link}" + disk_parts.append(conn) + print(f"- **Disk:** {', '.join(p for p in disk_parts if p)}") + if "os" in si: + print(f"- **OS:** {si['os']} ({si.get('arch', '?')})") + print(f"- **Pairs:** {num_pairs:,} **Runs:** {cfg['runs']} **Seed:** {cfg['seed']}") + if has_baseline: + print(f"- **Orig:** `{cfg['orig']}` ({orig_ver})") + print(f"- **Opt:** `{cfg['opt']}` ({opt_ver})") + print() + + # build rows + global_w = cfg.get("threads") + rows = [] + for mode, entry in results.items(): + w = entry.get("threads", global_w or auto_threads(mode)) + t_opt = entry["opt_median"] + tp_opt = num_pairs / t_opt / 1e6 + mem_opt = entry.get("opt_peak_mb", 0) + mtype 
= parse_mode(mode)[0] + io_type = "PE" if mtype == "pe" else "SE" + + verify = entry.get("verify", {}) + v_status = verify.get("status", "n/a") + v_tag = {"pass": "PASS", "fail": "FAIL", "ok": "OK"}.get(v_status, "n/a") + + if has_baseline: + t_orig = entry["orig_median"] + speedup = t_orig / t_opt + tp_orig = num_pairs / t_orig / 1e6 + saved = t_orig - t_opt + mem_orig = entry.get("orig_peak_mb", 0) + mem_ratio = f"{mem_opt / mem_orig:.2f}x" if mem_orig else "-" + rows.append((mode, io_type, w, t_orig, t_opt, speedup, tp_orig, tp_opt, saved, + mem_orig, mem_opt, mem_ratio, v_tag)) + else: + rows.append((mode, io_type, w, t_opt, tp_opt, mem_opt, v_tag)) + + if has_baseline: + hdr = ("Mode", "Type", "W", "Orig (s)", "Opt (s)", "Speedup", + "Orig (M/s)", "Opt (M/s)", "Saved (s)", + "Orig MB", "Opt MB", "Mem", "Verify") + fmt = [ + lambda r: r[0], + lambda r: r[1], + lambda r: str(r[2]), + lambda r: f"{r[3]:.2f}", + lambda r: f"{r[4]:.2f}", + lambda r: f"{r[5]:.2f}x", + lambda r: f"{r[6]:.2f}", + lambda r: f"{r[7]:.2f}", + lambda r: f"{r[8]:.2f}", + lambda r: str(r[9]) if r[9] else "-", + lambda r: str(r[10]) if r[10] else "-", + lambda r: r[11], + lambda r: r[12], + ] + else: + hdr = ("Mode", "Type", "W", "Time (s)", "M/s", "MB", "Verify") + fmt = [ + lambda r: r[0], + lambda r: r[1], + lambda r: str(r[2]), + lambda r: f"{r[3]:.2f}", + lambda r: f"{r[4]:.2f}", + lambda r: str(r[5]) if r[5] else "-", + lambda r: r[6], + ] + + cols = [] + for i, h in enumerate(hdr): + w = len(h) + for row in rows: + w = max(w, len(fmt[i](row))) + cols.append(w) + + def fmt_row(cells): + parts = [] + for i, c in enumerate(cells): + if i == 0: + parts.append(f" {c:<{cols[i]}} ") + else: + parts.append(f" {c:>{cols[i]}} ") + return "|" + "|".join(parts) + "|" + + print(fmt_row(hdr)) + seps = ["-" * (cols[i] + 2) for i in range(len(hdr))] + print("|" + "|".join(seps) + "|") + for row in rows: + cells = [fmt[i](row) for i in range(len(hdr))] + print(fmt_row(cells)) + + any_fail = False + 
for mode, entry in results.items(): + verify = entry.get("verify", {}) + if verify.get("status") == "fail": + if not any_fail: + print() + print("**Verification failures:**") + any_fail = True + for fname, fv in verify.get("files", {}).items(): + if not fv["match"]: + print(f"- {mode} {fname}: orig=`{fv['orig_md5'][:16]}..` opt=`{fv['opt_md5'][:16]}..`") + + print() diff --git a/benchmark/run.py b/benchmark/run.py new file mode 100644 index 0000000..a159942 --- /dev/null +++ b/benchmark/run.py @@ -0,0 +1,350 @@ +"""CLI entry point for fastp benchmark suite.""" +import argparse +import gzip +import json +import os +import shutil +import sys +from pathlib import Path + +from .modes import ( + ALL_MODES, CPU_COUNT, FEAT_DEFS, MODE_ALIASES, + DATA_DIR, OUT_DIR, RESULTS_JSON, + parse_mode, auto_threads, +) +from .datagen import generate_data, generate_merge_data +from .sysinfo import system_info, format_cores +from .runner import run_bench, kill_stale_fastp +from .verify import verify_outputs +from .report import print_summary + + +def human_size(path: Path) -> str: + sz = path.stat().st_size + for unit in ("B", "KB", "MB", "GB"): + if sz < 1024: + return f"{sz:.1f} {unit}" + sz /= 1024 + return f"{sz:.1f} TB" + + +def banner(text: str, char: str = "=", width: int = 44): + print(char * width) + print(f" {text}") + print(char * width) + + +def main(): + parser = argparse.ArgumentParser(description="fastp end-to-end benchmark") + parser.add_argument("--json", metavar="PATH", + help="Load results from JSON and print summary table (skip benchmark)") + parser.add_argument("--merge", nargs=2, metavar=("ORIG_JSON", "OPT_JSON"), + help="Merge two opt-only JSONs into a comparison table") + parser.add_argument("--mode", default="all", + help="Comma-separated modes: fq-fq,fq-gz,gz-fq,gz-gz," + "se-fq-fq,se-fq-gz,se-gz-fq,se-gz-gz,stdin-stdout," + "merge,correction,dedup-pe,dedup-se,qualcut-pe,qualcut-se,ht-pe," + "all,all-pe,all-se,all-feat,everything (default: all)") + 
parser.add_argument("--r1", metavar="PATH", + help="Custom R1 input (.fq or .fq.gz). Skips data generation.") + parser.add_argument("--r2", metavar="PATH", + help="Custom R2 input (.fq or .fq.gz). Required for PE modes.") + parser.add_argument("--pairs", type=int, default=10_000_000, + help="Number of read pairs (default: 10000000)") + parser.add_argument("--threads", "-w", type=int, default=0, + help="Worker threads (default: auto, cpu_count minus reader/writer per mode)") + parser.add_argument("--runs", type=int, default=3, + help="Repeat count, median reported (default: 3)") + parser.add_argument("--seed", type=int, default=2026, + help="Random seed for data generation (default: 2026)") + parser.add_argument("--orig", default=None, + help="Baseline binary path (default: none, opt-only mode)") + parser.add_argument("--opt", default="/tmp/fastp_opt", + help="Optimized binary path (default: /tmp/fastp_opt)") + args = parser.parse_args() + + # --- Merge mode: combine two opt-only JSONs into comparison --- + if args.merge: + orig_p, opt_p = Path(args.merge[0]), Path(args.merge[1]) + for p in (orig_p, opt_p): + if not p.exists(): + print(f"File not found: {p}", file=sys.stderr) + sys.exit(1) + orig_report = json.loads(orig_p.read_text()) + opt_report = json.loads(opt_p.read_text()) + merged = { + "system": opt_report.get("system", orig_report.get("system", {})), + "config": { + **opt_report["config"], + "orig": orig_report["config"]["opt"], + }, + "modes": {}, + } + for mode in opt_report["modes"]: + opt_entry = opt_report["modes"][mode] + orig_entry = orig_report["modes"].get(mode, {}) + merged["modes"][mode] = { + "threads": opt_entry.get("threads", orig_entry.get("threads")), + "opt_median": opt_entry["opt_median"], + "opt_peak_mb": opt_entry.get("opt_peak_mb", 0), + "orig_median": orig_entry.get("opt_median", 0), + "orig_peak_mb": orig_entry.get("opt_peak_mb", 0), + "verify": opt_entry.get("verify", {}), + } + print_summary(merged) + return + + # --- JSON-only 
mode: load and display --- + if args.json: + p = Path(args.json) + if not p.exists(): + print(f"File not found: {p}", file=sys.stderr) + sys.exit(1) + report = json.loads(p.read_text()) + print_summary(report) + return + + # Expand mode aliases + raw_modes = args.mode.split(",") + modes = [] + for m in raw_modes: + m = m.strip() + if m == "all": + modes.extend(ALL_MODES) + elif m in MODE_ALIASES: + modes.extend(MODE_ALIASES[m]) + else: + modes.append(m) + seen = set() + modes = [m for m in modes if not (m in seen or seen.add(m))] + + fastp_orig = args.orig + fastp_opt = args.opt + + if not os.access(fastp_opt, os.X_OK): + print(f"Missing binary: {fastp_opt}", file=sys.stderr) + sys.exit(1) + if fastp_orig and not os.access(fastp_orig, os.X_OK): + print(f"Missing binary: {fastp_orig}", file=sys.stderr) + sys.exit(1) + has_baseline = fastp_orig is not None + + DATA_DIR.mkdir(parents=True, exist_ok=True) + OUT_DIR.mkdir(parents=True, exist_ok=True) + + # --- Resolve per-mode thread counts --- + threads_map: dict[str, int] = {} + for mode in modes: + threads_map[mode] = args.threads if args.threads > 0 else auto_threads(mode) + + # --- System info --- + sysinfo = system_info() + + # --- Header --- + banner("fastp End-to-End Benchmark") + if "cpu" in sysinfo: + print(f" CPU: {sysinfo['cpu']}") + print(f" Cores: {format_cores(sysinfo)}") + mem_str = f"{sysinfo.get('mem_total_gb', '?')}GB" + if "mem_type" in sysinfo: + mem_str += f" {sysinfo['mem_type']}" + if "mem_avail_gb" in sysinfo: + mem_str += f", {sysinfo['mem_avail_gb']}GB avail" + print(f" Memory: {mem_str}") + disk = sysinfo.get("disk", {}) + if disk: + d_parts = [disk.get("model", "?"), disk.get("size", "")] + proto = disk.get("protocol", "") + iface = disk.get("interface", "") + link = disk.get("link_speed", "") + if proto: + conn = proto + if iface and iface != proto: + conn += f"/{iface}" + if link: + conn += f" {link}" + d_parts.append(conn) + print(f" Disk: {', '.join(p for p in d_parts if p)}") + if 
"load_avg" in sysinfo: + la = sysinfo["load_avg"] + print(f" Load: {la[0]} (1m) {la[1]} (5m) {la[2]} (15m)") + print(f" OS: {sysinfo['os']} ({sysinfo['arch']})") + if args.r1: + print(f" Data: custom ({args.r1})") + else: + print(f" Pairs: {args.pairs}") + unique_w = sorted(set(threads_map.values())) + if len(unique_w) == 1: + print(f" Threads: {unique_w[0]}") + else: + print(f" Threads: auto (PE={threads_map.get(modes[0], unique_w[0])}" + f", SE={threads_map.get('stdin-stdout', unique_w[-1])})") + print(f" Runs: {args.runs} (median reported)") + print(f" Modes: {' '.join(modes)}") + print(f" Seed: {args.seed}") + print("=" * 44) + print() + + # --- Data files --- + custom_data = args.r1 is not None + need_fq = any(parse_mode(m)[1] == "fq" or m == "stdin-stdout" for m in modes) + need_gz = any(parse_mode(m)[1] == "gz" for m in modes) + needs_merge = any( + m in FEAT_DEFS and FEAT_DEFS[m].get("data") == "merge" for m in modes + ) + + if custom_data: + r1_src = Path(args.r1) + r2_src = Path(args.r2) if args.r2 else None + if not r1_src.exists(): + print(f"File not found: {r1_src}", file=sys.stderr) + sys.exit(1) + if r2_src and not r2_src.exists(): + print(f"File not found: {r2_src}", file=sys.stderr) + sys.exit(1) + has_pe = any(parse_mode(m)[0] == "pe" for m in modes) + if has_pe and not r2_src: + print("ERROR: PE modes require --r2", file=sys.stderr) + sys.exit(1) + + is_gz = r1_src.name.endswith(".gz") + if is_gz: + r1_gz, r2_gz = r1_src, (r2_src or r1_src) + r1_fq = DATA_DIR / "custom_R1.fq" + r2_fq = DATA_DIR / "custom_R2.fq" + else: + r1_fq, r2_fq = r1_src, (r2_src or r1_src) + r1_gz = DATA_DIR / "custom_R1.fq.gz" + r2_gz = DATA_DIR / "custom_R2.fq.gz" + + print(f"[data] Custom R1: {r1_src} ({human_size(r1_src)})") + if r2_src: + print(f" Custom R2: {r2_src} ({human_size(r2_src)})") + + if need_fq and not r1_fq.exists(): + print("[data] Decompressing custom data to plain FASTQ...") + for gz_p, fq_p in ((r1_gz, r1_fq), (r2_gz, r2_fq)): + with gzip.open(gz_p, 
"rb") as fi, open(fq_p, "wb") as fo: + shutil.copyfileobj(fi, fo) + if need_gz and not r1_gz.exists(): + print("[data] Compressing custom data to .gz...") + for fq_p, gz_p in ((r1_fq, r1_gz), (r2_fq, r2_gz)): + with open(fq_p, "rb") as fi, gzip.open(gz_p, "wb", compresslevel=1) as fo: + shutil.copyfileobj(fi, fo) + + merge_r1_gz = r1_gz + merge_r2_gz = r2_gz + if needs_merge: + print("[data] Merge/correction will use custom data (assumes natural overlap)") + else: + r1_gz = DATA_DIR / "bench_R1.fq.gz" + r2_gz = DATA_DIR / "bench_R2.fq.gz" + r1_fq = DATA_DIR / "bench_R1.fq" + r2_fq = DATA_DIR / "bench_R2.fq" + + if r1_gz.exists() and r2_gz.exists(): + print("[data] Reusing existing compressed test data") + print(f" R1.gz: {human_size(r1_gz)}") + print(f" R2.gz: {human_size(r2_gz)}") + else: + print(f"[data] Generating {args.pairs} pairs...") + generate_data(r1_gz, r2_gz, args.pairs, args.seed) + print(f" R1.gz: {human_size(r1_gz)}") + print(f" R2.gz: {human_size(r2_gz)}") + + if need_fq: + if r1_fq.exists() and r2_fq.exists(): + print("[data] Reusing existing uncompressed test data") + else: + print("[data] Decompressing to plain FASTQ...") + for gz_path, fq_path in ((r1_gz, r1_fq), (r2_gz, r2_fq)): + with gzip.open(gz_path, "rb") as fi, open(fq_path, "wb") as fo: + shutil.copyfileobj(fi, fo) + print(f" R1.fq: {human_size(r1_fq)}") + print(f" R2.fq: {human_size(r2_fq)}") + + merge_r1_gz = DATA_DIR / "merge_R1.fq.gz" + merge_r2_gz = DATA_DIR / "merge_R2.fq.gz" + if needs_merge: + if merge_r1_gz.exists() and merge_r2_gz.exists(): + print("[data] Reusing existing merge test data (overlapping PE)") + print(f" merge_R1.gz: {human_size(merge_r1_gz)}") + print(f" merge_R2.gz: {human_size(merge_r2_gz)}") + else: + print(f"[data] Generating {args.pairs} overlapping pairs for merge/correction...") + generate_merge_data(merge_r1_gz, merge_r2_gz, args.pairs, args.seed) + print(f" merge_R1.gz: {human_size(merge_r1_gz)}") + print(f" merge_R2.gz: {human_size(merge_r2_gz)}") + 
print() + + # --- Cache warmup --- + print("[cache] Warming up filesystem cache...") + cache_files = [r1_gz, r2_gz] + if need_fq: + cache_files += [r1_fq, r2_fq] + if needs_merge and merge_r1_gz != r1_gz: + cache_files += [merge_r1_gz, merge_r2_gz] + for p in cache_files: + if p.exists(): + with open(p, "rb") as f: + while f.read(1 << 20): + pass + print() + + # --- Run benchmarks --- + results: dict[str, dict] = {} + + for mode in modes: + w = threads_map[mode] + banner(f"Mode: {mode} (W={w})", char="-") + kill_stale_fastp([fastp_orig or "", fastp_opt]) + print() + + # Select data files: merge/correction modes use overlapping PE data + if mode in FEAT_DEFS and FEAT_DEFS[mode].get("data") == "merge": + d_r1_fq, d_r2_fq = merge_r1_gz, merge_r2_gz # merge modes use gz only + d_r1_gz, d_r2_gz = merge_r1_gz, merge_r2_gz + else: + d_r1_fq, d_r2_fq = r1_fq, r2_fq + d_r1_gz, d_r2_gz = r1_gz, r2_gz + + if has_baseline: + t_orig, mem_orig = run_bench( + f"orig_{mode}", fastp_orig, mode, args.runs, w, + d_r1_fq, d_r2_fq, d_r1_gz, d_r2_gz) + t_opt, mem_opt = run_bench( + f"opt_{mode}", fastp_opt, mode, args.runs, w, + d_r1_fq, d_r2_fq, d_r1_gz, d_r2_gz) + entry = { + "threads": w, + "opt_median": round(t_opt, 4), + "opt_peak_mb": mem_opt // 1024, + } + if has_baseline: + entry["orig_median"] = round(t_orig, 4) + entry["orig_peak_mb"] = mem_orig // 1024 + results[mode] = entry + + # --- Verify --- + verification = verify_outputs(modes, has_baseline) + for mode in modes: + results[mode]["verify"] = verification[mode] + + # --- Save JSON --- + report = { + "system": sysinfo, + "config": { + "pairs": args.pairs, + "cpus": CPU_COUNT, + "runs": args.runs, + "seed": args.seed, + "orig": fastp_orig, + "opt": fastp_opt, + }, + "modes": results, + } + RESULTS_JSON.write_text(json.dumps(report, indent=2) + "\n") + print(f"[json] Results saved to {RESULTS_JSON}") + print() + + print_summary(report) diff --git a/benchmark/runner.py b/benchmark/runner.py new file mode 100644 index 
0000000..9de9dc2 --- /dev/null +++ b/benchmark/runner.py @@ -0,0 +1,141 @@ +"""Benchmark execution engine.""" +import os +import signal +import subprocess +import sys +import time +from pathlib import Path +from statistics import median + +from .modes import parse_mode, FEAT_DEFS, OUT_DIR + + +def output_files(label: str, mode: str) -> list[Path]: + """Return list of output file paths for a given label+mode.""" + mtype, _, out_fmt = parse_mode(mode) + if mtype == "stdin": + return [] + ext = "fq" if out_fmt == "fq" else "fq.gz" + if mode == "merge": + return [OUT_DIR / f"{label}_merged.{ext}", + OUT_DIR / f"{label}_R1.{ext}", + OUT_DIR / f"{label}_R2.{ext}"] + if mtype == "se": + return [OUT_DIR / f"{label}_R1.{ext}"] + return [OUT_DIR / f"{label}_R1.{ext}", OUT_DIR / f"{label}_R2.{ext}"] + + +def build_cmd(binary: str, mode: str, label: str, threads: int, + r1_fq: Path, r2_fq: Path, r1_gz: Path, r2_gz: Path) -> list[str]: + """Return argv list for the given mode.""" + inp = {"fq": r1_fq, "gz": r1_gz} + inp2 = {"fq": r2_fq, "gz": r2_gz} + out_ext = {"fq": "fq", "gz": "fq.gz"} + + mtype, in_fmt, out_fmt = parse_mode(mode) + + if mtype == "stdin": + return [ + binary, "--stdin", "--stdout", + "-j", "/dev/null", "-h", "/dev/null", + "-w", str(threads), + ] + + ext = out_ext[out_fmt] + if mtype == "se": + cmd = [ + binary, + "-i", str(inp[in_fmt]), + "-o", str(OUT_DIR / f"{label}_R1.{ext}"), + "-j", "/dev/null", "-h", "/dev/null", + "-w", str(threads), + ] + if mode in FEAT_DEFS: + cmd += FEAT_DEFS[mode]["extra"] + return cmd + # pe + cmd = [ + binary, + "-i", str(inp[in_fmt]), "-I", str(inp2[in_fmt]), + ] + if mode == "merge": + cmd += ["--merged_out", str(OUT_DIR / f"{label}_merged.{ext}")] + cmd += [ + "-o", str(OUT_DIR / f"{label}_R1.{ext}"), + "-O", str(OUT_DIR / f"{label}_R2.{ext}"), + "-j", "/dev/null", "-h", "/dev/null", + "-w", str(threads), + ] + if mode in FEAT_DEFS: + cmd += FEAT_DEFS[mode]["extra"] + return cmd + + +def kill_stale_fastp(binaries: 
def run_once(cmd: list[str], mode: str, r1_fq: Path) -> tuple[float, int]:
    """Run a single benchmark iteration.

    Launches *cmd* and waits for it with ``os.wait4`` so the child's resource
    usage is captured along with its exit status.

    For the ``stdin-stdout`` mode the R1 FASTQ file is streamed to the child
    on stdin; in every other mode the command opens its own input files.

    Returns:
        (wall-clock seconds, peak RSS in KB).

    Raises:
        subprocess.CalledProcessError: if the child exits non-zero.
    """
    stdin_f = None
    kwargs: dict = dict(stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    try:
        if mode == "stdin-stdout":
            stdin_f = open(r1_fq, "rb")
            kwargs["stdin"] = stdin_f

        t0 = time.perf_counter()
        proc = subprocess.Popen(cmd, **kwargs)
        # wait4 reaps exactly this child and hands back its rusage
        # (ru_maxrss = peak resident set size).
        _, status, rusage = os.wait4(proc.pid, 0)
        elapsed = time.perf_counter() - t0
        # Record the exit code on the Popen object so it does not try to
        # wait on the already-reaped child again during garbage collection.
        proc.returncode = os.waitstatus_to_exitcode(status)
    finally:
        # Close the stdin file even when Popen/wait4 raises — the original
        # code leaked the descriptor on that error path.
        if stdin_f is not None:
            stdin_f.close()

    if proc.returncode != 0:
        raise subprocess.CalledProcessError(proc.returncode, cmd)

    peak_kb = rusage.ru_maxrss
    if sys.platform == "darwin":
        peak_kb //= 1024  # macOS reports ru_maxrss in bytes, Linux in KB
    return elapsed, peak_kb


def run_bench(label: str, binary: str, mode: str, runs: int, threads: int,
              r1_fq: Path, r2_fq: Path, r1_gz: Path, r2_gz: Path) -> tuple[float, int]:
    """Run a benchmark N times, print per-run times, return (median time, max peak RSS KB)."""
    cmd = build_cmd(binary, mode, label, threads, r1_fq, r2_fq, r1_gz, r2_gz)
    print(f"[bench] {label} ({mode}, {runs} runs)")

    # Warmup run (untimed): primes page cache so timed runs are comparable.
    run_once(cmd, mode, r1_fq)

    times = []
    peak_rss = 0
    for i in range(1, runs + 1):
        # Remove this mode's previous outputs so every run writes fresh files.
        for f in OUT_DIR.glob(f"{label}_*"):
            f.unlink()
        t, rss = run_once(cmd, mode, r1_fq)
        times.append(t)
        peak_rss = max(peak_rss, rss)
        print(f"  Run {i}: {t:.2f}s peak={rss // 1024}MB")

    med = median(times)
    print(f"  >> Median: {med:.4f}s peak={peak_rss // 1024}MB")
    print()
    return med, peak_rss
def system_info() -> dict:
    """Collect hardware, OS, and current resource snapshot.

    Best-effort probing: every section degrades gracefully (keys are simply
    omitted) when a command or sysfs node is unavailable, so the returned
    dict only ever contains what the host actually exposed.
    """
    info: dict = {
        "os": f"{platform.system()} {platform.release()}",
        "arch": platform.machine(),
        "cpus": CPU_COUNT,
    }

    def _sysctl_int(key: str) -> int | None:
        # One sysctl value as int, or None when the key is missing/non-numeric.
        r = subprocess.run(["sysctl", "-n", key], capture_output=True, text=True)
        return int(r.stdout.strip()) if r.returncode == 0 and r.stdout.strip().isdigit() else None

    def _count_cpulist(path: Path) -> int:
        # Count CPUs in a kernel cpulist string like "0-3,8,10-11".
        n = 0
        for part in path.read_text().strip().split(","):
            if "-" in part:
                lo, hi = part.split("-", 1)
                n += int(hi) - int(lo) + 1
            elif part.strip():
                n += 1
        return n

    if sys.platform == "darwin":
        # macOS: CPU brand + core counts via sysctl.
        r = subprocess.run(["sysctl", "-n", "machdep.cpu.brand_string"],
                           capture_output=True, text=True)
        if r.returncode == 0:
            info["cpu"] = r.stdout.strip()
        phys = _sysctl_int("hw.physicalcpu")
        logi = _sysctl_int("hw.logicalcpu")
        if phys:
            info["physical_cores"] = phys
        if logi:
            info["logical_cores"] = logi
        # Apple Silicon exposes per-level (performance/efficiency) counts.
        nlevels = _sysctl_int("hw.nperflevels")
        if nlevels:
            cores = []
            for i in range(nlevels):
                p = _sysctl_int(f"hw.perflevel{i}.physicalcpu")
                lvl_l = _sysctl_int(f"hw.perflevel{i}.logicalcpu")
                if lvl_l:
                    # Level 0 = performance, 1 = efficiency, others generic.
                    label = "P" if i == 0 else "E" if i == 1 else f"L{i}"
                    entry: dict = {"type": label, "logical": lvl_l}
                    if p and p != lvl_l:
                        entry["physical"] = p
                    cores.append(entry)
            if cores:
                info["cores"] = cores
    else:
        # Linux: CPU model
        if os.path.exists("/proc/cpuinfo"):
            with open("/proc/cpuinfo") as f:
                for line in f:
                    if line.startswith("model name"):
                        info["cpu"] = line.split(":", 1)[1].strip()
                        break
        cpu_dir = Path("/sys/devices/system/cpu")
        online = cpu_dir / "online"
        if online.exists():
            info["logical_cores"] = _count_cpulist(online)
        # NOTE(review): core_id is only unique within a package, so distinct
        # core_id values may undercount physical cores on multi-socket hosts
        # — confirm acceptable for benchmark reporting.
        phys_ids: set[str] = set()
        for topo in cpu_dir.glob("cpu[0-9]*/topology/core_id"):
            phys_ids.add(topo.read_text().strip())
        if phys_ids:
            info["physical_cores"] = len(phys_ids)
        # Intel hybrid
        cpu_types_dir = cpu_dir / "types"
        if cpu_types_dir.is_dir():
            cores = []
            for d in sorted(cpu_types_dir.iterdir()):
                cpulist = d / "cpulist"
                if not cpulist.exists():
                    continue
                n = _count_cpulist(cpulist)
                name = d.name
                if "core" in name:
                    label = "P"
                elif "atom" in name:
                    label = "E"
                else:
                    label = name
                cores.append({"type": label, "logical": n})
            if cores:
                info["cores"] = cores
        # ARM/RISC-V
        elif (cpu_dir / "cpu0/cpu_capacity").exists():
            # Group CPUs by capacity value; more than one distinct capacity
            # implies a big.LITTLE-style asymmetric layout.
            cap_counts: dict[int, int] = {}
            for cap_file in sorted(cpu_dir.glob("cpu[0-9]*/cpu_capacity")):
                cap = int(cap_file.read_text().strip())
                cap_counts[cap] = cap_counts.get(cap, 0) + 1
            if len(cap_counts) > 1:
                labels = ["P", "E"] + [f"L{i}" for i in range(2, 10)]
                cores = []
                for i, (cap, cnt) in enumerate(sorted(cap_counts.items(), reverse=True)):
                    cores.append({"type": labels[min(i, len(labels) - 1)], "logical": cnt})
                info["cores"] = cores

    # Total memory
    if sys.platform == "darwin":
        r = subprocess.run(["sysctl", "-n", "hw.memsize"],
                           capture_output=True, text=True)
        if r.returncode == 0:
            info["mem_total_gb"] = round(int(r.stdout.strip()) / (1 << 30), 1)
    elif os.path.exists("/proc/meminfo"):
        with open("/proc/meminfo") as f:
            for line in f:
                if line.startswith("MemTotal"):
                    # /proc/meminfo reports kB.
                    info["mem_total_gb"] = round(int(line.split()[1]) / (1 << 20), 1)
                    break

    # Available memory
    if sys.platform == "darwin":
        r = subprocess.run(["vm_stat"], capture_output=True, text=True)
        if r.returncode == 0:
            page_size = os.sysconf("SC_PAGE_SIZE") if hasattr(os, "sysconf") else 16384
            pages: dict[str, int] = {}
            for line in r.stdout.splitlines():
                if "page size of" in line:
                    # Trust the page size printed in vm_stat's header.
                    page_size = int(line.split()[-2])
                elif ":" in line:
                    key, val = line.split(":", 1)
                    val = val.strip().rstrip(".")
                    if val.isdigit():
                        pages[key.strip()] = int(val)
            # "Available" ~= free + inactive + purgeable pages.
            avail = (pages.get("Pages free", 0)
                     + pages.get("Pages inactive", 0)
                     + pages.get("Pages purgeable", 0))
            info["mem_avail_gb"] = round(avail * page_size / (1 << 30), 1)
    elif os.path.exists("/proc/meminfo"):
        with open("/proc/meminfo") as f:
            for line in f:
                if line.startswith("MemAvailable"):
                    info["mem_avail_gb"] = round(int(line.split()[1]) / (1 << 20), 1)
                    break

    # Load average
    try:
        l1, l5, l15 = os.getloadavg()
        info["load_avg"] = [round(l1, 2), round(l5, 2), round(l15, 2)]
    except OSError:
        pass

    # Memory type + Disk info
    if sys.platform == "darwin":
        try:
            r = subprocess.run(
                ["system_profiler", "SPMemoryDataType", "SPNVMeDataType", "-json"],
                capture_output=True, text=True, timeout=10)
            if r.returncode == 0:
                sp = json.loads(r.stdout)
                mem = (sp.get("SPMemoryDataType") or [{}])[0]
                mem_type = mem.get("dimm_type", "")
                mem_mfr = mem.get("dimm_manufacturer", "")
                if mem_type:
                    info["mem_type"] = f"{mem_type}" + (f" ({mem_mfr})" if mem_mfr else "")
                nvme = (sp.get("SPNVMeDataType") or [{}])[0]
                items = nvme.get("_items", [])
                if items:
                    disk = items[0]
                    info["disk"] = {
                        "model": disk.get("device_model", "?"),
                        "size": disk.get("size", "?"),
                        "protocol": "NVMe",
                        "interface": "Apple Fabric",
                    }
        except Exception:
            # Best-effort: system_profiler can be slow or absent.
            pass
    else:
        # NOTE(review): memory_type under dmi/id is not a standard sysfs
        # attribute on most kernels — confirm this path ever fires.
        dmi_mem_type = Path("/sys/devices/virtual/dmi/id/memory_type")
        if dmi_mem_type.exists():
            info["mem_type"] = dmi_mem_type.read_text().strip()
        # Identify the block device backing "/".
        root_dev = None
        try:
            r = subprocess.run(["findmnt", "-no", "SOURCE", "/"],
                               capture_output=True, text=True, timeout=5)
            if r.returncode == 0:
                src = r.stdout.strip().split("/")[-1]
                import re
                # Strip any partition suffix: keep only the whole-disk name.
                m = re.match(r"(sd[a-z]+|nvme\d+n\d+|vd[a-z]+)", src)
                if m:
                    root_dev = m.group(1)
        except Exception:
            pass
        if root_dev:
            blk = Path(f"/sys/block/{root_dev}")
            disk_info: dict = {}
            model_f = blk / "device/model"
            if model_f.exists():
                disk_info["model"] = model_f.read_text().strip()
            rot_f = blk / "queue/rotational"
            if rot_f.exists():
                disk_info["type"] = "HDD" if rot_f.read_text().strip() == "1" else "SSD"
            if root_dev.startswith("nvme"):
                # BUG FIX: the original used root_dev.split("n")[0], which
                # returns "" for "nvme0n1" ("nvme0n1".split("n") ==
                # ['', 'vme0', '1']) and silently disabled the transport
                # and link-speed probing below. rsplit keeps "nvme0".
                nvme_name = root_dev.rsplit("n", 1)[0]
                nvme_sys = Path(f"/sys/class/nvme/{nvme_name}")
                transport_f = nvme_sys / "transport"
                if transport_f.exists():
                    disk_info["protocol"] = "NVMe"
                    disk_info["interface"] = transport_f.read_text().strip().upper()
                addr_f = nvme_sys / "address"
                if addr_f.exists():
                    addr = addr_f.read_text().strip()
                    try:
                        # Parse the negotiated PCIe link from lspci's LnkSta.
                        r2 = subprocess.run(
                            ["lspci", "-vv", "-s", addr],
                            capture_output=True, text=True, timeout=5)
                        if r2.returncode == 0:
                            for line in r2.stdout.splitlines():
                                if "LnkSta:" in line and "Speed" in line:
                                    parts = line.split(",")
                                    speed = ""
                                    width = ""
                                    for p in parts:
                                        p = p.strip()
                                        if "Speed" in p:
                                            speed = p.split("Speed")[-1].strip()
                                        if "Width" in p:
                                            width = p.strip()
                                    if speed:
                                        disk_info["link_speed"] = f"{speed} {width}".strip()
                                    break
                    except Exception:
                        # lspci may be missing or need privileges; skip.
                        pass
            elif root_dev.startswith("sd"):
                disk_info["protocol"] = "SATA/SAS"
            if disk_info:
                disk_info.setdefault("size", "?")
                size_f = blk / "size"
                if size_f.exists():
                    # /sys/block/<dev>/size is in 512-byte sectors.
                    sectors = int(size_f.read_text().strip())
                    gb = sectors * 512 / (1 << 30)
                    disk_info["size"] = f"{gb:.0f} GB"
                info["disk"] = disk_info
    return info
= " + ".join(parts) + if phys and phys != logi: + if topo: + return f"{phys}C/{logi}T ({topo})" + return f"{phys}C/{logi}T" + else: + if topo: + return f"{logi} ({topo})" + return str(logi) diff --git a/benchmark/verify.py b/benchmark/verify.py new file mode 100644 index 0000000..0089137 --- /dev/null +++ b/benchmark/verify.py @@ -0,0 +1,83 @@ +"""Output verification via MD5 comparison.""" +import gzip +import hashlib +from pathlib import Path + +from .runner import output_files + + +def md5_file(path: Path) -> str: + h = hashlib.md5() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(1 << 20), b""): + h.update(chunk) + return h.hexdigest() + + +def md5_gz_content(path: Path) -> str: + """MD5 of decompressed content (compares FASTQ content, not gz bytes).""" + h = hashlib.md5() + with gzip.open(path, "rb") as f: + for chunk in iter(lambda: f.read(1 << 20), b""): + h.update(chunk) + return h.hexdigest() + + +def fq_md5(path: Path) -> str: + """MD5 of FASTQ content -- decompress if .gz, otherwise hash directly.""" + if path.suffix == ".gz": + return md5_gz_content(path) + return md5_file(path) + + +def verify_outputs(modes: list[str], has_baseline: bool = True) -> dict[str, dict]: + """Verify output FASTQ content (decompress gz first). 
Returns per-mode verification dict.""" + if has_baseline: + print("[verify] Computing FASTQ content md5 (decompressing gz if needed)...") + else: + print("[verify] Computing opt output md5 (no baseline to compare)...") + verification: dict[str, dict] = {} + + for mode in modes: + if mode == "stdin-stdout": + verification[mode] = {"status": "skipped", "reason": "no output files"} + print(f" {mode}: skipped (no output files)") + continue + + opt_files = output_files(f"opt_{mode}", mode) + mode_result: dict = {"files": {}} + + if has_baseline: + orig_files = output_files(f"orig_{mode}", mode) + all_match = True + for f_orig, f_opt in zip(orig_files, opt_files): + name = f_orig.name.split("_", 2)[-1] + h_orig = fq_md5(f_orig) + h_opt = fq_md5(f_opt) + match = h_orig == h_opt + if not match: + all_match = False + mode_result["files"][name] = { + "orig_md5": h_orig, + "opt_md5": h_opt, + "match": match, + } + status = "MATCH" if match else "DIFFER" + print(f" {mode} {name}: {status} orig={h_orig[:12]} opt={h_opt[:12]}") + mode_result["status"] = "pass" if all_match else "fail" + else: + for f_opt in opt_files: + name = f_opt.name.split("_", 2)[-1] + h_opt = fq_md5(f_opt) + mode_result["files"][name] = {"opt_md5": h_opt} + print(f" {mode} {name}: md5={h_opt[:12]}") + mode_result["status"] = "ok" + + verification[mode] = mode_result + + n_pass = sum(1 for v in verification.values() if v.get("status") == "pass") + n_check = sum(1 for v in verification.values() if v.get("status") in ("pass", "fail")) + if n_check > 0: + print(f" Passed: {n_pass} / {n_check} modes") + print() + return verification