diff --git a/.fluree-memory/repo.ttl b/.fluree-memory/repo.ttl index cf6a292ee..a9d0b0415 100644 --- a/.fluree-memory/repo.ttl +++ b/.fluree-memory/repo.ttl @@ -1620,6 +1620,22 @@ mem:fact-01ktyzzbs4ewxqpgvssvj72tt5 a mem:Fact ; mem:createdAt "2026-06-12T22:41:50.884413+00:00"^^xsd:dateTime ; mem:rationale "Continuation point for the decimal fix series; the remaining priority-4 cluster (canonical identity) is the prerequisite for issues #1324/#1325." . +mem:fact-01kwpj81fpnp0rk5s952wf3shd a mem:Fact ; + mem:content "FIXED (branch fix/filtered-delete-staging-hang): range_with_overlay point lookups paid a full overlay-translation tax per call (unbounded novelty walk + dict-translate + sort in binary_range_eq_v3) → per-flake lookup loops were O(calls × novelty log novelty); livelocked filtered-DELETE staging >900s. Two-layer fix: (1) staging list-meta hydration grouped per (g_id, s, p), FIRST-match dt-compatible meta per retraction — NOT distinct-per-duplicate: duplicate-value @list entries must lose exactly one entry per distinct WHERE binding, pinned by it_join_batched_overlay object-probe-list-retract; (2) cross-call LRU in binary_range.rs keyed (store_id, index_t, OverlayProvider::content_version, to_t, g, index) + overlay_window_for_range narrowing. content_version = globally-unique stamp refreshed on every Novelty mutation (per-instance epochs collide across divergent clones); wrapper overlays return None → uncached." ; + mem:tag "delete" ; + mem:tag "novelty" ; + mem:tag "overlay" ; + mem:tag "performance" ; + mem:tag "range-provider" ; + mem:tag "staging" ; + mem:tag "transact" ; + mem:scope mem:repo ; + mem:artifactRef "fluree-db-query/src/binary_range.rs" ; + mem:artifactRef "fluree-db-transact/src/stage.rs" ; + mem:branch "fix/filtered-delete-staging-hang" ; + mem:createdAt "2026-07-04T12:39:23.382379+00:00"^^xsd:dateTime ; + mem:rationale "range_with_overlay looks like a cheap point lookup at call sites but is O(novelty log novelty) per call; future per-flake lookup loops will reintroduce this livelock class." . + mem:decision-01kwphem9r3gvcvj2j5842ddt6 a mem:Decision ; mem:content "Fulltext positioning: f:fullTextDefaults (#config graph) is the RECOMMENDED path — values keep standard xsd:string/rdf:langString datatypes (external RDF consumers see ordinary literals) and language-tagged values get per-language analyzers. The @fulltext datatype REPLACES the stored datatype with Fluree-specific f:fullText, so docs/guidance must not push it as the default; it's a quick-start for siloed databases or properties orthogonal to the core data model. Docs repositioned 2026-07 (fulltext.md, indexing-and-search README, cookbook-search, concepts/datatypes)." ; mem:tag "datatypes" ; diff --git a/fluree-db-api/tests/it_transact_list_retract.rs b/fluree-db-api/tests/it_transact_list_retract.rs index 615c804fd..956b265aa 100644 --- a/fluree-db-api/tests/it_transact_list_retract.rs +++ b/fluree-db-api/tests/it_transact_list_retract.rs @@ -130,6 +130,260 @@ async fn wildcard_delete_retracts_all_distinct_list_entries() { ); } +/// Filtered two-pattern DELETE over subjects carrying `@list` properties, +/// on a novelty-heavy ledger (delete-everything then re-insert, no index +/// rebuild in between). +/// +/// Mirrors the field-reported staging livelock: `{?s tag } {?s ?p ?o} +/// DELETE {?s ?p ?o}` matching subjects with large `@list` vectors, where +/// all matched data lives in novelty. List-index hydration is grouped by +/// (graph, subject, predicate) — one range lookup per group — so this pins +/// that the grouped path still fills `m.i` on every retraction: every list +/// entry and scalar of the tagged subjects must be retracted, and untagged +/// subjects must be untouched. +#[tokio::test] +async fn filtered_delete_retracts_tagged_subjects_with_lists_in_novelty() { + let fluree = FlureeBuilder::memory().build_memory(); + let ledger0 = fluree + .create_ledger("tx/list-retract-filtered:main") + .await + .expect("create"); + + let list_ctx = json!({ + "ex": "http://example.org/", + "ex:vector": { "@container": "@list" } + }); + let make_docs = |tag: &str| { + let subjects: Vec = (0..4) + .map(|i| { + let vector: Vec = (0..32).map(|k| (i * 100 + k) as f64 * 0.5).collect(); + json!({ + "@id": format!("ex:chunk-{tag}-{i}"), + "ex:sourceDocument": { "@id": format!("ex:doc-{tag}") }, + "ex:label": format!("chunk {i} of {tag}"), + "ex:vector": vector + }) + }) + .collect(); + json!({ "@context": list_ctx, "@graph": subjects }) + }; + + // Build the novelty-heavy state: insert both docs' chunks, delete + // everything, then re-insert — all without an index rebuild, so every + // matched flake lives in the novelty overlay (assert + retract + assert). + let receipt = fluree + .insert(ledger0, &make_docs("a")) + .await + .expect("insert a"); + let receipt = fluree + .insert(receipt.ledger, &make_docs("b")) + .await + .expect("insert b"); + let receipt = fluree + .update( + receipt.ledger, + &json!({ + "where": { "@id": "?s", "?p": "?o" }, + "delete": { "@id": "?s", "?p": "?o" } + }), + ) + .await + .expect("delete everything"); + let receipt = fluree + .insert(receipt.ledger, &make_docs("a")) + .await + .expect("re-insert a"); + let receipt = fluree + .insert(receipt.ledger, &make_docs("b")) + .await + .expect("re-insert b"); + + let count_all = |ledger: fluree_db_api::LedgerState, tag: &'static str| { + let fluree = &fluree; + async move { + let sparql = format!( + "PREFIX ex: \ + SELECT (COUNT(*) AS ?c) WHERE {{ \ + ?s ex:sourceDocument ex:doc-{tag} . ?s ?p ?o }}" + ); + let result = support::query_sparql(fluree, &ledger, &sparql) + .await + .expect("sparql count"); + let jsonld = result + .to_jsonld_async(ledger.as_graph_db_ref(0)) + .await + .expect("to_jsonld_async"); + let arr = jsonld.as_array().expect("array result"); + arr.first() + .and_then(JsonValue::as_array) + .and_then(|row| row.first()) + .and_then(JsonValue::as_u64) + .unwrap_or(0) + } + }; + + // 4 subjects × (1 sourceDocument + 1 label + 32 list entries) per doc. + let per_doc_triples = 4 * (1 + 1 + 32); + assert_eq!( + count_all(receipt.ledger.clone(), "a").await, + per_doc_triples, + "precondition: doc-a chunks fully re-inserted into novelty" + ); + + // The reported shape: tag pattern + wildcard pattern, wildcard delete. + let out = fluree + .update( + receipt.ledger, + &json!({ + "@context": { "ex": "http://example.org/" }, + "where": [ + { "@id": "?s", "ex:sourceDocument": { "@id": "ex:doc-a" } }, + { "@id": "?s", "?p": "?o" } + ], + "delete": { "@id": "?s", "?p": "?o" } + }), + ) + .await + .expect("filtered delete"); + + assert_eq!( + count_all(out.ledger.clone(), "a").await, + 0, + "every triple of the tagged subjects must be retracted, including \ + all @list entries — survivors mean grouped hydration failed to \ + populate `m.i` on some retraction" + ); + assert_eq!( + count_all(out.ledger, "b").await, + per_doc_triples, + "untagged doc-b subjects must be untouched by the filtered delete" + ); +} + +/// Indexed variant of the filtered-delete case: a binary index is published +/// mid-history, so staging's list-meta hydration lookups route through the +/// V3 range provider (`binary_range_eq_v3`) and its cross-call overlay +/// translation cache, with the delete-everything + re-insert novelty stacked +/// on top of the persisted base. Pins that cached overlay translations are +/// (a) correct on repeated same-state lookups and (b) invalidated across the +/// intervening commits — a stale entry would surface pre-delete flakes or +/// miss re-inserted ones, breaking the counts below. +#[tokio::test] +async fn filtered_delete_with_lists_on_indexed_base_plus_novelty() { + let fluree = FlureeBuilder::memory().build_memory(); + let ledger_id = "tx/list-retract-indexed:main"; + let ledger0 = fluree.create_ledger(ledger_id).await.expect("create"); + + let list_ctx = json!({ + "ex": "http://example.org/", + "ex:vector": { "@container": "@list" } + }); + let make_docs = |tag: &str| { + let subjects: Vec = (0..4) + .map(|i| { + let vector: Vec = (0..32).map(|k| (i * 100 + k) as f64 * 0.5).collect(); + json!({ + "@id": format!("ex:chunk-{tag}-{i}"), + "ex:sourceDocument": { "@id": format!("ex:doc-{tag}") }, + "ex:label": format!("chunk {i} of {tag}"), + "ex:vector": vector + }) + }) + .collect(); + json!({ "@context": list_ctx, "@graph": subjects }) + }; + + // Base state: both docs inserted, then persisted into a binary index. + let receipt = fluree + .insert(ledger0, &make_docs("a")) + .await + .expect("insert a"); + fluree + .insert(receipt.ledger, &make_docs("b")) + .await + .expect("insert b"); + support::rebuild_and_publish_index(&fluree, ledger_id).await; + let indexed = fluree.ledger(ledger_id).await.expect("reload indexed"); + + // Novelty on top of the index: delete everything, re-insert both docs. + let receipt = fluree + .update( + indexed, + &json!({ + "where": { "@id": "?s", "?p": "?o" }, + "delete": { "@id": "?s", "?p": "?o" } + }), + ) + .await + .expect("delete everything"); + let receipt = fluree + .insert(receipt.ledger, &make_docs("a")) + .await + .expect("re-insert a"); + let receipt = fluree + .insert(receipt.ledger, &make_docs("b")) + .await + .expect("re-insert b"); + + let count_all = |ledger: fluree_db_api::LedgerState, tag: &'static str| { + let fluree = &fluree; + async move { + let sparql = format!( + "PREFIX ex: \ + SELECT (COUNT(*) AS ?c) WHERE {{ \ + ?s ex:sourceDocument ex:doc-{tag} . ?s ?p ?o }}" + ); + let result = support::query_sparql(fluree, &ledger, &sparql) + .await + .expect("sparql count"); + let jsonld = result + .to_jsonld_async(ledger.as_graph_db_ref(0)) + .await + .expect("to_jsonld_async"); + let arr = jsonld.as_array().expect("array result"); + arr.first() + .and_then(JsonValue::as_array) + .and_then(|row| row.first()) + .and_then(JsonValue::as_u64) + .unwrap_or(0) + } + }; + + let per_doc_triples = 4 * (1 + 1 + 32); + assert_eq!( + count_all(receipt.ledger.clone(), "a").await, + per_doc_triples, + "precondition: doc-a re-inserted into novelty over the indexed base" + ); + + let out = fluree + .update( + receipt.ledger, + &json!({ + "@context": { "ex": "http://example.org/" }, + "where": [ + { "@id": "?s", "ex:sourceDocument": { "@id": "ex:doc-a" } }, + { "@id": "?s", "?p": "?o" } + ], + "delete": { "@id": "?s", "?p": "?o" } + }), + ) + .await + .expect("filtered delete"); + + assert_eq!( + count_all(out.ledger.clone(), "a").await, + 0, + "every triple of the tagged subjects must be retracted through the \ + indexed range-provider path, including all @list entries" + ); + assert_eq!( + count_all(out.ledger, "b").await, + per_doc_triples, + "untagged doc-b subjects must be untouched" + ); +} + /// Companion to the three-entry case: retracting a single-entry `@list` /// where the asserted flake has `m.i = 0`. Pins the hydration behavior /// for the simplest case. diff --git a/fluree-db-core/src/overlay.rs b/fluree-db-core/src/overlay.rs index ff6389e2b..b3f4b6b9e 100644 --- a/fluree-db-core/src/overlay.rs +++ b/fluree-db-core/src/overlay.rs @@ -81,6 +81,20 @@ pub trait OverlayProvider: Send + Sync { false } + /// Globally-unique version stamp of this overlay's current content, for + /// keying caches of data derived from a full overlay walk (e.g. V3 + /// overlay-op translations shared across `range_with_overlay` calls). + /// + /// Unlike [`Self::epoch`] — which is only unique within one overlay + /// instance's lineage — implementations must guarantee that **no two + /// overlays whose `for_each_overlay_flake` output differs ever report + /// the same version**, across instances, clones, and overlay types. + /// Return `None` (the default) when no such guarantee exists; callers + /// must then skip caching and derive from a fresh walk. + fn content_version(&self) -> Option { + None + } + /// Push overlay flakes for a leaf's range to the callback /// /// # Arguments diff --git a/fluree-db-core/src/range.rs b/fluree-db-core/src/range.rs index dc776df73..f7f463e6d 100644 --- a/fluree-db-core/src/range.rs +++ b/fluree-db-core/src/range.rs @@ -203,6 +203,15 @@ impl OverlayProvider for SizedOverlayRef<'_, O> { fn epoch(&self) -> u64 { self.0.epoch() } + + fn is_effectively_empty(&self) -> bool { + self.0.is_effectively_empty() + } + + fn content_version(&self) -> Option { + self.0.content_version() + } + fn for_each_overlay_flake( &self, g_id: GraphId, diff --git a/fluree-db-novelty/src/lib.rs b/fluree-db-novelty/src/lib.rs index d5174db7d..deb508821 100644 --- a/fluree-db-novelty/src/lib.rs +++ b/fluree-db-novelty/src/lib.rs @@ -477,6 +477,14 @@ pub struct Novelty { /// Epoch for cache invalidation - bumped once per commit pub epoch: u64, + /// Globally-unique content-version stamp (see + /// [`OverlayProvider::content_version`]): refreshed from a process-wide + /// counter on every content mutation, so no two novelty states with + /// different flake content ever share a version — across instances, + /// clones, and ledgers. Clones share the version until one mutates, + /// which is exactly right: identical content, identical key. `0` means + /// "empty since construction" (all empty overlays are equivalent). + content_version: u64, /// Epoch for the RDFS schema-hierarchy cache — bumped only when a commit /// asserts or retracts `rdfs:subClassOf` / `rdfs:subPropertyOf`, so the /// shared hierarchy cache stays current without any work on the (vastly @@ -501,6 +509,10 @@ pub struct Novelty { fact_state: NoveltyFactState, } +/// Process-wide counter backing [`Novelty::content_version`]. Starts at 1 so +/// `0` uniquely means "empty since construction". +static NEXT_CONTENT_VERSION: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1); + impl Novelty { /// Create a new empty novelty overlay pub fn new(t: i64) -> Self { @@ -510,6 +522,7 @@ impl Novelty { flake_count: 0, t, epoch: 0, + content_version: 0, schema_epoch: 0, shacl_epoch: 0, attachments: AttachmentNovelty::new(), @@ -530,6 +543,15 @@ impl Novelty { } } + /// Stamp a fresh globally-unique content version. Pair with every + /// `epoch += 1`: `epoch` drives lineage-local cache invalidation, while + /// the content version keys cross-instance caches (see + /// [`OverlayProvider::content_version`]). + fn refresh_content_version(&mut self) { + self.content_version = + NEXT_CONTENT_VERSION.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + /// Append a freshly built segment to a graph, growing the graphs vec. fn push_segment(&mut self, g_id: GraphId, seg: Segment) { let idx = g_id as usize; @@ -659,6 +681,7 @@ impl Novelty { if merges > 0 { self.recompute_totals(); self.epoch += 1; + self.refresh_content_version(); } merges } @@ -815,6 +838,7 @@ impl Novelty { // From here on every step is infallible. self.t = self.t.max(commit_t); self.epoch += 1; // Bump epoch once per commit + self.refresh_content_version(); // RDF set semantics: skip assertion flakes whose fact (s, p, o, dt, m) is // already **currently asserted** in this graph's novelty window. This @@ -951,6 +975,7 @@ impl Novelty { if per_graph.is_empty() { self.t = max_t; self.epoch += 1; + self.refresh_content_version(); return Ok(()); } @@ -1052,6 +1077,7 @@ impl Novelty { self.t = max_t; self.epoch += 1; + self.refresh_content_version(); self.recompute_totals(); tracing::debug!( @@ -1126,6 +1152,7 @@ impl Novelty { self.fact_state = fs; self.epoch += 1; + self.refresh_content_version(); } /// Comparator-ordered k-way merge over one graph's segments for `index` and @@ -1381,6 +1408,10 @@ impl OverlayProvider for Novelty { self.is_empty() } + fn content_version(&self) -> Option { + Some(self.content_version) + } + fn for_each_overlay_flake( &self, g_id: GraphId, @@ -1648,6 +1679,44 @@ mod tests { assert!(novelty.size > 0); } + #[test] + fn content_version_is_globally_unique_across_divergent_clones() { + let mut a = Novelty::new(0); + assert_eq!( + OverlayProvider::content_version(&a), + Some(0), + "empty-since-construction novelty reports version 0" + ); + + let rg = no_graphs(); + a.apply_commit(vec![make_flake(1, 1, 100, 1, true)], 1, &rg) + .unwrap(); + let v_a1 = OverlayProvider::content_version(&a).unwrap(); + assert_ne!(v_a1, 0, "mutation stamps a fresh version"); + + // A clone shares the version while content is identical. + let mut b = a.clone(); + assert_eq!(OverlayProvider::content_version(&b), Some(v_a1)); + + // Divergent mutations from the same base must never share a version + // (per-instance `epoch` DOES collide here: both go 1 → 2). + a.apply_commit(vec![make_flake(2, 1, 200, 2, true)], 2, &rg) + .unwrap(); + b.apply_commit(vec![make_flake(3, 1, 300, 2, true)], 2, &rg) + .unwrap(); + assert_eq!(a.epoch, b.epoch, "epochs collide across divergent clones"); + let v_a2 = OverlayProvider::content_version(&a).unwrap(); + let v_b2 = OverlayProvider::content_version(&b).unwrap(); + assert_ne!(v_a2, v_b2, "content versions must not collide"); + assert_ne!(v_a2, v_a1); + assert_ne!(v_b2, v_a1); + + // clear_up_to changes content, so it must also refresh the version. + let before_clear = OverlayProvider::content_version(&a).unwrap(); + a.clear_up_to(1); + assert_ne!(OverlayProvider::content_version(&a).unwrap(), before_clear); + } + #[test] fn test_apply_commit_multiple() { let mut novelty = Novelty::new(0); diff --git a/fluree-db-query/src/binary_range.rs b/fluree-db-query/src/binary_range.rs index bfe2d1400..003080583 100644 --- a/fluree-db-query/src/binary_range.rs +++ b/fluree-db-query/src/binary_range.rs @@ -136,6 +136,123 @@ fn translate_overlay_ops_v3_with_raw( } } +/// Identity of a cached range-provider overlay translation. +/// +/// Every component that can change the translated product is included: +/// `store_id` is process-unique per `BinaryIndexStore` instance (covering +/// ledger identity and same-`index_t` store rebuilds that re-rank dict ids), +/// `index_t` covers in-place incremental index advances, `content_version` +/// is the overlay's globally-unique content stamp (see +/// [`OverlayProvider::content_version`] — overlays that cannot vouch for one +/// are never cached), and `to_t` bounds which overlay flakes the walk emits. +#[derive(Clone, PartialEq, Eq, Hash)] +struct RangeTranslationKey { + store_id: u64, + index_t: i64, + content_version: u64, + to_t: i64, + g_id: GraphId, + index: IndexType, +} + +/// Cached product of an **unfiltered** `translate_overlay_ops_v3_with_raw` +/// call: ops sorted by the key's index order with lifecycles resolved, plus +/// the raw untranslatable flakes (retracts intact — the raw-merge fallback +/// in `binary_range_eq_v3` needs them to cancel base facts) and the +/// ephemeral predicate mapping for decode. +struct CachedRangeTranslation { + ops: Arc<[fluree_db_binary_index::OverlayOp]>, + raw: Arc<[Flake]>, + ephemeral_p_id_to_sid: Arc>, +} + +/// Cross-call LRU of range-provider overlay translations. +/// +/// `range_with_overlay` looks like a cheap point lookup at call sites, but +/// each call re-walked the graph's entire novelty, re-translated every flake +/// (dict probes), and re-sorted the op set — so per-flake lookup loops +/// (staging list-meta hydration, policy class lookups, upsert deletions, +/// annotation cascades) cost O(calls × novelty log novelty) on +/// novelty-heavy ledgers. Entries are large (~50 B/op), so capacity stays +/// small: one or two hot database states × a few index orders. +type RangeTranslationLru = lru::LruCache>; + +fn range_translation_cache() -> &'static std::sync::Mutex { + use once_cell::sync::Lazy; + static CACHE: Lazy> = Lazy::new(|| { + std::sync::Mutex::new(lru::LruCache::new( + std::num::NonZeroUsize::new(8).expect("capacity must be > 0"), + )) + }); + &CACHE +} + +/// Translate the full (unfiltered) overlay for `(g_id, index, to_t)`, served +/// from [`range_translation_cache`] when the overlay reports a +/// [`content_version`](OverlayProvider::content_version). Returns `None` +/// when the overlay opts out of caching — callers fall back to a fresh +/// per-call translation, preserving pre-cache behavior. +#[allow(clippy::too_many_arguments)] +fn cached_overlay_translation( + overlay: &dyn OverlayProvider, + g_id: GraphId, + index: IndexType, + effective_to_t: i64, + store: &Arc, + dict_novelty: &Arc, + runtime_small_dicts: &Arc, + warn_ctx: &'static str, +) -> Option> { + let content_version = overlay.content_version()?; + let key = RangeTranslationKey { + store_id: store.store_id(), + index_t: store.max_t(), + content_version, + to_t: effective_to_t, + g_id, + index, + }; + + if let Some(hit) = range_translation_cache() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .get(&key) + { + return Some(Arc::clone(hit)); + } + + let OverlayTranslateV3Result { + mut ops, + raw, + ephemeral_p_id_to_sid, + failed: _, + } = translate_overlay_ops_v3_with_raw( + overlay, + g_id, + index, + effective_to_t, + store, + dict_novelty, + runtime_small_dicts, + |_| true, + warn_ctx, + ); + let order = index_type_to_sort_order(index); + fluree_db_binary_index::read::types::sort_overlay_ops(&mut ops, order); + fluree_db_binary_index::read::types::resolve_overlay_ops(&mut ops); + + let entry = Arc::new(CachedRangeTranslation { + ops: ops.into(), + raw: raw.into(), + ephemeral_p_id_to_sid: Arc::new(ephemeral_p_id_to_sid), + }); + range_translation_cache() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .put(key, Arc::clone(&entry)); + Some(entry) +} + /// Try persisted lookup first, then DictNovelty. Returns `None` if neither resolves. fn resolve_or_novelty( persisted: Option, @@ -422,6 +539,7 @@ fn binary_range_eq_v3( || filter.o_type.is_some() || filter.o_key.is_some(); + let mut range_keys: Option<(RunRecordV2, RunRecordV2)> = None; let mut cursor = if use_range { let min_key = RunRecordV2 { s_id: SubjectId(filter.s_id.unwrap_or(0)), @@ -441,7 +559,7 @@ fn binary_range_eq_v3( o_type: filter.o_type.unwrap_or(u16::MAX), g_id, }; - BinaryCursor::new( + let cursor = BinaryCursor::new( Arc::clone(store), order, branch, @@ -449,7 +567,9 @@ fn binary_range_eq_v3( &max_key, filter, projection, - ) + ); + range_keys = Some((min_key, max_key)); + cursor } else { BinaryCursor::scan_all(Arc::clone(store), order, branch, filter, projection) }; @@ -462,36 +582,86 @@ fn binary_range_eq_v3( let effective_to_t = opts.to_t.unwrap_or_else(|| store.max_t()); cursor.set_to_t(effective_to_t); - // Overlay translation. When the caller supplied a projection-predicate - // allow-list, filter overlay flakes by `flake.p` before translation — - // this both skips work on discarded overlay rows and ensures the - // untranslated raw-fallback set doesn't smuggle non-selected predicates - // back in. Sid match (vs persisted p_id) so novel predicates still pass. - let predicate_filter_sids = opts.predicate_filter.clone(); - let OverlayTranslateV3Result { - mut ops, - raw: untranslated, - ephemeral_p_id_to_sid, - failed: _overlay_failed_translation, - } = translate_overlay_ops_v3_with_raw( - overlay, - g_id, - index, - effective_to_t, - store, - dict_novelty, - runtime_small_dicts, - move |flake| match &predicate_filter_sids { - Some(allow) => allow.iter().any(|p| p == &flake.p), - None => true, - }, - "V3 range", - ); + // Overlay translation. Unfiltered translations are served from the + // cross-call LRU when the overlay reports a content version (raw + // `Novelty` does) — see `range_translation_cache` for why fresh per-call + // translation makes point-lookup loops quadratic in novelty size. + // + // When the caller supplied a projection-predicate allow-list, translate + // fresh with the `flake.p` filter as before — the allow-list changes + // both the translated set and the raw-fallback set (it must not smuggle + // non-selected predicates back in), so that product is not cacheable + // under the unfiltered key. Sid match (vs persisted p_id) so novel + // predicates still pass. + let cached = if opts.predicate_filter.is_none() { + cached_overlay_translation( + overlay, + g_id, + index, + effective_to_t, + store, + dict_novelty, + runtime_small_dicts, + "V3 range", + ) + } else { + None + }; + let (overlay_ops, untranslated, ephemeral_p_id_to_sid) = match cached { + Some(entry) => ( + Arc::clone(&entry.ops), + entry.raw.to_vec(), + Arc::clone(&entry.ephemeral_p_id_to_sid), + ), + None => { + let predicate_filter_sids = opts.predicate_filter.clone(); + let OverlayTranslateV3Result { + mut ops, + raw, + ephemeral_p_id_to_sid, + failed: _overlay_failed_translation, + } = translate_overlay_ops_v3_with_raw( + overlay, + g_id, + index, + effective_to_t, + store, + dict_novelty, + runtime_small_dicts, + move |flake| match &predicate_filter_sids { + Some(allow) => allow.iter().any(|p| p == &flake.p), + None => true, + }, + "V3 range", + ); + fluree_db_binary_index::read::types::sort_overlay_ops(&mut ops, order); + fluree_db_binary_index::read::types::resolve_overlay_ops(&mut ops); + ( + Arc::<[fluree_db_binary_index::OverlayOp]>::from(ops), + raw, + Arc::new(ephemeral_p_id_to_sid), + ) + } + }; - if !ops.is_empty() { - fluree_db_binary_index::read::types::sort_overlay_ops(&mut ops, order); - fluree_db_binary_index::read::types::resolve_overlay_ops(&mut ops); - cursor.set_overlay_ops(ops.into()); + if !overlay_ops.is_empty() { + // Range-bounded cursors get only the ops window intersecting + // [min_key, max_key]: out-of-range ops can never match the filter, + // and carrying them costs an O(overlay) merge walk per call while + // defeating leaflet pre-skips (same pattern as + // `BinaryScanOperator::open`). + let (start, end) = match &range_keys { + Some((min_key, max_key)) => fluree_db_binary_index::overlay_window_for_range( + &overlay_ops, + min_key, + max_key, + order, + ), + None => (0, overlay_ops.len()), + }; + if start < end { + cursor.set_overlay_ops_window(overlay_ops, start, end); + } } // Extend the row-loop allow-set with ephemeral p_ids whose mapped Sid is @@ -504,7 +674,7 @@ fn binary_range_eq_v3( opts.predicate_filter.as_deref(), predicate_filter_p_ids.as_mut(), ) { - for (eph_p_id, sid) in &ephemeral_p_id_to_sid { + for (eph_p_id, sid) in ephemeral_p_id_to_sid.iter() { if allow_sids.iter().any(|s| s == sid) { allow_ids.push(*eph_p_id); } diff --git a/fluree-db-transact/src/stage.rs b/fluree-db-transact/src/stage.rs index ad0b3ad64..7f98312d6 100644 --- a/fluree-db-transact/src/stage.rs +++ b/fluree-db-transact/src/stage.rs @@ -1102,24 +1102,34 @@ async fn hydrate_list_index_meta_for_retractions( retractions: &mut [Flake], reverse_graph: &HashMap, ) -> Result<()> { - for flake in retractions.iter_mut() { + use std::collections::BTreeMap; + + // Group candidates by (graph, subject, predicate): one range lookup per + // group, not one per retraction. Every `range_with_overlay` call pays a + // full overlay translation of the graph's novelty (walk + translate + + // sort), so per-flake lookups make filtered-DELETE staging + // O(matched_triples × novelty log novelty) — observed as a >900s livelock + // for ~21k matched triples on a novelty-heavy ledger. Grouped, the cost + // scales with distinct (subject, predicate) pairs instead. + let mut groups: HashMap<(GraphId, Sid, Sid), Vec> = HashMap::new(); + for (idx, flake) in retractions.iter().enumerate() { // Only retractions with no metadata are candidates. - if flake.op { - continue; - } - if flake.m.is_some() { + if flake.op || flake.m.is_some() { continue; } - - // Resolve the correct graph for this retraction flake. let g_id = resolve_flake_graph_id(flake, reverse_graph)?; + groups + .entry((g_id, flake.s.clone(), flake.p.clone())) + .or_default() + .push(idx); + } - // Find currently asserted matching flakes (db + novelty overlay) and copy list index meta if present. + for ((g_id, s, p), members) in groups { + // Find currently asserted flakes for this (subject, predicate) + // (db + novelty overlay) and copy list index meta where present. let rm = fluree_db_core::RangeMatch::new() - .with_subject(flake.s.clone()) - .with_predicate(flake.p.clone()) - .with_object(flake.o.clone()) - .with_datatype(flake.dt.clone()); + .with_subject(s) + .with_predicate(p); let found = fluree_db_core::range_with_overlay( &ledger.snapshot, @@ -1132,11 +1142,36 @@ async fn hydrate_list_index_meta_for_retractions( ) .await?; - if let Some(existing) = found - .into_iter() - .find(|f| f.op && f.m.as_ref().and_then(|m| m.i).is_some()) - { - flake.m = existing.m; + // Index asserted list-carrying metas per object value, in index + // order. Every matching retraction copies the FIRST dt-compatible + // meta — mirroring the per-flake lookup's `.find()` this replaces: + // identical duplicates then collapse in the accumulator, so a value + // asserted at N list positions loses exactly one entry per distinct + // WHERE binding (pinned by the `object-probe-list-retract` case in + // `it_join_batched_overlay.rs`). + let mut metas: BTreeMap> = + BTreeMap::new(); + for f in found { + if f.op { + if let Some(m) = f.m.filter(|m| m.i.is_some()) { + metas.entry(f.o).or_default().push((f.dt, m)); + } + } + } + if metas.is_empty() { + continue; + } + + for idx in members { + let flake = &mut retractions[idx]; + if let Some(candidates) = metas.get(&flake.o) { + if let Some((_, m)) = candidates + .iter() + .find(|(dt, _)| fluree_db_core::dt_compatible(&flake.dt, dt)) + { + flake.m = Some(m.clone()); + } + } } }