From 914dec7d85b5607129987020e91475ae80447b03 Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Mon, 23 Mar 2026 17:02:15 -0700 Subject: [PATCH] Fix batch ingest embed cli flags --- .../src/nemo_retriever/ingest_modes/batch.py | 41 +++++++--- nemo_retriever/tests/test_batch_ingestor.py | 79 +++++++++++++++++++ 2 files changed, 111 insertions(+), 9 deletions(-) diff --git a/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py b/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py index e00037285..0a57eebe6 100644 --- a/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py +++ b/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py @@ -255,6 +255,22 @@ def __init__( self._extract_html_kwargs: Dict[str, Any] = {} # noqa: F821 self._use_nemotron_parse_only: bool = False + @staticmethod + def _positive_int(value: Any) -> int | None: + try: + parsed = int(value) + except (TypeError, ValueError): + return None + return parsed if parsed > 0 else None + + @staticmethod + def _positive_float(value: Any) -> float | None: + try: + parsed = float(value) + except (TypeError, ValueError): + return None + return parsed if parsed > 0.0 else None + def files(self, documents: Union[str, List[str]]) -> "BatchIngestor": """ Add local files for batch processing. @@ -857,6 +873,13 @@ def embed( resolved = resolved.model_copy(update={"api_key": resolve_remote_api_key()}) kwargs = build_embed_kwargs(resolved, include_batch_tuning=True) + embed_batch_size = ( + self._positive_int(kwargs.get("embed_batch_size")) or self._requested_plan.get_embed_batch_size() + ) + embed_workers = self._positive_int(kwargs.get("embed_workers")) + embed_initial_actors = embed_workers or self._requested_plan.get_embed_initial_actors() + embed_min_actors = embed_workers or self._requested_plan.get_embed_min_actors() + embed_max_actors = embed_workers or self._requested_plan.get_embed_max_actors() # Remaining kwargs are forwarded to the actor constructor. embed_modality = resolved.embed_modality @@ -864,9 +887,7 @@ def embed( self._tasks.append(("embed", dict(kwargs))) # We want to create Ray batches that are of the same size as the embed_batch_size. - self._rd_dataset = self._rd_dataset.repartition( - target_num_rows_per_block=self._requested_plan.get_embed_batch_size() - ) + self._rd_dataset = self._rd_dataset.repartition(target_num_rows_per_block=embed_batch_size) if embed_granularity == "page": _row_fn = partial( @@ -884,7 +905,7 @@ def embed( ) self._rd_dataset = self._rd_dataset.map_batches( _row_fn, - batch_size=self._requested_plan.get_embed_batch_size(), + batch_size=embed_batch_size, batch_format="pandas", num_cpus=1, ) @@ -894,17 +915,19 @@ def embed( if endpoint: embed_actor_num_gpus = 0 # We do not need GPU resources if invoking a remote NIM endpoint else: - embed_actor_num_gpus = self._requested_plan.get_embed_gpus_per_actor() + embed_actor_num_gpus = ( + self._positive_float(kwargs.get("gpu_embed")) or self._requested_plan.get_embed_gpus_per_actor() + ) self._rd_dataset = self._rd_dataset.map_batches( _BatchEmbedActor, - batch_size=self._requested_plan.get_embed_batch_size(), + batch_size=embed_batch_size, batch_format="pandas", num_gpus=embed_actor_num_gpus, # pulled from if statement above compute=rd.ActorPoolStrategy( - initial_size=self._requested_plan.get_embed_initial_actors(), - min_size=self._requested_plan.get_embed_min_actors(), - max_size=self._requested_plan.get_embed_max_actors(), + initial_size=embed_initial_actors, + min_size=embed_min_actors, + max_size=embed_max_actors, ), fn_constructor_kwargs={"params": resolved}, ) diff --git a/nemo_retriever/tests/test_batch_ingestor.py b/nemo_retriever/tests/test_batch_ingestor.py index 21b9a7dde..b06751a35 100644 --- a/nemo_retriever/tests/test_batch_ingestor.py +++ b/nemo_retriever/tests/test_batch_ingestor.py @@ -5,6 +5,7 @@ pytest.importorskip("ray") from nemo_retriever.ingest_modes.batch import BatchIngestor +from nemo_retriever.params import EmbedParams class _DummyClusterResources: @@ -21,6 +22,34 @@ def available_gpu_count(self) -> int: return 0 +class _DummyGpuClusterResources: + def total_cpu_count(self) -> int: + return 16 + + def total_gpu_count(self) -> int: + return 2 + + def available_cpu_count(self) -> int: + return 16 + + def available_gpu_count(self) -> int: + return 2 + + +class _DummyDataset: + def __init__(self) -> None: + self.repartition_calls: list[int] = [] + self.map_batches_calls: list[dict[str, object]] = [] + + def repartition(self, *, target_num_rows_per_block: int): + self.repartition_calls.append(target_num_rows_per_block) + return self + + def map_batches(self, fn, **kwargs): + self.map_batches_calls.append({"fn": fn, **kwargs}) + return self + + def test_batch_ingestor_filters_none_runtime_env_vars(monkeypatch) -> None: captured: dict[str, object] = {} dummy_ctx = SimpleNamespace(enable_rich_progress_bars=False, use_ray_tqdm=True) @@ -56,3 +85,53 @@ def test_batch_ingestor_filters_none_runtime_env_vars(monkeypatch) -> None: } assert dummy_ctx.enable_rich_progress_bars is True assert dummy_ctx.use_ray_tqdm is False + + +def test_batch_ingestor_embed_honors_batch_tuning(monkeypatch) -> None: + dummy_ctx = SimpleNamespace(enable_rich_progress_bars=False, use_ray_tqdm=True) + + monkeypatch.setattr( + "nemo_retriever.ingest_modes.batch.ray.init", + lambda **kwargs: None, + ) + monkeypatch.setattr( + "nemo_retriever.ingest_modes.batch.rd.DataContext.get_current", + lambda: dummy_ctx, + ) + monkeypatch.setattr( + "nemo_retriever.ingest_modes.batch.gather_cluster_resources", + lambda _ray: _DummyGpuClusterResources(), + ) + monkeypatch.setattr( + "nemo_retriever.ingest_modes.batch.rd.ActorPoolStrategy", + lambda *, initial_size, min_size, max_size: SimpleNamespace( + initial_size=initial_size, + min_size=min_size, + max_size=max_size, + ), + ) + + ingestor = BatchIngestor(documents=[]) + dataset = _DummyDataset() + ingestor._rd_dataset = dataset + + ingestor.embed( + EmbedParams( + model_name="nvidia/llama-nemotron-embed-vl-1b-v2", + embed_granularity="page", + batch_tuning={ + "embed_workers": 1, + "embed_batch_size": 1, + "gpu_embed": 1.0, + }, + ) + ) + + assert dataset.repartition_calls == [1] + assert dataset.map_batches_calls[0]["batch_size"] == 1 + assert dataset.map_batches_calls[1]["batch_size"] == 1 + assert dataset.map_batches_calls[1]["num_gpus"] == 1.0 + compute = dataset.map_batches_calls[1]["compute"] + assert compute.initial_size == 1 + assert compute.min_size == 1 + assert compute.max_size == 1