Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,22 +240,22 @@ core/
**Goal**: Implement ChromaDB integration with search capabilities

### Tasks:
- [ ] **ChromaDB integration**
- [ ] Create ChromaManager class for database operations
- [ ] Implement collection management and persistence
- [ ] Add document and chunk storage with metadata
- [x] **ChromaDB integration**
- [x] Create ChromaManager class for database operations
- [x] Implement collection management and persistence
- [x] Add document and chunk storage with metadata
- [ ] Create database connection management and health checks

- [ ] **Vector operations**
- [ ] Implement vector storage with automatic indexing
- [ ] Add similarity search with configurable distance metrics
- [ ] Create metadata filtering and query optimization
- [x] **Vector operations**
- [x] Implement vector storage with automatic indexing
- [x] Add similarity search with configurable distance metrics
- [x] Create metadata filtering and query optimization
- [ ] Implement batch operations for efficiency

- [ ] **Data management**
- [ ] Add document deletion and cleanup operations
- [x] **Data management**
- [x] Add document deletion and cleanup operations
- [ ] Implement database backup and recovery
- [ ] Create collection statistics and monitoring
- [x] Create collection statistics and monitoring
- [ ] Add data consistency validation

- [ ] **Performance optimization**
Expand All @@ -265,15 +265,15 @@ core/
- [ ] Create database maintenance and optimization routines

### Acceptance Criteria:
- [ ] Stores document embeddings with metadata successfully
- [ ] Similarity search returns relevant results in < 2 seconds
- [x] Stores document embeddings with metadata successfully
- [x] Similarity search returns relevant results in < 2 seconds
- [ ] Supports collections of 1000+ documents efficiently
- [ ] Database persists data correctly across restarts
- [ ] Metadata filtering works with complex queries
- [x] Metadata filtering works with complex queries

### Definition of Done:
- [ ] Full CRUD operations implemented and tested
- [ ] Search performance meets requirements (< 2s response)
- [x] Full CRUD operations implemented and tested
- [x] Search performance meets requirements (< 2s response)
- [ ] Data persistence verified across application restarts
- [ ] Database health monitoring and alerts configured
- [ ] Backup and recovery procedures documented and tested
Expand Down
3 changes: 2 additions & 1 deletion core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .document_processor import DocumentProcessor
from .embedder import EmbeddingService
from .vector_store import VectorStore

__all__ = ["DocumentProcessor", "EmbeddingService"]
__all__ = ["DocumentProcessor", "EmbeddingService", "VectorStore"]
19 changes: 19 additions & 0 deletions core/utils/vector_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import annotations

from typing import List
import math


def cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
    """Compute cosine similarity between two vectors.

    Args:
        vec1: First vector.
        vec2: Second vector; must have the same length as ``vec1``.

    Returns:
        The cosine of the angle between the two vectors, limited to the
        closed interval ``[-1.0, 1.0]``. ``0.0`` is returned when either
        vector has zero magnitude (which includes two empty vectors).

    Raises:
        ValueError: If the vectors differ in length.
    """
    if len(vec1) != len(vec2):
        raise ValueError("Vectors must be the same length")
    dot = sum(a * b for a, b in zip(vec1, vec2))
    norm1 = math.sqrt(sum(a * a for a in vec1))
    norm2 = math.sqrt(sum(b * b for b in vec2))
    if norm1 == 0 or norm2 == 0:
        # The angle is undefined for a zero vector; report "no similarity".
        return 0.0
    similarity = dot / (norm1 * norm2)
    # Floating-point rounding can leave the ratio a hair outside [-1, 1]
    # (e.g. 1.0000000000000002 for parallel vectors), which breaks callers
    # that feed the value to math.acos. Explicit comparisons are used
    # instead of min()/max() so that a NaN result is passed through as-is.
    if similarity > 1.0:
        return 1.0
    if similarity < -1.0:
        return -1.0
    return similarity


__all__ = ["cosine_similarity"]
120 changes: 120 additions & 0 deletions core/vector_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from __future__ import annotations

from typing import Any, Dict, List
import logging
import time

from core.models.document import Document, DocumentChunk
from core.models.search import SearchQuery, SearchResponse
from core.exceptions.custom_exceptions import VectorStoreError
from config.settings import get_settings

from .vectordb.chroma_manager import ChromaManager
from .vectordb.collection_manager import CollectionManager
from .vectordb.query_builder import QueryBuilder

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# get_settings() is called once, when this module is imported; VectorStore
# reads the resulting object in its constructor.
settings = get_settings()


class VectorStore:
    """Store and retrieve document embeddings using ChromaDB.

    Facade that wires together a ``ChromaManager`` (client and persistence
    directory), a ``CollectionManager`` (writes, deletes, statistics) and a
    ``QueryBuilder`` (search), all bound to the ``semantic_scout_docs``
    collection.
    """

    def __init__(self) -> None:
        """Open the persistent client and bind the document collection."""
        self.chroma_manager = ChromaManager(persist_directory=settings.chroma_persist_dir)
        self.collection = self.chroma_manager.get_or_create_collection(
            name="semantic_scout_docs",
            metadata={
                "description": "Document embeddings for semantic search",
                "embedding_model": settings.embedding_model,
                "embedding_dimension": settings.embedding_dimension,
            },
        )
        self.collection_manager = CollectionManager(self.collection)
        self.query_builder = QueryBuilder(self.collection)

    def store_document(self, document: Document, chunks: List[DocumentChunk]) -> None:
        """Store a document and its chunks in the vector database.

        Chunks already stored under ``document.id`` are deleted first, so
        storing the same document again replaces its chunks instead of
        duplicating them.

        Args:
            document: The parent document; its ID is used for the cleanup.
            chunks: The chunks to store.
        """
        logger.info("Storing document %s with %s chunks", document.id, len(chunks))
        deleted = self.collection_manager.delete_document(document.id)
        if deleted > 0:
            logger.info("Removed %s existing chunks for document %s", deleted, document.id)
        self.collection_manager.add_documents(document, chunks)

    def search(self, query_embedding: List[float], search_query: SearchQuery) -> SearchResponse:
        """Search for similar chunks.

        Args:
            query_embedding: Embedding vector of the query.
            search_query: The query object handed to the query builder and
                echoed back in the response.

        Returns:
            A ``SearchResponse`` holding the results, their count and the
            elapsed search time in milliseconds.
        """
        # perf_counter() is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments the way time.time() can.
        start = time.perf_counter()
        results = self.query_builder.search(query_embedding, search_query)
        duration = (time.perf_counter() - start) * 1000
        response = SearchResponse(
            query=search_query,
            results=results,
            total_results=len(results),
            search_time_ms=duration,
        )
        logger.info("Search completed in %.2fms, found %s results", duration, len(results))
        return response

    def get_chunks_by_ids(self, chunk_ids: List[str]) -> List[DocumentChunk]:
        """Retrieve chunks by their IDs.

        Args:
            chunk_ids: IDs of the chunks to fetch.

        Returns:
            One ``DocumentChunk`` per ID returned by the collection; an empty
            list when ``chunk_ids`` is empty.

        Raises:
            VectorStoreError: If the lookup or the reconstruction of a chunk
                fails.
        """
        if not chunk_ids:
            # Nothing was requested: skip the database round-trip entirely.
            return []
        try:
            results = self.collection.get(
                ids=chunk_ids,
                include=["documents", "metadatas", "embeddings"],
            )
            documents = results["documents"]
            metadatas = results["metadatas"]
            # Compare against None explicitly; a truthiness test is ambiguous
            # for array-like embedding containers.
            embeddings = results.get("embeddings")
            chunks: List[DocumentChunk] = []
            for i, chunk_id in enumerate(results["ids"]):
                metadata = metadatas[i]
                chunks.append(
                    DocumentChunk(
                        id=chunk_id,
                        document_id=metadata["document_id"],
                        content=documents[i],
                        chunk_index=metadata["chunk_index"],
                        start_char=metadata["start_char"],
                        end_char=metadata["end_char"],
                        embedding=embeddings[i] if embeddings is not None else None,
                        metadata=metadata,
                    )
                )
            return chunks
        except Exception as exc:
            logger.error("Failed to retrieve chunks: %s", exc)
            raise VectorStoreError(f"Chunk retrieval failed: {exc}") from exc

    def delete_document(self, document_id: str) -> bool:
        """Delete a document and its chunks.

        Best-effort wrapper: failures are logged and reported as ``False``
        rather than raised.

        Returns:
            ``True`` if at least one chunk was deleted, otherwise ``False``.
        """
        try:
            deleted = self.collection_manager.delete_document(document_id)
            return deleted > 0
        except Exception as exc:  # pragma: no cover - wrapper
            logger.error("Failed to delete document: %s", exc)
            return False

    def get_all_documents(self) -> List[Dict[str, Any]]:
        """Return summary of stored documents.

        Returns:
            One dict per distinct ``document_id`` with the keys
            ``document_id``, ``filename``, ``file_type`` and ``chunk_count``.
            Filename and file type come from the first chunk seen for that
            document and default to ``"Unknown"``. An empty list is returned
            if reading the collection fails.
        """
        try:
            all_metadata = self.collection.get(include=["metadatas"])["metadatas"]
            documents: Dict[str, Dict[str, Any]] = {}
            for metadata in all_metadata:
                if not metadata:
                    # A chunk stored without metadata must not abort the
                    # whole listing; it simply cannot be attributed.
                    continue
                doc_id = metadata.get("document_id")
                if not doc_id:
                    continue
                if doc_id not in documents:
                    documents[doc_id] = {
                        "document_id": doc_id,
                        "filename": metadata.get("filename", "Unknown"),
                        "file_type": metadata.get("file_type", "Unknown"),
                        "chunk_count": 0,
                    }
                documents[doc_id]["chunk_count"] += 1
            return list(documents.values())
        except Exception as exc:  # pragma: no cover - wrapper
            logger.error("Failed to get documents: %s", exc)
            return []

    def get_stats(self) -> Dict[str, Any]:
        """Return statistics about the vector store.

        Returns:
            The collection manager's statistics dict with an added
            ``persist_directory`` entry (as a string).
        """
        stats = self.collection_manager.get_stats()
        stats["persist_directory"] = str(self.chroma_manager.persist_directory)
        return stats
5 changes: 5 additions & 0 deletions core/vectordb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .chroma_manager import ChromaManager
from .collection_manager import CollectionManager
from .query_builder import QueryBuilder

__all__ = ["ChromaManager", "CollectionManager", "QueryBuilder"]
68 changes: 68 additions & 0 deletions core/vectordb/chroma_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, List, Optional
import logging

import chromadb
from chromadb.config import Settings

from core.exceptions.custom_exceptions import VectorStoreError

logger = logging.getLogger(__name__)


class ChromaManager:
    """Manage ChromaDB client and collections."""

    def __init__(self, persist_directory: str = "./data/chroma_db") -> None:
        """Create the persistence directory and open a persistent client.

        Args:
            persist_directory: Directory in which ChromaDB keeps its data;
                created (including parents) if it does not exist.

        Raises:
            VectorStoreError: If the ChromaDB client cannot be created.
        """
        self.persist_directory = Path(persist_directory)
        self.persist_directory.mkdir(parents=True, exist_ok=True)

        try:
            self.client = chromadb.PersistentClient(
                path=str(self.persist_directory),
                # allow_reset=True is what makes reset_database() usable.
                settings=Settings(anonymized_telemetry=False, allow_reset=True),
            )
            logger.info("ChromaDB initialized at %s", self.persist_directory)
        except Exception as exc:  # pragma: no cover - initialization rarely fails
            logger.error("Failed to initialize ChromaDB: %s", exc)
            raise VectorStoreError(f"ChromaDB initialization failed: {exc}") from exc

    def get_or_create_collection(
        self, name: str, metadata: Optional[Dict[str, Any]] = None
    ) -> chromadb.Collection:
        """Return existing collection or create a new one.

        Args:
            name: Collection name.
            metadata: Metadata used only when the collection has to be
                created; defaults to a generic description.

        Raises:
            VectorStoreError: If the collection can neither be retrieved nor
                created.
        """
        try:
            collection = self.client.get_collection(name=name)
        except Exception as exc:
            # The exception type ChromaDB raises for a missing collection
            # differs between releases, so any lookup failure falls through
            # to creation. It is logged rather than swallowed silently.
            logger.debug("Collection %s not retrievable (%s); creating it", name, exc)
        else:
            logger.info("Retrieved existing collection: %s", name)
            return collection

        try:
            collection = self.client.create_collection(
                name=name,
                metadata=metadata or {"description": "Document embeddings"},
            )
        except Exception as exc:
            # Wrap like every other method of this class so that callers see
            # a single error type for storage failures.
            logger.error("Failed to create collection: %s", exc)
            raise VectorStoreError(f"Collection creation failed: {exc}") from exc
        logger.info("Created new collection: %s", name)
        return collection

    def delete_collection(self, name: str) -> None:
        """Delete a collection by name.

        Raises:
            VectorStoreError: If the deletion fails.
        """
        try:
            self.client.delete_collection(name=name)
            logger.info("Deleted collection: %s", name)
        except Exception as exc:  # pragma: no cover - simple wrapper
            logger.error("Failed to delete collection: %s", exc)
            raise VectorStoreError(f"Collection deletion failed: {exc}") from exc

    def list_collections(self) -> List[str]:
        """List available collections.

        Returns:
            The collection names. Entries returned by the client may be
            collection objects or plain name strings; both are accepted.
        """
        return [
            col if isinstance(col, str) else col.name
            for col in self.client.list_collections()
        ]

    def reset_database(self) -> None:
        """Reset the entire Chroma database.

        Raises:
            VectorStoreError: If the reset fails.
        """
        try:
            self.client.reset()
            logger.warning("ChromaDB has been reset")
        except Exception as exc:  # pragma: no cover - rarely used
            logger.error("Failed to reset ChromaDB: %s", exc)
            raise VectorStoreError(f"Database reset failed: {exc}") from exc
116 changes: 116 additions & 0 deletions core/vectordb/collection_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from __future__ import annotations

from typing import Any, Dict, List
import logging

import chromadb

from core.models.document import Document, DocumentChunk
from core.exceptions.custom_exceptions import VectorStoreError

logger = logging.getLogger(__name__)


class CollectionManager:
    """Handle operations on a ChromaDB collection."""

    def __init__(self, collection: chromadb.Collection) -> None:
        """Bind the manager to ``collection``."""
        self.collection = collection

    def add_documents(self, document: Document, chunks: List[DocumentChunk]) -> None:
        """Add document chunks to the collection.

        Chunks without an embedding are skipped with a warning. Nothing is
        written when ``chunks`` is empty or no chunk carries an embedding.

        Args:
            document: Parent document; its id, filename and file type are
                copied into every chunk's metadata.
            chunks: Chunks to store.

        Raises:
            VectorStoreError: If the collection rejects the write.
        """
        if not chunks:
            return

        ids: List[str] = []
        embeddings: List[List[float]] = []
        documents: List[str] = []
        metadatas: List[Dict[str, Any]] = []

        for chunk in chunks:
            if chunk.embedding is None:
                logger.warning("Skipping chunk %s - no embedding", chunk.id)
                continue
            ids.append(chunk.id)
            embeddings.append(chunk.embedding)
            documents.append(chunk.content)
            metadatas.append(
                {
                    "document_id": document.id,
                    "filename": document.filename,
                    "file_type": document.file_type,
                    "chunk_index": chunk.chunk_index,
                    "start_char": chunk.start_char,
                    "end_char": chunk.end_char,
                    # Chunk-level metadata is merged last, so it takes
                    # precedence over the fields above on a key clash.
                    # A missing (None) metadata mapping is treated as empty.
                    **(chunk.metadata or {}),
                }
            )

        if not ids:
            logger.warning("No chunks with embeddings for document %s", document.id)
            return

        try:
            self.collection.add(
                ids=ids,
                embeddings=embeddings,
                documents=documents,
                metadatas=metadatas,
            )
            logger.info("Added %s chunks from document %s", len(ids), document.id)
        except Exception as exc:
            logger.error("Failed to add documents: %s", exc)
            raise VectorStoreError(f"Failed to store document chunks: {exc}") from exc

    def delete_document(self, document_id: str) -> int:
        """Remove all chunks for a document.

        Returns:
            The number of chunks deleted (``0`` if none matched).

        Raises:
            VectorStoreError: If the lookup or the deletion fails.
        """
        try:
            results = self.collection.get(where={"document_id": document_id})
            chunk_ids = results["ids"]
            if not chunk_ids:
                return 0
            self.collection.delete(ids=chunk_ids)
            logger.info(
                "Deleted %s chunks for document %s", len(chunk_ids), document_id
            )
            return len(chunk_ids)
        except Exception as exc:
            logger.error("Failed to delete document: %s", exc)
            raise VectorStoreError(f"Failed to delete document: {exc}") from exc

    def get_document_chunks(self, document_id: str) -> List[Dict[str, Any]]:
        """Retrieve all chunks for a document.

        Returns:
            One dict per chunk with the keys ``id``, ``content``,
            ``metadata`` and ``embedding`` (``None`` when the collection
            returned no embeddings).

        Raises:
            VectorStoreError: If the lookup fails.
        """
        try:
            results = self.collection.get(
                where={"document_id": document_id},
                include=["documents", "metadatas", "embeddings"],
            )
            documents = results["documents"]
            metadatas = results["metadatas"]
            # Must be an identity check: when the embeddings come back as a
            # numpy array, a truthiness test raises "truth value of an array
            # is ambiguous" for more than one element.
            embeddings = results.get("embeddings")
            return [
                {
                    "id": chunk_id,
                    "content": documents[i],
                    "metadata": metadatas[i],
                    "embedding": embeddings[i] if embeddings is not None else None,
                }
                for i, chunk_id in enumerate(results["ids"])
            ]
        except Exception as exc:
            logger.error("Failed to get document chunks: %s", exc)
            raise VectorStoreError(f"Failed to retrieve chunks: {exc}") from exc

    def get_stats(self) -> Dict[str, Any]:
        """Return statistics about the collection.

        Returns:
            A dict with ``total_chunks``, ``total_documents`` and
            ``collection_name``; on failure a dict with a single ``error``
            key holding the exception text.
        """
        try:
            count = self.collection.count()
            all_metadata = self.collection.get(include=["metadatas"])["metadatas"]
            # Entries without metadata or without a document_id cannot be
            # attributed to a document and must not be counted as one.
            unique_docs = {
                m["document_id"] for m in all_metadata if m and m.get("document_id")
            }
            return {
                "total_chunks": count,
                "total_documents": len(unique_docs),
                "collection_name": self.collection.name,
            }
        except Exception as exc:  # pragma: no cover - simple wrapper
            logger.error("Failed to get stats: %s", exc)
            return {"error": str(exc)}
Loading