From 32a5485cf9cd7a40d562f4085bfa47dc0fd882fb Mon Sep 17 00:00:00 2001
From: TianyeDong
Date: Thu, 28 May 2026 20:28:39 -0400
Subject: [PATCH] Map overlap GPUs by infer placement

_wait_for_overlap_engines_offloaded used to turn every allocated train
GPU into an engine index with
(gpu - min(actor_infer mapping)) // rollout_num_gpus_per_engine.
When engines are grouped in placement order, that arithmetic only gives
the right index if the actor_infer placement is contiguous and
ascending.

Move the lookup into a new helper, _resolve_overlap_infer_engines, which:

  * reads the placement from cluster_device_mappings["actor_infer"],
    falling back to range(rollout_num_gpus);
  * maps each GPU to an engine by its position in that placement,
    rollout_num_gpus_per_engine GPUs per engine;
  * returns only the train GPUs that are part of the placement, together
    with the engine indices they belong to;
  * raises RuntimeError when the placement length is not a multiple of
    rollout_num_gpus_per_engine.

The engine-state poll and the nvidia-smi free-memory probe now both use
the helper's result, so the probe covers only the overlap GPUs instead
of every allocated train GPU.
---
 rlix/pipeline/miles_pipeline.py | 94 ++++++++++++++++++++++-----------
 1 file changed, 62 insertions(+), 32 deletions(-)

diff --git a/rlix/pipeline/miles_pipeline.py b/rlix/pipeline/miles_pipeline.py
index c23faef..17cfcfd 100644
--- a/rlix/pipeline/miles_pipeline.py
+++ b/rlix/pipeline/miles_pipeline.py
@@ -517,32 +517,10 @@ def _wait_for_overlap_engines_offloaded(self, allocated_train_gpus, *, timeout_s
         rollout_manager = getattr(self, "_rollout_manager", None)
         if rollout_manager is None:
             return
-        miles_args = self._pipeline_config.miles_args
-        per_engine = max(int(getattr(miles_args, "rollout_num_gpus_per_engine", 1)), 1)
-        # M11.2 multi-pipeline fix: physical GPU IDs are absolute machine
-        # indices, but the RolloutManager uses LOCAL engine indices
-        # (0..N-1) within this pipeline's infer pool. Convert physical →
-        # local by subtracting the infer pool's first physical GPU.
-        # M11.1 single-pipeline pool was [0..rollout_num_gpus-1], so
-        # ``g // per_engine`` happened to equal the local engine index;
-        # for M11.2 P2 (pool [2,3]), ``2 // 1 = 2`` is wrong (no engine
-        # at local index 2). Read the infer mapping from
-        # cluster_device_mappings, fall back to range(rollout_num_gpus)
-        # for backward compat.
-        cluster_mappings = (
-            getattr(self._pipeline_config, "cluster_device_mappings", None) or {}
+        overlap_engine_indices, overlap_gpu_ids = self._resolve_overlap_infer_engines(
+            allocated_train_gpus
         )
-        infer_mapping = list(
-            cluster_mappings.get(
-                "actor_infer", list(range(int(miles_args.rollout_num_gpus)))
-            )
-        )
-        infer_first = min(infer_mapping) if infer_mapping else 0
-        target_indices = sorted(
-            {(int(g) - infer_first) // per_engine for g in allocated_train_gpus}
-        )
-        target_gpu_ids = sorted(set(int(g) for g in allocated_train_gpus))
-        if not target_indices:
+        if not overlap_engine_indices:
             return
 
         # Phase 1: wait for engine state transitions to "offloaded" / "shell".
@@ -551,18 +529,18 @@ def _wait_for_overlap_engines_offloaded(self, allocated_train_gpus, *, timeout_s
         while time.time() < deadline:
             try:
                 states = ray.get(
-                    rollout_manager.get_engine_states.remote(target_indices)
+                    rollout_manager.get_engine_states.remote(overlap_engine_indices)
                 )
             except Exception as exc:  # noqa: BLE001
                 logger.warning(
                     "_wait_for_overlap_engines_offloaded: get_engine_states failed: %r", exc
                 )
                 return
-            uniq = {states.get(i, "?") for i in target_indices}
+            uniq = {states.get(i, "?") for i in overlap_engine_indices}
             if uniq.issubset({"offloaded", "shell"}):
                 logger.info(
                     "_wait_for_overlap_engines_offloaded: engines %s reached state=%s",
-                    target_indices, uniq,
+                    overlap_engine_indices, uniq,
                 )
                 break
             time.sleep(0.1)
@@ -570,7 +548,7 @@ def _wait_for_overlap_engines_offloaded(self, allocated_train_gpus, *, timeout_s
             logger.warning(
                 "_wait_for_overlap_engines_offloaded: state timeout after %.1fs; "
                 "engines %s still in state=%r",
-                timeout_s, target_indices, uniq,
+                timeout_s, overlap_engine_indices, uniq,
             )
 
         # Phase 2: probe nvidia-smi for OS-level free memory on the
@@ -582,7 +560,7 @@ def _wait_for_overlap_engines_offloaded(self, allocated_train_gpus, *, timeout_s
         last_min_free_gb: Optional[float] = None
         nvidia_smi_unavail_count = 0
         while time.time() < deadline2:
-            min_free_gb = self._probe_min_free_gpu_mem_gb(target_gpu_ids)
+            min_free_gb = self._probe_min_free_gpu_mem_gb(overlap_gpu_ids)
             if min_free_gb is None:
                 # F5 (m11-review.review-report.md §2): nvidia-smi unavailable
                 # or unparseable. Was logged at DEBUG only — promoted to INFO
@@ -603,7 +581,7 @@ def _wait_for_overlap_engines_offloaded(self, allocated_train_gpus, *, timeout_s
                 logger.info(
                     "_wait_for_overlap_engines_offloaded: OS-level GPU mem free "
                     "min=%.2f GB across overlap GPUs %s (target=%.1f GB)",
-                    min_free_gb, target_gpu_ids, target_free_gb,
+                    min_free_gb, overlap_gpu_ids, target_free_gb,
                 )
                 return
             time.sleep(0.5)
@@ -613,7 +591,7 @@ def _wait_for_overlap_engines_offloaded(self, allocated_train_gpus, *, timeout_s
             timeout_s,
             last_min_free_gb if last_min_free_gb is not None else float("nan"),
             target_free_gb,
-            target_gpu_ids,
+            overlap_gpu_ids,
         )
 
     @staticmethod
@@ -886,6 +864,58 @@ def signal_rollout_demand(self, rollout_id: int, step_target: int) -> None:
     # Helpers
     # ------------------------------------------------------------------
 
+    def _resolve_overlap_infer_engines(
+        self, allocated_train_gpus
+    ) -> tuple[list[int], list[int]]:
+        """Resolve train GPUs to local inference-engine indices.
+
+        ``allocated_train_gpus`` contains physical GPU IDs granted to the
+        training actors. The rollout manager addresses inference engines by
+        local engine index within this pipeline's ``actor_infer`` placement.
+
+        The inference placement is read from
+        ``pipeline_config.cluster_device_mappings["actor_infer"]`` when
+        present, and falls back to ``range(rollout_num_gpus)`` for older
+        configs. GPUs are grouped into engines in placement order, with
+        ``rollout_num_gpus_per_engine`` GPUs per engine. The return value is
+        ``(overlap_engine_indices, overlap_gpu_ids)`` for the train GPUs that
+        are also part of the inference placement.
+        """
+        miles_args = self._pipeline_config.miles_args
+        cluster_mappings = (
+            getattr(self._pipeline_config, "cluster_device_mappings", None) or {}
+        )
+        infer_mapping = list(
+            cluster_mappings.get(
+                "actor_infer", list(range(int(miles_args.rollout_num_gpus)))
+            )
+        )
+        if not infer_mapping:
+            return [], []
+        per_engine = max(
+            int(getattr(miles_args, "rollout_num_gpus_per_engine", 1)), 1
+        )
+        if len(infer_mapping) % per_engine != 0:
+            raise RuntimeError(
+                "actor_infer GPU mapping length must be divisible by "
+                f"rollout_num_gpus_per_engine: mapping={infer_mapping}, "
+                f"per_engine={per_engine}"
+            )
+
+        gpu_to_engine = {
+            int(gpu): local_idx // per_engine
+            for local_idx, gpu in enumerate(infer_mapping)
+        }
+        overlap_gpu_ids = sorted(
+            int(gpu)
+            for gpu in set(allocated_train_gpus)
+            if int(gpu) in gpu_to_engine
+        )
+        overlap_engine_indices = sorted(
+            {gpu_to_engine[gpu] for gpu in overlap_gpu_ids}
+        )
+        return overlap_engine_indices, overlap_gpu_ids
+
     def _request_cluster_gpus(
         self,
         *,