Introduce cached, concurrent crawl and ingest APIs #72

Merged
t-kalinowski merged 17 commits into main from feature/crawl-api on Jun 8, 2026

Conversation

@t-kalinowski t-kalinowski commented Apr 15, 2026

Member

Summary

The primary motivation for this PR is to make store creation workflows faster
in practice.

Raghilda can make store creation faster in two ways:

  • local caching to avoid repeated fetches and repeated conversion work across runs
    (and make interrupted store creation jobs resumable)
  • concurrency to do more crawl and ingest work in parallel within a run

This PR introduces a crawl and ingest API that makes caching and concurrency
available in a principled way, without forcing everything into one opaque
pipeline.

The design centers on a visible seam between crawler and store:

crawler.markdown_documents(scope) -> Iterator[MarkdownDocument]
store.ingest(documents, prepare=None, max_workers=1) -> IngestSummary

That seam matters because it lets us add caching and concurrency while keeping
the workflow inspectable and composable:

  • callers can inspect discovered origins
  • callers can fetch or convert one origin directly
  • callers can chunk outside the crawler
  • callers can rerun ingest policy without re-fetching content
  • callers are not forced into one monolithic, opaque run(..., store=...) path

The crawl side is centered on:

crawler.markdown_documents(scope) -> Iterator[MarkdownDocument]

The ingest side is centered on:

store.ingest(documents, prepare=None, max_workers=1) -> IngestSummary
store.upsert(document) -> WriteResult

This PR covers both pieces because the crawler/store seam is best described in
terms of the document objects moving through the pipeline.

Proposed Crawler Contract

The main happy path is:

crawler.markdown_documents(scope) -> Iterator[MarkdownDocument]

Additional lower-level crawler methods are also part of the API:

  • origins(scope) -> Iterator[str]
  • fetch_raw(origin, cache_force_refresh=False) -> FetchedSource
  • fetch_markdown(origin, cache_force_refresh=False) -> MarkdownDocument

BaseCrawler defines this contract and is exposed for subclassing.

CrawlScope

CrawlScope owns the traversal policy definition.

@dataclass(frozen=True)
class CrawlScope:
    roots: str | Path | Sequence[str | Path]
    include_patterns: Sequence[str] | None = None
    exclude_patterns: Sequence[str] | None = None
    depth: int | None = None
    limit: int | None = None
    include_types: Sequence[str] | None = None
    exclude_types: Sequence[str] | None = None
    include_external_links: bool = False
    include_subdomains: bool = False

The split is:

  • CrawlScope defines what to crawl for one run
  • the crawler constructor configures backend-specific behavior
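
A minimal sketch of how the pattern and limit fields of a scope might be applied to candidate origins. It assumes glob-style matching through the standard-library fnmatch module, with exclusions taking priority; the PR text does not spell out the matching rules, and in_scope and apply_scope are hypothetical helper names, not part of the proposed API.

```python
from __future__ import annotations

from fnmatch import fnmatchcase
from itertools import islice
from typing import Iterable, Iterator, Sequence


def in_scope(
    origin: str,
    include_patterns: Sequence[str] | None = None,
    exclude_patterns: Sequence[str] | None = None,
) -> bool:
    """Hypothetical helper: glob-style include/exclude check (assumed semantics)."""
    # Assumption: an exclude match always wins over an include match.
    if exclude_patterns and any(fnmatchcase(origin, p) for p in exclude_patterns):
        return False
    # Assumption: no include patterns means "include everything".
    if include_patterns is None:
        return True
    return any(fnmatchcase(origin, p) for p in include_patterns)


def apply_scope(
    origins: Iterable[str],
    *,
    include_patterns: Sequence[str] | None = None,
    exclude_patterns: Sequence[str] | None = None,
    limit: int | None = None,
) -> Iterator[str]:
    """Lazily filter origins, stopping once `limit` origins have been kept."""
    kept = (o for o in origins if in_scope(o, include_patterns, exclude_patterns))
    return islice(kept, limit)  # islice(..., None) keeps everything


candidates = [
    "https://example.com/docs/a",
    "https://example.com/blog/b",
    "https://example.com/docs/private/c",
    "https://example.com/docs/d",
]
selected = list(
    apply_scope(
        candidates,
        include_patterns=["https://example.com/docs/**"],
        exclude_patterns=["*/private/*"],
        limit=2,
    )
)
```

Because the filter is a generator, the limit stops consumption of the candidate stream early instead of filtering a fully materialized list.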

FetchedSource

One shared intermediate across crawler implementations is FetchedSource.

@dataclass(frozen=True)
class FetchedSource:
    origin: str
    body_path: Path
    resolved_origin: str | None = None
    content_type: str | None = None
    status_code: int | None = None
    metadata: dict[str, Any] | None = None
    fetched_at: datetime | None = None
    revalidated_at: datetime | None = None
    markdown_path: Path | None = None

It is the inspectable boundary that abstracts away whether an origin is
resolved via an actual web call or from the local filesystem cache.

Concrete Crawlers

Raghilda exposes:

  • DirectoryCrawler
  • WebCrawler
  • CloudflareCrawler

Constructors stay backend-specific:

DirectoryCrawler(
    *,
    cache_dir: bool | str | Path | None = None,
    max_workers: int = 1,
)

WebCrawler(
    *,
    session: requests.Session | None = None,
    cache_dir: bool | str | Path | None = None,
    cache_stale_after: timedelta | None = None,
    max_workers: int = 1,
)

CloudflareCrawler(
    *,
    account_id: str,
    api_token: str,
    cache_dir: bool | str | Path | None = None,
    session: requests.Session | Any | None = None,
    source: str = "all",
    render: bool = True,
    cache_stale_after: timedelta | None = None,
    modified_since: int | None = None,
    poll_interval: float = 5.0,
    max_poll_attempts: int = 60,
    max_workers: int = 1,
    base_url: str = "https://api.cloudflare.com/client/v4",
)

Cache-related controls use a cache_ prefix:

  • cache_dir=
  • cache_stale_after=
  • cache_force_refresh=

Concurrency

This PR includes concurrency on both sides of the seam, with independent
threadpools used by the crawler and the store writer.

Crawl-Side Concurrency

  • markdown_documents() can fetch and convert concurrently
  • WebCrawler.origins() can fetch a breadth-first frontier concurrently while
    preserving stable output order; markdown_documents() preserves the same
    order as origins()
  • callers still have direct access to origins(), fetch_raw(), and
    fetch_markdown()
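
One way to get concurrent fetching with stable output order is an order-preserving map over a threadpool. The sketch below is an illustration only: map_ordered is a hypothetical name, and the real crawler may window or bound its submissions differently.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_ordered(
    fn: Callable[[T], R], items: Iterable[T], max_workers: int = 1
) -> Iterator[R]:
    """Hypothetical helper: run fn concurrently, yield results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map submits every item up front and yields results in
        # submission order, regardless of which call finishes first.
        yield from pool.map(fn, items)


def slow_upper(text: str) -> str:
    # The first item sleeps longest, so it finishes last.
    time.sleep(0.05 if text == "a" else 0.0)
    return text.upper()


results = list(map_ordered(slow_upper, ["a", "b", "c"], max_workers=3))
```

The trade-off is that a slow early item delays later results that are already finished, which is the price of a deterministic order.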

Store-Side Concurrency

  • store.ingest(..., max_workers=..., prepare=...) can perform concurrent
    writes while continuing to prepare later documents as the stream is consumed

Caching

This PR adds an internal filesystem cache, but does not introduce a public
cache abstraction yet. It is intentionally a flat directory of files so it
stays easy to inspect with normal file tools.

The cache layout is:

<root>/<sanitized-key>--<hash>.metadata.json
<root>/<sanitized-key>--<hash><ext>

If cache_dir=True, the cache resolves to a backend-specific directory under:

.raghilda/cache/directory
.raghilda/cache/web
.raghilda/cache/cloudflare

The metadata sidecar is the source of truth for each cache entry.
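
A minimal sketch of how a cache key could map onto the two file names in the layout above. The sanitization rule, the hash function (SHA-256), and the truncation lengths are assumptions for illustration; cache_paths is a hypothetical helper, not the PR's implementation.

```python
from __future__ import annotations

import hashlib
import re
from pathlib import Path


def cache_paths(root: Path, key: str, ext: str) -> tuple[Path, Path]:
    """Hypothetical helper: return (metadata sidecar path, body path) for a key."""
    # Assumed sanitization: collapse anything outside a safe set into "-".
    sanitized = re.sub(r"[^A-Za-z0-9._-]+", "-", key).strip("-")[:80]
    # The hash of the full key keeps names unique even when two keys
    # sanitize to the same readable prefix.
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]
    stem = f"{sanitized}--{digest}"
    return root / f"{stem}.metadata.json", root / f"{stem}{ext}"


meta_path, body_path = cache_paths(
    Path(".raghilda/cache/web"), "https://example.com/docs/intro", ".html"
)
```

The readable prefix keeps the flat directory browsable with normal file tools, while the hash suffix avoids collisions.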

This cache is also part of the practical interruptibility story. The persistent
crawl cache plus the store's own upsert behavior act as an ad hoc resumability
mechanism: if a script is interrupted and rerun without underlying content
changes, the rerun should stay cheap enough in practice without exposing a
large public state object that can drift out of sync.

Store Seam

The crawler/store seam is:

  • crawler methods produce MarkdownDocument
  • optional preparation converts documents into the shape a store wants
  • upsert() writes the final store-specific document shape

That keeps chunking outside the crawler.

Store expectations differ:

  • DuckDBStore.upsert() expects ChunkedMarkdownDocument
  • ChromaDBStore.upsert() expects ChunkedMarkdownDocument
  • PostgreSQLStore.upsert() expects ChunkedMarkdownDocument
  • OpenAIStore.upsert() expects MarkdownDocument

Ingest API

The convenience path is:

store.ingest(documents, prepare=None, max_workers=1) -> IngestSummary

The lower-level path is:

store.upsert(document) -> WriteResult

The intended behavior is:

  • inputs are consumed lazily
  • prepare is applied as documents are read from the input iterable
  • writes are submitted incrementally to a threadpool
  • duplicate-origin detection is best effort over a bounded recent window
  • a duplicate raises ValueError when encountered
  • no rollback is attempted
  • already running writes are allowed to complete before the error is surfaced
  • rerunning after interruption should still be cheap in practice when crawl cache hits and store upserts can avoid unnecessary work

This keeps ingest streaming while still catching nearby duplicate-origin
mistakes.
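
The intended behavior listed above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the store implementation: documents are modeled as (origin, payload) pairs instead of MarkdownDocument, ingest_sketch is a hypothetical name, and the in-flight cap of two writes per worker and the window size are made up.

```python
from __future__ import annotations

from collections import deque
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Any, Callable, Iterable


def ingest_sketch(
    documents: Iterable[tuple[str, Any]],
    upsert: Callable[[str, Any], Any],
    prepare: Callable[[Any], Any] | None = None,
    max_workers: int = 1,
    window: int = 1024,
) -> int:
    """Stream (origin, payload) pairs into `upsert`; return the write count."""
    recent: deque[str] = deque(maxlen=window)  # bounded duplicate window
    pending = set()
    written = 0
    max_in_flight = max_workers * 2  # assumed backpressure threshold
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for origin, payload in documents:  # input is consumed lazily
            if origin in recent:
                # Leaving the `with` block waits for submitted writes to
                # finish before the error reaches the caller. No rollback.
                raise ValueError(f"duplicate origin: {origin}")
            recent.append(origin)
            document = prepare(payload) if prepare else payload
            pending.add(pool.submit(upsert, origin, document))
            if len(pending) >= max_in_flight:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    future.result()  # re-raise any write error
                    written += 1
        for future in pending:
            future.result()
            written += 1
    return written


written_docs: dict[str, Any] = {}


def fake_upsert(origin: str, document: Any) -> None:
    written_docs[origin] = document


count = ingest_sketch(
    iter([("a", "x"), ("b", "y"), ("c", "z")]),
    fake_upsert,
    prepare=str.upper,
    max_workers=2,
)

try:
    ingest_sketch([("d", "x"), ("d", "again")], fake_upsert)
    duplicate_error = None
except ValueError as exc:
    duplicate_error = str(exc)
```

In the second call the first write for "d" still lands before the ValueError surfaces, which mirrors the "no rollback" bullet.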

Example

from raghilda.chunker import MarkdownChunker
from raghilda.crawl import CrawlScope, WebCrawler
from raghilda.store import DuckDBStore

crawler = WebCrawler(cache_dir=".cache/crawl", max_workers=4)

scope = CrawlScope(
    roots=["https://example.com/docs"],
    include_patterns=["https://example.com/docs/**"],
    depth=2,
)

documents = crawler.markdown_documents(scope)

store = DuckDBStore.create(
    location="raghilda.duckdb",
    overwrite=True,
    embed=None,
)

summary = store.ingest(
    documents,
    prepare=MarkdownChunker(chunk_size=1000).chunk,
    max_workers=4,
)

For OpenAIStore, there is no chunking step:

documents = crawler.markdown_documents(scope)
store.ingest(documents)

Non-Goals

This PR does not try to settle:

  • a public cache abstraction
  • a public crawler.cache object
  • a top-level crawler.run(..., store=...) wrapper
  • the full long-term ingest API beyond ingest() and upsert()
  • a generic crawler plugin system

Open Questions

  • Is markdown_documents() the right main happy path?
  • Is CrawlScope the right name?
  • How much cache control should be public now beyond cache_dir=,
    cache_stale_after=, and cache_force_refresh=?
  • Should converted-document caching stay internal?
  • Is the current crawler/store seam sufficient, or do we need a thinner or
    thicker convenience layer on top later?

@t-kalinowski t-kalinowski force-pushed the feature/crawl-api branch 2 times, most recently from c43454f to b7ce077 Compare April 15, 2026 18:46
@dfalbel

dfalbel commented Apr 15, 2026

Contributor

Can you expand on what's expected with, e.g.:

from raghilda.chunker import MarkdownChunker
from raghilda.crawl import CrawlScope, WebCrawler
from raghilda.store import DuckDBStore

crawler = WebCrawler(cache_dir=".cache/crawl", max_workers=4)

scope = CrawlScope(
    roots=["https://example.com/docs"],
    include_patterns=["https://example.com/docs/**"],
    depth=2,
)

documents = crawler.markdown_documents(scope)

store = DuckDBStore.create(
    location="raghilda.duckdb",
    overwrite=True,
    embed=None,
)

summary = store.ingest(
    documents,
    prepare=MarkdownChunker(chunk_size=1000).chunk,
    max_workers=4,
)

Suppose I want to update my store daily with all new links from 'example.com'. What's the expected behavior? Should I run this exact same code every day (excluding DuckDBStore.create, which would become connect)?

@t-kalinowski

t-kalinowski commented Apr 15, 2026

Member Author

Yes, users could run the same script daily, but with a store connect() instead of a create() call, to update an existing store.

To make the maximum cache age explicit, they would also want to pass cache_stale_after= to the crawler constructor (which also manages the cache):

from datetime import timedelta

crawler = WebCrawler(
    cache_dir=".cache/crawl",
    cache_stale_after=timedelta(days=1),
    max_workers=4,
)
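
As a rough illustration of what a maximum cache age implies, the check below compares an entry's fetch time against the allowed age. The is_stale helper is hypothetical, and treating None as "never stale by age" is an assumption; the PR does not define that case here.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def is_stale(
    fetched_at: datetime,
    stale_after: timedelta | None,
    now: datetime | None = None,
) -> bool:
    """Hypothetical helper: has a cache entry outlived its maximum age?"""
    if stale_after is None:
        # Assumption: no maximum age means entries never expire by age.
        return False
    if now is None:
        now = datetime.now(timezone.utc)
    # With timedelta(0), every entry is stale as soon as it is written.
    return now - fetched_at >= stale_after


fetched = datetime(2026, 4, 15, 12, 0, tzinfo=timezone.utc)
fresh = is_stale(fetched, timedelta(days=1), now=fetched + timedelta(hours=23))
expired = is_stale(fetched, timedelta(days=1), now=fetched + timedelta(hours=25))
```

With a one-day maximum age, a daily job would revalidate or refetch every origin it last fetched a day or more earlier.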

Comment thread src/raghilda/store.py
__all__ = [
"BaseStore",
"WriteResult",
"IngestSummary",

Contributor

I'd probably return a list of WriteResult.

Member Author

I started with that, but I went with IngestSummary because ingest() is the batch convenience path, and it is nice for the default return value to print as something compact like “inserted X, replaced Y, skipped Z”.

A full list[WriteResult] also brings a few awkward issues that I did not want to bake into the API. WriteResult contains the fully written document, and sometimes the replaced document too, so for large ingests that can mean holding a lot of data in memory just to return it. It also raises ordering questions under concurrency (input order vs completion order), and makes the contract murkier if the batch fails partway through after some writes have already completed.

That said, I do think the current summary is probably a little too sparse to be very useful. It might be good if IngestSummary also carried origin lists under each category, rather than only the raw counts. That would still keep the return value lightweight and nicely printable, without returning a full list[WriteResult].
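
To make the suggestion concrete, here is a sketch of a summary that carries origin lists per category while still printing compactly. The class name, field names, and record() method are hypothetical; the actual IngestSummary fields are not shown in this thread.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class IngestSummarySketch:
    """Hypothetical summary: origin lists per category, compact when printed."""

    inserted: list[str] = field(default_factory=list)
    replaced: list[str] = field(default_factory=list)
    skipped: list[str] = field(default_factory=list)

    def record(self, action: str, origin: str) -> None:
        if action not in ("inserted", "replaced", "skipped"):
            raise ValueError(f"unknown action: {action!r}")
        getattr(self, action).append(origin)

    def __str__(self) -> str:
        return (
            f"inserted {len(self.inserted)}, "
            f"replaced {len(self.replaced)}, "
            f"skipped {len(self.skipped)}"
        )


summary = IngestSummarySketch()
summary.record("inserted", "https://example.com/docs/a")
summary.record("inserted", "https://example.com/docs/b")
summary.record("skipped", "https://example.com/docs/c")
printed = str(summary)
```

Only origin strings are retained, so memory grows with the number of documents rather than with their content.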

Comment thread src/raghilda/crawl.py
def __init__(
self,
*,
cache_dir: bool | str | Path | None = None,

Contributor

What's the purpose of caching if there's no control over

cache_stale_after
cache_force_refresh

It seems it will always walk through the entire dir?

Member Author

For local directories, we can detect an invalid cache entry cheaply through simple file mtime and size checks, so exposing something like a TTL doesn't seem to make sense.
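
A minimal sketch of the kind of mtime-and-size check described above. The helper names and the shape of the recorded fingerprint are assumptions; the real metadata sidecar may store more fields.

```python
from __future__ import annotations

import os
import tempfile
from pathlib import Path


def file_fingerprint(path: Path) -> dict[str, int]:
    """Hypothetical helper: the cheap identity recorded next to a cache entry."""
    stat = os.stat(path)
    return {"size": stat.st_size, "mtime_ns": stat.st_mtime_ns}


def cache_entry_valid(path: Path, recorded: dict[str, int]) -> bool:
    """A cached conversion is reusable only if the source file looks unchanged."""
    try:
        return file_fingerprint(path) == recorded
    except FileNotFoundError:
        return False


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "readme.md"
    source.write_text("# Title\n", encoding="utf-8")
    recorded = file_fingerprint(source)
    valid_when_unchanged = cache_entry_valid(source, recorded)

    # Changing the content changes the size, which invalidates the entry
    # even on filesystems with coarse mtime resolution.
    source.write_text("# Title\n\nMore text.\n", encoding="utf-8")
    valid_after_edit = cache_entry_valid(source, recorded)

    source.unlink()
    valid_after_delete = cache_entry_valid(source, recorded)
```

The check costs one stat call per file, which is why no time-based expiry is needed for local directories.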

Review finding:
[P1] Preserve Windows drive letters when parsing file:// origins — /Users/tomasz/github/posit-dev/raghilda/.worktrees/feature-crawl-api/src/raghilda/crawl.py:1567-1570
On Windows, urlparse('file:///C:/docs/readme.md').path is '/C:/docs/readme.md', so Path(unquote(parsed.path)) resolves to \C:\docs\readme.md instead of C:\docs\readme.md. That means DirectoryCrawler.origins() can emit file:// URLs that fetch_raw() / fetch_markdown() cannot reopen, and CrawlScope(roots=[file_uri]) fails for the same reason because the same parsing is duplicated in _to_directory_path() above. Using a file-URI decoder that preserves the drive/netloc avoids breaking the public file-URI workflow on Windows.

Response:
Addressed. Replace the duplicated urlparse(...).path decoding with a shared stdlib-backed file-URI decoder based on urllib.request.url2pathname so Windows drive letters and UNC netlocs round-trip correctly without custom path-munging logic. Add a Windows-only public API regression test that exercises CrawlScope(roots=[path.as_uri()]), DirectoryCrawler.origins(), and fetch_raw() on the same file URI.
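
A sketch of a file-URI decoder built on urllib.request.url2pathname, in the spirit of the fix described above. The function name file_uri_to_path is hypothetical, and the way the netloc is re-attached is an assumption about how a UNC host could be preserved; handling of non-local hosts is platform-dependent.

```python
from pathlib import Path
from urllib.parse import urlparse
from urllib.request import url2pathname


def file_uri_to_path(uri: str) -> Path:
    """Hypothetical helper: decode a file:// URI into a local filesystem path."""
    parsed = urlparse(uri)
    if parsed.scheme != "file":
        raise ValueError(f"not a file URI: {uri!r}")
    raw = parsed.path
    if parsed.netloc and parsed.netloc != "localhost":
        # Assumption: keep a UNC host (file://server/share/...) in front of
        # the path so url2pathname can see it.
        raw = f"//{parsed.netloc}{raw}"
    # url2pathname unquotes percent escapes and, on Windows, turns
    # "/C:/docs/readme.md" into "C:\docs\readme.md".
    return Path(url2pathname(raw))


original = Path.cwd() / "docs" / "read me.md"
round_tripped = file_uri_to_path(original.as_uri())
```

The round trip through Path.as_uri() is the same shape as the regression test described in the response, reduced to a single path.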

Review finding:
[P2] Pass cache_force_refresh through markdown_documents — /Users/tomasz/github/posit-dev/raghilda/.worktrees/feature-crawl-api/src/raghilda/crawl.py:661-664
markdown_documents(..., cache_force_refresh=True) refreshes pages while origins() is discovering them, but this second pass hard-codes cache_force_refresh=False. With WebCrawler or CloudflareCrawler and any finite cache_stale_after (for example timedelta(0)), every origin is immediately revalidated/fetched again, so the caller does not actually get a single refreshed crawl and can observe content from a second request/crawl instead of the snapshot used for discovery.

Response:
Not addressed as a behavior bug. The second pass intentionally keeps cache_force_refresh=False so markdown_documents(..., cache_force_refresh=True) reuses the snapshot refreshed during origins() instead of fetching the same origin twice in one user call. Add an inline comment at the point of concern to make that contract explicit; the existing test_web_markdown_documents_reuses_refreshed_sources regression test continues to encode the single-fetch behavior.

Review finding:
[P2] Respect scope.limit before fetching a full frontier batch — /Users/tomasz/github/posit-dev/raghilda/.worktrees/feature-crawl-api/src/raghilda/crawl.py:920-923
Here the entire current-depth batch is submitted to _map_ordered() before yielded >= limit is checked below, so WebCrawler.origins() still requests every root/frontier page even when the first result already satisfies scope.limit. For example, with two roots and limit=1, the method yields only the first URL but still fetches the second one, which creates unexpected network traffic and rate-limit pressure despite the public limit being reached.

Response:
Addressed. Fetch each current-depth frontier in windows bounded by the remaining crawl limit, and cap worker usage to each window, so concurrent WebCrawler.origins() calls stop issuing same-depth requests once the public limit has been satisfied. Add a regression test covering two roots with max_workers=2 and limit=1, which now yields only the first origin and makes only one HTTP request.
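
The windowing idea in this response can be sketched as follows. This is an illustration, not the code in crawl.py: fetch_frontier is a hypothetical name, and treating a None result as "nothing to yield for this origin" is an assumed convention.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterator, Sequence


def fetch_frontier(
    frontier: Sequence[str],
    fetch: Callable[[str], str | None],
    *,
    limit: int | None = None,
    max_workers: int = 1,
) -> Iterator[str]:
    """Fetch one depth level in windows no larger than the remaining limit."""
    yielded = 0
    position = 0
    while position < len(frontier):
        if limit is not None and yielded >= limit:
            break  # limit reached: issue no further requests
        remaining_items = len(frontier) - position
        window_size = (
            remaining_items
            if limit is None
            else min(remaining_items, limit - yielded)
        )
        window = frontier[position : position + window_size]
        position += window_size
        # Never start more workers than there are requests in the window.
        workers = max(1, min(max_workers, len(window)))
        with ThreadPoolExecutor(max_workers=workers) as pool:
            for result in pool.map(fetch, window):
                if result is not None:  # assumed "nothing to yield" marker
                    yielded += 1
                    yield result


requested: list[str] = []


def fake_fetch(url: str) -> str:
    requested.append(url)
    return url


origins = list(
    fetch_frontier(
        ["https://a.example/", "https://b.example/"],
        fake_fetch,
        limit=1,
        max_workers=2,
    )
)
```

With two roots, max_workers=2, and limit=1, only the first root is requested, matching the regression case described above.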
@dfalbel

dfalbel commented Apr 15, 2026

Contributor

This feels magic to me:

This cache is also part of the practical interruptibility story. The persistent
crawl cache plus the store's own upsert behavior act as an ad hoc resumability
mechanism: if a script is interrupted and rerun without underlying content
changes, the rerun should stay cheap enough in practice without exposing a
large public state object that can drift out of sync.

How does this happen in practice? When I interrupt and re-call crawler.markdown_documents() with the same scope, will it return documents previously inserted? From a quick look at the implementation this seems to be the case. Then we try re-inserting the documents that were already inserted? The crawler only protects us from reaching the API/remote source again, right?

@t-kalinowski

t-kalinowski commented Apr 15, 2026

Member Author

That's right. The cache gives cheap resumability for the parts of the workflow up to MarkdownDocument (i.e., it lets us skip URL requests and the conversion of responses to markdown). The underlying assumption is that later parts of the pipeline are already cheap enough that it's fine to just rerun the full workflow.

In particular, we're building on the fact that calling upsert(<document>) on an already inserted identical document avoids re-computing embeddings or modifying the store.
That also assumes that embedding generation is the "expensive" part, and chunking or fetching is not.
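
A toy model of the upsert behavior this relies on, where an identical document is detected by a content hash and skipped before any embedding work. TinyStore and its return values are hypothetical and say nothing about how the real stores detect identical documents.

```python
from __future__ import annotations

import hashlib
from typing import Callable


class TinyStore:
    """Hypothetical in-memory store that skips identical re-upserts."""

    def __init__(self, embed: Callable[[str], list[float]]) -> None:
        self._embed = embed
        self._rows: dict[str, tuple[str, list[float]]] = {}

    def upsert(self, origin: str, text: str) -> str:
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        existing = self._rows.get(origin)
        if existing is not None and existing[0] == digest:
            return "skipped"  # identical content: no embedding, no write
        action = "inserted" if existing is None else "replaced"
        self._rows[origin] = (digest, self._embed(text))
        return action


embed_calls: list[str] = []


def fake_embed(text: str) -> list[float]:
    embed_calls.append(text)
    return [float(len(text))]


store = TinyStore(fake_embed)
first = store.upsert("https://example.com/docs/a", "# A")
second = store.upsert("https://example.com/docs/a", "# A")
third = store.upsert("https://example.com/docs/a", "# A (edited)")
```

Rerunning a full ingest against such a store costs one hash per unchanged document, which is the sense in which a rerun stays cheap.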

@t-kalinowski

Member Author

The hanging test with OpenAI vector stores is unrelated to this PR. It currently reproduces on the main branch too. I opened an issue upstream: openai/openai-python#3097

Review findings:
- [P2] Skip the directory cache while walking roots
- [P2] Preserve the root host for subdomain traversal
- [P2] Do not mark out-of-scope URLs visited globally

Response:
- Exclude DirectoryCrawler's own cache directory from directory-origin walks.
- Keep the original scope host while traversing included subdomains so sibling
  subdomains under the requested host remain in scope.
- Mark web origins visited only after they pass the current scope check, so an
  out-of-scope occurrence for one root does not suppress a later in-scope
  occurrence for another root.
- Add public API regression tests covering all three cases.

Review findings:
- [P2] Track visited URLs per root host
- [P2] Use HTTP Content-Type before URL suffix

Response:
- Track web crawler visited state by the effective root host while still
  yielding each origin at most once, so a shared page can be traversed again
  under a broader multi-root subdomain scope.
- Prefer recognized HTTP Content-Type values when choosing the cached body
  suffix, so rendered HTML served from markdown-like URLs is converted as HTML.
- Add public API regression tests for both cases.
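
The Content-Type preference can be illustrated with a small suffix chooser. The mapping of recognized media types, the ".bin" fallback, and the name body_suffix are assumptions for this sketch only.

```python
from __future__ import annotations

from pathlib import PurePosixPath
from urllib.parse import urlparse

# Assumed set of "recognized" media types; the real list may differ.
KNOWN_TYPES = {
    "text/html": ".html",
    "text/markdown": ".md",
    "text/plain": ".txt",
    "application/pdf": ".pdf",
}


def body_suffix(url: str, content_type: str | None) -> str:
    """Hypothetical helper: pick the cached body suffix for a response."""
    if content_type:
        # Drop parameters such as "; charset=utf-8" before the lookup.
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type in KNOWN_TYPES:
            return KNOWN_TYPES[media_type]
    # Fall back to the URL path suffix for missing or unrecognized types.
    return PurePosixPath(urlparse(url).path).suffix or ".bin"


rendered = body_suffix("https://example.com/docs/README.md", "text/html; charset=utf-8")
raw = body_suffix("https://example.com/docs/README.md", None)
```

Here a rendered HTML page served from a markdown-like URL is cached as .html, so it goes through HTML conversion rather than being treated as markdown.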

Review findings:
- [P2] Preserve escaped URL path segments
- [P2] Avoid refetching freshly crawled sources

Response:
- Add a web URL normalizer that preserves reserved percent escapes such as
  %2F, and use it for WebCrawler and CloudflareCrawler origins.
- Reuse sources materialized during origins() when markdown_documents()
  converts discovered origins, so immediately stale cache policies do not
  trigger a second network request or crawl.
- Add public API regression tests for escaped web origins and immediately
  stale WebCrawler and CloudflareCrawler markdown conversion.

Address crawl review findings around cache safety, URL canonicalization, Cloudflare scope enforcement, and duplicate origin handling.

Cover the regressions through public crawler and ingest tests.
Preserve Cloudflare discovery results for root redirects while continuing to filter non-seed out-of-scope records.

Add regressions for cross-origin redirected seeds and external-first API results.
@t-kalinowski t-kalinowski merged commit 3b576ca into main Jun 8, 2026
4 checks passed

2 participants