Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions nemo_retriever/src/nemo_retriever/ingest_modes/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,22 @@ def __init__(
self._extract_html_kwargs: Dict[str, Any] = {} # noqa: F821
self._use_nemotron_parse_only: bool = False

@staticmethod
def _positive_int(value: Any) -> int | None:
try:
parsed = int(value)
except (TypeError, ValueError):
return None
return parsed if parsed > 0 else None

@staticmethod
def _positive_float(value: Any) -> float | None:
try:
parsed = float(value)
except (TypeError, ValueError):
return None
return parsed if parsed > 0.0 else None

def files(self, documents: Union[str, List[str]]) -> "BatchIngestor":
"""
Add local files for batch processing.
Expand Down Expand Up @@ -857,16 +873,21 @@ def embed(
resolved = resolved.model_copy(update={"api_key": resolve_remote_api_key()})

kwargs = build_embed_kwargs(resolved, include_batch_tuning=True)
embed_batch_size = (
self._positive_int(kwargs.get("embed_batch_size")) or self._requested_plan.get_embed_batch_size()
)
embed_workers = self._positive_int(kwargs.get("embed_workers"))
embed_initial_actors = embed_workers or self._requested_plan.get_embed_initial_actors()
embed_min_actors = embed_workers or self._requested_plan.get_embed_min_actors()
embed_max_actors = embed_workers or self._requested_plan.get_embed_max_actors()

# Remaining kwargs are forwarded to the actor constructor.
embed_modality = resolved.embed_modality
embed_granularity = resolved.embed_granularity
self._tasks.append(("embed", dict(kwargs)))

# We want to create Ray batches that are of the same size as the embed_batch_size.
self._rd_dataset = self._rd_dataset.repartition(
target_num_rows_per_block=self._requested_plan.get_embed_batch_size()
)
self._rd_dataset = self._rd_dataset.repartition(target_num_rows_per_block=embed_batch_size)

if embed_granularity == "page":
_row_fn = partial(
Expand All @@ -884,7 +905,7 @@ def embed(
)
self._rd_dataset = self._rd_dataset.map_batches(
_row_fn,
batch_size=self._requested_plan.get_embed_batch_size(),
batch_size=embed_batch_size,
batch_format="pandas",
num_cpus=1,
)
Expand All @@ -894,17 +915,19 @@ def embed(
if endpoint:
embed_actor_num_gpus = 0 # We do not need GPU resources if invoking a remote NIM endpoint
else:
embed_actor_num_gpus = self._requested_plan.get_embed_gpus_per_actor()
embed_actor_num_gpus = (
self._positive_float(kwargs.get("gpu_embed")) or self._requested_plan.get_embed_gpus_per_actor()
)

self._rd_dataset = self._rd_dataset.map_batches(
_BatchEmbedActor,
batch_size=self._requested_plan.get_embed_batch_size(),
batch_size=embed_batch_size,
batch_format="pandas",
num_gpus=embed_actor_num_gpus, # pulled from if statement above
compute=rd.ActorPoolStrategy(
initial_size=self._requested_plan.get_embed_initial_actors(),
min_size=self._requested_plan.get_embed_min_actors(),
max_size=self._requested_plan.get_embed_max_actors(),
initial_size=embed_initial_actors,
min_size=embed_min_actors,
max_size=embed_max_actors,
),
fn_constructor_kwargs={"params": resolved},
)
Expand Down
79 changes: 79 additions & 0 deletions nemo_retriever/tests/test_batch_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
pytest.importorskip("ray")

from nemo_retriever.ingest_modes.batch import BatchIngestor
from nemo_retriever.params import EmbedParams


class _DummyClusterResources:
Expand All @@ -21,6 +22,34 @@ def available_gpu_count(self) -> int:
return 0


class _DummyGpuClusterResources:
def total_cpu_count(self) -> int:
return 16

def total_gpu_count(self) -> int:
return 2

def available_cpu_count(self) -> int:
return 16

def available_gpu_count(self) -> int:
return 2


class _DummyDataset:
def __init__(self) -> None:
self.repartition_calls: list[int] = []
self.map_batches_calls: list[dict[str, object]] = []

def repartition(self, *, target_num_rows_per_block: int):
self.repartition_calls.append(target_num_rows_per_block)
return self

def map_batches(self, fn, **kwargs):
self.map_batches_calls.append({"fn": fn, **kwargs})
return self


def test_batch_ingestor_filters_none_runtime_env_vars(monkeypatch) -> None:
captured: dict[str, object] = {}
dummy_ctx = SimpleNamespace(enable_rich_progress_bars=False, use_ray_tqdm=True)
Expand Down Expand Up @@ -56,3 +85,53 @@ def test_batch_ingestor_filters_none_runtime_env_vars(monkeypatch) -> None:
}
assert dummy_ctx.enable_rich_progress_bars is True
assert dummy_ctx.use_ray_tqdm is False


def test_batch_ingestor_embed_honors_batch_tuning(monkeypatch) -> None:
    """embed() must prefer explicit batch_tuning values over the derived plan."""
    ctx = SimpleNamespace(enable_rich_progress_bars=False, use_ray_tqdm=True)

    # Stub out Ray startup, the data context, and cluster probing so the test
    # never needs a real Ray cluster; pretend 2 GPUs are available.
    monkeypatch.setattr("nemo_retriever.ingest_modes.batch.ray.init", lambda **_kwargs: None)
    monkeypatch.setattr("nemo_retriever.ingest_modes.batch.rd.DataContext.get_current", lambda: ctx)
    monkeypatch.setattr(
        "nemo_retriever.ingest_modes.batch.gather_cluster_resources",
        lambda _ray: _DummyGpuClusterResources(),
    )
    # Capture the actor-pool sizing instead of building a real strategy.
    monkeypatch.setattr(
        "nemo_retriever.ingest_modes.batch.rd.ActorPoolStrategy",
        lambda *, initial_size, min_size, max_size: SimpleNamespace(
            initial_size=initial_size, min_size=min_size, max_size=max_size
        ),
    )

    ingestor = BatchIngestor(documents=[])
    recording_ds = _DummyDataset()
    ingestor._rd_dataset = recording_ds

    tuning = {"embed_workers": 1, "embed_batch_size": 1, "gpu_embed": 1.0}
    ingestor.embed(
        EmbedParams(
            model_name="nvidia/llama-nemotron-embed-vl-1b-v2",
            embed_granularity="page",
            batch_tuning=tuning,
        )
    )

    # Repartitioning and both map_batches stages must use the tuned batch size,
    # and the actor stage must use the tuned GPU fraction and worker count.
    assert recording_ds.repartition_calls == [1]
    row_stage = recording_ds.map_batches_calls[0]
    actor_stage = recording_ds.map_batches_calls[1]
    assert row_stage["batch_size"] == 1
    assert actor_stage["batch_size"] == 1
    assert actor_stage["num_gpus"] == 1.0
    pool = actor_stage["compute"]
    assert (pool.initial_size, pool.min_size, pool.max_size) == (1, 1, 1)
Loading