diff --git a/extralit-server/CHANGELOG.md b/extralit-server/CHANGELOG.md index 2f5c61999..6f4462aef 100644 --- a/extralit-server/CHANGELOG.md +++ b/extralit-server/CHANGELOG.md @@ -18,6 +18,13 @@ These are the section headers that we use: ### Added - Refactor document analysis and preprocessing job to support asynchronous s3 IO operations and large file processing. +- Added text chunking pipeline using chonkie `RecursiveChunker` with a new RQ job (`process_text_extraction_result_job`) that extracts text from OCR JSON results, chunks it, and persists chunks to `documents.metadata_` in the database. +- Added `ChunkMetadata` schema and `chunks` field to `TextExtractionMetadata` for storing text chunks alongside extracted text. +- Added `chonkie>=1.0.2` as a dependency. + +### Removed + +- Removed `InMemoryChunkStore` (`chunk_store.py`) — chunks are now persisted to the database instead of a process-local dict. ### Fixed diff --git a/extralit-server/pyproject.toml b/extralit-server/pyproject.toml index 767c557d1..a962644ab 100644 --- a/extralit-server/pyproject.toml +++ b/extralit-server/pyproject.toml @@ -72,6 +72,7 @@ dependencies = [ "aioboto3>=13.1.1", "types-aiobotocore-s3==2.24.2", # For document processing + "chonkie>=1.0.2", "ocrmypdf>=16.11.0", "pdf2image>=1.17.0", "opencv-python-headless>=4.11.0.86", diff --git a/extralit-server/src/extralit_server/api/schemas/v1/document/metadata.py b/extralit-server/src/extralit_server/api/schemas/v1/document/metadata.py index d9a3e10b3..a2459dc14 100644 --- a/extralit-server/src/extralit_server/api/schemas/v1/document/metadata.py +++ b/extralit-server/src/extralit_server/api/schemas/v1/document/metadata.py @@ -58,11 +58,21 @@ class PreprocessingMetadata(BaseModel): processed_s3_url: Optional[str] = Field(None, description="S3 URL of processed PDF") +class ChunkMetadata(BaseModel): + """A single text chunk produced by the chunker.""" + + text: str = Field(..., description="Chunk text content") + start_index: int = 
class TextExtractionMetadata(BaseModel):
    """Text extraction job results.

    Holds the extracted text, the name of the method that produced it and,
    optionally, the chunks derived from that text.
    """

    # ``markdown`` defaults to ``None``, so the annotation has to allow it.
    # With a plain ``str`` annotation the default is accepted at construction
    # time, but an explicit ``markdown=None`` (which is what a dumped model
    # feeds back in on reload) is expected to be rejected by validation.
    markdown: Optional[str] = Field(None, description="Extracted text")
    extraction_method: str = Field(..., description="Method used for extraction")
    # ``None`` means "no chunks were stored"; populated by the chunking step.
    chunks: Optional[list[ChunkMetadata]] = Field(None, description="Text chunks for embedding / retrieval")
def chunk_text(
    text: str,
    *,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> list[dict[str, Any]]:
    """Split *text* into chunks with chonkie's ``RecursiveChunker``.

    Args:
        text: Source text to split.
        chunk_size: Forwarded to ``RecursiveChunker`` as ``chunk_size``.

    Returns:
        One plain dict per chunk, in the order the chunker returns them,
        with the keys ``text``, ``start_index``, ``end_index`` and
        ``token_count`` copied from the chunk object. Empty or
        whitespace-only input yields ``[]``.
    """
    # Nothing worth chunking: return before a chunker is even constructed.
    if not (text and text.strip()):
        return []

    pieces = RecursiveChunker(chunk_size=chunk_size).chunk(text)
    serialised: list[dict[str, Any]] = [
        {
            "text": piece.text,
            "start_index": piece.start_index,
            "end_index": piece.end_index,
            "token_count": piece.token_count,
        }
        for piece in pieces
    ]

    _LOGGER.debug(
        "Chunked %d chars into %d chunks (size=%d)",
        len(text), len(serialised), chunk_size,
    )
    return serialised
chunk_text from extralit_server.contexts.document.margin import PDFAnalyzer from extralit_server.contexts.document.preprocessing import PDFPreprocessingSettings, PDFPreprocessor from extralit_server.database import AsyncSessionLocal @@ -189,3 +192,101 @@ async def analysis_and_preprocess_job( current_job.meta["error"] = str(e) current_job.save_meta() raise + + +@job(queue=DEFAULT_QUEUE, connection=REDIS_CONNECTION, timeout=600, retry=Retry(max=3, interval=[10, 30, 60])) +async def process_text_extraction_result_job(document_id: UUID, extraction_result: dict) -> dict[str, Any]: + """Process text-extraction JSON, chunk text with chonkie, and persist to DB. + + Accepts the JSON result produced by an external OCR / text-extraction + worker (e.g. ``extralit_ocr.jobs.pymupdf_to_markdown_job``). The + ``extraction_result`` dict may have one of these shapes: + + * ``{"markdown": "..."}`` + * ``{"text": "..."}`` + * Marker-style ``{"pages": [{"blocks": [{"text"|"content"|"markdown": ...}]}]}`` + + Steps + ----- + 1. Extract plain text from whichever JSON shape is provided. + 2. Create text chunks using **chonkie** ``RecursiveChunker`` + (via :func:`~extralit_server.contexts.document.chunker.chunk_text`). + 3. Persist the extracted text **and** chunks into + ``documents.metadata_`` → ``text_extraction_metadata`` in the DB. 
+ """ + current_job = get_current_job() + if current_job is None: + raise Exception("No current job found") + + current_job.meta.update({"document_id": str(document_id), "workflow_step": "process_text_extraction"}) + current_job.save_meta() + + try: + text = _extract_text_from_result(extraction_result) + + # Chunk the text using chonkie RecursiveChunker + chunks = chunk_text(text or "") + + # Persist text + chunks into document metadata in the database + async with AsyncSessionLocal() as db: + document = await db.get(Document, document_id) + if document: + if document.metadata_ is None: + document.metadata_ = DocumentProcessingMetadata().model_dump() + + metadata = DocumentProcessingMetadata(**document.metadata_) + metadata.update_text_extraction_results( + text=text or "", + chunks=chunks, + extraction_method="external_ocr", + ) + document.metadata_ = metadata.model_dump() + await db.commit() + + current_job.meta["chunks_count"] = len(chunks) + current_job.save_meta() + + return {"document_id": str(document_id), "chunks_count": len(chunks)} + + except Exception as e: + _LOGGER.error(f"Error processing text extraction for document {document_id}: {e}") + current_job.meta["error"] = str(e) + current_job.save_meta() + raise + + +def _extract_text_from_result(extraction_result: dict) -> str: + """Pull plain text out of the various JSON shapes produced by OCR workers.""" + if not extraction_result: + return "" + + # Direct string fields + for key in ("markdown", "text", "content", "value"): + val = extraction_result.get(key) + if isinstance(val, str): + return val + + # Marker-style pages → blocks + if "pages" in extraction_result and isinstance(extraction_result["pages"], list): + parts: list[str] = [] + for page in extraction_result["pages"]: + blocks = page.get("blocks", []) if isinstance(page, dict) else [] + for block in blocks: + if not isinstance(block, dict): + continue + content = ( + block.get("markdown") + or block.get("text") + or block.get("content") + or 
block.get("html") + or "" + ) + # Strip basic HTML tags if present + if content and "<" in content and ">" in content: + content = re.sub(r"<[^>]+>", "", content) + if content: + parts.append(content) + return "\n\n".join(parts) + + # Absolute fallback + return str(extraction_result) diff --git a/extralit-server/src/extralit_server/workflows/documents.py b/extralit-server/src/extralit_server/workflows/documents.py index 71251b2db..973b485eb 100644 --- a/extralit-server/src/extralit_server/workflows/documents.py +++ b/extralit-server/src/extralit_server/workflows/documents.py @@ -19,7 +19,7 @@ from rq.group import Group from extralit_server.database import AsyncSessionLocal -from extralit_server.jobs.document_jobs import analysis_and_preprocess_job +from extralit_server.jobs.document_jobs import analysis_and_preprocess_job, process_text_extraction_result_job from extralit_server.jobs.queues import DEFAULT_QUEUE, OCR_QUEUE, REDIS_CONNECTION from extralit_server.models.database import DocumentWorkflow @@ -93,6 +93,12 @@ async def create_document_workflow( group.enqueue_many(queue=DEFAULT_QUEUE, job_datas=[analysis_job_data]) group.enqueue_many(queue=OCR_QUEUE, job_datas=[text_extraction_job_data]) + # Step 5: Chunking job — runs after text extraction completes. + # `process_text_extraction_result_job` is invoked with the OCR result + # (the dict returned by pymupdf_to_markdown_job) once that job finishes. + # Wiring via RQ Callback or a dependent enqueue is done by the caller + # that has access to the text-extraction result. 
+ # Step 6: Future table extraction job (conditional based on analysis results) # This will be added when table extraction is implemented # table_extraction_job_data = OCR_QUEUE.prepare_data( diff --git a/extralit-server/tests/unit/contexts/document/__init__.py b/extralit-server/tests/unit/contexts/document/__init__.py new file mode 100644 index 000000000..ebe834147 --- /dev/null +++ b/extralit-server/tests/unit/contexts/document/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024-present, Extralit Labs, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/extralit-server/tests/unit/contexts/document/test_chunker.py b/extralit-server/tests/unit/contexts/document/test_chunker.py new file mode 100644 index 000000000..ab02cbcbb --- /dev/null +++ b/extralit-server/tests/unit/contexts/document/test_chunker.py @@ -0,0 +1,85 @@ +# Copyright 2024-present, Extralit Labs, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
class TestChunkText:
    """Behavioural checks for ``chunk_text`` (the chonkie ``RecursiveChunker`` wrapper)."""

    def test_empty_string_returns_empty(self):
        result = chunk_text("")
        assert result == []

    def test_whitespace_only_returns_empty(self):
        result = chunk_text(" \n\t ")
        assert result == []

    def test_short_text_single_chunk(self):
        pieces = chunk_text("Hello world. This is a short sentence.", chunk_size=512)
        assert len(pieces) >= 1
        # The opening words must survive somewhere in the produced chunk(s).
        joined = " ".join(piece["text"] for piece in pieces)
        assert "Hello world" in joined

    def test_chunk_dict_keys(self):
        pieces = chunk_text("Some sample text for testing the chunker output format.", chunk_size=512)
        assert len(pieces) >= 1
        expected_types = {
            "text": str,
            "start_index": int,
            "end_index": int,
            "token_count": int,
        }
        for piece in pieces:
            # Presence first, then type, for every documented key.
            for key in expected_types:
                assert key in piece
            for key, kind in expected_types.items():
                assert isinstance(piece[key], kind)

    def test_long_text_produces_multiple_chunks(self):
        # Far more text than fits into a single 64-sized chunk.
        long_text = "This is a paragraph of text. " * 200
        pieces = chunk_text(long_text, chunk_size=64)
        assert len(pieces) > 1

    def test_token_count_positive(self):
        pieces = chunk_text("The quick brown fox jumps over the lazy dog.", chunk_size=512)
        assert all(piece["token_count"] > 0 for piece in pieces)

    def test_start_end_index_ordering(self):
        repeated = "Sentence number one. " * 100
        pieces = chunk_text(repeated, chunk_size=32)
        assert all(piece["start_index"] < piece["end_index"] for piece in pieces)

    def test_chunks_are_json_serialisable(self):
        import json

        pieces = chunk_text("A simple test for JSON serialisation.", chunk_size=512)
        # ``json.dumps`` raising here is the failure signal.
        encoded = json.dumps(pieces)
        assert isinstance(encoded, str)

    def test_custom_chunk_size(self):
        words = "Word " * 500
        small_count = len(chunk_text(words, chunk_size=32))
        large_count = len(chunk_text(words, chunk_size=256))
        # Smaller chunk size => more chunks
        assert small_count > large_count
class TestChunkMetadata:
    """Schema checks for ``ChunkMetadata`` and the ``chunks`` field of ``TextExtractionMetadata``."""

    def test_create_chunk_metadata(self):
        fields = {"text": "hello", "start_index": 0, "end_index": 5, "token_count": 1}
        chunk = ChunkMetadata(**fields)
        assert chunk.text == "hello"
        assert chunk.start_index == 0

    def test_text_extraction_with_chunks(self):
        first = ChunkMetadata(text="chunk1", start_index=0, end_index=6, token_count=1)
        second = ChunkMetadata(text="chunk2", start_index=6, end_index=12, token_count=1)
        extraction = TextExtractionMetadata(
            markdown="chunk1chunk2",
            extraction_method="external_ocr",
            chunks=[first, second],
        )
        assert len(extraction.chunks) == 2
        assert extraction.chunks[0].text == "chunk1"

    def test_text_extraction_chunks_default_none(self):
        # ``chunks`` is optional and stays unset unless provided.
        extraction = TextExtractionMetadata(markdown="hi", extraction_method="ocr")
        assert extraction.chunks is None
class TestExtractTextFromResult:
    """Checks for ``_extract_text_from_result`` across the supported result shapes."""

    def test_empty_dict(self):
        assert _extract_text_from_result({}) == ""

    def test_none(self):
        assert _extract_text_from_result(None) == ""

    def test_markdown_key(self):
        assert _extract_text_from_result({"markdown": "# Title"}) == "# Title"

    def test_text_key(self):
        assert _extract_text_from_result({"text": "hello"}) == "hello"

    def test_content_key(self):
        assert _extract_text_from_result({"content": "body"}) == "body"

    def test_pages_blocks_text(self):
        result = {
            "pages": [
                {"blocks": [{"text": "block1"}, {"text": "block2"}]},
                {"blocks": [{"text": "block3"}]},
            ]
        }
        text = _extract_text_from_result(result)
        assert "block1" in text
        assert "block2" in text
        assert "block3" in text

    def test_pages_blocks_html_stripped(self):
        # NOTE(review): the markup inside this fixture was lost in the text
        # under review (the literal was split across lines with the tags
        # missing). ``<p>`` is a reconstruction -- confirm against the
        # intended fixture.
        result = {
            "pages": [{"blocks": [{"html": "<p>hello</p>"}]}]
        }
        text = _extract_text_from_result(result)
        assert "<p>" not in text
        assert "hello" in text

    def test_fallback_stringifies(self):
        # Unknown shapes fall through to ``str()`` of the whole result.
        result = {"unknown_key": 42}
        text = _extract_text_from_result(result)
        assert "42" in text