Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@ The service automatically detects and processes the following document formats t
| Microsoft PowerPoint | `.pptx` | |
| Microsoft Excel | `.xlsx` | |
| HTML | `.html`, `.htm` | |
| Plain Text | `.txt`, `.text`, `.md` | UTF-8, UTF-16, ASCII supported; converted to PDF via ReportLab |
| Quarto Markdown | `.qmd` | Quarto documents |
| R Markdown | `.Rmd`, `.rmd` | R Markdown files |
| Plain Text | `.txt`, `.md` | Pre-converted to PDF by the adapter; UTF-8, UTF-16, ASCII supported |
| Quarto Markdown | `.qmd` | Not currently supported (requires raganything patch) |
| R Markdown | `.Rmd`, `.rmd` | Not currently supported (requires raganything patch) |
| Images | `.png`, `.jpg`, `.jpeg`, `.gif`, `.webp`, `.bmp`, `.tiff`, `.tif` | Vision model processing (if enabled) |

**Note:** File format detection is automatic. No configuration is required to specify the document type. The service will process any supported format when indexed. All document and image formats are supported out-of-the-box when installed with `raganything[all]`.
**Note:** `.txt` and `.md` files are pre-converted to PDF by the adapter before indexing, since raganything's DoclingParser doesn't natively support text formats. Other formats are processed directly by raganything.

### Query

Expand Down
180 changes: 128 additions & 52 deletions src/infrastructure/rag/lightrag_adapter.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import asyncio
import contextlib
import hashlib
import os
import tempfile
import time
from pathlib import Path
from typing import Literal, cast

from fastapi.logger import logger
from lightrag import QueryParam
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.utils import EmbeddingFunc
from raganything import RAGAnything, RAGAnythingConfig
from raganything.parser import Parser

from application.requests.query_request import MultimodalContentItem
from config import LLMConfig, RAGConfig
Expand All @@ -23,6 +27,8 @@

QueryMode = Literal["local", "global", "hybrid", "naive", "mix", "bypass"]

_TEXT_EXTENSIONS = {".txt", ".md"}

_POSTGRES_STORAGE = {
"kv_storage": "PGKVStorage",
"vector_storage": "PGVectorStorage",
Expand Down Expand Up @@ -58,6 +64,26 @@ def _make_workspace(working_dir: str) -> str:
digest = hashlib.sha256(working_dir.encode()).hexdigest()[:16]
return f"ws_{digest}"

@staticmethod
def _convert_text_to_pdf(
    file_path: str, output_dir: str
) -> tuple[str, str]:
    """Render a plain-text file to PDF inside a freshly created temp directory.

    A unique scratch directory is created under *output_dir* so concurrent
    conversions of same-named files cannot collide. Returns a
    ``(pdf_path, temp_dir)`` pair; the caller is responsible for removing
    both once indexing has finished.
    """
    stem = Path(file_path).stem
    scratch_dir = tempfile.mkdtemp(prefix=f"rag_txt_{stem}_", dir=output_dir)
    generated_pdf = Parser.convert_text_to_pdf(file_path, output_dir=scratch_dir)
    return str(generated_pdf), scratch_dir

@staticmethod
def _process_with_pdf_fallback(
    file_path: str, output_dir: str
) -> tuple[str, str | None, str | None]:
    """Pre-convert text-like files to PDF so docling can parse them.

    Files whose suffix is in ``_TEXT_EXTENSIONS`` (.txt/.md) are converted
    to a temporary PDF; all other files pass through untouched.

    Returns ``(effective_path, temp_pdf_path, temp_dir)``:
    - effective_path: the path to feed to the parser (PDF for text files,
      the original path otherwise).
    - temp_pdf_path / temp_dir: the generated artifacts the caller must
      clean up, or ``None`` when no conversion happened.
    """
    ext = os.path.splitext(file_path)[1].lower()
    if ext not in _TEXT_EXTENSIONS:
        # Not a pre-conversion candidate: hand the file straight through.
        return file_path, None, None
    # Lazy %-style args avoid formatting the message when INFO is disabled.
    logger.info(
        "Converting %s file to PDF for docling compatibility: %s",
        ext,
        file_path,
    )
    pdf_path, temp_dir = LightRAGAdapter._convert_text_to_pdf(file_path, output_dir)
    return pdf_path, pdf_path, temp_dir

def init_project(self, working_dir: str) -> RAGAnything:
if working_dir in self.rag:
return self.rag[working_dir]
Expand Down Expand Up @@ -200,9 +226,14 @@ async def index_document(
start_time = time.time()
rag = self._ensure_initialized(working_dir)
await rag._ensure_lightrag_initialized()
temp_pdf_path: str | None = None
temp_dir: str | None = None
try:
effective_path, temp_pdf_path, temp_dir = self._process_with_pdf_fallback(
file_path, output_dir
)
await rag.process_document_complete(
file_path=file_path, output_dir=output_dir, parse_method="txt"
file_path=effective_path, output_dir=output_dir, parse_method="txt"
)
processing_time_ms = (time.time() - start_time) * 1000
return FileIndexingResult(
Expand All @@ -223,6 +254,13 @@ async def index_document(
processing_time_ms=round(processing_time_ms, 2),
error=str(e),
)
finally:
if temp_pdf_path is not None:
with contextlib.suppress(OSError):
os.unlink(temp_pdf_path)
with contextlib.suppress(OSError):
if temp_dir and os.path.isdir(temp_dir):
os.rmdir(temp_dir)

async def index_folder(
self,
Expand All @@ -232,85 +270,123 @@ async def index_folder(
file_extensions: list[str] | None = None,
working_dir: str = "",
) -> FolderIndexingResult:
"""Index a folder by processing each document sequentially.
"""Index a folder by processing documents concurrently.

RAGAnything's process_folder_complete uses deepcopy internally which
fails with asyncpg/asyncio objects. We iterate files manually and
call process_document_complete for each one instead.
Uses asyncio.Semaphore bounded by MAX_CONCURRENT_FILES to limit
concurrent process_document_complete calls. When MAX_CONCURRENT_FILES=1,
behavior is identical to sequential processing.
"""
start_time = time.time()
rag = self._ensure_initialized(working_dir)
await rag._ensure_lightrag_initialized()

glob_pattern = "**/*" if recursive else "*"
from pathlib import Path

folder = Path(folder_path)
all_files = [f for f in folder.glob(glob_pattern) if f.is_file()]

if file_extensions:
exts = set(file_extensions)
all_files = [f for f in all_files if f.suffix in exts]

succeeded = 0
failed = 0
file_results: list[FileProcessingDetail] = []

for file_path_obj in all_files:
try:
await rag.process_document_complete(
file_path=str(file_path_obj),
output_dir=output_dir,
parse_method="txt",
)
succeeded += 1
file_results.append(
FileProcessingDetail(
file_path=str(file_path_obj),
file_name=file_path_obj.name,
matched = [f for f in folder.glob(glob_pattern) if f.is_file()]

exts = set(file_extensions) if file_extensions else None
all_files = [str(f) for f in matched if exts is None or f.suffix in exts]

if not all_files:
return FolderIndexingResult(
status=IndexingStatus.SUCCESS,
message=f"No files found in '{folder_path}'",
folder_path=folder_path,
recursive=recursive,
stats=FolderIndexingStats(),
file_results=[],
processing_time_ms=round((time.time() - start_time) * 1000, 2),
)

semaphore = asyncio.Semaphore(self._rag_config.MAX_CONCURRENT_FILES)

async def _process_file(file_path_str: str) -> FileProcessingDetail:
file_name = Path(file_path_str).name
temp_pdf_path: str | None = None
temp_dir: str | None = None
async with semaphore:
try:
effective_path, temp_pdf_path, temp_dir = (
self._process_with_pdf_fallback(file_path_str, output_dir)
)
await rag.process_document_complete(
file_path=effective_path,
output_dir=output_dir,
parse_method="txt",
)
logger.info(f"Indexed {file_name}")
return FileProcessingDetail(
file_path=file_path_str,
file_name=file_name,
status=IndexingStatus.SUCCESS,
)
)
logger.info(
f"Indexed {file_path_obj.name} ({succeeded}/{len(all_files)})"
)
except Exception as e:
failed += 1
logger.error(f"Failed to index {file_path_obj.name}: {e}")
file_results.append(
FileProcessingDetail(
file_path=str(file_path_obj),
file_name=file_path_obj.name,
except Exception as e:
logger.error(f"Failed to index {file_name}: {e}")
return FileProcessingDetail(
file_path=file_path_str,
file_name=file_name,
status=IndexingStatus.FAILED,
error=str(e),
)
)
finally:
if temp_pdf_path is not None:
with contextlib.suppress(OSError):
os.unlink(temp_pdf_path)
with contextlib.suppress(OSError):
if temp_dir and os.path.isdir(temp_dir):
os.rmdir(temp_dir)

results = await asyncio.gather(
*[_process_file(f) for f in all_files], return_exceptions=True
)

processing_time_ms = (time.time() - start_time) * 1000
total = len(all_files)
if failed == 0 and succeeded > 0:
status = IndexingStatus.SUCCESS
message = f"Successfully indexed {succeeded} file(s) from '{folder_path}'"
file_results = [
r
if isinstance(r, FileProcessingDetail)
else FileProcessingDetail(
file_path="<unknown>",
file_name="<unknown>",
status=IndexingStatus.FAILED,
error=str(r),
)
for r in results
]
for r in results:
if isinstance(r, BaseException):
logger.error(f"Unexpected error in file processing: {r}")

succeeded = sum(r.status == IndexingStatus.SUCCESS for r in file_results)
failed = len(file_results) - succeeded

if failed == 0:
status, message = (
IndexingStatus.SUCCESS,
f"Successfully indexed {succeeded} file(s) from '{folder_path}'",
)
elif succeeded > 0 and failed > 0:
status = IndexingStatus.PARTIAL
message = f"Partially indexed: {succeeded} succeeded, {failed} failed"
status, message = (
IndexingStatus.PARTIAL,
f"Partially indexed: {succeeded} succeeded, {failed} failed",
)
else:
status = IndexingStatus.FAILED
message = f"Failed to index folder '{folder_path}'"
status, message = (
IndexingStatus.FAILED,
f"Failed to index folder '{folder_path}'",
)

return FolderIndexingResult(
status=status,
message=message,
folder_path=folder_path,
recursive=recursive,
stats=FolderIndexingStats(
total_files=total,
total_files=len(all_files),
files_processed=succeeded,
files_failed=failed,
files_skipped=0,
),
file_results=file_results,
processing_time_ms=round(processing_time_ms, 2),
processing_time_ms=round((time.time() - start_time) * 1000, 2),
)

# ------------------------------------------------------------------
Expand Down
Loading
Loading