diff --git a/Cargo.lock b/Cargo.lock index 8fbf7c3276..a65e93d4aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2152,6 +2152,20 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "commitlog" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bf2bc2a9c8e9369d54360cc20cd9f0d5f03e427bc3fe8c9d7bc6ba1512addff" +dependencies = [ + "byteorder", + "bytes", + "crc32c", + "log", + "memmap2", + "page_size", +] + [[package]] name = "compile-time" version = "0.2.0" @@ -2320,6 +2334,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32c" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" +dependencies = [ + "rustc_version 0.4.1", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -2762,6 +2785,7 @@ dependencies = [ "actix", "alloy", "anyhow", + "bincode", "derivative", "e3-aggregator", "e3-config", @@ -2776,6 +2800,9 @@ dependencies = [ "e3-sortition", "e3-trbfv", "e3-utils", + "once_cell", + "tempfile", + "tokio", "tracing", ] @@ -2875,7 +2902,9 @@ dependencies = [ "anyhow", "async-trait", "bincode", + "commitlog", "e3-events", + "e3-utils", "once_cell", "serde", "sled", @@ -2936,7 +2965,9 @@ dependencies = [ "bs58", "chrono", "derivative", + "e3-ciphernode-builder", "e3-crypto", + "e3-data", "e3-events", "e3-trbfv", "e3-utils", @@ -2963,6 +2994,7 @@ dependencies = [ "anyhow", "async-trait", "base64", + "e3-ciphernode-builder", "e3-config", "e3-crypto", "e3-data", @@ -3128,6 +3160,7 @@ dependencies = [ "async-trait", "bincode", "chrono", + "e3-ciphernode-builder", "e3-config", "e3-crypto", "e3-data", @@ -3196,6 +3229,7 @@ dependencies = [ "e3-data", "e3-events", "e3-request", + "e3-utils", "num", "num-bigint", "rand 0.8.5", @@ -5437,6 +5471,15 @@ version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +[[package]] +name = "memmap2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83faa42c0a078c393f6b29d5db232d8be22776a891f8f56e5284faee4a20b327" +dependencies = [ + "libc", +] + [[package]] name = "mime" version = "0.3.17" @@ -5999,6 +6042,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "page_size" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eebde548fbbf1ea81a99b128872779c437752fb99f217c45245e1a61dcd9edcd" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "parity-scale-codec" version = "3.7.5" diff --git a/Cargo.toml b/Cargo.toml index a39b962616..4bb2a832ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -115,6 +115,7 @@ bs58 = "=0.5.1" base64 = "=0.22.1" clap = { version = "=4.5.41", features = ["derive"] } chrono = { version = "=0.4.41", features = ["serde"] } +commitlog = "=0.2.0" compile-time = "=0.2.0" derivative = "=2.2.0" dirs = "=5.0.1" diff --git a/crates/ciphernode-builder/Cargo.toml b/crates/ciphernode-builder/Cargo.toml index 2603e56169..233b9cda94 100644 --- a/crates/ciphernode-builder/Cargo.toml +++ b/crates/ciphernode-builder/Cargo.toml @@ -11,6 +11,7 @@ actix.workspace = true alloy.workspace = true anyhow.workspace = true derivative.workspace = true +once_cell.workspace = true e3-aggregator.workspace = true e3-crypto.workspace = true e3-config.workspace = true @@ -24,4 +25,7 @@ e3-request.workspace = true e3-sortition.workspace = true e3-trbfv.workspace = true e3-utils.workspace = true +tempfile.workspace = true +tokio.workspace = true tracing.workspace = true +bincode.workspace = true diff --git a/crates/ciphernode-builder/src/ciphernode_builder.rs b/crates/ciphernode-builder/src/ciphernode_builder.rs index 54874e5de7..dc6901f655 100644 --- a/crates/ciphernode-builder/src/ciphernode_builder.rs +++ b/crates/ciphernode-builder/src/ciphernode_builder.rs @@ -4,7 +4,7 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use crate::CiphernodeHandle; +use crate::{CiphernodeHandle, EventSystem}; use actix::{Actor, Addr}; use alloy::signers::{k256::ecdsa::SigningKey, local::LocalSigner}; use anyhow::Result; @@ -15,8 +15,8 @@ use e3_aggregator::ext::{ }; use e3_config::chain_config::ChainConfig; use e3_crypto::Cipher; -use e3_data::{DataStore, InMemStore, Repositories, RepositoriesFactory}; -use e3_events::{BusHandle, EnclaveEvent, EventBus, EventBusConfig}; +use e3_data::{InMemStore, Repositories, RepositoriesFactory}; +use e3_events::{EnclaveEvent, EventBus, EventBusConfig}; use e3_evm::{ helpers::{ load_signer_from_repository, ConcreteReadProvider, ConcreteWriteProvider, EthProvider, @@ -32,13 +32,19 @@ use e3_keyshare::ext::{KeyshareExtension, ThresholdKeyshareExtension}; use e3_multithread::Multithread; use e3_request::E3Router; use e3_sortition::{ - CiphernodeSelector, FinalizedCommitteesRepositoryFactory, NodeStateRepositoryFactory, - Sortition, SortitionBackend, SortitionRepositoryFactory, + CiphernodeSelector, CiphernodeSelectorFactory, FinalizedCommitteesRepositoryFactory, + NodeStateRepositoryFactory, Sortition, SortitionBackend, SortitionRepositoryFactory, }; use e3_utils::{rand_eth_addr, SharedRng}; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; use tracing::{error, info}; +#[derive(Clone, Debug)] +enum EventSystemType { + Persisted { log_path: PathBuf, kv_path: PathBuf }, + InMem, +} + /// Build a ciphernode configuration. // NOTE: We could use a typestate pattern here to separate production and testing methods. I hummed // and hawed about it for quite a while and in the end felt it was too complex while we dont know @@ -46,14 +52,16 @@ use tracing::{error, info}; #[derive(Derivative)] #[derivative(Debug)] pub struct CiphernodeBuilder { + name: String, address: Option, chains: Vec, #[derivative(Debug = "ignore")] cipher: Arc, contract_components: ContractComponents, - datastore: Option, + in_mem_store: Option>, keyshare: Option, logging: bool, + event_system: EventSystemType, multithread_cache: Option>, multithread_concurrent_jobs: Option, multithread_capture_events: bool, @@ -89,19 +97,26 @@ pub enum KeyshareKind { } impl CiphernodeBuilder { - pub fn new(rng: SharedRng, cipher: Arc) -> Self { + /// Create a new ciphernode builder. + /// + /// - name - Unique name for the ciphernode + /// - rng - Arc Mutex wrapped random number generator + /// - cipher - Cipher for encryption and decryption of sensitive data + pub fn new(name: &str, rng: SharedRng, cipher: Arc) -> Self { Self { + name: name.to_owned(), address: None, chains: vec![], cipher, contract_components: ContractComponents::default(), - datastore: None, + in_mem_store: None, keyshare: None, logging: false, multithread_cache: None, plaintext_agg: false, pubkey_agg: false, multithread_concurrent_jobs: None, + event_system: EventSystemType::InMem, rng, source_bus: None, sortition_backend: SortitionBackend::score(), @@ -139,9 +154,19 @@ impl CiphernodeBuilder { self } - /// Attach an existing in mem store to the node - pub fn with_datastore(mut self, store: DataStore) -> Self { - self.datastore = Some(store); + /// Use the given in-mem datastore. This is useful for injecting a store dump. + pub fn with_in_mem_datastore(mut self, store: &Addr) -> Self { + self.in_mem_store = Some(store.to_owned()); + self + } + + /// Add persistence information for storing events and data. Without persistence information + /// the node will run in memory by default. + pub fn with_persistence(mut self, log_path: &PathBuf, kv_path: &PathBuf) -> Self { + self.event_system = EventSystemType::Persisted { + log_path: log_path.to_owned(), + kv_path: kv_path.to_owned(), + }; self } @@ -166,7 +191,7 @@ impl CiphernodeBuilder { self } - /// Use the given Address to represent the node + /// Use the given Address to represent the node. This should be unique. pub fn with_address(mut self, addr: &str) -> Self { self.address = Some(addr.to_owned()); self @@ -298,9 +323,6 @@ impl CiphernodeBuilder { None }; - // Get a handle from the event bus - let bus = BusHandle::new_from_consumer(local_bus); - let addr = if let Some(addr) = self.address.clone() { info!("Using eth address = {}", addr); addr @@ -310,10 +332,20 @@ impl CiphernodeBuilder { rand_eth_addr(&self.rng) }; - let store = self - .datastore - .clone() - .unwrap_or_else(|| (&InMemStore::new(self.logging).start()).into()); + // Get an event system instance. + let event_system = + if let EventSystemType::Persisted { kv_path, log_path } = self.event_system.clone() { + EventSystem::persisted(&addr, log_path, kv_path).with_event_bus(local_bus) + } else { + if let Some(ref store) = self.in_mem_store { + EventSystem::in_mem_from_store(&addr, store).with_event_bus(local_bus) + } else { + EventSystem::in_mem(&addr).with_event_bus(local_bus) + } + }; + + let bus = event_system.handle()?; + let store = event_system.store()?; let repositories = store.repositories(); @@ -329,7 +361,8 @@ impl CiphernodeBuilder { ) .await?; - CiphernodeSelector::attach(&bus, &sortition, &addr, &store); + CiphernodeSelector::attach(&bus, &sortition, repositories.ciphernode_selector(), &addr) + .await?; let mut provider_cache = ProviderCaches::new(); let cipher = &self.cipher; diff --git a/crates/ciphernode-builder/src/event_system.rs b/crates/ciphernode-builder/src/event_system.rs new file mode 100644 index 0000000000..73ad4dfba0 --- /dev/null +++ b/crates/ciphernode-builder/src/event_system.rs @@ -0,0 +1,471 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use crate::get_enclave_event_bus; +use actix::{Actor, Addr, Recipient}; +use anyhow::{anyhow, Result}; +use e3_data::{ + CommitLogEventLog, DataStore, ForwardTo, InMemEventLog, InMemSequenceIndex, InMemStore, + InsertBatch, SledSequenceIndex, SledStore, WriteBuffer, +}; +use e3_events::hlc::Hlc; +use e3_events::{BusHandle, EnclaveEvent, EventBus, EventBusConfig, EventStore, Sequencer}; +use once_cell::sync::OnceCell; +use std::hash::{DefaultHasher, Hash, Hasher}; +use std::path::PathBuf; + +/// Hold the InMem EventStore instance and InMemStore +struct InMemBackend { + eventstore: OnceCell>>, + store: OnceCell>, +} + +/// Hold the Persistent EventStore instance and SledStore +struct PersistedBackend { + log_path: PathBuf, + sled_path: PathBuf, + eventstore: OnceCell>>, + store: OnceCell>, +} + +/// An EventSystemBackend is holding the potentially persistent structures for the system +enum EventSystemBackend { + InMem(InMemBackend), + Persisted(PersistedBackend), +} + +pub enum EventStoreAddr { + InMem(Addr>), + Persisted(Addr>), +} + +impl TryFrom for Addr> { + type Error = anyhow::Error; + fn try_from(value: EventStoreAddr) -> std::result::Result { + if let EventStoreAddr::InMem(addr) = value { + Ok(addr) + } else { + Err(anyhow!( + "address was not EventStore" + )) + } + } +} + +/// EventSystem holds interconnected references to the components that manage events and +/// persistence within the node. The EventSystem connects: +/// +/// - **BusHandle** for interacting with the event system +/// - **EventBus** for managing publishing of events to listeners +/// - **EventStore** for managing persistence of events +/// - **Sequencer** for managing sequencing of event persistence and snapshot coordination +/// - **WriteBuffer** for batching inserts from actors into a snapshot +/// +pub struct EventSystem { + /// A nodes id to be used as a tiebreaker in logical clock timestamp differentiation + node_id: u32, + /// EventSystem backend either persisted or in memory + backend: EventSystemBackend, + /// WriteBuffer for batching inserts from actors into a snapshot + buffer: OnceCell>, + /// EventSystem Sequencer + sequencer: OnceCell>, + /// EventSystem eventbus + eventbus: OnceCell>>, + /// EventSystem BusHandle + handle: OnceCell, + /// A OnceLock that is used to indicate whether the system is wired to write snapshots + wired: OnceCell<()>, + /// Hlc override + hlc: OnceCell, +} + +impl EventSystem { + /// Create a new in memory EventSystem with default settings + pub fn new(name: &str) -> Self { + EventSystem::in_mem(name) + } + + /// Create an in memory EventSystem + pub fn in_mem(node_id: &str) -> Self { + Self { + node_id: EventSystem::node_id(node_id), + backend: EventSystemBackend::InMem(InMemBackend { + eventstore: OnceCell::new(), + store: OnceCell::new(), + }), + buffer: OnceCell::new(), + sequencer: OnceCell::new(), + eventbus: OnceCell::new(), + handle: OnceCell::new(), + wired: OnceCell::new(), + hlc: OnceCell::new(), + } + } + + /// Create an in memory EventSystem with a given store + pub fn in_mem_from_store(node_id: &str, store: &Addr) -> Self { + Self { + node_id: EventSystem::node_id(node_id), + backend: EventSystemBackend::InMem(InMemBackend { + eventstore: OnceCell::new(), + store: OnceCell::from(store.to_owned()), + }), + buffer: OnceCell::new(), + sequencer: OnceCell::new(), + eventbus: OnceCell::new(), + handle: OnceCell::new(), + wired: OnceCell::new(), + hlc: OnceCell::new(), + } + } + + /// Create a persisted EventSystem with datafiles at the given paths + pub fn persisted(node_id: &str, log_path: PathBuf, sled_path: PathBuf) -> Self { + Self { + node_id: EventSystem::node_id(node_id), + backend: EventSystemBackend::Persisted(PersistedBackend { + log_path, + sled_path, + eventstore: OnceCell::new(), + store: OnceCell::new(), + }), + buffer: OnceCell::new(), + sequencer: OnceCell::new(), + eventbus: OnceCell::new(), + handle: OnceCell::new(), + wired: OnceCell::new(), + hlc: OnceCell::new(), + } + } + + /// Pass in a specific given event bus + pub fn with_event_bus(self, bus: Addr>) -> Self { + let _ = self.eventbus.set(bus); + self + } + + /// Use a fresh event bus that is not the default singleton instance + pub fn with_fresh_bus(self) -> Self { + let _ = self + .eventbus + .set(EventBus::new(EventBusConfig { deduplicate: true }).start()); + self + } + + /// Add an injected hlc + pub fn with_hlc(self, hlc: Hlc) -> Self { + let _ = self.hlc.set(hlc); + self + } + + /// Get the eventbus address + pub fn eventbus(&self) -> Addr> { + self.eventbus.get_or_init(get_enclave_event_bus).clone() + } + + /// Get the buffer address + pub fn buffer(&self) -> Addr { + let buffer = self + .buffer + .get_or_init(|| WriteBuffer::new().start()) + .clone(); + self.wire_if_ready(); + buffer + } + + /// Get the sequencer address + pub fn sequencer(&self) -> Result> { + self.sequencer + .get_or_try_init(|| match self.eventstore()? { + EventStoreAddr::InMem(es) => { + Ok(Sequencer::new(&self.eventbus(), es, self.buffer()).start()) + } + EventStoreAddr::Persisted(es) => { + Ok(Sequencer::new(&self.eventbus(), es, self.buffer()).start()) + } + }) + .cloned() + } + + /// Get the EventStore address + pub fn eventstore(&self) -> Result { + match &self.backend { + EventSystemBackend::InMem(b) => { + let addr = b + .eventstore + .get_or_init(|| { + EventStore::new(InMemSequenceIndex::new(), InMemEventLog::new()).start() + }) + .clone(); + Ok(EventStoreAddr::InMem(addr)) + } + EventSystemBackend::Persisted(b) => { + let addr = b + .eventstore + .get_or_try_init(|| -> Result<_> { + let index = SledSequenceIndex::new(&b.sled_path, "sequence_index")?; + let log = CommitLogEventLog::new(&b.log_path)?; + Ok(EventStore::new(index, log).start()) + })? + .clone(); + Ok(EventStoreAddr::Persisted(addr)) + } + } + } + + /// Get an instance of the Hlc + pub fn hlc(&self) -> Result { + self.hlc + .get_or_try_init(|| Ok(Hlc::new(self.node_id))) + .cloned() + } + + /// Get the BusHandle + pub fn handle(&self) -> Result { + self.handle + .get_or_try_init(|| { + Ok(BusHandle::new( + self.eventbus(), + self.sequencer()?, + self.hlc()?, + )) + }) + .cloned() + } + + /// Get the DataStore + pub fn store(&self) -> Result { + let store = match &self.backend { + EventSystemBackend::InMem(b) => { + let addr = b + .store + .get_or_init(|| InMemStore::new(true).start()) + .clone(); + DataStore::from_in_mem(&addr, &self.buffer()) + } + EventSystemBackend::Persisted(b) => { + let addr = b + .store + .get_or_try_init(|| { + let handle = self.handle()?; + SledStore::new(&handle, &b.sled_path) + })? + .clone(); + DataStore::from_sled_store(&addr, &self.buffer()) + } + }; + self.wire_if_ready(); + Ok(store) + } + + // We need to ensure that once the buffer and store are created they are connected so that + // inserts are sent between the two actors. This internal function ensures this happens. + fn wire_if_ready(&self) { + let buffer = match self.buffer.get() { + Some(b) => b, + None => return, + }; + + let store: Option> = match &self.backend { + EventSystemBackend::InMem(b) => b.store.get().cloned().map(Into::into), + EventSystemBackend::Persisted(b) => b.store.get().cloned().map(Into::into), + }; + + let Some(store) = store else { + return; + }; + + // Now we know both are ready, so initialization will succeed + self.wired.get_or_init(|| { + buffer.do_send(ForwardTo::new(store)); + }); + } + + fn node_id(name: &str) -> u32 { + let mut hasher = DefaultHasher::new(); + name.hash(&mut hasher); + hasher.finish() as u32 + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use actix::Actor; + use actix::Handler; + use actix::Message; + + use e3_events::prelude::*; + use e3_events::EnclaveEventData; + use e3_events::GetEventsAfter; + use e3_events::ReceiveEvents; + use e3_events::TestEvent; + use tempfile::TempDir; + use tokio::time::sleep; + + // Setup Listener for the test + #[derive(Message, Debug)] + #[rtype("Vec")] + struct GetLogs; + + #[derive(Message, Debug)] + #[rtype("Vec")] + struct GetEvents; + + struct Listener { + logs: Vec, + events: Vec, + } + + impl Handler for Listener { + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + if let EnclaveEventData::TestEvent(TestEvent { msg, .. }) = msg.into_data() { + self.logs.push(msg); + } + } + } + + impl Handler for Listener { + type Result = Vec; + fn handle(&mut self, _: GetLogs, _: &mut Self::Context) -> Self::Result { + self.logs.clone() + } + } + + impl Handler for Listener { + type Result = Vec; + fn handle(&mut self, _: GetEvents, _: &mut Self::Context) -> Self::Result { + self.events + .iter() + .filter_map(|event| { + if let EnclaveEventData::TestEvent(evt) = event.get_data() { + return Some(evt.msg.clone()); + } + None + }) + .collect::>() + } + } + + impl Handler for Listener { + type Result = (); + fn handle(&mut self, msg: ReceiveEvents, _: &mut Self::Context) -> Self::Result { + self.events = msg.events().clone(); + } + } + + impl Actor for Listener { + type Context = actix::Context; + } + + #[actix::test] + async fn test_persisted() { + let tmp = TempDir::new().unwrap(); + let system = EventSystem::persisted("cn2", tmp.path().join("log"), tmp.path().join("sled")); + + let _handle = system.handle().expect("Failed to get handle"); + system.store().expect("Failed to get store"); + + // Wiring happened automatically + assert!(system.wired.get().is_some()); + } + + #[actix::test] + async fn test_in_mem() { + let eventbus = EventBus::::default().start(); + let system = EventSystem::in_mem("cn1").with_event_bus(eventbus); + + let _handle = system.handle().expect("Failed to get handle"); + system.store().expect("Failed to get store"); + + // Wiring happened automatically + assert!(system.wired.get().is_some()); + } + + #[actix::test] + async fn test_event_system() -> Result<()> { + let system = EventSystem::in_mem("cn1").with_fresh_bus(); + let handle = system.handle()?; + let datastore = system.store()?; + let eventstore = system.eventstore()?; + let listener = Listener { + logs: Vec::new(), + events: Vec::new(), + } + .start(); + + // Send all evts to the listener + handle.subscribe("*", listener.clone().into()); + + // Lets store some data + datastore.scope("/foo/name").write("Fred".to_string()); + datastore.scope("/foo/age").write(21u64); + datastore + .scope("/foo/occupation") + .write("developer".to_string()); + + // NOTE: Eventual consistency + // Store should not have data set on it until event has been published + // There is an argument we should instead delay reads until the event has been stored but + // this would: + // a. Promote poor patterns of sharing data through persistence + // b. Add a large amount of complexity to batching Get operations + // For now we allow this inconsistency under the assumption that data is written for + // snapshot storage exclusively. + + // Let's check the eventual consistency all data points should be none... + assert_eq!(datastore.scope("/foo/name").read::().await?, None); + assert_eq!(datastore.scope("/foo/age").read::().await?, None); + assert_eq!( + datastore.scope("/foo/occupation").read::().await?, + None + ); + + // Push an event + handle.publish(TestEvent::new("pink", 1))?; + sleep(Duration::from_millis(1)).await; + + // Now we have published an event all data should be written we can get the data from the store + assert_eq!( + datastore.scope("/foo/name").read::().await?, + Some("Fred".to_string()) + ); + assert_eq!(datastore.scope("/foo/age").read::().await?, Some(21)); + assert_eq!( + datastore.scope("/foo/occupation").read::().await?, + Some("developer".to_string()) + ); + + // Get a timestamp + let ts = handle.ts()?; + + // Push a few other events + handle.publish(TestEvent::new("yellow", 1))?; + handle.publish(TestEvent::new("red", 1))?; + handle.publish(TestEvent::new("white", 1))?; + sleep(Duration::from_millis(100)).await; + + // Get the event logs from the listener + let logs = listener.send(GetLogs).await?; + assert_eq!(logs, vec!["pink", "yellow", "red", "white"]); + + // Get the in mem address for the event store + let es: Addr> = eventstore.try_into()?; + + // Get all events after the given timestamp and send them to the listener + es.do_send(GetEventsAfter::new(ts, listener.clone())); + sleep(Duration::from_millis(100)).await; + + // Pull the events off the listsner since the timestamp + let events = listener.send(GetEvents).await?; + assert_eq!(events, vec!["yellow", "red", "white"]); + Ok(()) + } +} diff --git a/crates/events/src/eventbus_factory.rs b/crates/ciphernode-builder/src/eventbus_factory.rs similarity index 88% rename from crates/events/src/eventbus_factory.rs rename to crates/ciphernode-builder/src/eventbus_factory.rs index f251d86622..a8c6e9a490 100644 --- a/crates/events/src/eventbus_factory.rs +++ b/crates/ciphernode-builder/src/eventbus_factory.rs @@ -6,18 +6,21 @@ use actix::Actor; use actix::Addr; +use e3_config::AppConfig; use once_cell::sync::Lazy; use std::any::Any; use std::any::TypeId; use std::collections::HashMap; use std::sync::Mutex; -use crate::traits::Event; -use crate::BusHandle; -use crate::EnclaveEvent; -use crate::EventBus; -use crate::HistoryCollector; -use crate::Subscribe; +use e3_events::BusHandle; +use e3_events::EnclaveEvent; +use e3_events::Event; +use e3_events::EventBus; +use e3_events::HistoryCollector; +use e3_events::Subscribe; + +use crate::EventSystem; // The singleton factory using once_cell pub struct EventBusFactory { @@ -93,7 +96,8 @@ pub fn get_error_collector() -> Addr> { EventBusFactory::instance().get_error_collector() } -pub fn get_enclave_bus_handle() -> BusHandle { +pub fn get_enclave_bus_handle(config: &AppConfig) -> anyhow::Result { let bus = get_enclave_event_bus(); - BusHandle::new_from_consumer(bus) + let system = EventSystem::new(&config.name()).with_event_bus(bus); + Ok(system.handle()?) } diff --git a/crates/ciphernode-builder/src/lib.rs b/crates/ciphernode-builder/src/lib.rs index d2847b6d67..c77952b744 100644 --- a/crates/ciphernode-builder/src/lib.rs +++ b/crates/ciphernode-builder/src/lib.rs @@ -6,5 +6,9 @@ mod ciphernode; mod ciphernode_builder; +mod event_system; +mod eventbus_factory; pub use ciphernode::*; pub use ciphernode_builder::*; +pub use event_system::*; +pub use eventbus_factory::*; diff --git a/crates/config/src/app_config.rs b/crates/config/src/app_config.rs index 1298e15da9..8bed2fe5d5 100644 --- a/crates/config/src/app_config.rs +++ b/crates/config/src/app_config.rs @@ -57,6 +57,8 @@ pub struct NodeDefinition { pub db_file: PathBuf, /// The name for the keyfile pub key_file: PathBuf, + /// The name for the logfile + pub log_file: PathBuf, /// The data dir for enclave defaults to `~/.local/share/enclave/{name}` pub data_dir: PathBuf, /// Override the base folder for enclave configuration defaults to `~/.config/enclave/{name}` on linux @@ -80,6 +82,7 @@ impl Default for NodeDefinition { quic_port: 9091, key_file: PathBuf::from("key"), // ~/.config/enclave/key db_file: PathBuf::from("db"), // ~/.config/enclave/db + log_file: PathBuf::from("log"), // ~/.config/enclave/log config_dir: std::path::PathBuf::new(), // ~/.config/enclave data_dir: std::path::PathBuf::new(), // ~/.config/enclave role: NodeRole::Ciphernode, @@ -216,6 +219,7 @@ impl AppConfig { data_dir_override, Some(&node.db_file), Some(&node.key_file), + Some(&node.log_file), ); Ok(AppConfig { @@ -247,6 +251,11 @@ impl AppConfig { self.paths.db_file() } + /// Get the log file + pub fn log_file(&self) -> PathBuf { + self.paths.log_file() + } + fn node_def(&self) -> &NodeDefinition { // NOTE: on creation an invariant we have is that our node name is an extant key in our // nodes datastructure so expect here is ok and we dont have to clone the NodeDefinition diff --git a/crates/config/src/paths_engine.rs b/crates/config/src/paths_engine.rs index 124f531615..891fceb0f3 100644 --- a/crates/config/src/paths_engine.rs +++ b/crates/config/src/paths_engine.rs @@ -24,6 +24,9 @@ pub struct PathsEngine { /// This can either be a fully qualified path to a specific db file or a relative path to the /// data_dir location db_file_override: Option, + /// This can either be a fully qualified path to a specific log file or a relative path to the + /// data_dir location + log_file_override: Option, /// This can either be a fully qualified path to a specific key file or a relative path to the /// config_dir location key_file_override: Option, @@ -38,6 +41,7 @@ pub struct PathsEngine { pub const DEFAULT_CONFIG_NAME: &str = "enclave.config.yaml"; pub const DEFAULT_KEY_NAME: &str = "key"; pub const DEFAULT_DB_NAME: &str = "db"; +pub const DEFAULT_LOG_NAME: &str = "log"; // Find the config file is specified anywhere upstream from cwd and if found then locate the // data and config folders under .enclave/data and .enclave/config relative to the location of @@ -54,6 +58,7 @@ impl PathsEngine { data_dir_override: Option<&PathBuf>, db_file_override: Option<&PathBuf>, key_file_override: Option<&PathBuf>, + log_file_override: Option<&PathBuf>, ) -> Self { Self { name: name.to_owned(), @@ -65,6 +70,7 @@ impl PathsEngine { data_dir_override: data_dir_override.map(PathBuf::from), db_file_override: db_file_override.map(PathBuf::from), key_file_override: key_file_override.map(PathBuf::from), + log_file_override: log_file_override.map(PathBuf::from), } } @@ -106,6 +112,17 @@ impl PathsEngine { clean(self.get_data_dir().join(&self.name).join(DEFAULT_DB_NAME)) } + pub fn log_file(&self) -> PathBuf { + if let Some(log_file) = self.log_file_override.clone() { + if log_file.is_absolute() { + return clean(log_file); + } else { + return clean(self.get_data_dir().join(&self.name).join(log_file)); + } + } + clean(self.get_data_dir().join(&self.name).join(DEFAULT_LOG_NAME)) + } + pub fn relative_to_config(&self, path: &PathBuf) -> PathBuf { if path.is_absolute() { return PathBuf::from(path); @@ -175,6 +192,7 @@ mod test { found_config_file: Option<&'static str>, data_dir_override: Option<&'static str>, db_file_override: Option<&'static str>, + log_file_override: Option<&'static str>, key_file_override: Option<&'static str>, } @@ -182,6 +200,7 @@ mod test { config_file: &'static str, key_file: &'static str, db_file: &'static str, + log_file: &'static str, } fn test_cases(test_cases: Vec) { @@ -197,6 +216,7 @@ mod test { let data_dir_override = test_case.input.data_dir_override.map(PathBuf::from); let db_file = test_case.input.db_file_override.map(PathBuf::from); let key_file = test_case.input.key_file_override.map(PathBuf::from); + let log_file = test_case.input.log_file_override.map(PathBuf::from); let cwd = PathBuf::from(test_case.input.cwd); let paths = PathsEngine::new( @@ -209,6 +229,7 @@ mod test { data_dir_override.as_ref(), db_file.as_ref(), key_file.as_ref(), + log_file.as_ref(), ); assert_eq!( @@ -229,6 +250,13 @@ mod test { "Failed db_file assertion for test case: {}", test_case.name ); + + assert_eq!( + paths.log_file(), + PathBuf::from(test_case.expected.log_file), + "Failed log_file assertion for test case: {}", + test_case.name + ); } } @@ -247,11 +275,13 @@ mod test { data_dir_override: None, db_file_override: None, key_file_override: None, + log_file_override: None, }, expected: PathsExpected { config_file: "/home/user/.config/enclave/enclave.config.yaml", key_file: "/home/user/.config/enclave/_default/key", db_file: "/home/user/.local/share/enclave/_default/db", + log_file: "/home/user/.local/share/enclave/_default/log", }, }, TestCase { @@ -266,11 +296,13 @@ mod test { data_dir_override: None, db_file_override: None, key_file_override: None, + log_file_override: None, }, expected: PathsExpected { config_file: "/foo/some.config.yaml", key_file: "/foo/.enclave/config/_default/key", db_file: "/foo/.enclave/data/_default/db", + log_file: "/foo/.enclave/data/_default/log", }, }, TestCase { @@ -285,11 +317,13 @@ mod test { data_dir_override: Some("/path/to/data"), db_file_override: None, key_file_override: None, + log_file_override: None, }, expected: PathsExpected { config_file: "/foo/some.config.yaml", key_file: "/foo/.enclave/config/_default/key", db_file: "/path/to/data/_default/db", + log_file: "/path/to/data/_default/log", }, }, TestCase { @@ -304,11 +338,13 @@ mod test { data_dir_override: Some("/path/to/data"), db_file_override: None, key_file_override: None, + log_file_override: None, }, expected: PathsExpected { config_file: "/foo/some.config.yaml", key_file: "/confy/stuff/_default/key", db_file: "/path/to/data/_default/db", + log_file: "/path/to/data/_default/log", }, }, TestCase { @@ -323,11 +359,13 @@ mod test { data_dir_override: Some("/path/to/data"), db_file_override: None, key_file_override: Some("/ding/bat/key_file"), + log_file_override: None, }, expected: PathsExpected { config_file: "/foo/some.config.yaml", key_file: "/ding/bat/key_file", db_file: "/path/to/data/_default/db", + log_file: "/path/to/data/_default/log", }, }, TestCase { @@ -342,11 +380,14 @@ mod test { data_dir_override: Some("/path/to/data"), db_file_override: None, key_file_override: Some("../bat/key_file"), + log_file_override: None, }, + expected: PathsExpected { config_file: "/foo/some.config.yaml", key_file: "/confy/stuff/bat/key_file", db_file: "/path/to/data/_default/db", + log_file: "/path/to/data/_default/log", }, }, TestCase { @@ -361,11 +402,13 @@ mod test { data_dir_override: Some("/path/to/data"), db_file_override: Some("/ding/blat/foo/my/data"), key_file_override: Some("../bat/key_file"), + log_file_override: Some("../ding/loggy"), }, expected: PathsExpected { config_file: "/foo/some.config.yaml", key_file: "/confy/stuff/bat/key_file", db_file: "/ding/blat/foo/my/data", + log_file: "/path/to/data/ding/loggy", }, }, TestCase { @@ -380,11 +423,13 @@ mod test { data_dir_override: Some("/path/to/data"), db_file_override: Some("../../yes"), key_file_override: Some("../bat/key_file"), + log_file_override: None, }, expected: PathsExpected { config_file: "/foo/some.config.yaml", key_file: "/confy/stuff/bat/key_file", db_file: "/path/to/yes", + log_file: "/path/to/data/_default/log", }, }, ]); diff --git a/crates/config/src/store_keys.rs b/crates/config/src/store_keys.rs index 5cdc6ee26f..6b0eae21ae 100644 --- a/crates/config/src/store_keys.rs +++ b/crates/config/src/store_keys.rs @@ -72,4 +72,8 @@ impl StoreKeys { pub fn finalized_committees() -> String { String::from("//finalized_committees") } + + pub fn ciphernode_selector() -> String { + String::from("//ciphernode_selector") + } } diff --git a/crates/data/Cargo.toml b/crates/data/Cargo.toml index 60479f452b..59c4aa6987 100644 --- a/crates/data/Cargo.toml +++ b/crates/data/Cargo.toml @@ -9,6 +9,7 @@ repository = "https://github.com/gnosisguild/enclave/crates/data" [dependencies] actix = { workspace = true } e3-events = { workspace = true } +e3-utils = { workspace = true } anyhow = { workspace = true } serde = { workspace = true } sled = { workspace = true } @@ -17,3 +18,4 @@ tracing = { workspace = true } async-trait = { workspace = true } once_cell = { workspace = true } tempfile = { workspace = true } +commitlog = { workspace = true } diff --git a/crates/data/src/commit_log_event_log.rs b/crates/data/src/commit_log_event_log.rs new file mode 100644 index 0000000000..5905f1b7dd --- /dev/null +++ b/crates/data/src/commit_log_event_log.rs @@ -0,0 +1,166 @@ +// SPDX-License-Identifier: LGPL-2.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use anyhow::Context; +use anyhow::Result; +use commitlog::message::MessageSet; +use commitlog::{CommitLog, LogOptions, ReadLimit}; +use e3_events::{EnclaveEvent, EventLog, Unsequenced}; +use std::path::PathBuf; +use tracing::error; + +pub struct CommitLogEventLog { + log: CommitLog, +} + +impl CommitLogEventLog { + pub fn new(path: &PathBuf) -> Result { + let mut opts = LogOptions::new(path); + // TODO: drive this from config - currently set high to be permissive + opts.message_max_bytes(32 * 1024 * 1024); + let log = CommitLog::new(opts)?; + Ok(Self { log }) + } + + fn append_bytes(&mut self, bytes: &[u8]) -> Result { + let offset = self + .log + .append_msg(&bytes) + .context("Failed to append to event log")?; + // Return 1-indexed sequence number + Ok(offset + 1) + } +} + +impl EventLog for CommitLogEventLog { + fn append(&mut self, event: &EnclaveEvent) -> Result { + let bytes = bincode::serialize(event)?; + self.append_bytes(&bytes) + } + + fn read_from(&self, from: u64) -> Box)>> { + // Convert 1-indexed sequence to 0-indexed offset + let mut current_offset = from.saturating_sub(1); + let mut events = Vec::new(); + + loop { + let message_buf = match self.log.read(current_offset, ReadLimit::default()) { + Ok(msgs) => msgs, + Err(_) => break, + }; + + let mut count = 0; + for msg in message_buf.iter() { + if let Ok(event) = bincode::deserialize::>(msg.payload()) + { + // Convert 0-indexed offset back to 1-indexed sequence number + events.push((msg.offset() + 1, event)); + } else { + error!("Error deserializing event in read_from... skipping"); + } + current_offset = msg.offset() + 1; // Next offset to read from + count += 1; + } + + // No more messages to read + if count == 0 { + break; + } + } + + Box::new(events.into_iter()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use e3_events::{EnclaveEventData, EventConstructorWithTimestamp, TestEvent}; + use tempfile::tempdir; + + fn event_from(data: impl Into) -> EnclaveEvent { + EnclaveEvent::::new_with_timestamp(data.into().into(), 123) + } + + #[test] + fn test_append_and_read() { + let dir = tempdir().unwrap(); + let mut log = CommitLogEventLog::new(&dir.path().to_path_buf()).unwrap(); + + let event1 = event_from(TestEvent::new("one", 1)); + let event2 = event_from(TestEvent::new("two", 2)); + + let offset1 = log.append(&event1).unwrap(); + let offset2 = log.append(&event2).unwrap(); + + assert_eq!(offset1, 1); // 1-indexed + assert_eq!(offset2, 2); + + // Read back from the beginning + let events: Vec<_> = log.read_from(1).collect(); + assert_eq!(events.len(), 2); + assert_eq!(events[0].0, 1); + assert_eq!(events[1].0, 2); + } + + #[test] + fn test_read_from_offset() { + let dir = tempdir().unwrap(); + let mut log = CommitLogEventLog::new(&dir.path().to_path_buf()).unwrap(); + + let event1 = event_from(TestEvent::new("one", 1)); + let event2 = event_from(TestEvent::new("two", 2)); + let event3 = event_from(TestEvent::new("three", 3)); + + log.append(&event1).unwrap(); + log.append(&event2).unwrap(); + log.append(&event3).unwrap(); + + // Read from offset 2 (should get events 2 and 3) + let events: Vec<_> = log.read_from(2).collect(); + assert_eq!(events.len(), 2); + assert_eq!(events[0].0, 2); + assert_eq!(events[1].0, 3); + } + + #[test] + fn test_read_from_corruption_at_end_causes_infinite_loop() { + let dir = tempdir().unwrap(); + let mut log = CommitLogEventLog::new(&dir.path().to_path_buf()).unwrap(); + + for i in 0..100 { + let e = event_from(TestEvent::new("myevent", i)); + log.append(&e).unwrap(); + } + // Corrupt the last message + log.append_bytes(b"I am a bad event!").unwrap(); + + // Ensure if last message is corrupt we don't end up in an infinite loop + let _: Vec<_> = log.read_from(1).collect(); + } + + #[test] + fn test_read_empty_log() { + let dir = tempdir().unwrap(); + let log = CommitLogEventLog::new(&dir.path().to_path_buf()).unwrap(); + + let events: Vec<_> = log.read_from(1).collect(); + assert!(events.is_empty()); + } + + #[test] + fn test_read_past_end() { + let dir = tempdir().unwrap(); + let mut log = CommitLogEventLog::new(&dir.path().to_path_buf()).unwrap(); + + let event = event_from(TestEvent::new("one", 1)); + log.append(&event).unwrap(); + + // Read from offset beyond what exists + let events: Vec<_> = log.read_from(100).collect(); + assert!(events.is_empty()); + } +} diff --git a/crates/data/src/data_store.rs b/crates/data/src/data_store.rs index 84ce71e4fc..b2df9273cf 100644 --- a/crates/data/src/data_store.rs +++ b/crates/data/src/data_store.rs @@ -6,80 +6,15 @@ use std::borrow::Cow; +use crate::{Get, Insert, InsertSync, Remove, WriteBuffer}; use crate::{InMemStore, IntoKey, SledStore}; -use actix::{Addr, Message, Recipient}; +use actix::{Addr, Recipient}; use anyhow::anyhow; use anyhow::Context; use anyhow::Result; use serde::{Deserialize, Serialize}; use tracing::error; -#[derive(Message, Clone, Debug, PartialEq, Eq, Hash)] -#[rtype(result = "()")] -pub struct Insert(pub Vec, pub Vec); -impl Insert { - pub fn new(key: K, value: Vec) -> Self { - Self(key.into_key(), value) - } - - pub fn key(&self) -> &Vec { - &self.0 - } - - pub fn value(&self) -> &Vec { - &self.1 - } -} - -#[derive(Message, Clone, Debug, PartialEq, Eq, Hash)] -#[rtype(result = "Result<()>")] -pub struct InsertSync(pub Vec, pub Vec); -impl InsertSync { - pub fn new(key: K, value: Vec) -> Self { - Self(key.into_key(), value) - } - - pub fn key(&self) -> &Vec { - &self.0 - } - - pub fn value(&self) -> &Vec { - &self.1 - } -} - -impl From for Insert { - fn from(value: InsertSync) -> Self { - Insert::new(value.key(), value.value().clone()) - } -} - -#[derive(Message, Clone, Debug, PartialEq, Eq, Hash)] -#[rtype(result = "Option>")] -pub struct Get(pub Vec); -impl Get { - pub fn new(key: K) -> Self { - Self(key.into_key()) - } - - pub fn key(&self) -> &Vec { - &self.0 - } -} - -#[derive(Message, Clone, Debug, PartialEq, Eq, Hash)] -#[rtype(result = "()")] -pub struct Remove(pub Vec); -impl Remove { - pub fn new(key: K) -> Self { - Self(key.into_key()) - } - - pub fn key(&self) -> &Vec { - &self.0 - } -} - #[derive(Clone, Debug)] pub enum StoreAddr { InMem(Addr), @@ -87,13 +22,6 @@ pub enum StoreAddr { } impl StoreAddr { - pub fn to_data_store(&self) -> DataStore { - match self { - StoreAddr::InMem(s) => s.into(), - StoreAddr::Sled(s) => s.into(), - } - } - pub fn to_maybe_in_mem(&self) -> Option<&Addr> { match self { StoreAddr::InMem(ref store) => Some(store), @@ -217,6 +145,28 @@ impl DataStore { scope: key.into_key(), } } + + pub fn from_sled_store(addr: &Addr, write_buffer: &Addr) -> Self { + Self { + addr: StoreAddr::Sled(addr.clone()), + get: addr.clone().recipient(), + insert: write_buffer.clone().recipient(), + insert_sync: addr.clone().recipient(), + remove: addr.clone().recipient(), + scope: vec![], + } + } + + pub fn from_in_mem(addr: &Addr, write_buffer: &Addr) -> Self { + Self { + addr: StoreAddr::InMem(addr.clone()), + get: addr.clone().recipient(), + insert: write_buffer.clone().recipient(), + insert_sync: addr.clone().recipient(), + remove: addr.clone().recipient(), + scope: vec![], + } + } } impl From<&Addr> for DataStore { diff --git a/crates/data/src/events.rs b/crates/data/src/events.rs new file mode 100644 index 0000000000..6ecfc25d40 --- /dev/null +++ b/crates/data/src/events.rs @@ -0,0 +1,88 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use crate::IntoKey; +use actix::Message; +use anyhow::Result; + +#[derive(Message, Clone, Debug, PartialEq, Eq, Hash)] +#[rtype(result = "()")] +pub struct Insert(pub Vec, pub Vec); +impl Insert { + pub fn new(key: K, value: Vec) -> Self { + Self(key.into_key(), value) + } + + pub fn key(&self) -> &Vec { + &self.0 + } + + pub fn value(&self) -> &Vec { + &self.1 + } +} + +#[derive(Message, Clone, Debug, PartialEq, Eq, Hash)] +#[rtype(result = "()")] +pub struct InsertBatch(pub Vec); +impl InsertBatch { + pub fn new(commands: Vec) -> Self { + Self(commands) + } + + pub fn commands(&self) -> &Vec { + &self.0 + } +} + +#[derive(Message, Clone, Debug, PartialEq, Eq, Hash)] +#[rtype(result = "Result<()>")] +pub struct InsertSync(pub Vec, pub Vec); +impl InsertSync { + pub fn new(key: K, value: Vec) -> Self { + Self(key.into_key(), value) + } + + pub fn key(&self) -> &Vec { + &self.0 + } + + pub fn value(&self) -> &Vec { + &self.1 + } +} + +impl From for Insert { + fn from(value: InsertSync) -> Self { + Insert::new(value.key(), value.value().clone()) + } +} + +#[derive(Message, Clone, Debug, PartialEq, Eq, Hash)] +#[rtype(result = "Option>")] +pub struct Get(pub Vec); +impl Get { + pub fn new(key: K) -> Self { + Self(key.into_key()) + } + + pub fn key(&self) -> &Vec { + &self.0 + } +} + +#[derive(Message, Clone, Debug, PartialEq, Eq, Hash)] +#[rtype(result = "()")] +pub struct Remove(pub Vec); +impl Remove { + pub fn new(key: K) -> Self { + Self(key.into_key()) + } + + pub fn key(&self) -> &Vec { + &self.0 + } +} diff --git a/crates/data/src/in_mem.rs b/crates/data/src/in_mem.rs index e8ad52b3c8..ed0f52f60f 100644 --- a/crates/data/src/in_mem.rs +++ b/crates/data/src/in_mem.rs @@ -4,7 +4,7 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use crate::{Get, Insert, InsertSync, Remove}; +use crate::{Get, Insert, InsertBatch, InsertSync, Remove}; use actix::{Actor, Handler, Message}; use anyhow::{Context, Result}; use std::collections::BTreeMap; @@ -55,6 +55,10 @@ impl InMemStore { } } +// Add a BatchInsert event that contains multiple Insert messages +// Use the Responder pattern to manage the response +// Have a proxy actor hold the Inserts until the BatchInsert event is called + impl Handler for InMemStore { type Result = (); fn handle(&mut self, event: Insert, _: &mut Self::Context) { @@ -67,6 +71,18 @@ impl Handler for InMemStore { } } +impl Handler for InMemStore { + type Result = (); + fn handle(&mut self, msg: InsertBatch, _: &mut Self::Context) -> Self::Result { + for cmd in msg.commands() { + self.db.insert(cmd.key().to_owned(), cmd.value().to_owned()); + if self.capture { + self.log.push(DataOp::Insert(cmd.clone())); + } + } + } +} + impl Handler for InMemStore { type Result = Result<()>; @@ -95,7 +111,8 @@ impl Handler for InMemStore { type Result = Option>; fn handle(&mut self, event: Get, _: &mut Self::Context) -> Option> { let key = event.key(); - self.db.get(key).cloned() + let r = self.db.get(key); + r.cloned() } } diff --git a/crates/data/src/in_mem_event_log.rs b/crates/data/src/in_mem_event_log.rs new file mode 100644 index 0000000000..9b95398919 --- /dev/null +++ b/crates/data/src/in_mem_event_log.rs @@ -0,0 +1,113 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use anyhow::Result; +use e3_events::{EnclaveEvent, EventLog, Unsequenced}; + +pub struct InMemEventLog { + log: Vec>, +} + +impl InMemEventLog { + pub fn new() -> Self { + Self { log: Vec::new() } + } +} + +impl Default for InMemEventLog { + fn default() -> Self { + Self::new() + } +} + +impl EventLog for InMemEventLog { + fn read_from(&self, from: u64) -> Box)>> { + // Convert 1-indexed sequence to 0-indexed array position + let start_idx = from.saturating_sub(1) as usize; + + let events: Vec<_> = self + .log + .iter() + .skip(start_idx) + .enumerate() + .map(|(i, event)| (from + i as u64, event.clone())) + .collect(); + Box::new(events.into_iter()) + } + fn append(&mut self, event: &EnclaveEvent) -> Result { + self.log.push(event.to_owned()); + Ok(self.log.len() as u64) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use e3_events::{EnclaveEventData, EventConstructorWithTimestamp, TestEvent}; + + fn event_from(data: impl Into) -> EnclaveEvent { + EnclaveEvent::::new_with_timestamp(data.into().into(), 123) + } + + #[test] + fn test_append_and_read() { + let mut log = InMemEventLog::new(); + + let event1 = event_from(TestEvent::new("one", 1)); + let event2 = event_from(TestEvent::new("two", 2)); + + let offset1 = log.append(&event1).unwrap(); + let offset2 = log.append(&event2).unwrap(); + + assert_eq!(offset1, 1); // 1-indexed + assert_eq!(offset2, 2); + + // Read back from the beginning + let events: Vec<_> = log.read_from(1).collect(); + assert_eq!(events.len(), 2); + assert_eq!(events[0].0, 1); + assert_eq!(events[1].0, 2); + } + + #[test] + fn test_read_from_offset() { + let mut log = InMemEventLog::new(); + + let event1 = event_from(TestEvent::new("one", 1)); + let event2 = event_from(TestEvent::new("two", 2)); + let event3 = event_from(TestEvent::new("three", 3)); + + log.append(&event1).unwrap(); + log.append(&event2).unwrap(); + log.append(&event3).unwrap(); + + // Read from offset 2 (should get events 2 and 3) + let events: Vec<_> = log.read_from(2).collect(); + assert_eq!(events.len(), 2); + assert_eq!(events[0].0, 2); + assert_eq!(events[1].0, 3); + } + + #[test] + fn test_read_empty_log() { + let log = InMemEventLog::new(); + + let events: Vec<_> = log.read_from(1).collect(); + assert!(events.is_empty()); + } + + #[test] + fn test_read_past_end() { + let mut log = InMemEventLog::new(); + + let event = event_from(TestEvent::new("one", 1)); + log.append(&event).unwrap(); + + // Read from offset beyond what exists + let events: Vec<_> = log.read_from(100).collect(); + assert!(events.is_empty()); + } +} diff --git a/crates/data/src/in_mem_sequence_index.rs b/crates/data/src/in_mem_sequence_index.rs new file mode 100644 index 0000000000..675f18070d --- /dev/null +++ b/crates/data/src/in_mem_sequence_index.rs @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use anyhow::Result; +use e3_events::SequenceIndex; +use std::collections::BTreeMap; + +pub struct InMemSequenceIndex { + index: BTreeMap, +} + +impl InMemSequenceIndex { + pub fn new() -> Self { + Self { + index: BTreeMap::new(), + } + } +} + +impl SequenceIndex for InMemSequenceIndex { + fn seek(&self, key: u128) -> Result> { + Ok(self.index.range(key..).next().map(|(_, &v)| v)) + } + + fn insert(&mut self, key: u128, value: u64) -> Result<()> { + self.index.insert(key, value); + Ok(()) + } + + fn get(&self, key: u128) -> Result> { + Ok(self.index.get(&key).copied()) + } +} + +#[cfg(test)] +mod tests { + use crate::InMemSequenceIndex; + use e3_events::SequenceIndex; + + #[test] + fn seek_finds_nearest_key_at_or_after_target() { + let mut index = InMemSequenceIndex::new(); + index.insert(100, 1).unwrap(); + index.insert(200, 2).unwrap(); + index.insert(300, 3).unwrap(); + + assert_eq!(index.seek(50).unwrap(), Some(1)); + + // Exact matches + assert_eq!(index.seek(100).unwrap(), Some(1)); + assert_eq!(index.seek(200).unwrap(), Some(2)); + assert_eq!(index.seek(300).unwrap(), Some(3)); + + // Between keys (returns next) + assert_eq!(index.seek(150).unwrap(), Some(2)); + assert_eq!(index.seek(250).unwrap(), Some(3)); + + assert_eq!(index.seek(999).unwrap(), None); + } +} diff --git a/crates/data/src/into_key.rs b/crates/data/src/into_key.rs index 4fdff60b09..7c1c5e1213 100644 --- a/crates/data/src/into_key.rs +++ b/crates/data/src/into_key.rs @@ -57,3 +57,11 @@ impl<'a> IntoKey for &'a str { self.as_bytes().to_vec() } } + +/// Keys can be u128 +impl IntoKey for u128 { + fn into_key(self) -> Vec { + // Ensuring big endian for ordering + self.to_be_bytes().to_vec() + } +} diff --git a/crates/data/src/lib.rs b/crates/data/src/lib.rs index 8cbab7aa47..83827a3aa4 100644 --- a/crates/data/src/lib.rs +++ b/crates/data/src/lib.rs @@ -4,20 +4,35 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +mod commit_log_event_log; mod data_store; +mod events; mod in_mem; +mod in_mem_event_log; +mod in_mem_sequence_index; mod into_key; mod persistable; mod repositories; mod repository; +mod sled_db; +mod sled_sequence_index; mod sled_store; +mod sled_utils; mod snapshot; +mod write_buffer; +pub use commit_log_event_log::*; pub use data_store::*; +pub use events::*; pub use in_mem::*; +pub use in_mem_event_log::*; +pub use in_mem_sequence_index::*; pub use into_key::IntoKey; pub use persistable::*; pub use repositories::*; pub use repository::*; +pub use sled_db::*; +pub use sled_sequence_index::*; pub use sled_store::*; pub use snapshot::*; +pub use write_buffer::*; diff --git a/crates/data/src/sled_db.rs b/crates/data/src/sled_db.rs new file mode 100644 index 0000000000..641ea39797 --- /dev/null +++ b/crates/data/src/sled_db.rs @@ -0,0 +1,182 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use anyhow::{Context, Result}; +use sled::{transaction::ConflictableTransactionError, Tree}; +use std::path::PathBuf; + +use crate::{ + sled_utils::{clear_all_caches, get_or_open_db_tree}, + Get, Insert, Remove, +}; + +pub struct SledDb { + db: Tree, +} + +impl SledDb { + pub fn new(path: &PathBuf, tree: &str) -> Result { + let db = get_or_open_db_tree(path, tree)?; + Ok(Self { db }) + } + + pub fn close_all_connections() { + clear_all_caches() + } + + pub fn insert(&mut self, msg: Insert) -> Result<()> { + self.db + .insert(msg.key(), msg.value().to_vec()) + .context("Could not insert data into db")?; + + Ok(()) + } + + pub fn insert_batch(&mut self, msgs: &Vec) -> Result<()> { + self.db + .transaction(|tx_db| { + for msg in msgs { + tx_db.insert(msg.key().as_slice(), msg.value().to_vec())?; + } + Ok::<(), ConflictableTransactionError>(()) + }) + .context("Could not insert batch data into db")?; + Ok(()) + } + + pub fn remove(&mut self, msg: Remove) -> Result<()> { + self.db + .remove(msg.key()) + .context("Could not remove data from db")?; + Ok(()) + } + + pub fn get(&self, event: Get) -> Result>> { + let key = event.key(); + let str_key = String::from_utf8_lossy(&key).into_owned(); + let res = self + .db + .get(key) + .context(format!("Failed to fetch {}", str_key))?; + + Ok(res.map(|v| v.to_vec())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sled_db_caching() -> Result<()> { + use tempfile::tempdir; + + // Section 1: Test basic cache functionality + let temp_dir = tempdir().expect("Failed to create temporary directory"); + let db_path = temp_dir.path().join("test_cache.db"); + + // Create first instance and insert data + let mut db1 = SledDb::new(&db_path, "datastore")?; + db1.insert(Insert::new(b"test_key".to_vec(), b"test_value".to_vec()))?; + + // Create second instance to same path and verify data access + let mut db2 = SledDb::new(&db_path, "datastore")?; + let result = db2.get(Get::new(b"test_key".to_vec()))?; + assert_eq!( + result.unwrap(), + b"test_value".to_vec(), + "Values from db2 should match" + ); + + // Cross-modify and verify (db1 writes, db2 reads) + db1.insert(Insert::new(b"key2".to_vec(), b"value2".to_vec()))?; + assert_eq!( + db2.get(Get::new(b"key2".to_vec()))?.unwrap(), + b"value2".to_vec(), + "db2 should see changes from db1" + ); + + // Section 2: Test cross-instance operations (db2 writes, db1 reads) + db2.insert(Insert::new(b"key3".to_vec(), b"value3".to_vec()))?; + assert_eq!( + db1.get(Get::new(b"key3".to_vec()))?.unwrap(), + b"value3".to_vec(), + "db1 should see changes from db2" + ); + + // Section 3: Test cache with different path + let second_path = temp_dir.path().join("different_cache.db"); + let mut db3 = SledDb::new(&second_path, "datastore")?; + db3.insert(Insert::new(b"db3_key".to_vec(), b"db3_value".to_vec()))?; + + // Create another instance to the second path + let db4 = SledDb::new(&second_path, "datastore")?; + assert_eq!( + db4.get(Get::new(b"db3_key".to_vec()))?.unwrap(), + b"db3_value".to_vec(), + "db4 should see db3's data" + ); + + // Verify first path data isn't in second path + assert!( + db4.get(Get::new(b"test_key".to_vec()))?.is_none(), + "db4 should not see data from db1/db2" + ); + + // Verify second path data isn't in first path + assert!( + db1.get(Get::new(b"db3_key".to_vec()))?.is_none(), + "db1 should not see data from db3/db4" + ); + + Ok(()) + } + + #[test] + fn test_sled_db_batch_insert() -> Result<()> { + use tempfile::tempdir; + + let temp_dir = tempdir().expect("Failed to create temporary directory"); + let db_path = temp_dir.path().join("test_batch.db"); + + let mut db = SledDb::new(&db_path, "datastore")?; + + // Create a batch of inserts + let batch = vec![ + Insert::new(b"batch_key1".to_vec(), b"batch_value1".to_vec()), + Insert::new(b"batch_key2".to_vec(), b"batch_value2".to_vec()), + Insert::new(b"batch_key3".to_vec(), b"batch_value3".to_vec()), + ]; + + // Insert the batch + db.insert_batch(&batch)?; + + // Verify all items were inserted + assert_eq!( + db.get(Get::new(b"batch_key1".to_vec()))?.unwrap(), + b"batch_value1".to_vec(), + "First batch item should be retrievable" + ); + assert_eq!( + db.get(Get::new(b"batch_key2".to_vec()))?.unwrap(), + b"batch_value2".to_vec(), + "Second batch item should be retrievable" + ); + assert_eq!( + db.get(Get::new(b"batch_key3".to_vec()))?.unwrap(), + b"batch_value3".to_vec(), + "Third batch item should be retrievable" + ); + + // Verify non-existent key returns None + assert!( + db.get(Get::new(b"nonexistent".to_vec()))?.is_none(), + "Non-existent key should return None" + ); + + Ok(()) + } +} diff --git a/crates/data/src/sled_sequence_index.rs b/crates/data/src/sled_sequence_index.rs new file mode 100644 index 0000000000..4922fb4600 --- /dev/null +++ b/crates/data/src/sled_sequence_index.rs @@ -0,0 +1,89 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use std::path::PathBuf; + +use anyhow::{Context, Result}; +use e3_events::SequenceIndex; +use sled::Tree; + +use crate::sled_utils::{clear_all_caches, get_or_open_db_tree}; + +pub struct SledSequenceIndex { + db: Tree, +} + +impl SledSequenceIndex { + pub fn new(path: &PathBuf, tree: &str) -> Result { + let db = get_or_open_db_tree(path, tree)?; + Ok(Self { db }) + } + + pub fn close_all_connections() { + clear_all_caches() + } +} + +impl SequenceIndex for SledSequenceIndex { + fn get(&self, key: u128) -> Result> { + self.db + .get(key.to_be_bytes().to_vec()) + .context(format!("Failed to fetch timestamp: {}", key))? + .map(|v| Ok(u64::from_be_bytes(v.as_ref().try_into()?))) + .transpose() + } + + fn insert(&mut self, key: u128, value: u64) -> Result<()> { + self.db + .insert(key.to_be_bytes().to_vec(), value.to_be_bytes().to_vec()) + .context(format!("Failed to insert key: {}", key))?; + Ok(()) + } + + fn seek(&self, key: u128) -> Result> { + let key_bytes = key.to_be_bytes(); + self.db + .range(key_bytes..) + .next() + .transpose() + .context(format!("Failed to seek: {}", key))? + .map(|(_, v)| Ok(u64::from_be_bytes(v.as_ref().try_into()?))) + .transpose() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn seek_finds_nearest_key_at_or_after_target() { + let dir = tempdir().unwrap(); + let path = dir.path().to_path_buf(); + + let mut index = SledSequenceIndex::new(&path, "test_tree").unwrap(); + + index.insert(100, 1).unwrap(); + index.insert(200, 2).unwrap(); + index.insert(300, 3).unwrap(); + + // Before all keys (returns first) + assert_eq!(index.seek(50).unwrap(), Some(1)); + + // Exact matches + assert_eq!(index.seek(100).unwrap(), Some(1)); + assert_eq!(index.seek(200).unwrap(), Some(2)); + assert_eq!(index.seek(300).unwrap(), Some(3)); + + // Between keys (returns next) + assert_eq!(index.seek(150).unwrap(), Some(2)); + assert_eq!(index.seek(250).unwrap(), Some(3)); + + // After all keys + assert_eq!(index.seek(999).unwrap(), None); + } +} diff --git a/crates/data/src/sled_store.rs b/crates/data/src/sled_store.rs index 185b30f4f5..2deaab949c 100644 --- a/crates/data/src/sled_store.rs +++ b/crates/data/src/sled_store.rs @@ -4,17 +4,11 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use crate::{Get, Insert, InsertSync, Remove}; +use crate::{Get, Insert, InsertBatch, InsertSync, Remove, SledDb}; use actix::{Actor, ActorContext, Addr, Handler}; -use anyhow::{Context, Result}; +use anyhow::Result; use e3_events::{prelude::*, BusHandle, EType, EnclaveEvent, EnclaveEventData}; -use once_cell::sync::Lazy; -use sled::Db; -use std::{ - collections::HashMap, - path::PathBuf, - sync::{Arc, Mutex}, -}; +use std::path::PathBuf; use tracing::{error, info}; pub struct SledStore { @@ -29,7 +23,7 @@ impl Actor for SledStore { impl SledStore { pub fn new(bus: &BusHandle, path: &PathBuf) -> Result> { info!("Starting SledStore with {:?}", path); - let db = SledDb::new(PathBuf::from(path))?; + let db = SledDb::new(path, "datastore")?; let store = Self { db: Some(db), @@ -56,6 +50,19 @@ impl Handler for SledStore { } } +impl Handler for SledStore { + type Result = (); + + fn handle(&mut self, event: InsertBatch, _: &mut Self::Context) -> Self::Result { + if let Some(ref mut db) = &mut self.db { + match db.insert_batch(event.commands()) { + Err(err) => self.bus.err(EType::Data, err), + _ => (), + } + } + } +} + impl Handler for SledStore { type Result = Result<()>; @@ -108,167 +115,3 @@ impl Handler for SledStore { } } } - -pub struct SledDb { - db: Db, -} - -// Global static cache -pub static SLED_CACHE: Lazy>>> = - Lazy::new(|| Arc::new(Mutex::new(HashMap::new()))); - -// Returns a stable canonical string path used as a cache key. -// Canonicalizes the parent directory if the target path does not yet exist. -fn canonical_key(path: &PathBuf) -> String { - use std::path::{Path, PathBuf}; - if path.exists() { - return path - .canonicalize() - .unwrap_or_else(|_| path.clone()) - .to_string_lossy() - .into_owned(); - } - let parent = path.parent().unwrap_or_else(|| Path::new(".")); - let base: PathBuf = parent - .canonicalize() - .unwrap_or_else(|_| parent.to_path_buf()); - let tail = path.file_name().map(|s| s.to_owned()).unwrap_or_default(); - base.join(tail).to_string_lossy().into_owned() -} - -// Opens or retrieves a cached sled database for the given path. -// Prevents conflicts by ensuring only a single connection was open to a db file at once per process. -// Ensures the directory exists and stabilizes the canonical key across OSes. -fn get_or_open_db(path: &PathBuf) -> Result { - let _ = std::fs::create_dir_all(path); - let key = canonical_key(path); - let mut cache = SLED_CACHE.lock().unwrap(); - if let Some(db) = cache.get(&key) { - return Ok(db.clone()); - } - let db = sled::open(path)?; - cache.insert(key, db.clone()); - Ok(db) -} - -fn clear_all_caches() { - let mut cache_lock = SLED_CACHE.lock().unwrap(); - cache_lock.clear(); -} - -impl SledDb { - pub fn new(path: PathBuf) -> Result { - let db = get_or_open_db(&path).with_context(|| { - format!( - "Could not open database at path '{}'", - path.to_string_lossy() - ) - })?; - if !db.was_recovered() { - info!("created db at: {:?}", &path); - } else { - info!("recovered db st: {:?}", &path); - } - Ok(Self { db }) - } - - pub fn insert(&mut self, msg: Insert) -> Result<()> { - self.db - .insert(msg.key(), msg.value().to_vec()) - .context("Could not insert data into db")?; - - Ok(()) - } - - pub fn remove(&mut self, msg: Remove) -> Result<()> { - self.db - .remove(msg.key()) - .context("Could not remove data from db")?; - Ok(()) - } - - pub fn get(&mut self, event: Get) -> Result>> { - let key = event.key(); - let str_key = String::from_utf8_lossy(&key).into_owned(); - let res = self - .db - .get(key) - .context(format!("Failed to fetch {}", str_key))?; - - Ok(res.map(|v| v.to_vec())) - } - - pub fn close_all_connections() { - clear_all_caches() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_sled_db_caching() -> Result<()> { - use tempfile::tempdir; - - // Section 1: Test basic cache functionality - let temp_dir = tempdir().expect("Failed to create temporary directory"); - let db_path = temp_dir.path().join("test_cache.db"); - - // Create first instance and insert data - let mut db1 = SledDb::new(db_path.clone())?; - db1.insert(Insert::new(b"test_key".to_vec(), b"test_value".to_vec()))?; - - // Create second instance to same path and verify data access - let mut db2 = SledDb::new(db_path.clone())?; - let result = db2.get(Get::new(b"test_key".to_vec()))?; - assert_eq!( - result.unwrap(), - b"test_value".to_vec(), - "Values from db2 should match" - ); - - // Cross-modify and verify (db1 writes, db2 reads) - db1.insert(Insert::new(b"key2".to_vec(), b"value2".to_vec()))?; - assert_eq!( - db2.get(Get::new(b"key2".to_vec()))?.unwrap(), - b"value2".to_vec(), - "db2 should see changes from db1" - ); - - // Section 2: Test cross-instance operations (db2 writes, db1 reads) - db2.insert(Insert::new(b"key3".to_vec(), b"value3".to_vec()))?; - assert_eq!( - db1.get(Get::new(b"key3".to_vec()))?.unwrap(), - b"value3".to_vec(), - "db1 should see changes from db2" - ); - - // Section 3: Test cache with different path - let second_path = temp_dir.path().join("different_cache.db"); - let mut db3 = SledDb::new(second_path.clone())?; - db3.insert(Insert::new(b"db3_key".to_vec(), b"db3_value".to_vec()))?; - - // Create another instance to the second path - let mut db4 = SledDb::new(second_path)?; - assert_eq!( - db4.get(Get::new(b"db3_key".to_vec()))?.unwrap(), - b"db3_value".to_vec(), - "db4 should see db3's data" - ); - - // Verify first path data isn't in second path - assert!( - db4.get(Get::new(b"test_key".to_vec()))?.is_none(), - "db4 should not see data from db1/db2" - ); - - // Verify second path data isn't in first path - assert!( - db1.get(Get::new(b"db3_key".to_vec()))?.is_none(), - "db1 should not see data from db3/db4" - ); - - Ok(()) - } -} diff --git a/crates/data/src/sled_utils.rs b/crates/data/src/sled_utils.rs new file mode 100644 index 0000000000..a894d2c0c9 --- /dev/null +++ b/crates/data/src/sled_utils.rs @@ -0,0 +1,73 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use anyhow::{Context, Result}; +use once_cell::sync::Lazy; +use sled::{Db, Tree}; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::{Arc, Mutex}, +}; +use tracing::info; + +// Global static cache +pub static SLED_CACHE: Lazy>>> = + Lazy::new(|| Arc::new(Mutex::new(HashMap::new()))); + +// Returns a stable canonical string path used as a cache key. +// Canonicalizes the parent directory if the target path does not yet exist. +fn canonical_key(path: &PathBuf) -> String { + if path.exists() { + return path + .canonicalize() + .unwrap_or_else(|_| path.clone()) + .to_string_lossy() + .into_owned(); + } + let parent = path.parent().unwrap_or_else(|| Path::new(".")); + let base: PathBuf = parent + .canonicalize() + .unwrap_or_else(|_| parent.to_path_buf()); + let tail = path.file_name().map(|s| s.to_owned()).unwrap_or_default(); + base.join(tail).to_string_lossy().into_owned() +} + +// Opens or retrieves a cached sled database for the given path. +// Prevents conflicts by ensuring only a single connection was open to a db file at once per process. +// Ensures the directory exists and stabilizes the canonical key across OSes. +fn get_or_open_db(path: &PathBuf) -> Result { + let _ = std::fs::create_dir_all(path); + let key = canonical_key(path); + let mut cache = SLED_CACHE.lock().unwrap(); + if let Some(db) = cache.get(&key) { + return Ok(db.clone()); + } + let db = sled::open(path).with_context(|| { + format!( + "Could not open database at path '{}'", + path.to_string_lossy() + ) + })?; + cache.insert(key, db.clone()); + if !db.was_recovered() { + info!("created db at: {:?}", &path); + } else { + info!("recovered db st: {:?}", &path); + } + + Ok(db) +} + +pub fn get_or_open_db_tree(path: &PathBuf, tree: &str) -> Result { + let db = get_or_open_db(path)?; + Ok(db.open_tree(tree)?) +} + +pub fn clear_all_caches() { + let mut cache_lock = SLED_CACHE.lock().unwrap(); + cache_lock.clear(); +} diff --git a/crates/data/src/write_buffer.rs b/crates/data/src/write_buffer.rs new file mode 100644 index 0000000000..c3d39afdb0 --- /dev/null +++ b/crates/data/src/write_buffer.rs @@ -0,0 +1,72 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use actix::{Actor, Handler, Message, Recipient}; +use e3_events::CommitSnapshot; + +use crate::{Insert, InsertBatch}; + +pub struct WriteBuffer { + dest: Option>, + buffer: Vec, +} + +impl Actor for WriteBuffer { + type Context = actix::Context; +} + +impl WriteBuffer { + pub fn new() -> Self { + Self { + dest: None, + buffer: Vec::new(), + } + } +} + +impl Handler for WriteBuffer { + type Result = (); + fn handle(&mut self, msg: ForwardTo, _: &mut Self::Context) -> Self::Result { + self.dest = Some(msg.dest()) + } +} + +impl Handler for WriteBuffer { + type Result = (); + + fn handle(&mut self, msg: Insert, _: &mut Self::Context) -> Self::Result { + self.buffer.push(msg); + } +} + +impl Handler for WriteBuffer { + type Result = (); + + fn handle(&mut self, msg: CommitSnapshot, _: &mut Self::Context) -> Self::Result { + if let Some(ref dest) = self.dest { + if !self.buffer.is_empty() { + let mut inserts = std::mem::take(&mut self.buffer); + inserts.push(Insert::new("//seq", msg.seq().to_be_bytes().to_vec())); + let batch = InsertBatch::new(inserts); + dest.do_send(batch); + } + } + } +} + +#[derive(Message)] +#[rtype("()")] +pub struct ForwardTo(Recipient); + +impl ForwardTo { + pub fn new(dest: impl Into>) -> Self { + Self(dest.into()) + } + + pub fn dest(self) -> Recipient { + self.0 + } +} diff --git a/crates/entrypoint/src/helpers/datastore.rs b/crates/entrypoint/src/helpers/datastore.rs index b186c64cf0..692ec690df 100644 --- a/crates/entrypoint/src/helpers/datastore.rs +++ b/crates/entrypoint/src/helpers/datastore.rs @@ -8,10 +8,11 @@ use std::path::PathBuf; use actix::Actor; use anyhow::Result; +use e3_ciphernode_builder::get_enclave_bus_handle; use e3_config::AppConfig; use e3_data::{DataStore, InMemStore, SledDb, SledStore}; use e3_data::{Repositories, RepositoriesFactory}; -use e3_events::{get_enclave_bus_handle, BusHandle}; +use e3_events::BusHandle; pub fn get_sled_store(bus: &BusHandle, db_file: &PathBuf) -> Result { Ok((&SledStore::new(bus, db_file)?).into()) @@ -31,7 +32,7 @@ pub fn setup_datastore(config: &AppConfig, bus: &BusHandle) -> Result } pub fn get_repositories(config: &AppConfig) -> Result { - let bus = get_enclave_bus_handle(); + let bus = get_enclave_bus_handle(config)?; let store = setup_datastore(config, &bus)?; Ok(store.repositories()) } diff --git a/crates/entrypoint/src/start/aggregator_start.rs b/crates/entrypoint/src/start/aggregator_start.rs index dfbf9beeb1..602863c46e 100644 --- a/crates/entrypoint/src/start/aggregator_start.rs +++ b/crates/entrypoint/src/start/aggregator_start.rs @@ -5,11 +5,11 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use anyhow::Result; -use e3_ciphernode_builder::CiphernodeBuilder; +use e3_ciphernode_builder::{get_enclave_bus_handle, get_enclave_event_bus, CiphernodeBuilder}; use e3_config::AppConfig; use e3_crypto::Cipher; use e3_data::RepositoriesFactory; -use e3_events::{get_enclave_bus_handle, BusHandle}; +use e3_events::BusHandle; use e3_net::{NetEventTranslator, NetRepositoryFactory}; use e3_test_helpers::{PlaintextWriter, PublicKeyWriter}; use rand::SeedableRng; @@ -20,23 +20,16 @@ use std::{ }; use tokio::task::JoinHandle; -use crate::helpers::datastore::setup_datastore; - pub async fn execute( config: &AppConfig, pubkey_write_path: Option, plaintext_write_path: Option, experimental_trbfv: bool, ) -> Result<(BusHandle, JoinHandle>, String)> { - let bus = get_enclave_bus_handle(); let rng = Arc::new(Mutex::new(ChaCha20Rng::from_rng(OsRng)?)); - let store = setup_datastore(config, &bus)?; - let repositories = store.repositories(); let cipher = Arc::new(Cipher::from_file(config.key_file()).await?); - - let mut builder = CiphernodeBuilder::new(rng.clone(), cipher.clone()) - .with_source_bus(bus.consumer()) - .with_datastore(store) + let mut builder = CiphernodeBuilder::new(&config.name(), rng.clone(), cipher.clone()) + .with_persistence(&config.log_file(), &config.db_file()) .with_chains(&config.chains()) .with_sortition_score() .with_contract_enclave_full() @@ -50,7 +43,12 @@ pub async fn execute( } else { builder = builder.with_plaintext_aggregation() } - builder.build().await?; + + // TODO: put net package provisioning in the ciphernode-builder: + let node = builder.build().await?; + let store = node.store(); + let repositories = store.repositories(); + let bus = node.bus.clone(); let (_, _, join_handle, peer_id) = NetEventTranslator::setup_with_interface( bus.clone(), config.peers(), diff --git a/crates/entrypoint/src/start/start.rs b/crates/entrypoint/src/start/start.rs index 87bd527969..16a19870d9 100644 --- a/crates/entrypoint/src/start/start.rs +++ b/crates/entrypoint/src/start/start.rs @@ -6,11 +6,10 @@ use alloy::primitives::Address; use anyhow::Result; -use e3_ciphernode_builder::CiphernodeBuilder; +use e3_ciphernode_builder::{get_enclave_bus_handle, get_enclave_event_bus, CiphernodeBuilder}; use e3_config::AppConfig; use e3_crypto::Cipher; use e3_data::RepositoriesFactory; -use e3_events::get_enclave_bus_handle; use e3_events::BusHandle; use e3_net::{NetEventTranslator, NetRepositoryFactory}; use rand::SeedableRng; @@ -19,8 +18,6 @@ use std::sync::{Arc, Mutex}; use tokio::task::JoinHandle; use tracing::instrument; -use crate::helpers::datastore::setup_datastore; - #[instrument(name = "app", skip_all)] pub async fn execute( config: &AppConfig, @@ -28,16 +25,10 @@ pub async fn execute( experimental_trbfv: bool, ) -> Result<(BusHandle, JoinHandle>, String)> { let rng = Arc::new(Mutex::new(rand_chacha::ChaCha20Rng::from_rng(OsRng)?)); - - let bus = get_enclave_bus_handle(); let cipher = Arc::new(Cipher::from_file(&config.key_file()).await?); - let store = setup_datastore(&config, &bus)?; - let repositories = store.repositories(); - - let mut builder = CiphernodeBuilder::new(rng.clone(), cipher.clone()) + let mut builder = CiphernodeBuilder::new(&config.name(), rng.clone(), cipher.clone()) .with_address(&address.to_string()) - .with_source_bus(bus.consumer()) - .with_datastore(store) + .with_persistence(&config.log_file(), &config.db_file()) .with_sortition_score() .with_chains(&config.chains()) .with_contract_enclave_reader() @@ -50,7 +41,10 @@ pub async fn execute( } else { builder = builder.with_keyshare(); } - builder.build().await?; + + let node = builder.build().await?; + let repositories = node.store().repositories(); + let bus = node.bus.clone(); let (_, _, join_handle, peer_id) = NetEventTranslator::setup_with_interface( bus.clone(), config.peers(), diff --git a/crates/events/Cargo.toml b/crates/events/Cargo.toml index 0eb2104289..a37cb25c54 100644 --- a/crates/events/Cargo.toml +++ b/crates/events/Cargo.toml @@ -37,4 +37,5 @@ test-helpers = [] # ensure test-helpers is available for integration tests [dev-dependencies] proptest = { workspace = true } e3-events = { workspace = true, features = ["test-helpers"] } - +e3-data = { workspace = true } +e3-ciphernode-builder = { workspace = true } diff --git a/crates/events/src/bus_handle.rs b/crates/events/src/bus_handle.rs index 44cacd6c9f..ece22e36c1 100644 --- a/crates/events/src/bus_handle.rs +++ b/crates/events/src/bus_handle.rs @@ -6,7 +6,7 @@ use std::sync::Arc; -use actix::{Actor, Addr, Recipient}; +use actix::{Actor, Addr, Handler, Recipient}; use anyhow::Result; use derivative::Derivative; use tracing::error; @@ -23,21 +23,19 @@ use crate::{ }; #[derive(Clone, Derivative)] -#[derivative(Debug)] +#[derivative(Debug, PartialEq, Eq)] pub struct BusHandle { + /// EventBus that actors can consume sequenced events from consumer: Addr>>, + /// Sequencer that new events should be produced from producer: Addr, + /// Hlc clock used to time all events created on this BusHandle #[derivative(Debug = "ignore")] hlc: Arc, } impl BusHandle { - pub fn new_from_consumer(consumer: Addr>>) -> Self { - let producer = Sequencer::new(&consumer).start(); - let hlc = Hlc::default(); - Self::new(consumer, producer, hlc) - } - + /// Create a new BusHandle pub fn new( consumer: Addr>>, producer: Addr, @@ -50,17 +48,35 @@ impl BusHandle { } } + /// Return a HistoryCollector for examining events that have passed through on the events bus pub fn history(&self) -> Addr>> { EventBus::>::history(&self.consumer) } + /// Access the producer to internally dispatch am event to pub fn producer(&self) -> &Addr { &self.producer } + /// Access the consumer to internally subscribe to events pub fn consumer(&self) -> &Addr>> { &self.consumer } + + /// Get a new timestamp. Note this ticks over the internal Hlc. + pub fn ts(&self) -> Result { + let ts = self.hlc.tick()?; + Ok(ts.into()) + } + + /// Pipe events from this handle to the other handle only when the predicate returns true + pub fn pipe_to(&self, other: &BusHandle, predicate: F) + where + F: Fn(&EnclaveEvent) -> bool + Unpin + 'static, + { + let pipe = BusHandlePipe::new(other.to_owned(), predicate).start(); + self.subscribe("*", pipe.into()); + } } impl EventPublisher> for BusHandle { @@ -136,28 +152,17 @@ impl EventSubscriber> for BusHandle { } } -impl Into for Addr> { - fn into(self) -> BusHandle { - BusHandle::new_from_consumer(self) - } -} - -impl Into for &Addr> { - fn into(self) -> BusHandle { - BusHandle::new_from_consumer(self.clone()) - } -} - #[cfg(test)] mod tests { - use std::time::{Duration, SystemTime, UNIX_EPOCH}; - - use crate::{ - hlc::Hlc, prelude::*, sequencer::Sequencer, BusHandle, EnclaveEvent, EnclaveEventData, - EventBus, TestEvent, - }; use actix::{Actor, Handler, Message}; + use e3_ciphernode_builder::EventSystem; + // NOTE: We cannot pull from crate as the features will be missing as they are not default. + use e3_events::{ + hlc::Hlc, prelude::*, BusHandle, EnclaveEvent, EnclaveEventData, EventPublisher, TestEvent, + }; + use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::time::sleep; + fn now_micros() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) @@ -212,20 +217,18 @@ mod tests { // 1. setup up two separate busses with out of sync clocks A and B. B should be 30 seconds // faster than A. - let consumer_a = EventBus::::default().start(); - let producer_a = Sequencer::new(&consumer_a).start(); - let clock_a = Hlc::new(1).with_clock(move || now_micros().saturating_sub(30_000_000)); // Late - let bus_a = BusHandle::new(consumer_a, producer_a, clock_a); - - let consumer_b = EventBus::::default().start(); - let producer_b = Sequencer::new(&consumer_b).start(); - let clock_b = Hlc::new(2); // in sync - let bus_b = BusHandle::new(consumer_b, producer_b, clock_b); - - let consumer_c = EventBus::::default().start(); - let producer_c = Sequencer::new(&consumer_c).start(); - let clock_c = Hlc::new(3); // in sync - let bus_c = BusHandle::new(consumer_c, producer_c, clock_c); + let bus_a = EventSystem::new("a") + .with_fresh_bus() + .with_hlc(Hlc::new(1).with_clock(move || now_micros().saturating_sub(30_000_000))) // Late + .handle()?; + let bus_b = EventSystem::new("b") + .with_fresh_bus() + .with_hlc(Hlc::new(2)) + .handle()?; + let bus_c = EventSystem::new("c") + .with_fresh_bus() + .with_hlc(Hlc::new(3)) + .handle()?; let forwarder = Forwarder { dest: bus_c.clone(), @@ -295,3 +298,43 @@ mod tests { Ok(()) } } + +/// Actor for piping between BusHandles. +pub struct BusHandlePipe +where + F: Fn(&EnclaveEvent) -> bool + Unpin + 'static, +{ + handle: BusHandle, + predicate: F, +} + +impl BusHandlePipe +where + F: Fn(&EnclaveEvent) -> bool + Unpin + 'static, +{ + /// Create a new BusHandlePipe only forwarding events to the wrapped handle when the predicate + /// function returns true + pub fn new(handle: BusHandle, predicate: F) -> Self { + Self { handle, predicate } + } +} + +impl Actor for BusHandlePipe +where + F: Fn(&EnclaveEvent) -> bool + Unpin + 'static, +{ + type Context = actix::Context; +} + +impl Handler> for BusHandlePipe +where + F: Fn(&EnclaveEvent) -> bool + Unpin + 'static, +{ + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + if (self.predicate)(&msg) { + let (data, ts) = msg.split(); + let _ = self.handle.publish_from_remote(data, ts); + } + } +} diff --git a/crates/events/src/enclave_event/mod.rs b/crates/events/src/enclave_event/mod.rs index c29cc9a50e..0b1ac02a11 100644 --- a/crates/events/src/enclave_event/mod.rs +++ b/crates/events/src/enclave_event/mod.rs @@ -49,6 +49,7 @@ pub use decryptionshare_created::*; pub use die::*; pub use e3_request_complete::*; pub use e3_requested::*; +use e3_utils::{colorize, Color}; pub use enclave_error::*; pub use encryption_key_collection_failed::*; pub use encryption_key_created::*; @@ -358,7 +359,8 @@ impl TryFrom> for EnclaveError { impl fmt::Display for EnclaveEvent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&format!("{:?}", self)) + let t = self.event_type(); + f.write_str(&format!("{} {:?}", colorize(t, Color::Cyan), self)) } } diff --git a/crates/events/src/eventbus.rs b/crates/events/src/eventbus.rs index 81d748b7ab..ddf2422ae6 100644 --- a/crates/events/src/eventbus.rs +++ b/crates/events/src/eventbus.rs @@ -5,7 +5,6 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use crate::traits::{ErrorEvent, Event}; -use crate::{prelude::*, BusHandle, EnclaveEvent, Sequenced}; use actix::prelude::*; use bloom::{BloomFilter, ASMS}; use std::collections::{HashMap, VecDeque}; @@ -405,17 +404,3 @@ impl Handler for HistoryCollector { self.add_event(msg); } } - -////////////////////////////////////////////////////////////////////////////// -// Test Helper Functions -////////////////////////////////////////////////////////////////////////////// - -/// Function to help with testing when we want to maintain a vec of events -pub fn new_event_bus_with_history() -> (BusHandle, Addr>>) -{ - let consumer = EventBus::>::default().start(); - let bus: BusHandle = BusHandle::new_from_consumer(consumer); - let history = HistoryCollector::new().start(); - bus.subscribe("*", history.clone().recipient()); - (bus, history) -} diff --git a/crates/events/src/events.rs b/crates/events/src/events.rs new file mode 100644 index 0000000000..96450e4921 --- /dev/null +++ b/crates/events/src/events.rs @@ -0,0 +1,85 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use actix::{Message, Recipient}; + +use crate::{EnclaveEvent, Sequenced, Unsequenced}; + +/// Direct event received by the snapshot buffer in order to save snapshot to disk +#[derive(Message, Debug)] +#[rtype("()")] +pub struct CommitSnapshot(u64); + +impl CommitSnapshot { + pub fn new(seq: u64) -> Self { + Self(seq) + } + + pub fn seq(&self) -> u64 { + self.0 + } +} + +/// Direct event received by the EventStore to store an event +#[derive(Message, Debug)] +#[rtype("()")] +pub struct StoreEventRequested { + pub event: EnclaveEvent, + pub sender: Recipient, +} + +impl StoreEventRequested { + pub fn new( + event: EnclaveEvent, + sender: impl Into>, + ) -> Self { + Self { + event, + sender: sender.into(), + } + } +} + +/// Get events after timestamp in EventStore +#[derive(Message, Debug)] +#[rtype("()")] +pub struct GetEventsAfter { + pub ts: u128, + pub sender: Recipient, +} + +impl GetEventsAfter { + pub fn new(ts: u128, sender: impl Into>) -> Self { + Self { + ts, + sender: sender.into(), + } + } +} + +#[derive(Message, Debug)] +#[rtype("()")] +pub struct ReceiveEvents(Vec>); + +impl ReceiveEvents { + pub fn new(events: Vec) -> Self { + Self(events) + } + pub fn events(&self) -> &Vec { + &self.0 + } +} + +/// Direct event received by the Sequencer once an event has been stored +#[derive(Message, Debug)] +#[rtype("()")] +pub struct EventStored(pub EnclaveEvent); + +impl EventStored { + pub fn into_event(self) -> EnclaveEvent { + self.0 + } +} diff --git a/crates/events/src/eventstore.rs b/crates/events/src/eventstore.rs new file mode 100644 index 0000000000..5218459196 --- /dev/null +++ b/crates/events/src/eventstore.rs @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use crate::{ + events::{EventStored, StoreEventRequested}, + EventLog, GetEventsAfter, ReceiveEvents, SequenceIndex, +}; +use actix::{Actor, Handler}; +use anyhow::{bail, Result}; +use tracing::error; + +pub struct EventStore { + index: I, + log: L, +} + +impl EventStore { + pub fn handle_store_event_requested(&mut self, msg: StoreEventRequested) -> Result<()> { + let event = msg.event; + let sender = msg.sender; + let ts = event.get_ts(); + if let Some(_) = self.index.get(ts)? { + bail!("Event already stored at timestamp {ts}!"); + } + let seq = self.log.append(&event)?; + self.index.insert(ts, seq)?; + sender.try_send(EventStored(event.into_sequenced(seq)))?; + Ok(()) + } + + pub fn handle_get_events_after(&mut self, msg: GetEventsAfter) -> Result<()> { + // if there are no events after the timestamp return an empty vector + let Some(seq) = self.index.seek(msg.ts)? else { + msg.sender.try_send(ReceiveEvents::new(vec![]))?; + return Ok(()); + }; + // read and return the events + let evts = self + .log + .read_from(seq) + .map(|(s, e)| e.into_sequenced(s)) + .collect::>(); + msg.sender.try_send(ReceiveEvents::new(evts))?; + Ok(()) + } +} +impl EventStore { + pub fn new(index: I, log: L) -> Self { + Self { index, log } + } +} + +impl Actor for EventStore { + type Context = actix::Context; +} + +impl Handler for EventStore { + type Result = (); + fn handle(&mut self, msg: StoreEventRequested, _: &mut Self::Context) -> Self::Result { + match self.handle_store_event_requested(msg) { + Ok(_) => (), + Err(e) => panic!("{e}"), // panic here because when event storage fails we really need + // to just give up + } + } +} + +impl Handler for EventStore { + type Result = (); + fn handle(&mut self, msg: GetEventsAfter, _: &mut Self::Context) -> Self::Result { + match self.handle_get_events_after(msg) { + Ok(_) => (), + Err(e) => error!("{e}"), + } + } +} diff --git a/crates/events/src/hlc.rs b/crates/events/src/hlc.rs index 7a8cb3c11a..a49c077e79 100644 --- a/crates/events/src/hlc.rs +++ b/crates/events/src/hlc.rs @@ -159,9 +159,10 @@ impl From for HlcTimestamp { /// # Ok(()) /// # } /// ``` +#[derive(Clone)] pub struct Hlc { /// Inner state guarded by mutex - inner: Mutex, + inner: Arc>, /// Our node id node: u32, /// Maximum drift amount @@ -170,6 +171,7 @@ pub struct Hlc { clock: Option u64 + Send + Sync>>, } +#[derive(PartialEq)] struct HlcInner { ts: u64, counter: u32, @@ -182,12 +184,21 @@ impl Default for Hlc { } } +impl PartialEq for Hlc { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + && self.node == other.node + && self.max_drift == other.max_drift + // note clock ignored because it is only used in testing + } +} + impl Hlc { const DEFAULT_MAX_DRIFT: u64 = 60_000_000; // 60 sec pub fn new(node: u32) -> Self { Self { - inner: Mutex::new(HlcInner { ts: 0, counter: 0 }), + inner: Arc::new(Mutex::new(HlcInner { ts: 0, counter: 0 })), node, max_drift: Self::DEFAULT_MAX_DRIFT, clock: None, @@ -203,7 +214,7 @@ impl Hlc { pub fn with_state(ts: u64, counter: u32, node: u32) -> Self { Self { - inner: Mutex::new(HlcInner { ts, counter }), + inner: Arc::new(Mutex::new(HlcInner { ts, counter })), node, max_drift: Self::DEFAULT_MAX_DRIFT, clock: None, diff --git a/crates/events/src/lib.rs b/crates/events/src/lib.rs index eac423f6e7..863ed76b54 100644 --- a/crates/events/src/lib.rs +++ b/crates/events/src/lib.rs @@ -10,7 +10,8 @@ mod e3id; mod enclave_event; mod event_id; mod eventbus; -mod eventbus_factory; +mod events; +mod eventstore; pub mod hlc; mod ordered_set; pub mod prelude; @@ -24,7 +25,9 @@ pub use e3id::*; pub use enclave_event::*; pub use event_id::*; pub use eventbus::*; -pub use eventbus_factory::*; +pub use events::*; +pub use eventstore::*; pub use ordered_set::*; pub use seed::*; +pub use sequencer::*; pub use traits::*; diff --git a/crates/events/src/sequencer.rs b/crates/events/src/sequencer.rs index 1a691e7f53..a521637def 100644 --- a/crates/events/src/sequencer.rs +++ b/crates/events/src/sequencer.rs @@ -4,20 +4,30 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use actix::{Actor, Addr, Handler}; +use actix::{Actor, Addr, AsyncContext, Handler, Recipient}; -use crate::{EnclaveEvent, EventBus, Sequenced, Unsequenced}; +use crate::{ + events::{CommitSnapshot, EventStored, StoreEventRequested}, + EnclaveEvent, EventBus, Sequenced, Unsequenced, +}; +/// Component to sequence the storage of events pub struct Sequencer { bus: Addr>>, - seq: u64, + eventstore: Recipient, + buffer: Recipient, } impl Sequencer { - pub fn new(bus: &Addr>>) -> Self { + pub fn new( + bus: &Addr>>, + eventstore: impl Into>, + buffer: impl Into>, + ) -> Self { Self { bus: bus.clone(), - seq: 0, + eventstore: eventstore.into(), + buffer: buffer.into(), } } } @@ -28,22 +38,31 @@ impl Actor for Sequencer { impl Handler> for Sequencer { type Result = (); - fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { - // NOTE: FAKE SEQUENCER FOR NOW - JUST SET THE SEQUENCE NUMBER AND UPDATE - self.seq += 1; - self.bus.do_send(msg.into_sequenced(self.seq)) + fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { + self.eventstore + .do_send(StoreEventRequested::new(msg, ctx.address())) + } +} + +impl Handler for Sequencer { + type Result = (); + fn handle(&mut self, msg: EventStored, _: &mut Self::Context) -> Self::Result { + let event = msg.into_event(); + let seq = event.get_seq(); + self.buffer.do_send(CommitSnapshot::new(seq)); + self.bus.do_send(event) } } #[cfg(test)] mod tests { - - use crate::{prelude::*, BusHandle, EnclaveEvent, EventBus, TakeEvents, TestEvent}; - use actix::Actor; + use e3_ciphernode_builder::EventSystem; + use e3_events::{EnclaveEvent, EventPublisher, TakeEvents, TestEvent}; #[actix::test] async fn it_adds_seqence_numbers_to_events() -> anyhow::Result<()> { - let bus = BusHandle::new_from_consumer(EventBus::::default().start()); + let system = EventSystem::new("test"); + let bus = system.handle()?; let history = bus.history(); let event_data = vec![ diff --git a/crates/events/src/traits.rs b/crates/events/src/traits.rs index 8f2c5fed47..cdae6383a1 100644 --- a/crates/events/src/traits.rs +++ b/crates/events/src/traits.rs @@ -9,6 +9,8 @@ use anyhow::Result; use std::fmt::Display; use std::hash::Hash; +use crate::{EnclaveEvent, Unsequenced}; + /// Trait that must be implemented by events used with EventBus pub trait Event: Message + Clone + Display + Send + Sync + Unpin + Sized + 'static @@ -95,3 +97,21 @@ pub trait EventConstructorWithTimestamp: Event + Sized { pub trait CompositeEvent: EventConstructorWithTimestamp {} impl CompositeEvent for E where E: Sized + Event + EventConstructorWithTimestamp {} + +/// SequenceIndex is the index for each sequence which we can lookup based on HLC timestamp +pub trait SequenceIndex: Unpin + 'static { + /// Insert a sequence offset at the given timestamp + fn insert(&mut self, key: u128, value: u64) -> Result<()>; + /// Get the sequence offset for the given timestamp + fn get(&self, key: u128) -> Result>; + /// Get the first sequence offset at or after the given timestamp + fn seek(&self, key: u128) -> Result>; +} + +/// Store and retrieve events from a write ahead log +pub trait EventLog: Unpin + 'static { + /// Append an event to the log, returning its sequence number + fn append(&mut self, event: &EnclaveEvent) -> Result; + /// Read all events starting from the given sequence number (inclusive) + fn read_from(&self, from: u64) -> Box)>>; +} diff --git a/crates/evm/Cargo.toml b/crates/evm/Cargo.toml index c229bcc318..a230390952 100644 --- a/crates/evm/Cargo.toml +++ b/crates/evm/Cargo.toml @@ -30,4 +30,5 @@ zeroize = { workspace = true } [dev-dependencies] e3-entrypoint = { workspace = true } - +e3-ciphernode-builder = { workspace = true } +e3-events = { workspace = true, features = ["test-helpers"] } diff --git a/crates/evm/tests/integration.rs b/crates/evm/tests/integration.rs index 55bd774c09..385f890781 100644 --- a/crates/evm/tests/integration.rs +++ b/crates/evm/tests/integration.rs @@ -14,11 +14,11 @@ use alloy::{ sol_types::SolEvent, }; use anyhow::Result; +use e3_ciphernode_builder::EventSystem; use e3_data::Repository; use e3_entrypoint::helpers::datastore::get_in_mem_store; use e3_events::{ - new_event_bus_with_history, prelude::*, EnclaveEvent, EnclaveEventData, GetEvents, - HistoryCollector, Shutdown, TestEvent, + prelude::*, EnclaveEvent, EnclaveEventData, GetEvents, HistoryCollector, Shutdown, TestEvent, }; use e3_evm::{helpers::EthProvider, CoordinatorStart, EvmEventReader, HistoricalEventCoordinator}; use std::time::Duration; @@ -81,7 +81,9 @@ async fn evm_reader() -> Result<()> { ) .await?; let contract = EmitLogs::deploy(provider.provider()).await?; - let (bus, history_collector) = new_event_bus_with_history(); + let system = EventSystem::new("test").with_fresh_bus(); + let bus = system.handle()?; + let history_collector = bus.history(); let repository = Repository::new(get_in_mem_store()); let coordinator = HistoricalEventCoordinator::setup(bus.clone()); @@ -150,8 +152,9 @@ async fn ensure_historical_events() -> Result<()> { ) .await?; let contract = EmitLogs::deploy(provider.provider()).await?; - - let (bus, history_collector) = new_event_bus_with_history(); + let system = EventSystem::new("test").with_fresh_bus(); + let bus = system.handle()?; + let history_collector = bus.history(); let historical_msgs = vec!["these", "are", "historical", "events"]; let live_events = vec!["these", "events", "are", "live"]; @@ -229,7 +232,9 @@ async fn ensure_resume_after_shutdown() -> Result<()> { ) .await?; let contract = EmitLogs::deploy(provider.provider()).await?; - let (bus, history_collector) = new_event_bus_with_history(); + let system = EventSystem::new("test").with_fresh_bus(); + let bus = system.handle()?; + let history_collector = bus.history(); let repository = Repository::new(get_in_mem_store()); let coordinator = HistoricalEventCoordinator::setup(bus.clone()); @@ -336,7 +341,9 @@ async fn coordinator_single_reader() -> Result<()> { ) .await?; let contract = EmitLogs::deploy(provider.provider()).await?; - let (bus, history_collector) = new_event_bus_with_history(); + let system = EventSystem::new("test").with_fresh_bus(); + let bus = system.handle()?; + let history_collector = bus.history(); let repository = Repository::new(get_in_mem_store()); let coordinator = HistoricalEventCoordinator::setup(bus.clone()); @@ -409,7 +416,9 @@ async fn coordinator_multiple_readers() -> Result<()> { let contract1 = EmitLogs::deploy(provider.provider()).await?; let contract2 = EmitLogs::deploy(provider.provider()).await?; - let (bus, history_collector) = new_event_bus_with_history(); + let system = EventSystem::new("test").with_fresh_bus(); + let bus = system.handle()?; + let history_collector = bus.history(); let repository1 = Repository::new(get_in_mem_store()); let repository2 = Repository::new(get_in_mem_store()); @@ -492,7 +501,9 @@ async fn coordinator_no_historical_events() -> Result<()> { ) .await?; let contract = EmitLogs::deploy(provider.provider()).await?; - let (bus, history_collector) = new_event_bus_with_history(); + let system = EventSystem::new("test").with_fresh_bus(); + let bus = system.handle()?; + let history_collector = bus.history(); let repository = Repository::new(get_in_mem_store()); let coordinator = HistoricalEventCoordinator::setup(bus.clone()); diff --git a/crates/keyshare/src/threshold_keyshare.rs b/crates/keyshare/src/threshold_keyshare.rs index e577d81201..fc49d89a94 100644 --- a/crates/keyshare/src/threshold_keyshare.rs +++ b/crates/keyshare/src/threshold_keyshare.rs @@ -416,6 +416,8 @@ impl ThresholdKeyshare { msg: CiphernodeSelected, address: Addr, ) -> Result<()> { + info!("CiphernodeSelected received."); + // Ensure the collector is created let _ = self.ensure_collector(address.clone()); let _ = self.ensure_encryption_key_collector(address.clone()); diff --git a/crates/net/Cargo.toml b/crates/net/Cargo.toml index 8bca0864f6..8fb6f8ae49 100644 --- a/crates/net/Cargo.toml +++ b/crates/net/Cargo.toml @@ -26,6 +26,7 @@ tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } e3-events = { workspace = true } +e3-ciphernode-builder = { workspace = true } anyhow = { workspace = true } actix = { workspace = true } zeroize = { workspace = true } diff --git a/crates/net/src/document_publisher.rs b/crates/net/src/document_publisher.rs index 8f46ce7c4b..6e52feeee4 100644 --- a/crates/net/src/document_publisher.rs +++ b/crates/net/src/document_publisher.rs @@ -553,10 +553,10 @@ mod tests { use crate::events::NetCommand; use actix::Addr; use anyhow::{bail, Result}; + use e3_ciphernode_builder::EventSystem; use e3_events::{ BusHandle, CiphernodeSelected, DocumentKind, DocumentMeta, E3id, EnclaveError, - EnclaveEvent, EventBus, EventBusConfig, GetEvents, HistoryCollector, - PublishDocumentRequested, TakeEvents, + EnclaveEvent, GetEvents, HistoryCollector, PublishDocumentRequested, TakeEvents, }; use libp2p::kad::{GetRecordError, PutRecordError, RecordKey}; use tokio::{ @@ -565,7 +565,7 @@ mod tests { }; use tracing::subscriber::DefaultGuard; - fn setup_test() -> ( + fn setup_test() -> Result<( DefaultGuard, BusHandle, mpsc::Sender, @@ -575,7 +575,7 @@ mod tests { Addr>, Addr>, Addr, - ) { + )> { use tracing_subscriber::{fmt, EnvFilter}; let subscriber = fmt() @@ -585,8 +585,8 @@ mod tests { let guard = tracing::subscriber::set_default(subscriber); - let consumer = EventBus::::new(EventBusConfig { deduplicate: true }).start(); - let bus = BusHandle::new_from_consumer(consumer); + let system = EventSystem::new("test").with_fresh_bus(); + let bus = system.handle()?; let (net_cmd_tx, net_cmd_rx) = mpsc::channel(100); let (net_evt_tx, net_evt_rx) = broadcast::channel(100); let net_evt_rx = Arc::new(net_evt_rx); @@ -596,15 +596,15 @@ mod tests { bus.subscribe("EnclaveError", error.clone().recipient()); let publisher = DocumentPublisher::setup(&bus, &net_cmd_tx, &net_evt_rx, "topic"); - ( + Ok(( guard, bus, net_cmd_tx, net_cmd_rx, net_evt_tx, net_evt_rx, history, error, publisher, - ) + )) } #[actix::test] async fn test_publishes_document() -> Result<()> { let (_guard, bus, _net_cmd_tx, mut net_cmd_rx, net_evt_tx, _net_evt_rx, _, _, _) = - setup_test(); + setup_test()?; let value = ArcBytes::from_bytes(b"I am a special document"); let expires_at = Some(Utc::now() + chrono::Duration::days(1)); let e3_id = E3id::new("1243", 1); @@ -676,7 +676,7 @@ mod tests { #[actix::test] async fn test_get_document_fails_with_exponential_backoff() -> Result<()> { let (_guard, bus, _net_cmd_tx, mut net_cmd_rx, net_evt_tx, _net_evt_rx, _, errors, _) = - setup_test(); + setup_test()?; let value = b"I am a special document".to_vec(); let expires_at = Some(Utc::now() + chrono::Duration::days(1)); @@ -740,7 +740,7 @@ mod tests { _history, errors, _, - ) = setup_test(); + ) = setup_test()?; let value = ArcBytes::from_bytes(b"I am a special document"); let expires_at = Some(Utc::now() + chrono::Duration::days(1)); let e3_id = E3id::new("1243", 1); @@ -788,7 +788,7 @@ mod tests { #[actix::test] async fn test_notified_of_document() -> Result<()> { let (_guard, bus, _net_cmd_tx, mut net_cmd_rx, net_evt_tx, _net_evt_rx, history, _, _) = - setup_test(); + setup_test()?; let value = ArcBytes::from_bytes(b"I am a special document"); let expires_at = Utc::now() + chrono::Duration::days(1); diff --git a/crates/net/src/events.rs b/crates/net/src/events.rs index 0218cae8c1..e042eedd1d 100644 --- a/crates/net/src/events.rs +++ b/crates/net/src/events.rs @@ -7,10 +7,7 @@ use crate::Cid; use actix::Message; use anyhow::{bail, Context, Result}; -use e3_events::{ - CorrelationId, DocumentMeta, EnclaveEvent, EventConstructorWithTimestamp, Sequenced, - Unsequenced, -}; +use e3_events::{CorrelationId, DocumentMeta, EnclaveEvent, Sequenced, Unsequenced}; use e3_utils::ArcBytes; use libp2p::{ gossipsub::{MessageId, PublishError, TopicHash}, diff --git a/crates/net/src/net_event_translator.rs b/crates/net/src/net_event_translator.rs index d12abd5793..54126d7e13 100644 --- a/crates/net/src/net_event_translator.rs +++ b/crates/net/src/net_event_translator.rs @@ -29,7 +29,7 @@ use std::sync::Arc; use tokio::sync::broadcast; use tokio::sync::mpsc; use tracing::warn; -use tracing::{error, info, instrument, trace}; +use tracing::{info, instrument, trace}; // TODO: store event filtering here on this actor instead of is_local_only() on the event. We // should do this as this functionality is not global and ramifications should stay local to here diff --git a/crates/net/tests/Dockerfile b/crates/net/tests/Dockerfile index bf4f3ded8f..7c60d6d798 100644 --- a/crates/net/tests/Dockerfile +++ b/crates/net/tests/Dockerfile @@ -3,8 +3,17 @@ FROM rust:1.86 AS builder WORKDIR /app +RUN apt-get update && apt-get install -y --no-install-recommends \ + jq \ + software-properties-common \ + && rm -rf /var/lib/apt/lists/* + +RUN curl -L https://github.com/ethereum/solidity/releases/download/v0.8.27/solc-static-linux -o /usr/local/bin/solc \ + && chmod +x /usr/local/bin/solc + # Copy workspace files and all crates EXCEPT net COPY --exclude=crates/net . . + COPY ./crates/net/Cargo.toml ./crates/net/Cargo.toml RUN mkdir -p ./crates/net/src/bin && \ echo "fn main() {}" > ./crates/net/src/main.rs && \ diff --git a/crates/net/tests/docker-compose.yaml b/crates/net/tests/docker-compose.yaml index f75de41fbd..2741a121d3 100644 --- a/crates/net/tests/docker-compose.yaml +++ b/crates/net/tests/docker-compose.yaml @@ -1,9 +1,10 @@ x-common-variables: &common-variables SYNC_THRESHOLD: 6 - +x-common-image: &common-image + image: p2p_test:${IMAGE_TAG} services: alice: - image: p2p_test:latest + <<: *common-image command: ['/app/p2p_test', 'alice'] environment: <<: *common-variables @@ -12,9 +13,8 @@ services: TEST_CONFIG: 'lead' networks: - p2p_test_net - bob: - image: p2p_test:latest + <<: *common-image command: ['/app/p2p_test', 'bob'] environment: <<: *common-variables @@ -22,18 +22,16 @@ services: DIAL_TO: '/dns4/charlie/udp/9091/quic-v1' networks: - p2p_test_net - charlie: - image: p2p_test:latest + <<: *common-image command: ['/app/p2p_test', 'charlie'] environment: <<: *common-variables QUIC_PORT: 9091 networks: - p2p_test_net - daniel: - image: p2p_test:latest + <<: *common-image command: ['/app/p2p_test', 'daniel'] environment: <<: *common-variables @@ -41,9 +39,8 @@ services: DIAL_TO: '/dns4/charlie/udp/9091/quic-v1' networks: - p2p_test_net - eve: - image: p2p_test:latest + <<: *common-image command: ['/app/p2p_test', 'eve'] environment: <<: *common-variables @@ -51,9 +48,8 @@ services: DIAL_TO: '/dns4/charlie/udp/9091/quic-v1' networks: - p2p_test_net - fabian: - image: p2p_test:latest + <<: *common-image command: ['/app/p2p_test', 'fabian'] environment: <<: *common-variables @@ -61,7 +57,6 @@ services: DIAL_TO: '/dns4/charlie/udp/9091/quic-v1' networks: - p2p_test_net - networks: p2p_test_net: driver: bridge diff --git a/crates/net/tests/run.sh b/crates/net/tests/run.sh index 81a4fc589d..fa724bf041 100755 --- a/crates/net/tests/run.sh +++ b/crates/net/tests/run.sh @@ -1,16 +1,16 @@ #!/usr/bin/env bash - set -e - # Export env vars once for all docker compose commands export DOCKER_BUILDKIT=1 export COMPOSE_DOCKER_CLI_BUILD=1 +# Get the current commit SHA +export IMAGE_TAG=$(git rev-parse --short HEAD) + echo "" -echo "Building docker image" +echo "Building docker image (p2p_test:${IMAGE_TAG})" echo "" -docker build --network host -f ./Dockerfile -t p2p_test:latest ../../.. - +docker build --network host -f ./Dockerfile -t "p2p_test:${IMAGE_TAG}" ../../.. echo "" echo "NETWORK TESTS" echo "" diff --git a/crates/request/src/router.rs b/crates/request/src/router.rs index 3df1c2e1a9..82ee518b04 100644 --- a/crates/request/src/router.rs +++ b/crates/request/src/router.rs @@ -10,7 +10,6 @@ use crate::E3ContextParams; use crate::E3ContextSnapshot; use crate::E3MetaExtension; use crate::RouterRepositoryFactory; -use actix::Message; use actix::{Actor, Addr, Context, Handler}; use anyhow::*; use async_trait::async_trait; diff --git a/crates/sortition/Cargo.toml b/crates/sortition/Cargo.toml index d38cd3bf30..42bf45d7b0 100644 --- a/crates/sortition/Cargo.toml +++ b/crates/sortition/Cargo.toml @@ -17,6 +17,7 @@ e3-config = { workspace = true } e3-data = { workspace = true } e3-events = { workspace = true } e3-request = { workspace = true } +e3-utils = { workspace = true } num = { workspace = true } rand = { workspace = true } serde = { workspace = true } diff --git a/crates/sortition/src/ciphernode_selector.rs b/crates/sortition/src/ciphernode_selector.rs index 8de35d2885..5ae487dc7b 100644 --- a/crates/sortition/src/ciphernode_selector.rs +++ b/crates/sortition/src/ciphernode_selector.rs @@ -5,23 +5,26 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use crate::sortition::{GetNodeIndex, Sortition}; -/// CiphernodeSelector is an actor that determines if a ciphernode is part of a committee and if so -/// emits a TicketGenerated event (score sortition) to the event bus use actix::prelude::*; -use e3_config::StoreKeys; -use e3_data::{DataStore, RepositoriesFactory}; +use anyhow::bail; +use anyhow::Result; +use e3_data::{AutoPersist, Persistable, Repository}; +use e3_events::E3RequestComplete; use e3_events::{ - prelude::*, trap, BusHandle, CiphernodeSelected, CommitteeFinalized, E3Requested, EType, + prelude::*, trap, BusHandle, CiphernodeSelected, CommitteeFinalized, E3Requested, E3id, EType, EnclaveEvent, EnclaveEventData, Shutdown, TicketGenerated, TicketId, }; -use e3_request::MetaRepositoryFactory; +use e3_request::E3Meta; +use std::collections::HashMap; use tracing::info; +/// CiphernodeSelector is an actor that determines if a ciphernode is part of a committee and if so +/// emits a TicketGenerated event (score sortition) to the event bus pub struct CiphernodeSelector { bus: BusHandle, sortition: Addr, address: String, - data_store: DataStore, + e3_cache: Persistable>, } impl Actor for CiphernodeSelector { @@ -32,30 +35,32 @@ impl CiphernodeSelector { pub fn new( bus: &BusHandle, sortition: &Addr, + e3_cache: Persistable>, address: &str, - data_store: &DataStore, ) -> Self { Self { bus: bus.clone(), sortition: sortition.clone(), + e3_cache, address: address.to_owned(), - data_store: data_store.clone(), } } - pub fn attach( + pub async fn attach( bus: &BusHandle, sortition: &Addr, + selector_store: Repository>, address: &str, - data_store: &DataStore, - ) -> Addr { - let addr = CiphernodeSelector::new(bus, sortition, address, data_store).start(); + ) -> Result> { + let e3_cache = selector_store.load_or_default(HashMap::new()).await?; + let addr = CiphernodeSelector::new(bus, sortition, e3_cache, address).start(); bus.subscribe("E3Requested", addr.clone().recipient()); bus.subscribe("CommitteeFinalized", addr.clone().recipient()); bus.subscribe("Shutdown", addr.clone().recipient()); - addr + info!("CiphernodeSelector listening!"); + Ok(addr) } } @@ -64,6 +69,7 @@ impl Handler for CiphernodeSelector { fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { match msg.into_data() { EnclaveEventData::E3Requested(data) => ctx.notify(data), + EnclaveEventData::E3RequestComplete(data) => ctx.notify(data), EnclaveEventData::CommitteeFinalized(data) => ctx.notify(data), EnclaveEventData::Shutdown(data) => ctx.notify(data), _ => (), @@ -80,6 +86,27 @@ impl Handler for CiphernodeSelector { let bus = self.bus.clone(); let chain_id = data.e3_id.chain_id(); + trap(EType::Sortition, &bus.clone(), || { + self.e3_cache.try_mutate(|mut cache| { + info!( + "Mutating e3_cache: appending data: {:?}", + data.e3_id.clone() + ); + cache.insert( + data.e3_id.clone(), + E3Meta { + seed: data.seed, + threshold_n: data.threshold_n, + threshold_m: data.threshold_m, + params: data.params, + esi_per_ct: data.esi_per_ct, + error_size: data.error_size, + }, + ); + Ok(cache) + }) + }); + Box::pin(async move { let seed = data.seed; let size = data.threshold_n; @@ -89,6 +116,8 @@ impl Handler for CiphernodeSelector { seed, size ); + // TODO: instead of this it would be better to pass the event theough sortition and + // then decorate it with this information WithIndex if let Ok(found_result) = sortition .send(GetNodeIndex { chain_id, @@ -125,62 +154,71 @@ impl Handler for CiphernodeSelector { } } +impl Handler for CiphernodeSelector { + type Result = (); + fn handle(&mut self, msg: E3RequestComplete, _: &mut Self::Context) -> Self::Result { + trap(EType::Sortition, &self.bus.clone(), move || { + self.e3_cache.try_mutate(|mut cache| { + cache.remove(&msg.e3_id); + Ok(cache) + }) + }) + } +} + impl Handler for CiphernodeSelector { - type Result = ResponseFuture<()>; + type Result = (); fn handle(&mut self, msg: CommitteeFinalized, _ctx: &mut Self::Context) -> Self::Result { - let address = self.address.clone(); - let bus = self.bus.clone(); - let e3_id = msg.e3_id.clone(); - let repositories = self - .data_store - .scope(StoreKeys::router()) - .scope(StoreKeys::context(&e3_id)) - .repositories(); - - // Check if this node is in the finalized committee - if !msg.committee.contains(&address) { - info!(node = address, "Node not in finalized committee"); - return Box::pin(async {}); - } + trap(EType::Sortition, &self.bus.clone(), move || { + info!("CiphernodeSelector received CommitteeFinalized."); + let bus = self.bus.clone(); + info!("Getting e3_cache..."); + let Some(e3_cache) = self.e3_cache.get() else { + bail!("Could not get cache"); + }; - Box::pin(async move { - // Retrieve E3 metadata from repository - let meta_repo = repositories.meta(&e3_id); - let Some(e3_meta) = meta_repo.read().await.ok().flatten() else { - info!( - node = address, - "No stored E3 metadata for {:?}, skipping", e3_id + info!("Getting e3_meta..."); + let Some(e3_meta) = e3_cache.get(&msg.e3_id) else { + bail!( + "Could not find E3Meta on CiphernodeSelector for {}", + msg.e3_id ); - return; }; - let Some(party_id) = msg.committee.iter().position(|addr| addr == &address) else { + // Check if this node is in the finalized committee + if !msg.committee.contains(&self.address) { + info!(node = self.address, "Node not in finalized committee"); + return Ok(()); + } + + // Retrieve E3 metadata from repository + let Some(party_id) = msg.committee.iter().position(|addr| addr == &self.address) else { info!( - node = address, + node = self.address, "Node address not found in committee list (should not happen)" ); - return; + return Ok(()); }; info!( - node = address, + node = self.address, party_id = party_id, "Node is in finalized committee, emitting CiphernodeSelected" ); - trap(EType::Sortition, &bus.clone(), || { - bus.publish(CiphernodeSelected { - party_id: party_id as u64, - e3_id, - threshold_m: e3_meta.threshold_m, - threshold_n: e3_meta.threshold_n, - esi_per_ct: e3_meta.esi_per_ct, - error_size: e3_meta.error_size, - params: e3_meta.params, - seed: e3_meta.seed, - })?; - Ok(()) - }) + + bus.publish(CiphernodeSelected { + party_id: party_id as u64, + e3_id: msg.e3_id, + threshold_m: e3_meta.threshold_m, + threshold_n: e3_meta.threshold_n, + esi_per_ct: e3_meta.esi_per_ct, + error_size: e3_meta.error_size.clone(), + params: e3_meta.params.clone(), + seed: e3_meta.seed, + })?; + + Ok(()) }) } } diff --git a/crates/sortition/src/repo.rs b/crates/sortition/src/repo.rs index 419c8db7d4..75a92d0ccb 100644 --- a/crates/sortition/src/repo.rs +++ b/crates/sortition/src/repo.rs @@ -9,6 +9,7 @@ use crate::sortition::NodeStateStore; use e3_config::StoreKeys; use e3_data::{Repositories, Repository}; use e3_events::E3id; +use e3_request::E3Meta; use std::collections::HashMap; pub trait SortitionRepositoryFactory { @@ -21,6 +22,16 @@ impl SortitionRepositoryFactory for Repositories { } } +pub trait CiphernodeSelectorFactory { + fn ciphernode_selector(&self) -> Repository>; +} + +impl CiphernodeSelectorFactory for Repositories { + fn ciphernode_selector(&self) -> Repository> { + Repository::new(self.store.scope(StoreKeys::ciphernode_selector())) + } +} + pub trait NodeStateRepositoryFactory { fn node_state(&self) -> Repository>; } diff --git a/crates/test-helpers/src/ciphernode_system.rs b/crates/test-helpers/src/ciphernode_system.rs index 2772ffac09..61ef319129 100644 --- a/crates/test-helpers/src/ciphernode_system.rs +++ b/crates/test-helpers/src/ciphernode_system.rs @@ -196,8 +196,9 @@ impl Deref for CiphernodeHistory { mod tests { use super::*; use actix::prelude::*; + use e3_ciphernode_builder::EventSystem; use e3_data::InMemStore; - use e3_events::{BusHandle, EventBus, EventBusConfig}; + use e3_events::{EventBus, EventBusConfig}; async fn mock_setup_node(address: String) -> Result { // Create mock actors for the test @@ -205,7 +206,7 @@ mod tests { let bus = EventBus::::new(EventBusConfig { deduplicate: true }).start(); let history = EventBus::::history(&bus); let errors = EventBus::::error(&bus); - let bus = BusHandle::new_from_consumer(bus); + let bus = EventSystem::new("test").with_event_bus(bus).handle()?; Ok(CiphernodeHandle { address, diff --git a/crates/test-helpers/src/lib.rs b/crates/test-helpers/src/lib.rs index c430d24db5..8f7540e14c 100644 --- a/crates/test-helpers/src/lib.rs +++ b/crates/test-helpers/src/lib.rs @@ -13,7 +13,7 @@ mod utils; use actix::prelude::*; use alloy::primitives::Address; use anyhow::*; -use e3_ciphernode_builder::CiphernodeHandle; +use e3_ciphernode_builder::{CiphernodeHandle, EventSystem}; use e3_events::{ BusHandle, CiphernodeAdded, EnclaveEvent, EnclaveEventData, EventBus, EventBusConfig, EventPublisher, HistoryCollector, Seed, Subscribe, @@ -94,8 +94,8 @@ pub fn get_common_setup( let moduli = param_set.moduli; let (crp_bytes, params) = create_crp_bytes_params(moduli, degree, plaintext_modulus, &seed); let crpoly = CommonRandomPoly::deserialize(&crp_bytes.clone(), ¶ms)?; - - Ok((bus.into(), rng, seed, params, crpoly, errors, history)) + let handle = EventSystem::in_mem("cn1").with_event_bus(bus).handle()?; + Ok((handle, rng, seed, params, crpoly, errors, history)) } /// Simulate libp2p by taking output events on each local bus and filter for !is_local_only() and forward remaining events back to the event bus @@ -126,21 +126,17 @@ pub fn get_common_setup( /// ``` pub fn simulate_libp2p_net(nodes: &[CiphernodeHandle]) { for node in nodes.iter() { - let source = node.bus().consumer(); + let source = node.bus(); for (_, node) in nodes.iter().enumerate() { - let dest = node.bus().consumer(); + let dest = node.bus(); if source != dest { - EventBus::pipe_filter( - source, - move |e: &EnclaveEvent| { - // TODO: Document publisher events need to be - // converted to DocumentReceived events - - NetEventTranslator::is_forwardable_event(e) - || DocumentPublisher::is_document_publisher_event(e) - }, - dest, - ) + source.pipe_to(dest, |e: &EnclaveEvent| { + // TODO: Document publisher events need to be + // converted to DocumentReceived events + + NetEventTranslator::is_forwardable_event(e) + || DocumentPublisher::is_document_publisher_event(e) + }); } else { println!("not piping bus to itself"); } diff --git a/crates/tests/Cargo.toml b/crates/tests/Cargo.toml index fe94458915..ee84285db7 100644 --- a/crates/tests/Cargo.toml +++ b/crates/tests/Cargo.toml @@ -46,3 +46,4 @@ zeroize = { workspace = true } [dev-dependencies] e3-events = { workspace = true, features = ["test-helpers"] } + diff --git a/crates/tests/tests/integration.rs b/crates/tests/tests/integration.rs index 4e7b924924..b53f3ca924 100644 --- a/crates/tests/tests/integration.rs +++ b/crates/tests/tests/integration.rs @@ -4,15 +4,14 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use actix::Actor; use alloy::primitives::{FixedBytes, I256, U256}; use anyhow::{bail, Result}; -use e3_ciphernode_builder::CiphernodeBuilder; +use e3_ciphernode_builder::{CiphernodeBuilder, EventSystem}; use e3_crypto::Cipher; use e3_events::{ prelude::*, BusHandle, CiphertextOutputPublished, CommitteeFinalized, ConfigurationUpdated, - E3Requested, E3id, EnclaveEvent, EnclaveEventData, EventBus, EventBusConfig, - OperatorActivationChanged, PlaintextAggregated, TicketBalanceUpdated, + E3Requested, E3id, EnclaveEventData, OperatorActivationChanged, PlaintextAggregated, + TicketBalanceUpdated, }; use e3_multithread::{GetReport, Multithread}; use e3_sdk::bfv_helpers::{build_bfv_params_arc, decode_bytes_to_vec_u64, encode_bfv_params}; @@ -87,6 +86,7 @@ fn serialize_report(report: &[(&str, Duration)]) -> String { #[actix::test] #[serial_test::serial] async fn test_trbfv_actor() -> Result<()> { + println!("Running test_trbfv_actor..."); let mut report: Vec<(&str, Duration)> = vec![]; let whole_test = Instant::now(); use tracing_subscriber::{fmt, EnvFilter}; @@ -118,9 +118,8 @@ async fn test_trbfv_actor() -> Result<()> { let rng = create_shared_rng_from_u64(42); // Create "trigger" bus - let bus: BusHandle = EventBus::::new(EventBusConfig { deduplicate: true }) - .start() - .into(); + let system = EventSystem::new("test").with_fresh_bus(); + let bus = system.handle()?; // Parameters (128bits of security) let (degree, plaintext_modulus, moduli) = ( @@ -180,7 +179,7 @@ async fn test_trbfv_actor() -> Result<()> { .add_group(1, || async { let addr = rand_eth_addr(&rng); println!("Building collector {}!", addr); - CiphernodeBuilder::new(rng.clone(), cipher.clone()) + CiphernodeBuilder::new(&addr, rng.clone(), cipher.clone()) .with_address(&addr) .with_injected_multithread(multithread.clone()) .testmode_with_history() @@ -196,7 +195,7 @@ async fn test_trbfv_actor() -> Result<()> { .add_group(6, || async { let addr = rand_eth_addr(&rng); println!("Building normal {}", &addr); - CiphernodeBuilder::new(rng.clone(), cipher.clone()) + CiphernodeBuilder::new(&addr, rng.clone(), cipher.clone()) .with_address(&addr) .with_injected_multithread(multithread.clone()) .with_trbfv() @@ -277,7 +276,6 @@ async fn test_trbfv_actor() -> Result<()> { let committee_finalized_timer = Instant::now(); let expected = vec!["E3Requested", "CommitteeFinalized"]; - let _ = nodes .take_history_with_timeout(0, expected.len(), Duration::from_secs(1000)) .await?; @@ -326,6 +324,7 @@ async fn test_trbfv_actor() -> Result<()> { let h = nodes .take_history_with_timeout(0, expected.len(), Duration::from_secs(1000)) .await?; + report.push(( "ThresholdShares -> PublicKeyAggregated", shares_to_pubkey_agg_timer.elapsed(), diff --git a/crates/tests/tests/integration_legacy.rs b/crates/tests/tests/integration_legacy.rs index be35b47dbe..4bfc160924 100644 --- a/crates/tests/tests/integration_legacy.rs +++ b/crates/tests/tests/integration_legacy.rs @@ -10,6 +10,7 @@ use alloy::primitives::{FixedBytes, I256, U256}; use anyhow::*; use e3_ciphernode_builder::CiphernodeBuilder; use e3_ciphernode_builder::CiphernodeHandle; +use e3_ciphernode_builder::EventSystem; use e3_crypto::Cipher; use e3_data::GetDump; use e3_data::InMemStore; @@ -20,8 +21,8 @@ use e3_events::Unsequenced; use e3_events::{ prelude::*, CiphernodeSelected, CiphertextOutputPublished, CommitteeFinalized, ConfigurationUpdated, E3Requested, E3id, EnclaveEvent, EventBus, EventBusConfig, - HistoryCollector, OperatorActivationChanged, OrderedSet, PlaintextAggregated, - PublicKeyAggregated, Seed, Shutdown, TakeEvents, TicketBalanceUpdated, + OperatorActivationChanged, OrderedSet, PlaintextAggregated, PublicKeyAggregated, Seed, + Shutdown, TakeEvents, TicketBalanceUpdated, }; use e3_net::events::GossipData; use e3_net::{events::NetEvent, NetEventTranslator}; @@ -52,10 +53,10 @@ async fn setup_local_ciphernode( rng: &SharedRng, logging: bool, addr: &str, - data: Option>, + store: Option>, cipher: &Arc, ) -> Result { - let mut builder = CiphernodeBuilder::new(rng.clone(), cipher.clone()) + let mut builder = CiphernodeBuilder::new(&addr, rng.clone(), cipher.clone()) .with_keyshare() .with_address(addr) .testmode_with_forked_bus(bus.consumer()) @@ -65,8 +66,8 @@ async fn setup_local_ciphernode( .with_plaintext_aggregation() .with_sortition_score(); - if let Some(data) = data { - builder = builder.with_datastore((&data).into()); + if let Some(ref in_mem_store) = store { + builder = builder.with_in_mem_datastore(in_mem_store); } if logging { @@ -361,9 +362,10 @@ async fn test_stopped_keyshares_retain_state() -> Result<()> { // Apply the address and data node to two new actors // Here we test that hydration occurred sucessfully - let bus = EventBus::::new(EventBusConfig { deduplicate: true }) - .start() - .into(); + + let bus = EventSystem::in_mem("cn2") + .with_event_bus(EventBus::::new(EventBusConfig { deduplicate: true }).start()) + .handle()?; let cn1 = setup_local_ciphernode( &bus, &rng, @@ -444,11 +446,9 @@ async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { // Setup elements in test let (cmd_tx, mut cmd_rx) = mpsc::channel(100); // Transmit byte events to the network let (event_tx, _) = broadcast::channel(100); // Receive byte events from the network - let bus: BusHandle = EventBus::::new(EventBusConfig { deduplicate: true }) - .start() - .into(); - let history_collector = HistoryCollector::::new().start(); - bus.subscribe("*", history_collector.clone().recipient()); + let system = EventSystem::new("test"); + let bus = system.handle()?; + let history_collector = bus.history(); let event_rx = Arc::new(event_tx.subscribe()); // Pas cmd and event channels to NetEventTranslator NetEventTranslator::setup(&bus, &cmd_tx, &event_rx, "my-topic"); @@ -621,11 +621,9 @@ async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { // Setup elements in test let (cmd_tx, _) = mpsc::channel(100); // Transmit byte events to the network let (event_tx, event_rx) = broadcast::channel(100); // Receive byte events from the network - let bus: BusHandle = EventBus::::new(EventBusConfig { deduplicate: true }) - .start() - .into(); - let history_collector = HistoryCollector::::new().start(); - bus.subscribe("*", history_collector.clone().recipient()); + let system = EventSystem::new("test").with_fresh_bus(); + let bus = system.handle()?; + let history_collector = bus.history(); NetEventTranslator::setup(&bus, &cmd_tx, &Arc::new(event_rx), "mytopic"); diff --git a/crates/utils/src/formatters.rs b/crates/utils/src/formatters.rs index 48051a3830..b8094d900b 100644 --- a/crates/utils/src/formatters.rs +++ b/crates/utils/src/formatters.rs @@ -35,3 +35,26 @@ pub fn truncate(s: String) -> String { format!("", s.len(), start, end) } } + +pub enum Color { + Black = 30, + Red = 31, + Green = 32, + Yellow = 33, + Blue = 34, + Magenta = 35, + Cyan = 36, + White = 37, + BrightBlack = 90, + BrightRed = 91, + BrightGreen = 92, + BrightYellow = 93, + BrightBlue = 94, + BrightMagenta = 95, + BrightCyan = 96, + BrightWhite = 97, +} + +pub fn colorize(s: T, color: Color) -> String { + format!("\x1b[{}m{}\x1b[0m", color as u8, s) +}