Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- Docs are built with Quarto, so use it's syntax in docstrings.

## Must-Run Commands Before Hand-off
- Before committing or pushing Python changes, run `./.venv/bin/task format`; CI runs `format_check` and will fail on unformatted files.
- `./.venv/bin/task check`
- `./.venv/bin/task tests`
- For docs changes, also `./.venv/bin/task docs_build`
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ for link in links:
chunked_document = chunker.chunk(document)
store.upsert(chunked_document)

# Build indexes before retrieval
store.build_index()

# Retrieve relevant chunks
chunks = store.retrieve("How do I stream a response?", top_k=5)
for chunk in chunks:
Expand Down
128 changes: 102 additions & 26 deletions src/raghilda/_duckdb_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ class DuckDBStore(BaseStore):
)
store.upsert(MarkdownChunker().chunk(doc))

# Build indexes before retrieval
store.build_index()

# Retrieve similar chunks
chunks = store.retrieve("How do I use this?", top_k=5)
```
Expand Down Expand Up @@ -326,7 +329,8 @@ def create(
name VARCHAR,
title VARCHAR,
embed_config VARCHAR,
attributes_schema_json VARCHAR
attributes_schema_json VARCHAR,
bm25_index_is_current BOOLEAN DEFAULT FALSE
);

CREATE OR REPLACE TABLE documents (
Expand Down Expand Up @@ -367,14 +371,16 @@ def create(
name,
title,
embed_config,
attributes_schema_json
) VALUES (?, ?, ?, ?)
attributes_schema_json,
bm25_index_is_current
) VALUES (?, ?, ?, ?, ?)
""",
[
name,
title,
embed_config_json,
attributes_schema_json,
False,
],
)

Expand All @@ -400,7 +406,11 @@ def __init__(
attributes_schema=self.metadata.attributes_schema,
require_embedding=self.metadata.embed is not None,
)
self._db_lock = threading.Lock()
self._db_lock = threading.RLock()
# Best-effort BM25 state for this handle. We intentionally avoid a
# metadata read on every retrieval; multiple live stores per DB file
# are unsupported and are not kept in sync.
self._has_bm25_index = _read_bm25_index_state(self.con)

def upsert(
self,
Expand Down Expand Up @@ -508,7 +518,11 @@ def upsert(
else:
_duckdb_append(self.con, "documents", [doc_row])
_duckdb_append(self.con, "embeddings", chunk_rows)
_set_bm25_index_state(self.con, False)
self.con.commit()
# DuckDB FTS materializes BM25 state in side tables and does not
# refresh it after writes, while HNSW indexes are maintained.
self._has_bm25_index = False
except Exception:
try:
self.con.rollback()
Expand Down Expand Up @@ -989,12 +1003,12 @@ def retrieve_bm25(
sql = f"""
WITH ranked AS (
SELECT
e.chunk_id,
e.chunk_id,
doc.origin AS origin,
e.start_index,
e.end_index,
e.start_index,
e.end_index,
e.char_count,
e.context,
e.context,
{attribute_select}
doc.text[e.start_index + 1:e.end_index] AS text,
'bm25' AS metric_name,
Expand All @@ -1011,6 +1025,7 @@ def retrieve_bm25(
"""

with self._db_lock:
self._require_bm25_index()
result = self.con.execute(
sql,
{
Expand Down Expand Up @@ -1048,6 +1063,18 @@ def retrieve_bm25(

return output

def _require_bm25_index(self) -> None:
if not self._has_bm25_index:
rebuild_hint = 'Call `store.build_index("bm25")`'
if self.metadata.embed is not None:
rebuild_hint += " or `store.build_index()`"
raise RuntimeError(
"DuckDBStore retrieval requires a current BM25 index. "
f"{rebuild_hint} "
"after inserting or updating documents and before calling "
"`retrieve_bm25()` or `retrieve()`."
)

def build_index(
self,
type: Optional[IndexType | str | list[IndexType | str]] = None,
Expand All @@ -1069,25 +1096,28 @@ def build_index(
else:
index_types = [_coerce_index_type(item) for item in type]

if IndexType.BM25 in index_types:
self.con.execute("INSTALL FTS; LOAD FTS;")
try:
self.con.begin()
self._create_fts_index()
self.con.commit()
except Exception as e:
self.con.rollback()
raise e
with self._db_lock:
if IndexType.BM25 in index_types:
self.con.execute("INSTALL FTS; LOAD FTS;")
try:
self.con.begin()
self._create_fts_index()
_set_bm25_index_state(self.con, True)
self.con.commit()
self._has_bm25_index = True
except Exception as e:
self.con.rollback()
raise e

if IndexType.HNSW in index_types:
self.con.execute("INSTALL vss; LOAD vss;")
try:
self.con.begin()
self._create_hnsw_index()
self.con.commit()
except Exception as e:
self.con.rollback()
raise e
if IndexType.HNSW in index_types:
self.con.execute("INSTALL vss; LOAD vss;")
try:
self.con.begin()
self._create_hnsw_index()
self.con.commit()
except Exception as e:
self.con.rollback()
raise e

def _create_fts_index(self):
self.con.execute(
Expand Down Expand Up @@ -1207,6 +1237,52 @@ def _load_extensions_for_existing_indexes(con: duckdb.DuckDBPyConnection) -> Non
con.execute("INSTALL vss; LOAD vss;")


def _ensure_bm25_index_state_column(
con: duckdb.DuckDBPyConnection,
) -> None:
if "bm25_index_is_current" in _table_columns(con, table="metadata"):
return
con.execute(
"ALTER TABLE metadata ADD COLUMN bm25_index_is_current BOOLEAN DEFAULT FALSE"
)


def _read_bm25_index_state(con: duckdb.DuckDBPyConnection) -> bool:
if "bm25_index_is_current" not in _table_columns(con, table="metadata"):
# NOTE: legacy stores predate explicit BM25 freshness tracking.
# For backward compatibility we keep trusting any existing FTS index
# until this release writes the new metadata field. TODO: switch the
# missing-column case to conservative "stale until rebuilt" behavior
# in a future breaking release.
return _has_legacy_bm25_index(con)
row = con.execute("SELECT bm25_index_is_current FROM metadata").fetchone()
assert row is not None
return bool(row[0])


def _has_legacy_bm25_index(con: duckdb.DuckDBPyConnection) -> bool:
row = con.execute(
"""
SELECT EXISTS (
SELECT 1
FROM duckdb_functions()
WHERE schema_name = 'fts_main_chunks'
AND function_name = 'match_bm25'
)
"""
).fetchone()
assert row is not None
return bool(row[0])


def _set_bm25_index_state(con: duckdb.DuckDBPyConnection, is_current: bool) -> None:
_ensure_bm25_index_state_column(con)
con.execute(
"UPDATE metadata SET bm25_index_is_current = ?",
[is_current],
)


def _validate_required_schema(
con: duckdb.DuckDBPyConnection,
*,
Expand Down
Loading
Loading