diff --git a/README.md b/README.md index da20930..34ea54a 100644 --- a/README.md +++ b/README.md @@ -167,12 +167,12 @@ The service automatically detects and processes the following document formats t | Microsoft PowerPoint | `.pptx` | | | Microsoft Excel | `.xlsx` | | | HTML | `.html`, `.htm` | | -| Plain Text | `.txt`, `.text`, `.md` | UTF-8, UTF-16, ASCII supported; converted to PDF via ReportLab | -| Quarto Markdown | `.qmd` | Quarto documents | -| R Markdown | `.Rmd`, `.rmd` | R Markdown files | +| Plain Text | `.txt`, `.md` | Pre-converted to PDF by the adapter; UTF-8, UTF-16, ASCII supported | +| Quarto Markdown | `.qmd` | Not currently supported (requires raganything patch) | +| R Markdown | `.Rmd`, `.rmd` | Not currently supported (requires raganything patch) | | Images | `.png`, `.jpg`, `.jpeg`, `.gif`, `.webp`, `.bmp`, `.tiff`, `.tif` | Vision model processing (if enabled) | -**Note:** File format detection is automatic. No configuration is required to specify the document type. The service will process any supported format when indexed. All document and image formats are supported out-of-the-box when installed with `raganything[all]`. +**Note:** `.txt` and `.md` files are pre-converted to PDF by the adapter before indexing, since raganything's DoclingParser doesn't natively support text formats. Other formats are processed directly by raganything. ### Query diff --git a/src/infrastructure/rag/lightrag_adapter.py b/src/infrastructure/rag/lightrag_adapter.py index 386a301..b8e03c3 100644 --- a/src/infrastructure/rag/lightrag_adapter.py +++ b/src/infrastructure/rag/lightrag_adapter.py @@ -1,7 +1,10 @@ +import asyncio +import contextlib import hashlib import os import tempfile import time +from pathlib import Path from typing import Literal, cast from fastapi.logger import logger @@ -9,6 +12,7 @@ from lightrag.llm.openai import openai_complete_if_cache, openai_embed from lightrag.utils import EmbeddingFunc from raganything import RAGAnything, RAGAnythingConfig +from raganything.parser import Parser from application.requests.query_request import MultimodalContentItem from config import LLMConfig, RAGConfig @@ -23,6 +27,8 @@ QueryMode = Literal["local", "global", "hybrid", "naive", "mix", "bypass"] +_TEXT_EXTENSIONS = {".txt", ".md"} + _POSTGRES_STORAGE = { "kv_storage": "PGKVStorage", "vector_storage": "PGVectorStorage", @@ -58,6 +64,26 @@ def _make_workspace(working_dir: str) -> str: digest = hashlib.sha256(working_dir.encode()).hexdigest()[:16] return f"ws_{digest}" + @staticmethod + def _convert_text_to_pdf( + file_path: str, output_dir: str + ) -> tuple[str, str]: + stem = os.path.splitext(os.path.basename(file_path))[0] + unique_dir = tempfile.mkdtemp(prefix=f"rag_txt_{stem}_", dir=output_dir) + pdf_path = Parser.convert_text_to_pdf(file_path, output_dir=unique_dir) + return str(pdf_path), unique_dir + + @staticmethod + def _process_with_pdf_fallback( + file_path: str, output_dir: str + ) -> tuple[str, str | None, str | None]: + ext = os.path.splitext(file_path)[1].lower() + if ext not in _TEXT_EXTENSIONS: + return file_path, None, None + logger.info(f"Converting {ext} file to PDF for docling compatibility: {file_path}") + pdf_path, temp_dir = LightRAGAdapter._convert_text_to_pdf(file_path, output_dir) + return pdf_path, pdf_path, temp_dir + def init_project(self, working_dir: str) -> RAGAnything: if working_dir in self.rag: return self.rag[working_dir] @@ -200,9 +226,14 @@ async def index_document( start_time = time.time() rag = self._ensure_initialized(working_dir) await rag._ensure_lightrag_initialized() + temp_pdf_path: str | None = None + temp_dir: str | None = None try: + effective_path, temp_pdf_path, temp_dir = self._process_with_pdf_fallback( + file_path, output_dir + ) await rag.process_document_complete( - file_path=file_path, output_dir=output_dir, parse_method="txt" + file_path=effective_path, output_dir=output_dir, parse_method="txt" ) processing_time_ms = (time.time() - start_time) * 1000 return FileIndexingResult( @@ -223,6 +254,13 @@ async def index_document( processing_time_ms=round(processing_time_ms, 2), error=str(e), ) + finally: + if temp_pdf_path is not None: + with contextlib.suppress(OSError): + os.unlink(temp_pdf_path) + with contextlib.suppress(OSError): + if temp_dir and os.path.isdir(temp_dir): + os.rmdir(temp_dir) async def index_folder( self, @@ -232,71 +270,109 @@ async def index_folder( file_extensions: list[str] | None = None, working_dir: str = "", ) -> FolderIndexingResult: - """Index a folder by processing each document sequentially. + """Index a folder by processing documents concurrently. - RAGAnything's process_folder_complete uses deepcopy internally which - fails with asyncpg/asyncio objects. We iterate files manually and - call process_document_complete for each one instead. + Uses asyncio.Semaphore bounded by MAX_CONCURRENT_FILES to limit + concurrent process_document_complete calls. When MAX_CONCURRENT_FILES=1, + behavior is identical to sequential processing. """ start_time = time.time() rag = self._ensure_initialized(working_dir) await rag._ensure_lightrag_initialized() glob_pattern = "**/*" if recursive else "*" - from pathlib import Path - folder = Path(folder_path) - all_files = [f for f in folder.glob(glob_pattern) if f.is_file()] - - if file_extensions: - exts = set(file_extensions) - all_files = [f for f in all_files if f.suffix in exts] - - succeeded = 0 - failed = 0 - file_results: list[FileProcessingDetail] = [] - - for file_path_obj in all_files: - try: - await rag.process_document_complete( - file_path=str(file_path_obj), - output_dir=output_dir, - parse_method="txt", - ) - succeeded += 1 - file_results.append( - FileProcessingDetail( - file_path=str(file_path_obj), - file_name=file_path_obj.name, + matched = [f for f in folder.glob(glob_pattern) if f.is_file()] + + exts = set(file_extensions) if file_extensions else None + all_files = [str(f) for f in matched if exts is None or f.suffix in exts] + + if not all_files: + return FolderIndexingResult( + status=IndexingStatus.SUCCESS, + message=f"No files found in '{folder_path}'", + folder_path=folder_path, + recursive=recursive, + stats=FolderIndexingStats(), + file_results=[], + processing_time_ms=round((time.time() - start_time) * 1000, 2), + ) + + semaphore = asyncio.Semaphore(self._rag_config.MAX_CONCURRENT_FILES) + + async def _process_file(file_path_str: str) -> FileProcessingDetail: + file_name = Path(file_path_str).name + temp_pdf_path: str | None = None + temp_dir: str | None = None + async with semaphore: + try: + effective_path, temp_pdf_path, temp_dir = ( + self._process_with_pdf_fallback(file_path_str, output_dir) + ) + await rag.process_document_complete( + file_path=effective_path, + output_dir=output_dir, + parse_method="txt", + ) + logger.info(f"Indexed {file_name}") + return FileProcessingDetail( + file_path=file_path_str, + file_name=file_name, status=IndexingStatus.SUCCESS, ) - ) - logger.info( - f"Indexed {file_path_obj.name} ({succeeded}/{len(all_files)})" - ) - except Exception as e: - failed += 1 - logger.error(f"Failed to index {file_path_obj.name}: {e}") - file_results.append( - FileProcessingDetail( - file_path=str(file_path_obj), - file_name=file_path_obj.name, + except Exception as e: + logger.error(f"Failed to index {file_name}: {e}") + return FileProcessingDetail( + file_path=file_path_str, + file_name=file_name, status=IndexingStatus.FAILED, error=str(e), ) - ) + finally: + if temp_pdf_path is not None: + with contextlib.suppress(OSError): + os.unlink(temp_pdf_path) + with contextlib.suppress(OSError): + if temp_dir and os.path.isdir(temp_dir): + os.rmdir(temp_dir) + + results = await asyncio.gather( + *[_process_file(f) for f in all_files], return_exceptions=True + ) - processing_time_ms = (time.time() - start_time) * 1000 - total = len(all_files) - if failed == 0 and succeeded > 0: - status = IndexingStatus.SUCCESS - message = f"Successfully indexed {succeeded} file(s) from '{folder_path}'" + file_results = [ + r + if isinstance(r, FileProcessingDetail) + else FileProcessingDetail( + file_path="", + file_name="", + status=IndexingStatus.FAILED, + error=str(r), + ) + for r in results + ] + for r in results: + if isinstance(r, BaseException): + logger.error(f"Unexpected error in file processing: {r}") + + succeeded = sum(r.status == IndexingStatus.SUCCESS for r in file_results) + failed = len(file_results) - succeeded + + if failed == 0: + status, message = ( + IndexingStatus.SUCCESS, + f"Successfully indexed {succeeded} file(s) from '{folder_path}'", + ) elif succeeded > 0 and failed > 0: - status = IndexingStatus.PARTIAL - message = f"Partially indexed: {succeeded} succeeded, {failed} failed" + status, message = ( + IndexingStatus.PARTIAL, + f"Partially indexed: {succeeded} succeeded, {failed} failed", + ) else: - status = IndexingStatus.FAILED - message = f"Failed to index folder '{folder_path}'" + status, message = ( + IndexingStatus.FAILED, + f"Failed to index folder '{folder_path}'", + ) return FolderIndexingResult( status=status, @@ -304,13 +380,13 @@ async def index_folder( folder_path=folder_path, recursive=recursive, stats=FolderIndexingStats( - total_files=total, + total_files=len(all_files), files_processed=succeeded, files_failed=failed, files_skipped=0, ), file_results=file_results, - processing_time_ms=round(processing_time_ms, 2), + processing_time_ms=round((time.time() - start_time) * 1000, 2), ) # ------------------------------------------------------------------ diff --git a/tests/unit/test_lightrag_adapter.py b/tests/unit/test_lightrag_adapter.py index 21e2341..687d445 100644 --- a/tests/unit/test_lightrag_adapter.py +++ b/tests/unit/test_lightrag_adapter.py @@ -1,5 +1,6 @@ import os import tempfile +from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -388,7 +389,7 @@ async def test_index_txt_file_success( rag_config_postgres: RAGConfig, tmp_path, ) -> None: - """Should successfully index .txt file with parse_method='txt'.""" + """Should convert .txt to PDF then index it successfully.""" adapter = LightRAGAdapter(llm_config, rag_config_postgres) mock_rag = MagicMock() mock_rag.process_document_complete = AsyncMock() @@ -399,52 +400,76 @@ async def test_index_txt_file_success( txt_file = tmp_path / "sample.txt" txt_file.write_text("This is sample text content for testing.") - result = await adapter.index_document( - file_path=str(txt_file), - file_name="sample.txt", - output_dir=str(tmp_path), - working_dir="test_dir", - ) + # Mock the PDF conversion + fake_pdf_path = tmp_path / "sample_abc12345.pdf" + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + mock_parser_cls.convert_text_to_pdf.return_value = fake_pdf_path + + result = await adapter.index_document( + file_path=str(txt_file), + file_name="sample.txt", + output_dir=str(tmp_path), + working_dir="test_dir", + ) + + # Verify conversion was called with the txt file and a unique output_dir + convert_call = mock_parser_cls.convert_text_to_pdf.call_args + assert convert_call[0][0] == str(txt_file) + assert convert_call[1]["output_dir"].startswith(str(tmp_path)) + # Verify process_document_complete received the PDF path + mock_rag.process_document_complete.assert_awaited_once_with( + file_path=str(fake_pdf_path), + output_dir=str(tmp_path), + parse_method="txt", + ) assert result.status == IndexingStatus.SUCCESS assert result.file_name == "sample.txt" assert result.file_path == str(txt_file) assert result.processing_time_ms is not None assert result.error is None - mock_rag.process_document_complete.assert_awaited_once_with( - file_path=str(txt_file), - output_dir=str(tmp_path), - parse_method="txt", - ) - async def test_index_text_extension_success( + async def test_index_text_extension_not_converted( self, llm_config: LLMConfig, rag_config_postgres: RAGConfig, tmp_path, ) -> None: - """Should successfully index .text extension files.""" + """Should NOT convert .text files — not in _TEXT_EXTENSIONS, passes through.""" adapter = LightRAGAdapter(llm_config, rag_config_postgres) mock_rag = MagicMock() mock_rag.process_document_complete = AsyncMock() mock_rag._ensure_lightrag_initialized = AsyncMock() adapter.rag["test_dir"] = mock_rag - # Create a .text file (another TXT format) + # Create a .text file (unsupported for conversion, will be passed as-is) text_file = tmp_path / "notes.text" text_file.write_text("Notes in .text extension format.") - result = await adapter.index_document( - file_path=str(text_file), - file_name="notes.text", - output_dir=str(tmp_path), - working_dir="test_dir", - ) + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + result = await adapter.index_document( + file_path=str(text_file), + file_name="notes.text", + output_dir=str(tmp_path), + working_dir="test_dir", + ) + + # convert_text_to_pdf should NOT be called for .text extension + mock_parser_cls.convert_text_to_pdf.assert_not_called() + # The file passes through as-is (mock succeeds, real raganything would fail) assert result.status == IndexingStatus.SUCCESS assert result.file_name == "notes.text" - assert result.processing_time_ms is not None - mock_rag.process_document_complete.assert_awaited_once() + # process_document_complete receives the original path, no conversion + mock_rag.process_document_complete.assert_awaited_once_with( + file_path=str(text_file), + output_dir=str(tmp_path), + parse_method="txt", + ) async def test_index_empty_txt_file( self, @@ -452,7 +477,7 @@ async def test_index_empty_txt_file( rag_config_postgres: RAGConfig, tmp_path, ) -> None: - """Should handle empty .txt files correctly.""" + """Should handle empty .txt files correctly — converted to PDF then indexed.""" adapter = LightRAGAdapter(llm_config, rag_config_postgres) mock_rag = MagicMock() mock_rag.process_document_complete = AsyncMock() @@ -463,12 +488,18 @@ async def test_index_empty_txt_file( empty_file = tmp_path / "empty.txt" empty_file.write_text("") - result = await adapter.index_document( - file_path=str(empty_file), - file_name="empty.txt", - output_dir=str(tmp_path), - working_dir="test_dir", - ) + fake_pdf_path = tmp_path / "empty_abc12345.pdf" + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + mock_parser_cls.convert_text_to_pdf.return_value = fake_pdf_path + + result = await adapter.index_document( + file_path=str(empty_file), + file_name="empty.txt", + output_dir=str(tmp_path), + working_dir="test_dir", + ) assert result.status == IndexingStatus.SUCCESS assert result.file_name == "empty.txt" @@ -482,7 +513,7 @@ async def test_index_large_txt_file( rag_config_postgres: RAGConfig, tmp_path, ) -> None: - """Should handle large text files efficiently.""" + """Should handle large text files efficiently — converted to PDF then indexed.""" adapter = LightRAGAdapter(llm_config, rag_config_postgres) mock_rag = MagicMock() mock_rag.process_document_complete = AsyncMock() @@ -494,19 +525,25 @@ async def test_index_large_txt_file( large_content = "Line of text.\n" * 50000 # ~500KB large_file.write_text(large_content) - result = await adapter.index_document( - file_path=str(large_file), - file_name="large.txt", - output_dir=str(tmp_path), - working_dir="test_dir", - ) + fake_pdf_path = tmp_path / "large_abc12345.pdf" + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + mock_parser_cls.convert_text_to_pdf.return_value = fake_pdf_path + + result = await adapter.index_document( + file_path=str(large_file), + file_name="large.txt", + output_dir=str(tmp_path), + working_dir="test_dir", + ) assert result.status == IndexingStatus.SUCCESS assert result.file_name == "large.txt" assert result.processing_time_ms is not None - # Verify the file path was passed correctly + # Verify the PDF path was passed to process_document_complete call_args = mock_rag.process_document_complete.call_args - assert call_args[1]["file_path"] == str(large_file) + assert call_args[1]["file_path"] == str(fake_pdf_path) async def test_index_txt_with_various_encodings( self, @@ -525,37 +562,401 @@ async def test_index_txt_with_various_encodings( utf8_file = tmp_path / "utf8.txt" utf8_file.write_text("ASCII and UTF-8: café ñ 北京", encoding="utf-8") - result_utf8 = await adapter.index_document( - file_path=str(utf8_file), - file_name="utf8.txt", - output_dir=str(tmp_path), - working_dir="test_dir", - ) + fake_pdf_utf8 = tmp_path / "utf8_abc12345.pdf" + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + mock_parser_cls.convert_text_to_pdf.return_value = fake_pdf_utf8 + + result_utf8 = await adapter.index_document( + file_path=str(utf8_file), + file_name="utf8.txt", + output_dir=str(tmp_path), + working_dir="test_dir", + ) assert result_utf8.status == IndexingStatus.SUCCESS # Test UTF-16 utf16_file = tmp_path / "utf16.txt" utf16_file.write_text("UTF-16 content: 你好", encoding="utf-16") - result_utf16 = await adapter.index_document( - file_path=str(utf16_file), - file_name="utf16.txt", - output_dir=str(tmp_path), - working_dir="test_dir", - ) + fake_pdf_utf16 = tmp_path / "utf16_abc12345.pdf" + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + mock_parser_cls.convert_text_to_pdf.return_value = fake_pdf_utf16 + + result_utf16 = await adapter.index_document( + file_path=str(utf16_file), + file_name="utf16.txt", + output_dir=str(tmp_path), + working_dir="test_dir", + ) assert result_utf16.status == IndexingStatus.SUCCESS # Test ASCII ascii_file = tmp_path / "ascii.txt" ascii_file.write_text("Simple ASCII content only", encoding="ascii") - result_ascii = await adapter.index_document( - file_path=str(ascii_file), - file_name="ascii.txt", - output_dir=str(tmp_path), - working_dir="test_dir", - ) + fake_pdf_ascii = tmp_path / "ascii_abc12345.pdf" + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + mock_parser_cls.convert_text_to_pdf.return_value = fake_pdf_ascii + + result_ascii = await adapter.index_document( + file_path=str(ascii_file), + file_name="ascii.txt", + output_dir=str(tmp_path), + working_dir="test_dir", + ) assert result_ascii.status == IndexingStatus.SUCCESS # All three should be processed assert mock_rag.process_document_complete.call_count == 3 + + # ------------------------------------------------------------------ + # Pre-convert .txt/.md to PDF — TDD tests (should FAIL until implemented) + # ------------------------------------------------------------------ + + async def test_index_txt_file_converts_to_pdf_before_processing( + self, + llm_config: LLMConfig, + rag_config_postgres: RAGConfig, + tmp_path, + ) -> None: + """Should convert .txt to PDF via Parser.convert_text_to_pdf before indexing.""" + adapter = LightRAGAdapter(llm_config, rag_config_postgres) + mock_rag = MagicMock() + mock_rag.process_document_complete = AsyncMock() + mock_rag._ensure_lightrag_initialized = AsyncMock() + adapter.rag["test_dir"] = mock_rag + + # Create a real .txt file in tmp_path + txt_file = tmp_path / "fiche.txt" + txt_file.write_text("Contenu de la fiche technique.") + + # Mock the PDF conversion to return a fake PDF path + fake_pdf_path = tmp_path / "fiche_abc12345.pdf" + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + mock_parser_cls.convert_text_to_pdf.return_value = fake_pdf_path + + result = await adapter.index_document( + file_path=str(txt_file), + file_name="fiche.txt", + output_dir=str(tmp_path), + working_dir="test_dir", + ) + + # Verify convert_text_to_pdf was called with the original .txt path + convert_call = mock_parser_cls.convert_text_to_pdf.call_args + assert convert_call[0][0] == str(txt_file) + assert convert_call[1]["output_dir"].startswith(str(tmp_path)) + # Verify process_document_complete received the PDF path, NOT the .txt path + mock_rag.process_document_complete.assert_awaited_once_with( + file_path=str(fake_pdf_path), + output_dir=str(tmp_path), + parse_method="txt", + ) + # Verify result preserves original file_name and reports SUCCESS + assert result.status == IndexingStatus.SUCCESS + assert result.file_name == "fiche.txt" + assert result.file_path == str(txt_file) + + async def test_index_txt_file_cleans_up_pdf_on_success( + self, + llm_config: LLMConfig, + rag_config_postgres: RAGConfig, + tmp_path, + ) -> None: + """Should delete the temporary PDF after successful indexing.""" + adapter = LightRAGAdapter(llm_config, rag_config_postgres) + mock_rag = MagicMock() + mock_rag.process_document_complete = AsyncMock() + mock_rag._ensure_lightrag_initialized = AsyncMock() + adapter.rag["test_dir"] = mock_rag + + txt_file = tmp_path / "cleanup.txt" + txt_file.write_text("Content to be cleaned up.") + + fake_pdf_path = tmp_path / "cleanup_abc12345.pdf" + with ( + patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls, + patch("os.unlink") as mock_unlink, + ): + mock_parser_cls.convert_text_to_pdf.return_value = fake_pdf_path + + await adapter.index_document( + file_path=str(txt_file), + file_name="cleanup.txt", + output_dir=str(tmp_path), + working_dir="test_dir", + ) + + # Verify os.unlink was called on the temp PDF path + mock_unlink.assert_called_once_with(str(fake_pdf_path)) + + async def test_index_txt_file_cleans_up_pdf_on_failure( + self, + llm_config: LLMConfig, + rag_config_postgres: RAGConfig, + tmp_path, + ) -> None: + """Should delete the temporary PDF even when process_document_complete raises.""" + adapter = LightRAGAdapter(llm_config, rag_config_postgres) + mock_rag = MagicMock() + mock_rag.process_document_complete = AsyncMock( + side_effect=RuntimeError("Parsing exploded") + ) + mock_rag._ensure_lightrag_initialized = AsyncMock() + adapter.rag["test_dir"] = mock_rag + + txt_file = tmp_path / "resilient.txt" + txt_file.write_text("Content that will fail to parse.") + + fake_pdf_path = tmp_path / "resilient_abc12345.pdf" + with ( + patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls, + patch("os.unlink") as mock_unlink, + ): + mock_parser_cls.convert_text_to_pdf.return_value = fake_pdf_path + + result = await adapter.index_document( + file_path=str(txt_file), + file_name="resilient.txt", + output_dir=str(tmp_path), + working_dir="test_dir", + ) + + # The result should be FAILED, but cleanup must still have happened + assert result.status == IndexingStatus.FAILED + assert result.file_name == "resilient.txt" + mock_unlink.assert_called_once_with(str(fake_pdf_path)) + + async def test_index_md_file_converts_to_pdf( + self, + llm_config: LLMConfig, + rag_config_postgres: RAGConfig, + tmp_path, + ) -> None: + """Should convert .md files to PDF before indexing, same as .txt.""" + adapter = LightRAGAdapter(llm_config, rag_config_postgres) + mock_rag = MagicMock() + mock_rag.process_document_complete = AsyncMock() + mock_rag._ensure_lightrag_initialized = AsyncMock() + adapter.rag["test_dir"] = mock_rag + + md_file = tmp_path / "readme.md" + md_file.write_text("# Hello\n\nThis is **markdown**.") + + fake_pdf_path = tmp_path / "readme_abc12345.pdf" + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + mock_parser_cls.convert_text_to_pdf.return_value = fake_pdf_path + + result = await adapter.index_document( + file_path=str(md_file), + file_name="readme.md", + output_dir=str(tmp_path), + working_dir="test_dir", + ) + + # Verify conversion was called and the PDF path was forwarded + convert_call = mock_parser_cls.convert_text_to_pdf.call_args + assert convert_call[0][0] == str(md_file) + assert convert_call[1]["output_dir"].startswith(str(tmp_path)) + mock_rag.process_document_complete.assert_awaited_once_with( + file_path=str(fake_pdf_path), + output_dir=str(tmp_path), + parse_method="txt", + ) + assert result.status == IndexingStatus.SUCCESS + assert result.file_name == "readme.md" + + async def test_index_pdf_file_not_converted( + self, + llm_config: LLMConfig, + rag_config_postgres: RAGConfig, + tmp_path, + ) -> None: + """Should NOT attempt conversion for .pdf files — pass through unchanged.""" + adapter = LightRAGAdapter(llm_config, rag_config_postgres) + mock_rag = MagicMock() + mock_rag.process_document_complete = AsyncMock() + mock_rag._ensure_lightrag_initialized = AsyncMock() + adapter.rag["test_dir"] = mock_rag + + pdf_file = tmp_path / "report.pdf" + pdf_file.write_text("fake pdf content") + + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + result = await adapter.index_document( + file_path=str(pdf_file), + file_name="report.pdf", + output_dir=str(tmp_path), + working_dir="test_dir", + ) + + # convert_text_to_pdf must NOT be called for .pdf files + mock_parser_cls.convert_text_to_pdf.assert_not_called() + + # process_document_complete receives original path unchanged + mock_rag.process_document_complete.assert_awaited_once_with( + file_path=str(pdf_file), + output_dir=str(tmp_path), + parse_method="txt", + ) + assert result.status == IndexingStatus.SUCCESS + assert result.file_name == "report.pdf" + + async def test_index_txt_conversion_failure_returns_failed_result( + self, + llm_config: LLMConfig, + rag_config_postgres: RAGConfig, + tmp_path, + ) -> None: + """Should return FAILED when convert_text_to_pdf raises.""" + adapter = LightRAGAdapter(llm_config, rag_config_postgres) + mock_rag = MagicMock() + mock_rag.process_document_complete = AsyncMock() + mock_rag._ensure_lightrag_initialized = AsyncMock() + adapter.rag["test_dir"] = mock_rag + + txt_file = tmp_path / "broken.txt" + txt_file.write_text("Content that cannot be decoded.") + + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + mock_parser_cls.convert_text_to_pdf.side_effect = RuntimeError( + "Could not decode text file broken.txt with any supported encoding" + ) + + result = await adapter.index_document( + file_path=str(txt_file), + file_name="broken.txt", + output_dir=str(tmp_path), + working_dir="test_dir", + ) + + # Should return FAILED with the error message + assert result.status == IndexingStatus.FAILED + assert result.file_name == "broken.txt" + assert result.error is not None + assert "Could not decode" in result.error + # process_document_complete should NOT have been called + mock_rag.process_document_complete.assert_not_awaited() + + async def test_index_folder_txt_file_converts_to_pdf( + self, + llm_config: LLMConfig, + rag_config_postgres: RAGConfig, + tmp_path, + ) -> None: + """Should convert .txt files to PDF when indexing via index_folder.""" + adapter = LightRAGAdapter(llm_config, rag_config_postgres) + mock_rag = MagicMock() + mock_rag.process_document_complete = AsyncMock() + mock_rag._ensure_lightrag_initialized = AsyncMock() + adapter.rag["test_dir"] = mock_rag + + # Create a folder with a .txt file + folder = tmp_path / "docs_folder" + folder.mkdir() + txt_file = folder / "notes.txt" + txt_file.write_text("Some text content.") + + fake_pdf_path = tmp_path / "notes_abc12345.pdf" + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + mock_parser_cls.convert_text_to_pdf.return_value = fake_pdf_path + + result = await adapter.index_folder( + folder_path=str(folder), + output_dir=str(tmp_path), + working_dir="test_dir", + ) + + # Conversion should have happened + convert_call = mock_parser_cls.convert_text_to_pdf.call_args + assert convert_call[0][0] == str(txt_file) + assert convert_call[1]["output_dir"].startswith(str(tmp_path)) + # process_document_complete should have received the PDF path + mock_rag.process_document_complete.assert_awaited_once_with( + file_path=str(fake_pdf_path), + output_dir=str(tmp_path), + parse_method="txt", + ) + assert result.status == IndexingStatus.SUCCESS + assert result.stats.files_processed == 1 + + async def test_index_txt_file_unique_pdf_name( + self, + llm_config: LLMConfig, + rag_config_postgres: RAGConfig, + tmp_path, + ) -> None: + """Should use a unique PDF filename (UUID suffix) to avoid concurrent collisions.""" + adapter = LightRAGAdapter(llm_config, rag_config_postgres) + mock_rag = MagicMock() + mock_rag.process_document_complete = AsyncMock() + mock_rag._ensure_lightrag_initialized = AsyncMock() + adapter.rag["test_dir"] = mock_rag + + txt_file = tmp_path / "concurrent.txt" + txt_file.write_text("Same file indexed concurrently.") + + # Capture what output_dir is passed to convert_text_to_pdf. + # The adapter should use a tempdir or unique output_dir so the PDF + # gets a unique name (e.g., {stem}_{uuid8}.pdf). + conversion_calls: list[tuple] = [] + + def fake_convert(text_path, output_dir=None): + conversion_calls.append((text_path, output_dir)) + # Simulate what raganything would produce with a unique name + stem = Path(text_path).stem + return Path(output_dir) / f"{stem}_a1b2c3d4.pdf" if output_dir else None + + with patch( + "infrastructure.rag.lightrag_adapter.Parser" + ) as mock_parser_cls: + mock_parser_cls.convert_text_to_pdf.side_effect = fake_convert + + result = await adapter.index_document( + file_path=str(txt_file), + file_name="concurrent.txt", + output_dir=str(tmp_path), + working_dir="test_dir", + ) + + assert result.status == IndexingStatus.SUCCESS + + # The key assertion: convert_text_to_pdf must have been called, + # and the result PDF path must differ from a naive "{stem}.pdf". + # The adapter is responsible for ensuring the output uses a unique name + # so that concurrent index_document calls for the same .txt file + # do not collide on the same PDF path. + assert len(conversion_calls) == 1 + _text_path_arg, output_dir_arg = conversion_calls[0] + + # The PDF path produced by the conversion must contain a unique suffix + # (not just "concurrent.pdf" which would collide under concurrency). + pdf_path_used = mock_rag.process_document_complete.call_args[1]["file_path"] + pdf_name = Path(pdf_path_used).name + assert pdf_name.startswith("concurrent") + assert pdf_name.endswith(".pdf") + # The PDF name should include a UUID-like suffix, not be just "concurrent.pdf" + assert pdf_name != "concurrent.pdf", ( + "PDF filename must include a unique suffix to avoid concurrent collisions, " + f"got: {pdf_name}" + )