Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 30 additions & 17 deletions src/alembic/versions/001_add_bm25_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,38 @@ def upgrade() -> None:
"""Add BM25 full-text search to lightrag_doc_chunks."""
op.execute("CREATE EXTENSION IF NOT EXISTS pg_textsearch")

# Guard: LightRAG creates lightrag_doc_chunks lazily on first use.
# On a fresh database the table does not exist yet, so skip the
# column/index/trigger steps. They will be applied on next run
# after LightRAG has created the table.
op.execute(
"ALTER TABLE lightrag_doc_chunks ADD COLUMN IF NOT EXISTS content_tsv tsvector"
)
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_name = 'lightrag_doc_chunks'
) THEN
ALTER TABLE lightrag_doc_chunks
ADD COLUMN IF NOT EXISTS content_tsv tsvector;

op.execute(
"CREATE INDEX IF NOT EXISTS idx_lightrag_chunks_content_tsv ON lightrag_doc_chunks USING GIN(content_tsv)"
CREATE INDEX IF NOT EXISTS idx_lightrag_chunks_content_tsv
ON lightrag_doc_chunks USING GIN(content_tsv);

DROP TRIGGER IF EXISTS trg_chunks_content_tsv
ON lightrag_doc_chunks;

CREATE TRIGGER trg_chunks_content_tsv
BEFORE INSERT OR UPDATE ON lightrag_doc_chunks
FOR EACH ROW EXECUTE FUNCTION update_chunks_tsv();

UPDATE lightrag_doc_chunks
SET content_tsv = to_tsvector('english', COALESCE(content, ''))
WHERE content_tsv IS NULL;
END IF;
END;
$$
"""
)

op.execute(
Expand All @@ -40,19 +66,6 @@ def upgrade() -> None:
"""
)

op.execute("DROP TRIGGER IF EXISTS trg_chunks_content_tsv ON lightrag_doc_chunks")
op.execute(
"""
CREATE TRIGGER trg_chunks_content_tsv
BEFORE INSERT OR UPDATE ON lightrag_doc_chunks
FOR EACH ROW EXECUTE FUNCTION update_chunks_tsv();
"""
)

op.execute(
"UPDATE lightrag_doc_chunks SET content_tsv = to_tsvector('english', COALESCE(content, '')) WHERE content_tsv IS NULL"
)

op.execute("DROP TABLE IF EXISTS chunks")


Expand Down
8 changes: 7 additions & 1 deletion src/application/api/mcp_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ async def query_knowledge_base(
Args:
working_dir: RAG workspace directory for this project
query: The user's question or search query
mode: Search mode - "naive" (default), "local", "global", "hybrid", "mix"
mode: Search mode
- "naive": Vector search only
- "local": Local knowledge graph search
- "global": Global knowledge graph search
- "hybrid": Local + global knowledge graph
- "hybrid+": BM25 + vector search (parallel)
- "mix": Knowledge graph + vector chunks
top_k: Number of chunks to retrieve (default 5)

Returns:
Expand Down
6 changes: 3 additions & 3 deletions src/application/use_cases/query_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from domain.ports.bm25_engine import BM25EnginePort, BM25SearchResult
from domain.ports.rag_engine import RAGEnginePort
from infrastructure.hybrid.rrf_combiner import RRFCombiner
from infrastructure.rag.rrf_combiner import RRFCombiner

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -75,13 +75,13 @@ async def execute(
if mode == "hybrid+":
if self.bm25_engine is None:
return await self.rag_engine.query(
query=query, mode="naive", top_k=top_k, working_dir=working_dir
query=query, mode="hybrid", top_k=top_k, working_dir=working_dir
)

bm25_results, vector_results = await asyncio.gather(
self.bm25_engine.search(query, working_dir, top_k=top_k * 2),
self.rag_engine.query(
query=query, mode="naive", top_k=top_k * 2, working_dir=working_dir
query=query, mode="hybrid", top_k=top_k * 2, working_dir=working_dir
),
return_exceptions=True,
)
Expand Down
2 changes: 1 addition & 1 deletion src/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
RAGConfig,
)
from domain.ports.bm25_engine import BM25EnginePort
from infrastructure.bm25.pg_textsearch_adapter import PostgresBM25Adapter
from infrastructure.rag.pg_textsearch_adapter import PostgresBM25Adapter
from infrastructure.rag.lightrag_adapter import LightRAGAdapter
from infrastructure.storage.minio_adapter import MinioAdapter

Expand Down
185 changes: 45 additions & 140 deletions src/infrastructure/rag/lightrag_adapter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import hashlib
import os
import tempfile
import time
from pathlib import Path
from typing import Literal, cast

from fastapi.logger import logger
Expand Down Expand Up @@ -139,49 +141,6 @@ async def vision_call(
)
return self.rag[working_dir]

# ------------------------------------------------------------------
# LLM callables (passed directly to RAGAnything)
# ------------------------------------------------------------------

async def _llm_call(
self, prompt, system_prompt=None, history_messages=None, **kwargs
):
if history_messages is None:
history_messages = []
return await openai_complete_if_cache(
self._llm_config.CHAT_MODEL,
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=self._llm_config.api_key,
base_url=self._llm_config.api_base_url,
**kwargs,
)

async def _vision_call(
self,
prompt,
system_prompt=None,
history_messages=None,
image_data=None,
**kwargs,
):
if history_messages is None:
history_messages = []
messages = _build_vision_messages(
system_prompt, history_messages, prompt, image_data
)
return await openai_complete_if_cache(
self._llm_config.VISION_MODEL,
"Image Description Task",
system_prompt=None,
history_messages=messages,
api_key=self._llm_config.api_key,
base_url=self._llm_config.api_base_url,
messages=messages,
**kwargs,
)

# ------------------------------------------------------------------
# Port implementation — indexing
# ------------------------------------------------------------------
Expand Down Expand Up @@ -232,74 +191,80 @@ async def index_folder(
file_extensions: list[str] | None = None,
working_dir: str = "",
) -> FolderIndexingResult:
"""Index a folder by processing each document sequentially.
"""Index a folder by processing documents concurrently.

RAGAnything's process_folder_complete uses deepcopy internally which
fails with asyncpg/asyncio objects. We iterate files manually and
call process_document_complete for each one instead.
Uses ``asyncio.Semaphore`` bounded by ``MAX_CONCURRENT_FILES`` so
that at most *N* files are processed at the same time. When
``MAX_CONCURRENT_FILES <= 1`` behaviour is identical to the old
sequential loop.
"""
start_time = time.time()
rag = self._ensure_initialized(working_dir)
await rag._ensure_lightrag_initialized()

glob_pattern = "**/*" if recursive else "*"
from pathlib import Path

folder = Path(folder_path)
all_files = [f for f in folder.glob(glob_pattern) if f.is_file()]

if file_extensions:
exts = set(file_extensions)
all_files = [f for f in all_files if f.suffix in exts]

max_workers = max(1, self._rag_config.MAX_CONCURRENT_FILES)
semaphore = asyncio.Semaphore(max_workers)

succeeded = 0
failed = 0
file_results: list[FileProcessingDetail] = []

for file_path_obj in all_files:
try:
await rag.process_document_complete(
file_path=str(file_path_obj),
output_dir=output_dir,
parse_method="txt",
)
succeeded += 1
file_results.append(
FileProcessingDetail(
async def _process_file(file_path_obj: Path) -> None:
nonlocal succeeded, failed
async with semaphore:
try:
await rag.process_document_complete(
file_path=str(file_path_obj),
file_name=file_path_obj.name,
status=IndexingStatus.SUCCESS,
output_dir=output_dir,
parse_method="txt",
)
)
logger.info(
f"Indexed {file_path_obj.name} ({succeeded}/{len(all_files)})"
)
except Exception as e:
failed += 1
logger.error(f"Failed to index {file_path_obj.name}: {e}")
file_results.append(
FileProcessingDetail(
file_path=str(file_path_obj),
file_name=file_path_obj.name,
status=IndexingStatus.FAILED,
error=str(e),
succeeded += 1
file_results.append(
FileProcessingDetail(
file_path=str(file_path_obj),
file_name=file_path_obj.name,
status=IndexingStatus.SUCCESS,
)
)
)
logger.info(
f"Indexed {file_path_obj.name} ({succeeded}/{len(all_files)})"
)
except Exception as e:
failed += 1
logger.error(f"Failed to index {file_path_obj.name}: {e}")
file_results.append(
FileProcessingDetail(
file_path=str(file_path_obj),
file_name=file_path_obj.name,
status=IndexingStatus.FAILED,
error=str(e),
)
)

await asyncio.gather(*[_process_file(f) for f in all_files])

processing_time_ms = (time.time() - start_time) * 1000
total = len(all_files)
if total == 0:
status = IndexingStatus.SUCCESS
message = f"No files found in '{folder_path}'"
elif failed == 0 and succeeded > 0:
elif failed == 0:
status = IndexingStatus.SUCCESS
message = f"Successfully indexed {succeeded} file(s) from '{folder_path}'"
elif succeeded > 0 and failed > 0:
status = IndexingStatus.PARTIAL
message = f"Partially indexed: {succeeded} succeeded, {failed} failed"
else:
elif succeeded == 0:
status = IndexingStatus.FAILED
message = f"Failed to index folder '{folder_path}'"
else:
status = IndexingStatus.PARTIAL
message = f"Partially indexed: {succeeded} succeeded, {failed} failed"

return FolderIndexingResult(
status=status,
Expand Down Expand Up @@ -358,47 +323,6 @@ async def query_multimodal(
top_k=top_k,
)

# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------

@staticmethod
def _build_folder_result(
result, folder_path: str, recursive: bool, processing_time_ms: float
) -> FolderIndexingResult:
result_dict = result if isinstance(result, dict) else {}
stats = FolderIndexingStats(
total_files=result_dict.get("total_files", 0),
files_processed=result_dict.get("successful_files", 0),
files_failed=result_dict.get("failed_files", 0),
files_skipped=result_dict.get("skipped_files", 0),
)

file_results = _parse_file_details(result_dict)

if stats.files_failed == 0 and stats.files_processed > 0:
status = IndexingStatus.SUCCESS
message = f"Successfully indexed {stats.files_processed} file(s) from '{folder_path}'"
elif stats.files_processed > 0 and stats.files_failed > 0:
status = IndexingStatus.PARTIAL
message = f"Partially indexed folder '{folder_path}': {stats.files_processed} succeeded, {stats.files_failed} failed"
elif stats.files_processed == 0 and stats.total_files > 0:
status = IndexingStatus.FAILED
message = f"Failed to index any files from '{folder_path}'"
else:
status = IndexingStatus.SUCCESS
message = f"No files found to index in '{folder_path}'"

return FolderIndexingResult(
status=status,
message=message,
folder_path=folder_path,
recursive=recursive,
stats=stats,
processing_time_ms=round(processing_time_ms, 2),
file_results=file_results,
)


# ------------------------------------------------------------------
# Module-level helpers
Expand Down Expand Up @@ -430,22 +354,3 @@ def _build_vision_messages(

messages.append({"role": "user", "content": content})
return messages


def _parse_file_details(result_dict: dict) -> list[FileProcessingDetail] | None:
if "file_details" not in result_dict:
return None
file_details = result_dict["file_details"]
if not isinstance(file_details, list):
return None
return [
FileProcessingDetail(
file_path=d.get("file_path", ""),
file_name=os.path.basename(d.get("file_path", "")),
status=IndexingStatus.SUCCESS
if d.get("success", False)
else IndexingStatus.FAILED,
error=d.get("error"),
)
for d in file_details
]
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ async def _rebuild_tsv_if_config_changed(self, conn) -> None:
WHERE content_tsv IS NOT NULL
"""
)
logger.info("Rebuilt content_tsv: %s with text_config='%s'", status, self.text_config)
logger.info(
"Rebuilt content_tsv: %s with text_config='%s'",
status,
self.text_config,
)
except Exception as e:
logger.warning("Could not check/rebuild trigger function: %s", e)

Expand Down Expand Up @@ -172,9 +176,7 @@ async def search(
ORDER BY score
LIMIT $4
"""
results = await conn.fetch(
sql, query, workspace, bm25_index, top_k
)
results = await conn.fetch(sql, query, workspace, bm25_index, top_k)

return [
BM25SearchResult(
Expand Down
Loading
Loading