Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions raggify/src/raggify/ingest/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async def aingest_path(
video_nodes=videos,
persist_dir=rt.cfg.pipeline.persist_dir,
pipe_batch_size=pipe_batch_size,
force=force,
is_canceled=is_canceled,
)

Expand Down Expand Up @@ -162,6 +163,7 @@ async def aingest_path_list(
video_nodes=videos,
persist_dir=rt.cfg.pipeline.persist_dir,
pipe_batch_size=pipe_batch_size,
force=force,
is_canceled=is_canceled,
)

Expand Down Expand Up @@ -227,6 +229,7 @@ async def aingest_url(
video_nodes=videos,
persist_dir=rt.cfg.pipeline.persist_dir,
pipe_batch_size=pipe_batch_size,
force=force,
is_canceled=is_canceled,
)

Expand Down Expand Up @@ -288,5 +291,6 @@ async def aingest_url_list(
video_nodes=videos,
persist_dir=rt.cfg.pipeline.persist_dir,
pipe_batch_size=pipe_batch_size,
force=force,
is_canceled=is_canceled,
)
18 changes: 18 additions & 0 deletions raggify/src/raggify/ingest/upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ async def _process_batch(
batch: Sequence[BaseNode],
modality: Modality,
persist_dir: Optional[Path],
force: bool,
is_canceled: Callable[[], bool],
) -> Sequence[BaseNode]:
"""Process a batch of nodes through the pipeline.
Expand All @@ -221,6 +222,7 @@ async def _process_batch(
batch (Sequence[BaseNode]): Nodes in the batch.
modality (Modality): Target modality.
persist_dir (Optional[Path]): Persist directory.
force (bool): Whether to force reingestion even if already present.
is_canceled (Callable[[], bool]): Cancellation flag for the job.

Raises:
Expand All @@ -236,6 +238,13 @@ async def _process_batch(
try:
pipe.reset_nodes()
transformed_nodes = await pipe.arun(nodes=batch)

if force and not transformed_nodes:
raise RuntimeError(
"no nodes were transformed in force mode, "
"flushing existing nodes and reprocessing"
)

rt.pipeline.persist(pipe=pipe, modality=modality, persist_dir=persist_dir)

# Return [] if no nodes were processed
Expand Down Expand Up @@ -269,6 +278,7 @@ async def _process_batches(
modality: Modality,
persist_dir: Optional[Path],
pipe_batch_size: int,
force: bool,
is_canceled: Callable[[], bool],
batch_interval_sec: float,
batch_retry_interval_sec: list[float],
Expand All @@ -280,6 +290,7 @@ async def _process_batches(
modality (Modality): Target modality.
persist_dir (Optional[Path]): Persist directory.
pipe_batch_size (int): Number of nodes processed per pipeline batch.
force (bool): Whether to force reingestion even if already present.
is_canceled (Callable[[], bool]): Cancellation flag for the job.
batch_interval_sec (float): Delay between processing batches in seconds.
batch_retry_interval_sec (list[float]):
Expand Down Expand Up @@ -308,6 +319,7 @@ async def _process_batches(
batch=batch,
modality=modality,
persist_dir=persist_dir,
force=force,
is_canceled=is_canceled,
)
transformed += len(temp)
Expand Down Expand Up @@ -366,6 +378,7 @@ async def aupsert_nodes(
video_nodes: Sequence[VideoNode],
persist_dir: Optional[Path],
pipe_batch_size: int,
force: bool,
is_canceled: Callable[[], bool],
) -> None:
"""Upsert nodes into stores.
Expand All @@ -377,6 +390,7 @@ async def aupsert_nodes(
video_nodes (Sequence[VideoNode]): Video nodes.
persist_dir (Optional[Path]): Persist directory.
pipe_batch_size (int): Number of nodes processed per pipeline batch.
force (bool): Whether to force reingestion even if already present.
is_canceled (Callable[[], bool]): Cancellation flag for the job.
"""
rt = _rt()
Expand All @@ -391,6 +405,7 @@ async def aupsert_nodes(
modality=Modality.TEXT,
persist_dir=persist_dir,
pipe_batch_size=pipe_batch_size,
force=force,
is_canceled=is_canceled,
batch_interval_sec=batch_interval_sec,
batch_retry_interval_sec=batch_retry_interval_sec,
Expand All @@ -404,6 +419,7 @@ async def aupsert_nodes(
modality=Modality.IMAGE,
persist_dir=persist_dir,
pipe_batch_size=pipe_batch_size,
force=force,
is_canceled=is_canceled,
batch_interval_sec=batch_interval_sec,
batch_retry_interval_sec=batch_retry_interval_sec,
Expand All @@ -417,6 +433,7 @@ async def aupsert_nodes(
modality=Modality.AUDIO,
persist_dir=persist_dir,
pipe_batch_size=pipe_batch_size,
force=force,
is_canceled=is_canceled,
batch_interval_sec=batch_interval_sec,
batch_retry_interval_sec=batch_retry_interval_sec,
Expand All @@ -430,6 +447,7 @@ async def aupsert_nodes(
modality=Modality.VIDEO,
persist_dir=persist_dir,
pipe_batch_size=pipe_batch_size,
force=force,
is_canceled=is_canceled,
batch_interval_sec=batch_interval_sec,
batch_retry_interval_sec=batch_retry_interval_sec,
Expand Down