From 39c3b7dbe46b976c0af6316f76390a0bf88e0744 Mon Sep 17 00:00:00 2001 From: Benjamin Lamothe Date: Thu, 2 Jul 2026 19:32:29 -0400 Subject: [PATCH 1/6] spawning the cache listener should not depend on indexing mode --- fluree-db-api/src/lib.rs | 101 ++++++++++----- fluree-db-api/src/query/builder.rs | 12 +- .../src/query/nameservice_builder.rs | 12 +- fluree-db-server/src/lib.rs | 35 ++++- fluree-db-server/src/state.rs | 32 ++++- fluree-db-server/tests/raft_multi_node.rs | 120 ++++++++++++++++++ 6 files changed, 258 insertions(+), 54 deletions(-) diff --git a/fluree-db-api/src/lib.rs b/fluree-db-api/src/lib.rs index 6510b4a506..d08cdcbed9 100644 --- a/fluree-db-api/src/lib.rs +++ b/fluree-db-api/src/lib.rs @@ -1141,6 +1141,15 @@ pub struct FlureeBuilder { novelty_thresholds: Option, /// Remote Fluree connection registry for SERVICE federation. remote_connections: remote_service::RemoteConnectionRegistry, + /// Externally-supplied event bus. When set, `Fluree` uses this + /// instance instead of allocating its own. Used by the raft + /// integration path: `RaftIntegration::bootstrap` constructs the + /// bus (so the state-machine adapter can emit into it during + /// apply), and the server assembly passes the same `Arc` here + /// so the events endpoint (which subscribes on + /// `Fluree::event_bus()`) sees runtime-fired raft events without + /// a second listener bridge. + event_bus_override: Option>, } /// Configuration for background indexing in `FlureeBuilder`. @@ -1221,12 +1230,10 @@ struct RuntimeParts { /// `ledger_manager` on every commit / index publish; drops loaded /// ledgers on retract. /// -/// `FlureeBuilder::build` calls this with Fluree's internal bus when -/// indexing is enabled. Deployments that publish commit events on a -/// separate bus (e.g. raft's `LedgerEventBus`) should call this again -/// with that bus so cache reconciliation also fires on those events — -/// otherwise follower nodes only refresh on initial load, and writes -/// that land between the initial load and the next reload are invisible. +/// `FlureeBuilder::build` calls this during `finalize_with_backend` +/// whenever a `LedgerManager` exists. External embedders constructing +/// `Fluree` via `Fluree::new` bypass the builder and may call this +/// directly if they want the same cache reconciliation. pub fn spawn_local_cache_event_listener( event_bus: Arc, ledger_manager: Arc, @@ -1357,6 +1364,7 @@ impl FlureeBuilder { indexing_config: Some(default_indexing_builder_config()), novelty_thresholds: None, remote_connections: remote_service::RemoteConnectionRegistry::new(), + event_bus_override: None, } } @@ -1371,6 +1379,7 @@ impl FlureeBuilder { indexing_config: None, novelty_thresholds: None, remote_connections: remote_service::RemoteConnectionRegistry::new(), + event_bus_override: None, } } @@ -1437,6 +1446,7 @@ impl FlureeBuilder { indexing_config: Some(default_indexing_builder_config()), novelty_thresholds: None, remote_connections: remote_service::RemoteConnectionRegistry::new(), + event_bus_override: None, } } @@ -1629,6 +1639,7 @@ impl FlureeBuilder { indexing_config, novelty_thresholds: None, remote_connections: remote_service::RemoteConnectionRegistry::new(), + event_bus_override: None, }) } @@ -1765,6 +1776,32 @@ impl FlureeBuilder { self } + /// Use a caller-supplied `LedgerEventBus` instead of allocating + /// a new one on `build*`. + /// + /// The events endpoint (and any other consumer that subscribes + /// via `Fluree::event_bus()`) sees notifications from every + /// publisher wired to this bus. In the raft server assembly this + /// is used to unify the state-machine adapter's event stream + /// with Fluree's own: `RaftIntegration::bootstrap` creates the + /// bus (so the adapter can emit into it during apply), and the + /// server assembly threads the same `Arc` here so runtime events + /// reach the events endpoint without a second bridge task. + pub fn with_event_bus(mut self, bus: Arc) -> Self { + self.event_bus_override = Some(bus); + self + } + + /// Returns the caller-supplied event bus if one was set via + /// [`with_event_bus`](Self::with_event_bus), otherwise allocates + /// a fresh bus with the historical default capacity. Called from + /// every `build_*` path so the override behaviour is uniform. + fn resolve_event_bus(&self) -> Arc { + self.event_bus_override + .clone() + .unwrap_or_else(|| Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024))) + } + /// Register a remote Fluree connection for SERVICE federation. /// /// The `name` is used in SPARQL queries as `SERVICE { ... }`. @@ -1797,7 +1834,7 @@ impl FlureeBuilder { let storage = FileStorage::new(&path); let nameservice = FileNameService::new(&path); - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let notifying = fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone()); let backend = StorageBackend::Managed(Arc::new(storage)); @@ -1833,7 +1870,7 @@ impl FlureeBuilder { storage: impl Storage + 'static, nameservice: NameServiceMode, ) -> Fluree { - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let index_config = self.derive_indexing(); Self::finalize_with_backend( self.ledger_cache_config, @@ -1932,7 +1969,7 @@ impl FlureeBuilder { let key_provider = StaticKeyProvider::new(encryption_key); let storage = EncryptedStorage::new(file_storage, key_provider); let nameservice = FileNameService::new(&path); - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let notifying = fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone()); let index_config = self.derive_indexing(); @@ -1972,7 +2009,7 @@ impl FlureeBuilder { pub fn build_memory(self) -> Fluree { let storage = MemoryStorage::new(); let nameservice = MemoryNameService::new(); - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let notifying = fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone()); let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying)); @@ -2005,7 +2042,7 @@ impl FlureeBuilder { let key_provider = StaticKeyProvider::new(encryption_key); let storage = EncryptedStorage::new(mem_storage, key_provider); let nameservice = MemoryNameService::new(); - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let notifying = fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone()); let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying)); @@ -2055,7 +2092,7 @@ impl FlureeBuilder { }); let backend = StorageBackend::Permanent(Arc::new(ipfs_store)); let nameservice = MemoryNameService::new(); - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let notifying = fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone()); let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying.clone())); @@ -2131,7 +2168,7 @@ impl FlureeBuilder { // Empty prefix: S3Storage already applies its own key prefix. let nameservice = StorageNameService::new(storage.clone(), ""); - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let notifying = fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone()); let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying.clone())); @@ -2232,7 +2269,7 @@ impl FlureeBuilder { .await .map_err(|e| ApiError::config(format!("Failed to ensure DynamoDB table: {e}")))?; - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let notifying = fluree_db_nameservice::NotifyingNameService::new(dynamo_ns, event_bus.clone()); let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying.clone())); @@ -2317,7 +2354,7 @@ impl FlureeBuilder { // Empty prefix: S3Storage already applies its own key prefix. let nameservice = StorageNameService::new(storage.clone(), ""); - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let notifying = fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone()); let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying.clone())); @@ -2492,10 +2529,8 @@ impl FlureeBuilder { let _ = attachment_provider_cell.set(Arc::clone(mgr)); } - if indexing_mode.is_enabled() { - if let Some(manager) = &ledger_manager { - spawn_local_cache_event_listener(Arc::clone(&event_bus), Arc::clone(manager)); - } + if let Some(manager) = &ledger_manager { + spawn_local_cache_event_listener(Arc::clone(&event_bus), Arc::clone(manager)); } Fluree { @@ -2586,7 +2621,7 @@ impl FlureeBuilder { // Wrap with address identifier routing if configured let storage = self.wrap_address_identifiers(base_storage)?; let backend = StorageBackend::Managed(storage); - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let index_config = self.derive_indexing(); let attachment_provider_cell = Self::new_attachment_provider_cell(); @@ -2653,7 +2688,7 @@ impl FlureeBuilder { // Wrap with address identifier routing if configured let storage = self.wrap_address_identifiers(base_storage)?; let backend = StorageBackend::Managed(storage); - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let index_config = self.derive_indexing(); let attachment_provider_cell = Self::new_attachment_provider_cell(); @@ -2719,7 +2754,7 @@ impl FlureeBuilder { .wrap_address_identifiers_aws(base_storage, aws_handle.config()) .await?; let backend = StorageBackend::Managed(storage); - let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)); + let event_bus = self.resolve_event_bus(); let index_config = self.derive_indexing(); let attachment_provider_cell = Self::new_attachment_provider_cell(); @@ -4403,18 +4438,16 @@ pub fn fluree_memory() -> Fluree { mod tests { use super::*; - #[test] - fn test_fluree_builder_memory() { + #[tokio::test] + async fn test_fluree_builder_memory() { let fluree = FlureeBuilder::memory().cache_max_mb(500).build_memory(); assert_eq!(fluree.config.cache.max_mb, 500); } - #[test] + #[tokio::test] #[cfg(feature = "native")] - fn test_fluree_builder_file() { - // `without_indexing()` keeps this a plain `#[test]` — the default - // background indexer would require a tokio runtime. + async fn test_fluree_builder_file() { let result = FlureeBuilder::file("/tmp/test") .without_indexing() .parallelism(8) @@ -4433,8 +4466,8 @@ mod tests { assert!(result.is_err()); } - #[test] - fn test_fluree_memory_convenience() { + #[tokio::test] + async fn test_fluree_memory_convenience() { let _fluree = fluree_memory(); } @@ -4442,8 +4475,8 @@ mod tests { // IndexConfig propagation tests (commit e6d0044) // ======================================================================== - #[test] - fn test_default_index_config_returns_defaults_without_thresholds() { + #[tokio::test] + async fn test_default_index_config_returns_defaults_without_thresholds() { let fluree = FlureeBuilder::memory().build_memory(); let cfg = fluree.default_index_config(); let expected = server_defaults::default_index_config(); @@ -4451,8 +4484,8 @@ mod tests { assert_eq!(cfg.reindex_max_bytes, expected.reindex_max_bytes); } - #[test] - fn test_with_indexing_thresholds_propagates_to_default_index_config() { + #[tokio::test] + async fn test_with_indexing_thresholds_propagates_to_default_index_config() { // This is the exact scenario that was broken before e6d0044: // custom thresholds set via the builder were silently dropped. let fluree = FlureeBuilder::memory() diff --git a/fluree-db-api/src/query/builder.rs b/fluree-db-api/src/query/builder.rs index b00ae6fde1..eea5986059 100644 --- a/fluree-db-api/src/query/builder.rs +++ b/fluree-db-api/src/query/builder.rs @@ -1541,8 +1541,8 @@ mod tests { // Builder construction tests // ======================================================================== - #[test] - fn test_view_query_builder_validate_missing_input() { + #[tokio::test] + async fn test_view_query_builder_validate_missing_input() { let fluree = FlureeBuilder::memory().build_memory(); // We can't create a view without a ledger, so test validate on FromQueryBuilder instead let builder = FromQueryBuilder::new(&fluree); @@ -1556,8 +1556,8 @@ mod tests { )); } - #[test] - fn test_from_query_builder_validate_with_input() { + #[tokio::test] + async fn test_from_query_builder_validate_with_input() { let fluree = FlureeBuilder::memory().build_memory(); let query = json!({ "from": "test:main", @@ -1569,8 +1569,8 @@ mod tests { assert!(result.is_ok()); } - #[test] - fn test_from_query_builder_validate_conflict() { + #[tokio::test] + async fn test_from_query_builder_validate_conflict() { let fluree = FlureeBuilder::memory().build_memory(); let query = json!({"from": "test:main", "select": ["?s"]}); let builder = fluree diff --git a/fluree-db-api/src/query/nameservice_builder.rs b/fluree-db-api/src/query/nameservice_builder.rs index fe4d01c589..6f31e14267 100644 --- a/fluree-db-api/src/query/nameservice_builder.rs +++ b/fluree-db-api/src/query/nameservice_builder.rs @@ -358,8 +358,8 @@ mod tests { fluree } - #[test] - fn test_builder_validate_missing_input() { + #[tokio::test] + async fn test_builder_validate_missing_input() { let fluree = FlureeBuilder::memory().build_memory(); let builder = NameserviceQueryBuilder::new(&fluree); let result = builder.validate(); @@ -372,8 +372,8 @@ mod tests { )); } - #[test] - fn test_builder_validate_with_jsonld() { + #[tokio::test] + async fn test_builder_validate_with_jsonld() { let fluree = FlureeBuilder::memory().build_memory(); let query = json!({ "select": ["?ledger"], @@ -384,8 +384,8 @@ mod tests { assert!(result.is_ok()); } - #[test] - fn test_builder_validate_with_sparql() { + #[tokio::test] + async fn test_builder_validate_with_sparql() { let fluree = FlureeBuilder::memory().build_memory(); let builder = fluree .nameservice_query() diff --git a/fluree-db-server/src/lib.rs b/fluree-db-server/src/lib.rs index 82eb1eeb99..440d96ff13 100644 --- a/fluree-db-server/src/lib.rs +++ b/fluree-db-server/src/lib.rs @@ -535,6 +535,22 @@ impl FlureeServerBuilder { // deployment mode. Raft mode wires `RaftNameService` so // every node's reads observe replicated state; default mode // uses whatever the storage backend implies. + // + // Raft-mode also threads the integration's + // `LedgerEventBus` into Fluree. Without this, the + // state-machine adapter emits `NameServiceEvent`s on the + // integration's private bus while the events endpoint and + // Fluree's own cache reconciler subscribe on + // `Fluree::event_bus()` — a different bus instance. + // Runtime raft commits then never surface to SSE + // subscribers (peers, external tools), because nothing + // bridges the two. Passing the same `Arc` here makes both + // sides observe the same broadcast channel. + #[cfg(feature = "raft")] + let raft_event_bus = self + .raft + .as_ref() + .map(|(integration, _)| std::sync::Arc::clone(&integration.event_bus)); #[cfg(feature = "raft")] let (fluree, cache_stats_handle) = if let Some(raft_ns) = raft_nameservice.as_ref() { // RaftNameService satisfies the full @@ -544,12 +560,13 @@ impl FlureeServerBuilder { let publisher: std::sync::Arc = raft_ns.clone(); let ns_mode = fluree_db_api::NameServiceMode::ReadWrite(publisher); - state::build_fluree_with_nameservice(&self.config, ns_mode).await? + state::build_fluree_with_nameservice(&self.config, ns_mode, raft_event_bus.clone()) + .await? } else { - state::build_default_fluree(&self.config).await? + state::build_default_fluree(&self.config, raft_event_bus.clone()).await? }; #[cfg(not(feature = "raft"))] - let (fluree, cache_stats_handle) = state::build_default_fluree(&self.config).await?; + let (fluree, cache_stats_handle) = state::build_default_fluree(&self.config, None).await?; #[allow(unused_mut)] let mut state_inner = @@ -627,9 +644,19 @@ impl FlureeServerBuilder { None => None, }; - // Subscribe Fluree's `LedgerManager` to the raft integration's + // Subscribe Fluree's `LedgerManager` to the (now-unified) // event bus so commit / index applies reconcile cached state // on every node, not just the one that staged the commit. + // + // This is not a duplicate of Fluree's own internal listener + // (spawned inside `finalize_with_backend` when indexing is + // enabled): `build_client_with_nameservice` — the raft path — + // passes a caller-supplied nameservice, which flips + // `indexing_mode` to `Disabled`, which skips the internal + // listener. So this is the *only* subscription forwarding + // ledger commits to the manager in raft mode. Post-fix, it + // fires on the same bus the events endpoint reads, so both + // consumers see the same event stream. #[cfg(feature = "raft")] if let Some(((integration, _), mgr)) = self.raft.as_ref().zip(state_inner.fluree.ledger_manager()) diff --git a/fluree-db-server/src/state.rs b/fluree-db-server/src/state.rs index 8a832ed208..45b5958c43 100644 --- a/fluree-db-server/src/state.rs +++ b/fluree-db-server/src/state.rs @@ -152,7 +152,12 @@ impl AppState { config.validate().map_err(|e| { fluree_db_api::ApiError::internal(format!("Invalid configuration: {e}")) })?; - let (fluree, cache_stats_handle) = build_default_fluree(&config).await?; + // No event bus to inject on the vanilla (non-raft) path — Fluree + // allocates its own. The raft assembly path builds Fluree + // separately via `build_fluree_with_nameservice`, threading + // `integration.event_bus` in so the state-machine adapter and + // the events endpoint share a single bus instance. + let (fluree, cache_stats_handle) = build_default_fluree(&config, None).await?; Self::with_fluree(config, telemetry_config, fluree, cache_stats_handle).await } @@ -318,13 +323,21 @@ impl Drop for AppState { /// Build the default `Fluree` instance from server config — picks /// proxy or direct mode based on `config.is_proxy_storage_mode()`. +/// +/// `event_bus` is threaded into `FlureeBuilder::with_event_bus` when +/// provided; the raft assembly path uses this to hand Fluree the +/// same `LedgerEventBus` the state-machine adapter emits into, so +/// runtime raft events reach the events endpoint's subscription on +/// `Fluree::event_bus()`. `None` yields the historical behaviour +/// (Fluree allocates its own bus). pub async fn build_default_fluree( config: &ServerConfig, + event_bus: Option>, ) -> Result<(Arc, tokio::task::JoinHandle<()>), fluree_db_api::ApiError> { if config.is_proxy_storage_mode() { build_proxy_fluree(config) } else { - build_direct_fluree(config, None).await + build_direct_fluree(config, None, event_bus).await } } @@ -335,10 +348,16 @@ pub async fn build_default_fluree( /// /// Raft mode requires direct storage; passing a proxy-mode config /// here errors at validation time below. +/// +/// `event_bus` is the raft integration's `LedgerEventBus`; passing +/// it here unifies the emitter (state-machine adapter) and the +/// subscribers (events endpoint, Fluree's own cache reconciler) +/// onto the same bus. #[cfg(feature = "raft")] pub async fn build_fluree_with_nameservice( config: &ServerConfig, nameservice: fluree_db_api::NameServiceMode, + event_bus: Option>, ) -> Result<(Arc, tokio::task::JoinHandle<()>), fluree_db_api::ApiError> { if config.is_proxy_storage_mode() { return Err(fluree_db_api::ApiError::config( @@ -346,15 +365,17 @@ pub async fn build_fluree_with_nameservice( use direct storage (file or S3) instead", )); } - build_direct_fluree(config, Some(nameservice)).await + build_direct_fluree(config, Some(nameservice), event_bus).await } /// Build a direct-storage `Fluree` (file, S3, DynamoDB, etc.) from /// config. When `nameservice` is `Some`, it replaces the -/// backend-implied nameservice. +/// backend-implied nameservice. When `event_bus` is `Some`, it +/// replaces Fluree's default per-instance bus. async fn build_direct_fluree( config: &ServerConfig, nameservice: Option, + event_bus: Option>, ) -> Result<(Arc, tokio::task::JoinHandle<()>), fluree_db_api::ApiError> { let mut builder = if let Some(ref path) = config.connection_config { // Connection config: build from JSON-LD (supports S3, @@ -391,6 +412,9 @@ async fn build_direct_fluree( if let Some(max_mb) = config.disk_cache_max_mb { builder = builder.disk_cache_max_mb(max_mb); } + if let Some(bus) = event_bus { + builder = builder.with_event_bus(bus); + } if config.indexing_enabled { let max_bytes = config .reindex_max_bytes diff --git a/fluree-db-server/tests/raft_multi_node.rs b/fluree-db-server/tests/raft_multi_node.rs index da20b7f7b7..d5d7c2caf7 100644 --- a/fluree-db-server/tests/raft_multi_node.rs +++ b/fluree-db-server/tests/raft_multi_node.rs @@ -645,6 +645,126 @@ async fn happy_path_follower_forwards_to_leader() { } } +/// Regression test for the event-bus wiring bug fixed in this branch. +/// +/// Before the fix, the raft `StateMachineAdapter` emitted +/// `NameServiceEvent`s on `RaftIntegration::event_bus`, but the +/// `/v1/fluree/events` SSE endpoint subscribed on the *other* bus +/// (`Fluree::event_bus()`) — a distinct instance. Runtime commits +/// on the raft path therefore never surfaced to SSE subscribers +/// (peers, tooling). The snapshot the events endpoint sends on +/// connect still populated per-ledger records because it reads +/// straight from the nameservice, so the bug hid — connection-time +/// data appeared, subsequent inserts did not. +/// +/// This test opens an SSE subscription BEFORE any inserts happen, +/// then does a live insert, and asserts an `ns-record` event with +/// `commit_t = 1` arrives on the stream within a short window. +/// Would fail against pre-fix code (stream stays silent). +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn sse_events_endpoint_emits_runtime_raft_commits() { + init_test_tracing(); + let cluster = TestCluster::spawn(CLUSTER_SIZE).await; + cluster.bootstrap().await; + + let ledger = "raft:sse-runtime"; + cluster + .create_ledger(cluster.nodes[0].node_id, ledger) + .await; + + // Subscribe against a follower — the interesting case, because + // the state machine adapter applies on every node and we want + // to verify the follower's SSE endpoint sees runtime events too. + let follower = cluster.pick_follower().await; + let follower_url = cluster + .nodes + .iter() + .find(|n| n.node_id == follower) + .map(|n| n.public_url.clone()) + .expect("follower node has a URL"); + + // Long-timeout client for the SSE stream; the normal 10s + // read-timeout on `cluster.client` would kill an idle stream + // before we insert. + let sse_client = reqwest::Client::builder() + .connect_timeout(Duration::from_secs(2)) + .no_gzip() + .build() + .expect("build sse client"); + + let events_url = format!("{follower_url}/v1/fluree/events?all=true"); + let resp = sse_client + .get(&events_url) + .header("accept", "text/event-stream") + .send() + .await + .expect("SSE subscribe"); + assert!( + resp.status().is_success(), + "SSE endpoint returned {}: {}", + resp.status(), + resp.text().await.unwrap_or_default() + ); + + // Drive the byte stream on a background task, accumulating raw + // bytes into a shared buffer that the main task can scan. + let buffer = Arc::new(tokio::sync::Mutex::new(Vec::::new())); + let buffer_bg = Arc::clone(&buffer); + let stream_task = tokio::spawn(async move { + use futures::StreamExt; + let mut stream = resp.bytes_stream(); + while let Some(chunk) = stream.next().await { + match chunk { + Ok(bytes) => buffer_bg.lock().await.extend_from_slice(&bytes), + Err(_) => break, + } + } + }); + + // Give the SSE endpoint a moment to emit its connection-time + // snapshot (which contains the ledger at commit_t=0), so any + // event we observe after this point is genuinely live. + tokio::time::sleep(Duration::from_millis(500)).await; + + // Fire the runtime insert. Post-fix, this should produce an + // `ns-record` event with commit_t=1 on the SSE stream. + cluster + .insert_subject(cluster.nodes[0].node_id, ledger, "alice", "Alice") + .await; + + // Poll the accumulated bytes for the runtime event. Give it a + // generous budget — raft apply + SSE emit is well under a + // second on a quiet machine, but CI can be slow. + let deadline = Instant::now() + Duration::from_secs(10); + let mut saw_runtime_event = false; + while Instant::now() < deadline { + let buf = buffer.lock().await; + let text = String::from_utf8_lossy(&buf); + // Look for an ns-record event whose data payload includes + // `"commit_t":1`. commit_t=0 is the snapshot state (the + // create landing); commit_t=1 is the runtime insert. + if text + .split("event: ns-record") + .skip(1) + .any(|block| block.contains("\"commit_t\":1")) + { + saw_runtime_event = true; + break; + } + drop(buf); + tokio::time::sleep(Duration::from_millis(50)).await; + } + + stream_task.abort(); + + assert!( + saw_runtime_event, + "did not observe an ns-record event for commit_t=1 within 10s — the raft state-machine \ + adapter's event bus is not reaching the SSE endpoint. Captured stream so far:\n{}", + String::from_utf8_lossy(&buffer.lock().await), + ); +} + /// Many transactions fan out to every node in parallel; the final /// state matches the union of all writes and every node sees it. #[tokio::test(flavor = "multi_thread", worker_threads = 4)] From 0350f34725436483c91654b5bf0aad1f489e1315 Mon Sep 17 00:00:00 2001 From: Benjamin Lamothe Date: Thu, 2 Jul 2026 19:39:23 -0400 Subject: [PATCH 2/6] remove now redundant local event listener spawing in raft --- fluree-db-server/src/lib.rs | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/fluree-db-server/src/lib.rs b/fluree-db-server/src/lib.rs index 440d96ff13..379f73630d 100644 --- a/fluree-db-server/src/lib.rs +++ b/fluree-db-server/src/lib.rs @@ -644,29 +644,6 @@ impl FlureeServerBuilder { None => None, }; - // Subscribe Fluree's `LedgerManager` to the (now-unified) - // event bus so commit / index applies reconcile cached state - // on every node, not just the one that staged the commit. - // - // This is not a duplicate of Fluree's own internal listener - // (spawned inside `finalize_with_backend` when indexing is - // enabled): `build_client_with_nameservice` — the raft path — - // passes a caller-supplied nameservice, which flips - // `indexing_mode` to `Disabled`, which skips the internal - // listener. So this is the *only* subscription forwarding - // ledger commits to the manager in raft mode. Post-fix, it - // fires on the same bus the events endpoint reads, so both - // consumers see the same event stream. - #[cfg(feature = "raft")] - if let Some(((integration, _), mgr)) = - self.raft.as_ref().zip(state_inner.fluree.ledger_manager()) - { - fluree_db_api::spawn_local_cache_event_listener( - Arc::clone(&integration.event_bus), - Arc::clone(mgr), - ); - } - // Per-node worker supervisor. Runs on every node (leader and // followers alike) because distributed workers can land // anywhere under rendezvous assignment. Spawned here so its From e2f0d531e264c5864c35a5ce8f14dd7349b23b68 Mon Sep 17 00:00:00 2001 From: Benjamin Lamothe Date: Fri, 3 Jul 2026 01:01:57 -0400 Subject: [PATCH 3/6] remove now unnecessary PeerSyncTask explicit cached leger refreshing --- fluree-db-server/src/peer/sync_task.rs | 53 -------------------------- 1 file changed, 53 deletions(-) diff --git a/fluree-db-server/src/peer/sync_task.rs b/fluree-db-server/src/peer/sync_task.rs index 485485b56c..1e58a2c966 100644 --- a/fluree-db-server/src/peer/sync_task.rs +++ b/fluree-db-server/src/peer/sync_task.rs @@ -10,7 +10,6 @@ use std::sync::Arc; -use fluree_db_api::{NotifyResult, NsNotify}; use fluree_db_nameservice::{ CasResult, NameServiceError, NameServiceEvent, NsRecord, RefKind, RefValue, }; @@ -300,11 +299,6 @@ impl PeerSyncTask { "Remote ledger watermark updated (persisted to local NS)" ); } - - // 5. Notify LedgerManager (AFTER NS is updated, so reload sees new refs) - // Note: events are emitted automatically by NotifyingNameService when - // the CAS operations above succeed — no manual emission needed here. - self.refresh_cached_ledger(record).await; } /// Retract ledger locally and evict from cache. @@ -322,59 +316,12 @@ impl PeerSyncTask { ); } - // Note: retraction event emitted automatically by NotifyingNameService. - // 2. Clear in-memory watermarks self.peer_state.remove_ledger(ledger_id).await; - // 3. Evict from cache - self.fluree.disconnect_ledger(ledger_id).await; - tracing::info!(ledger_id = %ledger_id, "Ledger retracted from remote"); } - /// Notify LedgerManager to refresh a cached ledger from the NS update. - async fn refresh_cached_ledger(&self, record: &NsRecord) { - let Some(mgr) = self.fluree.ledger_manager() else { - return; - }; - - match mgr - .notify(NsNotify { - ledger_id: record.ledger_id.clone(), - record: Some(record.clone()), - }) - .await - { - Ok(NotifyResult::NotLoaded) => { - // Not cached — do not cold-load on events - } - Ok(NotifyResult::Current) => { - // Already up to date - } - Ok( - result @ (NotifyResult::Reloaded - | NotifyResult::IndexUpdated - | NotifyResult::CommitsApplied { .. }), - ) => { - let after_t = mgr.current_t(&record.ledger_id).await; - tracing::debug!( - alias = %record.ledger_id, - after_cached_t = ?after_t, - ?result, - "refreshed cached ledger from SSE update" - ); - } - Err(e) => { - tracing::warn!( - alias = %record.ledger_id, - error = %e, - "Failed to refresh cached ledger from SSE update" - ); - } - } - } - /// Preload explicitly configured ledgers into the cache. async fn preload_configured_ledgers(&self) { let sub = self.config.peer_subscription(); From b6b79d7fead05c0bebfd7ba765be513171ba06e1 Mon Sep 17 00:00:00 2001 From: Benjamin Lamothe Date: Fri, 3 Jul 2026 12:49:45 -0400 Subject: [PATCH 4/6] trem comments for relevance --- fluree-db-api/src/lib.rs | 26 +++++++++++--------------- fluree-db-server/src/peer/sync_task.rs | 7 +++++-- fluree-db-server/src/state.rs | 12 ++---------- 3 files changed, 18 insertions(+), 27 deletions(-) diff --git a/fluree-db-api/src/lib.rs b/fluree-db-api/src/lib.rs index d08cdcbed9..2e209b406b 100644 --- a/fluree-db-api/src/lib.rs +++ b/fluree-db-api/src/lib.rs @@ -1120,6 +1120,10 @@ fn derive_index_config(config: &ConnectionConfig) -> IndexConfig { /// at compile time. /// - **Dynamic build** (`build_client()`) returns `FlureeClient` (type-erased) — /// used when the storage backend is determined at runtime from config. +/// +/// All `build*` methods (including the synchronous ones) must be called +/// within a tokio runtime: whenever ledger caching is enabled (the +/// default), building spawns the local cache event listener task. #[derive(Debug, Clone, Default)] pub struct FlureeBuilder { config: ConnectionConfig, @@ -1142,13 +1146,9 @@ pub struct FlureeBuilder { /// Remote Fluree connection registry for SERVICE federation. remote_connections: remote_service::RemoteConnectionRegistry, /// Externally-supplied event bus. When set, `Fluree` uses this - /// instance instead of allocating its own. Used by the raft - /// integration path: `RaftIntegration::bootstrap` constructs the - /// bus (so the state-machine adapter can emit into it during - /// apply), and the server assembly passes the same `Arc` here - /// so the events endpoint (which subscribes on - /// `Fluree::event_bus()`) sees runtime-fired raft events without - /// a second listener bridge. + /// instance instead of allocating its own, so external publishers + /// and subscribers on `Fluree::event_bus()` share one broadcast + /// channel. event_bus_override: Option>, } @@ -1779,14 +1779,10 @@ impl FlureeBuilder { /// Use a caller-supplied `LedgerEventBus` instead of allocating /// a new one on `build*`. /// - /// The events endpoint (and any other consumer that subscribes - /// via `Fluree::event_bus()`) sees notifications from every - /// publisher wired to this bus. In the raft server assembly this - /// is used to unify the state-machine adapter's event stream - /// with Fluree's own: `RaftIntegration::bootstrap` creates the - /// bus (so the adapter can emit into it during apply), and the - /// server assembly threads the same `Arc` here so runtime events - /// reach the events endpoint without a second bridge task. + /// Every consumer that subscribes via `Fluree::event_bus()` + /// (including the cache event listener spawned on build) then + /// observes notifications from every publisher wired to the + /// supplied bus, with no bridge task between separate instances. pub fn with_event_bus(mut self, bus: Arc) -> Self { self.event_bus_override = Some(bus); self diff --git a/fluree-db-server/src/peer/sync_task.rs b/fluree-db-server/src/peer/sync_task.rs index 1e58a2c966..d0ef4a7f1c 100644 --- a/fluree-db-server/src/peer/sync_task.rs +++ b/fluree-db-server/src/peer/sync_task.rs @@ -130,7 +130,8 @@ impl PeerSyncTask { } /// Persist remote ledger state into local FileNameService, then update - /// in-memory watermarks and notify LedgerManager. + /// in-memory watermarks. Cache refresh follows from the events the + /// notifying nameservice emits when the CAS operations succeed. async fn handle_ledger_updated(&self, record: &NsRecord) { let Some(ns) = self.fluree.nameservice_mode().publisher() else { tracing::error!("PeerSyncTask requires a read-write nameservice"); @@ -301,7 +302,9 @@ impl PeerSyncTask { } } - /// Retract ledger locally and evict from cache. + /// Retract ledger locally and clear its in-memory watermarks. Cache + /// eviction follows from the retraction event the notifying + /// nameservice emits. async fn handle_ledger_retracted(&self, ledger_id: &str) { // 1. Retract via Publisher::retract() let Some(ns) = self.fluree.nameservice_mode().publisher() else { diff --git a/fluree-db-server/src/state.rs b/fluree-db-server/src/state.rs index 45b5958c43..d566715f22 100644 --- a/fluree-db-server/src/state.rs +++ b/fluree-db-server/src/state.rs @@ -152,11 +152,7 @@ impl AppState { config.validate().map_err(|e| { fluree_db_api::ApiError::internal(format!("Invalid configuration: {e}")) })?; - // No event bus to inject on the vanilla (non-raft) path — Fluree - // allocates its own. The raft assembly path builds Fluree - // separately via `build_fluree_with_nameservice`, threading - // `integration.event_bus` in so the state-machine adapter and - // the events endpoint share a single bus instance. + // No external event bus to inject here — Fluree allocates its own. let (fluree, cache_stats_handle) = build_default_fluree(&config, None).await?; Self::with_fluree(config, telemetry_config, fluree, cache_stats_handle).await } @@ -325,11 +321,7 @@ impl Drop for AppState { /// proxy or direct mode based on `config.is_proxy_storage_mode()`. /// /// `event_bus` is threaded into `FlureeBuilder::with_event_bus` when -/// provided; the raft assembly path uses this to hand Fluree the -/// same `LedgerEventBus` the state-machine adapter emits into, so -/// runtime raft events reach the events endpoint's subscription on -/// `Fluree::event_bus()`. `None` yields the historical behaviour -/// (Fluree allocates its own bus). +/// provided; `None` lets Fluree allocate its own bus. pub async fn build_default_fluree( config: &ServerConfig, event_bus: Option>, From 34abea12a773ee8086b0228b02e39c5f3c135d81 Mon Sep 17 00:00:00 2001 From: Benjamin Lamothe Date: Fri, 3 Jul 2026 12:54:02 -0400 Subject: [PATCH 5/6] remove unnecessary clone --- fluree-db-server/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/fluree-db-server/src/lib.rs b/fluree-db-server/src/lib.rs index 379f73630d..a34dbd6ba1 100644 --- a/fluree-db-server/src/lib.rs +++ b/fluree-db-server/src/lib.rs @@ -560,10 +560,9 @@ impl FlureeServerBuilder { let publisher: std::sync::Arc = raft_ns.clone(); let ns_mode = fluree_db_api::NameServiceMode::ReadWrite(publisher); - state::build_fluree_with_nameservice(&self.config, ns_mode, raft_event_bus.clone()) - .await? + state::build_fluree_with_nameservice(&self.config, ns_mode, raft_event_bus).await? } else { - state::build_default_fluree(&self.config, raft_event_bus.clone()).await? + state::build_default_fluree(&self.config, raft_event_bus).await? }; #[cfg(not(feature = "raft"))] let (fluree, cache_stats_handle) = state::build_default_fluree(&self.config, None).await?; From d12bb8cb0c52bc5b021a3ee8ad283a4b9a516068 Mon Sep 17 00:00:00 2001 From: Benjamin Lamothe Date: Fri, 3 Jul 2026 12:57:31 -0400 Subject: [PATCH 6/6] remove unnecessary _override suffix --- fluree-db-api/src/lib.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fluree-db-api/src/lib.rs b/fluree-db-api/src/lib.rs index 2e209b406b..8e28c68131 100644 --- a/fluree-db-api/src/lib.rs +++ b/fluree-db-api/src/lib.rs @@ -1149,7 +1149,7 @@ pub struct FlureeBuilder { /// instance instead of allocating its own, so external publishers /// and subscribers on `Fluree::event_bus()` share one broadcast /// channel. - event_bus_override: Option>, + event_bus: Option>, } /// Configuration for background indexing in `FlureeBuilder`. @@ -1364,7 +1364,7 @@ impl FlureeBuilder { indexing_config: Some(default_indexing_builder_config()), novelty_thresholds: None, remote_connections: remote_service::RemoteConnectionRegistry::new(), - event_bus_override: None, + event_bus: None, } } @@ -1379,7 +1379,7 @@ impl FlureeBuilder { indexing_config: None, novelty_thresholds: None, remote_connections: remote_service::RemoteConnectionRegistry::new(), - event_bus_override: None, + event_bus: None, } } @@ -1446,7 +1446,7 @@ impl FlureeBuilder { indexing_config: Some(default_indexing_builder_config()), novelty_thresholds: None, remote_connections: remote_service::RemoteConnectionRegistry::new(), - event_bus_override: None, + event_bus: None, } } @@ -1639,7 +1639,7 @@ impl FlureeBuilder { indexing_config, novelty_thresholds: None, remote_connections: remote_service::RemoteConnectionRegistry::new(), - event_bus_override: None, + event_bus: None, }) } @@ -1784,7 +1784,7 @@ impl FlureeBuilder { /// observes notifications from every publisher wired to the /// supplied bus, with no bridge task between separate instances. pub fn with_event_bus(mut self, bus: Arc) -> Self { - self.event_bus_override = Some(bus); + self.event_bus = Some(bus); self } @@ -1793,7 +1793,7 @@ impl FlureeBuilder { /// a fresh bus with the historical default capacity. Called from /// every `build_*` path so the override behaviour is uniform. fn resolve_event_bus(&self) -> Arc { - self.event_bus_override + self.event_bus .clone() .unwrap_or_else(|| Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024))) }