diff --git a/nvmolkit/autotune/_core.py b/nvmolkit/autotune/_core.py index b990e93c..fa8a9671 100644 --- a/nvmolkit/autotune/_core.py +++ b/nvmolkit/autotune/_core.py @@ -287,6 +287,9 @@ def suggest_from_space(trial, name: str, spec: Any) -> Any: - ``(low, high)`` tuple — uniform integer range. - ``(low, high, "log")`` tuple — log-uniform integer range. + - ``(low, high, step)`` tuple — uniform integer range restricted to + multiples of ``step`` (preserves ordering for TPE, unlike a categorical + list of the same values). - ``list`` of choices — categorical search. A literal scalar is returned unchanged (acts as a fixed value rather than @@ -299,6 +302,11 @@ def suggest_from_space(trial, name: str, spec: Any) -> Any: if isinstance(spec, tuple) and len(spec) == 3 and spec[2] == "log": low, high, _ = spec return trial.suggest_int(name, int(low), int(high), log=True) + if isinstance(spec, tuple) and len(spec) == 3 and all(isinstance(v, int) for v in spec): + low, high, step = spec + if step <= 0: + raise ValueError(f"Search-space override for {name!r}: step must be a positive integer, got {step!r}.") + return trial.suggest_int(name, int(low), int(high), step=int(step)) if isinstance(spec, list) and len(spec) == 2 and all(isinstance(v, int) for v in spec): raise TypeError( f"Search-space override for {name!r} is a 2-element int list {spec!r}; " @@ -310,6 +318,12 @@ def suggest_from_space(trial, name: str, spec: Any) -> Any: f"Search-space override for {name!r} is a 3-element list {spec!r} ending in 'log'; " "use a tuple (low, high, 'log') for a log-uniform integer range." ) + if isinstance(spec, list) and len(spec) == 3 and all(isinstance(v, int) for v in spec): + raise TypeError( + f"Search-space override for {name!r} is a 3-element int list {spec!r}; " + "use a tuple (low, high, step) for a stepped integer range, or wrap in " + "a list of more than three values to request a categorical search." + ) if isinstance(spec, (list, tuple)): choices = list(spec) return trial.suggest_categorical(name, choices) @@ -322,9 +336,11 @@ def collect_int_from_space(spec: Any) -> int: For a uniform integer range ``(low, high)`` the arithmetic midpoint is returned. For a log-uniform range ``(low, high, "log")`` the geometric midpoint is returned (rounded to the nearest int, clamped to ``[low, - high]``). For a categorical list the first listed choice is returned, on - the convention that callers list their preferred default first. A bare - scalar is returned unchanged. + high]``). For a stepped range ``(low, high, step)`` the arithmetic + midpoint snapped to the nearest multiple of ``step`` from ``low`` is + returned, clamped to ``[low, high]``. For a categorical list the first + listed choice is returned, on the convention that callers list their + preferred default first. A bare scalar is returned unchanged. """ if isinstance(spec, tuple) and len(spec) == 3 and spec[2] == "log": low, high, _ = spec @@ -334,6 +350,11 @@ def collect_int_from_space(spec: Any) -> int: raise ValueError(f"Log-uniform range {spec!r} requires strictly positive bounds.") midpoint = int(round(math.sqrt(low_int * high_int))) return max(low_int, min(high_int, midpoint)) + if isinstance(spec, tuple) and len(spec) == 3 and all(isinstance(v, int) for v in spec): + low, high, step = (int(v) for v in spec) + midpoint = (low + high) // 2 + snapped = low + round((midpoint - low) / step) * step + return max(low, min(high, snapped)) if isinstance(spec, tuple) and len(spec) == 2: low, high = spec return (int(low) + int(high)) // 2 diff --git a/nvmolkit/autotune/_ff_common.py b/nvmolkit/autotune/_ff_common.py index 0f174cee..c37845fb 100644 --- a/nvmolkit/autotune/_ff_common.py +++ b/nvmolkit/autotune/_ff_common.py @@ -24,6 +24,7 @@ from __future__ import annotations import os +import re from typing import Iterable, Optional from rdkit.Chem import Mol @@ -49,17 +50,37 @@ def coerce_gpu_ids(gpuIds: Optional[Iterable[int]]) -> list[int]: def cpu_count() -> int: - """Return the usable CPU core count, with a floor of 1.""" + """Return the usable physical CPU core count, with a floor of 1. + + Falls back to ``os.cpu_count()`` (logical) if ``/proc/cpuinfo`` is missing. + """ + physical = _physical_cpu_count_from_proc() + if physical is not None: + return max(1, physical) return max(1, os.cpu_count() or 1) +def _physical_cpu_count_from_proc() -> Optional[int]: + """Return the number of distinct physical cores from ``/proc/cpuinfo``.""" + try: + with open("/proc/cpuinfo") as cpuinfo: + text = cpuinfo.read() + except OSError: + return None + physical_ids = re.findall(r"^physical id\s*:\s*(\S+)", text, re.MULTILINE) + core_ids = re.findall(r"^core id\s*:\s*(\S+)", text, re.MULTILINE) + if not physical_ids or len(physical_ids) != len(core_ids): + return None + return len(set(zip(physical_ids, core_ids))) + + def resolve_cpu_budget(cpu_budget: Optional[int]) -> int: """Return the effective CPU budget for autotune search ranges. ``cpu_budget`` lets callers cap total CPU usage explicitly (useful for cross-machine normalization where the goal is to isolate GPU performance - from CPU-count differences). When ``None``, falls back to - :func:`cpu_count`. Values less than 1 are rejected. + from CPU-count differences). When ``None``, falls back to physical core + count via :func:`cpu_count`. Values less than 1 are rejected. """ if cpu_budget is None: return cpu_count() @@ -89,12 +110,16 @@ def resolve_num_gpus(fixed_gpu_ids: list[int]) -> int: def default_ff_search_space(num_gpus: int, cpus: int) -> dict: """Return the default FF search space scaled to the active hardware. - ``batchesPerGpu`` is per-GPU; its upper bound is capped at - ``cpus // num_gpus`` so the total worker thread count across all GPUs - (``num_gpus * batchesPerGpu``) does not exceed ``cpus``. + ``batchSize`` is a stepped integer range in multiples of 64 since the + underlying kernels are tile-tuned for those sizes; the stepped form + preserves numeric ordering for TPE (unlike a categorical list). + ``batchesPerGpu`` is per-GPU and capped at ``min(8, cpus // num_gpus)``: + 8 is the empirical point of diminishing returns for the batched-FF + dispatch, and the physical-core floor prevents oversubscribing CPU + coordinator threads across all GPUs. """ - per_gpu_max = max(1, cpus // max(1, num_gpus)) + per_gpu_max = max(1, min(8, cpus // max(1, num_gpus))) return { - "batchSize": (32, 1500, "log"), + "batchSize": (64, 1024, 64), "batchesPerGpu": (1, per_gpu_max), } diff --git a/nvmolkit/autotune/tune_embed_molecules.py b/nvmolkit/autotune/tune_embed_molecules.py index 5e248a86..437f8649 100644 --- a/nvmolkit/autotune/tune_embed_molecules.py +++ b/nvmolkit/autotune/tune_embed_molecules.py @@ -45,13 +45,16 @@ def _default_embed_search_space(num_gpus: int, cpus: int) -> dict: EmbedMolecules runs preprocessing and GPU-dispatch threads sequentially, so each pool is capped independently: - * ``batchesPerGpu`` (per-GPU GPU-runner threads): max = ``cpus // - num_gpus``. - * ``preprocessingThreads`` (total CPU pool): max = ``cpus``. + * ``batchSize``: stepped int range in multiples of 64 (kernels are + tile-tuned for these sizes); stepping preserves numeric ordering for TPE. + * ``batchesPerGpu`` (per-GPU GPU-runner threads): max = + ``min(8, cpus // num_gpus)``. 8 is the empirical point of diminishing + returns; the physical-core floor prevents oversubscribing across GPUs. + * ``preprocessingThreads`` (total CPU pool): max = ``cpus`` (physical). """ - per_gpu_max = max(1, cpus // max(1, num_gpus)) + per_gpu_max = max(1, min(8, cpus // max(1, num_gpus))) return { - "batchSize": (64, 2000, "log"), + "batchSize": (64, 1024, 64), "batchesPerGpu": (1, per_gpu_max), "preprocessingThreads": (1, cpus), } diff --git a/nvmolkit/autotune/tune_substructure.py b/nvmolkit/autotune/tune_substructure.py index f3162f4c..49418688 100644 --- a/nvmolkit/autotune/tune_substructure.py +++ b/nvmolkit/autotune/tune_substructure.py @@ -64,13 +64,17 @@ def _default_substruct_search_space(num_gpus: int, cpus: int) -> dict: at trial sampling time by clamping ``preprocessingThreads`` to whatever cores are left after ``workerThreads`` is chosen. - * ``workerThreads`` (per-GPU): max = ``cpus // num_gpus``. - * ``preprocessingThreads`` (total CPU pool): max = ``cpus`` (the + * ``batchSize``: stepped int range in multiples of 128 (kernels are + tile-tuned for these sizes); stepping preserves numeric ordering for TPE. + * ``workerThreads`` (per-GPU): max = ``min(8, cpus // num_gpus)``. + 8 is the empirical point of diminishing returns; the physical-core + floor prevents oversubscribing across GPUs. + * ``preprocessingThreads`` (total CPU pool): max = ``cpus`` (physical; effective per-trial cap is reduced once ``workerThreads`` is sampled). """ - per_gpu_worker_max = max(1, cpus // max(1, num_gpus)) + per_gpu_worker_max = max(1, min(8, cpus // max(1, num_gpus))) return { - "batchSize": (128, 8192, "log"), + "batchSize": (128, 1024, 128), "workerThreads": (1, per_gpu_worker_max), "preprocessingThreads": (1, cpus), } diff --git a/nvmolkit/tests/test_autotune.py b/nvmolkit/tests/test_autotune.py index 19cabf2c..28f322c8 100644 --- a/nvmolkit/tests/test_autotune.py +++ b/nvmolkit/tests/test_autotune.py @@ -256,23 +256,32 @@ def test_shrink_halves_within_floor(): def test_default_ff_search_space_caps_batches_per_gpu_by_cpu_count(): - """``batchesPerGpu`` upper bound divides the CPU budget by the GPU count.""" + """``batchesPerGpu`` upper bound is ``min(8, cpus // num_gpus)``.""" space_1gpu = _ff_common.default_ff_search_space(num_gpus=1, cpus=32) space_4gpu = _ff_common.default_ff_search_space(num_gpus=4, cpus=32) space_64gpu = _ff_common.default_ff_search_space(num_gpus=64, cpus=32) - assert space_1gpu["batchesPerGpu"] == (1, 32) + assert space_1gpu["batchesPerGpu"] == (1, 8) assert space_4gpu["batchesPerGpu"] == (1, 8) assert space_64gpu["batchesPerGpu"] == (1, 1) +def test_default_ff_search_space_batch_size_is_stepped_multiples_of_64(): + """``batchSize`` is a stepped int range in multiples of 64.""" + space = _ff_common.default_ff_search_space(num_gpus=1, cpus=32) + low, high, step = space["batchSize"] + assert step == 64 + assert low % 64 == 0 and high % 64 == 0 + assert low <= high + + def test_default_substruct_search_space_caps_per_pool(): - """Substruct: workerThreads capped per-GPU, preprocessingThreads capped at cpus.""" + """Substruct: workerThreads capped at min(8, cpus/num_gpus), prep at cpus.""" space_1gpu = _default_substruct_search_space(num_gpus=1, cpus=16) space_4gpu = _default_substruct_search_space(num_gpus=4, cpus=16) space_64gpu = _default_substruct_search_space(num_gpus=64, cpus=16) - assert space_1gpu["workerThreads"] == (1, 16) + assert space_1gpu["workerThreads"] == (1, 8) assert space_4gpu["workerThreads"] == (1, 4) assert space_64gpu["workerThreads"] == (1, 1) @@ -280,7 +289,10 @@ def test_default_substruct_search_space_caps_per_pool(): assert space_4gpu["preprocessingThreads"] == (1, 16) assert space_64gpu["preprocessingThreads"] == (1, 16) - assert space_1gpu["batchSize"] == (128, 8192, "log") + low, high, step = space_1gpu["batchSize"] + assert step % 64 == 0 + assert low % step == 0 and high % step == 0 + assert low <= high class _RecordingTrial: @@ -290,8 +302,8 @@ def __init__(self, picker=lambda low, high: low): self.calls: list[dict] = [] self.picker = picker - def suggest_int(self, name: str, low: int, high: int, log: bool = False) -> int: - self.calls.append({"name": name, "low": low, "high": high, "log": log}) + def suggest_int(self, name: str, low: int, high: int, log: bool = False, step: int = 1) -> int: + self.calls.append({"name": name, "low": low, "high": high, "log": log, "step": step}) return int(self.picker(low, high)) @@ -300,7 +312,7 @@ def test_suggest_preprocessing_threads_clamps_to_remaining_cpu_budget(): trial = _RecordingTrial() spec = (1, 32) value = _suggest_preprocessing_threads(trial, spec, worker_threads=6, num_gpus=2, cpus=16) - assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 4, "log": False}] + assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 4, "log": False, "step": 1}] assert value == 1 @@ -309,7 +321,7 @@ def test_suggest_preprocessing_threads_respects_low_floor_when_budget_exhausted( trial = _RecordingTrial(picker=lambda low, high: high) spec = (4, 32) value = _suggest_preprocessing_threads(trial, spec, worker_threads=8, num_gpus=2, cpus=16) - assert trial.calls == [{"name": "preprocessingThreads", "low": 4, "high": 4, "log": False}] + assert trial.calls == [{"name": "preprocessingThreads", "low": 4, "high": 4, "log": False, "step": 1}] assert value == 4 @@ -318,7 +330,7 @@ def test_suggest_preprocessing_threads_does_not_clamp_when_budget_available(): trial = _RecordingTrial(picker=lambda low, high: high) spec = (1, 8) value = _suggest_preprocessing_threads(trial, spec, worker_threads=2, num_gpus=2, cpus=32) - assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 8, "log": False}] + assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 8, "log": False, "step": 1}] assert value == 8 @@ -327,14 +339,24 @@ def test_suggest_preprocessing_threads_propagates_log_flag(): trial = _RecordingTrial() spec = (1, 32, "log") _suggest_preprocessing_threads(trial, spec, worker_threads=2, num_gpus=1, cpus=8) - assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 6, "log": True}] + assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 6, "log": True, "step": 1}] def test_default_embed_search_space_caps_per_pool(): - """Embed: per-GPU pool capped at cpus/numGpus, total pool capped at cpus.""" + """Embed: per-GPU pool capped at min(8, cpus/numGpus), total prep pool at cpus.""" space = _default_embed_search_space(num_gpus=4, cpus=16) assert space["batchesPerGpu"] == (1, 4) assert space["preprocessingThreads"] == (1, 16) + low, high, step = space["batchSize"] + assert step == 64 + assert low % 64 == 0 and high % 64 == 0 + assert low <= high + + +def test_default_embed_search_space_caps_batches_per_gpu_at_eight(): + """When cpus/num_gpus exceeds 8, ``batchesPerGpu`` is still capped at 8.""" + space = _default_embed_search_space(num_gpus=4, cpus=128) + assert space["batchesPerGpu"] == (1, 8) def test_resolve_num_gpus_prefers_explicit_list(): @@ -349,6 +371,29 @@ def test_resolve_cpu_budget_falls_back_to_cpu_count(monkeypatch): assert _ff_common.resolve_cpu_budget(None) == 24 +def test_physical_cpu_count_from_proc_dedupes_smt_siblings(tmp_path, monkeypatch): + """``_physical_cpu_count_from_proc`` collapses SMT siblings by ``(physical id, core id)``.""" + fake_cpuinfo = ( + "processor\t: 0\nphysical id\t: 0\ncore id\t: 0\n\n" + "processor\t: 1\nphysical id\t: 0\ncore id\t: 0\n\n" + "processor\t: 2\nphysical id\t: 0\ncore id\t: 1\n\n" + "processor\t: 3\nphysical id\t: 0\ncore id\t: 1\n\n" + "processor\t: 4\nphysical id\t: 1\ncore id\t: 0\n\n" + "processor\t: 5\nphysical id\t: 1\ncore id\t: 0\n\n" + ) + fake_path = tmp_path / "cpuinfo" + fake_path.write_text(fake_cpuinfo) + real_open = open + + def fake_open(path, *args, **kwargs): + if path == "/proc/cpuinfo": + return real_open(fake_path, *args, **kwargs) + return real_open(path, *args, **kwargs) + + monkeypatch.setattr("builtins.open", fake_open) + assert _ff_common._physical_cpu_count_from_proc() == 3 + + def test_resolve_cpu_budget_uses_explicit_value(): """An explicit ``cpu_budget`` overrides whatever the OS reports.""" assert _ff_common.resolve_cpu_budget(14) == 14