Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 62 additions & 32 deletions rlix/pipeline/miles_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,32 +517,10 @@ def _wait_for_overlap_engines_offloaded(self, allocated_train_gpus, *, timeout_s
rollout_manager = getattr(self, "_rollout_manager", None)
if rollout_manager is None:
return
miles_args = self._pipeline_config.miles_args
per_engine = max(int(getattr(miles_args, "rollout_num_gpus_per_engine", 1)), 1)
# M11.2 multi-pipeline fix: physical GPU IDs are absolute machine
# indices, but the RolloutManager uses LOCAL engine indices
# (0..N-1) within this pipeline's infer pool. Convert physical →
# local by subtracting the infer pool's first physical GPU.
# M11.1 single-pipeline pool was [0..rollout_num_gpus-1], so
# ``g // per_engine`` happened to equal the local engine index;
# for M11.2 P2 (pool [2,3]), ``2 // 1 = 2`` is wrong (no engine
# at local index 2). Read the infer mapping from
# cluster_device_mappings, fall back to range(rollout_num_gpus)
# for backward compat.
cluster_mappings = (
getattr(self._pipeline_config, "cluster_device_mappings", None) or {}
overlap_engine_indices, overlap_gpu_ids = self._resolve_overlap_infer_engines(
allocated_train_gpus
)
infer_mapping = list(
cluster_mappings.get(
"actor_infer", list(range(int(miles_args.rollout_num_gpus)))
)
)
infer_first = min(infer_mapping) if infer_mapping else 0
target_indices = sorted(
{(int(g) - infer_first) // per_engine for g in allocated_train_gpus}
)
target_gpu_ids = sorted(set(int(g) for g in allocated_train_gpus))
if not target_indices:
if not overlap_engine_indices:
return

# Phase 1: wait for engine state transitions to "offloaded" / "shell".
Expand All @@ -551,26 +529,26 @@ def _wait_for_overlap_engines_offloaded(self, allocated_train_gpus, *, timeout_s
while time.time() < deadline:
try:
states = ray.get(
rollout_manager.get_engine_states.remote(target_indices)
rollout_manager.get_engine_states.remote(overlap_engine_indices)
)
except Exception as exc: # noqa: BLE001
logger.warning(
"_wait_for_overlap_engines_offloaded: get_engine_states failed: %r", exc
)
return
uniq = {states.get(i, "?") for i in target_indices}
uniq = {states.get(i, "?") for i in overlap_engine_indices}
if uniq.issubset({"offloaded", "shell"}):
logger.info(
"_wait_for_overlap_engines_offloaded: engines %s reached state=%s",
target_indices, uniq,
overlap_engine_indices, uniq,
)
break
time.sleep(0.1)
else:
logger.warning(
"_wait_for_overlap_engines_offloaded: state timeout after %.1fs; "
"engines %s still in state=%r",
timeout_s, target_indices, uniq,
timeout_s, overlap_engine_indices, uniq,
)

# Phase 2: probe nvidia-smi for OS-level free memory on the
Expand All @@ -582,7 +560,7 @@ def _wait_for_overlap_engines_offloaded(self, allocated_train_gpus, *, timeout_s
last_min_free_gb: Optional[float] = None
nvidia_smi_unavail_count = 0
while time.time() < deadline2:
min_free_gb = self._probe_min_free_gpu_mem_gb(target_gpu_ids)
min_free_gb = self._probe_min_free_gpu_mem_gb(overlap_gpu_ids)
if min_free_gb is None:
# F5 (m11-review.review-report.md §2): nvidia-smi unavailable
# or unparseable. Was logged at DEBUG only — promoted to INFO
Expand All @@ -603,7 +581,7 @@ def _wait_for_overlap_engines_offloaded(self, allocated_train_gpus, *, timeout_s
logger.info(
"_wait_for_overlap_engines_offloaded: OS-level GPU mem free "
"min=%.2f GB across overlap GPUs %s (target=%.1f GB)",
min_free_gb, target_gpu_ids, target_free_gb,
min_free_gb, overlap_gpu_ids, target_free_gb,
)
return
time.sleep(0.5)
Expand All @@ -613,7 +591,7 @@ def _wait_for_overlap_engines_offloaded(self, allocated_train_gpus, *, timeout_s
timeout_s,
last_min_free_gb if last_min_free_gb is not None else float("nan"),
target_free_gb,
target_gpu_ids,
overlap_gpu_ids,
)

@staticmethod
Expand Down Expand Up @@ -886,6 +864,58 @@ def signal_rollout_demand(self, rollout_id: int, step_target: int) -> None:
# Helpers
# ------------------------------------------------------------------

def _resolve_overlap_infer_engines(
self, allocated_train_gpus
) -> tuple[list[int], list[int]]:
"""Resolve train GPUs to local inference-engine indices.

``allocated_train_gpus`` contains physical GPU IDs granted to the
training actors. The rollout manager addresses inference engines by
local engine index within this pipeline's ``actor_infer`` placement.

The inference placement is read from
``pipeline_config.cluster_device_mappings["actor_infer"]`` when
present, and falls back to ``range(rollout_num_gpus)`` for older
configs. GPUs are grouped into engines in placement order, with
``rollout_num_gpus_per_engine`` GPUs per engine. The return value is
``(overlap_engine_indices, overlap_gpu_ids)`` for the train GPUs that
are also part of the inference placement.
"""
miles_args = self._pipeline_config.miles_args
cluster_mappings = (
getattr(self._pipeline_config, "cluster_device_mappings", None) or {}
)
infer_mapping = list(
cluster_mappings.get(
"actor_infer", list(range(int(miles_args.rollout_num_gpus)))
)
)
if not infer_mapping:
return [], []
per_engine = max(
int(getattr(miles_args, "rollout_num_gpus_per_engine", 1)), 1
)
if len(infer_mapping) % per_engine != 0:
raise RuntimeError(
"actor_infer GPU mapping length must be divisible by "
f"rollout_num_gpus_per_engine: mapping={infer_mapping}, "
f"per_engine={per_engine}"
)

gpu_to_engine = {
int(gpu): local_idx // per_engine
for local_idx, gpu in enumerate(infer_mapping)
}
overlap_gpu_ids = sorted(
int(gpu)
for gpu in set(allocated_train_gpus)
if int(gpu) in gpu_to_engine
)
overlap_engine_indices = sorted(
{gpu_to_engine[gpu] for gpu in overlap_gpu_ids}
)
return overlap_engine_indices, overlap_gpu_ids

def _request_cluster_gpus(
self,
*,
Expand Down