Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def create_runmode_ingestor(*, run_mode: RunMode = "inprocess", params: Ingestor
ray_address=p.ray_address,
ray_log_to_driver=p.ray_log_to_driver,
debug=p.debug,
allow_no_gpu=p.allow_no_gpu,
)
if run_mode == "fused":
from nemo_retriever.ingest_modes.fused import FusedIngestor
Expand Down
6 changes: 5 additions & 1 deletion nemo_retriever/src/nemo_retriever/ingest_modes/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def __init__(
ray_address: Optional[str] = None,
ray_log_to_driver: bool = True,
debug: bool = False,
allow_no_gpu: bool = False,
) -> None:
super().__init__(documents=documents)

Expand Down Expand Up @@ -240,7 +241,10 @@ def __init__(
logger.info(self._cluster_resources)

# 2. Resolve requested plan for the Ray DAG that will be built
self._requested_plan = resolve_requested_plan(cluster_resources=self._cluster_resources)
self._requested_plan = resolve_requested_plan(
cluster_resources=self._cluster_resources,
allow_no_gpu=allow_no_gpu,
)
logger.info(self._requested_plan)

# Builder-style task configuration recorded for later execution.
Expand Down
1 change: 1 addition & 0 deletions nemo_retriever/src/nemo_retriever/params/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class IngestorCreateParams(_ParamsModel):
ray_log_to_driver: bool = True
debug: bool = False
base_url: str = "http://localhost:7670"
allow_no_gpu: bool = False


class IngestExecuteParams(_ParamsModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,31 @@ def resolve_requested_plan(
override_pdf_extract_batch_size: Optional[int] = None,
override_pdf_extract_cpus_per_task: Optional[float] = None,
override_pdf_extract_tasks: Optional[int] = None,
allow_no_gpu: bool = False,
) -> RequestedPlan:
available_gpu_count = max(0, int(cluster_resources.available_gpu_count()))

if available_gpu_count == 0:
if available_gpu_count == 0 and not allow_no_gpu:
raise ValueError("No GPUs available")

def _resolve_int_actors(override: Optional[int], default: int, multiply_by_available_num_gpu: bool) -> int:
if override is not None and override > 0:
return int(override)
if available_gpu_count == 0:
return 1
if multiply_by_available_num_gpu:
return int(default * available_gpu_count)
return int(default)

def _resolve_float_actors(override: Optional[float], default: float, multiply_by_available_num_gpu: bool) -> float:
if override is not None and override > 0.0:
return float(override)
if available_gpu_count == 0:
return 0.0
if multiply_by_available_num_gpu:
return float(default * available_gpu_count)
return float(default)

def _resolve_int(override: Optional[int], default: int, multiply_by_available_num_gpu: bool) -> int:
if override is not None and override > 0:
return int(override)
Expand All @@ -431,41 +450,41 @@ def _resolve_float(override: Optional[float], default: float, multiply_by_availa
return float(default * available_gpu_count)
return float(default)

embed_initial_actors = _resolve_int(override_embed_initial_actors, EMBED_INITIAL_ACTORS, True)
embed_min_actors = _resolve_int(override_embed_min_actors, EMBED_MIN_ACTORS, True)
embed_max_actors = _resolve_int(override_embed_max_actors, EMBED_MAX_ACTORS, True)
embed_gpus_per_actor = _resolve_float(override_embed_gpus_per_actor, EMBED_GPUS_PER_ACTOR, False)
embed_initial_actors = _resolve_int_actors(override_embed_initial_actors, EMBED_INITIAL_ACTORS, True)
embed_min_actors = _resolve_int_actors(override_embed_min_actors, EMBED_MIN_ACTORS, True)
embed_max_actors = _resolve_int_actors(override_embed_max_actors, EMBED_MAX_ACTORS, True)
embed_gpus_per_actor = _resolve_float_actors(override_embed_gpus_per_actor, EMBED_GPUS_PER_ACTOR, False)
embed_batch_size = _resolve_int(override_embed_batch_size, EMBED_BATCH_SIZE, False)

nemotron_parse_initial_actors = _resolve_int(
nemotron_parse_initial_actors = _resolve_int_actors(
override_nemotron_parse_initial_actors, NEMOTRON_PARSE_INITIAL_ACTORS, True
)
nemotron_parse_min_actors = _resolve_int(override_nemotron_parse_min_actors, NEMOTRON_PARSE_MIN_ACTORS, True)
nemotron_parse_max_actors = _resolve_int(override_nemotron_parse_max_actors, NEMOTRON_PARSE_MAX_ACTORS, True)
nemotron_parse_gpus_per_actor = _resolve_float(
nemotron_parse_min_actors = _resolve_int_actors(override_nemotron_parse_min_actors, NEMOTRON_PARSE_MIN_ACTORS, True)
nemotron_parse_max_actors = _resolve_int_actors(override_nemotron_parse_max_actors, NEMOTRON_PARSE_MAX_ACTORS, True)
nemotron_parse_gpus_per_actor = _resolve_float_actors(
override_nemotron_parse_gpus_per_actor, NEMOTRON_PARSE_GPUS_PER_ACTOR, False
)
nemotron_parse_batch_size = _resolve_int(override_nemotron_parse_batch_size, NEMOTRON_PARSE_BATCH_SIZE, False)

ocr_initial_actors = _resolve_int(override_ocr_initial_actors, OCR_INITIAL_ACTORS, True)
ocr_min_actors = _resolve_int(override_ocr_min_actors, OCR_MIN_ACTORS, True)
ocr_max_actors = _resolve_int(override_ocr_max_actors, OCR_MAX_ACTORS, True)
ocr_gpus_per_actor = _resolve_float(override_ocr_gpus_per_actor, OCR_GPUS_PER_ACTOR, False)
ocr_initial_actors = _resolve_int_actors(override_ocr_initial_actors, OCR_INITIAL_ACTORS, True)
ocr_min_actors = _resolve_int_actors(override_ocr_min_actors, OCR_MIN_ACTORS, True)
ocr_max_actors = _resolve_int_actors(override_ocr_max_actors, OCR_MAX_ACTORS, True)
ocr_gpus_per_actor = _resolve_float_actors(override_ocr_gpus_per_actor, OCR_GPUS_PER_ACTOR, False)
ocr_batch_size = _resolve_int(override_ocr_batch_size, OCR_BATCH_SIZE, False)

page_elements_initial_actors = _resolve_int(
page_elements_initial_actors = _resolve_int_actors(
override_page_elements_initial_actors, PAGE_ELEMENTS_INITIAL_ACTORS, True
)
page_elements_min_actors = _resolve_int(override_page_elements_min_actors, PAGE_ELEMENTS_MIN_ACTORS, True)
page_elements_max_actors = _resolve_int(override_page_elements_max_actors, PAGE_ELEMENTS_MAX_ACTORS, True)
page_elements_gpus_per_actor = _resolve_float(
page_elements_min_actors = _resolve_int_actors(override_page_elements_min_actors, PAGE_ELEMENTS_MIN_ACTORS, True)
page_elements_max_actors = _resolve_int_actors(override_page_elements_max_actors, PAGE_ELEMENTS_MAX_ACTORS, True)
page_elements_gpus_per_actor = _resolve_float_actors(
override_page_elements_gpus_per_actor, PAGE_ELEMENTS_GPUS_PER_ACTOR, False
)
page_elements_batch_size = _resolve_int(override_page_elements_batch_size, PAGE_ELEMENTS_BATCH_SIZE, False)

pdf_extract_batch_size = _resolve_int(override_pdf_extract_batch_size, PDF_EXTRACT_BATCH_SIZE, False)
pdf_extract_cpus_per_task = _resolve_float(override_pdf_extract_cpus_per_task, PDF_EXTRACT_CPUS_PER_TASK, False)
pdf_extract_tasks = _resolve_int(override_pdf_extract_tasks, PDF_EXTRACT_TASKS, True)
pdf_extract_tasks = _resolve_int_actors(override_pdf_extract_tasks, PDF_EXTRACT_TASKS, True)

return RequestedPlan(
embed_initial_actors=embed_initial_actors,
Expand Down
2 changes: 1 addition & 1 deletion nemo_retriever/tests/test_batch_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def test_batch_ingestor_filters_none_runtime_env_vars(monkeypatch) -> None:
)
monkeypatch.setattr(
"nemo_retriever.ingest_modes.batch.resolve_requested_plan",
lambda cluster_resources: {"plan": "dummy"},
lambda cluster_resources, allow_no_gpu=False: {"plan": "dummy"},
)

BatchIngestor(documents=[])
Expand Down
1 change: 1 addition & 0 deletions nemo_retriever/tests/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def test_create_runmode_ingestor_batch_and_fused(monkeypatch: pytest.MonkeyPatch
"ray_address": "ray://cluster",
"ray_log_to_driver": False,
"debug": False,
"allow_no_gpu": False,
}
assert fused.kwargs == {
"documents": ["doc.pdf"],
Expand Down
27 changes: 27 additions & 0 deletions nemo_retriever/tests/test_resource_heuristics.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,33 @@ def test_resolve_requested_plan_raises_with_no_gpus() -> None:
rh.resolve_requested_plan(cluster_resources=_make_cluster(total_gpu=0))


def test_resolve_requested_plan_allow_no_gpu_returns_cpu_only_plan() -> None:
plan = rh.resolve_requested_plan(cluster_resources=_make_cluster(total_gpu=0), allow_no_gpu=True)

assert plan.embed_gpus_per_actor == 0.0
assert plan.nemotron_parse_gpus_per_actor == 0.0
assert plan.ocr_gpus_per_actor == 0.0
assert plan.page_elements_gpus_per_actor == 0.0

assert plan.embed_initial_actors == 1
assert plan.embed_min_actors == 1
assert plan.embed_max_actors == 1

assert plan.nemotron_parse_initial_actors == 1
assert plan.nemotron_parse_min_actors == 1
assert plan.nemotron_parse_max_actors == 1

assert plan.ocr_initial_actors == 1
assert plan.ocr_min_actors == 1
assert plan.ocr_max_actors == 1

assert plan.page_elements_initial_actors == 1
assert plan.page_elements_min_actors == 1
assert plan.page_elements_max_actors == 1

assert plan.pdf_extract_tasks == 1


# ---------------------------------------------------------------------------
# RequestedPlan — getters and model behavior
# ---------------------------------------------------------------------------
Expand Down
Loading