Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions extralit-server/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ These are the section headers that we use:
### Added

- Refactor document analysis and preprocessing job to support asynchronous S3 I/O operations and large file processing.
- Added text chunking pipeline using chonkie `RecursiveChunker` with a new RQ job (`process_text_extraction_result_job`) that extracts text from OCR JSON results, chunks it, and persists chunks to `documents.metadata_` in the database.
- Added `ChunkMetadata` schema and `chunks` field to `TextExtractionMetadata` for storing text chunks alongside extracted text.
- Added `chonkie>=1.0.2` as a dependency.

### Removed

- Removed `InMemoryChunkStore` (`chunk_store.py`) — chunks are now persisted to the database instead of a process-local dict.

### Fixed

Expand Down
1 change: 1 addition & 0 deletions extralit-server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ dependencies = [
"aioboto3>=13.1.1",
"types-aiobotocore-s3==2.24.2",
# For document processing
"chonkie>=1.0.2",
"ocrmypdf>=16.11.0",
"pdf2image>=1.17.0",
"opencv-python-headless>=4.11.0.86",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,21 @@ class PreprocessingMetadata(BaseModel):
processed_s3_url: Optional[str] = Field(None, description="S3 URL of processed PDF")


class ChunkMetadata(BaseModel):
    """A single text chunk produced by the chunker.

    Mirrors the plain dicts emitted by
    ``extralit_server.contexts.document.chunker.chunk_text`` (same four keys),
    so raw chunk dicts can be validated via ``ChunkMetadata(**chunk)`` before
    being persisted inside the document metadata JSON.
    """

    # Raw chunk content as sliced from the source text.
    text: str = Field(..., description="Chunk text content")
    # Offsets are character (not token) positions in the original text.
    start_index: int = Field(..., description="Character start offset in source text")
    end_index: int = Field(..., description="Character end offset in source text")
    # Token count as reported by the chunker's tokenizer.
    token_count: int = Field(..., description="Number of tokens in the chunk")


class TextExtractionMetadata(BaseModel):
    """Text extraction job results.

    Stores the extracted document text plus the chunks derived from it
    (see ``DocumentProcessingMetadata.update_text_extraction_results``).
    """

    # Fix: the annotation was `str` while the default was `None`, which
    # contradicts the declared type. The field is genuinely optional until an
    # extraction job has run, so declare it Optional explicitly.
    markdown: Optional[str] = Field(None, description="Extracted text")
    extraction_method: str = Field(..., description="Method used for extraction")
    chunks: Optional[list[ChunkMetadata]] = Field(None, description="Text chunks for embedding / retrieval")


class DocumentProcessingMetadata(BaseModel):
Expand Down Expand Up @@ -111,6 +121,16 @@ def update_preprocessing_results(self, preprocess_result: dict) -> None:
processed_s3_url=preprocess_result.get("processed_s3_url"),
)

def update_text_extraction_results(
    self, text: str, chunks: list[dict], extraction_method: str = "external_ocr"
) -> None:
    """Record the outcome of a text-extraction job on this metadata object.

    Each raw chunk dict is validated into a ``ChunkMetadata`` model and stored
    together with the extracted text and the method that produced it.
    """
    chunk_models = [ChunkMetadata(**raw_chunk) for raw_chunk in chunks]
    self.text_extraction_metadata = TextExtractionMetadata(
        markdown=text,
        extraction_method=extraction_method,
        chunks=chunk_models,
    )

def is_workflow_complete(self) -> bool:
"""Check if all workflow steps are complete."""
return all(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2024-present, Extralit Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Document text chunking using chonkie's RecursiveChunker.

See https://docs.chonkie.ai/oss/chunkers/recursive-chunker
"""

from __future__ import annotations

import logging
from typing import Any

from chonkie import RecursiveChunker

_LOGGER = logging.getLogger(__name__)

DEFAULT_CHUNK_SIZE = 512


def chunk_text(
    text: str,
    *,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> list[dict[str, Any]]:
    """Chunk *text* using chonkie's ``RecursiveChunker``.

    Parameters
    ----------
    text:
        Source text to split. Empty or whitespace-only input yields ``[]``.
    chunk_size:
        Target chunk size passed to ``RecursiveChunker``.

    Returns a list of plain dicts (JSON-serialisable) with keys:
        - ``text``: the chunk content
        - ``start_index``: character start offset in the original text
        - ``end_index``: character end offset in the original text
        - ``token_count``: number of tokens in the chunk
    """
    # Nothing to chunk for empty / whitespace-only input.
    if not text or not text.strip():
        return []

    chunker = RecursiveChunker(
        chunk_size=chunk_size,
    )

    # Convert chonkie Chunk objects into plain JSON-serialisable dicts so the
    # result can be stored directly in documents.metadata_.
    result: list[dict[str, Any]] = [
        {
            "text": chunk.text,
            "start_index": chunk.start_index,
            "end_index": chunk.end_index,
            "token_count": chunk.token_count,
        }
        for chunk in chunker.chunk(text)
    ]

    _LOGGER.debug(
        "Chunked %d chars into %d chunks (size=%d)",
        len(text), len(result), chunk_size,
    )
    return result
101 changes: 101 additions & 0 deletions extralit-server/src/extralit_server/jobs/document_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
from rq import Retry, get_current_job
from rq.decorators import job

import re

from extralit_server.api.schemas.v1.document.metadata import DocumentProcessingMetadata
from extralit_server.contexts import files
from extralit_server.contexts.document.analysis import PDFOCRLayerDetector
from extralit_server.contexts.document.chunker import chunk_text
from extralit_server.contexts.document.margin import PDFAnalyzer
from extralit_server.contexts.document.preprocessing import PDFPreprocessingSettings, PDFPreprocessor
from extralit_server.database import AsyncSessionLocal
Expand Down Expand Up @@ -189,3 +192,101 @@ async def analysis_and_preprocess_job(
current_job.meta["error"] = str(e)
current_job.save_meta()
raise


@job(queue=DEFAULT_QUEUE, connection=REDIS_CONNECTION, timeout=600, retry=Retry(max=3, interval=[10, 30, 60]))
async def process_text_extraction_result_job(document_id: UUID, extraction_result: dict) -> dict[str, Any]:
    """Process text-extraction JSON, chunk text with chonkie, and persist to DB.

    Accepts the JSON result produced by an external OCR / text-extraction
    worker (e.g. ``extralit_ocr.jobs.pymupdf_to_markdown_job``). The
    ``extraction_result`` dict may have one of these shapes:

    * ``{"markdown": "..."}``
    * ``{"text": "..."}``
    * Marker-style ``{"pages": [{"blocks": [{"text"|"content"|"markdown": ...}]}]}``

    Steps
    -----
    1. Extract plain text from whichever JSON shape is provided.
    2. Create text chunks using **chonkie** ``RecursiveChunker``
       (via :func:`~extralit_server.contexts.document.chunker.chunk_text`).
    3. Persist the extracted text **and** chunks into
       ``documents.metadata_`` → ``text_extraction_metadata`` in the DB.

    Returns a summary dict: ``{"document_id": ..., "chunks_count": ...}``.
    """
    current_job = get_current_job()
    if current_job is None:
        raise Exception("No current job found")

    current_job.meta.update({"document_id": str(document_id), "workflow_step": "process_text_extraction"})
    current_job.save_meta()

    try:
        text = _extract_text_from_result(extraction_result)

        # Chunk the text using chonkie RecursiveChunker
        chunks = chunk_text(text or "")

        # Persist text + chunks into document metadata in the database
        async with AsyncSessionLocal() as db:
            document = await db.get(Document, document_id)
            if document:
                if document.metadata_ is None:
                    document.metadata_ = DocumentProcessingMetadata().model_dump()

                metadata = DocumentProcessingMetadata(**document.metadata_)
                metadata.update_text_extraction_results(
                    text=text or "",
                    chunks=chunks,
                    extraction_method="external_ocr",
                )
                # Reassign (rather than mutate) so the ORM detects the change.
                document.metadata_ = metadata.model_dump()
                await db.commit()
            else:
                # Don't fail the job (the extraction itself succeeded), but make
                # the missing document visible instead of silently "succeeding".
                _LOGGER.warning(
                    "Document %s not found; text-extraction result not persisted", document_id
                )

        current_job.meta["chunks_count"] = len(chunks)
        current_job.save_meta()

        return {"document_id": str(document_id), "chunks_count": len(chunks)}

    except Exception as e:
        # logger.exception records the traceback; lazy %-args avoid building
        # the message when the level is filtered out.
        _LOGGER.exception("Error processing text extraction for document %s", document_id)
        current_job.meta["error"] = str(e)
        current_job.save_meta()
        raise


def _extract_text_from_result(extraction_result: dict) -> str:
"""Pull plain text out of the various JSON shapes produced by OCR workers."""
if not extraction_result:
return ""

# Direct string fields
for key in ("markdown", "text", "content", "value"):
val = extraction_result.get(key)
if isinstance(val, str):
return val

# Marker-style pages → blocks
if "pages" in extraction_result and isinstance(extraction_result["pages"], list):
parts: list[str] = []
for page in extraction_result["pages"]:
blocks = page.get("blocks", []) if isinstance(page, dict) else []
for block in blocks:
if not isinstance(block, dict):
continue
content = (
block.get("markdown")
or block.get("text")
or block.get("content")
or block.get("html")
or ""
)
# Strip basic HTML tags if present
if content and "<" in content and ">" in content:
content = re.sub(r"<[^>]+>", "", content)
if content:
parts.append(content)
return "\n\n".join(parts)

# Absolute fallback
return str(extraction_result)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from rq.group import Group

from extralit_server.database import AsyncSessionLocal
from extralit_server.jobs.document_jobs import analysis_and_preprocess_job
from extralit_server.jobs.document_jobs import analysis_and_preprocess_job, process_text_extraction_result_job
from extralit_server.jobs.queues import DEFAULT_QUEUE, OCR_QUEUE, REDIS_CONNECTION
from extralit_server.models.database import DocumentWorkflow

Expand Down Expand Up @@ -93,6 +93,12 @@ async def create_document_workflow(
group.enqueue_many(queue=DEFAULT_QUEUE, job_datas=[analysis_job_data])
group.enqueue_many(queue=OCR_QUEUE, job_datas=[text_extraction_job_data])

# Step 5: Chunking job — runs after text extraction completes.
# `process_text_extraction_result_job` is invoked with the OCR result
# (the dict returned by pymupdf_to_markdown_job) once that job finishes.
# Wiring via RQ Callback or a dependent enqueue is done by the caller
# that has access to the text-extraction result.

# Step 6: Future table extraction job (conditional based on analysis results)
# This will be added when table extraction is implemented
# table_extraction_job_data = OCR_QUEUE.prepare_data(
Expand Down
13 changes: 13 additions & 0 deletions extralit-server/tests/unit/contexts/document/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2024-present, Extralit Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
85 changes: 85 additions & 0 deletions extralit-server/tests/unit/contexts/document/test_chunker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright 2024-present, Extralit Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the chonkie-based text chunker."""

import pytest

from extralit_server.contexts.document.chunker import chunk_text


class TestChunkText:
    """Tests for chunk_text using chonkie RecursiveChunker."""

    def test_empty_string_returns_empty(self):
        assert chunk_text("") == []

    def test_whitespace_only_returns_empty(self):
        assert chunk_text(" \n\t ") == []

    def test_short_text_single_chunk(self):
        sample = "Hello world. This is a short sentence."
        chunks = chunk_text(sample, chunk_size=512)
        assert len(chunks) >= 1
        # The full text should appear in the chunk(s)
        assert "Hello world" in " ".join(chunk["text"] for chunk in chunks)

    def test_chunk_dict_keys(self):
        chunks = chunk_text("Some sample text for testing the chunker output format.", chunk_size=512)
        assert len(chunks) >= 1
        expected = {"text": str, "start_index": int, "end_index": int, "token_count": int}
        for chunk in chunks:
            for key, key_type in expected.items():
                assert key in chunk
                assert isinstance(chunk[key], key_type)

    def test_long_text_produces_multiple_chunks(self):
        # Create text that exceeds a single chunk
        long_text = "This is a paragraph of text. " * 200
        assert len(chunk_text(long_text, chunk_size=64)) > 1

    def test_token_count_positive(self):
        chunks = chunk_text("The quick brown fox jumps over the lazy dog.", chunk_size=512)
        assert all(chunk["token_count"] > 0 for chunk in chunks)

    def test_start_end_index_ordering(self):
        chunks = chunk_text("Sentence number one. " * 100, chunk_size=32)
        assert all(chunk["start_index"] < chunk["end_index"] for chunk in chunks)

    def test_chunks_are_json_serialisable(self):
        import json

        chunks = chunk_text("A simple test for JSON serialisation.", chunk_size=512)
        # Should not raise
        assert isinstance(json.dumps(chunks), str)

    def test_custom_chunk_size(self):
        words = "Word " * 500
        # Smaller chunk size ⇒ more chunks
        assert len(chunk_text(words, chunk_size=32)) > len(chunk_text(words, chunk_size=256))
Loading