Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,22 @@ core/
**Goal**: Implement a robust document processing pipeline with support for multiple formats

### Tasks:
- [ ] **Document extraction engines**
- [ ] Implement PDFExtractor using PyMuPDF with fallback to Unstructured
- [ ] Implement WordExtractor using python-docx and Unstructured
- [ ] Implement TextExtractor for plain text files (.txt, .md)
- [ ] Implement MetadataExtractor for file properties and creation dates

- [ ] **Content processing pipeline**
- [ ] Create DocumentProcessor orchestration class
- [ ] Implement text chunking with semantic boundaries
- [ ] Add chunk overlap management for context preservation
- [ ] Create content validation and sanitization

- [ ] **Error handling & resilience**
- [ ] Implement robust error handling for corrupted files
- [x] **Document extraction engines**
- [x] Implement PDFExtractor using PyMuPDF with fallback to Unstructured
- [x] Implement WordExtractor using python-docx and Unstructured
- [x] Implement TextExtractor for plain text files (.txt, .md)
- [x] Implement MetadataExtractor for file properties and creation dates

- [x] **Content processing pipeline**
- [x] Create DocumentProcessor orchestration class
- [x] Implement text chunking with semantic boundaries
- [x] Add chunk overlap management for context preservation
- [x] Create content validation and sanitization

- [x] **Error handling & resilience**
- [x] Implement robust error handling for corrupted files
- [ ] Add retry logic for transient failures
- [ ] Create graceful degradation for unsupported formats
- [x] Create graceful degradation for unsupported formats
- [ ] Add progress tracking for long-running operations

- [ ] **Performance optimizations**
Expand All @@ -175,18 +175,18 @@ core/
- [ ] Add processing timeout and cancellation support

### Acceptance Criteria:
- [ ] Successfully processes PDF, DOCX, TXT, and MD files
- [ ] Handles corrupted or malformed files gracefully
- [ ] Extracts meaningful text while preserving structure
- [x] Successfully processes PDF, DOCX, TXT, and MD files
- [x] Handles corrupted or malformed files gracefully
- [x] Extracts meaningful text while preserving structure
- [ ] Processing completes within 30 seconds for files up to 100MB
- [ ] Provides real-time progress feedback

### Definition of Done:
- [ ] All document types tested with sample files
- [ ] Error scenarios thoroughly tested and handled
- [x] All document types tested with sample files
- [x] Error scenarios thoroughly tested and handled
- [ ] Performance requirements met (< 30s per document)
- [ ] Memory usage optimized for large file processing
- [ ] Comprehensive logging for debugging and monitoring
- [x] Comprehensive logging for debugging and monitoring

---

Expand Down
3 changes: 3 additions & 0 deletions core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .document_processor import DocumentProcessor

__all__ = ["DocumentProcessor"]
4 changes: 4 additions & 0 deletions core/chunking/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .text_chunker import TextChunker
from .chunk_manager import ChunkManager

__all__ = ["TextChunker", "ChunkManager"]
23 changes: 23 additions & 0 deletions core/chunking/chunk_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from __future__ import annotations

from typing import List

from core.models.document import DocumentChunk


class ChunkManager:
    """Pull chunk start offsets backwards so neighbouring chunks overlap.

    Only the ``start_char`` attribute of each chunk is rewritten; no other
    chunk attribute is touched here.
    """

    def __init__(self, overlap: int = 200) -> None:
        # Number of characters each chunk should share with its predecessor.
        self.overlap = overlap

    def apply_overlap(self, chunks: List[DocumentChunk]) -> List[DocumentChunk]:
        """Ensure every chunk starts at most ``overlap`` chars before the previous end.

        The chunks are modified in place and the same list is returned.
        An empty (or otherwise falsy) input yields a new empty list.
        """
        if not chunks:
            return []
        # Walk consecutive (previous, current) pairs; only ``end_char`` of the
        # previous chunk is read, so earlier adjustments never cascade.
        for previous, current in zip(chunks, chunks[1:]):
            overlap_start = previous.end_char - self.overlap
            if current.start_char > overlap_start:
                # Never let the adjusted start fall before the document start.
                current.start_char = max(overlap_start, 0)
        return chunks
71 changes: 71 additions & 0 deletions core/chunking/text_chunker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from __future__ import annotations

import logging
from typing import List

from langchain.text_splitter import RecursiveCharacterTextSplitter

try: # pragma: no cover - optional for offline environments
import tiktoken
except Exception: # pragma: no cover - tiktoken may fail to download model
tiktoken = None

from core.models.document import DocumentChunk

logger = logging.getLogger(__name__)


class TextChunker:
    """Smart text chunking with overlap for RAG.

    ``chunk_size`` and ``chunk_overlap`` are handed to the splitter together
    with ``_token_length`` as the length function, so they are measured in
    tokens when a tiktoken encoding could be loaded and in characters
    otherwise.
    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200) -> None:
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # tiktoken is optional: it may be missing or unable to fetch its
        # encoding data, in which case lengths fall back to character counts.
        self.encoding = None
        if tiktoken is not None:
            try:
                self.encoding = tiktoken.get_encoding("cl100k_base")
            except Exception:  # pragma: no cover - fallback when no network
                self.encoding = None
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=self._token_length,
            separators=["\n\n", "\n", ". ", " ", ""],
        )

    def _token_length(self, text: str) -> int:
        """Return the token count of *text*, or its character count without an encoding."""
        if self.encoding is None:
            return len(text)
        return len(self.encoding.encode(text))

    @staticmethod
    def _locate_chunk(
        content: str, text: str, search_from: int, fallback_from: int = 0
    ) -> int:
        """Return the character offset of *text* inside *content*.

        The search first starts at ``search_from`` (clamped to 0, because a
        negative start makes ``str.find`` count from the end of the string).
        If that overshoots the real position, the search is retried from
        ``fallback_from`` and finally from the beginning. When *text* is not
        a verbatim substring at all, the clamped hint is returned as an
        approximate offset instead of ``-1``.
        """
        hint = max(search_from, 0)
        for begin in (hint, max(fallback_from, 0), 0):
            position = content.find(text, begin)
            if position != -1:
                return position
        return min(hint, len(content))

    def chunk_document(self, document_id: str, content: str) -> List[DocumentChunk]:
        """Split document into overlapping chunks.

        Args:
            document_id: Identifier embedded in every chunk id.
            content: Full document text to split.

        Returns:
            One ``DocumentChunk`` per split, in order, each carrying its
            character span in ``content`` plus ``token_count`` and
            ``chunk_total`` metadata. Empty or whitespace-only content
            yields an empty list.
        """
        if not content or not content.strip():
            return []

        texts = self.splitter.split_text(content)
        total = len(texts)
        chunks: List[DocumentChunk] = []
        search_from = 0
        previous_start = 0
        for i, text in enumerate(texts):
            start_char = self._locate_chunk(content, text, search_from, previous_start)
            end_char = start_char + len(text)
            previous_start = start_char
            # The next chunk may begin up to ``chunk_overlap`` before this
            # chunk's end. The overlap can be token-based, so this is only a
            # hint; ``_locate_chunk`` recovers when it is off.
            search_from = end_char - self.chunk_overlap

            chunk = DocumentChunk(
                id=f"chunk_{document_id}_{i}",
                document_id=document_id,
                content=text,
                chunk_index=i,
                start_char=start_char,
                end_char=end_char,
                metadata={
                    "token_count": self._token_length(text),
                    "chunk_total": total,
                },
            )
            chunks.append(chunk)

        logger.info("Created %s chunks for document %s", len(chunks), document_id)
        return chunks
70 changes: 70 additions & 0 deletions core/document_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from __future__ import annotations

import hashlib
import logging
from pathlib import Path
from typing import List, Tuple

from core.exceptions import DocumentProcessingError
from core.models.document import Document, DocumentChunk

from .extractors import (
DOCXExtractor,
MetadataExtractor,
PDFExtractor,
TextExtractor,
)
from .chunking import TextChunker
from .processors import ContentCleaner

logger = logging.getLogger(__name__)


class DocumentProcessor:
    """Main document processing orchestrator.

    Picks the first extractor that accepts a file, cleans the extracted
    text, builds a ``Document`` and splits the cleaned text into chunks.
    """

    def __init__(self) -> None:
        # Order matters: the first extractor whose ``can_extract`` accepts
        # the path is used.
        self.extractors = [PDFExtractor(), DOCXExtractor(), TextExtractor()]
        self.metadata_extractor = MetadataExtractor()
        self.chunker = TextChunker()
        self.cleaner = ContentCleaner()

    def process_document(self, file_path: str) -> Tuple[Document, List[DocumentChunk]]:
        """Process a document and return the Document object and its chunks.

        Args:
            file_path: Path of the file to process.

        Returns:
            The built ``Document`` and the list of chunks produced from its
            cleaned content.

        Raises:
            DocumentProcessingError: If no registered extractor accepts the
                file. Errors raised by the chosen extractor propagate.
        """
        logger.info("Processing document: %s", file_path)
        extractor = self._get_extractor(file_path)
        if extractor is None:
            raise DocumentProcessingError(f"No extractor found for file: {file_path}")

        extracted = extractor.extract(file_path)
        content = extracted["content"]
        # Merge without mutating the extractor's own dict, and without letting
        # filesystem values clobber metadata embedded in the document itself
        # (e.g. a DOCX "created" property).
        metadata = self._merge_metadata(
            extracted.get("metadata") or {},
            self.metadata_extractor.extract(file_path),
        )

        cleaned_content = self.cleaner.clean_text(content)

        file_path_obj = Path(file_path)
        document = Document(
            id=self._generate_document_id(cleaned_content),
            filename=file_path_obj.name,
            file_type=file_path_obj.suffix.lower().lstrip("."),
            file_size=file_path_obj.stat().st_size,
            content=cleaned_content,
            metadata=metadata,
        )

        chunks = self.chunker.chunk_document(document.id, cleaned_content)
        document.chunk_ids = [chunk.id for chunk in chunks]

        logger.info("Document processed: %s with %s chunks", document.id, len(chunks))
        return document, chunks

    @staticmethod
    def _merge_metadata(
        extracted: dict[str, object], file_metadata: dict[str, object]
    ) -> dict[str, object]:
        """Combine extractor metadata with filesystem metadata into a new dict.

        Values supplied by the extractor win; a filesystem value is used only
        when the key is absent or the extractor left it as ``None`` or ``""``.
        """
        merged = dict(extracted)
        for key, value in file_metadata.items():
            if merged.get(key) in (None, ""):
                merged[key] = value
        return merged

    def _get_extractor(self, file_path: str):
        """Return the first extractor accepting *file_path*, or ``None``."""
        return next(
            (candidate for candidate in self.extractors if candidate.can_extract(file_path)),
            None,
        )

    def _generate_document_id(self, content: str) -> str:
        """Derive a content-based id: ``doc_`` plus 12 hex chars of the SHA-256."""
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        return f"doc_{content_hash[:12]}"
13 changes: 13 additions & 0 deletions core/extractors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .base_extractor import BaseExtractor
from .pdf_extractor import PDFExtractor
from .docx_extractor import DOCXExtractor
from .text_extractor import TextExtractor
from .metadata_extractor import MetadataExtractor

__all__ = [
"BaseExtractor",
"PDFExtractor",
"DOCXExtractor",
"TextExtractor",
"MetadataExtractor",
]
29 changes: 29 additions & 0 deletions core/extractors/base_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from __future__ import annotations

import os
from abc import ABC, abstractmethod
from typing import Any, Dict

from core.exceptions import DocumentProcessingError


class BaseExtractor(ABC):
    """Abstract base class for document extractors.

    Subclasses implement ``can_extract`` and ``extract``; ``validate_file``
    is a shared pre-flight check they can call before reading a file.
    """

    # Largest accepted file size in bytes; subclasses may override it.
    MAX_FILE_SIZE_BYTES = 100 * 1024 * 1024

    @abstractmethod
    def can_extract(self, file_path: str) -> bool:
        """Return True if this extractor can handle the given file."""
        raise NotImplementedError

    @abstractmethod
    def extract(self, file_path: str) -> Dict[str, Any]:
        """Extract text and metadata from the file."""
        raise NotImplementedError

    def validate_file(self, file_path: str) -> None:
        """Validate that the file exists, is a regular file and is within size limits.

        Raises:
            DocumentProcessingError: If the path does not exist, is not a
                regular file (e.g. a directory), or exceeds
                ``MAX_FILE_SIZE_BYTES``.
        """
        if not os.path.exists(file_path):
            raise DocumentProcessingError(f"File not found: {file_path}")

        # A directory "exists" too, but extractors can only read regular files.
        if not os.path.isfile(file_path):
            raise DocumentProcessingError(f"Not a regular file: {file_path}")

        limit = self.MAX_FILE_SIZE_BYTES
        if os.path.getsize(file_path) > limit:
            raise DocumentProcessingError(
                f"File too large (max {limit // (1024 * 1024)}MB)"
            )
55 changes: 55 additions & 0 deletions core/extractors/docx_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from __future__ import annotations

import logging
from typing import Any, Dict

from docx import Document as DocxDocument

from core.exceptions import DocumentProcessingError

from .base_extractor import BaseExtractor

logger = logging.getLogger(__name__)


class DOCXExtractor(BaseExtractor):
    """Extractor for Word ``.docx`` files: body paragraphs, tables and core properties."""

    def can_extract(self, file_path: str) -> bool:
        """Accept any path whose name ends in ``.docx``, ignoring case."""
        lowered = file_path.lower()
        return lowered.endswith(".docx")

    @staticmethod
    def _render_table(table) -> str:
        """Render a table as one ``a | b | c`` line per row, skipping all-empty rows."""
        lines = []
        for row in table.rows:
            cells = [cell.text.strip() for cell in row.cells]
            if any(cells):
                lines.append(" | ".join(cells))
        return "\n".join(lines)

    def extract(self, file_path: str) -> Dict[str, Any]:
        """Read a DOCX file and return its text and metadata.

        Returns:
            A dict with ``"content"`` (non-blank paragraphs joined by blank
            lines, followed by a ``[Tables]`` section when any table has
            text) and ``"metadata"`` (paragraph/table counts plus author,
            title and created from the document's core properties).

        Raises:
            DocumentProcessingError: If validation fails or anything goes
                wrong while reading the document.
        """
        self.validate_file(file_path)

        try:
            doc = DocxDocument(file_path)

            # Keep only paragraphs that contain something besides whitespace.
            paragraphs = []
            for para in doc.paragraphs:
                text = para.text
                if text.strip():
                    paragraphs.append(text)

            tables = doc.tables
            rendered = (self._render_table(table) for table in tables)
            table_texts: list[str] = [block for block in rendered if block]

            full_text = "\n\n".join(paragraphs)
            if table_texts:
                # Tables are appended after the body text under a marker line.
                full_text = full_text + "\n\n[Tables]\n" + "\n\n".join(table_texts)

            props = doc.core_properties
            created = props.created
            metadata = {
                "paragraph_count": len(paragraphs),
                # Counts every table, including ones that rendered as empty.
                "table_count": len(tables),
                "author": props.author or "",
                "title": props.title or "",
                "created": str(created) if created else "",
            }

            return {"content": full_text, "metadata": metadata}
        except Exception as exc:  # pragma: no cover - external lib
            logger.error("DOCX extraction failed: %s", exc)
            raise DocumentProcessingError(f"Failed to extract DOCX: {exc}") from exc
17 changes: 17 additions & 0 deletions core/extractors/metadata_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from __future__ import annotations

import os
from datetime import datetime
from typing import Any, Dict


class MetadataExtractor:
    """Extract basic file metadata."""

    def extract(self, file_path: str) -> Dict[str, Any]:
        """Return creation time, modification time and size of *file_path*.

        Returns:
            A dict with ``"created"`` and ``"modified"`` as ISO-8601 strings
            in naive local time, and ``"size"`` in bytes.

        Raises:
            OSError: Propagated from ``os.stat`` when the path cannot be read.
        """
        stat = os.stat(file_path)
        # ``st_ctime`` is the last metadata change on Unix, not the creation
        # time, so prefer the true birth time where the platform exposes it.
        created_ts = getattr(stat, "st_birthtime", stat.st_ctime)
        return {
            "created": datetime.fromtimestamp(created_ts).isoformat(),
            "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
            "size": stat.st_size,
        }
Loading