feat: materialize Iceberg/R2RML graph sources into native ledgers + tracking worker#1422
Open
christophediprima wants to merge 5 commits into
Open
feat: materialize Iceberg/R2RML graph sources into native ledgers + tracking worker#1422christophediprima wants to merge 5 commits into
christophediprima wants to merge 5 commits into
Conversation
Adds the read half of incremental materialization: - SendScanPlanner::plan_incremental(from, to): the data files ADDED in a (from, to] sequence-number window (live entries whose effective data sequence number > from.sequence_number), reusing the existing manifest walk. - SendScanPlanner::plan_scan_with_selection: full scan at a chosen snapshot. - effective_sequence_number(): Iceberg null/0 seq inheritance from the manifest. - TableMetadata::snapshot_window / window_is_append_only: parent-chain ancestry walk + append-only detection, so the materialize layer can fall back to a full re-read when the window has overwrite/delete/replace (updates/deletes the added-files scan can't see) or from is not an ancestor of to. Unit tests cover seq-number inheritance, the ancestry walk (incl. branch/expired errors), and append-only detection. Row-level delete-file (v2 MoR) support is future work. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Adds, on FlureeR2rmlProvider: - prepare_iceberg_scan(): shared setup (REST/Direct × GCS/S3 × creds × metadata cache). - read_scan_tasks(): bounded-parallel Parquet read of a task set. - current_snapshot_id(): the source table's current snapshot (materialize 'to' point). - scan_table_incremental(from, to): scans only data files ADDED in the snapshot window, via SendScanPlanner::plan_incremental. These back the materialization layer. NOTE: scan_table still inlines the same setup/read (TODO dedup once the incremental path is verified end-to-end). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…tracking worker
Materialize an R2RML/Iceberg graph source into a native Fluree ledger so
native features (BM25, vector/RAG, reasoning) can run over external tables.
- Engine: Fluree::materialize_r2rml_graph_source(source, target, force_full)
reuses the query-path R2RML term materializers (subject-IRI parity),
aggregates one JSON-LD node per subject, and upserts into the target ledger.
- Incremental: scan only files added in the (from, to] snapshot window when it
is append/compaction-only (TableMetadata::window_is_incremental_safe);
overwrite/delete or expired/branched history fall back to a full re-read.
Compaction (replace) stays incremental because Iceberg preserves each row's
data_sequence_number, so the sequence-number window excludes rewritten rows.
- Watermark persisted in the target ledger (string-encoded snapshot id),
written atomically in the same upsert; a no-delta poll commits nothing.
- Tracking worker (MaterializeTrackingWorker): a Send polling task spawned on
non-peer nodes that keeps tracked source->target jobs fresh on an interval.
- Routes: POST /v1/fluree/iceberg/{materialize,track,untrack},
GET /v1/fluree/iceberg/tracking.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…zation
Extend materialization with change-data-capture semantics so a materialized
native ledger tracks source updates and deletes, not just inserts.
- DeleteConvention { column, deleted_values: Vec<Option<String>> } on
IcebergGsConfig: a row is a delete when its column value is in deleted_values;
a null entry matches a NULL column (Debezium null-payload). Threaded through
the create builders + /iceberg/map route; validated at graph-source creation.
- order_by column enables latest-by-key: per subject the highest-ordered row
wins (value-orderable int/date/timestamp only, enforced) with a whole-subject
replace that clears fields dropped in a newer revision; a tombstone retracts
the whole subject via a typed-@id wildcard update. Two ordered commits keep the
watermark advancing only in the upsert (crash/failure self-heals next poll).
- Per-(source,table) watermark with an injective subject; each table is read once
with its own (from,to] window. Dual-mode: with neither delete nor order_by set
the pass is the legacy additive merge (unchanged). Fails loud on a
non-orderable order_by column or multiple triples maps per table.
- term.rs exposes column_string / batch_has_column / column_sort_key /
column_is_orderable. subjects_retracted on MaterializeResult / route / worker.
- Docs: a "Materialization" guide in docs/graph-sources/iceberg.md plus the
materialize/track/untrack/tracking endpoints in docs/api/endpoints.md.
- Tests: latest-by-key ordering + retraction-doc-shape unit tests, and an
end-to-end integration test proving retraction actually deletes (with a control
proving the earlier bare-string form was a silent no-op).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A static bearer for a Google Iceberg REST catalog (BigLake) is a short-lived
OAuth access token: it expires after ~1h and `BearerTokenAuth` cannot renew it,
so a long-running materialization tracking worker starts returning 401s.
Add `AuthConfig::GoogleMetadata` + `GoogleMetadataAuth`, which mints and
auto-refreshes tokens from the GCE/GKE instance metadata server (Workload
Identity), mirroring the existing `OAuth2ClientCredentials` cache+refresh — the
jittered-expiry `CachedToken` is now shared via `auth::token`. Wire it through
`{Iceberg,R2rml}CreateConfig::with_auth_google_metadata` and the `/iceberg/map`
`auth_google_metadata` field.
The GCS reader's storage HMAC keys are unaffected (static, non-expiring). The
metadata server is only reachable on GCE/GKE; local runs keep using a static
`auth_bearer`.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds the ability to materialize an Iceberg / R2RML graph source into a native Fluree ledger and
keep it fresh with a background tracking worker. Querying a graph source directly re-reads and
re-joins the raw Iceberg rows on every request; materializing into a native ledger gives deduped,
indexed, time-travelable state that queries at native speed and refreshes incrementally.
Five commits:
fluree-db-iceberg).fluree-db-api).fluree-db-api,fluree-db-server).Iceberg REST catalog keeps authenticating past the ~1h a static bearer lasts.
Builds on the existing Iceberg graph-source support on
main.Motivation
A graph source exposes an Iceberg table as read-on-query triples. That's ideal for ad-hoc access,
but not for a dataset you query repeatedly:
versions per entity. A graph source surfaces all of them; there's no "current state" of a subject.
?s a X ; p1 ?a ; p2 ?b …) join the rawappend-only rows on each request, which blows up combinatorially and re-does the scan every time.
Materializing solves all three: collapse the append-only rows to one node per subject (upsert),
write them into a native ledger (native indexes, SPARQL, history), and refresh incrementally off
the source's snapshot log. The tracking worker automates the refresh.
Changes
fluree-db-iceberg— incremental scan + snapshot windows (scan/planner.rs,scan/send_planner.rs,metadata/table.rs,config.rs): plan a scan over only the data filesadded between two snapshots (append-only incremental), plus helpers to resolve the current
snapshot and the delta from a watermark. Falls back to a full scan when an incremental window
isn't safe (e.g. a non-append snapshot). + unit tests.
fluree-db-api— R2RML materialization (graph_source/r2rml_materialize.rs): readgraph-source rows (full or incremental) and materialize them into a native ledger.
Latest-by-key dedup collapses append-only rows to one node per subject (upsert); whole-subject
tombstone retraction removes a subject when its key disappears from the source. A per-
(source, table)watermark node (urn:fluree:materialize-state:{source}:{table}, source:escaped sothe encoding is injective) persists the last-materialized snapshot id in the target ledger, so a
refresh resumes incrementally;
force_fullignores it. + unit tests (watermark encoding/injectivity,latest-by-key, retract shape).
fluree-db-api— tracking worker (materialize_worker.rs): a per-job worker(
TrackedJob { source, target, interval, next_due }) driven by a base-granularity ticker; each jobre-syncs on its own poll interval. Public API:
MaterializeTrackingWorker,MaterializeWorkerConfig/Handle/Stats. + unit tests (due-job selection + rescheduling).fluree-db-server— endpoints (routes/iceberg.rs,routes/mod.rs,state.rs): the fourendpoints below.
/trackruns an immediate first sync on registration so the target ispopulated without waiting a cycle;
/trackingis an admin-protected read of this node's workerstate. Peer nodes forward writes to the write node, consistent with the existing iceberg routes.
fluree-db-iceberg— refreshable catalog auth (auth/google_metadata.rs,auth/token.rs,auth/mod.rs): agoogle_metadataAuthConfigvariant that mints and auto-refreshes catalogOAuth tokens from the GCE/GKE metadata server (Workload Identity). A
CachedTokenhelper isextracted from the existing OAuth2 provider and shared, so the new provider is the same small shape
as
oauth2.rs(leanreqwestclient + cache, no new dependencies). Exposed viaauth_google_metadataon the map endpoint. + unit tests (wiremock:Metadata-Flavorheader,scopes, caching, failure surfacing).
fluree-db-api(graph_source/r2rml.rs): incremental-scan wiring onFlureeR2rmlProvider—prepare_iceberg_scan(shared REST/Direct storage + table-metadata setup overS3IcebergStorage),read_scan_tasks,current_snapshot_id, andscan_table_incremental.docs: materialize / track / untrack / tracking in
docs/api/endpoints.md; a materialization +tracking section (incl. append-only dedup and the watermark) in
docs/graph-sources/iceberg.md; theauth_google_metadataoption.New endpoints
/iceberg/materialize{source, target, force_full?}target. Returns{from_snapshot_id, to_snapshot_id, incremental, committed, rows_read, subjects_upserted, subjects_retracted}./iceberg/track{source, target, poll_interval_secs?}source → targetjob, run an immediate first sync, then keeptargetfresh on the interval (default 30s). Returns the effective interval + the first-sync result./iceberg/untrack{source, target}/iceberg/trackingAuth
The tracking worker holds a job open for hours, so catalog auth has to survive token expiry. For a
workload running as a GCP service account (GKE Workload Identity / GCE), set
auth_google_metadata: true: tokens are minted and auto-refreshed from the instance metadata server,so tracked jobs against a Google Iceberg REST catalog (e.g. BigLake) keep authenticating. A static
auth_bearerstill works for one-shot map/query and for local use, but expires after ~1h.Design note: the provider talks to the metadata endpoint directly rather than pulling a full
Application-Default-Credentials crate. That keeps
fluree-db-iceberglean and runtime-agnostic(
AuthConfigstays an explicit, declared config rather than ambient credential discovery), and itmirrors the existing
oauth2_client_credentialsprovider exactly — same struct shape, sameCachedToken, no new dependencies. It's precisely how a GKE workload authenticates. Storage-sideHMAC interop keys are unaffected — they don't expire.
Testing
fluree-rust-dev, workspace pinned toolchain):cargo test -p fluree-db-iceberg --lib→ 148 passed (incremental scan / snapshot windows,google_metadataprovider,CachedToken).cargo test -p fluree-db-api --features iceberg --lib→ 669 passed (watermarkencoding/injectivity, latest-by-key dedup, worker due-job scheduling/rescheduling).
cargo test -p fluree-db-api --features iceberg --test grp_graphsource→ 103 passed, incl.it_materialize_retract— a regression test that whole-subject retraction binds?sas a typed@id(a bare-string binding parses to a literal that joins nothing and silently retracts zerorows) and asserts the triples are actually gone end-to-end.
cargo fmt --all --checkclean;cargo clippy --all-targetsclean on the touched crates,with the
icebergfeature both off and on (the materialize code is#[cfg(feature = "iceberg")],so it must be linted with the feature enabled).
R2RML mappings, tracked several tables into a single native ledger via
/iceberg/track, confirmed theimmediate first sync + an incremental re-sync on the worker (only added files re-read), and ran SPARQL
over the deduped materialized ledger — one node per subject, expected counts.
Known limitations / follow-ups
re-registered); persisting the
ApplyRecordis noted as a follow-up in the routes. The materializeddata and its watermark are durable (they live in the target ledger), so a re-registered job
resumes incrementally rather than re-reading everything.
read; combined with latest-by-key upserts this is still correct (idempotent), just less cheap.
google_metadatarequires GCE/GKE (the metadata server isn't reachable elsewhere); locally, usea static
auth_bearer.Breaking changes
None. Everything is additive: four new endpoints, a new
AuthConfigvariant, and additive requestfields (
poll_interval_secs,auth_google_metadata). All new server/API code is behind the existingicebergfeature; nothing changes for builds without it.