Introduce cached, concurrent crawl and ingest APIs #72
Can you expand on what's expected with, e.g.:
from raghilda.chunker import MarkdownChunker
from raghilda.crawl import CrawlScope, WebCrawler
from raghilda.store import DuckDBStore

crawler = WebCrawler(cache_dir=".cache/crawl", max_workers=4)
scope = CrawlScope(
    roots=["https://example.com/docs"],
    include_patterns=["https://example.com/docs/**"],
    depth=2,
)
documents = crawler.markdown_documents(scope)
store = DuckDBStore.create(
    location="raghilda.duckdb",
    overwrite=True,
    embed=None,
)
summary = store.ingest(
    documents,
    prepare=MarkdownChunker(chunk_size=1000).chunk,
    max_workers=4,
)
Suppose I want to update my store daily with all new links from 'example.com'. What's the expected behavior? Should I run this exact same code every day? (excluding
Yes, users could run the same script daily, but with a store
To make the max cache age explicit, they would also want to pass an explicit argument to the crawler constructor (which also manages the cache):
from datetime import timedelta

crawler = WebCrawler(
    cache_dir=".cache/crawl",
    cache_stale_after=timedelta(days=1),
    max_workers=4,
)
__all__ = [
    "BaseStore",
    "WriteResult",
    "IngestSummary",
I'd return a list of WriteResult probably..
I started with that, but I went with IngestSummary because ingest() is the batch convenience path, and it is nice for the default return value to print as something compact like “inserted X, replaced Y, skipped Z”.
A full list[WriteResult] also brings a few awkward issues that I did not want to bake into the API. WriteResult contains the fully written document, and sometimes the replaced document too, so for large ingests that can mean holding a lot of data in memory just to return it. It also raises ordering questions under concurrency (input order vs completion order), and makes the contract murkier if the batch fails partway through after some writes have already completed.
That said, I do think the current summary is probably a little too sparse to be very useful. It might be good if IngestSummary also carried origin lists under each category, rather than only the raw counts. That would still keep the return value lightweight and nicely printable, without returning a full list[WriteResult].
def __init__(
    self,
    *,
    cache_dir: bool | str | Path | None = None,
What's the purpose of caching if there's no control over cache_stale_after or cache_force_refresh?
It seems it will always walk through the entire directory.
For local directories, simple file mtime and size checks detect an invalid cache entry cheaply enough that exposing something like a TTL doesn't seem to make sense.
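To make the mtime and size check concrete, here is a minimal sketch of that kind of validation. It is not the code in this PR: `FileFingerprint`, `fingerprint`, and `cache_is_valid` are hypothetical names, and the assumption is that a fingerprint is recorded when the cache entry is written and compared on the next walk.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass(frozen=True)
class FileFingerprint:
    """Hypothetical record saved alongside a cached entry for a local file."""

    mtime_ns: int
    size: int


def fingerprint(path: Path) -> FileFingerprint:
    # One stat() call per file: no file contents are read.
    stat = path.stat()
    return FileFingerprint(mtime_ns=stat.st_mtime_ns, size=stat.st_size)


def cache_is_valid(path: Path, cached: Optional[FileFingerprint]) -> bool:
    # A missing fingerprint means the file was never cached.
    # Any change in mtime or size invalidates the entry.
    return cached is not None and fingerprint(path) == cached
```

Because the check costs one `stat()` per file, walking the whole directory on every run stays cheap, which is the argument for not exposing a staleness knob here.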
Review finding:
[P1] Preserve Windows drive letters when parsing file:// origins — /Users/tomasz/github/posit-dev/raghilda/.worktrees/feature-crawl-api/src/raghilda/crawl.py:1567-1570
On Windows, urlparse('file:///C:/docs/readme.md').path is '/C:/docs/readme.md', so Path(unquote(parsed.path)) resolves to \C:\docs\readme.md instead of C:\docs\readme.md. That means DirectoryCrawler.origins() can emit file:// URLs that fetch_raw() / fetch_markdown() cannot reopen, and CrawlScope(roots=[file_uri]) fails for the same reason because the same parsing is duplicated in _to_directory_path() above. Using a file-URI decoder that preserves the drive/netloc avoids breaking the public file-URI workflow on Windows.
Response:
Addressed. Replace the duplicated urlparse(...).path decoding with a shared stdlib-backed file-URI decoder based on urllib.request.url2pathname so Windows drive letters and UNC netlocs round-trip correctly without custom path-munging logic. Add a Windows-only public API regression test that exercises CrawlScope(roots=[path.as_uri()]), DirectoryCrawler.origins(), and fetch_raw() on the same file URI.
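For readers following along, a decoder of the kind described in the response might look like the sketch below. `file_uri_to_path` is a hypothetical name, not the helper added in this PR; the only stdlib call relied on is `urllib.request.url2pathname`, whose handling of non-local hosts varies by platform and Python version.

```python
from pathlib import Path
from urllib.parse import urlsplit
from urllib.request import url2pathname


def file_uri_to_path(uri: str) -> Path:
    """Decode a file:// URI into a local path (illustrative sketch)."""
    parts = urlsplit(uri)
    if parts.scheme != "file":
        raise ValueError(f"not a file URI: {uri!r}")
    if parts.netloc and parts.netloc != "localhost":
        # UNC-style origin such as file://server/share/readme.md.
        # Assumption: this branch is only meaningful on Windows.
        return Path(url2pathname(f"//{parts.netloc}{parts.path}"))
    # url2pathname percent-decodes the path and, on Windows, turns
    # '/C:/docs/readme.md' into 'C:\\docs\\readme.md'.
    return Path(url2pathname(parts.path))
```

The point of routing both call sites through one helper is that `Path.as_uri()` output can be decoded back to the same path on the platform that produced it.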
Review finding:
[P2] Pass cache_force_refresh through markdown_documents — /Users/tomasz/github/posit-dev/raghilda/.worktrees/feature-crawl-api/src/raghilda/crawl.py:661-664
markdown_documents(..., cache_force_refresh=True) refreshes pages while origins() is discovering them, but this second pass hard-codes cache_force_refresh=False. With WebCrawler or CloudflareCrawler and any finite cache_stale_after (for example timedelta(0)), every origin is immediately revalidated/fetched again, so the caller does not actually get a single refreshed crawl and can observe content from a second request/crawl instead of the snapshot used for discovery.
Response:
Not addressed as a behavior bug. The second pass intentionally keeps cache_force_refresh=False so markdown_documents(..., cache_force_refresh=True) reuses the snapshot refreshed during origins() instead of fetching the same origin twice in one user call. Add an inline comment at the point of concern to make that contract explicit; the existing test_web_markdown_documents_reuses_refreshed_sources regression test continues to encode the single-fetch behavior.
Review finding:
[P2] Respect scope.limit before fetching a full frontier batch — /Users/tomasz/github/posit-dev/raghilda/.worktrees/feature-crawl-api/src/raghilda/crawl.py:920-923
Here the entire current-depth batch is submitted to _map_ordered() before yielded >= limit is checked below, so WebCrawler.origins() still requests every root/frontier page even when the first result already satisfies scope.limit. For example, with two roots and limit=1, the method yields only the first URL but still fetches the second one, which creates unexpected network traffic and rate-limit pressure despite the public limit being reached.
Response:
Addressed. Fetch each current-depth frontier in windows bounded by the remaining crawl limit, and cap worker usage to each window, so concurrent WebCrawler.origins() calls stop issuing same-depth requests once the public limit has been satisfied. Add a regression test covering two roots with max_workers=2 and limit=1, which now yields only the first origin and makes only one HTTP request.
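The windowing idea in the response can be illustrated with a small standalone sketch. It is an assumption-laden illustration rather than the code in crawl.py: `fetch_frontier_with_limit` is a hypothetical helper, `fetch` stands in for the real page fetch, and the window size is assumed to be the smaller of `max_workers` and the remaining limit.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterator, Optional, Sequence


def fetch_frontier_with_limit(
    frontier: Sequence[str],
    fetch: Callable[[str], object],
    *,
    limit: Optional[int],
    max_workers: int,
) -> Iterator[str]:
    """Yield fetched origins in input order, never fetching past `limit`."""
    yielded = 0
    position = 0
    while position < len(frontier):
        remaining = None if limit is None else limit - yielded
        if remaining is not None and remaining <= 0:
            return  # limit satisfied: issue no further requests
        # Assumed policy: a window never exceeds the remaining limit.
        size = max_workers if remaining is None else min(max_workers, remaining)
        window = frontier[position : position + size]
        position += len(window)
        # Cap workers to the window so no extra requests are started.
        with ThreadPoolExecutor(max_workers=len(window)) as pool:
            # pool.map returns results in input order, keeping output stable.
            for url, _body in zip(window, pool.map(fetch, window)):
                yield url
                yielded += 1
```

With two roots, `max_workers=2`, and `limit=1`, the first window contains a single URL, so only one fetch is issued, which mirrors the regression test described in the response.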
This feels magic to me:
How does this happen in practice? When I interrupt and re-call

That's right. The cache gives cheap resumability for parts of the workflow up to
In particular, we're building on the fact that calling

The hanging test with OpenAI vector stores is unrelated to this PR. It currently reproduces on the main branch too. I opened an issue upstream: openai/openai-python#3097
Review findings:
- [P2] Skip the directory cache while walking roots
- [P2] Preserve the root host for subdomain traversal
- [P2] Do not mark out-of-scope URLs visited globally
Response:
- Exclude DirectoryCrawler's own cache directory from directory-origin walks.
- Keep the original scope host while traversing included subdomains so sibling subdomains under the requested host remain in scope.
- Mark web origins visited only after they pass the current scope check, so an out-of-scope occurrence for one root does not suppress a later in-scope occurrence for another root.
- Add public API regression tests covering all three cases.
Review findings:
- [P2] Track visited URLs per root host
- [P2] Use HTTP Content-Type before URL suffix
Response:
- Track web crawler visited state by the effective root host while still yielding each origin at most once, so a shared page can be traversed again under a broader multi-root subdomain scope.
- Prefer recognized HTTP Content-Type values when choosing the cached body suffix, so rendered HTML served from markdown-like URLs is converted as HTML.
- Add public API regression tests for both cases.
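The Content-Type preference can be sketched as follows. This is an illustration only: `choose_cache_suffix`, the media-type table, and the `.html` default are assumptions made for the example, not the mapping used in this PR.

```python
from pathlib import PurePosixPath
from typing import Optional
from urllib.parse import urlsplit

# Hypothetical table of recognized media types; the real list may differ.
SUFFIX_BY_MEDIA_TYPE = {
    "text/html": ".html",
    "application/xhtml+xml": ".html",
    "text/markdown": ".md",
    "text/plain": ".txt",
}


def choose_cache_suffix(
    url: str, content_type: Optional[str], default: str = ".html"
) -> str:
    """Pick a cached-body suffix, trusting Content-Type over the URL."""
    if content_type:
        # Drop parameters such as "; charset=utf-8" before the lookup.
        media_type = content_type.split(";", 1)[0].strip().lower()
        suffix = SUFFIX_BY_MEDIA_TYPE.get(media_type)
        if suffix is not None:
            return suffix
    # Unrecognized or missing header: fall back to the URL path suffix.
    url_suffix = PurePosixPath(urlsplit(url).path).suffix.lower()
    return url_suffix or default
```

Under these assumptions, a URL ending in `.md` that is served as `text/html` is cached with an `.html` suffix and therefore converted as HTML.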
Review findings:
- [P2] Preserve escaped URL path segments
- [P2] Avoid refetching freshly crawled sources
Response:
- Add a web URL normalizer that preserves reserved percent escapes such as %2F, and use it for WebCrawler and CloudflareCrawler origins.
- Reuse sources materialized during origins() when markdown_documents() converts discovered origins, so immediately stale cache policies do not trigger a second network request or crawl.
- Add public API regression tests for escaped web origins and immediately stale WebCrawler and CloudflareCrawler markdown conversion.
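As a rough illustration of the escaped-segment point, a normalizer can avoid the problem simply by never decoding the path. The sketch below is not the normalizer from this PR; `normalize_web_url` is a hypothetical name, and it assumes URLs carry no userinfo and that dropping the fragment is acceptable.

```python
from urllib.parse import urlsplit, urlunsplit


def normalize_web_url(url: str) -> str:
    """Canonicalize scheme and host while leaving path escapes untouched."""
    parts = urlsplit(url)
    # Scheme and host are case-insensitive, so lowercasing them is safe.
    # Assumption: no userinfo component, so lowercasing netloc is harmless.
    netloc = parts.netloc.lower()
    # The path is deliberately NOT unquoted: decoding %2F would turn one
    # path segment ("a%2Fb") into two ("a/b") and change the resource.
    path = parts.path or "/"
    return urlunsplit((parts.scheme.lower(), netloc, path, parts.query, ""))
```

The design choice worth noting is that normalization here only touches components whose equivalence is guaranteed, so two origins that differ only in a reserved escape stay distinct.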
Address crawl review findings around cache safety, URL canonicalization, Cloudflare scope enforcement, and duplicate origin handling. Cover the regressions through public crawler and ingest tests.
Preserve Cloudflare discovery results for root redirects while continuing to filter non-seed out-of-scope records. Add regressions for cross-origin redirected seeds and external-first API results.
Summary
The primary motivation for this PR is to make store creation workflows faster
in practice.
Raghilda can make store creation faster in two ways:
- concurrency
- caching (and make interrupted store creation jobs resumable)
This PR introduces a crawl and ingest API that makes caching and concurrency
available in a principled way, without forcing everything into one opaque
pipeline.
The design centers on a visible seam between crawler and store:
That seam matters because it lets us add caching and concurrency while keeping
the workflow inspectable and composable:
run(..., store=...) path
The crawl side is centered on:
The ingest side is centered on:
This PR covers both pieces because the crawler/store seam is best described in
terms of the document objects moving through the pipeline.
Proposed Crawler Contract
The main happy path is:
Additional lower-level crawler methods are also part of the API:
- origins(scope) -> Iterator[str]
- fetch_raw(origin, cache_force_refresh=False) -> FetchedSource
- fetch_markdown(origin, cache_force_refresh=False) -> MarkdownDocument
BaseCrawler defines this contract and is exposed for subclassing.
CrawlScope
CrawlScope owns the traversal policy definition.
The split is:
- CrawlScope defines what to crawl for one run
FetchedSource
One shared intermediate across crawler implementations is FetchedSource.
It is the inspectable boundary that abstracts away whether an origin is
resolved via an actual web call or from the local filesystem cache.
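The method signatures listed in the crawler contract can be read as the following toy sketch. Everything here is a stand-in: the `Sketch*` classes are not raghilda's classes, their fields are invented for the example, and the scope is reduced to a plain URL prefix instead of a CrawlScope.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Iterator


@dataclass(frozen=True)
class SketchFetchedSource:
    origin: str
    body: str  # hypothetical field: the raw fetched content


@dataclass(frozen=True)
class SketchMarkdownDocument:
    origin: str
    text: str  # hypothetical field: the converted markdown


class SketchCrawler(ABC):
    @abstractmethod
    def origins(self, scope: str) -> Iterator[str]: ...

    @abstractmethod
    def fetch_raw(
        self, origin: str, cache_force_refresh: bool = False
    ) -> SketchFetchedSource: ...

    def fetch_markdown(
        self, origin: str, cache_force_refresh: bool = False
    ) -> SketchMarkdownDocument:
        source = self.fetch_raw(origin, cache_force_refresh=cache_force_refresh)
        # A real crawler would convert HTML to markdown at this step.
        return SketchMarkdownDocument(origin=source.origin, text=source.body)

    def markdown_documents(self, scope: str) -> Iterator[SketchMarkdownDocument]:
        # Happy path: discover origins, then fetch and convert each one.
        for origin in self.origins(scope):
            yield self.fetch_markdown(origin)


class InMemoryCrawler(SketchCrawler):
    """Toy backend that serves pages from a dict instead of the network."""

    def __init__(self, pages: Dict[str, str]) -> None:
        self._pages = pages

    def origins(self, scope: str) -> Iterator[str]:
        return (origin for origin in self._pages if origin.startswith(scope))

    def fetch_raw(
        self, origin: str, cache_force_refresh: bool = False
    ) -> SketchFetchedSource:
        return SketchFetchedSource(origin=origin, body=self._pages[origin])
```

In this reading, a subclass only supplies discovery and raw fetching, and the shared base derives the markdown path from them.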
Concrete Crawlers
Raghilda exposes:
- DirectoryCrawler
- WebCrawler
- CloudflareCrawler
Constructors stay backend-specific:
Cache-related controls use a cache_ prefix:
- cache_dir=
- cache_stale_after=
- cache_force_refresh=
Concurrency
This PR includes concurrency on both sides of the seam, with independent
threadpools used by the crawler and the store writer.
Crawl-Side Concurrency
- markdown_documents() can fetch and convert concurrently
- WebCrawler.origins() can fetch a breadth-first frontier concurrently while preserving stable output order; markdown_documents() preserves the same order as origins()
- origins(), fetch_raw(), and fetch_markdown()
Store-Side Concurrency
- store.ingest(..., max_workers=..., prepare=...) can perform concurrent writes while continuing to prepare later documents as the stream is consumed
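One way to picture the store-side behavior is the sketch below, where preparation runs in the calling thread while writes run in a bounded pool. It is only an illustration under stated assumptions: `ingest_stream` and `write` are hypothetical stand-ins for store.ingest() and the store's write call, and capping in-flight writes at `max_workers` is an assumed policy.

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, Iterable, Optional


def ingest_stream(
    documents: Iterable[object],
    write: Callable[[object], None],
    *,
    prepare: Optional[Callable[[object], object]] = None,
    max_workers: int = 4,
) -> int:
    """Write documents concurrently while lazily consuming the input."""
    written = 0
    in_flight: Deque[Future] = deque()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for document in documents:
            # prepare() runs here, in the caller's thread, while earlier
            # writes are still in progress on the worker threads.
            prepared = prepare(document) if prepare else document
            in_flight.append(pool.submit(write, prepared))
            if len(in_flight) >= max_workers:
                # Bound memory: wait for the oldest write before reading on.
                in_flight.popleft().result()
                written += 1
        while in_flight:
            in_flight.popleft().result()
            written += 1
    return written
```

Calling `.result()` on each future also surfaces the first write error to the caller instead of silently dropping it.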
Caching
This PR adds an internal filesystem cache, but does not introduce a public
cache abstraction yet. It is intentionally a flat directory of files so it
stays easy to inspect with normal file tools.
The cache layout is:
If cache_dir=True, the cache resolves to a backend-specific directory under:
The metadata sidecar is the source of truth for each cache entry.
This cache is also part of the practical interruptibility story. The persistent
crawl cache plus the store's own upsert behavior act as an ad hoc resumability
mechanism: if a script is interrupted and rerun without underlying content
changes, the rerun should stay cheap enough in practice without exposing a
large public state object that can drift out of sync.
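How cache_stale_after and cache_force_refresh might combine into a refetch decision is sketched below. This is a guess at the semantics, not the implementation: `needs_refresh` and the `fetched_at` timestamp are hypothetical, and treating `cache_stale_after=None` as "never stale" is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def needs_refresh(
    fetched_at: datetime,
    *,
    cache_stale_after: Optional[timedelta],
    cache_force_refresh: bool = False,
    now: Optional[datetime] = None,
) -> bool:
    """Decide whether a cached entry should be fetched again."""
    if cache_force_refresh:
        return True  # explicit override wins over any age policy
    if cache_stale_after is None:
        return False  # assumption: no max age means entries never expire
    current = now or datetime.now(timezone.utc)
    # timedelta(0) makes every entry immediately stale.
    return current - fetched_at >= cache_stale_after
```

For the daily-update scenario discussed in the conversation, `cache_stale_after=timedelta(days=1)` would leave an entry fetched twelve hours ago untouched and refetch one that is a day old.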
Store Seam
The crawler/store seam is:
- MarkdownDocument
- upsert() writes the final store-specific document shape
That keeps chunking outside the crawler.
Store expectations differ:
- DuckDBStore.upsert() expects ChunkedMarkdownDocument
- ChromaDBStore.upsert() expects ChunkedMarkdownDocument
- PostgreSQLStore.upsert() expects ChunkedMarkdownDocument
- OpenAIStore.upsert() expects MarkdownDocument
Ingest API
The convenience path is:
The lower-level path is:
The intended behavior is:
- prepare is applied as documents are read from the input iterable
- ValueError when encountered
This keeps ingest streaming while still catching nearby duplicate-origin mistakes.
Example
For OpenAIStore, there is no chunking step:
Non-Goals
This PR does not try to settle:
- crawler.cache object
- crawler.run(..., store=...) wrapper
- ingest() and upsert()
Open Questions
- Is markdown_documents() the right main happy path?
- Is CrawlScope the right name?
- cache_dir=, cache_stale_after=, and cache_force_refresh=?
- thicker convenience layer on top later?