From 7a41447f93945a841ceb7f6b40b2f64929e0644c Mon Sep 17 00:00:00 2001 From: jun76 Date: Sat, 3 Jan 2026 14:10:30 +0000 Subject: [PATCH] =?UTF-8?q?TreeRAG=20=E5=AF=BE=E5=BF=9C=E5=BE=8C=E3=80=81r?= =?UTF-8?q?ef=5Fdoc=5Fid=20=E3=81=8C=E6=AF=8E=E5=9B=9E=E5=A4=89=E3=82=8F?= =?UTF-8?q?=E3=81=A3=E3=81=A6=E3=81=97=E3=81=BE=E3=81=86=E3=82=88=E3=81=86?= =?UTF-8?q?=E3=81=A7=E3=80=81=E3=82=BD=E3=83=BC=E3=82=B9=E3=81=8C=E6=AF=8E?= =?UTF-8?q?=E5=9B=9E=E6=96=B0=E8=A6=8F=E5=88=A4=E5=AE=9A=E3=81=AB=E3=81=AA?= =?UTF-8?q?=E3=82=8B=E3=80=82=E3=82=B3=E3=83=9F=E3=83=83=E3=83=88=20457144?= =?UTF-8?q?c=20=E3=81=AE=E7=B6=9A=E3=81=8D=E3=82=92=E5=AE=9F=E8=A3=85?= =?UTF-8?q?=E4=B8=AD=E3=81=A0=E3=81=8C=E4=B8=80=E6=97=A6=E3=83=AD=E3=83=BC?= =?UTF-8?q?=E3=83=AB=E3=83=90=E3=83=83=E3=82=AF=E3=81=99=E3=82=8B=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- raggify/src/raggify/ingest/ingest.py | 4 ++++ raggify/src/raggify/ingest/upsert.py | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/raggify/src/raggify/ingest/ingest.py b/raggify/src/raggify/ingest/ingest.py index e8825cf..d744fb9 100644 --- a/raggify/src/raggify/ingest/ingest.py +++ b/raggify/src/raggify/ingest/ingest.py @@ -102,6 +102,7 @@ async def aingest_path( video_nodes=videos, persist_dir=rt.cfg.pipeline.persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, ) @@ -162,6 +163,7 @@ async def aingest_path_list( video_nodes=videos, persist_dir=rt.cfg.pipeline.persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, ) @@ -227,6 +229,7 @@ async def aingest_url( video_nodes=videos, persist_dir=rt.cfg.pipeline.persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, ) @@ -288,5 +291,6 @@ async def aingest_url_list( video_nodes=videos, persist_dir=rt.cfg.pipeline.persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, ) diff --git a/raggify/src/raggify/ingest/upsert.py b/raggify/src/raggify/ingest/upsert.py index 5f00b88..26a9e83 100644 --- a/raggify/src/raggify/ingest/upsert.py +++ b/raggify/src/raggify/ingest/upsert.py @@ -213,6 +213,7 @@ async def _process_batch( batch: Sequence[BaseNode], modality: Modality, persist_dir: Optional[Path], + force: bool, is_canceled: Callable[[], bool], ) -> Sequence[BaseNode]: """Process a batch of nodes through the pipeline. @@ -221,6 +222,7 @@ async def _process_batch( batch (Sequence[BaseNode]): Nodes in the batch. modality (Modality): Target modality. persist_dir (Optional[Path]): Persist directory. + force (bool): Whether to force reingestion even if already present. is_canceled (Callable[[], bool]): Cancellation flag for the job. Raises: @@ -236,6 +238,13 @@ async def _process_batch( try: pipe.reset_nodes() transformed_nodes = await pipe.arun(nodes=batch) + + if force and not transformed_nodes: + raise RuntimeError( + "no nodes were transformed in force mode, " + "flushing existing nodes and reprocessing" + ) + rt.pipeline.persist(pipe=pipe, modality=modality, persist_dir=persist_dir) # Return [] if no nodes were processed @@ -269,6 +278,7 @@ async def _process_batches( modality: Modality, persist_dir: Optional[Path], pipe_batch_size: int, + force: bool, is_canceled: Callable[[], bool], batch_interval_sec: float, batch_retry_interval_sec: list[float], @@ -280,6 +290,7 @@ async def _process_batches( modality (Modality): Target modality. persist_dir (Optional[Path]): Persist directory. pipe_batch_size (int): Number of nodes processed per pipeline batch. + force (bool): Whether to force reingestion even if already present. is_canceled (Callable[[], bool]): Cancellation flag for the job. batch_interval_sec (float): Delay between processing batches in seconds. batch_retry_interval_sec (list[float]): @@ -308,6 +319,7 @@ async def _process_batches( batch=batch, modality=modality, persist_dir=persist_dir, + force=force, is_canceled=is_canceled, ) transformed += len(temp) @@ -366,6 +378,7 @@ async def aupsert_nodes( video_nodes: Sequence[VideoNode], persist_dir: Optional[Path], pipe_batch_size: int, + force: bool, is_canceled: Callable[[], bool], ) -> None: """Upsert nodes into stores. @@ -377,6 +390,7 @@ async def aupsert_nodes( video_nodes (Sequence[VideoNode]): Video nodes. persist_dir (Optional[Path]): Persist directory. pipe_batch_size (int): Number of nodes processed per pipeline batch. + force (bool): Whether to force reingestion even if already present. is_canceled (Callable[[], bool]): Cancellation flag for the job. """ rt = _rt() @@ -391,6 +405,7 @@ async def aupsert_nodes( modality=Modality.TEXT, persist_dir=persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, batch_interval_sec=batch_interval_sec, batch_retry_interval_sec=batch_retry_interval_sec, @@ -404,6 +419,7 @@ async def aupsert_nodes( modality=Modality.IMAGE, persist_dir=persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, batch_interval_sec=batch_interval_sec, batch_retry_interval_sec=batch_retry_interval_sec, @@ -417,6 +433,7 @@ async def aupsert_nodes( modality=Modality.AUDIO, persist_dir=persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, batch_interval_sec=batch_interval_sec, batch_retry_interval_sec=batch_retry_interval_sec, @@ -430,6 +447,7 @@ async def aupsert_nodes( modality=Modality.VIDEO, persist_dir=persist_dir, pipe_batch_size=pipe_batch_size, + force=force, is_canceled=is_canceled, batch_interval_sec=batch_interval_sec, batch_retry_interval_sec=batch_retry_interval_sec,