Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/intelstream/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ async def _setup_search(self) -> None:
self.vector_store = VectorStore(
data_dir=self.settings.zvec_data_dir,
dimensions=self.settings.embedding_dimensions,
model_name=self.settings.embedding_model,
)
await self.vector_store.initialize()

Expand Down
10 changes: 10 additions & 0 deletions src/intelstream/database/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,16 @@ async def get_summarized_content_items(
)
return list(result.scalars().all())

async def count_summarized_content_items(self) -> int:
async with self.session() as session:
result = await session.execute(
select(func.count())
.select_from(ContentItem)
.where(ContentItem.summary.isnot(None))
.where(ContentItem.summary != "")
)
return int(result.scalar_one())

async def content_item_exists(self, external_id: str) -> bool:
async with self.session() as session:
result = await session.execute(
Expand Down
216 changes: 174 additions & 42 deletions src/intelstream/database/vector_store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import json
import os
import shutil
from dataclasses import dataclass
Expand All @@ -14,6 +15,9 @@

logger = structlog.get_logger(__name__)

_VECTOR_FIELD_NAME = "embedding"
_METADATA_FILENAME = "intelstream-index.json"


@dataclass
class SearchResult:
Expand All @@ -31,20 +35,32 @@ class VectorStore:
_ARTICLES_COLLECTION = "articles"
_MESSAGE_CHUNKS_COLLECTION = "message_chunks"

def __init__(self, data_dir: str, dimensions: int = 384) -> None:
def __init__(
self,
data_dir: str,
dimensions: int = 384,
model_name: str | None = None,
) -> None:
self._data_dir = data_dir
self._dimensions = dimensions
self._model_name = model_name
self._articles: zvec.Collection | None = None
self._message_chunks: dict[str, zvec.Collection] = {}

async def initialize(self) -> None:
await asyncio.to_thread(os.makedirs, self._data_dir, exist_ok=True)
self._articles = await self._open_or_create_collection(self._ARTICLES_COLLECTION)
self._articles = await self._open_or_create_collection(
self._ARTICLES_COLLECTION,
validate_metadata=True,
)
await asyncio.to_thread(self._warn_if_legacy_message_chunk_collection_present)

def _collection_path(self, collection_name: str) -> str:
return str(Path(self._data_dir) / collection_name)

def _collection_metadata_path(self, collection_name: str, path: str | None = None) -> Path:
return Path(path or self._collection_path(collection_name)) / _METADATA_FILENAME

def _collection_attr_name(self, collection_name: str) -> str:
if collection_name == self._ARTICLES_COLLECTION:
return "_articles"
Expand Down Expand Up @@ -74,53 +90,173 @@ def _build_schema(self, collection_name: str) -> zvec.CollectionSchema:

return zvec.CollectionSchema(
name=collection_name,
vectors=zvec.VectorSchema("embedding", zvec.DataType.VECTOR_FP32, self._dimensions),
vectors=zvec.VectorSchema(
_VECTOR_FIELD_NAME,
zvec.DataType.VECTOR_FP32,
self._dimensions,
),
)

async def _open_or_create_collection(
def _expected_collection_metadata(self, collection_name: str) -> dict[str, Any]:
return {
"collection": collection_name,
"dimensions": self._dimensions,
"model_name": self._model_name,
}

def _read_collection_metadata(
self, collection_name: str, path: str | None = None
) -> dict[str, Any] | None:
path_obj = self._collection_metadata_path(collection_name, path)
if not path_obj.exists():
return None
try:
data = json.loads(path_obj.read_text())
if isinstance(data, dict):
return data
logger.warning(
"Vector collection metadata file is not a JSON object",
collection=collection_name,
path=str(path_obj),
)
return None
except (json.JSONDecodeError, OSError):
logger.warning(
"Failed to read vector collection metadata",
collection=collection_name,
path=str(path_obj),
)
return None

def _write_collection_metadata(self, collection_name: str, path: str | None = None) -> None:
path_obj = self._collection_metadata_path(collection_name, path)
path_obj.write_text(
json.dumps(
self._expected_collection_metadata(collection_name), indent=2, sort_keys=True
)
)

def _collection_dimension(self, collection: zvec.Collection) -> int:
schema = json.loads(str(collection.schema))
return int(schema["vectors"][_VECTOR_FIELD_NAME]["dimension"])

async def _collection_needs_recreate(
self,
collection_name: str,
collection: zvec.Collection,
path: str | None = None,
) -> str | None:
actual_dimension = await asyncio.to_thread(self._collection_dimension, collection)
if actual_dimension != self._dimensions:
return f"dimension mismatch ({actual_dimension} != {self._dimensions})"

metadata = await asyncio.to_thread(self._read_collection_metadata, collection_name, path)
if metadata is None:
return None

stored_dimensions = metadata.get("dimensions")
if isinstance(stored_dimensions, int) and stored_dimensions != self._dimensions:
return (
f"stored metadata dimensions mismatch ({stored_dimensions} != {self._dimensions})"
)

stored_model_name = metadata.get("model_name")
if self._model_name and stored_model_name and stored_model_name != self._model_name:
return f"model mismatch ({stored_model_name} != {self._model_name})"

return None

async def _destroy_collection_at_path(
self,
collection_name: str,
collection: zvec.Collection | None,
path: str | None = None,
) -> None:
collection_path = path or self._collection_path(collection_name)
if collection is not None:
try:
await asyncio.to_thread(collection.destroy)
except Exception:
logger.warning(
"Failed to destroy vector collection cleanly, removing files manually",
collection=collection_name,
path=collection_path,
)

if await asyncio.to_thread(os.path.exists, collection_path):
await asyncio.to_thread(shutil.rmtree, collection_path, True)

async def _open_or_create_collection(
self,
collection_name: str,
path: str | None = None,
*,
validate_metadata: bool = False,
) -> zvec.Collection:
import zvec

path = path or self._collection_path(collection_name)
collection_path = path or self._collection_path(collection_name)
try:
collection = await asyncio.to_thread(
zvec.create_and_open,
path=path,
path=collection_path,
schema=self._build_schema(collection_name),
)
logger.info("Created new vector collection", collection=collection_name)
if validate_metadata:
await asyncio.to_thread(
self._write_collection_metadata,
collection_name,
collection_path,
)
return collection
except Exception:
collection = await asyncio.to_thread(
zvec.open,
path=path,
path=collection_path,
option=zvec.CollectionOption(),
)
logger.info("Opened existing vector collection", collection=collection_name)
if validate_metadata:
recreate_reason = await self._collection_needs_recreate(
collection_name,
collection,
collection_path,
)
if recreate_reason is not None:
logger.warning(
"Recreating incompatible vector collection",
collection=collection_name,
reason=recreate_reason,
)
await self._destroy_collection_at_path(
collection_name,
collection,
collection_path,
)
collection = await asyncio.to_thread(
zvec.create_and_open,
path=collection_path,
schema=self._build_schema(collection_name),
)
logger.info("Recreated vector collection", collection=collection_name)
await asyncio.to_thread(
self._write_collection_metadata,
collection_name,
collection_path,
)
return collection

async def _recreate_collection(self, collection_name: str) -> zvec.Collection:
attr_name = self._collection_attr_name(collection_name)
collection = getattr(self, attr_name)
path = self._collection_path(collection_name)
await self._destroy_collection_at_path(collection_name, collection)
setattr(self, attr_name, None)

if collection is not None:
try:
await asyncio.to_thread(collection.destroy)
except Exception:
logger.warning(
"Failed to destroy vector collection cleanly, removing files manually",
collection=collection_name,
path=path,
)
finally:
setattr(self, attr_name, None)

if await asyncio.to_thread(os.path.exists, path):
await asyncio.to_thread(shutil.rmtree, path, True)

recreated = await self._open_or_create_collection(collection_name)
recreated = await self._open_or_create_collection(
collection_name,
validate_metadata=True,
)
setattr(self, attr_name, recreated)
return recreated

Expand All @@ -146,18 +282,11 @@ async def _recreate_message_chunk_collection(self, guild_id: str) -> zvec.Collec
collection = self._message_chunks.pop(guild_id, None)
path = self._message_chunk_collection_path(guild_id)

if collection is not None:
try:
await asyncio.to_thread(collection.destroy)
except Exception:
logger.warning(
"Failed to destroy vector collection cleanly, removing files manually",
collection=self._message_chunk_collection_name(guild_id),
path=path,
)

if await asyncio.to_thread(os.path.exists, path):
await asyncio.to_thread(shutil.rmtree, path, True)
await self._destroy_collection_at_path(
self._message_chunk_collection_name(guild_id),
collection,
path,
)

recreated = await self._open_or_create_collection(
self._message_chunk_collection_name(guild_id),
Expand All @@ -178,7 +307,7 @@ async def upsert_article(self, content_item_id: str, embedding: list[float]) ->
raise RuntimeError("VectorStore not initialized")
doc = zvec.Doc(
id=content_item_id,
vectors={"embedding": embedding},
vectors={_VECTOR_FIELD_NAME: embedding},
)
await asyncio.to_thread(self._articles.upsert, [doc])

Expand All @@ -189,7 +318,7 @@ async def upsert_articles_batch(self, items: list[tuple[str, list[float]]]) -> N
raise RuntimeError("VectorStore not initialized")
if not items:
return
docs = [zvec.Doc(id=item_id, vectors={"embedding": emb}) for item_id, emb in items]
docs = [zvec.Doc(id=item_id, vectors={_VECTOR_FIELD_NAME: emb}) for item_id, emb in items]
await asyncio.to_thread(self._articles.upsert, docs)

async def search_articles(
Expand All @@ -201,7 +330,7 @@ async def search_articles(
raise RuntimeError("VectorStore not initialized")
results: Any = await asyncio.to_thread(
self._articles.query,
zvec.VectorQuery("embedding", vector=query_embedding),
zvec.VectorQuery(_VECTOR_FIELD_NAME, vector=query_embedding),
topk=topk,
)
return [SearchResult(content_item_id=r.id, score=r.score) for r in results]
Expand All @@ -214,6 +343,9 @@ async def delete_article(self, content_item_id: str) -> None:
    async def article_doc_count(self) -> int:
        """Number of documents currently stored in the articles collection."""
        return await self._doc_count(self._articles)

    async def recreate_articles_collection(self) -> None:
        """Drop and rebuild the articles vector collection from scratch."""
        await self._recreate_collection(self._ARTICLES_COLLECTION)

async def upsert_message_chunk(
self, guild_id: str, chunk_id: str, embedding: list[float]
) -> None:
Expand All @@ -224,7 +356,7 @@ async def upsert_message_chunk(
raise RuntimeError("VectorStore not initialized")
doc = zvec.Doc(
id=chunk_id,
vectors={"embedding": embedding},
vectors={_VECTOR_FIELD_NAME: embedding},
)
await asyncio.to_thread(collection.upsert, [doc])

Expand All @@ -238,7 +370,7 @@ async def upsert_message_chunks_batch(
raise RuntimeError("VectorStore not initialized")
if not items:
return
docs = [zvec.Doc(id=cid, vectors={"embedding": emb}) for cid, emb in items]
docs = [zvec.Doc(id=cid, vectors={_VECTOR_FIELD_NAME: emb}) for cid, emb in items]
await asyncio.to_thread(collection.upsert, docs)

async def search_message_chunks(
Expand All @@ -251,7 +383,7 @@ async def search_message_chunks(
return []
results: Any = await asyncio.to_thread(
collection.query,
zvec.VectorQuery("embedding", vector=query_embedding),
zvec.VectorQuery(_VECTOR_FIELD_NAME, vector=query_embedding),
topk=topk,
)
return [ChunkSearchResult(chunk_id=r.id, score=r.score) for r in results]
Expand Down
Loading
Loading