diff --git a/raggify/src/raggify/ingest/ingest.py b/raggify/src/raggify/ingest/ingest.py index e8825cf..d744fb9 100644 --- a/raggify/src/raggify/ingest/ingest.py +++ b/raggify/src/raggify/ingest/ingest.py @@ -102,6 +102,7 @@ async def aingest_path( video_nodes=videos, persist_dir=rt.cfg.pipeline.persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, ) @@ -162,6 +163,7 @@ async def aingest_path_list( video_nodes=videos, persist_dir=rt.cfg.pipeline.persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, ) @@ -227,6 +229,7 @@ async def aingest_url( video_nodes=videos, persist_dir=rt.cfg.pipeline.persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, ) @@ -288,5 +291,6 @@ async def aingest_url_list( video_nodes=videos, persist_dir=rt.cfg.pipeline.persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, ) diff --git a/raggify/src/raggify/ingest/upsert.py b/raggify/src/raggify/ingest/upsert.py index 5f00b88..26a9e83 100644 --- a/raggify/src/raggify/ingest/upsert.py +++ b/raggify/src/raggify/ingest/upsert.py @@ -213,6 +213,7 @@ async def _process_batch( batch: Sequence[BaseNode], modality: Modality, persist_dir: Optional[Path], + force: bool, is_canceled: Callable[[], bool], ) -> Sequence[BaseNode]: """Process a batch of nodes through the pipeline. @@ -221,6 +222,7 @@ async def _process_batch( batch (Sequence[BaseNode]): Nodes in the batch. modality (Modality): Target modality. persist_dir (Optional[Path]): Persist directory. + force (bool): Whether to force reingestion even if already present. is_canceled (Callable[[], bool]): Cancellation flag for the job. Raises: @@ -236,6 +238,13 @@ async def _process_batch( try: pipe.reset_nodes() transformed_nodes = await pipe.arun(nodes=batch) + + if force and not transformed_nodes: + raise RuntimeError( + "no nodes were transformed in force mode, " + "flushing existing nodes and reprocessing" + ) + rt.pipeline.persist(pipe=pipe, modality=modality, persist_dir=persist_dir) # Return [] if no nodes were processed @@ -269,6 +278,7 @@ async def _process_batches( modality: Modality, persist_dir: Optional[Path], pipe_batch_size: int, + force: bool, is_canceled: Callable[[], bool], batch_interval_sec: float, batch_retry_interval_sec: list[float], @@ -280,6 +290,7 @@ async def _process_batches( modality (Modality): Target modality. persist_dir (Optional[Path]): Persist directory. pipe_batch_size (int): Number of nodes processed per pipeline batch. + force (bool): Whether to force reingestion even if already present. is_canceled (Callable[[], bool]): Cancellation flag for the job. batch_interval_sec (float): Delay between processing batches in seconds. batch_retry_interval_sec (list[float]): @@ -308,6 +319,7 @@ async def _process_batches( batch=batch, modality=modality, persist_dir=persist_dir, + force=force, is_canceled=is_canceled, ) transformed += len(temp) @@ -366,6 +378,7 @@ async def aupsert_nodes( video_nodes: Sequence[VideoNode], persist_dir: Optional[Path], pipe_batch_size: int, + force: bool, is_canceled: Callable[[], bool], ) -> None: """Upsert nodes into stores. @@ -377,6 +390,7 @@ async def aupsert_nodes( video_nodes (Sequence[VideoNode]): Video nodes. persist_dir (Optional[Path]): Persist directory. pipe_batch_size (int): Number of nodes processed per pipeline batch. + force (bool): Whether to force reingestion even if already present. is_canceled (Callable[[], bool]): Cancellation flag for the job. """ rt = _rt() @@ -391,6 +405,7 @@ async def aupsert_nodes( modality=Modality.TEXT, persist_dir=persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, batch_interval_sec=batch_interval_sec, batch_retry_interval_sec=batch_retry_interval_sec, @@ -404,6 +419,7 @@ async def aupsert_nodes( modality=Modality.IMAGE, persist_dir=persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, batch_interval_sec=batch_interval_sec, batch_retry_interval_sec=batch_retry_interval_sec, @@ -417,6 +433,7 @@ async def aupsert_nodes( modality=Modality.AUDIO, persist_dir=persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, batch_interval_sec=batch_interval_sec, batch_retry_interval_sec=batch_retry_interval_sec, @@ -430,6 +447,7 @@ async def aupsert_nodes( modality=Modality.VIDEO, persist_dir=persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, batch_interval_sec=batch_interval_sec, batch_retry_interval_sec=batch_retry_interval_sec,