diff --git a/fluree-db-api/examples/reindex_swap_read_profile.rs b/fluree-db-api/examples/reindex_swap_read_profile.rs new file mode 100644 index 0000000000..b95caa16fb --- /dev/null +++ b/fluree-db-api/examples/reindex_swap_read_profile.rs @@ -0,0 +1,376 @@ +//! Read-after-reindex-swap cold-cost profiler. +//! +//! Isolates the micro-cost that the "warm-on-write" work targets, independent +//! of the macro QMpH sweep. The BSBM `explore+update` sweep shows that a low +//! `reindex_min_bytes` (continuous reindex) collapses product-read latency +//! ~30x; this harness reproduces the *per-swap* mechanism deterministically so +//! any fix is measurable on a single number rather than inferred from QMpH. +//! +//! ## What it does, per burst (one controlled index generation) +//! 1. INSERT a small BSBM-shape burst (new products + their reviews). +//! 2. `trigger_index` — build the incremental index and publish it (waits). +//! 3. Reload the graph snapshot so reads see the new generation (the swap). +//! 4. Point-read a product *written in this burst* (its FLI3 leaf was just +//! rewritten → new CID → cold decode path) and, as a control, a product +//! from the base commit (its leaf is usually CID-stable → warm). +//! +//! Background auto-indexing is suppressed during commits (high thresholds) so +//! exactly ONE swap happens per burst and the read that follows is clean. +//! +//! ## Signals (per-burst CSV row + end summary) +//! - `index_ms` — `trigger_index` build+publish latency. +//! - `swap` — 1 if `index_t` advanced (a new generation published). +//! - `read_new_ms` — point-read of a just-written product (cold-hot region). +//! - `read_old_ms` — point-read of a base product (warm control). +//! - `cache_ins_new` — leaflet-cache entries added *during* the new-product +//! read (a proxy for cold misses; ~0 means fully warm). +//! - `cache_ins_old` — same for the control read. +//! - `cache_entries` / `cache_mb` — cache occupancy after the cycle. +//! +//! A warm-on-write fix should drive `read_new_ms` toward `read_old_ms` and +//! `cache_ins_new` toward ~0. Note: `entry_count()` is moka's +//! eventually-consistent estimate, so `cache_ins_*` is a proxy, not an exact +//! hit/miss count — precise counters can be added to `LeafletCache` later. +//! +//! ## Config (env vars) +//! | Var | Default | Meaning | +//! |---|---|---| +//! | `RSR_TOTAL_PRODUCTS` | `20000` | full dataset product count | +//! | `RSR_BASE_PRODUCTS` | `5000` | products in the base indexed commit | +//! | `RSR_BURST_PRODUCTS` | `20` | new products per burst | +//! | `RSR_BURSTS` | `50` | measured burst cycles | +//! | `RSR_WARMUP_BURSTS` | `3` | untimed warmup cycles (excluded) | +//! | `RSR_DB_DIR` | (tempdir) | persistent storage dir | +//! | `RSR_CSV` | `target/reindex-swap-read.csv` | per-burst CSV | +//! +//! ## Run +//! ```bash +//! cargo run --release --example reindex_swap_read_profile -p fluree-db-api +//! RSR_BURSTS=100 RSR_BURST_PRODUCTS=40 \ +//! cargo run --release --example reindex_swap_read_profile -p fluree-db-api +//! ``` + +use std::time::Instant; + +use fluree_bench_support::gen::bsbm::{ + bsbm_data_to_turtle, generate_dataset, BsbmData, Product, Review, +}; +use fluree_db_api::admin::{ReindexOptions, TriggerIndexOptions}; +use fluree_db_api::{CommitOpts, Fluree, FlureeBuilder, IndexConfig, LedgerState, TxnOpts}; + +/// `ex:` prefix expansion used by `bsbm_data_to_turtle`; product ids are +/// `ex:product-NNNNNN`, so the absolute IRI is this + `product-NNNNNN`. +const EX_NS: &str = "http://example.org/ns/"; + +/// High enough that a foreground commit never triggers background indexing; +/// we drive every build explicitly via `trigger_index` so swaps are 1:1 with +/// bursts. +const SUPPRESS_INDEX_BYTES: usize = 5_000_000_000; + +fn env_str(key: &str, default: &str) -> String { + std::env::var(key).unwrap_or_else(|_| default.to_string()) +} + +fn env_usize(key: &str, default: usize) -> usize { + std::env::var(key) + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(default) +} + +/// A burst carries only the new products and their reviews; vendors and +/// persons are committed once in the base and referenced by IRI (RDF needs no +/// prior definition of an object IRI), so a burst stays small and BSBM-shaped. +fn slice_burst(full: &BsbmData, p0: usize, p1: usize) -> BsbmData { + let products: Vec = full.products[p0..p1].to_vec(); + // Reviews are laid out 3-per-product contiguously (review i → product i/3). + let reviews: Vec = full.reviews[p0 * 3..p1 * 3].to_vec(); + BsbmData { + vendors: Vec::new(), + products, + persons: Vec::new(), + reviews, + } +} + +async fn commit_turtle(fluree: &Fluree, ledger: LedgerState, turtle: &str) -> (LedgerState, f64) { + let index_config = IndexConfig { + reindex_min_bytes: SUPPRESS_INDEX_BYTES, + reindex_max_bytes: SUPPRESS_INDEX_BYTES, + }; + let t0 = Instant::now(); + let out = fluree + .insert_turtle_with_opts( + ledger, + turtle, + TxnOpts::default(), + CommitOpts::default(), + &index_config, + None, + ) + .await + .expect("insert burst"); + (out.ledger, t0.elapsed().as_secs_f64() * 1e3) +} + +/// Read every triple about one product — a point read that lands in the +/// product's FLI3 leaf(let). Returns (elapsed_ms, cache_entries_inserted). +async fn point_read_product(fluree: &Fluree, alias: &str, product_local_id: &str) -> (f64, i64) { + let iri = format!("{EX_NS}{product_local_id}"); + let query = format!("SELECT ?p ?o WHERE {{ <{iri}> ?p ?o }}"); + + let entries_before = fluree.leaflet_cache().entry_count() as i64; + let t_load = Instant::now(); + let snapshot = fluree + .graph(alias) + .load() + .await + .expect("graph load for read"); + let load_ms = t_load.elapsed().as_secs_f64() * 1e3; + let t_q = Instant::now(); + let result = snapshot + .query() + .sparql(&query) + .execute() + .await + .expect("point read execute"); + let query_ms = t_q.elapsed().as_secs_f64() * 1e3; + std::hint::black_box(result); + let _ = load_ms; // load is negligible once the generation is applied + let entries_after = fluree.leaflet_cache().entry_count() as i64; + // Return query-only time (load excluded) so the metric isolates read + // latency on the applied generation. + (query_ms, entries_after - entries_before) +} + +#[derive(Default)] +struct Agg { + read_new: Vec, + read_new2: Vec, + read_old: Vec, + read_old_random: Vec, + index_ms: Vec, + ins_new: Vec, + ins_old: Vec, + swaps: usize, +} + +fn pct(sorted: &[f64], p: f64) -> f64 { + if sorted.is_empty() { + return 0.0; + } + let idx = ((sorted.len() as f64 - 1.0) * p).round() as usize; + sorted[idx] +} + +fn summarize(label: &str, samples: &mut [f64]) { + samples.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let mean = samples.iter().sum::() / samples.len().max(1) as f64; + eprintln!( + " {label:<14} n={:<4} mean={:>8.3}ms p50={:>8.3}ms p90={:>8.3}ms max={:>8.3}ms", + samples.len(), + mean, + pct(samples, 0.50), + pct(samples, 0.90), + samples.last().copied().unwrap_or(0.0), + ); +} + +fn run() { + let total_products = env_usize("RSR_TOTAL_PRODUCTS", 20_000); + let base_products = env_usize("RSR_BASE_PRODUCTS", 5_000); + let burst_products = env_usize("RSR_BURST_PRODUCTS", 20); + let bursts = env_usize("RSR_BURSTS", 50); + let warmup = env_usize("RSR_WARMUP_BURSTS", 3); + let csv_path = env_str("RSR_CSV", "target/reindex-swap-read.csv"); + + let needed = base_products + (warmup + bursts) * burst_products; + assert!( + total_products >= needed, + "RSR_TOTAL_PRODUCTS ({total_products}) < base + (warmup+bursts)*burst ({needed}); \ + raise RSR_TOTAL_PRODUCTS or lower the burst/count knobs" + ); + + eprintln!( + "[reindex_swap_read] total={total_products} base={base_products} \ + burst={burst_products} bursts={bursts} warmup={warmup}" + ); + + // Build fluree runtime + full dataset once. + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("tokio runtime"); + + rt.block_on(async move { + let full = generate_dataset(total_products); + + // Storage: persistent dir if requested, else a tempdir kept alive for + // the whole run (`_tempdir` holds it open until the closure ends). + let mut _tempdir: Option = None; + let storage_path = match std::env::var("RSR_DB_DIR") { + Ok(dir) => dir, + Err(_) => { + let td = tempfile::tempdir().expect("db tmpdir"); + let p = td.path().to_string_lossy().to_string(); + _tempdir = Some(td); + p + } + }; + let fluree = FlureeBuilder::file(storage_path) + .build() + .expect("build file-backed Fluree"); + + let alias = "reindex-swap-read/bench:main".to_string(); + let mut ledger = fluree.create_ledger(&alias).await.expect("create_ledger"); + + // --- Base commit: all vendors + persons + the first `base_products` + // products and their reviews. One commit keeps the chain shallow. --- + let base = BsbmData { + vendors: full.vendors.clone(), + persons: full.persons.clone(), + products: full.products[..base_products].to_vec(), + reviews: full.reviews[..base_products * 3].to_vec(), + }; + let base_turtle = bsbm_data_to_turtle(&base); + let (l, base_commit_ms) = commit_turtle(&fluree, ledger, &base_turtle).await; + drop(l); + + // Baseline index behind the binary columnar store (full reindex). + let base_reindex = Instant::now(); + let _ = fluree + .reindex(&alias, ReindexOptions::default()) + .await + .expect("baseline reindex"); + // Reload so `ledger` tracks the published index head; each burst threads + // it forward and the next trigger_index runs incrementally. + ledger = fluree + .ledger(&alias) + .await + .expect("reload after baseline reindex"); + eprintln!( + " base: commit={base_commit_ms:.1}ms reindex={:.1}ms products=0..{base_products}", + base_reindex.elapsed().as_secs_f64() * 1e3, + ); + + // A stable product from the base for the warm control read. + let old_product_id = full.products[base_products / 2].id.clone(); + let old_local = old_product_id.trim_start_matches("ex:").to_string(); + + // CSV header. + let mut csv = String::new(); + csv.push_str( + "burst,phase,commit_ms,index_ms,index_t,fuel,swap,read_new_ms,read_old_ms,\ + cache_ins_new,cache_ins_old,cache_entries,cache_mb\n", + ); + + let mut agg = Agg::default(); + let mut prev_index_t: i64 = -1; + let total_cycles = warmup + bursts; + + for cycle in 0..total_cycles { + let is_warmup = cycle < warmup; + let p0 = base_products + cycle * burst_products; + let p1 = p0 + burst_products; + + // 1. INSERT the burst. + let burst = slice_burst(&full, p0, p1); + let turtle = bsbm_data_to_turtle(&burst); + let (l, commit_ms) = commit_turtle(&fluree, ledger, &turtle).await; + ledger = l; + + // 2. Build + publish the incremental index (waits for completion). + let t_idx = Instant::now(); + let res = fluree + .trigger_index(&alias, TriggerIndexOptions::default()) + .await + .expect("trigger_index"); + let index_ms = t_idx.elapsed().as_secs_f64() * 1e3; + let swapped = res.index_t > prev_index_t; + prev_index_t = res.index_t; + + // Absorb the one-time apply/swap cost (fresh store build + novelty + // dict rebuild) that the FIRST load after a publish pays, so the + // measured reads reflect query latency on the already-applied + // generation — as a client read does, since the background listener + // applies before client traffic arrives. + let _ = fluree + .graph(&alias) + .load() + .await + .expect("absorb apply load"); + + // 3 + 4. Read the just-written product (cold-hot region) and a base + // product (warm control), on the already-applied generation. + let new_local = full.products[p1 - 1].id.trim_start_matches("ex:").to_string(); + let (read_new_ms, ins_new) = point_read_product(&fluree, &alias, &new_local).await; + // Control A: second read of the SAME just-inserted product, back to + // back — isolates first-touch CPU/cache/setup from insertedness. + let (read_new2_ms, _) = point_read_product(&fluree, &alias, &new_local).await; + let (read_old_ms, ins_old) = point_read_product(&fluree, &alias, &old_local).await; + // Control B: a DIFFERENT existing base product each burst (indexed at + // t=1, stable structure) — isolates "fresh cold subject" from + // "just-inserted". If this tracks read_new, the gap is cold-subject. + let rnd_idx = cycle.wrapping_mul(48_611).wrapping_add(12_345) % base_products; + let rnd_local = full.products[rnd_idx].id.trim_start_matches("ex:").to_string(); + let (read_old_rand_ms, _) = point_read_product(&fluree, &alias, &rnd_local).await; + + let cache_entries = fluree.leaflet_cache().entry_count(); + let cache_mb = fluree.leaflet_cache().weighted_size_bytes() as f64 / (1024.0 * 1024.0); + + csv.push_str(&format!( + "{cycle},{phase},{commit_ms:.3},{index_ms:.3},{index_t},{fuel:.1},{swap},\ + {read_new_ms:.3},{read_old_ms:.3},{ins_new},{ins_old},{cache_entries},{cache_mb:.2}\n", + phase = if is_warmup { "warmup" } else { "measure" }, + index_t = res.index_t, + fuel = res.fuel.unwrap_or(0.0), + swap = u8::from(swapped), + )); + + if !is_warmup { + agg.read_new.push(read_new_ms); + agg.read_new2.push(read_new2_ms); + agg.read_old.push(read_old_ms); + agg.read_old_random.push(read_old_rand_ms); + agg.index_ms.push(index_ms); + agg.ins_new.push(ins_new); + agg.ins_old.push(ins_old); + if swapped { + agg.swaps += 1; + } + } + } + + std::fs::write(&csv_path, csv).unwrap_or_else(|e| { + eprintln!(" (could not write {csv_path}: {e})"); + }); + + // --- Summary --- + eprintln!("\n[reindex_swap_read] summary over {bursts} measured bursts:"); + summarize("read_new", &mut agg.read_new); + summarize("read_new2", &mut agg.read_new2); + summarize("read_old", &mut agg.read_old); + summarize("read_old_random", &mut agg.read_old_random); + summarize("index_ms", &mut agg.index_ms); + let sum_i64 = |v: &[i64]| v.iter().sum::(); + let n = agg.ins_new.len().max(1) as i64; + eprintln!( + " swaps={}/{bursts} cache_ins_new(avg)={} cache_ins_old(avg)={} \ + final_cache: {} entries, {:.1} MB", + agg.swaps, + sum_i64(&agg.ins_new) / n, + sum_i64(&agg.ins_old) / n, + fluree.leaflet_cache().entry_count(), + fluree.leaflet_cache().weighted_size_bytes() as f64 / (1024.0 * 1024.0), + ); + eprintln!(" CSV: {csv_path}"); + eprintln!( + " READ (cold-hot region) is the warm-on-write target: expect read_new >> read_old \ + and cache_ins_new > 0 today; a fix drives them together toward ~0." + ); + }); +} + +fn main() { + run(); +} diff --git a/fluree-db-api/src/indexer_attachment_provider.rs b/fluree-db-api/src/indexer_attachment_provider.rs index 4ae61fc655..62effd02ae 100644 --- a/fluree-db-api/src/indexer_attachment_provider.rs +++ b/fluree-db-api/src/indexer_attachment_provider.rs @@ -55,6 +55,29 @@ impl std::fmt::Debug for ApiAttachmentEventsProvider { } } +/// Resolves the process-shared read cache from the api's `LedgerManager` once +/// it's constructed — the manager owns the `LeafletCache` the query server +/// reads from, so the background indexer can warm-on-write into that exact +/// cache. Yields `None` until the manager cell is filled (and always for a +/// separate-machine indexer, which has no local manager). +pub(crate) struct LedgerManagerWarmCache { + pub(crate) manager: LedgerManagerCell, +} + +impl std::fmt::Debug for LedgerManagerWarmCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LedgerManagerWarmCache") + .field("bound", &self.manager.get().is_some()) + .finish() + } +} + +impl fluree_db_indexer::WarmCacheSource for LedgerManagerWarmCache { + fn warm_cache(&self) -> Option> { + self.manager.get().and_then(|m| m.leaflet_cache().cloned()) + } +} + #[async_trait] impl AttachmentEventsProvider for ApiAttachmentEventsProvider { async fn attachment_events(&self, ledger_id: &str) -> Option { diff --git a/fluree-db-api/src/lib.rs b/fluree-db-api/src/lib.rs index 3da5da41d2..15086c3f9e 100644 --- a/fluree-db-api/src/lib.rs +++ b/fluree-db-api/src/lib.rs @@ -2552,11 +2552,20 @@ impl FlureeBuilder { }, ) as Arc; + // Warm-on-write (co-located only): let the background build seed the + // query server's shared read cache with the leaflets it just wrote. + // Resolved late from the same LedgerManager cell used above, so the + // worker warms the exact cache readers use. + let warm_cache_source = + Arc::new(crate::indexer_attachment_provider::LedgerManagerWarmCache { + manager: Arc::clone(attachment_provider_cell), + }) as Arc; let indexer_config = idx_config .indexer_config .clone() .with_fulltext_config_provider(provider) - .with_attachment_events_provider(ann_provider); + .with_attachment_events_provider(ann_provider) + .with_warm_cache_source(warm_cache_source); // BackgroundIndexerWorker takes an // `Arc` — the combined lookup // + index-publish surface. `ReadWriteNameService` diff --git a/fluree-db-binary-index/src/read/binary_cursor.rs b/fluree-db-binary-index/src/read/binary_cursor.rs index c1e2f8e03a..7829f17b2e 100644 --- a/fluree-db-binary-index/src/read/binary_cursor.rs +++ b/fluree-db-binary-index/src/read/binary_cursor.rs @@ -442,7 +442,7 @@ impl BinaryCursor { let batch = if self.filter.is_empty() || batch.is_empty() { batch } else { - filter_batch(&self.filter, &batch) + filter_batch(&self.filter, &batch, self.order) }; // Apply overlay merge if we have overlay ops. @@ -798,9 +798,16 @@ fn push_overlay_row( /// Apply the filter to a batch, returning only matching rows. /// Returns the batch unchanged if all rows match (avoids copy). -fn filter_batch(filter: &BinaryFilter, batch: &ColumnBatch) -> ColumnBatch { +fn filter_batch(filter: &BinaryFilter, batch: &ColumnBatch, order: RunSortOrder) -> ColumnBatch { + // Within-leaflet seek: leaflet rows are sorted by the order's key, so when + // the leading sort column is bound to a single value we binary-search its + // contiguous row range instead of scanning every row. This is the common + // bound-subject SPOT point read — a large leaflet with ~5 target rows. Rows + // outside the range have a different leading value, so they can't match a + // filter that pins it — the output is identical to a full scan. + let (start, end) = leading_bound_range(filter, batch, order); let mut matching: Vec = Vec::new(); - for i in 0..batch.row_count { + for i in start..end { let s_id = batch.s_id.get(i); // always present let o_key = batch.o_key.get(i); // always present let p_id = batch.p_id.get_or(i, 0); @@ -811,6 +818,7 @@ fn filter_batch(filter: &BinaryFilter, batch: &ColumnBatch) -> ColumnBatch { } } + // Fast path only when nothing was skipped and everything matched. if matching.len() == batch.row_count { return batch.clone(); } @@ -818,6 +826,39 @@ fn filter_batch(filter: &BinaryFilter, batch: &ColumnBatch) -> ColumnBatch { gather_batch(batch, &matching) } +/// Binary-search the contiguous `[start, end)` row range whose leading sort +/// column equals its bound filter value. Returns the full `[0, row_count)` +/// range when the leading column is unbound or not a materialized (sorted) +/// block — so callers fall back to a full scan, never miss rows. +fn leading_bound_range( + filter: &BinaryFilter, + batch: &ColumnBatch, + order: RunSortOrder, +) -> (usize, usize) { + match order { + RunSortOrder::Spot => sorted_block_range(&batch.s_id, filter.s_id, batch.row_count), + RunSortOrder::Post | RunSortOrder::Psot => { + sorted_block_range(&batch.p_id, filter.p_id, batch.row_count) + } + RunSortOrder::Opst => sorted_block_range(&batch.o_type, filter.o_type, batch.row_count), + } +} + +fn sorted_block_range( + col: &ColumnData, + bound: Option, + row_count: usize, +) -> (usize, usize) { + match (bound, col) { + (Some(v), ColumnData::Block(arr)) => { + let start = arr.partition_point(|&x| x < v); + let end = arr.partition_point(|&x| x <= v); + (start, end) + } + _ => (0, row_count), + } +} + /// Gather rows at the given indices from a batch into a new batch. fn gather_batch(src: &ColumnBatch, indices: &[usize]) -> ColumnBatch { ColumnBatch { diff --git a/fluree-db-binary-index/src/read/binary_index_store.rs b/fluree-db-binary-index/src/read/binary_index_store.rs index c9d118b85b..fc3646fa8a 100644 --- a/fluree-db-binary-index/src/read/binary_index_store.rs +++ b/fluree-db-binary-index/src/read/binary_index_store.rs @@ -652,28 +652,19 @@ impl BinaryIndexStore { let leaf_id = xxhash_rust::xxh3::xxh3_128(leaf_cid.to_bytes().as_ref()); - // Fast path 1: local filesystem — full read is optimal (OS page cache). + // Fast path 1: local filesystem — mmap so the raw bytes stay in OS page + // cache (only touched pages fault in) with the directory served from the + // shared cache. Avoids copying the whole (possibly grown) leaf blob and + // re-decoding its directory on every read. if let Some(local_path) = cs.resolve_local_path(leaf_cid) { - let bytes = std::fs::read(local_path)?; - let sidecar = if need_replay { - self.fetch_sidecar_bytes_sync(sidecar_cid)? - } else { - None - }; - return Ok(Box::new(FullBlobLeafHandle::new(bytes, sidecar, leaf_id)?)); + return self.open_mmapped_leaf(&local_path, leaf_id, sidecar_cid, need_replay); } - // Fast path 2: locally cached — full read from disk cache. + // Fast path 2: locally cached (remote-promoted) — same mmap path; a + // missing cache file falls through to the range-read paths below. let cache_path = self.cache_dir.join(leaf_cid.to_string()); - match std::fs::read(&cache_path) { - Ok(bytes) => { - let sidecar = if need_replay { - self.fetch_sidecar_bytes_sync(sidecar_cid)? - } else { - None - }; - return Ok(Box::new(FullBlobLeafHandle::new(bytes, sidecar, leaf_id)?)); - } + match self.open_mmapped_leaf(&cache_path, leaf_id, sidecar_cid, need_replay) { + Ok(handle) => return Ok(handle), Err(err) if err.kind() == io::ErrorKind::NotFound => {} Err(err) => return Err(err), } @@ -758,6 +749,46 @@ impl BinaryIndexStore { /// Decoded directories are cached in the shared [`LeafletCache`] keyed by /// leaf CID — content-addressed, so entries never go stale. /// + /// Open a local (or locally-cached) leaf via mmap, with the decoded + /// directory served from the shared `LeafletCache`. The raw bytes stay in OS + /// page cache (only touched pages fault in) — no whole-blob copy, and the + /// directory is parsed once per leaf CID. See [`super::leaf_access::MmapLeafHandle`]. + fn open_mmapped_leaf( + &self, + path: &Path, + leaf_id: u128, + sidecar_cid: Option<&ContentId>, + need_replay: bool, + ) -> io::Result> { + let file = std::fs::File::open(path)?; + // SAFETY: leaf files are immutable, content-addressed CAS artifacts — + // never modified after write, so the mapping's bytes are stable. + let mmap = unsafe { memmap2::Mmap::map(&file)? }; + // Content-addressed directory: parse once per leaf CID, reused across + // opens. `leaf_id` == xxh3_128(leaf_cid) is the same key `open_leaf_dir` + // and warm-on-write use. + let dir = if let Some(cache) = &self.leaflet_cache { + cache.try_get_or_load_leaf_dir(leaf_id, || { + let header = crate::format::leaf::decode_leaf_header_v3(&mmap[..])?; + crate::format::leaf::decode_leaf_dir_v3_with_base(&mmap[..], &header).map(Arc::new) + })? + } else { + let header = crate::format::leaf::decode_leaf_header_v3(&mmap[..])?; + Arc::new(crate::format::leaf::decode_leaf_dir_v3_with_base( + &mmap[..], + &header, + )?) + }; + let sidecar = if need_replay { + self.fetch_sidecar_bytes_sync(sidecar_cid)? + } else { + None + }; + Ok(Box::new(super::leaf_access::MmapLeafHandle::new( + mmap, dir, sidecar, leaf_id, + ))) + } + /// Callers that may need `load_columns` must use [`Self::open_leaf_handle`]. pub fn open_leaf_dir(&self, leaf_cid: &ContentId) -> io::Result> { let key = xxhash_rust::xxh3::xxh3_128(leaf_cid.to_bytes().as_ref()); diff --git a/fluree-db-binary-index/src/read/column_types.rs b/fluree-db-binary-index/src/read/column_types.rs index f3ddad9231..4680f703da 100644 --- a/fluree-db-binary-index/src/read/column_types.rs +++ b/fluree-db-binary-index/src/read/column_types.rs @@ -139,6 +139,36 @@ impl ColumnBatch { + self.o_i.byte_size() + self.t.byte_size() } + + /// Project a (super)batch down to `requested`, mirroring what a fresh narrow + /// decode of `requested` would yield: requested columns are shared by cheap + /// `Arc` clone, unrequested columns become `AbsentDefault` so downstream + /// `filter_batch`/`gather_batch` never carry or copy them (e.g. an unwanted + /// `t`). Used by the leaflet cache's superset fallback so a warm-on-write + /// `ColumnSet::ALL` batch can serve narrower reads. + /// + /// Sound only when `self` already covers `requested` (`cached ⊇ requested`); + /// the cache guarantees this by projecting only an `ALL` batch. + pub fn project_to(&self, requested: ColumnSet) -> ColumnBatch { + macro_rules! keep { + ($col:expr, $field:ident) => { + if requested.contains($col) { + self.$field.clone() + } else { + ColumnData::AbsentDefault + } + }; + } + ColumnBatch { + row_count: self.row_count, + s_id: keep!(ColumnId::SId, s_id), + o_key: keep!(ColumnId::OKey, o_key), + p_id: keep!(ColumnId::PId, p_id), + o_type: keep!(ColumnId::OType, o_type), + o_i: keep!(ColumnId::OI, o_i), + t: keep!(ColumnId::T, t), + } + } } // ============================================================================ diff --git a/fluree-db-binary-index/src/read/leaf_access.rs b/fluree-db-binary-index/src/read/leaf_access.rs index 55602dc79c..1ba4fdcdcc 100644 --- a/fluree-db-binary-index/src/read/leaf_access.rs +++ b/fluree-db-binary-index/src/read/leaf_access.rs @@ -144,6 +144,99 @@ impl LeafHandle for FullBlobLeafHandle { } } +// ============================================================================ +// MmapLeafHandle +// ============================================================================ + +/// Leaf handle backed by an mmap of the local leaf file, with the decoded +/// directory shared from the [`LeafletCache`](super::leaflet_cache::LeafletCache). +/// +/// The local read fast path used to `std::fs::read` the entire leaf blob into a +/// fresh heap buffer on every open and re-decode the directory each time. For a +/// leaf that grows across incremental builds, that whole-blob copy + full +/// directory decode is re-paid per point read. This handle instead: +/// - maps the (immutable, content-addressed) leaf file, so the raw bytes stay in +/// OS page cache and only the pages actually touched (header, directory, and +/// the one scanned leaflet's columns) fault in — no whole-blob copy; and +/// - takes the decoded directory as an `Arc` from the shared cache, so the +/// directory is parsed once per leaf CID, not per open. +/// +/// Column data itself is still materialized once, per leaflet, via the V3Batch +/// cache — this handle only supplies the bytes for a cold decode. Raw leaf bytes +/// are never copied into the cache budget. +pub struct MmapLeafHandle { + mmap: memmap2::Mmap, + dir: Arc, + sidecar: Option>, + leaf_id: u128, +} + +impl MmapLeafHandle { + pub fn new( + mmap: memmap2::Mmap, + dir: Arc, + sidecar: Option>, + leaf_id: u128, + ) -> Self { + Self { + mmap, + dir, + sidecar, + leaf_id, + } + } +} + +impl LeafHandle for MmapLeafHandle { + fn dir(&self) -> &DecodedLeafDirV3 { + &self.dir + } + + fn load_columns( + &self, + leaflet_idx: usize, + projection: &ColumnProjection, + order: RunSortOrder, + ) -> io::Result { + let entry = &self.dir.entries[leaflet_idx]; + load_leaflet_columns( + &self.mmap[..], + entry, + self.dir.payload_base, + projection, + order, + ) + } + + fn load_sidecar_segment(&self, leaflet_idx: usize) -> io::Result> { + let entry = &self.dir.entries[leaflet_idx]; + if entry.history_len == 0 { + return Ok(Vec::new()); + } + let sc_bytes = self.sidecar.as_deref().ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + "sidecar bytes required for history replay but not available", + ) + })?; + let seg = HistorySegmentRef { + offset: entry.history_offset, + len: entry.history_len, + min_t: entry.history_min_t, + max_t: entry.history_max_t, + }; + decode_history_segment(sc_bytes, &seg) + } + + fn sidecar_bytes(&self) -> Option<&[u8]> { + self.sidecar.as_deref() + } + + fn leaf_id(&self) -> u128 { + self.leaf_id + } +} + // ============================================================================ // RangeReadFetcher trait // ============================================================================ diff --git a/fluree-db-binary-index/src/read/leaflet_cache.rs b/fluree-db-binary-index/src/read/leaflet_cache.rs index e1102a1dc0..7736bfb696 100644 --- a/fluree-db-binary-index/src/read/leaflet_cache.rs +++ b/fluree-db-binary-index/src/read/leaflet_cache.rs @@ -383,6 +383,15 @@ pub struct LeafletCache { inner: Cache, } +impl std::fmt::Debug for LeafletCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LeafletCache") + .field("entries", &self.entry_count()) + .field("bytes", &self.weighted_size_bytes()) + .finish() + } +} + /// Run a moka single-flight call (`get_with`/`try_get_with`) under a Tokio /// blocking region when on a multi-thread runtime. /// @@ -537,6 +546,16 @@ impl LeafletCache { } } + /// Warm-on-write: seed a reverse-dict leaf the *writer* already holds. + /// + /// Key must be `cid_cache_key(cas_address_string)` — the reader keys dict + /// leaves on the CAS address string (`cid.to_string()`), not the CID bytes. + /// Value is the raw on-disk leaf bytes; the reader decodes on access. + pub fn insert_dict_leaf(&self, key: u128, bytes: Arc<[u8]>) { + self.inner + .insert(CacheKey::DictLeaf(key), CachedEntry::DictLeaf(bytes)); + } + // ======================================================================== // Decoded leaf directory cache // ======================================================================== @@ -578,6 +597,18 @@ impl LeafletCache { } } + /// Warm-on-write: seed a decoded leaf directory the *writer* already holds. + /// + /// The incremental indexer, running co-located with the query server, has + /// the just-written leaf's bytes in hand; decoding + inserting the directory + /// here saves the reader that immediately revisits the leaf a re-decode. + /// Key is `cid_cache_key(leaf_cid)` — content-addressed, so this can never + /// serve stale data. Admission is TinyLFU-bounded like any other insert. + pub fn insert_leaf_dir(&self, key: u128, dir: Arc) { + self.inner + .insert(CacheKey::LeafDir(key), CachedEntry::LeafDir(dir)); + } + // ======================================================================== // BM25 posting leaflet cache // ======================================================================== @@ -765,6 +796,21 @@ impl LeafletCache { if let Some(batch) = self.get_v3_batch(&key) { return Ok(batch); } + // Superset fallback: a warm-on-write `ColumnSet::ALL` batch covers any + // narrower request. Serve it projected down rather than re-decoding — + // the invariant is that a cached batch may satisfy a request only when + // its columns ⊇ the requested columns, and ALL ⊇ everything. + let all_columns = super::column_types::ColumnSet::ALL.0; + if key.columns != all_columns { + let all_key = V3BatchCacheKey { + leaf_id: key.leaf_id, + leaflet_idx: key.leaflet_idx, + columns: all_columns, + }; + if let Some(all_batch) = self.get_v3_batch(&all_key) { + return Ok(all_batch.project_to(super::column_types::ColumnSet(key.columns))); + } + } // Run the single-flight wait/init in a blocking region so a // waiter promotes a replacement worker (see in_blocking_region). let result = in_blocking_region(|| { @@ -779,6 +825,17 @@ impl LeafletCache { } } + /// Warm-on-write: seed a decoded V3 column batch the *writer* already holds. + /// + /// Counterpart to [`Self::insert_leaf_dir`] for the column data. The key's + /// `columns` bitmask must match the projection a reader will request, or the + /// warmed entry is simply never hit (never mis-served) — so seed the common + /// projection(s) only. Content-addressed leaf id makes this always safe. + pub fn insert_v3_batch(&self, key: V3BatchCacheKey, batch: super::column_types::ColumnBatch) { + self.inner + .insert(CacheKey::V3Batch(key), CachedEntry::V3Batch(batch)); + } + // ======================================================================== // Housekeeping // ======================================================================== @@ -962,6 +1019,56 @@ mod tests { assert!(cache.get_dict_leaf(1000).is_none()); } + #[test] + fn test_v3_batch_superset_serves_narrow_without_decode() { + use crate::read::column_types::{ColumnBatch, ColumnData, ColumnSet}; + let cache = LeafletCache::with_max_bytes(10 * 1024 * 1024); + + // Warm-on-write seeds a full (ALL) batch for a leaflet. + let all_batch = ColumnBatch { + row_count: 2, + s_id: ColumnData::Block(vec![10u64, 11].into()), + o_key: ColumnData::Block(vec![100u64, 101].into()), + p_id: ColumnData::Const(5), + o_type: ColumnData::Const(7), + o_i: ColumnData::Block(vec![0u32, 1].into()), + t: ColumnData::Block(vec![3u32, 4].into()), + }; + let all_key = V3BatchCacheKey { + leaf_id: 0xABCD, + leaflet_idx: 2, + columns: ColumnSet::ALL.0, + }; + cache.insert_v3_batch(all_key.clone(), all_batch); + + // A narrow (CORE) read must be served from the ALL batch, projected — + // decode_fn panics to prove no re-decode happens. + let core_key = V3BatchCacheKey { + leaf_id: 0xABCD, + leaflet_idx: 2, + columns: ColumnSet::CORE.0, + }; + let served = cache + .try_get_or_decode_v3_batch(core_key, || panic!("must not decode: ALL covers CORE")) + .expect("superset hit"); + + // Requested (CORE) columns carry real data; omitted ones are absent. + assert_eq!(served.row_count, 2); + assert_eq!(served.s_id.get(1), 11); + assert_eq!(served.o_key.get(0), 100); + assert_eq!(served.p_id.get(0), 5); + assert_eq!(served.o_type.get(0), 7); + assert!(served.o_i.is_absent(), "o_i not in CORE → projected away"); + assert!(served.t.is_absent(), "t not in CORE → projected away"); + + // A request for columns NOT covered (here, an exact ALL request) still + // hits the stored ALL batch directly. + let full = cache + .try_get_or_decode_v3_batch(all_key, || panic!("must not decode: exact ALL hit")) + .expect("exact hit"); + assert!(!full.t.is_absent(), "exact ALL request keeps t"); + } + #[test] fn test_bm25_leaflet_insert_get_miss() { let cache = LeafletCache::with_max_bytes(10 * 1024 * 1024); diff --git a/fluree-db-indexer/src/build/dicts.rs b/fluree-db-indexer/src/build/dicts.rs index 8cc39f8020..ce7d7bea70 100644 --- a/fluree-db-indexer/src/build/dicts.rs +++ b/fluree-db-indexer/src/build/dicts.rs @@ -3,7 +3,7 @@ //! These functions handle updating existing reverse dictionary trees when //! new subjects or strings are added during incremental indexing. -use fluree_db_binary_index::DictTreeRefs; +use fluree_db_binary_index::{DictTreeRefs, LeafletCache}; use fluree_db_core::{ContentId, ContentKind, ContentStore}; use crate::error::{IndexerError, Result}; @@ -15,6 +15,7 @@ pub(crate) async fn upload_incremental_reverse_tree_async( dict: fluree_db_core::DictKind, existing_refs: &DictTreeRefs, new_subjects: Vec<(u16, u64, Vec)>, + warm_cache: Option<&LeafletCache>, ) -> Result { use fluree_db_binary_index::dict::reverse_leaf::{subject_reverse_key, ReverseEntry}; @@ -27,7 +28,8 @@ pub(crate) async fn upload_incremental_reverse_tree_async( .collect(); entries.sort_by(|a, b| a.key.cmp(&b.key)); - upload_incremental_reverse_tree_core(content_store, dict, existing_refs, entries).await + upload_incremental_reverse_tree_core(content_store, dict, existing_refs, entries, warm_cache) + .await } /// Async version of reverse tree upload for **string** dictionaries. @@ -40,6 +42,7 @@ pub(crate) async fn upload_incremental_reverse_tree_async_strings( dict: fluree_db_core::DictKind, existing_refs: &DictTreeRefs, new_strings: Vec<(u32, Vec)>, + warm_cache: Option<&LeafletCache>, ) -> Result { use fluree_db_binary_index::dict::reverse_leaf::ReverseEntry; @@ -52,7 +55,8 @@ pub(crate) async fn upload_incremental_reverse_tree_async_strings( .collect(); entries.sort_by(|a, b| a.key.cmp(&b.key)); - upload_incremental_reverse_tree_core(content_store, dict, existing_refs, entries).await + upload_incremental_reverse_tree_core(content_store, dict, existing_refs, entries, warm_cache) + .await } /// Core async reverse tree upload: pre-fetch affected leaves, spawn_blocking @@ -62,6 +66,7 @@ async fn upload_incremental_reverse_tree_core( dict: fluree_db_core::DictKind, existing_refs: &DictTreeRefs, entries: Vec, + warm_cache: Option<&LeafletCache>, ) -> Result { let kind = ContentKind::DictBlob { dict }; @@ -130,6 +135,15 @@ async fn upload_incremental_reverse_tree_core( .await .map_err(|e| IndexerError::StorageWrite(e.to_string()))?; let cid_str = cid.to_string(); + // Warm-on-write (co-located): seed the just-written reverse-dict leaf so + // a reader resolving a newly-added IRI/string hits the cache instead of + // a cold read. Reader keys on the CAS address string (== cid_str). + if let Some(cache) = warm_cache { + cache.insert_dict_leaf( + LeafletCache::cid_cache_key(cid_str.as_bytes()), + std::sync::Arc::from(leaf_art.bytes.as_slice()), + ); + } address_to_cid.insert(cid_str.clone(), cid); hash_to_address.insert(leaf_art.hash.clone(), cid_str); } diff --git a/fluree-db-indexer/src/build/incremental.rs b/fluree-db-indexer/src/build/incremental.rs index 6467bb46ea..5de3d99ea0 100644 --- a/fluree-db-indexer/src/build/incremental.rs +++ b/fluree-db-indexer/src/build/incremental.rs @@ -24,7 +24,7 @@ use std::time::Instant; use fluree_db_binary_index::format::branch::LeafEntry; use fluree_db_binary_index::format::run_record::RunSortOrder; use fluree_db_binary_index::format::run_record_v2::{cmp_v2_for_order, RunRecordV2}; -use fluree_db_binary_index::{BinaryGarbageRef, BinaryPrevIndexRef}; +use fluree_db_binary_index::{BinaryGarbageRef, BinaryPrevIndexRef, LeafletCache}; use fluree_db_core::{ContentId, ContentKind, ContentStore}; use futures::stream::{self, StreamExt, TryStreamExt}; @@ -176,6 +176,7 @@ async fn run_update_branch( upload_budget: Arc, upload_buffer: usize, cache_dir: std::path::PathBuf, + warm_cache: Option>, ) -> std::result::Result<(BranchUpdateMeta, Phase2FetchStatsSnapshot), IndexerError> { let parent_span = tracing::Span::current(); let stats = Phase2FetchStats::default(); @@ -332,6 +333,7 @@ async fn run_update_branch( let content_store = content_store.as_ref(); let tracker = &tracker; let cache_dir = &cache_dir; + let warm_cache = warm_cache.as_ref(); async move { let mut totals = LeafUploadCounts::default(); while let Some(blob) = rx.recv().await { @@ -341,6 +343,7 @@ async fn run_update_branch( upload_budget_ref, cache_dir, blob, + warm_cache, ) .await?; totals.leaf_bytes += c.leaf_bytes; @@ -434,6 +437,13 @@ async fn execute_phase2_task( }; let started = Instant::now(); + // Resolve the warm-on-write cache once per task (co-located only; `None` + // elsewhere). The CoW (update) paths seed it; fresh builds do not (they'd + // decode the whole graph). + let warm_cache = config + .warm_cache_source + .as_ref() + .and_then(|s| s.warm_cache()); let output = match kind { Phase2TaskKind::DefaultExisting { leaves } => { let branch_bytes = @@ -448,6 +458,7 @@ async fn execute_phase2_task( Arc::clone(&upload_budget), upload_buffer, cache_dir.clone(), + warm_cache.clone(), ) .await?; Phase2TaskOutput { @@ -508,6 +519,7 @@ async fn execute_phase2_task( Arc::clone(&upload_budget), upload_buffer, cache_dir.clone(), + warm_cache.clone(), ) .await?; let branch_cid = content_store @@ -826,6 +838,13 @@ pub async fn incremental_index( upload_incremental_reverse_tree_async, upload_incremental_reverse_tree_async_strings, }; let mut new_dict_refs = base_root.dict_refs.clone(); + // Resolve the warm-on-write cache once (co-located only); seeds the + // reverse-dict leaves we rewrite so a reader resolving a just-added + // IRI/string hits the cache instead of a cold read. + let warm_cache = config + .warm_cache_source + .as_ref() + .and_then(|s| s.warm_cache()); if !novelty.new_subjects.is_empty() { tracing::debug!( @@ -837,6 +856,7 @@ pub async fn incremental_index( fluree_db_core::DictKind::SubjectReverse, &base_root.dict_refs.subject_reverse, novelty.new_subjects.clone(), + warm_cache.as_deref(), ) .await?; root_builder.add_replaced_cids(updated.replaced_cids); @@ -853,6 +873,7 @@ pub async fn incremental_index( fluree_db_core::DictKind::StringReverse, &base_root.dict_refs.string_reverse, novelty.new_strings.clone(), + warm_cache.as_deref(), ) .await?; root_builder.add_replaced_cids(updated.replaced_cids); @@ -3755,6 +3776,7 @@ async fn upload_one_leaf_blob( upload_budget: &Semaphore, cache_dir: &std::path::Path, blob: NewLeafBlob, + warm_cache: Option<&Arc>, ) -> Result { let _permit = upload_budget .acquire() @@ -3773,6 +3795,27 @@ async fn upload_one_leaf_blob( cache_artifact_bytes(cache_dir, &leaf_cid, &info.leaf_bytes, "index_leaf"); counts.leaf_bytes = info.leaf_bytes.len() as u64; + // Warm-on-write (co-located only): seed the shared read cache with the + // leaflets we just wrote, from bytes already in hand, so the query server's + // immediate read of this new-CID leaf hits the cache instead of cold decode. + // + // The zstd decode runs on a blocking thread, not this async uploader task — + // in the co-located deployment the same runtime serves queries, and a churn + // build must not stall a query worker on decompression. Warming is + // best-effort and fire-and-forget: a read that races a not-yet-finished warm + // just cold-decodes as before, and the remaining build steps (branch + root + // write, head swap) almost always outlast the decode, so the immediate + // post-swap read still hits. `leaf_bytes` is at its last use here, so it + // moves into the task rather than being copied. + if let Some(cache) = warm_cache { + let cache = Arc::clone(cache); + let leaf_cid = info.leaf_cid.clone(); + let leaf_bytes = info.leaf_bytes; + tokio::task::spawn_blocking(move || { + warm_leaf_into_cache(&cache, &leaf_cid, &leaf_bytes); + }); + } + if let Some(sc_bytes) = info.sidecar_bytes.as_deref() { let sidecar_cid = content_store .put(ContentKind::HistorySidecar, sc_bytes) @@ -3785,10 +3828,61 @@ async fn upload_one_leaf_blob( counts.sidecar_bytes = sc_bytes.len() as u64; counts.sidecar_count = 1; } - // `info` (and its leaf_bytes/sidecar_bytes) drops here, freeing the buffers. + // `info`'s sidecar buffer drops here; `leaf_bytes` was moved into the warm + // task above (or already dropped when no warm cache is configured). Ok(counts) } +/// Warm-on-write: seed the shared read cache with the leaflets this build just +/// wrote so a co-located query server's immediate read of the freshly-rewritten +/// (new-CID) leaf hits the cache instead of re-reading + re-decoding from disk. +/// +/// Decodes from the bytes already in hand (no re-fetch) and inserts each leaflet +/// under `ColumnSet::ALL`; the read-side superset fallback projects that down to +/// any narrower request, so we never have to guess a reader's projection. The +/// `leaf_id` derivation matches the reader's (`xxh3_128(leaf_cid.to_bytes())`). +/// Best-effort: a decode error just leaves the reader to cold-decode as before. +fn warm_leaf_into_cache(cache: &LeafletCache, leaf_cid: &ContentId, leaf_bytes: &[u8]) { + use fluree_db_binary_index::format::leaf::{ + decode_leaf_dir_v3_with_base, decode_leaf_header_v3, + }; + use fluree_db_binary_index::read::column_loader::load_leaflet_columns; + use fluree_db_binary_index::read::column_types::{ColumnProjection, ColumnSet}; + use fluree_db_binary_index::V3BatchCacheKey; + + let Ok(header) = decode_leaf_header_v3(leaf_bytes) else { + return; + }; + let order = header.order; + let Ok(dir) = decode_leaf_dir_v3_with_base(leaf_bytes, &header) else { + return; + }; + let leaf_id = LeafletCache::cid_cache_key(&leaf_cid.to_bytes()); + for (idx, entry) in dir.entries.iter().enumerate() { + if entry.row_count == 0 { + continue; + } + if let Ok(batch) = load_leaflet_columns( + leaf_bytes, + entry, + dir.payload_base, + &ColumnProjection::all(), + order, + ) { + cache.insert_v3_batch( + V3BatchCacheKey { + leaf_id, + leaflet_idx: idx as u32, + columns: ColumnSet::ALL.0, + }, + batch, + ); + } + } + // Seed the directory too (used once the local scan path consults it). + cache.insert_leaf_dir(leaf_id, std::sync::Arc::new(dir)); +} + /// Upload a fully-materialized set of leaf blobs (the fresh-build paths produce /// the whole `Vec` up front) in parallel under the shared global budget, /// consuming each blob by value so its bytes are freed after its put. @@ -3806,7 +3900,9 @@ async fn upload_leaf_blobs( // large fresh build. let buffer = upload_buffer.max(1); let totals = stream::iter(blobs) - .map(|blob| upload_one_leaf_blob(content_store, tracker, upload_budget, cache_dir, blob)) + .map(|blob| { + upload_one_leaf_blob(content_store, tracker, upload_budget, cache_dir, blob, None) + }) .buffer_unordered(buffer) .try_fold(LeafUploadCounts::default(), |mut acc, c| async move { acc.leaf_bytes += c.leaf_bytes; diff --git a/fluree-db-indexer/src/config.rs b/fluree-db-indexer/src/config.rs index 0743bc155b..40757b4984 100644 --- a/fluree-db-indexer/src/config.rs +++ b/fluree-db-indexer/src/config.rs @@ -2,6 +2,7 @@ use crate::gc::{DEFAULT_MAX_OLD_INDEXES, DEFAULT_MIN_TIME_GARBAGE_MINS}; use async_trait::async_trait; +use fluree_db_binary_index::LeafletCache; use std::path::PathBuf; use std::sync::Arc; @@ -127,6 +128,18 @@ pub struct ConfiguredFulltextProperty { pub property_iri: String, } +/// Late-bound resolver for the shared read cache to warm on write. +/// +/// The background indexer worker is constructed before the api's +/// `LedgerManager` (which owns the process-shared `LeafletCache`) exists, so +/// the api plugs in a resolver that yields that cache once it's available. +/// Returns `None` until then — and always in separate-machine deployments, +/// where the worker cannot reach the query server's cache. +pub trait WarmCacheSource: std::fmt::Debug + Send + Sync { + /// The shared read cache to seed on write, or `None` if unavailable. + fn warm_cache(&self) -> Option>; +} + /// Configuration for index building #[derive(Debug, Clone)] pub struct IndexerConfig { @@ -343,6 +356,17 @@ pub struct IndexerConfig { /// `pending_commit_cids` unset so discovery falls back to the serial walk. /// Used to A/B the fast path against its baseline on the same backlog. pub force_serial_commit_walk: bool, + + /// Late-bound resolver for the shared read cache to warm on write — see + /// [`WarmCacheSource`]. + /// + /// When it yields a cache (co-located deployments), the incremental build + /// seeds that cache with the leaflets it just wrote (decoded under + /// `ColumnSet::ALL`) so the query server's immediate read of a freshly + /// rewritten leaf hits the cache instead of re-reading + re-decoding from + /// disk. `None` (default) = off. Not build output, so excluded from config + /// identity. + pub warm_cache_source: Option>, } /// Default run-sort budget: 256 MB. @@ -387,6 +411,7 @@ impl Default for IndexerConfig { attachment_events_provider: None, pending_commit_cids: None, force_serial_commit_walk: false, + warm_cache_source: None, } } } @@ -422,6 +447,7 @@ impl IndexerConfig { attachment_events_provider: None, pending_commit_cids: None, force_serial_commit_walk: false, + warm_cache_source: None, } } @@ -450,6 +476,7 @@ impl IndexerConfig { attachment_events_provider: None, pending_commit_cids: None, force_serial_commit_walk: false, + warm_cache_source: None, } } @@ -478,6 +505,7 @@ impl IndexerConfig { attachment_events_provider: None, pending_commit_cids: None, force_serial_commit_walk: false, + warm_cache_source: None, } } @@ -508,6 +536,14 @@ impl IndexerConfig { self } + /// Attach a late-bound resolver for the shared read cache to warm on write. + /// Only co-located deployments plug this in; separate-machine indexers leave + /// it unset (they can't reach the query server's cache). + pub fn with_warm_cache_source(mut self, source: Arc) -> Self { + self.warm_cache_source = Some(source); + self + } + pub fn with_leaflet_rows(mut self, rows: usize) -> Self { self.leaflet_rows = rows.max(1); self diff --git a/fluree-db-indexer/src/lib.rs b/fluree-db-indexer/src/lib.rs index c5b9d0be52..7eaf4ef535 100644 --- a/fluree-db-indexer/src/lib.rs +++ b/fluree-db-indexer/src/lib.rs @@ -45,7 +45,7 @@ pub mod stats; // Re-export main types pub use config::{ AttachmentEventCoverage, AttachmentEventsProvider, ConfiguredFulltextProperty, - ConfiguredFulltextScope, FulltextConfigProvider, IndexerConfig, + ConfiguredFulltextScope, FulltextConfigProvider, IndexerConfig, WarmCacheSource, }; pub use drop::collect_ledger_cids; pub use error::{IndexerError, Result}; diff --git a/fluree-db-indexer/src/run_index/build/incremental_branch.rs b/fluree-db-indexer/src/run_index/build/incremental_branch.rs index 7ab637ad9b..2e6a2b3e87 100644 --- a/fluree-db-indexer/src/run_index/build/incremental_branch.rs +++ b/fluree-db-indexer/src/run_index/build/incremental_branch.rs @@ -297,9 +297,26 @@ impl BranchAcc { } } -/// Build the `LeafUpdateInput` for a touched leaf. The effective leaf target is -/// bumped so a touched leaf is never split into multiple new leaves — preserving -/// branch structure for CID stability (identical to the serial-loop logic). +/// A touched leaf keeps its "grow in place, don't split" behaviour (target +/// bumped to fit) only while its merged size stays within this multiple of the +/// leaf target; beyond it, the real target is used so `assemble_output_leaves` +/// splits the leaf into bounded, ~target-sized leaves. `2x` mirrors the config's +/// `leaf_max = 2 * leaf_target` convention and the greedy packing of the +/// full-build `LeafWriter`. Below the ceiling we avoid a split so a small commit +/// doesn't churn the branch (CID stability) every build. +const LEAF_SPLIT_CEILING_FACTOR: usize = 2; + +/// Build the `LeafUpdateInput` for a touched leaf. +/// +/// A merged leaf that stays within `LEAF_SPLIT_CEILING_FACTOR × leaf_target` is +/// kept whole (target bumped to fit) for branch/CID stability; a genuinely +/// oversized one uses the real `leaf_target_rows` so it splits into bounded +/// leaves. This applies uniformly to every touched leaf — middle, leftmost, and +/// rightmost — so no leaf grows without bound. Splits are gap-free: novelty is +/// routed to leaves by `first_key(next)` half-open intervals +/// (`slice_novelty_to_leaves`), and the leftmost/rightmost leaves keep their +/// −∞/+∞ coverage (leaf 0 takes everything below `leaves[1].first_key`; the last +/// leaf takes all remaining). fn make_leaf_update_input<'a>( leaf_bytes: &'a [u8], sidecar_bytes: Option<&'a [u8]>, @@ -308,9 +325,19 @@ fn make_leaf_update_input<'a>( config: &BranchUpdateConfig, ) -> io::Result> { let existing_header = decode_leaf_header_v3(leaf_bytes)?; - let effective_leaf_target_rows = (existing_header.total_rows as usize) - .saturating_add(nov_slice.len()) - .saturating_add(1); + let estimated_rows = (existing_header.total_rows as usize).saturating_add(nov_slice.len()); + let ceiling = config + .leaf_target_rows + .saturating_mul(LEAF_SPLIT_CEILING_FACTOR); + let leaf_target_rows = if estimated_rows > ceiling { + // Oversized → split into ~target-sized leaves via the existing machinery. + config.leaf_target_rows.max(1) + } else { + // Modest growth → keep as one leaf (bump target to fit) for CID stability. + config + .leaf_target_rows + .max(estimated_rows.saturating_add(1)) + }; Ok(LeafUpdateInput { leaf_bytes, novelty: nov_slice, @@ -319,7 +346,7 @@ fn make_leaf_update_input<'a>( g_id: config.g_id, zstd_level: config.zstd_level, leaflet_target_rows: config.leaflet_target_rows, - leaf_target_rows: config.leaf_target_rows.max(effective_leaf_target_rows), + leaf_target_rows, sidecar_bytes, }) } diff --git a/fluree-db-indexer/src/run_index/build/incremental_leaf.rs b/fluree-db-indexer/src/run_index/build/incremental_leaf.rs index 693ccbf18a..fd51eec07d 100644 --- a/fluree-db-indexer/src/run_index/build/incremental_leaf.rs +++ b/fluree-db-indexer/src/run_index/build/incremental_leaf.rs @@ -755,6 +755,72 @@ mod tests { assert_eq!(output.leaves[0].info.total_rows, 2); } + #[test] + fn test_update_splits_oversized_leaf_gap_free() { + // Base leaf: subjects 1..=200 (SPOT). Novelty appends 201..=400. + let base: Vec = (1..=200).map(|s| rec2(s, 1, s as i64, 1)).collect(); + let (leaf_bytes, sidecar) = build_test_leaf(&base, RunSortOrder::Spot); + let novelty: Vec = (201..=400).map(|s| rec2(s, 1, s as i64, 5)).collect(); + let ops = vec![1u8; novelty.len()]; + + // Small targets force the merged 400 rows to split into multiple leaves. + let input = LeafUpdateInput { + leaf_bytes: &leaf_bytes, + novelty: &novelty, + novelty_ops: &ops, + order: RunSortOrder::Spot, + g_id: 0, + zstd_level: 1, + leaflet_target_rows: 50, + leaf_target_rows: 100, + sidecar_bytes: sidecar.as_deref(), + }; + let output = update_leaf(&input).unwrap(); + + // Split occurred. + assert!( + output.leaves.len() >= 2, + "expected a split into multiple leaves, got {}", + output.leaves.len() + ); + // No rows lost across the split. + let total: u64 = output.leaves.iter().map(|l| l.info.total_rows).sum(); + assert_eq!(total, 400, "row count must be preserved across the split"); + // Decode each leaf's header for its real routing keys (LeafInfo's + // first_key/last_key are zeroed placeholders; the keys live in the blob). + use fluree_db_binary_index::format::leaf::decode_leaf_header_v3; + use fluree_db_binary_index::format::run_record_v2::read_ordered_key_v2; + let headers: Vec<_> = output + .leaves + .iter() + .map(|l| decode_leaf_header_v3(&l.info.leaf_bytes).unwrap()) + .collect(); + // Ordered + non-overlapping: last_key(i) < first_key(i+1) (ordered-key + // bytes). With the full row count preserved, this proves a gap-free + // tiling — so first_key(next) slicing routes every key to exactly one + // leaf (no gap between one leaf's last and the next leaf's first). + for w in headers.windows(2) { + assert!( + w[0].last_key < w[1].first_key, + "leaf boundaries must be strictly increasing and non-overlapping", + ); + } + // The split preserves the full span: first leaf starts at s_id 1, last + // leaf ends at s_id 400 (the leftmost/rightmost keep their −∞/+∞ reach). + assert_eq!( + read_ordered_key_v2(RunSortOrder::Spot, &headers[0].first_key) + .s_id + .as_u64(), + 1 + ); + assert_eq!( + read_ordered_key_v2(RunSortOrder::Spot, &headers.last().unwrap().last_key) + .s_id + .as_u64(), + 400 + ); + } + #[test] fn test_update_insert_new_fact() { let records = vec![rec2(1, 1, 10, 1), rec2(3, 1, 30, 1)];