Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
376 changes: 376 additions & 0 deletions fluree-db-api/examples/reindex_swap_read_profile.rs

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions fluree-db-api/src/indexer_attachment_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,29 @@ impl std::fmt::Debug for ApiAttachmentEventsProvider {
}
}

/// Late-bound source for the process-shared read cache owned by the api's
/// `LedgerManager`.
///
/// The manager owns the `LeafletCache` the query server reads from, so the
/// background indexer can warm-on-write into that exact cache. The cache is
/// resolved from the manager cell on demand: lookups yield `None` until the
/// manager cell is filled (and always for a separate-machine indexer, which
/// has no local manager).
pub(crate) struct LedgerManagerWarmCache {
/// Cell holding the `LedgerManager` once it has been constructed; read
/// each time the warm cache is requested rather than resolved up front.
pub(crate) manager: LedgerManagerCell,
}

impl std::fmt::Debug for LedgerManagerWarmCache {
    /// Prints only whether the manager cell has been filled (`bound`), not
    /// the manager itself.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let bound = self.manager.get().is_some();
        let mut out = f.debug_struct("LedgerManagerWarmCache");
        out.field("bound", &bound);
        out.finish()
    }
}

impl fluree_db_indexer::WarmCacheSource for LedgerManagerWarmCache {
    /// Returns a handle to the manager's leaflet cache, or `None` when the
    /// manager cell is still empty or the manager exposes no cache.
    fn warm_cache(&self) -> Option<Arc<fluree_db_binary_index::LeafletCache>> {
        let manager = self.manager.get()?;
        manager.leaflet_cache().cloned()
    }
}

#[async_trait]
impl AttachmentEventsProvider for ApiAttachmentEventsProvider {
async fn attachment_events(&self, ledger_id: &str) -> Option<AttachmentEventCoverage> {
Expand Down
11 changes: 10 additions & 1 deletion fluree-db-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2552,11 +2552,20 @@ impl FlureeBuilder {
},
)
as Arc<dyn fluree_db_indexer::AttachmentEventsProvider>;
// Warm-on-write (co-located only): let the background build seed the
// query server's shared read cache with the leaflets it just wrote.
// Resolved late from the same LedgerManager cell used above, so the
// worker warms the exact cache readers use.
let warm_cache_source =
Arc::new(crate::indexer_attachment_provider::LedgerManagerWarmCache {
manager: Arc::clone(attachment_provider_cell),
}) as Arc<dyn fluree_db_indexer::WarmCacheSource>;
let indexer_config = idx_config
.indexer_config
.clone()
.with_fulltext_config_provider(provider)
.with_attachment_events_provider(ann_provider);
.with_attachment_events_provider(ann_provider)
.with_warm_cache_source(warm_cache_source);
// BackgroundIndexerWorker takes an
// `Arc<dyn IndexingNameService>` — the combined lookup
// + index-publish surface. `ReadWriteNameService`
Expand Down
47 changes: 44 additions & 3 deletions fluree-db-binary-index/src/read/binary_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ impl BinaryCursor {
let batch = if self.filter.is_empty() || batch.is_empty() {
batch
} else {
filter_batch(&self.filter, &batch)
filter_batch(&self.filter, &batch, self.order)
};

// Apply overlay merge if we have overlay ops.
Expand Down Expand Up @@ -798,9 +798,16 @@ fn push_overlay_row(

/// Apply the filter to a batch, returning only matching rows.
/// Returns the batch unchanged if all rows match (avoids copy).
fn filter_batch(filter: &BinaryFilter, batch: &ColumnBatch) -> ColumnBatch {
fn filter_batch(filter: &BinaryFilter, batch: &ColumnBatch, order: RunSortOrder) -> ColumnBatch {
// Within-leaflet seek: leaflet rows are sorted by the order's key, so when
// the leading sort column is bound to a single value we binary-search its
// contiguous row range instead of scanning every row. This is the common
// bound-subject SPOT point read — a large leaflet with ~5 target rows. Rows
// outside the range have a different leading value, so they can't match a
// filter that pins it — the output is identical to a full scan.
let (start, end) = leading_bound_range(filter, batch, order);
let mut matching: Vec<usize> = Vec::new();
for i in 0..batch.row_count {
for i in start..end {
let s_id = batch.s_id.get(i); // always present
let o_key = batch.o_key.get(i); // always present
let p_id = batch.p_id.get_or(i, 0);
Expand All @@ -811,13 +818,47 @@ fn filter_batch(filter: &BinaryFilter, batch: &ColumnBatch) -> ColumnBatch {
}
}

// Fast path only when nothing was skipped and everything matched.
if matching.len() == batch.row_count {
return batch.clone();
}

gather_batch(batch, &matching)
}

/// Narrow a batch to the `[start, end)` rows whose leading sort column equals
/// the value the filter binds it to.
///
/// The leading column depends on `order`: `s_id` for SPOT, `p_id` for POST and
/// PSOT, `o_type` for OPST. When that column is unbound in the filter, or is
/// not a materialized (sorted) block, the full `[0, row_count)` range comes
/// back, so callers fall back to a full scan and never miss rows.
fn leading_bound_range(
    filter: &BinaryFilter,
    batch: &ColumnBatch,
    order: RunSortOrder,
) -> (usize, usize) {
    let rows = batch.row_count;
    match order {
        RunSortOrder::Spot => sorted_block_range(&batch.s_id, filter.s_id, rows),
        RunSortOrder::Psot | RunSortOrder::Post => {
            sorted_block_range(&batch.p_id, filter.p_id, rows)
        }
        RunSortOrder::Opst => sorted_block_range(&batch.o_type, filter.o_type, rows),
    }
}

/// Locate the contiguous run of rows equal to `bound` in a sorted column.
///
/// Returns the half-open row range `[start, end)`:
/// - when `bound` is `Some(v)` and `col` is a materialized `Block`, the range
///   of rows whose value equals `v` (empty, with `start == end`, when `v` does
///   not occur);
/// - otherwise `(0, row_count)`, i.e. "scan everything".
///
/// The block must be sorted ascending; both searches are binary searches and
/// their result is unspecified on unsorted data.
fn sorted_block_range<T: Copy + Ord>(
    col: &ColumnData<T>,
    bound: Option<T>,
    row_count: usize,
) -> (usize, usize) {
    match (bound, col) {
        (Some(v), ColumnData::Block(arr)) => {
            let start = arr.partition_point(|&x| x < v);
            // Every row before `start` is `< v`, so the end of the run can only
            // lie in the tail; search just that part instead of the whole block.
            let end = start + arr[start..].partition_point(|&x| x <= v);
            (start, end)
        }
        _ => (0, row_count),
    }
}

/// Gather rows at the given indices from a batch into a new batch.
fn gather_batch(src: &ColumnBatch, indices: &[usize]) -> ColumnBatch {
ColumnBatch {
Expand Down
67 changes: 49 additions & 18 deletions fluree-db-binary-index/src/read/binary_index_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,28 +652,19 @@ impl BinaryIndexStore {

let leaf_id = xxhash_rust::xxh3::xxh3_128(leaf_cid.to_bytes().as_ref());

// Fast path 1: local filesystem — full read is optimal (OS page cache).
// Fast path 1: local filesystem — mmap so the raw bytes stay in OS page
// cache (only touched pages fault in) with the directory served from the
// shared cache. Avoids copying the whole (possibly grown) leaf blob and
// re-decoding its directory on every read.
if let Some(local_path) = cs.resolve_local_path(leaf_cid) {
let bytes = std::fs::read(local_path)?;
let sidecar = if need_replay {
self.fetch_sidecar_bytes_sync(sidecar_cid)?
} else {
None
};
return Ok(Box::new(FullBlobLeafHandle::new(bytes, sidecar, leaf_id)?));
return self.open_mmapped_leaf(&local_path, leaf_id, sidecar_cid, need_replay);
}

// Fast path 2: locally cached — full read from disk cache.
// Fast path 2: locally cached (remote-promoted) — same mmap path; a
// missing cache file falls through to the range-read paths below.
let cache_path = self.cache_dir.join(leaf_cid.to_string());
match std::fs::read(&cache_path) {
Ok(bytes) => {
let sidecar = if need_replay {
self.fetch_sidecar_bytes_sync(sidecar_cid)?
} else {
None
};
return Ok(Box::new(FullBlobLeafHandle::new(bytes, sidecar, leaf_id)?));
}
match self.open_mmapped_leaf(&cache_path, leaf_id, sidecar_cid, need_replay) {
Ok(handle) => return Ok(handle),
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
Err(err) => return Err(err),
}
Expand Down Expand Up @@ -758,6 +749,46 @@ impl BinaryIndexStore {
/// Decoded directories are cached in the shared [`LeafletCache`] keyed by
/// leaf CID — content-addressed, so entries never go stale.
///
/// Open a local (or locally-cached) leaf by memory-mapping its file.
///
/// The raw bytes stay in OS page cache (only touched pages fault in), so
/// there is no whole-blob copy. The decoded directory is taken from the
/// shared `LeafletCache` when one is configured, so it is parsed once per
/// leaf CID; without a cache it is decoded directly from the mapping.
/// See [`super::leaf_access::MmapLeafHandle`].
///
/// Sidecar bytes are fetched only when `need_replay` is set. Any I/O,
/// mapping, decode or sidecar-fetch error is returned to the caller.
fn open_mmapped_leaf(
    &self,
    path: &Path,
    leaf_id: u128,
    sidecar_cid: Option<&ContentId>,
    need_replay: bool,
) -> io::Result<Box<dyn super::leaf_access::LeafHandle>> {
    let file = std::fs::File::open(path)?;
    // SAFETY: leaf files are immutable, content-addressed CAS artifacts —
    // never modified after write, so the mapping's bytes are stable.
    let mapped = unsafe { memmap2::Mmap::map(&file) };
    let mmap = mapped?;

    // Header + directory decode straight from the mapped bytes; shared by the
    // cached and uncached paths below.
    let decode_dir = || {
        let header = crate::format::leaf::decode_leaf_header_v3(&mmap[..])?;
        crate::format::leaf::decode_leaf_dir_v3_with_base(&mmap[..], &header).map(Arc::new)
    };

    // Content-addressed directory: `leaf_id` == xxh3_128(leaf_cid) is the same
    // key `open_leaf_dir` and warm-on-write use, so a cached entry is reused
    // across opens.
    let dir = match &self.leaflet_cache {
        Some(cache) => cache.try_get_or_load_leaf_dir(leaf_id, decode_dir)?,
        None => decode_dir()?,
    };

    let sidecar = if need_replay {
        self.fetch_sidecar_bytes_sync(sidecar_cid)?
    } else {
        None
    };

    let handle = super::leaf_access::MmapLeafHandle::new(mmap, dir, sidecar, leaf_id);
    Ok(Box::new(handle))
}

/// Callers that may need `load_columns` must use [`Self::open_leaf_handle`].
pub fn open_leaf_dir(&self, leaf_cid: &ContentId) -> io::Result<Arc<DecodedLeafDirV3>> {
let key = xxhash_rust::xxh3::xxh3_128(leaf_cid.to_bytes().as_ref());
Expand Down
30 changes: 30 additions & 0 deletions fluree-db-binary-index/src/read/column_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,36 @@ impl ColumnBatch {
+ self.o_i.byte_size()
+ self.t.byte_size()
}

/// Project a (super)batch down to `requested`, mirroring what a fresh narrow
/// decode of `requested` would yield: requested columns are shared by cheap
/// `Arc` clone, unrequested columns become `AbsentDefault` so downstream
/// `filter_batch`/`gather_batch` never carry or copy them (e.g. an unwanted
/// `t`). Used by the leaflet cache's superset fallback so a warm-on-write
/// `ColumnSet::ALL` batch can serve narrower reads.
///
/// Sound only when `self` already covers `requested` (`cached ⊇ requested`);
/// the cache guarantees this by projecting only an `ALL` batch.
pub fn project_to(&self, requested: ColumnSet) -> ColumnBatch {
macro_rules! keep {
($col:expr, $field:ident) => {
if requested.contains($col) {
self.$field.clone()
} else {
ColumnData::AbsentDefault
}
};
}
ColumnBatch {
row_count: self.row_count,
s_id: keep!(ColumnId::SId, s_id),
o_key: keep!(ColumnId::OKey, o_key),
p_id: keep!(ColumnId::PId, p_id),
o_type: keep!(ColumnId::OType, o_type),
o_i: keep!(ColumnId::OI, o_i),
t: keep!(ColumnId::T, t),
}
}
}

// ============================================================================
Expand Down
93 changes: 93 additions & 0 deletions fluree-db-binary-index/src/read/leaf_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,99 @@ impl LeafHandle for FullBlobLeafHandle {
}
}

// ============================================================================
// MmapLeafHandle
// ============================================================================

/// Leaf handle backed by an mmap of the local leaf file, with the decoded
/// directory shared from the [`LeafletCache`](super::leaflet_cache::LeafletCache).
///
/// The local read fast path used to `std::fs::read` the entire leaf blob into a
/// fresh heap buffer on every open and re-decode the directory each time. For a
/// leaf that grows across incremental builds, that whole-blob copy + full
/// directory decode is re-paid per point read. This handle instead:
/// - maps the (immutable, content-addressed) leaf file, so the raw bytes stay in
///   OS page cache and only the pages actually touched (header, directory, and
///   the one scanned leaflet's columns) fault in — no whole-blob copy; and
/// - takes the decoded directory as an `Arc` from the shared cache, so the
///   directory is parsed once per leaf CID, not per open.
///
/// Column data itself is still materialized once, per leaflet, via the V3Batch
/// cache — this handle only supplies the bytes for a cold decode. Raw leaf bytes
/// are never copied into the cache budget.
pub struct MmapLeafHandle {
/// Mapping of the leaf file; the byte source for `load_columns`.
mmap: memmap2::Mmap,
/// Decoded leaf directory, shared by `Arc` rather than re-parsed per handle.
dir: Arc<DecodedLeafDirV3>,
/// Raw history sidecar bytes, if they were fetched; required by
/// `load_sidecar_segment` for leaflets that have history.
sidecar: Option<Vec<u8>>,
/// Identifier reported by `leaf_id()`; supplied by the caller on construction.
leaf_id: u128,
}

impl MmapLeafHandle {
    /// Assemble a handle from an already-mapped leaf file, its decoded
    /// directory, optional sidecar bytes and the leaf's identifier.
    ///
    /// No validation or decoding happens here; the parts are stored as given.
    pub fn new(
        mmap: memmap2::Mmap,
        dir: Arc<DecodedLeafDirV3>,
        sidecar: Option<Vec<u8>>,
        leaf_id: u128,
    ) -> Self {
        Self { mmap, dir, sidecar, leaf_id }
    }
}

impl LeafHandle for MmapLeafHandle {
    /// Decoded directory for this leaf.
    fn dir(&self) -> &DecodedLeafDirV3 {
        &*self.dir
    }

    /// Decode the requested columns of leaflet `leaflet_idx` directly from the
    /// mapped leaf bytes.
    ///
    /// Panics if `leaflet_idx` is out of range for the directory.
    fn load_columns(
        &self,
        leaflet_idx: usize,
        projection: &ColumnProjection,
        order: RunSortOrder,
    ) -> io::Result<ColumnBatch> {
        let dir = &*self.dir;
        let entry = &dir.entries[leaflet_idx];
        let leaf_bytes: &[u8] = &self.mmap;
        load_leaflet_columns(leaf_bytes, entry, dir.payload_base, projection, order)
    }

    /// Decode the history segment of leaflet `leaflet_idx` from the sidecar.
    ///
    /// A leaflet without history yields an empty vector without touching the
    /// sidecar; a leaflet with history but no sidecar bytes is a `NotFound`
    /// error. Panics if `leaflet_idx` is out of range for the directory.
    fn load_sidecar_segment(&self, leaflet_idx: usize) -> io::Result<Vec<HistEntryV2>> {
        let entry = &self.dir.entries[leaflet_idx];
        if entry.history_len == 0 {
            return Ok(Vec::new());
        }
        match self.sidecar.as_deref() {
            Some(sc_bytes) => {
                let segment = HistorySegmentRef {
                    offset: entry.history_offset,
                    len: entry.history_len,
                    min_t: entry.history_min_t,
                    max_t: entry.history_max_t,
                };
                decode_history_segment(sc_bytes, &segment)
            }
            None => Err(io::Error::new(
                io::ErrorKind::NotFound,
                "sidecar bytes required for history replay but not available",
            )),
        }
    }

    /// Raw sidecar bytes, when they were loaded for this handle.
    fn sidecar_bytes(&self) -> Option<&[u8]> {
        self.sidecar.as_deref()
    }

    /// Identifier this handle was constructed with.
    fn leaf_id(&self) -> u128 {
        self.leaf_id
    }
}

// ============================================================================
// RangeReadFetcher trait
// ============================================================================
Expand Down
Loading
Loading