feat: materialize Iceberg/R2RML graph sources into native ledgers + tracking worker #1422

Open
christophediprima wants to merge 5 commits into
fluree:main from
christophediprima:feature/iceberg-materialize

Summary

Adds the ability to materialize an Iceberg / R2RML graph source into a native Fluree ledger and
keep it fresh with a background tracking worker. Querying a graph source directly re-reads and
re-joins the raw Iceberg rows on every request; materializing into a native ledger gives deduped,
indexed, time-travelable state that queries at native speed and refreshes incrementally.

Five commits:

  1. Append-only incremental scan + snapshot-window helpers (fluree-db-iceberg).
  2. R2RML incremental scan + snapshot helpers (fluree-db-api).
  3. Materialize into native ledgers + tracking worker + endpoints (fluree-db-api, fluree-db-server).
  4. Tombstone deletion + latest-by-key dedup for materialization.
  5. Refreshable Google metadata-server catalog auth — so long-running tracking against a Google
    Iceberg REST catalog keeps authenticating past the ~1h a static bearer lasts.

Builds on the existing Iceberg graph-source support on main.

Motivation

A graph source exposes an Iceberg table as read-on-query triples. That's ideal for ad-hoc access,
but not for a dataset you query repeatedly:

  • No dedup. Iceberg tables written by streaming/CDC pipelines are append-only — many row
    versions per entity. A graph source surfaces all of them; there's no "current state" of a subject.
  • Re-read + re-join every query. Multi-pattern queries (?s a X ; p1 ?a ; p2 ?b …) join the raw
    append-only rows on each request, which blows up combinatorially and re-does the scan every time.
  • No native indexes / time travel over the sourced data.

Materializing solves all three: collapse the append-only rows to one node per subject (upsert),
write them into a native ledger (native indexes, SPARQL, history), and refresh incrementally off
the source's snapshot log. The tracking worker automates the refresh.
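
The collapse described above can be sketched as follows. This is a minimal illustration, not the code in this PR: the `Row` shape, the `deleted` flag, and the tie-breaking rule (a later row with an equal ordering key wins) are assumptions made for the example.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified source row (not the PR's actual type).
#[derive(Debug, Clone, PartialEq)]
struct Row {
    subject: String, // subject IRI derived from the row key
    order_key: i64,  // value of the ordering column, e.g. an update timestamp
    deleted: bool,   // true when the row is a tombstone
    name: String,    // stand-in for the row's payload
}

/// What to do with a subject once all of its row versions are collapsed.
#[derive(Debug, PartialEq)]
enum Resolved {
    Upsert(Row),
    Retract(String),
}

/// Keeps only the highest-ordered row per subject, then turns a winning
/// tombstone into a retraction and any other winner into an upsert.
/// Output is ordered by subject because a BTreeMap is used.
fn latest_by_key(rows: Vec<Row>) -> Vec<Resolved> {
    let mut latest: BTreeMap<String, Row> = BTreeMap::new();
    for row in rows {
        // Assumption: on equal keys the row seen later replaces the earlier one.
        let wins = latest
            .get(&row.subject)
            .map_or(true, |seen| row.order_key >= seen.order_key);
        if wins {
            latest.insert(row.subject.clone(), row);
        }
    }
    latest
        .into_values()
        .map(|row| {
            if row.deleted {
                Resolved::Retract(row.subject)
            } else {
                Resolved::Upsert(row)
            }
        })
        .collect()
}
```

Because only the winner per subject survives, feeding the same rows again yields the same result, which is what makes a full re-read safe as a fallback.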

Changes

  • fluree-db-iceberg — incremental scan + snapshot windows (scan/planner.rs,
    scan/send_planner.rs, metadata/table.rs, config.rs): plan a scan over only the data files
    added between two snapshots (append-only incremental), plus helpers to resolve the current
    snapshot and the delta from a watermark. Falls back to a full scan when an incremental window
    isn't safe (e.g. a non-append snapshot). + unit tests.

  • fluree-db-api — R2RML materialization (graph_source/r2rml_materialize.rs): read
    graph-source rows (full or incremental) and materialize them into a native ledger.
    Latest-by-key dedup collapses append-only rows to one node per subject (upsert); whole-subject
    tombstone retraction removes a subject when its key disappears from the source. A
    per-(source, table) watermark node (urn:fluree:materialize-state:{source}:{table}, with any : in
    source escaped so the encoding is injective) persists the last-materialized snapshot id in the
    target ledger, so a refresh resumes incrementally; force_full ignores it. + unit tests
    (watermark encoding/injectivity, latest-by-key, retract shape).

  • fluree-db-api — tracking worker (materialize_worker.rs): a per-job worker
    (TrackedJob { source, target, interval, next_due }) driven by a base-granularity ticker; each job
    re-syncs on its own poll interval. Public API: MaterializeTrackingWorker,
    MaterializeWorkerConfig / Handle / Stats. + unit tests (due-job selection + rescheduling).

  • fluree-db-server — endpoints (routes/iceberg.rs, routes/mod.rs, state.rs): the four
    endpoints below. /track runs an immediate first sync on registration so the target is
    populated without waiting a cycle; /tracking is an admin-protected read of this node's worker
    state. Peer nodes forward writes to the write node, consistent with the existing iceberg routes.

  • fluree-db-iceberg — refreshable catalog auth (auth/google_metadata.rs, auth/token.rs,
    auth/mod.rs): a google_metadata AuthConfig variant that mints and auto-refreshes catalog
    OAuth tokens from the GCE/GKE metadata server (Workload Identity). A CachedToken helper is
    extracted from the existing OAuth2 provider and shared, so the new provider is the same small shape
    as oauth2.rs (lean reqwest client + cache, no new dependencies). Exposed via
    auth_google_metadata on the map endpoint. + unit tests (wiremock: Metadata-Flavor header,
    scopes, caching, failure surfacing).

  • fluree-db-api (graph_source/r2rml.rs): incremental-scan wiring on FlureeR2rmlProvider:
    prepare_iceberg_scan (shared REST/Direct storage + table-metadata setup over S3IcebergStorage),
    read_scan_tasks, current_snapshot_id, and scan_table_incremental.

  • docs: materialize / track / untrack / tracking in docs/api/endpoints.md; a materialization +
    tracking section (incl. append-only dedup and the watermark) in docs/graph-sources/iceberg.md; the
    auth_google_metadata option.
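
To illustrate why the source part of the watermark IRI needs escaping, here is a small sketch. The percent-style escape (`%` to `%25`, `:` to `%3A`) is an assumption chosen for the example; the PR only states that `:` in the source is escaped so the encoding is injective, and the function names are hypothetical.

```rust
const PREFIX: &str = "urn:fluree:materialize-state:";

/// Escapes the source name so it can never contain a raw ':'.
/// '%' is escaped as well, otherwise "a%3Ab" and "a:b" would collide.
/// (Assumed scheme, for illustration only.)
fn escape_source(source: &str) -> String {
    let mut out = String::with_capacity(source.len());
    for ch in source.chars() {
        match ch {
            '%' => out.push_str("%25"),
            ':' => out.push_str("%3A"),
            other => out.push(other),
        }
    }
    out
}

/// Builds the per-(source, table) watermark subject. After escaping, the
/// first ':' following the prefix always separates source from table.
fn watermark_iri(source: &str, table: &str) -> String {
    format!("{}{}:{}", PREFIX, escape_source(source), table)
}
```

Without the escape, the pairs (`a:b`, `c`) and (`a`, `b:c`) would both map to `urn:fluree:materialize-state:a:b:c` and share one watermark.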

New endpoints

| Endpoint | Method | Body | Purpose |
|---|---|---|---|
| /iceberg/materialize | POST | {source, target, force_full?} | One-shot: read the source and upsert it into target. Returns {from_snapshot_id, to_snapshot_id, incremental, committed, rows_read, subjects_upserted, subjects_retracted}. |
| /iceberg/track | POST | {source, target, poll_interval_secs?} | Register a source → target job, run an immediate first sync, then keep target fresh on the interval (default 30s). Returns the effective interval + the first-sync result. |
| /iceberg/untrack | POST | {source, target} | Stop tracking (leaves already-materialized data in place). |
| /iceberg/tracking | GET | | Worker status for this node (admin-protected read). |
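
As a rough picture of how a tracked job's interval could drive the worker, the sketch below selects due jobs on each tick and reschedules them. The `TrackedJob` fields come from the description above; `effective_interval` and `take_due` are hypothetical names, and the real worker may schedule differently.

```rust
use std::time::{Duration, Instant};

/// Default poll interval when poll_interval_secs is omitted (30s per the table above).
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(30);

#[derive(Debug, Clone)]
struct TrackedJob {
    source: String,
    target: String,
    interval: Duration,
    next_due: Instant,
}

/// Resolves the optional request field to the interval actually used.
fn effective_interval(poll_interval_secs: Option<u64>) -> Duration {
    poll_interval_secs
        .map(Duration::from_secs)
        .unwrap_or(DEFAULT_POLL_INTERVAL)
}

/// Called on every base tick: returns the indices of jobs due at `now`
/// and pushes each one's next_due forward by its own interval.
fn take_due(jobs: &mut [TrackedJob], now: Instant) -> Vec<usize> {
    let mut due = Vec::new();
    for (index, job) in jobs.iter_mut().enumerate() {
        if job.next_due <= now {
            due.push(index);
            job.next_due = now + job.interval;
        }
    }
    due
}
```

Rescheduling from `now` rather than from the previous `next_due` means a slow sync delays the next one instead of causing a burst of catch-up runs; that choice is part of the sketch, not a statement about the PR.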

Auth

The tracking worker holds a job open for hours, so catalog auth has to survive token expiry. For a
workload running as a GCP service account (GKE Workload Identity / GCE), set
auth_google_metadata: true: tokens are minted and auto-refreshed from the instance metadata server,
so tracked jobs against a Google Iceberg REST catalog (e.g. BigLake) keep authenticating. A static
auth_bearer still works for one-shot map/query and for local use, but expires after ~1h.

Design note: the provider talks to the metadata endpoint directly rather than pulling a full
Application-Default-Credentials crate. That keeps fluree-db-iceberg lean and runtime-agnostic
(AuthConfig stays an explicit, declared config rather than ambient credential discovery), and it
mirrors the existing oauth2_client_credentials provider exactly — same struct shape, same
CachedToken, no new dependencies. It's precisely how a GKE workload authenticates. Storage-side
HMAC interop keys are unaffected — they don't expire.
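
The cache-and-refresh idea behind a shared token cache can be sketched like this. It is a simplified stand-in, not the PR's `CachedToken`: it uses a fixed safety margin instead of jitter, and the `mint` closure stands in for the HTTP call to the metadata server. `TokenCache` and its method names are hypothetical.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone)]
struct CachedToken {
    value: String,
    expires_at: Instant,
}

impl CachedToken {
    /// Usable only while `now + skew` is still before expiry, so a token
    /// is never handed out just before it lapses.
    fn is_fresh(&self, now: Instant, skew: Duration) -> bool {
        now + skew < self.expires_at
    }
}

struct TokenCache {
    cached: Option<CachedToken>,
    skew: Duration,
}

impl TokenCache {
    fn new(skew: Duration) -> Self {
        Self { cached: None, skew }
    }

    /// Returns the cached token while it is fresh; otherwise calls `mint`,
    /// which yields (token, lifetime), and caches the result.
    /// A mint failure is surfaced to the caller and nothing is cached.
    fn get<F>(&mut self, now: Instant, mint: F) -> Result<String, String>
    where
        F: FnOnce() -> Result<(String, Duration), String>,
    {
        if let Some(token) = &self.cached {
            if token.is_fresh(now, self.skew) {
                return Ok(token.value.clone());
            }
        }
        let (value, ttl) = mint()?;
        self.cached = Some(CachedToken {
            value: value.clone(),
            expires_at: now + ttl,
        });
        Ok(value)
    }
}
```

With a 60s margin and a 3600s token, requests during the first 3540s reuse the cached value and the next request after that mints a new one, which is the behavior a static bearer cannot provide.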

Testing

  • Unit + integration (Docker fluree-rust-dev, workspace pinned toolchain):
    • cargo test -p fluree-db-iceberg --lib: 148 passed (incremental scan / snapshot windows,
      google_metadata provider, CachedToken).
    • cargo test -p fluree-db-api --features iceberg --lib: 669 passed (watermark
      encoding/injectivity, latest-by-key dedup, worker due-job scheduling/rescheduling).
    • cargo test -p fluree-db-api --features iceberg --test grp_graphsource: 103 passed, incl.
      it_materialize_retract — a regression test that whole-subject retraction binds ?s as a typed
      @id (a bare-string binding parses to a literal that joins nothing and silently retracts zero
      rows) and asserts the triples are actually gone end-to-end.
  • cargo fmt --all --check clean; cargo clippy --all-targets clean on the touched crates,
    with the iceberg feature both off and on (the materialize code is #[cfg(feature = "iceberg")],
    so it must be linted with the feature enabled).
  • Manual end-to-end against a real Iceberg dataset behind a Google Iceberg REST catalog: registered
    R2RML mappings, tracked several tables into a single native ledger via /iceberg/track, confirmed the
    immediate first sync + an incremental re-sync on the worker (only added files re-read), and ran SPARQL
    over the deduped materialized ledger — one node per subject, expected counts.

Known limitations / follow-ups

  • Worker state is in-memory per node. Tracked jobs are not persisted across restarts (they must be
    re-registered); persisting the ApplyRecord is noted as a follow-up in the routes. The materialized
    data and its watermark are durable (they live in the target ledger), so a re-registered job
    resumes incrementally rather than re-reading everything.
  • Incremental scan is append-only. Snapshots that delete/rewrite files fall back to a full-window
    read; combined with latest-by-key upserts this is still correct (idempotent), just less cheap.
  • google_metadata requires GCE/GKE (the metadata server isn't reachable elsewhere); locally, use
    a static auth_bearer.
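
The choice between a no-op, an incremental read, and a full re-read can be summarized in a small decision function. This is a sketch of the rules stated in this description (force_full ignores the watermark, an unchanged snapshot commits nothing, an unsafe window falls back to a full read); `RefreshPlan` and `plan_refresh` are hypothetical names, and `window_is_safe` is assumed to be computed elsewhere from table metadata.

```rust
#[derive(Debug, PartialEq)]
enum RefreshPlan {
    /// Source has not moved past the watermark: nothing to read or commit.
    NoOp,
    /// Read only the files added in the (from, to] snapshot window.
    Incremental { from: i64, to: i64 },
    /// Re-read the whole table at snapshot `to`.
    Full { to: i64 },
}

/// `watermark` is the last-materialized snapshot id, if any;
/// `current` is the source table's current snapshot id.
fn plan_refresh(
    watermark: Option<i64>,
    current: i64,
    force_full: bool,
    window_is_safe: bool,
) -> RefreshPlan {
    if force_full {
        return RefreshPlan::Full { to: current };
    }
    match watermark {
        // First run: there is no window to resume from.
        None => RefreshPlan::Full { to: current },
        Some(from) if from == current => RefreshPlan::NoOp,
        Some(from) if window_is_safe => RefreshPlan::Incremental { from, to: current },
        // Deletes, overwrites, or broken ancestry: fall back to a full read.
        Some(_) => RefreshPlan::Full { to: current },
    }
}
```

A full read is only a cost concern here: with latest-by-key upserts the result is the same as the incremental path would have produced.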

Breaking changes

None. Everything is additive: four new endpoints, a new AuthConfig variant, and additive request
fields (poll_interval_secs, auth_google_metadata). All new server/API code is behind the existing
iceberg feature; nothing changes for builds without it.

christophediprima and others added 5 commits July 2, 2026 16:15
Adds the read half of incremental materialization:
- SendScanPlanner::plan_incremental(from, to): the data files ADDED in a
  (from, to] sequence-number window (live entries whose effective data
  sequence number > from.sequence_number), reusing the existing manifest walk.
- SendScanPlanner::plan_scan_with_selection: full scan at a chosen snapshot.
- effective_sequence_number(): Iceberg null/0 seq inheritance from the manifest.
- TableMetadata::snapshot_window / window_is_append_only: parent-chain ancestry
  walk + append-only detection, so the materialize layer can fall back to a full
  re-read when the window has overwrite/delete/replace (updates/deletes the
  added-files scan can't see) or from is not an ancestor of to.

Unit tests cover seq-number inheritance, the ancestry walk (incl. branch/expired
errors), and append-only detection. Row-level delete-file (v2 MoR) support is
future work.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
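
For readers unfamiliar with the ancestry walk mentioned in this commit, a minimal sketch follows. It assumes valid (acyclic) parent links and returns `None` where the real code reports branch/expired errors; the `Snapshot` struct and both function bodies are illustrative, not the implementation in fluree-db-iceberg.

```rust
use std::collections::HashMap;

/// Snapshot operations as recorded in an Iceberg snapshot summary.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Operation {
    Append,
    Replace,
    Overwrite,
    Delete,
}

#[derive(Debug, Clone)]
struct Snapshot {
    id: i64,
    parent_id: Option<i64>,
    operation: Operation,
}

/// Walks parent links from `to` back to `from` and returns the snapshots in
/// (from, to], oldest first. Returns None when `from` is not an ancestor of
/// `to` (different branch) or a link in the chain is missing (expired).
fn snapshot_window(
    snapshots: &HashMap<i64, Snapshot>,
    from: i64,
    to: i64,
) -> Option<Vec<Snapshot>> {
    let mut window = Vec::new();
    let mut cursor = to;
    while cursor != from {
        let snapshot = snapshots.get(&cursor)?;
        window.push(snapshot.clone());
        cursor = snapshot.parent_id?;
    }
    window.reverse();
    Some(window)
}

/// True when every snapshot in the window only added files.
fn window_is_append_only(window: &[Snapshot]) -> bool {
    window.iter().all(|s| s.operation == Operation::Append)
}
```

A caller would fall back to a full re-read whenever `snapshot_window` yields `None` or `window_is_append_only` is false.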
Adds, on FlureeR2rmlProvider:
- prepare_iceberg_scan(): shared setup (REST/Direct × GCS/S3 × creds × metadata cache).
- read_scan_tasks(): bounded-parallel Parquet read of a task set.
- current_snapshot_id(): the source table's current snapshot (materialize 'to' point).
- scan_table_incremental(from, to): scans only data files ADDED in the snapshot
  window, via SendScanPlanner::plan_incremental.

These back the materialization layer. NOTE: scan_table still inlines the same
setup/read (TODO dedup once the incremental path is verified end-to-end).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
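
The sequence-number filter that the incremental scan relies on can be sketched as below. The types are hypothetical simplifications of Iceberg manifests, `added_files_after` is an illustrative name, and treating both a missing and a zero sequence number as "inherit from the manifest" follows the wording of the first commit rather than a verified reading of the code. The upper bound of the window is assumed to come from walking only the manifests of the `to` snapshot.

```rust
/// Manifest entry status as defined by the Iceberg spec.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Existing,
    Added,
    Deleted,
}

#[derive(Debug, Clone)]
struct ManifestEntry {
    path: String,
    status: Status,
    data_sequence_number: Option<i64>,
}

#[derive(Debug, Clone)]
struct Manifest {
    sequence_number: i64,
    entries: Vec<ManifestEntry>,
}

/// An entry without its own sequence number inherits the manifest's.
/// Assumption: 0 is treated like null, as the commit message describes.
fn effective_sequence_number(entry_seq: Option<i64>, manifest_seq: i64) -> i64 {
    match entry_seq {
        Some(seq) if seq > 0 => seq,
        _ => manifest_seq,
    }
}

/// Paths of live data files whose effective sequence number is newer than
/// the watermark snapshot's sequence number.
fn added_files_after(manifests: &[Manifest], from_seq: i64) -> Vec<String> {
    let mut paths = Vec::new();
    for manifest in manifests {
        for entry in &manifest.entries {
            let live = entry.status != Status::Deleted;
            let seq =
                effective_sequence_number(entry.data_sequence_number, manifest.sequence_number);
            if live && seq > from_seq {
                paths.push(entry.path.clone());
            }
        }
    }
    paths
}
```

A file rewritten by compaction keeps its original, older sequence number, so it stays outside the window even though it appears in a newer manifest.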
…tracking worker

Materialize an R2RML/Iceberg graph source into a native Fluree ledger so
native features (BM25, vector/RAG, reasoning) can run over external tables.

- Engine: Fluree::materialize_r2rml_graph_source(source, target, force_full)
  reuses the query-path R2RML term materializers (subject-IRI parity),
  aggregates one JSON-LD node per subject, and upserts into the target ledger.
- Incremental: scan only files added in the (from, to] snapshot window when it
  is append/compaction-only (TableMetadata::window_is_incremental_safe);
  overwrite/delete or expired/branched history fall back to a full re-read.
  Compaction (replace) stays incremental because Iceberg preserves each row's
  data_sequence_number, so the sequence-number window excludes rewritten rows.
- Watermark persisted in the target ledger (string-encoded snapshot id),
  written atomically in the same upsert; a no-delta poll commits nothing.
- Tracking worker (MaterializeTrackingWorker): a Send polling task spawned on
  non-peer nodes that keeps tracked source->target jobs fresh on an interval.
- Routes: POST /v1/fluree/iceberg/{materialize,track,untrack},
  GET /v1/fluree/iceberg/tracking.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

…zation

Extend materialization with change-data-capture semantics so a materialized
native ledger tracks source updates and deletes, not just inserts.

- DeleteConvention { column, deleted_values: Vec<Option<String>> } on
  IcebergGsConfig: a row is a delete when its column value is in deleted_values;
  a null entry matches a NULL column (Debezium null-payload). Threaded through
  the create builders + /iceberg/map route; validated at graph-source creation.
- order_by column enables latest-by-key: per subject the highest-ordered row
  wins (value-orderable int/date/timestamp only, enforced) with a whole-subject
  replace that clears fields dropped in a newer revision; a tombstone retracts
  the whole subject via a typed-@id wildcard update. Two ordered commits keep the
  watermark advancing only in the upsert (crash/failure self-heals next poll).
- Per-(source,table) watermark with an injective subject; each table is read once
  with its own (from,to] window. Dual-mode: with neither delete nor order_by set
  the pass is the legacy additive merge (unchanged). Fails loud on a
  non-orderable order_by column or multiple triples maps per table.
- term.rs exposes column_string / batch_has_column / column_sort_key /
  column_is_orderable. subjects_retracted on MaterializeResult / route / worker.
- Docs: a "Materialization" guide in docs/graph-sources/iceberg.md plus the
  materialize/track/untrack/tracking endpoints in docs/api/endpoints.md.
- Tests: latest-by-key ordering + retraction-doc-shape unit tests, and an
  end-to-end integration test proving retraction actually deletes (with a control
  proving the earlier bare-string form was a silent no-op).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

A static bearer for a Google Iceberg REST catalog (BigLake) is a short-lived
OAuth access token: it expires after ~1h and `BearerTokenAuth` cannot renew it,
so a long-running materialization tracking worker starts returning 401s.

Add `AuthConfig::GoogleMetadata` + `GoogleMetadataAuth`, which mints and
auto-refreshes tokens from the GCE/GKE instance metadata server (Workload
Identity), mirroring the existing `OAuth2ClientCredentials` cache+refresh — the
jittered-expiry `CachedToken` is now shared via `auth::token`. Wire it through
`{Iceberg,R2rml}CreateConfig::with_auth_google_metadata` and the `/iceberg/map`
`auth_google_metadata` field.

The GCS reader's storage HMAC keys are unaffected (static, non-expiring). The
metadata server is only reachable on GCE/GKE; local runs keep using a static
`auth_bearer`.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>