diff --git a/nemo_retriever/src/nemo_retriever/application/modes/factory.py b/nemo_retriever/src/nemo_retriever/application/modes/factory.py index e87c8fd39..7391f7e6d 100644 --- a/nemo_retriever/src/nemo_retriever/application/modes/factory.py +++ b/nemo_retriever/src/nemo_retriever/application/modes/factory.py @@ -22,6 +22,7 @@ def create_runmode_ingestor(*, run_mode: RunMode = "inprocess", params: Ingestor ray_address=p.ray_address, ray_log_to_driver=p.ray_log_to_driver, debug=p.debug, + allow_no_gpu=p.allow_no_gpu, ) if run_mode == "fused": from nemo_retriever.ingest_modes.fused import FusedIngestor diff --git a/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py b/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py index e00037285..98747960a 100644 --- a/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py +++ b/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py @@ -199,6 +199,7 @@ def __init__( ray_address: Optional[str] = None, ray_log_to_driver: bool = True, debug: bool = False, + allow_no_gpu: bool = False, ) -> None: super().__init__(documents=documents) @@ -240,7 +241,10 @@ def __init__( logger.info(self._cluster_resources) # 2. Resolve requested plan for the Ray DAG that will be built - self._requested_plan = resolve_requested_plan(cluster_resources=self._cluster_resources) + self._requested_plan = resolve_requested_plan( + cluster_resources=self._cluster_resources, + allow_no_gpu=allow_no_gpu, + ) logger.info(self._requested_plan) # Builder-style task configuration recorded for later execution. diff --git a/nemo_retriever/src/nemo_retriever/params/models.py b/nemo_retriever/src/nemo_retriever/params/models.py index f08548cbe..62a2f3dfc 100644 --- a/nemo_retriever/src/nemo_retriever/params/models.py +++ b/nemo_retriever/src/nemo_retriever/params/models.py @@ -44,6 +44,7 @@ class IngestorCreateParams(_ParamsModel): ray_log_to_driver: bool = True debug: bool = False base_url: str = "http://localhost:7670" + allow_no_gpu: bool = False class IngestExecuteParams(_ParamsModel): diff --git a/nemo_retriever/src/nemo_retriever/utils/ray_resource_hueristics.py b/nemo_retriever/src/nemo_retriever/utils/ray_resource_hueristics.py index 3454493b9..786c40284 100644 --- a/nemo_retriever/src/nemo_retriever/utils/ray_resource_hueristics.py +++ b/nemo_retriever/src/nemo_retriever/utils/ray_resource_hueristics.py @@ -411,12 +411,31 @@ def resolve_requested_plan( override_pdf_extract_batch_size: Optional[int] = None, override_pdf_extract_cpus_per_task: Optional[float] = None, override_pdf_extract_tasks: Optional[int] = None, + allow_no_gpu: bool = False, ) -> RequestedPlan: available_gpu_count = max(0, int(cluster_resources.available_gpu_count())) - if available_gpu_count == 0: + if available_gpu_count == 0 and not allow_no_gpu: raise ValueError("No GPUs available") + def _resolve_int_actors(override: Optional[int], default: int, multiply_by_available_num_gpu: bool) -> int: + if override is not None and override > 0: + return int(override) + if available_gpu_count == 0: + return 1 + if multiply_by_available_num_gpu: + return int(default * available_gpu_count) + return int(default) + + def _resolve_float_actors(override: Optional[float], default: float, multiply_by_available_num_gpu: bool) -> float: + if override is not None and override > 0.0: + return float(override) + if available_gpu_count == 0: + return 0.0 + if multiply_by_available_num_gpu: + return float(default * available_gpu_count) + return float(default) + def _resolve_int(override: Optional[int], default: int, multiply_by_available_num_gpu: bool) -> int: if override is not None and override > 0: return int(override) @@ -431,41 +450,41 @@ def _resolve_float(override: Optional[float], default: float, multiply_by_availa return float(default * available_gpu_count) return float(default) - embed_initial_actors = _resolve_int(override_embed_initial_actors, EMBED_INITIAL_ACTORS, True) - embed_min_actors = _resolve_int(override_embed_min_actors, EMBED_MIN_ACTORS, True) - embed_max_actors = _resolve_int(override_embed_max_actors, EMBED_MAX_ACTORS, True) - embed_gpus_per_actor = _resolve_float(override_embed_gpus_per_actor, EMBED_GPUS_PER_ACTOR, False) + embed_initial_actors = _resolve_int_actors(override_embed_initial_actors, EMBED_INITIAL_ACTORS, True) + embed_min_actors = _resolve_int_actors(override_embed_min_actors, EMBED_MIN_ACTORS, True) + embed_max_actors = _resolve_int_actors(override_embed_max_actors, EMBED_MAX_ACTORS, True) + embed_gpus_per_actor = _resolve_float_actors(override_embed_gpus_per_actor, EMBED_GPUS_PER_ACTOR, False) embed_batch_size = _resolve_int(override_embed_batch_size, EMBED_BATCH_SIZE, False) - nemotron_parse_initial_actors = _resolve_int( + nemotron_parse_initial_actors = _resolve_int_actors( override_nemotron_parse_initial_actors, NEMOTRON_PARSE_INITIAL_ACTORS, True ) - nemotron_parse_min_actors = _resolve_int(override_nemotron_parse_min_actors, NEMOTRON_PARSE_MIN_ACTORS, True) - nemotron_parse_max_actors = _resolve_int(override_nemotron_parse_max_actors, NEMOTRON_PARSE_MAX_ACTORS, True) - nemotron_parse_gpus_per_actor = _resolve_float( + nemotron_parse_min_actors = _resolve_int_actors(override_nemotron_parse_min_actors, NEMOTRON_PARSE_MIN_ACTORS, True) + nemotron_parse_max_actors = _resolve_int_actors(override_nemotron_parse_max_actors, NEMOTRON_PARSE_MAX_ACTORS, True) + nemotron_parse_gpus_per_actor = _resolve_float_actors( override_nemotron_parse_gpus_per_actor, NEMOTRON_PARSE_GPUS_PER_ACTOR, False ) nemotron_parse_batch_size = _resolve_int(override_nemotron_parse_batch_size, NEMOTRON_PARSE_BATCH_SIZE, False) - ocr_initial_actors = _resolve_int(override_ocr_initial_actors, OCR_INITIAL_ACTORS, True) - ocr_min_actors = _resolve_int(override_ocr_min_actors, OCR_MIN_ACTORS, True) - ocr_max_actors = _resolve_int(override_ocr_max_actors, OCR_MAX_ACTORS, True) - ocr_gpus_per_actor = _resolve_float(override_ocr_gpus_per_actor, OCR_GPUS_PER_ACTOR, False) + ocr_initial_actors = _resolve_int_actors(override_ocr_initial_actors, OCR_INITIAL_ACTORS, True) + ocr_min_actors = _resolve_int_actors(override_ocr_min_actors, OCR_MIN_ACTORS, True) + ocr_max_actors = _resolve_int_actors(override_ocr_max_actors, OCR_MAX_ACTORS, True) + ocr_gpus_per_actor = _resolve_float_actors(override_ocr_gpus_per_actor, OCR_GPUS_PER_ACTOR, False) ocr_batch_size = _resolve_int(override_ocr_batch_size, OCR_BATCH_SIZE, False) - page_elements_initial_actors = _resolve_int( + page_elements_initial_actors = _resolve_int_actors( override_page_elements_initial_actors, PAGE_ELEMENTS_INITIAL_ACTORS, True ) - page_elements_min_actors = _resolve_int(override_page_elements_min_actors, PAGE_ELEMENTS_MIN_ACTORS, True) - page_elements_max_actors = _resolve_int(override_page_elements_max_actors, PAGE_ELEMENTS_MAX_ACTORS, True) - page_elements_gpus_per_actor = _resolve_float( + page_elements_min_actors = _resolve_int_actors(override_page_elements_min_actors, PAGE_ELEMENTS_MIN_ACTORS, True) + page_elements_max_actors = _resolve_int_actors(override_page_elements_max_actors, PAGE_ELEMENTS_MAX_ACTORS, True) + page_elements_gpus_per_actor = _resolve_float_actors( override_page_elements_gpus_per_actor, PAGE_ELEMENTS_GPUS_PER_ACTOR, False ) page_elements_batch_size = _resolve_int(override_page_elements_batch_size, PAGE_ELEMENTS_BATCH_SIZE, False) pdf_extract_batch_size = _resolve_int(override_pdf_extract_batch_size, PDF_EXTRACT_BATCH_SIZE, False) pdf_extract_cpus_per_task = _resolve_float(override_pdf_extract_cpus_per_task, PDF_EXTRACT_CPUS_PER_TASK, False) - pdf_extract_tasks = _resolve_int(override_pdf_extract_tasks, PDF_EXTRACT_TASKS, True) + pdf_extract_tasks = _resolve_int_actors(override_pdf_extract_tasks, PDF_EXTRACT_TASKS, True) return RequestedPlan( embed_initial_actors=embed_initial_actors, diff --git a/nemo_retriever/tests/test_batch_ingestor.py b/nemo_retriever/tests/test_batch_ingestor.py index 21b9a7dde..48a15ae9a 100644 --- a/nemo_retriever/tests/test_batch_ingestor.py +++ b/nemo_retriever/tests/test_batch_ingestor.py @@ -43,7 +43,7 @@ def test_batch_ingestor_filters_none_runtime_env_vars(monkeypatch) -> None: ) monkeypatch.setattr( "nemo_retriever.ingest_modes.batch.resolve_requested_plan", - lambda cluster_resources: {"plan": "dummy"}, + lambda cluster_resources, allow_no_gpu=False: {"plan": "dummy"}, ) BatchIngestor(documents=[]) diff --git a/nemo_retriever/tests/test_factory.py b/nemo_retriever/tests/test_factory.py index bec77a128..4be36dfe2 100644 --- a/nemo_retriever/tests/test_factory.py +++ b/nemo_retriever/tests/test_factory.py @@ -49,6 +49,7 @@ def test_create_runmode_ingestor_batch_and_fused(monkeypatch: pytest.MonkeyPatch "ray_address": "ray://cluster", "ray_log_to_driver": False, "debug": False, + "allow_no_gpu": False, } assert fused.kwargs == { "documents": ["doc.pdf"], diff --git a/nemo_retriever/tests/test_resource_heuristics.py b/nemo_retriever/tests/test_resource_heuristics.py index b464d1cb2..c01bb5611 100644 --- a/nemo_retriever/tests/test_resource_heuristics.py +++ b/nemo_retriever/tests/test_resource_heuristics.py @@ -175,6 +175,33 @@ def test_resolve_requested_plan_raises_with_no_gpus() -> None: rh.resolve_requested_plan(cluster_resources=_make_cluster(total_gpu=0)) +def test_resolve_requested_plan_allow_no_gpu_returns_cpu_only_plan() -> None: + plan = rh.resolve_requested_plan(cluster_resources=_make_cluster(total_gpu=0), allow_no_gpu=True) + + assert plan.embed_gpus_per_actor == 0.0 + assert plan.nemotron_parse_gpus_per_actor == 0.0 + assert plan.ocr_gpus_per_actor == 0.0 + assert plan.page_elements_gpus_per_actor == 0.0 + + assert plan.embed_initial_actors == 1 + assert plan.embed_min_actors == 1 + assert plan.embed_max_actors == 1 + + assert plan.nemotron_parse_initial_actors == 1 + assert plan.nemotron_parse_min_actors == 1 + assert plan.nemotron_parse_max_actors == 1 + + assert plan.ocr_initial_actors == 1 + assert plan.ocr_min_actors == 1 + assert plan.ocr_max_actors == 1 + + assert plan.page_elements_initial_actors == 1 + assert plan.page_elements_min_actors == 1 + assert plan.page_elements_max_actors == 1 + + assert plan.pdf_extract_tasks == 1 + + # --------------------------------------------------------------------------- # RequestedPlan — getters and model behavior # ---------------------------------------------------------------------------