Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 63 additions & 34 deletions fluree-db-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,10 @@ fn derive_index_config(config: &ConnectionConfig) -> IndexConfig {
/// at compile time.
/// - **Dynamic build** (`build_client()`) returns `FlureeClient` (type-erased) —
/// used when the storage backend is determined at runtime from config.
///
/// All `build*` methods (including the synchronous ones) must be called
/// within a tokio runtime: whenever ledger caching is enabled (the
/// default), building spawns the local cache event listener task.
#[derive(Debug, Clone, Default)]
pub struct FlureeBuilder {
config: ConnectionConfig,
Expand All @@ -1141,6 +1145,11 @@ pub struct FlureeBuilder {
novelty_thresholds: Option<IndexConfig>,
/// Remote Fluree connection registry for SERVICE federation.
remote_connections: remote_service::RemoteConnectionRegistry,
/// Externally-supplied event bus. When set, `Fluree` uses this
/// instance instead of allocating its own, so external publishers
/// and subscribers on `Fluree::event_bus()` share one broadcast
/// channel.
event_bus: Option<Arc<fluree_db_nameservice::LedgerEventBus>>,
}

/// Configuration for background indexing in `FlureeBuilder`.
Expand Down Expand Up @@ -1221,12 +1230,10 @@ struct RuntimeParts {
/// `ledger_manager` on every commit / index publish; drops loaded
/// ledgers on retract.
///
/// `FlureeBuilder::build` calls this with Fluree's internal bus when
/// indexing is enabled. Deployments that publish commit events on a
/// separate bus (e.g. raft's `LedgerEventBus`) should call this again
/// with that bus so cache reconciliation also fires on those events —
/// otherwise follower nodes only refresh on initial load, and writes
/// that land between the initial load and the next reload are invisible.
/// `FlureeBuilder::build` calls this during `finalize_with_backend`
/// whenever a `LedgerManager` exists. External embedders constructing
/// `Fluree` via `Fluree::new` bypass the builder and may call this
/// directly if they want the same cache reconciliation.
pub fn spawn_local_cache_event_listener(
event_bus: Arc<fluree_db_nameservice::LedgerEventBus>,
ledger_manager: Arc<LedgerManager>,
Expand Down Expand Up @@ -1357,6 +1364,7 @@ impl FlureeBuilder {
indexing_config: Some(default_indexing_builder_config()),
novelty_thresholds: None,
remote_connections: remote_service::RemoteConnectionRegistry::new(),
event_bus: None,
}
}

Expand All @@ -1371,6 +1379,7 @@ impl FlureeBuilder {
indexing_config: None,
novelty_thresholds: None,
remote_connections: remote_service::RemoteConnectionRegistry::new(),
event_bus: None,
}
}

Expand Down Expand Up @@ -1437,6 +1446,7 @@ impl FlureeBuilder {
indexing_config: Some(default_indexing_builder_config()),
novelty_thresholds: None,
remote_connections: remote_service::RemoteConnectionRegistry::new(),
event_bus: None,
}
}

Expand Down Expand Up @@ -1629,6 +1639,7 @@ impl FlureeBuilder {
indexing_config,
novelty_thresholds: None,
remote_connections: remote_service::RemoteConnectionRegistry::new(),
event_bus: None,
})
}

Expand Down Expand Up @@ -1765,6 +1776,28 @@ impl FlureeBuilder {
self
}

/// Use a caller-supplied `LedgerEventBus` instead of allocating
/// a new one on `build*`.
///
/// Every consumer that subscribes via `Fluree::event_bus()`
/// (including the cache event listener spawned on build) then
/// observes notifications from every publisher wired to the
/// supplied bus, with no bridge task between separate instances.
pub fn with_event_bus(mut self, bus: Arc<fluree_db_nameservice::LedgerEventBus>) -> Self {
self.event_bus = Some(bus);
self
}

/// Returns the caller-supplied event bus if one was set via
/// [`with_event_bus`](Self::with_event_bus), otherwise allocates
/// a fresh bus with the historical default capacity. Called from
/// every `build_*` path so the override behaviour is uniform.
fn resolve_event_bus(&self) -> Arc<fluree_db_nameservice::LedgerEventBus> {
self.event_bus
.clone()
.unwrap_or_else(|| Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024)))
}

/// Register a remote Fluree connection for SERVICE federation.
///
/// The `name` is used in SPARQL queries as `SERVICE <fluree:remote:name/ledger> { ... }`.
Expand Down Expand Up @@ -1797,7 +1830,7 @@ impl FlureeBuilder {

let storage = FileStorage::new(&path);
let nameservice = FileNameService::new(&path);
let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let notifying =
fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone());
let backend = StorageBackend::Managed(Arc::new(storage));
Expand Down Expand Up @@ -1833,7 +1866,7 @@ impl FlureeBuilder {
storage: impl Storage + 'static,
nameservice: NameServiceMode,
) -> Fluree {
let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let index_config = self.derive_indexing();
Self::finalize_with_backend(
self.ledger_cache_config,
Expand Down Expand Up @@ -1932,7 +1965,7 @@ impl FlureeBuilder {
let key_provider = StaticKeyProvider::new(encryption_key);
let storage = EncryptedStorage::new(file_storage, key_provider);
let nameservice = FileNameService::new(&path);
let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let notifying =
fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone());
let index_config = self.derive_indexing();
Expand Down Expand Up @@ -1972,7 +2005,7 @@ impl FlureeBuilder {
pub fn build_memory(self) -> Fluree {
let storage = MemoryStorage::new();
let nameservice = MemoryNameService::new();
let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let notifying =
fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone());
let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying));
Expand Down Expand Up @@ -2005,7 +2038,7 @@ impl FlureeBuilder {
let key_provider = StaticKeyProvider::new(encryption_key);
let storage = EncryptedStorage::new(mem_storage, key_provider);
let nameservice = MemoryNameService::new();
let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let notifying =
fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone());
let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying));
Expand Down Expand Up @@ -2055,7 +2088,7 @@ impl FlureeBuilder {
});
let backend = StorageBackend::Permanent(Arc::new(ipfs_store));
let nameservice = MemoryNameService::new();
let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let notifying =
fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone());
let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying.clone()));
Expand Down Expand Up @@ -2131,7 +2164,7 @@ impl FlureeBuilder {

// Empty prefix: S3Storage already applies its own key prefix.
let nameservice = StorageNameService::new(storage.clone(), "");
let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let notifying =
fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone());
let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying.clone()));
Expand Down Expand Up @@ -2232,7 +2265,7 @@ impl FlureeBuilder {
.await
.map_err(|e| ApiError::config(format!("Failed to ensure DynamoDB table: {e}")))?;

let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let notifying =
fluree_db_nameservice::NotifyingNameService::new(dynamo_ns, event_bus.clone());
let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying.clone()));
Expand Down Expand Up @@ -2317,7 +2350,7 @@ impl FlureeBuilder {

// Empty prefix: S3Storage already applies its own key prefix.
let nameservice = StorageNameService::new(storage.clone(), "");
let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let notifying =
fluree_db_nameservice::NotifyingNameService::new(nameservice, event_bus.clone());
let ns_mode = NameServiceMode::ReadWrite(Arc::new(notifying.clone()));
Expand Down Expand Up @@ -2492,10 +2525,8 @@ impl FlureeBuilder {
let _ = attachment_provider_cell.set(Arc::clone(mgr));
}

if indexing_mode.is_enabled() {
if let Some(manager) = &ledger_manager {
spawn_local_cache_event_listener(Arc::clone(&event_bus), Arc::clone(manager));
}
if let Some(manager) = &ledger_manager {
spawn_local_cache_event_listener(Arc::clone(&event_bus), Arc::clone(manager));
}

Fluree {
Expand Down Expand Up @@ -2586,7 +2617,7 @@ impl FlureeBuilder {
// Wrap with address identifier routing if configured
let storage = self.wrap_address_identifiers(base_storage)?;
let backend = StorageBackend::Managed(storage);
let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let index_config = self.derive_indexing();
let attachment_provider_cell = Self::new_attachment_provider_cell();

Expand Down Expand Up @@ -2653,7 +2684,7 @@ impl FlureeBuilder {
// Wrap with address identifier routing if configured
let storage = self.wrap_address_identifiers(base_storage)?;
let backend = StorageBackend::Managed(storage);
let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let index_config = self.derive_indexing();
let attachment_provider_cell = Self::new_attachment_provider_cell();

Expand Down Expand Up @@ -2719,7 +2750,7 @@ impl FlureeBuilder {
.wrap_address_identifiers_aws(base_storage, aws_handle.config())
.await?;
let backend = StorageBackend::Managed(storage);
let event_bus = Arc::new(fluree_db_nameservice::LedgerEventBus::new(1024));
let event_bus = self.resolve_event_bus();
let index_config = self.derive_indexing();
let attachment_provider_cell = Self::new_attachment_provider_cell();

Expand Down Expand Up @@ -4403,18 +4434,16 @@ pub fn fluree_memory() -> Fluree {
mod tests {
use super::*;

#[test]
fn test_fluree_builder_memory() {
#[tokio::test]
async fn test_fluree_builder_memory() {
let fluree = FlureeBuilder::memory().cache_max_mb(500).build_memory();

assert_eq!(fluree.config.cache.max_mb, 500);
}

#[test]
#[tokio::test]
#[cfg(feature = "native")]
fn test_fluree_builder_file() {
// `without_indexing()` keeps this a plain `#[test]` — the default
// background indexer would require a tokio runtime.
async fn test_fluree_builder_file() {
let result = FlureeBuilder::file("/tmp/test")
.without_indexing()
.parallelism(8)
Expand All @@ -4433,26 +4462,26 @@ mod tests {
assert!(result.is_err());
}

#[test]
fn test_fluree_memory_convenience() {
#[tokio::test]
async fn test_fluree_memory_convenience() {
let _fluree = fluree_memory();
}

// ========================================================================
// IndexConfig propagation tests (commit e6d0044)
// ========================================================================

#[test]
fn test_default_index_config_returns_defaults_without_thresholds() {
#[tokio::test]
async fn test_default_index_config_returns_defaults_without_thresholds() {
let fluree = FlureeBuilder::memory().build_memory();
let cfg = fluree.default_index_config();
let expected = server_defaults::default_index_config();
assert_eq!(cfg.reindex_min_bytes, expected.reindex_min_bytes);
assert_eq!(cfg.reindex_max_bytes, expected.reindex_max_bytes);
}

#[test]
fn test_with_indexing_thresholds_propagates_to_default_index_config() {
#[tokio::test]
async fn test_with_indexing_thresholds_propagates_to_default_index_config() {
// This is the exact scenario that was broken before e6d0044:
// custom thresholds set via the builder were silently dropped.
let fluree = FlureeBuilder::memory()
Expand Down
12 changes: 6 additions & 6 deletions fluree-db-api/src/query/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1590,8 +1590,8 @@ mod tests {
// Builder construction tests
// ========================================================================

#[test]
fn test_view_query_builder_validate_missing_input() {
#[tokio::test]
async fn test_view_query_builder_validate_missing_input() {
let fluree = FlureeBuilder::memory().build_memory();
// We can't create a view without a ledger, so test validate on FromQueryBuilder instead
let builder = FromQueryBuilder::new(&fluree);
Expand All @@ -1605,8 +1605,8 @@ mod tests {
));
}

#[test]
fn test_from_query_builder_validate_with_input() {
#[tokio::test]
async fn test_from_query_builder_validate_with_input() {
let fluree = FlureeBuilder::memory().build_memory();
let query = json!({
"from": "test:main",
Expand All @@ -1618,8 +1618,8 @@ mod tests {
assert!(result.is_ok());
}

#[test]
fn test_from_query_builder_validate_conflict() {
#[tokio::test]
async fn test_from_query_builder_validate_conflict() {
let fluree = FlureeBuilder::memory().build_memory();
let query = json!({"from": "test:main", "select": ["?s"]});
let builder = fluree
Expand Down
12 changes: 6 additions & 6 deletions fluree-db-api/src/query/nameservice_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ mod tests {
fluree
}

#[test]
fn test_builder_validate_missing_input() {
#[tokio::test]
async fn test_builder_validate_missing_input() {
let fluree = FlureeBuilder::memory().build_memory();
let builder = NameserviceQueryBuilder::new(&fluree);
let result = builder.validate();
Expand All @@ -372,8 +372,8 @@ mod tests {
));
}

#[test]
fn test_builder_validate_with_jsonld() {
#[tokio::test]
async fn test_builder_validate_with_jsonld() {
let fluree = FlureeBuilder::memory().build_memory();
let query = json!({
"select": ["?ledger"],
Expand All @@ -384,8 +384,8 @@ mod tests {
assert!(result.is_ok());
}

#[test]
fn test_builder_validate_with_sparql() {
#[tokio::test]
async fn test_builder_validate_with_sparql() {
let fluree = FlureeBuilder::memory().build_memory();
let builder = fluree
.nameservice_query()
Expand Down
Loading
Loading