Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 45 additions & 140 deletions src/infrastructure/rag/lightrag_adapter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import hashlib
import os
import tempfile
import time
from pathlib import Path
from typing import Literal, cast

from fastapi.logger import logger
Expand Down Expand Up @@ -139,49 +141,6 @@ async def vision_call(
)
return self.rag[working_dir]

# ------------------------------------------------------------------
# LLM callables (passed directly to RAGAnything)
# ------------------------------------------------------------------

async def _llm_call(
    self, prompt, system_prompt=None, history_messages=None, **kwargs
):
    """Forward a chat completion to the configured LLM endpoint.

    Delegates to ``openai_complete_if_cache`` with the chat model, API
    key and base URL taken from ``self._llm_config``; extra keyword
    arguments are passed straight through to the helper.
    """
    # Fresh list per call — never share a mutable default.
    history = [] if history_messages is None else history_messages
    return await openai_complete_if_cache(
        self._llm_config.CHAT_MODEL,
        prompt,
        system_prompt=system_prompt,
        history_messages=history,
        api_key=self._llm_config.api_key,
        base_url=self._llm_config.api_base_url,
        **kwargs,
    )

async def _vision_call(
    self,
    prompt,
    system_prompt=None,
    history_messages=None,
    image_data=None,
    **kwargs,
):
    """Forward a vision (image) completion to the configured endpoint.

    Assembles multimodal messages via ``_build_vision_messages`` and
    sends them with the vision model from ``self._llm_config``.
    """
    history = history_messages if history_messages is not None else []
    vision_messages = _build_vision_messages(
        system_prompt, history, prompt, image_data
    )
    # NOTE(review): the same payload is supplied as both
    # ``history_messages`` and ``messages`` — presumably to satisfy
    # differing call shapes inside the helper; confirm before removing.
    return await openai_complete_if_cache(
        self._llm_config.VISION_MODEL,
        "Image Description Task",
        system_prompt=None,
        history_messages=vision_messages,
        api_key=self._llm_config.api_key,
        base_url=self._llm_config.api_base_url,
        messages=vision_messages,
        **kwargs,
    )

# ------------------------------------------------------------------
# Port implementation — indexing
# ------------------------------------------------------------------
Expand Down Expand Up @@ -232,74 +191,80 @@ async def index_folder(
file_extensions: list[str] | None = None,
working_dir: str = "",
) -> FolderIndexingResult:
"""Index a folder by processing each document sequentially.
"""Index a folder by processing documents concurrently.

RAGAnything's process_folder_complete uses deepcopy internally which
fails with asyncpg/asyncio objects. We iterate files manually and
call process_document_complete for each one instead.
Uses ``asyncio.Semaphore`` bounded by ``MAX_CONCURRENT_FILES`` so
that at most *N* files are processed at the same time. When
``MAX_CONCURRENT_FILES <= 1`` behaviour is identical to the old
sequential loop.
"""
start_time = time.time()
rag = self._ensure_initialized(working_dir)
await rag._ensure_lightrag_initialized()

glob_pattern = "**/*" if recursive else "*"
from pathlib import Path

folder = Path(folder_path)
all_files = [f for f in folder.glob(glob_pattern) if f.is_file()]

if file_extensions:
exts = set(file_extensions)
all_files = [f for f in all_files if f.suffix in exts]

max_workers = max(1, self._rag_config.MAX_CONCURRENT_FILES)
semaphore = asyncio.Semaphore(max_workers)

succeeded = 0
failed = 0
file_results: list[FileProcessingDetail] = []

for file_path_obj in all_files:
try:
await rag.process_document_complete(
file_path=str(file_path_obj),
output_dir=output_dir,
parse_method="txt",
)
succeeded += 1
file_results.append(
FileProcessingDetail(
async def _process_file(file_path_obj: Path) -> None:
nonlocal succeeded, failed
async with semaphore:
try:
await rag.process_document_complete(
file_path=str(file_path_obj),
file_name=file_path_obj.name,
status=IndexingStatus.SUCCESS,
output_dir=output_dir,
parse_method="txt",
)
)
logger.info(
f"Indexed {file_path_obj.name} ({succeeded}/{len(all_files)})"
)
except Exception as e:
failed += 1
logger.error(f"Failed to index {file_path_obj.name}: {e}")
file_results.append(
FileProcessingDetail(
file_path=str(file_path_obj),
file_name=file_path_obj.name,
status=IndexingStatus.FAILED,
error=str(e),
succeeded += 1
file_results.append(
FileProcessingDetail(
file_path=str(file_path_obj),
file_name=file_path_obj.name,
status=IndexingStatus.SUCCESS,
)
)
)
logger.info(
f"Indexed {file_path_obj.name} ({succeeded}/{len(all_files)})"
)
except Exception as e:
failed += 1
logger.error(f"Failed to index {file_path_obj.name}: {e}")
file_results.append(
FileProcessingDetail(
file_path=str(file_path_obj),
file_name=file_path_obj.name,
status=IndexingStatus.FAILED,
error=str(e),
)
)

await asyncio.gather(*[_process_file(f) for f in all_files])

processing_time_ms = (time.time() - start_time) * 1000
total = len(all_files)
if total == 0:
status = IndexingStatus.SUCCESS
message = f"No files found in '{folder_path}'"
elif failed == 0 and succeeded > 0:
elif failed == 0:
status = IndexingStatus.SUCCESS
message = f"Successfully indexed {succeeded} file(s) from '{folder_path}'"
elif succeeded > 0 and failed > 0:
status = IndexingStatus.PARTIAL
message = f"Partially indexed: {succeeded} succeeded, {failed} failed"
else:
elif succeeded == 0:
status = IndexingStatus.FAILED
message = f"Failed to index folder '{folder_path}'"
else:
status = IndexingStatus.PARTIAL
message = f"Partially indexed: {succeeded} succeeded, {failed} failed"

return FolderIndexingResult(
status=status,
Expand Down Expand Up @@ -358,47 +323,6 @@ async def query_multimodal(
top_k=top_k,
)

# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------

@staticmethod
def _build_folder_result(
    result, folder_path: str, recursive: bool, processing_time_ms: float
) -> FolderIndexingResult:
    """Translate a raw folder-processing result dict into a typed result.

    Non-dict ``result`` values are treated as an empty dict, yielding
    zeroed stats and a "no files" success outcome.
    """
    data = result if isinstance(result, dict) else {}
    stats = FolderIndexingStats(
        total_files=data.get("total_files", 0),
        files_processed=data.get("successful_files", 0),
        files_failed=data.get("failed_files", 0),
        files_skipped=data.get("skipped_files", 0),
    )

    details = _parse_file_details(data)

    ok = stats.files_processed
    bad = stats.files_failed
    # Outcome ladder: all-success, mixed, all-failure, then empty folder.
    if ok > 0 and bad == 0:
        status = IndexingStatus.SUCCESS
        message = f"Successfully indexed {ok} file(s) from '{folder_path}'"
    elif ok > 0 and bad > 0:
        status = IndexingStatus.PARTIAL
        message = (
            f"Partially indexed folder '{folder_path}': "
            f"{ok} succeeded, {bad} failed"
        )
    elif ok == 0 and stats.total_files > 0:
        status = IndexingStatus.FAILED
        message = f"Failed to index any files from '{folder_path}'"
    else:
        status = IndexingStatus.SUCCESS
        message = f"No files found to index in '{folder_path}'"

    return FolderIndexingResult(
        status=status,
        message=message,
        folder_path=folder_path,
        recursive=recursive,
        stats=stats,
        processing_time_ms=round(processing_time_ms, 2),
        file_results=details,
    )


# ------------------------------------------------------------------
# Module-level helpers
Expand Down Expand Up @@ -430,22 +354,3 @@ def _build_vision_messages(

messages.append({"role": "user", "content": content})
return messages


def _parse_file_details(result_dict: dict) -> list[FileProcessingDetail] | None:
    """Convert raw ``file_details`` entries into ``FileProcessingDetail``s.

    Returns ``None`` when the key is missing or its value is not a list;
    otherwise maps each entry's path, basename, success flag and error.
    """
    details = result_dict.get("file_details")
    if not isinstance(details, list):
        return None

    parsed: list[FileProcessingDetail] = []
    for entry in details:
        path = entry.get("file_path", "")
        succeeded = entry.get("success", False)
        parsed.append(
            FileProcessingDetail(
                file_path=path,
                file_name=os.path.basename(path),
                status=IndexingStatus.SUCCESS
                if succeeded
                else IndexingStatus.FAILED,
                error=entry.get("error"),
            )
        )
    return parsed
Loading
Loading