Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Changelog

## Unreleased

### Added

- Added `raghilda.crawl`, including `CrawlScope`, `FetchedSource`,
`DirectoryCrawler`, `WebCrawler`, and `CloudflareCrawler`, for discovering
directory, web, and Cloudflare sources and converting them to markdown
documents.
- Added `BaseStore.ingest()` and `IngestSummary` for bulk document ingestion
with optional document preparation, parallel writes, and inserted, replaced,
and skipped counts.

### Fixed

- Fixed sitemap URL extraction so each `<loc>` entry is collected as one URL.
10 changes: 10 additions & 0 deletions great-docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ reference:
- name: store.OpenAIStore
- name: store.PostgreSQLStore

- title: Crawl
desc: Crawlers for discovering and converting source documents
contents:
- crawl.CrawlScope
- crawl.FetchedSource
- crawl.BaseCrawler
- crawl.DirectoryCrawler
- crawl.WebCrawler
- crawl.CloudflareCrawler

- title: Embedding
desc: Embedding providers for generating vector representations
contents:
Expand Down
3 changes: 2 additions & 1 deletion src/raghilda/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from . import embedding, store, types, chunk, chunker, document, read, scrape
from . import crawl, embedding, store, types, chunk, chunker, document, read, scrape

__all__ = [
"crawl",
"embedding",
"store",
"types",
Expand Down
9 changes: 9 additions & 0 deletions src/raghilda/_postgres_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from ._store import BaseStore, WriteResult
import json
import threading
from .embedding import EmbeddingProvider, EmbedInputType, embedding_from_config
from .document import Document, ChunkedMarkdownDocument
from .chunk import Chunk, MarkdownChunk, RetrievedChunk, Metric
Expand Down Expand Up @@ -137,6 +138,7 @@ def __init__(
self.con = con
self._metadata = metadata
self._schema = psycopg2.extensions.quote_ident(schema, con)
self._ingest_upsert_lock = threading.Lock()

def close(self) -> None:
"""Close the store's database connection."""
Expand Down Expand Up @@ -539,6 +541,13 @@ def upsert(
replaced_document=replaced_document,
)

def _ingest_upsert(
self,
document: Document,
) -> WriteResult[ChunkedMarkdownDocument]:
with self._ingest_upsert_lock:
return self.upsert(document)

def _load_document_snapshot(
self, *, origin: str, text: str
) -> ChunkedMarkdownDocument:
Expand Down
127 changes: 126 additions & 1 deletion src/raghilda/_store.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from concurrent.futures import FIRST_COMPLETED, CancelledError, ThreadPoolExecutor, wait
from dataclasses import dataclass
from typing import Generic, Literal, Sequence, TypeVar
import threading
from typing import Any, Callable, Generic, Iterable, Literal, Sequence, TypeVar

from .chunk import RetrievedChunk
from .document import Document

TDocument = TypeVar("TDocument", bound=Document, covariant=True)
_RECENT_INGEST_ORIGIN_WINDOW = 10_000


@dataclass(frozen=True)
Expand All @@ -17,6 +20,13 @@ class WriteResult(Generic[TDocument]):
replaced_document: TDocument | None = None


@dataclass(frozen=True)
class IngestSummary:
inserted: int
replaced: int
skipped: int


class BaseStore(ABC):
"""Abstract base class for vector stores.

Expand Down Expand Up @@ -77,6 +87,121 @@ def upsert(
"""
pass

def _ingest_upsert(self, document: Document) -> WriteResult[Document]:
return self.upsert(document)

def ingest(
self,
documents: Iterable[Any],
*,
prepare: Callable[[Any], Document] | None = None,
max_workers: int = 1,
) -> IngestSummary:
"""Prepare and upsert a stream of documents.

Inputs are consumed lazily and submitted incrementally. After
``prepare`` is applied, recent non-empty string origins are checked for
duplicates as the stream is consumed. Duplicate detection is best
effort: a duplicate raises ``ValueError`` when encountered, after any
writes already in flight complete. No rollback is attempted.

Returns
-------
IngestSummary
Aggregate counts for inserted, replaced, and skipped documents.
Call ``upsert()`` directly when per-document ``WriteResult`` values
are needed.
"""
assert max_workers >= 1
stop_event = threading.Event()
recent_origins: dict[str, None] = {}
recent_origins_lock = threading.Lock()

def remember_origin(origin: str | None) -> None:
if not isinstance(origin, str) or not origin:
return
with recent_origins_lock:
if origin in recent_origins:
raise ValueError(f"Duplicate origin during ingest: {origin}")
recent_origins[origin] = None
if len(recent_origins) > _RECENT_INGEST_ORIGIN_WINDOW:
# dict preserves insertion order, so the first key is the oldest.
recent_origins.pop(next(iter(recent_origins)))

def process_document(item: Any) -> WriteResult[Document]:
if stop_event.is_set():
raise CancelledError()
document = prepare(item) if prepare is not None else item
if stop_event.is_set():
raise CancelledError()
remember_origin(document.origin)
if stop_event.is_set():
raise CancelledError()
return self._ingest_upsert(document)

iterator = iter(documents)
pending = set()
inserted = 0
replaced = 0
skipped = 0
exhausted = False
executor = ThreadPoolExecutor(max_workers=max_workers)
try:
while not exhausted and len(pending) < max_workers:
try:
document = next(iterator)
except StopIteration:
exhausted = True
continue
pending.add(executor.submit(process_document, document))

while pending:
done, pending = wait(pending, return_when=FIRST_COMPLETED)
results = []
cancelled_errors = []
errors = []
for future in done:
try:
results.append(future.result())
except CancelledError as exc:
cancelled_errors.append(exc)
except Exception as exc:
errors.append(exc)
if errors:
raise errors[0]
if cancelled_errors and not stop_event.is_set():
raise cancelled_errors[0]
for result in results:
if result.action == "inserted":
inserted += 1
elif result.action == "replaced":
replaced += 1
elif result.action == "skipped":
skipped += 1
else:
raise ValueError(f"Unknown write action: {result.action}")

while not exhausted and len(pending) < max_workers:
try:
document = next(iterator)
except StopIteration:
exhausted = True
continue
pending.add(executor.submit(process_document, document))
except Exception:
stop_event.set()
for future in pending:
future.cancel()
executor.shutdown(wait=True, cancel_futures=True)
raise

executor.shutdown(wait=True, cancel_futures=False)
return IngestSummary(
inserted=inserted,
replaced=replaced,
skipped=skipped,
)

@abstractmethod
def retrieve(
self, text: str, top_k: int, *args, **kwargs
Expand Down
Loading
Loading