diff --git a/crates/ciphernode-builder/src/ciphernode_builder.rs b/crates/ciphernode-builder/src/ciphernode_builder.rs index 821f8d7013..309995ceaf 100644 --- a/crates/ciphernode-builder/src/ciphernode_builder.rs +++ b/crates/ciphernode-builder/src/ciphernode_builder.rs @@ -102,11 +102,21 @@ pub enum KeyshareKind { } impl CiphernodeBuilder { - /// Create a new ciphernode builder. + /// Creates a new ciphernode builder configured with the given name, RNG, and cipher. /// - /// - name - Unique name for the ciphernode - /// - rng - Arc Mutex wrapped random number generator - /// - cipher - Cipher for encryption and decryption of sensitive data + /// The returned builder uses an in-memory event system by default, has no chains configured, + /// and leaves all optional features (keyshare, aggregation, multithread capture, logging, etc.) + /// disabled so callers can opt into the features they need. + /// + /// # Examples + /// + /// ``` + /// // Construct shared RNG and cipher according to your test/setup utilities. + /// let rng = SharedRng::default(); // replace with actual RNG initialization + /// let cipher = std::sync::Arc::new(Cipher::default()); // replace with actual Cipher initialization + /// let builder = CiphernodeBuilder::new("node1", rng, cipher); + /// assert_eq!(builder.name, "node1"); + /// ``` pub fn new(name: &str, rng: SharedRng, cipher: Arc) -> Self { Self { name: name.to_owned(), @@ -152,21 +162,62 @@ impl CiphernodeBuilder { self } - /// Use the Deprecated Keyshare feature + /// Enable the legacy non-threshold keyshare mechanism on the builder. + /// + /// This marks the builder to configure a non-threshold (deprecated) keyshare mode. + /// + /// # Examples + /// + /// ``` + /// let builder = CiphernodeBuilder::new("node", rng, cipher).with_keyshare(); + /// ``` #[deprecated = "in future versions we will migrate to with_trbfv()"] pub fn with_keyshare(mut self) -> Self { self.keyshare = Some(KeyshareKind::NonThreshold); self } - /// Use the given in-mem datastore. This is useful for injecting a store dump. + /// Configure the builder to use the provided in-memory datastore. + /// + /// This allows injecting a pre-populated in-memory store (for tests or restoring a store dump). + /// + /// # Parameters + /// + /// - `store`: address of the in-memory datastore to attach to the node. + /// + /// # Returns + /// + /// The updated `CiphernodeBuilder`. + /// + /// # Examples + /// + /// ```no_run + /// # use actix::Addr; + /// # use crate::{CiphernodeBuilder, InMemStore, SharedRng, Cipher}; + /// let rng: SharedRng = /* ... */; + /// let cipher: std::sync::Arc = /* ... */; + /// let builder = CiphernodeBuilder::new("node", rng, cipher); + /// let store_addr: Addr = /* create or obtain an in-memory store */ unimplemented!(); + /// let builder = builder.with_in_mem_datastore(&store_addr); + /// ``` pub fn with_in_mem_datastore(mut self, store: &Addr) -> Self { self.in_mem_store = Some(store.to_owned()); self } - /// Add persistence information for storing events and data. Without persistence information - /// the node will run in memory by default. + /// Configure the builder to use persisted on-disk storage for events and state. + /// + /// Sets the event system to `Persisted` with the given paths: `log_path` is used for the append-only event log + /// and `kv_path` is used for the key-value store backing repositories. + /// + /// # Examples + /// + /// ``` + /// use std::path::PathBuf; + /// + /// let builder = CiphernodeBuilder::new("node", rng, cipher) + /// .with_persistence(&PathBuf::from("events.log"), &PathBuf::from("store.kv")); + /// ``` pub fn with_persistence(mut self, log_path: &PathBuf, kv_path: &PathBuf) -> Self { self.event_system = EventSystemType::Persisted { log_path: log_path.to_owned(), @@ -295,6 +346,32 @@ impl CiphernodeBuilder { EventBus::::new(EventBusConfig { deduplicate: true }).start() } + /// Builds and starts a fully configured ciphernode and returns a runtime handle to its components. + /// + /// This initializes the event bus and event system (persisted or in-memory), sets up history and + /// error collectors if enabled, creates repositories and sortition, attaches contract readers and + /// optional writers for each configured chain, registers a ciphernode selector, starts the + /// historical event coordinator, attaches configured E3 router extensions (keyshare, FHE, + /// aggregation, etc.), builds the router, and returns a `CiphernodeHandle` that exposes the node's + /// address, store, event bus, and optional history/error collectors. + /// + /// # Returns + /// + /// `CiphernodeHandle` containing the node's Ethereum address, the backing store, the event bus + /// address, and optional history and error collector addresses. + /// + /// # Examples + /// + /// ``` + /// # use futures::executor::block_on; + /// # async fn example(builder: crate::CiphernodeBuilder) -> anyhow::Result<()> { + /// let handle = builder.build().await?; + /// // Use handle (address, store, bus, ...)... + /// drop(handle); + /// # Ok(()) + /// # } + /// # fn main() { let _ = block_on(async {}); } + /// ``` pub async fn build(mut self) -> anyhow::Result { // Local bus for ciphernode events can either be forked from a bus or it can be directly // attached to a source bus @@ -616,4 +693,4 @@ impl ProviderCaches { .insert(chain.clone(), write_provider.clone()); Ok(write_provider) } -} +} \ No newline at end of file diff --git a/crates/ciphernode-builder/src/event_system.rs b/crates/ciphernode-builder/src/event_system.rs index 73ad4dfba0..ada5d63545 100644 --- a/crates/ciphernode-builder/src/event_system.rs +++ b/crates/ciphernode-builder/src/event_system.rs @@ -84,12 +84,28 @@ pub struct EventSystem { } impl EventSystem { - /// Create a new in memory EventSystem with default settings + /// Creates an in-memory EventSystem configured from the provided name. + /// + /// # Examples + /// + /// ``` + /// let sys = EventSystem::new("node1"); + /// // `sys` is ready to be used with its in-memory backend. + /// ``` pub fn new(name: &str) -> Self { EventSystem::in_mem(name) } - /// Create an in memory EventSystem + /// Create an EventSystem configured to use an in-memory backend. + /// + /// The `node_id` string is hashed to derive the internal `u32` node identifier. + /// + /// # Examples + /// + /// ``` + /// let sys = EventSystem::in_mem("local-node"); + /// // `sys` uses in-memory stores and lazy-initializes actors when accessed. + /// ``` pub fn in_mem(node_id: &str) -> Self { Self { node_id: EventSystem::node_id(node_id), @@ -106,7 +122,16 @@ impl EventSystem { } } - /// Create an in memory EventSystem with a given store + /// Constructs an in-memory EventSystem that uses the provided `InMemStore`. + /// + /// The `node_id` string is used to derive the system's numeric node identifier. + /// + /// # Examples + /// + /// ``` + /// // Assume `store` is an existing Addr obtained elsewhere. + /// let system = EventSystem::in_mem_from_store("node-a", &store); + /// ``` pub fn in_mem_from_store(node_id: &str, store: &Addr) -> Self { Self { node_id: EventSystem::node_id(node_id), @@ -123,7 +148,27 @@ impl EventSystem { } } - /// Create a persisted EventSystem with datafiles at the given paths + /// Construct an EventSystem configured to use persisted storage at the specified file paths. + /// + /// The `node_id` string is hashed to derive the internal node identifier. `log_path` is the + /// filesystem path for the commit log, and `sled_path` is the path for the Sled key-value store. + /// + /// # Arguments + /// + /// * `node_id` - A human-readable identifier used to derive an internal u32 node identifier. + /// * `log_path` - Path to the commit log file to be used by the persisted event log. + /// * `sled_path` - Path to the directory used by the Sled database. + /// + /// # Returns + /// + /// An EventSystem instance configured to use persisted backends (CommitLog + Sled). + /// + /// # Examples + /// + /// ``` + /// use std::path::PathBuf; + /// let sys = EventSystem::persisted("node-a", PathBuf::from("commits.log"), PathBuf::from("sled_db")); + /// ``` pub fn persisted(node_id: &str, log_path: PathBuf, sled_path: PathBuf) -> Self { Self { node_id: EventSystem::node_id(node_id), @@ -142,13 +187,35 @@ impl EventSystem { } } - /// Pass in a specific given event bus + /// Sets the EventBus address to be used by this EventSystem. + /// + /// This overrides any previously configured bus for the instance. + /// + /// # Returns + /// + /// The EventSystem with the provided EventBus configured. + /// + /// # Examples + /// + /// ``` + /// // Assume `bus_addr` is an `Addr>` obtained elsewhere. + /// let sys = EventSystem::in_mem("node").with_event_bus(bus_addr); + /// ``` pub fn with_event_bus(self, bus: Addr>) -> Self { let _ = self.eventbus.set(bus); self } - /// Use a fresh event bus that is not the default singleton instance + /// Replaces the system's EventBus with a newly created, deduplicating EventBus. + /// + /// Returns the modified EventSystem with its event bus set to a fresh instance. + /// + /// # Examples + /// + /// ``` + /// let sys = EventSystem::in_mem("node").with_fresh_bus(); + /// // `sys` now uses a fresh EventBus with deduplication enabled + /// ``` pub fn with_fresh_bus(self) -> Self { let _ = self .eventbus @@ -156,18 +223,44 @@ impl EventSystem { self } - /// Add an injected hlc + /// Injects a high-level clock (Hlc) to be used by the EventSystem. + /// + /// This sets the HLC instance that the event system will use for generating timestamps + /// and returns the moved `EventSystem` to allow builder-style chaining. + /// + /// # Examples + /// + /// ``` + /// let sys = EventSystem::in_mem("node").with_hlc(Hlc::default()); + /// ``` pub fn with_hlc(self, hlc: Hlc) -> Self { let _ = self.hlc.set(hlc); self } - /// Get the eventbus address + /// Obtain the EventBus actor address used by the system. + /// + /// This will lazily initialize and return the shared EventBus if one is not already configured. + /// + /// # Examples + /// + /// ``` + /// let system = EventSystem::new("node"); + /// let bus = system.eventbus(); + /// // `bus` is an Addr> + /// ``` pub fn eventbus(&self) -> Addr> { self.eventbus.get_or_init(get_enclave_event_bus).clone() } - /// Get the buffer address + /// Returns the system's WriteBuffer actor address, creating the buffer if it does not yet exist and initiating wiring with other components when possible. + /// + /// # Examples + /// + /// ``` + /// let sys = EventSystem::new("node"); + /// let _buffer_addr = sys.buffer(); + /// ``` pub fn buffer(&self) -> Addr { let buffer = self .buffer @@ -177,7 +270,20 @@ impl EventSystem { buffer } - /// Get the sequencer address + /// Obtain the Sequencer actor address, initializing and starting it if it has not been created yet. + /// + /// The Sequencer is created using the system's EventBus, EventStore, and WriteBuffer when first requested. + /// + /// # Returns + /// + /// The address of the Sequencer actor. + /// + /// # Examples + /// + /// ``` + /// let system = EventSystem::new("node"); + /// let _sequencer = system.sequencer().unwrap(); + /// ``` pub fn sequencer(&self) -> Result> { self.sequencer .get_or_try_init(|| match self.eventstore()? { @@ -191,7 +297,25 @@ impl EventSystem { .cloned() } - /// Get the EventStore address + /// Retrieve the address of the configured EventStore, initializing it if necessary. + /// + /// Initializes and returns an in-memory EventStore for the in-memory backend or initializes + /// the sled sequence index and commit log and returns a persisted EventStore for the persisted backend. + /// + /// # Errors + /// + /// Returns an error if persisted backend initialization (sled index or commit log) fails. + /// + /// # Examples + /// + /// ``` + /// let sys = EventSystem::in_mem("node"); + /// let store_addr = sys.eventstore().unwrap(); + /// match store_addr { + /// EventStoreAddr::InMem(_) => {}, + /// EventStoreAddr::Persisted(_) => panic!("expected in-memory backend"), + /// } + /// ``` pub fn eventstore(&self) -> Result { match &self.backend { EventSystemBackend::InMem(b) => { @@ -217,14 +341,36 @@ impl EventSystem { } } - /// Get an instance of the Hlc + /// Provides the system's high-level clock, initializing it with the system node id if it has not been created yet. + /// + /// The returned `Hlc` is a clone of the internally stored clock instance. + /// + /// # Examples + /// + /// ``` + /// let sys = EventSystem::in_mem("node-1"); + /// let _hlc = sys.hlc().unwrap(); + /// ``` pub fn hlc(&self) -> Result { self.hlc .get_or_try_init(|| Ok(Hlc::new(self.node_id))) .cloned() } - /// Get the BusHandle + /// Returns a BusHandle connected to this EventSystem, initializing it lazily if needed. + /// + /// The returned handle coordinates the eventbus, sequencer, and HLC for publishing and querying events. + /// + /// # Errors + /// + /// Returns an `Err` if initialization of the sequencer or HLC fails. + /// + /// # Examples + /// + /// ``` + /// let system = EventSystem::new("node"); + /// let handle = system.handle().expect("failed to create BusHandle"); + /// ``` pub fn handle(&self) -> Result { self.handle .get_or_try_init(|| { @@ -237,7 +383,31 @@ impl EventSystem { .cloned() } - /// Get the DataStore + /// Obtain a DataStore view backed by the event system's configured backend. + /// + /// This returns a DataStore that routes write operations through the system's + /// WriteBuffer and is backed by either an in-memory store or a sled-backed store + /// depending on the EventSystem backend configuration. If the underlying store + /// has not yet been created it will be initialized lazily. Wiring between the + /// buffer and the store is attempted before returning. + /// + /// # Errors + /// + /// Returns an error if initializing or accessing the persisted store fails (for + /// the persisted backend). + /// + /// # Examples + /// + /// ``` + /// # use actix::System; + /// # use ciphernode_builder::event_system::EventSystem; + /// let sys = System::new(); + /// sys.block_on(async { + /// let es = EventSystem::new("node"); + /// let ds = es.store().unwrap(); + /// // use `ds` for reads/writes... + /// }); + /// ``` pub fn store(&self) -> Result { let store = match &self.backend { EventSystemBackend::InMem(b) => { @@ -264,6 +434,16 @@ impl EventSystem { // We need to ensure that once the buffer and store are created they are connected so that // inserts are sent between the two actors. This internal function ensures this happens. + /// Ensures the write buffer is forwarded to the underlying store when both are initialized. + /// + /// When both the system's WriteBuffer and a store `Recipient` are available, instructs the buffer to forward batches to that store. The operation is idempotent: subsequent calls do nothing once wiring has occurred. + /// + /// # Examples + /// + /// ``` + /// // Given an initialized `EventSystem` named `es`: + /// es.wire_if_ready(); + /// ``` fn wire_if_ready(&self) { let buffer = match self.buffer.get() { Some(b) => b, @@ -285,6 +465,17 @@ impl EventSystem { }); } + /// Derives a deterministic 32-bit node identifier from a name string. + /// + /// The result is a u32 value computed by hashing `name`. + /// + /// # Examples + /// + /// ``` + /// let a = node_id("alice"); + /// let b = node_id("alice"); + /// assert_eq!(a, b); + /// ``` fn node_id(name: &str) -> u32 { let mut hasher = DefaultHasher::new(); name.hash(&mut hasher); @@ -325,6 +516,23 @@ mod tests { impl Handler for Listener { type Result = (); + /// Handles an incoming `EnclaveEvent` and records the contained `TestEvent` message. + /// + /// When the event's payload is a `TestEvent`, its `msg` field is appended to the listener's + /// `logs` collection; other event types are ignored. + /// + /// # Examples + /// + /// ``` + /// # use ciphernode_builder::event_system::{Listener, EnclaveEvent, EnclaveEventData, TestEvent}; + /// # use actix::Context; + /// let mut listener = Listener { logs: Vec::new(), events: Vec::new() }; + /// // construct a TestEvent-wrapped EnclaveEvent (details depend on crate constructors) + /// let test_ev = EnclaveEvent::from(EnclaveEventData::TestEvent(TestEvent { msg: "hello".into(), ts: 0 })); + /// // call the handler directly (context parameter is not used) + /// listener.handle(test_ev, &mut Context::from_waker(std::task::noop_waker_ref())); + /// assert_eq!(listener.logs.last().map(String::as_str), Some("hello")); + /// ``` fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { if let EnclaveEventData::TestEvent(TestEvent { msg, .. }) = msg.into_data() { self.logs.push(msg); @@ -334,6 +542,16 @@ mod tests { impl Handler for Listener { type Result = Vec; + /// Returns a clone of the listener's stored logs. + /// + /// # Examples + /// + /// ``` + /// // Construct a listener and retrieve its logs. + /// let mut listener = Listener { logs: vec!["entry".to_string()], events: vec![] }; + /// let logs = listener.logs.clone(); + /// assert_eq!(logs, vec!["entry".to_string()]); + /// ``` fn handle(&mut self, _: GetLogs, _: &mut Self::Context) -> Self::Result { self.logs.clone() } @@ -341,6 +559,16 @@ mod tests { impl Handler for Listener { type Result = Vec; + /// Collects the `msg` fields from any `TestEvent` entries in the listener's stored events. + /// + /// # Examples + /// + /// ``` + /// // assume `listener` is a mutable Listener with some EnclaveEvent entries, + /// // and `ctx` is a mutable actor context available in the test. + /// let msgs: Vec = listener.handle(GetEvents, &mut ctx); + /// // `msgs` now contains the `msg` of each `TestEvent` in insertion order. + /// ``` fn handle(&mut self, _: GetEvents, _: &mut Self::Context) -> Self::Result { self.events .iter() @@ -356,6 +584,13 @@ mod tests { impl Handler for Listener { type Result = (); + /// Replace the listener's stored events with the events carried by the incoming message. + /// + /// # Parameters + /// + /// - `msg`: message containing the sequence of `EnclaveEvent`s to store in the listener. + /// + /// The handler assigns the message's events to the actor's `events` field. fn handle(&mut self, msg: ReceiveEvents, _: &mut Self::Context) -> Self::Result { self.events = msg.events().clone(); } @@ -365,6 +600,25 @@ mod tests { type Context = actix::Context; } + /// Verifies that a persisted EventSystem initializes its components and automatically wires the buffer to the store. + /// + /// Creates a persisted EventSystem, obtains its BusHandle and DataStore, and asserts that wiring has occurred. + /// + /// # Examples + /// + /// ``` + /// #[actix::test] + /// async fn test_persisted() { + /// let tmp = tempfile::tempdir().unwrap(); + /// let system = EventSystem::persisted("cn2", tmp.path().join("log"), tmp.path().join("sled")); + /// + /// let _handle = system.handle().expect("Failed to get handle"); + /// system.store().expect("Failed to get store"); + /// + /// // Wiring happened automatically + /// assert!(system.wired.get().is_some()); + /// } + /// ``` #[actix::test] async fn test_persisted() { let tmp = TempDir::new().unwrap(); @@ -389,6 +643,29 @@ mod tests { assert!(system.wired.get().is_some()); } + /// Integration test that verifies in-memory EventSystem wiring, eventual consistency, event publishing, and event retrieval. + /// + /// This test sets up an in-memory EventSystem with a fresh EventBus, attaches a listener actor, + /// writes values to the DataStore (which are only persisted after the corresponding event is published), + /// publishes events, asserts the listener receives the events in order, and queries the in-memory EventStore + /// for events after a captured timestamp. + /// + /// # Examples + /// + /// ``` + /// # async fn run() -> anyhow::Result<()> { + /// let system = EventSystem::in_mem("cn1").with_fresh_bus(); + /// let handle = system.handle()?; + /// let datastore = system.store()?; + /// let listener = Listener { logs: Vec::new(), events: Vec::new() }.start(); + /// handle.subscribe("*", listener.clone().into()); + /// datastore.scope("/a").write("v".to_string()); + /// assert_eq!(datastore.scope("/a").read::().await?, None); + /// handle.publish(TestEvent::new("e", 1))?; + /// tokio::time::sleep(std::time::Duration::from_millis(1)).await; + /// assert_eq!(datastore.scope("/a").read::().await?, Some("v".to_string())); + /// # Ok(()) } + /// ``` #[actix::test] async fn test_event_system() -> Result<()> { let system = EventSystem::in_mem("cn1").with_fresh_bus(); @@ -468,4 +745,4 @@ mod tests { assert_eq!(events, vec!["yellow", "red", "white"]); Ok(()) } -} +} \ No newline at end of file diff --git a/crates/ciphernode-builder/src/eventbus_factory.rs b/crates/ciphernode-builder/src/eventbus_factory.rs index a8c6e9a490..9acb5b21fe 100644 --- a/crates/ciphernode-builder/src/eventbus_factory.rs +++ b/crates/ciphernode-builder/src/eventbus_factory.rs @@ -92,12 +92,36 @@ pub fn get_enclave_event_bus() -> Addr> { EventBusFactory::instance().get_event_bus() } +/// Retrieve the process-wide `HistoryCollector` for enclave events. +/// +/// Returns an `Addr>` that collects error/history events for the enclave event bus. +/// +/// # Examples +/// +/// ``` +/// let collector = get_error_collector(); +/// // `collector` is an `Addr>` you can use to interact with the collector. +/// ``` pub fn get_error_collector() -> Addr> { EventBusFactory::instance().get_error_collector() } +/// Creates a BusHandle for the enclave event bus using the provided AppConfig. +/// +/// The returned `BusHandle` is configured with an EventSystem named after `config.name()`. +/// +/// # Errors +/// +/// Returns an error if constructing the system handle fails. +/// +/// # Examples +/// +/// ``` +/// let config = AppConfig::default(); +/// let handle = get_enclave_bus_handle(&config).expect("failed to create enclave bus handle"); +/// ``` pub fn get_enclave_bus_handle(config: &AppConfig) -> anyhow::Result { let bus = get_enclave_event_bus(); let system = EventSystem::new(&config.name()).with_event_bus(bus); Ok(system.handle()?) -} +} \ No newline at end of file diff --git a/crates/config/src/app_config.rs b/crates/config/src/app_config.rs index 8bed2fe5d5..537f37da12 100644 --- a/crates/config/src/app_config.rs +++ b/crates/config/src/app_config.rs @@ -75,6 +75,31 @@ pub struct NodeDefinition { } impl Default for NodeDefinition { + /// Constructs a NodeDefinition populated with conventional default values. + /// + /// Defaults: + /// - `peers`: empty list + /// - `address`: `None` + /// - `quic_port`: 9091 + /// - `key_file`: "key" + /// - `db_file`: "db" + /// - `log_file`: "log" + /// - `config_dir` and `data_dir`: empty `PathBuf` (placeholders for OS defaults) + /// - `role`: `NodeRole::Ciphernode` + /// - `autonetkey`, `autopassword`, `autowallet`: `false` + /// + /// # Examples + /// + /// ``` + /// use std::path::PathBuf; + /// let def = NodeDefinition::default(); + /// assert_eq!(def.quic_port, 9091); + /// assert_eq!(def.key_file, PathBuf::from("key")); + /// assert_eq!(def.db_file, PathBuf::from("db")); + /// assert_eq!(def.log_file, PathBuf::from("log")); + /// assert!(def.peers.is_empty()); + /// assert!(matches!(def.role, NodeRole::Ciphernode)); + /// ``` fn default() -> Self { Self { peers: vec![], // NOTE: We should look at generation via ipns fetch for the latest nodes @@ -179,6 +204,40 @@ pub struct AppConfig { } impl AppConfig { + /// Convert an UnscopedAppConfig into a scoped AppConfig for the given node name. + /// + /// This validates that the requested node exists (and that `_default` is not used as an explicit node name), + /// applies node and global overrides for config and data directories, builds the paths engine (including + /// db, key, and log file locations), and returns an AppConfig instantiated for the named node. + /// + /// # Errors + /// + /// Returns an error if `name` is not present in `config.nodes` or if the reserved name `_default` is used + /// as an explicit node profile in `config.nodes`. + /// + /// # Examples + /// + /// ``` + /// use std::path::PathBuf; + /// + /// // Build a minimal unscoped config with a single node named "node1" + /// let mut unscoped = crate::config::UnscopedAppConfig::default(); + /// unscoped.nodes.insert("node1".to_string(), crate::config::NodeDefinition::default()); + /// + /// let default_data_dir = PathBuf::from("/tmp/data"); + /// let default_config_dir = PathBuf::from("/tmp/config"); + /// let cwd = PathBuf::from("."); + /// + /// let scoped = crate::config::AppConfig::try_from_unscoped( + /// "node1", + /// unscoped, + /// &default_data_dir, + /// &default_config_dir, + /// &cwd, + /// ).unwrap(); + /// + /// assert_eq!(scoped.name(), "node1"); + /// ``` pub fn try_from_unscoped( name: &str, config: UnscopedAppConfig, @@ -246,16 +305,50 @@ impl AppConfig { self.paths.key_file() } - /// Get the database file + /// Get the node's database file path. + /// + /// # Returns + /// + /// The filesystem path of the node's database file. + /// + /// # Examples + /// + /// ``` + /// // given an `AppConfig` instance `cfg` + /// let path = cfg.db_file(); + /// ``` pub fn db_file(&self) -> PathBuf { self.paths.db_file() } - /// Get the log file + /// Returns the resolved path to the node's log file. + /// + /// # Returns + /// + /// `PathBuf` containing the resolved log file path for the current node. + /// + /// # Examples + /// + /// ```no_run + /// // assuming `app_config` is an initialized `AppConfig` + /// let log_path = app_config.log_file(); + /// println!("Log file: {}", log_path.display()); + /// ``` pub fn log_file(&self) -> PathBuf { self.paths.log_file() } + /// Returns the NodeDefinition entry for the currently selected node name. + /// + /// Panics if the configuration does not contain a node definition for `self.name`. + /// + /// # Examples + /// + /// ``` + /// // assuming `cfg` is an AppConfig with a node definition named "node1" + /// let node_def = cfg.node_def(); + /// // use `node_def` to access node-specific settings, e.g. `node_def.quic_port` + /// ``` fn node_def(&self) -> &NodeDefinition { // NOTE: on creation an invariant we have is that our node name is an extant key in our // nodes datastructure so expect here is ok and we dont have to clone the NodeDefinition @@ -810,4 +903,4 @@ chains: Ok(()) }); } -} +} \ No newline at end of file diff --git a/crates/config/src/paths_engine.rs b/crates/config/src/paths_engine.rs index 891fceb0f3..5943e7f28a 100644 --- a/crates/config/src/paths_engine.rs +++ b/crates/config/src/paths_engine.rs @@ -48,6 +48,32 @@ pub const DEFAULT_LOG_NAME: &str = "log"; // the config file. Otherwise locate config in the default app configuration folder and data in // the default app data folder. impl PathsEngine { + /// Constructs a new `PathsEngine` configured with the provided name, working directory, + /// defaults, and optional overrides for config, data, db, key, and log locations. + /// + /// The `name` is used as an identifier when building per-application subpaths. Path + /// arguments are cloned into the engine; `Option<&PathBuf>` arguments are converted to + /// owned `Option`. + /// + /// # Examples + /// + /// ``` + /// use std::path::PathBuf; + /// let engine = crate::PathsEngine::new( + /// "myapp", + /// &PathBuf::from("."), + /// &PathBuf::from("/var/lib/myapp"), + /// &PathBuf::from("/etc/myapp"), + /// None, // found_config_file + /// None, // config_dir_override + /// None, // data_dir_override + /// None, // db_file_override + /// None, // key_file_override + /// None, // log_file_override + /// ); + /// let cfg = engine.config_file(); + /// assert!(cfg.ends_with("enclave.config.yaml")); + /// ``` pub fn new( name: &str, cwd: &PathBuf, @@ -99,7 +125,36 @@ impl PathsEngine { ) } - /// Full path to the database file containing the db + /// Resolve the full filesystem path to the database file for this engine. + /// + /// Chooses the database path in the following order: + /// - If `db_file_override` is set and absolute, return that cleaned path. + /// - If `db_file_override` is set and relative, return it joined under `//`. + /// - Otherwise return `//db`. + /// + /// # Examples + /// + /// ```no_run + /// use std::path::PathBuf; + /// // Construct a PathsEngine (arguments: name, cwd, default_data_dir, default_config_dir, + /// // config_dir_override, found_config_file, data_dir_override, db_file_override, log_file_override, key_file_override) + /// let engine = PathsEngine::new( + /// "app", + /// PathBuf::from("."), + /// PathBuf::from("/var/lib/app"), + /// PathBuf::from("/etc/app"), + /// None, + /// None, + /// None, + /// None, + /// None, + /// None, + /// ); + /// let db = engine.db_file(); + /// assert_eq!(db, PathBuf::from("/var/lib/app").join("app").join("db")); + /// ``` + /// + /// @returns A `PathBuf` containing the cleaned path to the database file. pub fn db_file(&self) -> PathBuf { if let Some(data_file) = self.db_file_override.clone() { if data_file.is_absolute() { @@ -112,6 +167,53 @@ impl PathsEngine { clean(self.get_data_dir().join(&self.name).join(DEFAULT_DB_NAME)) } + /// Resolves the log file path for this engine instance. + /// + /// If a `log_file_override` is set, an absolute override is returned as-is (cleaned); + /// a relative override is interpreted under `//` and returned cleaned. + /// If no override is provided, returns `//log` cleaned. + /// + /// # Returns + /// + /// A `PathBuf` pointing to the resolved log file. + /// + /// # Examples + /// + /// ``` + /// use std::path::PathBuf; + /// + /// // Construct PathsEngine with a relative log override + /// let engine = PathsEngine::new( + /// "app", + /// None, // config_dir_override + /// None, // found_config_file + /// None, // data_dir_override + /// None, // db_file_override + /// Some(&PathBuf::from("custom.log")), // log_file_override (relative) + /// None, // key_file_override + /// &PathBuf::from("/var/lib"), // default_data_dir + /// &PathBuf::from("/etc"), // default_config_dir + /// &PathBuf::from("."), // cwd + /// ); + /// + /// let log = engine.log_file(); + /// assert!(log.ends_with("app/custom.log")); + /// + /// // Absolute override is preserved + /// let engine_abs = PathsEngine::new( + /// "app", + /// None, + /// None, + /// None, + /// None, + /// Some(&PathBuf::from("/tmp/app.log")), // absolute override + /// None, + /// &PathBuf::from("/var/lib"), + /// &PathBuf::from("/etc"), + /// &PathBuf::from("."), + /// ); + /// assert_eq!(engine_abs.log_file(), PathBuf::from("/tmp/app.log")); + /// ``` pub fn log_file(&self) -> PathBuf { if let Some(log_file) = self.log_file_override.clone() { if log_file.is_absolute() { @@ -123,6 +225,50 @@ impl PathsEngine { clean(self.get_data_dir().join(&self.name).join(DEFAULT_LOG_NAME)) } + /// Resolve a path relative to the configuration file's directory. + /// + /// If `path` is absolute, it is returned unchanged. If `path` is relative, it is joined to the + /// directory containing the resolved configuration file (or to the current working directory if + /// the config file has no parent) and cleaned. + /// + /// # Parameters + /// + /// - `path`: the path to resolve; treated as absolute if `path.is_absolute()` is true. + /// + /// # Returns + /// + /// A `PathBuf` representing `path` resolved against the configuration directory or returned as-is + /// for absolute inputs. + /// + /// # Examples + /// + /// ```no_run + /// use std::path::PathBuf; + /// + /// // Construct a PathsEngine (arguments shown for illustration; adapt to actual constructor). + /// let engine = PathsEngine::new( + /// "app", + /// &PathBuf::from("/home/user/project"), + /// &PathBuf::from("/var/lib/app"), + /// &PathBuf::from("/etc/app"), + /// None, // config_dir_override + /// None, // found_config_file + /// None, // data_dir_override + /// None, // db_file_override + /// None, // log_file_override + /// None, // key_file_override + /// ); + /// + /// // Relative path is resolved against the config dir + /// let rel = PathBuf::from("sub/setting.yaml"); + /// let resolved = engine.relative_to_config(&rel); + /// assert!(resolved.is_absolute()); + /// + /// // Absolute path is returned unchanged + /// let abs = PathBuf::from("/tmp/override.yaml"); + /// let same = engine.relative_to_config(&abs); + /// assert_eq!(same, abs); + /// ``` pub fn relative_to_config(&self, path: &PathBuf) -> PathBuf { if path.is_absolute() { return PathBuf::from(path); @@ -203,6 +349,34 @@ mod test { log_file: &'static str, } + /// Runs a collection of path-resolution test cases, asserting that each `PathsEngine` + /// result matches the provided expectations. + /// + /// # Examples + /// + /// ``` + /// // Construct a single test case and run it with the helper. + /// let input = PathsInput { + /// name: "app", + /// cwd: ".", + /// default_data_dir: "/tmp/data", + /// default_config_dir: "/etc/app", + /// config_dir_override: None, + /// found_config_file: None, + /// data_dir_override: None, + /// db_file_override: None, + /// log_file_override: None, + /// key_file_override: None, + /// }; + /// let expected = PathsExpected { + /// config_file: "/etc/app/enclave.config.yaml", + /// key_file: "/etc/app/app/key", + /// db_file: "/tmp/data/app/db", + /// log_file: "/tmp/data/app/log", + /// }; + /// let tc = TestCase { name: "defaults", input, expected }; + /// test_cases(vec![tc]); + /// ``` fn test_cases(test_cases: Vec) { // Run all test cases for test_case in test_cases { @@ -434,4 +608,4 @@ mod test { }, ]); } -} +} \ No newline at end of file diff --git a/crates/config/src/store_keys.rs b/crates/config/src/store_keys.rs index 6b0eae21ae..dcac3d0ec7 100644 --- a/crates/config/src/store_keys.rs +++ b/crates/config/src/store_keys.rs @@ -69,11 +69,29 @@ impl StoreKeys { String::from("//node_state") } + /// Path for finalized committees in the store. + /// + /// Produces the store key string `"//finalized_committees"`. + /// + /// # Examples + /// + /// ``` + /// let key = finalized_committees(); + /// assert_eq!(key, "//finalized_committees".to_string()); + /// ``` pub fn finalized_committees() -> String { String::from("//finalized_committees") } + /// Constructs the store key path for the ciphernode selector. + /// + /// # Examples + /// + /// ``` + /// let key = ciphernode_selector(); + /// assert_eq!(key, "//ciphernode_selector".to_string()); + /// ``` pub fn ciphernode_selector() -> String { String::from("//ciphernode_selector") } -} +} \ No newline at end of file diff --git a/crates/data/src/commit_log_event_log.rs b/crates/data/src/commit_log_event_log.rs index 8c20d0800c..baf6de7300 100644 --- a/crates/data/src/commit_log_event_log.rs +++ b/crates/data/src/commit_log_event_log.rs @@ -17,6 +17,19 @@ pub struct CommitLogEventLog { } impl CommitLogEventLog { + /// Creates a new CommitLogEventLog backed by a CommitLog stored at the given path. + /// + /// The created log uses configured CommitLog options (including a permissive + /// maximum message size) and returns an error if the underlying CommitLog + /// cannot be opened or created. + /// + /// # Examples + /// + /// ``` + /// use std::path::PathBuf; + /// let dir = tempfile::tempdir().unwrap(); + /// let log = CommitLogEventLog::new(&dir.path().to_path_buf()).unwrap(); + /// ``` pub fn new(path: &PathBuf) -> Result { let mut opts = LogOptions::new(path); // TODO: drive this from config - currently set high to be permissive @@ -27,6 +40,21 @@ impl CommitLogEventLog { } impl EventLog for CommitLogEventLog { + /// Appends an EnclaveEvent to the underlying commit log and returns its 1-indexed sequence number. + /// + /// The function serializes the provided event and stores it in the commit log. + /// + /// # Returns + /// + /// `u64` containing the 1-indexed sequence number assigned to the appended event. + /// + /// # Examples + /// + /// ``` + /// // assuming `log` is a mutable CommitLogEventLog and `event` is an EnclaveEvent + /// let seq = log.append(&event).unwrap(); + /// assert!(seq >= 1); + /// ``` fn append(&mut self, event: &EnclaveEvent) -> Result { let bytes = bincode::serialize(event)?; let offset = self @@ -37,6 +65,29 @@ impl EventLog for CommitLogEventLog { Ok(offset + 1) } + /// Reads events starting at a 1-indexed sequence number and returns an iterator over (sequence, event) pairs. + /// + /// The `from` parameter is a 1-indexed sequence number; reading begins from that sequence (or the start when `from` is 0). + /// The returned iterator yields tuples where the first element is the 1-indexed sequence number and the second is the deserialized `EnclaveEvent`. + /// Messages that fail deserialization are skipped. + /// + /// # Examples + /// + /// ``` + /// # use e3_events::{EnclaveEvent, Unsequenced}; + /// # use tempfile::tempdir; + /// # use std::path::PathBuf; + /// # // setup omitted: create CommitLogEventLog and append events + /// # let dir = tempdir().unwrap(); + /// # let path = dir.path().to_path_buf(); + /// # let mut log = CommitLogEventLog::new(&path).unwrap(); + /// # let e = EnclaveEvent::::new(vec![1u8], 0); + /// # log.append(&e).unwrap(); + /// let mut iter = log.read_from(1); + /// let results: Vec<(u64, EnclaveEvent)> = iter.collect(); + /// assert!(!results.is_empty()); + /// assert_eq!(results[0].0, 1); + /// ``` fn read_from(&self, from: u64) -> Box)>> { // Convert 1-indexed sequence to 0-indexed offset let mut current_offset = from.saturating_sub(1); @@ -143,4 +194,4 @@ mod tests { let events: Vec<_> = log.read_from(100).collect(); assert!(events.is_empty()); } -} +} \ No newline at end of file diff --git a/crates/data/src/data_store.rs b/crates/data/src/data_store.rs index b2df9273cf..b15d0284c6 100644 --- a/crates/data/src/data_store.rs +++ b/crates/data/src/data_store.rs @@ -22,6 +22,21 @@ pub enum StoreAddr { } impl StoreAddr { + /// Get the `InMemStore` actor address when this `StoreAddr` is the `InMem` variant. + /// + /// # Returns + /// + /// `Some(&Addr)` if the variant is `InMem`, `None` otherwise. + /// + /// # Examples + /// + /// ```no_run + /// # use crate::StoreAddr; + /// let store_addr = /* obtain a StoreAddr */ unimplemented!(); + /// if let Some(in_mem_addr) = store_addr.to_maybe_in_mem() { + /// // use the in-memory store actor address + /// } + /// ``` pub fn to_maybe_in_mem(&self) -> Option<&Addr> { match self { StoreAddr::InMem(ref store) => Some(store), @@ -135,6 +150,17 @@ impl DataStore { } } + /// Create a new DataStore whose scope is set to the given key while preserving the same store address and actor recipients. + /// + /// The returned DataStore shares the original instance's address and message recipients but uses `key` as its absolute scope (replacing the current scope). + /// + /// # Examples + /// + /// ```no_run + /// // Given an existing `DataStore` instance `ds`, create a new root store at "settings". + /// let child = ds.base("settings"); + /// assert_eq!(child.get_scope().unwrap(), "settings"); + /// ``` pub fn base(&self, key: K) -> Self { Self { addr: self.addr.clone(), @@ -146,6 +172,23 @@ impl DataStore { } } + /// Constructs a DataStore backed by the provided Sled store and wired to the given WriteBuffer. + /// + /// The resulting DataStore uses `addr` for reads, synchronous writes, and removals, and uses + /// `write_buffer` for buffered insert operations. The returned store has an empty scope. + /// + /// # Parameters + /// - `addr`: address of the SledStore actor. + /// - `write_buffer`: address of the WriteBuffer actor. + /// + /// # Examples + /// + /// ``` + /// // Given existing actix addresses `sled_addr: Addr` and `wb_addr: Addr`: + /// let ds = DataStore::from_sled_store(&sled_addr, &wb_addr); + /// // new DataStore starts with an empty scope + /// assert_eq!(ds.get_scope().unwrap(), ""); + /// ``` pub fn from_sled_store(addr: &Addr, write_buffer: &Addr) -> Self { Self { addr: StoreAddr::Sled(addr.clone()), @@ -157,6 +200,20 @@ impl DataStore { } } + /// Creates a DataStore configured to use the provided in-memory store and write buffer. + /// + /// `addr` is the actor address of the InMemStore used for reads, synchronous inserts, and removals. + /// `write_buffer` is the actor address used for buffered (asynchronous) inserts. + /// + /// Returns a DataStore that uses the given in-memory store for direct operations and the write buffer + /// for buffered insertions. The returned DataStore has an empty initial scope. + /// + /// # Examples + /// + /// ``` + /// // assume `in_mem_addr` and `write_buffer_addr` are available Addr<...> values + /// let ds = DataStore::from_in_mem(&in_mem_addr, &write_buffer_addr); + /// ``` pub fn from_in_mem(addr: &Addr, write_buffer: &Addr) -> Self { Self { addr: StoreAddr::InMem(addr.clone()), @@ -193,4 +250,4 @@ impl From<&Addr> for DataStore { scope: vec![], } } -} +} \ No newline at end of file diff --git a/crates/data/src/events.rs b/crates/data/src/events.rs index 6ecfc25d40..fa3310cd3e 100644 --- a/crates/data/src/events.rs +++ b/crates/data/src/events.rs @@ -12,14 +12,42 @@ use anyhow::Result; #[rtype(result = "()")] pub struct Insert(pub Vec, pub Vec); impl Insert { + /// Creates a new message containing the provided key and value, converting the key to a `Vec` via `IntoKey`. + /// + /// # Examples + /// + /// ``` + /// let msg = Insert::new("my-key", vec![1, 2, 3]); + /// assert_eq!(msg.key(), &"my-key".into_key()); + /// assert_eq!(msg.value(), &vec![1, 2, 3]); + /// ``` pub fn new(key: K, value: Vec) -> Self { Self(key.into_key(), value) } + /// Get a reference to the key bytes stored in the message. + /// + /// # Examples + /// + /// ``` + /// // construct an Insert message and borrow its key + /// let msg = crate::events::Insert(Vec::from(b"key".as_slice()), Vec::from(b"val".as_slice())); + /// assert_eq!(msg.key(), &Vec::from(b"key".as_slice())); + /// ``` pub fn key(&self) -> &Vec { &self.0 } + /// Accesses the stored value bytes for this message. + /// + /// # Examples + /// + /// ``` + /// use crate::events::Insert; + /// + /// let msg = Insert(vec![b'k'], vec![1, 2, 3]); + /// assert_eq!(msg.value(), &vec![1, 2, 3]); + /// ``` pub fn value(&self) -> &Vec { &self.1 } @@ -29,10 +57,31 @@ impl Insert { #[rtype(result = "()")] pub struct InsertBatch(pub Vec); impl InsertBatch { + /// Creates an `InsertBatch` containing the provided insert commands. + /// + /// # Examples + /// + /// ``` + /// let cmd = Insert::new("key", vec![1, 2, 3]); + /// let batch = InsertBatch::new(vec![cmd.clone()]); + /// assert_eq!(batch.commands().len(), 1); + /// assert_eq!(batch.commands()[0], cmd); + /// ``` pub fn new(commands: Vec) -> Self { Self(commands) } + /// Accesses the batch's insert commands. + /// + /// Returns a reference to the vector of `Insert` commands contained in this batch. + /// + /// # Examples + /// + /// ``` + /// let cmd = Insert::new("key", vec![1, 2, 3]); + /// let batch = InsertBatch::new(vec![cmd.clone()]); + /// assert_eq!(batch.commands(), &vec![cmd]); + /// ``` pub fn commands(&self) -> &Vec { &self.0 } @@ -42,20 +91,58 @@ impl InsertBatch { #[rtype(result = "Result<()>")] pub struct InsertSync(pub Vec, pub Vec); impl InsertSync { + /// Creates a new message containing the provided key and value, converting the key to a `Vec` via `IntoKey`. + /// + /// # Examples + /// + /// ``` + /// let msg = Insert::new("my-key", vec![1, 2, 3]); + /// assert_eq!(msg.key(), &"my-key".into_key()); + /// assert_eq!(msg.value(), &vec![1, 2, 3]); + /// ``` pub fn new(key: K, value: Vec) -> Self { Self(key.into_key(), value) } + /// Get a reference to the key bytes stored in the message. + /// + /// # Examples + /// + /// ``` + /// // construct an Insert message and borrow its key + /// let msg = crate::events::Insert(Vec::from(b"key".as_slice()), Vec::from(b"val".as_slice())); + /// assert_eq!(msg.key(), &Vec::from(b"key".as_slice())); + /// ``` pub fn key(&self) -> &Vec { &self.0 } + /// Accesses the stored value bytes for this message. + /// + /// # Examples + /// + /// ``` + /// use crate::events::Insert; + /// + /// let msg = Insert(vec![b'k'], vec![1, 2, 3]); + /// assert_eq!(msg.value(), &vec![1, 2, 3]); + /// ``` pub fn value(&self) -> &Vec { &self.1 } } impl From for Insert { + /// Converts an `InsertSync` into an `Insert`, preserving the key and cloning the value. + /// + /// # Examples + /// + /// ``` + /// let sync = InsertSync::new("k", vec![1, 2, 3]); + /// let insert: Insert = Insert::from(sync); + /// assert_eq!(insert.key(), &b"k".to_vec()); + /// assert_eq!(insert.value(), &vec![1, 2, 3]); + /// ``` fn from(value: InsertSync) -> Self { Insert::new(value.key(), value.value().clone()) } @@ -65,10 +152,29 @@ impl From for Insert { #[rtype(result = "Option>")] pub struct Get(pub Vec); impl Get { + /// Creates a new `Get` message from a key convertible via `IntoKey`. + /// + /// The provided `key` is converted into a `Vec` using `IntoKey` and stored inside the message. + /// + /// # Examples + /// + /// ``` + /// let msg = Get::new("my-key"); + /// assert_eq!(msg.key(), &b"my-key".to_vec()); + /// ``` pub fn new(key: K) -> Self { Self(key.into_key()) } + /// Get a reference to the key bytes stored in the message. + /// + /// # Examples + /// + /// ``` + /// // construct an Insert message and borrow its key + /// let msg = crate::events::Insert(Vec::from(b"key".as_slice()), Vec::from(b"val".as_slice())); + /// assert_eq!(msg.key(), &Vec::from(b"key".as_slice())); + /// ``` pub fn key(&self) -> &Vec { &self.0 } @@ -78,11 +184,30 @@ impl Get { #[rtype(result = "()")] pub struct Remove(pub Vec); impl Remove { + /// Creates a new `Get` message from a key convertible via `IntoKey`. + /// + /// The provided `key` is converted into a `Vec` using `IntoKey` and stored inside the message. + /// + /// # Examples + /// + /// ``` + /// let msg = Get::new("my-key"); + /// assert_eq!(msg.key(), &b"my-key".to_vec()); + /// ``` pub fn new(key: K) -> Self { Self(key.into_key()) } + /// Get a reference to the key bytes stored in the message. + /// + /// # Examples + /// + /// ``` + /// // construct an Insert message and borrow its key + /// let msg = crate::events::Insert(Vec::from(b"key".as_slice()), Vec::from(b"val".as_slice())); + /// assert_eq!(msg.key(), &Vec::from(b"key".as_slice())); + /// ``` pub fn key(&self) -> &Vec { &self.0 } -} +} \ No newline at end of file diff --git a/crates/data/src/in_mem.rs b/crates/data/src/in_mem.rs index ed0f52f60f..33a6f0ad3f 100644 --- a/crates/data/src/in_mem.rs +++ b/crates/data/src/in_mem.rs @@ -46,6 +46,19 @@ impl InMemStore { bincode::serialize(&self.db.clone()).context("Error serializing BTreeMap") } + /// Construct an InMemStore from a bincode-serialized database. + /// + /// Attempts to deserialize `db` as a `BTreeMap, Vec>`. On success returns an `InMemStore` containing the deserialized map, the provided `capture` flag, and an empty operation log. Returns an error with context if deserialization fails. + /// + /// # Examples + /// + /// ``` + /// // Prepare a serialized empty DB for the example + /// let bytes = bincode::serialize(&std::collections::BTreeMap::, Vec>::new()).unwrap(); + /// let store = from_dump(bytes, false).unwrap(); + /// // `store` is constructed successfully on successful deserialization + /// let _ = store; + /// ``` pub fn from_dump(db: Vec, capture: bool) -> anyhow::Result { Ok(Self { db: bincode::deserialize(&db).context("Error deserializing BTreeMap")?, @@ -61,6 +74,22 @@ impl InMemStore { impl Handler for InMemStore { type Result = (); + /// Stores the provided insert event's key and value in the in-memory database and, if capture is enabled, records the insert operation in the store's operation log. + /// + /// # Examples + /// + /// ``` + /// # use crates::data::in_mem::{InMemStore, Insert, Get}; + /// # use actix::Context; + /// let mut store = InMemStore::new(true); + /// let msg = Insert::new(b"key".to_vec(), b"value".to_vec()); + /// // dispatch the insert handler (context value is not used by the handler) + /// store.handle(msg.clone(), &mut Context::new()); + /// // the value is stored + /// assert_eq!(store.db.get(b"key" as &[u8]), Some(&b"value".to_vec())); + /// // the operation is recorded when capture is enabled + /// assert!(matches!(store.log.last(), Some(crate::DataOp::Insert(_)))); + /// ``` fn handle(&mut self, event: Insert, _: &mut Self::Context) { // insert data into sled self.db.insert(event.key().to_vec(), event.value().to_vec()); @@ -73,6 +102,30 @@ impl Handler for InMemStore { impl Handler for InMemStore { type Result = (); + /// Handles a batch insert message by inserting each command's key/value into the in-memory store + /// and appending insert operations to the store's log when capture is enabled. + /// + /// # Examples + /// + /// ``` + /// use crates::data::in_mem::{InMemStore, Insert, InsertBatch, DataOp}; + /// + /// // prepare store and batch + /// let mut store = InMemStore::new(true); + /// let cmd = Insert::new(b"key".to_vec(), b"val".to_vec()); + /// let batch = InsertBatch::from_vec(vec![cmd.clone()]); + /// + /// // emulate handler behavior: insert batch commands into the store + /// for cmd in batch.commands() { + /// store.db.insert(cmd.key().to_owned(), cmd.value().to_owned()); + /// if store.capture { + /// store.log.push(DataOp::Insert(cmd.clone())); + /// } + /// } + /// + /// assert_eq!(store.db.get(b"key".as_ref()).map(|v| v.as_slice()), Some(b"val".as_ref())); + /// assert_eq!(store.log.len(), 1); + /// ``` fn handle(&mut self, msg: InsertBatch, _: &mut Self::Context) -> Self::Result { for cmd in msg.commands() { self.db.insert(cmd.key().to_owned(), cmd.value().to_owned()); @@ -109,6 +162,21 @@ impl Handler for InMemStore { impl Handler for InMemStore { type Result = Option>; + /// Retrieves the value associated with the `Get` message's key from the in-memory database. + /// + /// # Returns + /// + /// `Some(Vec)` containing the stored value if the key exists, `None` otherwise. + /// + /// # Examples + /// + /// ``` + /// use std::collections::BTreeMap; + /// let mut db: BTreeMap, Vec> = BTreeMap::new(); + /// db.insert(b"key".to_vec(), b"value".to_vec()); + /// let result = db.get(&b"key".to_vec()).cloned(); + /// assert_eq!(result, Some(b"value".to_vec())); + /// ``` fn handle(&mut self, event: Get, _: &mut Self::Context) -> Option> { let key = event.key(); let r = self.db.get(key); @@ -128,4 +196,4 @@ impl Handler for InMemStore { fn handle(&mut self, _: GetDump, _: &mut Self::Context) -> Self::Result { self.get_dump() } -} +} \ No newline at end of file diff --git a/crates/data/src/in_mem_event_log.rs b/crates/data/src/in_mem_event_log.rs index 9b95398919..21dfcdfb81 100644 --- a/crates/data/src/in_mem_event_log.rs +++ b/crates/data/src/in_mem_event_log.rs @@ -12,18 +12,55 @@ pub struct InMemEventLog { } impl InMemEventLog { + /// Creates a new, empty in-memory event log. + /// + /// # Examples + /// + /// ``` + /// let log = InMemEventLog::new(); + /// assert!(log.read_from(1).next().is_none()); + /// ``` pub fn new() -> Self { Self { log: Vec::new() } } } impl Default for InMemEventLog { + /// Creates a default, empty `InMemEventLog`. + /// + /// # Examples + /// + /// ``` + /// let log = InMemEventLog::default(); + /// assert!(log.read_from(1).collect::>().is_empty()); + /// ``` fn default() -> Self { Self::new() } } impl EventLog for InMemEventLog { + /// Reads events starting at a 1-based sequence offset. + /// + /// `from` is a 1-based index indicating the first event sequence number to return. The + /// returned iterator yields pairs `(sequence_number, event)` where `sequence_number` is + /// the 1-based position of the event in the log. If `from` is greater than the number + /// of stored events, the iterator is empty. + /// + /// # Examples + /// + /// ``` + /// let mut log = InMemEventLog::new(); + /// let e1 = event_from("a"); + /// let e2 = event_from("b"); + /// log.append(&e1).unwrap(); + /// log.append(&e2).unwrap(); + /// + /// let events: Vec<_> = log.read_from(1).collect(); + /// assert_eq!(events.len(), 2); + /// assert_eq!(events[0].0, 1); + /// assert_eq!(events[1].0, 2); + /// ``` fn read_from(&self, from: u64) -> Box)>> { // Convert 1-indexed sequence to 0-indexed array position let start_idx = from.saturating_sub(1) as usize; @@ -37,6 +74,18 @@ impl EventLog for InMemEventLog { .collect(); Box::new(events.into_iter()) } + /// Appends an event to the in-memory log and returns its 1-based sequence number. + /// + /// # Examples + /// + /// ``` + /// let mut log = InMemEventLog::new(); + /// let ev = /* construct an EnclaveEvent */ ; + /// let seq = log.append(&ev).unwrap(); + /// assert_eq!(seq, 1); + /// ``` + /// + /// Returns the 1-based sequence number assigned to the appended event. fn append(&mut self, event: &EnclaveEvent) -> Result { self.log.push(event.to_owned()); Ok(self.log.len() as u64) @@ -48,6 +97,22 @@ mod tests { use super::*; use e3_events::{EnclaveEventData, EventConstructorWithTimestamp, TestEvent}; + /// Create an `EnclaveEvent` from the given data with a fixed timestamp of 123. + /// + /// # Examples + /// + /// ``` + /// let ev = event_from("payload"); + /// // `ev` is an `EnclaveEvent` whose timestamp is 123. + /// ``` + /// + /// # Parameters + /// + /// - `data`: A value convertible into `EnclaveEventData`. + /// + /// # Returns + /// + /// An `EnclaveEvent` containing the provided data and timestamp 123. fn event_from(data: impl Into) -> EnclaveEvent { EnclaveEvent::::new_with_timestamp(data.into().into(), 123) } @@ -110,4 +175,4 @@ mod tests { let events: Vec<_> = log.read_from(100).collect(); assert!(events.is_empty()); } -} +} \ No newline at end of file diff --git a/crates/data/src/in_mem_sequence_index.rs b/crates/data/src/in_mem_sequence_index.rs index 675f18070d..e893c48f12 100644 --- a/crates/data/src/in_mem_sequence_index.rs +++ b/crates/data/src/in_mem_sequence_index.rs @@ -13,6 +13,15 @@ pub struct InMemSequenceIndex { } impl InMemSequenceIndex { + /// Creates a new InMemSequenceIndex with an empty in-memory index. + /// + /// # Examples + /// + /// ``` + /// let idx = InMemSequenceIndex::new(); + /// // new index contains no entries + /// assert!(idx.get(1).unwrap().is_none()); + /// ``` pub fn new() -> Self { Self { index: BTreeMap::new(), @@ -21,15 +30,54 @@ impl InMemSequenceIndex { } impl SequenceIndex for InMemSequenceIndex { + /// Finds the value for the smallest stored key that is greater than or equal to `key`. + /// + /// Returns the value associated with the smallest stored key >= `key`, or `None` if no such key exists. + /// + /// # Examples + /// + /// ``` + /// let mut idx = InMemSequenceIndex::new(); + /// idx.insert(100, 1).unwrap(); + /// idx.insert(200, 2).unwrap(); + /// assert_eq!(idx.seek(50).unwrap(), Some(1)); + /// assert_eq!(idx.seek(150).unwrap(), Some(2)); + /// assert_eq!(idx.seek(999).unwrap(), None); + /// ``` fn seek(&self, key: u128) -> Result> { Ok(self.index.range(key..).next().map(|(_, &v)| v)) } + /// Inserts or updates the in-memory mapping from a 128-bit key to a 64-bit sequence index. + /// + /// The method stores the provided `value` under `key`, replacing any existing entry. + /// + /// # Examples + /// + /// ``` + /// let mut idx = InMemSequenceIndex::new(); + /// idx.insert(100u128, 1u64).unwrap(); + /// assert_eq!(idx.get(100u128).unwrap(), Some(1)); + /// ``` fn insert(&mut self, key: u128, value: u64) -> Result<()> { self.index.insert(key, value); Ok(()) } + /// Retrieves the sequence value associated with the exact `key`, if present. + /// + /// # Returns + /// + /// `Some(value)` containing the stored sequence value when the key exists, `None` otherwise. + /// + /// # Examples + /// + /// ``` + /// let mut idx = InMemSequenceIndex::new(); + /// idx.insert(100, 1).unwrap(); + /// assert_eq!(idx.get(100).unwrap(), Some(1)); + /// assert_eq!(idx.get(50).unwrap(), None); + /// ``` fn get(&self, key: u128) -> Result> { Ok(self.index.get(&key).copied()) } @@ -60,4 +108,4 @@ mod tests { assert_eq!(index.seek(999).unwrap(), None); } -} +} \ No newline at end of file diff --git a/crates/data/src/into_key.rs b/crates/data/src/into_key.rs index 7c1c5e1213..f20f2381a1 100644 --- a/crates/data/src/into_key.rs +++ b/crates/data/src/into_key.rs @@ -53,6 +53,19 @@ impl IntoKey for &String { /// Keys can be &str impl<'a> IntoKey for &'a str { + /// Produce a UTF-8 byte sequence from a borrowed string. + /// + /// # Examples + /// + /// ``` + /// let s = String::from("hello"); + /// let key = (&s).into_key(); + /// assert_eq!(key, b"hello".to_vec()); + /// ``` + /// + /// # Returns + /// + /// A byte vector containing the UTF-8 encoding of the string. fn into_key(self) -> Vec { self.as_bytes().to_vec() } @@ -60,8 +73,18 @@ impl<'a> IntoKey for &'a str { /// Keys can be u128 impl IntoKey for u128 { + /// Converts the integer into a big-endian byte vector suitable for use as a key. + /// + /// The returned bytes are in big-endian order to preserve numeric ordering when compared lexicographically. + /// + /// # Examples + /// + /// ``` + /// let key = 42u128.into_key(); + /// assert_eq!(key, 42u128.to_be_bytes().to_vec()); + /// ``` fn into_key(self) -> Vec { // Ensuring big endian for ordering self.to_be_bytes().to_vec() } -} +} \ No newline at end of file diff --git a/crates/data/src/sled_db.rs b/crates/data/src/sled_db.rs index 641ea39797..43df63b853 100644 --- a/crates/data/src/sled_db.rs +++ b/crates/data/src/sled_db.rs @@ -18,15 +18,50 @@ pub struct SledDb { } impl SledDb { + /// Opens or creates a sled tree at the given filesystem path and returns a `SledDb` wrapping it. + /// + /// Errors if the underlying database tree cannot be opened or created. + /// + /// # Examples + /// + /// ``` + /// use std::path::PathBuf; + /// let path = std::env::temp_dir().join("sled_db_example"); + /// let db = SledDb::new(&path, "example_tree").unwrap(); + /// ``` pub fn new(path: &PathBuf, tree: &str) -> Result { let db = get_or_open_db_tree(path, tree)?; Ok(Self { db }) } + /// Clears all cached sled database trees and connections. + /// + /// This closes or clears any in-memory caches used to reuse opened sled trees across the process. + /// + /// # Examples + /// + /// ``` + /// // Close and clear all cached sled connections. + /// close_all_connections(); + /// ``` pub fn close_all_connections() { clear_all_caches() } + /// Inserts a key/value pair into the underlying sled tree. + /// + /// On success returns `Ok(())`. On failure returns an error with context "Could not insert data into db". + /// + /// # Examples + /// + /// ```no_run + /// # use std::path::PathBuf; + /// # use crate::sled_db::SledDb; + /// # use crate::Insert; + /// let mut db = SledDb::new(&PathBuf::from("/tmp/db"), "default").unwrap(); + /// let msg = Insert::from_parts("my_key".into(), b"my_value".to_vec()); + /// db.insert(msg).unwrap(); + /// ``` pub fn insert(&mut self, msg: Insert) -> Result<()> { self.db .insert(msg.key(), msg.value().to_vec()) @@ -35,6 +70,31 @@ impl SledDb { Ok(()) } + /// Inserts multiple key/value pairs into the database atomically. + /// + /// The provided `msgs` are written inside a single transaction so either all inserts succeed or none are applied. + /// + /// # Parameters + /// + /// - `msgs`: a slice of `Insert` messages whose `key()` and `value()` are stored. + /// + /// # Returns + /// + /// `Ok(())` on success, or an error with context `"Could not insert batch data into db"` on failure. + /// + /// # Examples + /// + /// ``` + /// use crate::{SledDb, Insert}; + /// use std::path::PathBuf; + /// + /// let mut db = SledDb::new(&PathBuf::from("/tmp/example"), "tree").unwrap(); + /// let batch = vec![ + /// Insert::new("k1".into(), b"v1".to_vec()), + /// Insert::new("k2".into(), b"v2".to_vec()), + /// ]; + /// db.insert_batch(&batch).unwrap(); + /// ``` pub fn insert_batch(&mut self, msgs: &Vec) -> Result<()> { self.db .transaction(|tx_db| { @@ -47,6 +107,17 @@ impl SledDb { Ok(()) } + /// Removes the entry identified by the given message's key from the database. + /// + /// The provided `msg` supplies the key to delete; if the key exists it will be removed. + /// + /// # Examples + /// + /// ```no_run + /// let mut db = SledDb::new(&path, "default").unwrap(); + /// let remove_msg = Remove::new("my-key"); + /// db.remove(remove_msg).unwrap(); + /// ``` pub fn remove(&mut self, msg: Remove) -> Result<()> { self.db .remove(msg.key()) @@ -54,6 +125,28 @@ impl SledDb { Ok(()) } + /// Fetches the value for the given key from the database. + /// + /// Returns `Some(Vec)` containing the value if the key exists, `None` if the key is not present. + /// Any underlying I/O or database error is returned as an `Err` with context that includes the key. + /// + /// # Examples + /// + /// ```ignore + /// use std::path::PathBuf; + /// use crate::SledDb; + /// use crate::Get; + /// + /// let path = PathBuf::from("/tmp/my_db"); + /// let mut db = SledDb::new(&path, "default").unwrap(); + /// // assume an Insert has been stored under key b"foo" + /// let val = db.get(Get::new(b"foo".to_vec())).unwrap(); + /// if let Some(bytes) = val { + /// assert_eq!(bytes, b"expected value".to_vec()); + /// } else { + /// // key not found + /// } + /// ``` pub fn get(&self, event: Get) -> Result>> { let key = event.key(); let str_key = String::from_utf8_lossy(&key).into_owned(); @@ -70,6 +163,24 @@ impl SledDb { mod tests { use super::*; + /// Verifies that SledDb instances share data when opened on the same path and remain isolated across different paths. + /// + /// This test exercises cross-instance visibility (writes from one instance are readable by another when using the same path/tree) + /// and ensures separate paths do not share data. + /// + /// # Examples + /// + /// ``` + /// use tempfile::tempdir; + /// let temp_dir = tempdir().unwrap(); + /// let db_path = temp_dir.path().join("test_cache.db"); + /// + /// let mut db1 = SledDb::new(&db_path, "datastore").unwrap(); + /// db1.insert(Insert::new(b"test_key".to_vec(), b"test_value".to_vec())).unwrap(); + /// + /// let mut db2 = SledDb::new(&db_path, "datastore").unwrap(); + /// assert_eq!(db2.get(Get::new(b"test_key".to_vec())).unwrap().unwrap(), b"test_value".to_vec()); + /// ``` #[test] fn test_sled_db_caching() -> Result<()> { use tempfile::tempdir; @@ -179,4 +290,4 @@ mod tests { Ok(()) } -} +} \ No newline at end of file diff --git a/crates/data/src/sled_sequence_index.rs b/crates/data/src/sled_sequence_index.rs index 4922fb4600..f851bfa5cd 100644 --- a/crates/data/src/sled_sequence_index.rs +++ b/crates/data/src/sled_sequence_index.rs @@ -17,17 +17,53 @@ pub struct SledSequenceIndex { } impl SledSequenceIndex { + /// Creates a new SledSequenceIndex by opening or creating the specified sled tree. + /// + /// The `path` identifies the database directory and `tree` is the name of the sled tree to open. + /// Returns an error if the underlying database tree cannot be opened or created. + /// + /// # Examples + /// + /// ``` + /// use tempfile::tempdir; + /// use std::path::PathBuf; + /// let dir = tempdir().unwrap(); + /// let path = dir.path().to_path_buf(); + /// let idx = sled_sequence_index::SledSequenceIndex::new(&path, "test_tree").unwrap(); + /// ``` pub fn new(path: &PathBuf, tree: &str) -> Result { let db = get_or_open_db_tree(path, tree)?; Ok(Self { db }) } + /// Closes all cached sled database connections and releases related resources. + /// + /// # Examples + /// + /// ``` + /// SledSequenceIndex::close_all_connections(); + /// ``` pub fn close_all_connections() { clear_all_caches() } } impl SequenceIndex for SledSequenceIndex { + /// Fetches the sequence value stored for the given 128-bit key, decoding the stored bytes as a big-endian `u64`. + /// + /// Returns `Some(u64)` when a value is found for `key`, `None` when the key is absent. + /// Returns an error if the database operation fails or if the stored value cannot be converted into an 8-byte big-endian `u64`. + /// + /// # Examples + /// + /// ``` + /// # use tempfile::tempdir; + /// # use crate::sled_sequence_index::SledSequenceIndex; + /// let dir = tempdir().unwrap(); + /// let mut idx = SledSequenceIndex::new(&dir.path().to_path_buf(), "test_tree").unwrap(); + /// idx.insert(100u128, 1u64).unwrap(); + /// assert_eq!(idx.get(100u128).unwrap(), Some(1)); + /// ``` fn get(&self, key: u128) -> Result> { self.db .get(key.to_be_bytes().to_vec()) @@ -36,6 +72,34 @@ impl SequenceIndex for SledSequenceIndex { .transpose() } + /// Inserts a mapping from `key` to `value` into the underlying sled tree. + /// + /// The `key` is stored as a big-endian `u128` byte sequence and the `value` is stored as a + /// big-endian `u64` byte sequence. + /// + /// # Parameters + /// + /// - `key`: Sequence key to insert, encoded as big-endian bytes for storage. + /// - `value`: Value to associate with `key`, encoded as big-endian bytes for storage. + /// + /// # Returns + /// + /// `Ok(())` on success, or an error with context "Failed to insert key: {key}" if the insert fails. + /// + /// # Examples + /// + /// ``` + /// use tempfile::tempdir; + /// use std::path::PathBuf; + /// use crate::SledSequenceIndex; + /// + /// let dir = tempdir().unwrap(); + /// let path: PathBuf = dir.path().to_path_buf(); + /// let mut idx = SledSequenceIndex::new(&path, "doc_tree").unwrap(); + /// + /// idx.insert(42u128, 7u64).unwrap(); + /// assert_eq!(idx.get(42u128).unwrap(), Some(7u64)); + /// ``` fn insert(&mut self, key: u128, value: u64) -> Result<()> { self.db .insert(key.to_be_bytes().to_vec(), value.to_be_bytes().to_vec()) @@ -43,6 +107,21 @@ impl SequenceIndex for SledSequenceIndex { Ok(()) } + /// Finds the stored sequence value for the first key at or after `key`. + /// + /// Returns the value associated with the smallest stored key greater than or equal to `key`, or `None` if no such key exists. + /// + /// # Errors + /// + /// Returns an error if the database range query fails or if a found value cannot be converted into an 8-byte `u64`. + /// + /// # Examples + /// + /// ``` + /// // assuming `idx` is a `SledSequenceIndex` with entries 100 -> 1, 200 -> 2 + /// let found = idx.seek(150).unwrap(); + /// assert_eq!(found, Some(2)); + /// ``` fn seek(&self, key: u128) -> Result> { let key_bytes = key.to_be_bytes(); self.db @@ -86,4 +165,4 @@ mod tests { // After all keys assert_eq!(index.seek(999).unwrap(), None); } -} +} \ No newline at end of file diff --git a/crates/data/src/sled_store.rs b/crates/data/src/sled_store.rs index 2deaab949c..2cd9ab7eaf 100644 --- a/crates/data/src/sled_store.rs +++ b/crates/data/src/sled_store.rs @@ -21,6 +21,17 @@ impl Actor for SledStore { } impl SledStore { + /// Creates and starts a SledStore actor backed by a SledDb at the given file system path. + /// + /// The actor is started immediately and subscribed to the bus "Shutdown" topic so it will + /// drop its database and stop when a shutdown event is published. + /// + /// # Examples + /// + /// ``` + /// // Assume `bus` and `path` are available in scope. + /// let addr = SledStore::new(&bus, &path).unwrap(); + /// ``` pub fn new(bus: &BusHandle, path: &PathBuf) -> Result> { info!("Starting SledStore with {:?}", path); let db = SledDb::new(path, "datastore")?; @@ -40,6 +51,18 @@ impl SledStore { impl Handler for SledStore { type Result = (); + /// Handles an `Insert` message by writing the provided data to the backing database if available. + /// + /// If the store has an open database connection, attempts to insert the event into it. + /// On insertion error, reports the error on the store's bus with `EType::Data`. Does nothing + /// when the database is absent or the insertion succeeds. + /// + /// # Examples + /// + /// ```no_run + /// // Given `store: SledStore` and `ctx: ::Context` + /// // store.handle(Insert::new(key, value), &mut ctx); + /// ``` fn handle(&mut self, event: Insert, _: &mut Self::Context) -> Self::Result { if let Some(ref mut db) = &mut self.db { match db.insert(event) { @@ -53,6 +76,13 @@ impl Handler for SledStore { impl Handler for SledStore { type Result = (); + /// Handles an InsertBatch event by inserting its commands into the underlying database and reporting any insertion errors to the bus. + /// + /// The handler is a no-op when the database is not present. + /// + /// # Parameters + /// + /// - `event`: an `InsertBatch` containing the commands to insert; errors from the insertion are forwarded to the bus as `EType::Data`. fn handle(&mut self, event: InsertBatch, _: &mut Self::Context) -> Self::Result { if let Some(ref mut db) = &mut self.db { match db.insert_batch(event.commands()) { @@ -108,10 +138,24 @@ impl Handler for SledStore { } impl Handler for SledStore { type Result = (); + /// Handles enclave lifecycle events for the actor. + /// + /// When the received `EnclaveEvent` carries `EnclaveEventData::Shutdown`, this handler drops + /// the actor's database connection (if any) and stops the actor's context. For any other + /// event data, no action is taken. + /// + /// # Examples + /// + /// ``` + /// // When an `EnclaveEvent` with `EnclaveEventData::Shutdown` is delivered to the actor, + /// // the actor will drop its DB and stop: + /// // let evt = EnclaveEvent::new(..., EnclaveEventData::Shutdown(...)); + /// // sled_store.handle(evt, &mut ctx); + /// ``` fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { if let EnclaveEventData::Shutdown(_) = msg.get_data() { let _db = self.db.take(); // db will be dropped ctx.stop() } } -} +} \ No newline at end of file diff --git a/crates/data/src/sled_utils.rs b/crates/data/src/sled_utils.rs index a894d2c0c9..fa7ad82a3f 100644 --- a/crates/data/src/sled_utils.rs +++ b/crates/data/src/sled_utils.rs @@ -20,6 +20,29 @@ pub static SLED_CACHE: Lazy>>> = // Returns a stable canonical string path used as a cache key. // Canonicalizes the parent directory if the target path does not yet exist. +/// Produce a stable string key for a filesystem path suitable for cache indexing. +/// +/// If the given path exists, its canonical form is used. If the path does not exist, +/// the parent directory is canonicalized (or "." if no parent) and the path's final +/// component (file name) is appended to that canonical parent to form the key. The +/// result is returned as an owned `String`. +/// +/// # Parameters +/// +/// - `path`: the filesystem path to convert into a stable key. +/// +/// # Returns +/// +/// An owned string representing a stable, canonicalized key for `path`. +/// +/// # Examples +/// +/// ``` +/// use std::path::PathBuf; +/// let p = PathBuf::from("some/nonexistent/path.txt"); +/// let key = canonical_key(&p); +/// assert!(key.ends_with("path.txt")); +/// ``` fn canonical_key(path: &PathBuf) -> String { if path.exists() { return path @@ -39,6 +62,17 @@ fn canonical_key(path: &PathBuf) -> String { // Opens or retrieves a cached sled database for the given path. // Prevents conflicts by ensuring only a single connection was open to a db file at once per process. // Ensures the directory exists and stabilizes the canonical key across OSes. +/// Opens a sled database at the given path or returns a cached handle for that path. +/// +/// Ensures the target directory exists, reuses a previously opened database for the same +/// canonical path when available, and logs whether the database was created or recovered. +/// +/// # Examples +/// +/// ```no_run +/// use std::path::PathBuf; +/// // let db = get_or_open_db(&PathBuf::from("my_db")).unwrap(); +/// ``` fn get_or_open_db(path: &PathBuf) -> Result { let _ = std::fs::create_dir_all(path); let key = canonical_key(path); @@ -62,12 +96,43 @@ fn get_or_open_db(path: &PathBuf) -> Result { Ok(db) } +/// Open or create the named sled `Tree` in the database at `path`. +/// +/// The returned tree is created if it does not already exist. +/// +/// # Examples +/// +/// ```no_run +/// use std::path::PathBuf; +/// +/// let path = PathBuf::from("my_db"); +/// let tree = get_or_open_db_tree(&path, "values").unwrap(); +/// // use `tree`... +/// ``` +/// +/// # Returns +/// +/// The `Tree` for the specified name. pub fn get_or_open_db_tree(path: &PathBuf, tree: &str) -> Result { let db = get_or_open_db(path)?; Ok(db.open_tree(tree)?) } +/// Clears the process-global in-memory cache of opened sled databases. +/// +/// This removes all entries from the global cache, dropping the stored `sled::Db` handles. +/// +/// # Examples +/// +/// ``` +/// use std::sync::MutexGuard; +/// // Call to clear the cache +/// clear_all_caches(); +/// // Verify the cache is empty +/// let guard = SLED_CACHE.lock().unwrap(); +/// assert!(guard.is_empty()); +/// ``` pub fn clear_all_caches() { let mut cache_lock = SLED_CACHE.lock().unwrap(); cache_lock.clear(); -} +} \ No newline at end of file diff --git a/crates/data/src/write_buffer.rs b/crates/data/src/write_buffer.rs index c3d39afdb0..7c192f3980 100644 --- a/crates/data/src/write_buffer.rs +++ b/crates/data/src/write_buffer.rs @@ -19,6 +19,19 @@ impl Actor for WriteBuffer { } impl WriteBuffer { + /// Creates a new WriteBuffer with no destination and an empty buffer. + /// + /// # Examples + /// + /// ``` + /// let wb = crate::write_buffer::WriteBuffer::new(); + /// assert!(wb.dest.is_none()); + /// assert!(wb.buffer.is_empty()); + /// ``` + /// + /// # Returns + /// + /// A `WriteBuffer` with `dest` set to `None` and an empty `buffer`. pub fn new() -> Self { Self { dest: None, @@ -29,6 +42,15 @@ impl WriteBuffer { impl Handler for WriteBuffer { type Result = (); + /// Sets the destination recipient for future insert batches. + /// + /// This handler updates the write buffer's internal destination so subsequent + /// commit operations will forward buffered `InsertBatch` messages to that + /// recipient. + /// + /// # Parameters + /// + /// * `msg` - A `ForwardTo` message containing the `Recipient` to use. fn handle(&mut self, msg: ForwardTo, _: &mut Self::Context) -> Self::Result { self.dest = Some(msg.dest()) } @@ -37,6 +59,18 @@ impl Handler for WriteBuffer { impl Handler for WriteBuffer { type Result = (); + /// Enqueues an `Insert` message into the actor's in-memory buffer for later batching. + /// + /// This does not send or commit the insert; buffered items are forwarded when a `CommitSnapshot` is handled. + /// + /// # Examples + /// + /// ``` + /// // Illustrative example — actual `Insert` constructor may differ in your codebase. + /// let mut wb = WriteBuffer::new(); + /// let insert = Insert::new("key", b"value".to_vec()); + /// wb.handle(insert, &mut wb_actor_context()); // after this, `wb.buffer` contains the insert + /// ``` fn handle(&mut self, msg: Insert, _: &mut Self::Context) -> Self::Result { self.buffer.push(msg); } @@ -45,6 +79,27 @@ impl Handler for WriteBuffer { impl Handler for WriteBuffer { type Result = (); + /// Flushes buffered inserts on a commit by sending them as an InsertBatch to the configured destination. + /// + /// If a destination recipient is set and the buffer contains any `Insert` items, this handler: + /// - takes the current buffer contents, + /// - appends an `Insert` with key `"//seq"` and the commit sequence encoded as big-endian bytes, + /// - constructs an `InsertBatch` from those inserts, + /// - sends the batch to the destination, and + /// - leaves the internal buffer empty. + /// + /// The handler does nothing if no destination is configured or if the buffer is empty. + /// + /// # Examples + /// + /// ```ignore + /// // Example (illustrative): + /// // let mut wb = WriteBuffer::new(); + /// // wb.handle(ForwardTo::new(dest_recipient), &mut ctx); + /// // wb.handle(Insert::new("k", b"v".to_vec()), &mut ctx); + /// // wb.handle(CommitSnapshot::new(42), &mut ctx); + /// // // dest_recipient receives an InsertBatch containing the buffered insert and a "//seq" entry with 42. + /// ``` fn handle(&mut self, msg: CommitSnapshot, _: &mut Self::Context) -> Self::Result { if let Some(ref dest) = self.dest { if !self.buffer.is_empty() { @@ -62,11 +117,33 @@ impl Handler for WriteBuffer { pub struct ForwardTo(Recipient); impl ForwardTo { + /// Creates a `ForwardTo` wrapper that holds a recipient for `InsertBatch` messages. + /// + /// The function accepts any value that can be converted into `Recipient` and + /// stores the converted recipient inside the returned `ForwardTo`. + /// + /// # Examples + /// + /// ``` + /// // `recipient` should be a `Recipient` obtained from an Actix actor. + /// let recipient: Recipient = /* obtain recipient */; + /// let forward = ForwardTo::new(recipient); + /// ``` pub fn new(dest: impl Into>) -> Self { Self(dest.into()) } + /// Accesses the contained `Recipient`. + /// + /// # Examples + /// + /// ``` + /// // obtain or construct a Recipient in your application context + /// let recipient: Recipient = /* ... */; + /// let fwd = ForwardTo::new(recipient); + /// let dest = fwd.dest(); + /// ``` pub fn dest(self) -> Recipient { self.0 } -} +} \ No newline at end of file diff --git a/crates/entrypoint/src/helpers/datastore.rs b/crates/entrypoint/src/helpers/datastore.rs index 692ec690df..de97bad92d 100644 --- a/crates/entrypoint/src/helpers/datastore.rs +++ b/crates/entrypoint/src/helpers/datastore.rs @@ -14,6 +14,18 @@ use e3_data::{DataStore, InMemStore, SledDb, SledStore}; use e3_data::{Repositories, RepositoriesFactory}; use e3_events::BusHandle; +/// Creates a sled-backed DataStore using the provided bus and database file. +/// +/// Constructs a `SledStore` with `bus` and `db_file` and converts it into a `DataStore`. +/// Propagates any construction error from `SledStore`. +/// +/// # Examples +/// +/// ```no_run +/// let bus: BusHandle = /* obtain a BusHandle */ unimplemented!(); +/// let db_file = std::path::PathBuf::from("my_db.sled"); +/// let datastore = get_sled_store(&bus, &db_file).expect("failed to create sled store"); +/// ``` pub fn get_sled_store(bus: &BusHandle, db_file: &PathBuf) -> Result { Ok((&SledStore::new(bus, db_file)?).into()) } @@ -22,6 +34,23 @@ pub fn get_in_mem_store() -> DataStore { (&InMemStore::new(true).start()).into() } +/// Selects and constructs the application's persistent or in-memory data store based on configuration. +/// +/// If the configuration indicates use of an on-disk store, initializes a Sled-backed store with the configured +/// database file; otherwise initializes an in-memory store. +/// +/// # Returns +/// +/// `Ok(DataStore)` containing the initialized store on success, or an `Err` describing any construction failure. +/// +/// # Examples +/// +/// ``` +/// let config = AppConfig::default(); +/// let bus = BusHandle::new(); +/// let store = setup_datastore(&config, &bus).unwrap(); +/// // Use `store`... +/// ``` pub fn setup_datastore(config: &AppConfig, bus: &BusHandle) -> Result { let store: DataStore = if !config.use_in_mem_store() { get_sled_store(&bus, &config.db_file())? @@ -31,6 +60,26 @@ pub fn setup_datastore(config: &AppConfig, bus: &BusHandle) -> Result Ok(store) } +/// Returns a repositories view configured according to the provided application config. +/// +/// Obtains an enclave bus handle from the configuration, initializes the appropriate +/// DataStore (sled-backed or in-memory) and returns the `Repositories` view from that store. +/// +/// # Parameters +/// +/// - `config`: application configuration that determines which datastore to initialize and +/// is used to obtain the enclave bus handle. +/// +/// # Returns +/// +/// A `Repositories` view backed by the initialized `DataStore`. +/// +/// # Examples +/// +/// ```no_run +/// let config = AppConfig::default(); +/// let repos = get_repositories(&config).expect("failed to get repositories"); +/// ``` pub fn get_repositories(config: &AppConfig) -> Result { let bus = get_enclave_bus_handle(config)?; let store = setup_datastore(config, &bus)?; @@ -39,4 +88,4 @@ pub fn get_repositories(config: &AppConfig) -> Result { pub fn close_all_connections() { SledDb::close_all_connections(); -} +} \ No newline at end of file diff --git a/crates/entrypoint/src/start/aggregator_start.rs b/crates/entrypoint/src/start/aggregator_start.rs index e8f2b2ce13..6d958ca6c2 100644 --- a/crates/entrypoint/src/start/aggregator_start.rs +++ b/crates/entrypoint/src/start/aggregator_start.rs @@ -20,6 +20,24 @@ use std::{ }; use tokio::task::JoinHandle; +/// Initializes and starts a ciphernode node configured from `config`, wires its cryptographic context and networking integration, and optionally attaches test writers for public keys and plaintexts. +/// +/// # Returns +/// +/// A tuple containing the node's enclave `BusHandle`, a `JoinHandle` for the running node task that resolves to `anyhow::Result<()>`, and the node's libp2p peer identifier. +/// +/// # Examples +/// +/// ```no_run +/// # use e3_config::AppConfig; +/// # use std::path::PathBuf; +/// # async fn example() -> anyhow::Result<()> { +/// let config = AppConfig::default(); // construct with real values in real use +/// let (bus, join_handle, peer_id) = e3_ciphernode::execute(&config, None, None, false).await?; +/// // use bus, join_handle, peer_id... +/// # Ok::<(), anyhow::Error>(()) +/// # } +/// ``` pub async fn execute( config: &AppConfig, pubkey_write_path: Option, @@ -70,4 +88,4 @@ pub async fn execute( } Ok((bus, join_handle, peer_id)) -} +} \ No newline at end of file diff --git a/crates/entrypoint/src/start/start.rs b/crates/entrypoint/src/start/start.rs index 545077e4b8..c781978f91 100644 --- a/crates/entrypoint/src/start/start.rs +++ b/crates/entrypoint/src/start/start.rs @@ -18,6 +18,36 @@ use std::sync::{Arc, Mutex}; use tokio::task::JoinHandle; use tracing::instrument; +/// Start and configure a ciphernode, initialize networking, and prepare the background event task. +/// +/// This function builds a ciphernode using values from `config` and `address`, initializes the enclave +/// bus and cipher, configures node features (including TRBFV or keyshare depending on +/// `experimental_trbfv`), and sets up the network event translator. It returns the enclave bus handle, +/// the JoinHandle for the running background task, and the local libp2p peer id. +/// +/// # Parameters +/// +/// * `config` - Application configuration used to configure the node and networking. +/// * `address` - Network address to bind the node to. +/// * `experimental_trbfv` - When `true`, enable TRBFV support; otherwise enable keyshare support. +/// +/// # Returns +/// +/// A tuple containing: +/// 1. the enclave `BusHandle`, +/// 2. a `JoinHandle>` for the node's background task, +/// 3. the local libp2p peer id as a `String`. +/// +/// # Examples +/// +/// ```no_run +/// # async fn example() -> Result<(), Box> { +/// // let config = AppConfig::load("config.toml")?; // hypothetical loader +/// // let address = Address::from_str("127.0.0.1:9000")?; +/// // let (bus, join_handle, peer_id) = crate::start::execute(&config, address, false).await?; +/// # Ok(()) +/// # } +/// ``` #[instrument(name = "app", skip_all)] pub async fn execute( config: &AppConfig, @@ -59,4 +89,4 @@ pub async fn execute( .await?; Ok((bus, join_handle, peer_id)) -} +} \ No newline at end of file diff --git a/crates/events/src/bus_handle.rs b/crates/events/src/bus_handle.rs index ece22e36c1..0d0b44c066 100644 --- a/crates/events/src/bus_handle.rs +++ b/crates/events/src/bus_handle.rs @@ -35,7 +35,18 @@ pub struct BusHandle { } impl BusHandle { - /// Create a new BusHandle + /// Constructs a BusHandle that connects an EventBus consumer with a Sequencer producer and an HLC clock. + /// + /// The provided HLC is associated with the handle and used to timestamp events created by it. + /// + /// # Examples + /// + /// ``` + /// // Given existing `consumer: Addr>>`, + /// // `producer: Addr` and `hlc: Hlc`: + /// let handle = BusHandle::new(consumer, producer, hlc); + /// // `handle` can now be used to publish and subscribe to events. + /// ``` pub fn new( consumer: Addr>>, producer: Addr, @@ -48,12 +59,37 @@ impl BusHandle { } } - /// Return a HistoryCollector for examining events that have passed through on the events bus + /// Returns the HistoryCollector actor address for this bus's sequenced events. + /// + /// The returned address can be used to query or inspect events that have passed through the consumer EventBus. + /// + /// # Examples + /// + /// ```no_run + /// # use actix::prelude::*; + /// # use crates::events::{BusHandle, EnclaveEvent, Sequenced, HistoryCollector}; + /// # fn example(handle: &BusHandle) { + /// let hist: Addr>> = handle.history(); + /// # } + /// ``` pub fn history(&self) -> Addr>> { EventBus::>::history(&self.consumer) } - /// Access the producer to internally dispatch am event to + /// Access the handle's internal producer actor. + /// + /// # Returns + /// + /// A reference to the internal producer address (`Addr`). + /// + /// # Examples + /// + /// ``` + /// // `consumer_addr`, `producer_addr`, and `hlc` are assumed to be available. + /// let handle = BusHandle::new(consumer_addr, producer_addr, hlc); + /// let producer_ref = handle.producer(); + /// // `producer_ref` can be used to send messages to the producer actor. + /// ``` pub fn producer(&self) -> &Addr { &self.producer } @@ -63,13 +99,43 @@ impl BusHandle { &self.consumer } - /// Get a new timestamp. Note this ticks over the internal Hlc. + /// Produces a new timestamp from the handle's HLC and advances the internal clock. + /// + /// # Returns + /// + /// `Ok(u128)` containing the new HLC-derived timestamp, or an `Err` if advancing the HLC fails. + /// + /// # Examples + /// + /// ``` + /// // Assume `handle` is a BusHandle available in scope. + /// let ts = handle.ts().expect("failed to obtain timestamp"); + /// println!("timestamp = {}", ts); + /// ``` pub fn ts(&self) -> Result { let ts = self.hlc.tick()?; Ok(ts.into()) } - /// Pipe events from this handle to the other handle only when the predicate returns true + /// Create a BusHandlePipe actor that forwards matching sequenced events from this handle to another. + /// + /// Starts a BusHandlePipe actor and subscribes it to all event types ("*"). For each incoming + /// `EnclaveEvent`, the provided `predicate` is invoked; when it returns `true` the event + /// is forwarded to the `other` handle with its original timestamp preserved. + /// + /// # Parameters + /// + /// - `other`: target `BusHandle` to forward matching events to. + /// - `predicate`: function called for each `EnclaveEvent`; return `true` to forward the event. + /// + /// # Examples + /// + /// ``` + /// // forward-only-important is an example predicate that forwards events whose data equals "important" + /// let a: BusHandle = /* source handle */; + /// let b: BusHandle = /* target handle */; + /// a.pipe_to(&b, |ev: &EnclaveEvent| ev.data == "important"); + /// ``` pub fn pipe_to(&self, other: &BusHandle, predicate: F) where F: Fn(&EnclaveEvent) -> bool + Unpin + 'static, @@ -144,6 +210,18 @@ impl EventSubscriber> for BusHandle { self.consumer.do_send(Subscribe::new(event_type, recipient)) } + /// Subscribe a recipient to multiple event types on the internal consumer. + /// + /// Each provided event type will be registered with the consumer so the given recipient will receive events of those types. The recipient is cloned for each registration. + /// + /// # Examples + /// + /// ``` + /// use actix::prelude::*; + /// // assume `handle` is a BusHandle and `recipient` implements `Recipient>` + /// let types = vec!["type_a", "type_b"]; + /// handle.subscribe_all(&types, recipient); + /// ``` fn subscribe_all(&self, event_types: &[&str], recipient: Recipient>) { for event_type in event_types.into_iter() { self.consumer @@ -163,6 +241,16 @@ mod tests { use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::time::sleep; + /// Get the current system time as microseconds since the UNIX epoch. + /// + /// Returns the number of microseconds elapsed since 1970-01-01 00:00:00 UTC. + /// + /// # Examples + /// + /// ``` + /// let t = now_micros(); + /// assert!(t > 0); + /// ``` fn now_micros() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) @@ -170,6 +258,21 @@ mod tests { .as_micros() as u64 } + /// Verifies that HLC timestamps preserve causal ordering and monotonicity across clock-drifted buses. + /// + /// Sets up three EventSystem buses (A, B, C) where A's clock is 30 seconds behind B's, forwards all + /// events from A and B to C, publishes four causally-ordered test events across A and B, and then + /// collects the events observed on C. Asserts that: + /// - the causal order of messages ("one","two","three","four") is preserved when sorted by HLC timestamp, + /// - all HLC timestamps are unique, + /// - timestamps are strictly increasing when ordered. + /// + /// # Examples + /// + /// ```no_run + /// // Run the test suite; this test will execute as part of `cargo test`. + /// // cargo test --test crates_events + /// ``` #[actix::test] async fn test_hlc_events() -> anyhow::Result<()> { #[derive(Message)] @@ -312,8 +415,20 @@ impl BusHandlePipe where F: Fn(&EnclaveEvent) -> bool + Unpin + 'static, { - /// Create a new BusHandlePipe only forwarding events to the wrapped handle when the predicate - /// function returns true + /// Creates a BusHandlePipe that forwards events to the provided `BusHandle` only when the predicate returns `true`. + /// + /// # Parameters + /// + /// - `handle`: The destination `BusHandle` to which matching events will be forwarded. + /// - `predicate`: A function that receives an `EnclaveEvent` and returns `true` to forward the event or `false` to drop it. + /// + /// # Examples + /// + /// ``` + /// // Assume `handle` is a valid BusHandle and `event` is an EnclaveEvent. + /// let pipe = BusHandlePipe::new(handle, |evt: &EnclaveEvent| evt.data().len() > 0); + /// // The pipe will forward only events whose data length is greater than zero. + /// ``` pub fn new(handle: BusHandle, predicate: F) -> Self { Self { handle, predicate } } @@ -331,10 +446,27 @@ where F: Fn(&EnclaveEvent) -> bool + Unpin + 'static, { type Result = (); + /// Forwards an incoming `EnclaveEvent` to the wrapped `BusHandle` when the pipe's predicate returns `true`. + /// + /// If the predicate accepts the event, the event is split into its data and timestamp and forwarded via + /// `publish_from_remote` on the inner handle. If the predicate rejects the event, the message is ignored. + /// + /// # Examples + /// + /// ``` + /// // Placeholder example — replace `MyHandle`, `EnclaveEvent`, and context creation with concrete types from the crate. + /// # use std::sync::Arc; + /// # use actix::prelude::*; + /// // let handle: MyHandle = /* existing BusHandle */ unimplemented!(); + /// // let mut pipe = BusHandlePipe::new(handle, |ev: &EnclaveEvent| true); + /// // let mut ctx = Context::new(&mut pipe); + /// // let event: EnclaveEvent = /* create or receive an event */ unimplemented!(); + /// // pipe.handle(event, &mut ctx); + /// ``` fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { if (self.predicate)(&msg) { let (data, ts) = msg.split(); let _ = self.handle.publish_from_remote(data, ts); } } -} +} \ No newline at end of file diff --git a/crates/events/src/enclave_event/mod.rs b/crates/events/src/enclave_event/mod.rs index 16c7eb441a..cb22f4a953 100644 --- a/crates/events/src/enclave_event/mod.rs +++ b/crates/events/src/enclave_event/mod.rs @@ -345,6 +345,22 @@ impl TryFrom> for EnclaveError { } impl fmt::Display for EnclaveEvent { + /// Formats the event as a colorized event type followed by its debug representation. + /// + /// Writes the event's type name (colored cyan) followed by a space and the `Debug`-formatted + /// event into the provided formatter. + /// + /// # Returns + /// + /// `fmt::Result` indicating whether formatting succeeded. + /// + /// # Examples + /// + /// ``` + /// // Example usage (pseudo): + /// // let s = format!("{}", some_enclave_event); + /// // // s might look like: "CiphernodeAdded EnclaveEvent { id: ..., payload: ..., ... }" + /// ``` fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let t = self.event_type(); f.write_str(&format!("{} {:?}", colorize(t, Color::Cyan), self)) @@ -362,4 +378,4 @@ impl EventConstructorWithTimestamp for EnclaveEvent { ts, } } -} +} \ No newline at end of file diff --git a/crates/events/src/eventbus.rs b/crates/events/src/eventbus.rs index ddf2422ae6..8229bcd2e6 100644 --- a/crates/events/src/eventbus.rs +++ b/crates/events/src/eventbus.rs @@ -400,7 +400,15 @@ impl Actor for HistoryCollector { impl Handler for HistoryCollector { type Result = E::Result; + /// Record an incoming event into the collector. + /// + /// The received event is appended to pending takes if any are waiting for events; + /// otherwise it is appended to the collector's history. + /// + /// # Returns + /// + /// `E::Result` as required by the `Handler` implementation (typically `()`). fn handle(&mut self, msg: E, _ctx: &mut Self::Context) -> Self::Result { self.add_event(msg); } -} +} \ No newline at end of file diff --git a/crates/events/src/events.rs b/crates/events/src/events.rs index 96450e4921..1f6971dbbf 100644 --- a/crates/events/src/events.rs +++ b/crates/events/src/events.rs @@ -14,10 +14,30 @@ use crate::{EnclaveEvent, Sequenced, Unsequenced}; pub struct CommitSnapshot(u64); impl CommitSnapshot { + /// Creates a CommitSnapshot containing the specified sequence number. + /// + /// # Examples + /// + /// ``` + /// let msg = CommitSnapshot::new(42); + /// assert_eq!(msg.seq(), 42); + /// ``` pub fn new(seq: u64) -> Self { Self(seq) } + /// Retrieve the stored sequence number. + /// + /// # Returns + /// + /// The `u64` sequence number contained in the message. + /// + /// # Examples + /// + /// ``` + /// let msg = CommitSnapshot::new(42); + /// assert_eq!(msg.seq(), 42); + /// ``` pub fn seq(&self) -> u64 { self.0 } @@ -32,6 +52,20 @@ pub struct StoreEventRequested { } impl StoreEventRequested { + /// Constructs a `StoreEventRequested` pairing an unsequenced event with a recipient that will receive an `EventStored` response. + /// + /// The provided `sender` is converted into a `Recipient` using `Into`. + /// + /// # Examples + /// + /// ```no_run + /// use actix::Recipient; + /// // `event` should be an `EnclaveEvent` + /// // `recipient` should be a `Recipient` obtained from an actor or converted from an compatible type. + /// let event = /* build or obtain an EnclaveEvent */ unimplemented!(); + /// let recipient: Recipient = /* obtain recipient */ unimplemented!(); + /// let req = StoreEventRequested::new(event, recipient); + /// ``` pub fn new( event: EnclaveEvent, sender: impl Into>, @@ -52,6 +86,16 @@ pub struct GetEventsAfter { } impl GetEventsAfter { + /// Creates a `GetEventsAfter` message for requesting events that occurred after `ts`. + /// + /// The `sender` is converted into a `Recipient` and stored in the message. + /// + /// # Examples + /// + /// ``` + /// // assume `recipient` implements `Into>` + /// let msg = GetEventsAfter::new(1_700_000_000_000_000_000u128, recipient); + /// ``` pub fn new(ts: u128, sender: impl Into>) -> Self { Self { ts, @@ -65,9 +109,28 @@ impl GetEventsAfter { pub struct ReceiveEvents(Vec>); impl ReceiveEvents { + /// Creates a new `ReceiveEvents` wrapping the provided sequenced events. + /// + /// # Examples + /// + /// ``` + /// let msg = ReceiveEvents::new(Vec::new()); + /// assert!(msg.events().is_empty()); + /// ``` pub fn new(events: Vec) -> Self { Self(events) } + /// Borrows the stored list of sequenced events. + /// + /// Returns a reference to the internal `Vec>`. + /// + /// # Examples + /// + /// ``` + /// let evts = ReceiveEvents::new(Vec::new()); + /// let slice: &Vec> = evts.events(); + /// assert!(slice.is_empty()); + /// ``` pub fn events(&self) -> &Vec { &self.0 } @@ -79,7 +142,16 @@ impl ReceiveEvents { pub struct EventStored(pub EnclaveEvent); impl EventStored { + /// Extracts the contained sequenced `EnclaveEvent`. + /// + /// # Examples + /// + /// ``` + /// // Given a tuple struct `EventStored(pub EnclaveEvent)` + /// // let stored = EventStored(enclave_event); + /// // let event = stored.into_event(); + /// ``` pub fn into_event(self) -> EnclaveEvent { self.0 } -} +} \ No newline at end of file diff --git a/crates/events/src/eventstore.rs b/crates/events/src/eventstore.rs index 68295a7037..19d2dd0cc7 100644 --- a/crates/events/src/eventstore.rs +++ b/crates/events/src/eventstore.rs @@ -18,6 +18,26 @@ pub struct EventStore { } impl EventStore { + /// Store an event in the persistent log, index it by its timestamp, and reply to the requester with the sequenced event. + /// + /// Attempts to append the provided event to the event log, insert the resulting sequence number into the sequence index keyed by the event timestamp, and send an `EventStored` message containing the sequenced event to the original sender. Propagates any error encountered during append, index insertion, or sending. + /// + /// # Parameters + /// + /// - `msg`: a `StoreEventRequested` containing the event to persist and the sender to reply to. + /// + /// # Returns + /// + /// `Ok(())` on success, or the underlying error encountered while appending to the log, updating the index, or sending the response. + /// + /// # Examples + /// + /// ```no_run + /// // pseudo-code illustrating typical usage: + /// // let mut store = EventStore::new(index, log); + /// // let request = StoreEventRequested { event, sender }; + /// // store.handle_store_event_requested(request).unwrap(); + /// ``` pub fn handle_store_event_requested(&mut self, msg: StoreEventRequested) -> Result<()> { let event = msg.event; let sender = msg.sender; @@ -28,6 +48,27 @@ impl EventStore { Ok(()) } + /// Sends all events with sequence numbers at or after the sequence corresponding to `msg.ts` to the requesting actor. + /// + /// Looks up the starting sequence for `msg.ts` in the index (defaults to 1 if not found), reads events from the log + /// starting at that sequence, converts them into sequenced events, and delivers them to `msg.sender`. + /// + /// # Returns + /// + /// `Ok(())` if events were read and the response was delivered; `Err` if the index lookup, log read, or send operation failed. + /// + /// # Examples + /// + /// ```rust + /// # // Example is illustrative and ignored in doctest + /// # #[allow(unused_imports)] + /// # use crate::events::{EventStore, GetEventsAfter, ReceiveEvents}; + /// # fn example() { + /// // let mut store = EventStore::new(index, log); + /// // let msg = GetEventsAfter::new(ts, requester); + /// // store.handle_get_events_after(msg).unwrap(); + /// # } + /// ``` pub fn handle_get_events_after(&mut self, msg: GetEventsAfter) -> Result<()> { let seq = self.index.seek(msg.ts)?.unwrap_or(1); let evts = self @@ -40,6 +81,16 @@ impl EventStore { } } impl EventStore { + /// Creates a new EventStore using the provided sequence index and event log. + /// + /// # Examples + /// + /// ``` + /// // `index` implements `SequenceIndex` and `log` implements `EventLog`. + /// let index = Default::default(); + /// let log = Default::default(); + /// let store = EventStore::new(index, log); + /// ``` pub fn new(index: I, log: L) -> Self { Self { index, log } } @@ -51,6 +102,18 @@ impl Actor for EventStore { impl Handler for EventStore { type Result = (); + /// Handles an incoming `StoreEventRequested` message for this actor. + /// + /// Delegates processing to the internal store handler and logs any error produced during handling. + /// The method does not propagate errors to the caller; failures are recorded via tracing. + /// + /// # Parameters + /// + /// - `msg`: the `StoreEventRequested` message containing the event to persist and the reply channel. + /// + /// # Returns + /// + /// This handler does not return a value. fn handle(&mut self, msg: StoreEventRequested, _: &mut Self::Context) -> Self::Result { match self.handle_store_event_requested(msg) { Ok(_) => (), @@ -61,10 +124,22 @@ impl Handler for EventStore< impl Handler for EventStore { type Result = (); + /// Delegates a `GetEventsAfter` message to `handle_get_events_after` and logs any error. + /// + /// This handler forwards the incoming message to the internal processing method and suppresses + /// errors by logging them instead of propagating them to the Actix runtime. + /// + /// # Examples + /// + /// ```no_run + /// use crates::events::{EventStore, GetEventsAfter}; + /// // Given an EventStore `store` and a `ctx` obtained from Actix, calling the handler: + /// // store.handle(GetEventsAfter { /* ... */ }, &mut ctx); + /// ``` fn handle(&mut self, msg: GetEventsAfter, _: &mut Self::Context) -> Self::Result { match self.handle_get_events_after(msg) { Ok(_) => (), Err(e) => error!("{e}"), } } -} +} \ No newline at end of file diff --git a/crates/events/src/hlc.rs b/crates/events/src/hlc.rs index a49c077e79..8272d709f1 100644 --- a/crates/events/src/hlc.rs +++ b/crates/events/src/hlc.rs @@ -178,6 +178,16 @@ struct HlcInner { } impl Default for Hlc { + /// Creates a new `Hlc` instance seeded with a randomly generated node identifier. + /// + /// # Examples + /// + /// ``` + /// let hlc = Hlc::default(); + /// // produced timestamp preserves the HLC's node id + /// let ts = hlc.get(); + /// assert_eq!(ts.node, hlc.node()); + /// ``` fn default() -> Self { let random_id: u32 = rand::thread_rng().gen(); Self::new(random_id) @@ -185,6 +195,26 @@ impl Default for Hlc { } impl PartialEq for Hlc { + /// Compare two `Hlc` instances for identity of shared state, node, and maximum drift. + /// + /// This method considers two `Hlc` values equal only when they point to the same shared + /// inner state (`Arc` pointer equality), and their `node` and `max_drift` fields are equal. + /// The `clock` field is intentionally ignored for equality (it's intended for testing). + /// + /// # Examples + /// + /// ``` + /// let a = Hlc::new(1); + /// let b = a.clone(); // clones the Arc, so inner pointer is identical + /// assert_eq!(a, b); + /// + /// let c = Hlc::new(1); // distinct inner state even though node matches + /// assert_ne!(a, c); + /// ``` + /// + /// # Returns + /// + /// `true` if the two `Hlc` instances share the same inner state and have equal `node` and `max_drift`, `false` otherwise. fn eq(&self, other: &Self) -> bool { Arc::ptr_eq(&self.inner, &other.inner) && self.node == other.node @@ -196,6 +226,27 @@ impl PartialEq for Hlc { impl Hlc { const DEFAULT_MAX_DRIFT: u64 = 60_000_000; // 60 sec + /// Constructs a new Hybrid Logical Clock (Hlc) initialized to timestamp 0 and counter 0 for the given node. + /// + /// The new Hlc uses the default maximum clock drift and no custom clock override. + /// + /// # Parameters + /// + /// - `node`: identifier for this clock's node. + /// + /// # Returns + /// + /// A configured `Hlc` instance with internal state `ts = 0`, `counter = 0`, the provided `node`, default `max_drift`, and no custom clock. + /// + /// # Examples + /// + /// ``` + /// let hlc = Hlc::new(42); + /// assert_eq!(hlc.node(), 42); + /// let ts = hlc.get(); + /// assert_eq!(ts.ts, 0); + /// assert_eq!(ts.counter, 0); + /// ``` pub fn new(node: u32) -> Self { Self { inner: Arc::new(Mutex::new(HlcInner { ts: 0, counter: 0 })), @@ -212,6 +263,21 @@ impl Hlc { Self::new(id as u32) } + /// Creates an Hlc initialized with the given internal state and node identifier. + /// + /// The `ts` value is the physical timestamp in microseconds (typically UNIX epoch + /// microseconds). `counter` is the logical counter portion and `node` is the node id + /// to embed in produced `HlcTimestamp`s. + /// + /// # Examples + /// + /// ``` + /// let h = Hlc::with_state(1_600_000_000_000_000, 5, 42); + /// let t = h.get(); + /// assert_eq!(t.ts, 1_600_000_000_000_000); + /// assert_eq!(t.counter, 5); + /// assert_eq!(t.node, 42); + /// ``` pub fn with_state(ts: u64, counter: u32, node: u32) -> Self { Self { inner: Arc::new(Mutex::new(HlcInner { ts, counter })), @@ -764,4 +830,4 @@ mod tests { "all timestamps must be unique under mixed operations" ); } -} +} \ No newline at end of file diff --git a/crates/events/src/sequencer.rs b/crates/events/src/sequencer.rs index a521637def..e0eb084751 100644 --- a/crates/events/src/sequencer.rs +++ b/crates/events/src/sequencer.rs @@ -19,6 +19,20 @@ pub struct Sequencer { } impl Sequencer { + /// Creates a new `Sequencer` wired to the given bus, event store, and commit buffer. + /// + /// The sequencer will send sequenced events to `bus`, forward incoming unsequenced events to + /// `eventstore`, and send commit snapshots to `buffer`. + /// + /// # Examples + /// + /// ```no_run + /// use actix::prelude::*; + /// // `bus`: Addr>> + /// // `eventstore_recipient`: Recipient + /// // `buffer_recipient`: Recipient + /// let sequencer = Sequencer::new(&bus, eventstore_recipient, buffer_recipient); + /// ``` pub fn new( bus: &Addr>>, eventstore: impl Into>, @@ -38,6 +52,16 @@ impl Actor for Sequencer { impl Handler> for Sequencer { type Result = (); + /// Forwards an unsequenced enclave event to the event store for persistence. + /// + /// # Examples + /// + /// ```rust,ignore + /// // When the Sequencer actor receives an `EnclaveEvent`, + /// // it forwards that event to the configured event store: + /// // + /// // sequencer.handle(unsequenced_event, &mut ctx); + /// ``` fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { self.eventstore .do_send(StoreEventRequested::new(msg, ctx.address())) @@ -46,6 +70,9 @@ impl Handler> for Sequencer { impl Handler for Sequencer { type Result = (); + /// Handles an `EventStored` by committing its snapshot and forwarding the resulting sequenced event to the bus. + /// + /// Converts the incoming `EventStored` into its sequenced event, sends a `CommitSnapshot` with that event's sequence number to the configured buffer, and then forwards the sequenced event to the event bus. fn handle(&mut self, msg: EventStored, _: &mut Self::Context) -> Self::Result { let event = msg.into_event(); let seq = event.get_seq(); @@ -90,4 +117,4 @@ mod tests { ); Ok(()) } -} +} \ No newline at end of file diff --git a/crates/evm/tests/integration.rs b/crates/evm/tests/integration.rs index 385f890781..2c2f85908c 100644 --- a/crates/evm/tests/integration.rs +++ b/crates/evm/tests/integration.rs @@ -138,6 +138,25 @@ async fn evm_reader() -> Result<()> { Ok(()) } +/// Verifies that events emitted before an EVM reader is attached (historical events) +/// are captured and returned in order together with subsequently emitted live events. +/// +/// The test starts an Anvil WebSocket provider, deploys the EmitLogs contract, +/// emits a sequence of historical events, attaches an EvmEventReader, emits live events, +/// and asserts that the history contains the historical events followed by the live events. +/// +/// # Examples +/// +/// ``` +/// // This test is executed by the test harness (cargo test). To exercise the same +/// // behavior manually from an async context: +/// # async fn run() -> eyre::Result<()> { +/// # // call the test function as an async function in examples only; normally +/// # // the test framework runs it automatically. +/// # let _ = crate::ensure_historical_events().await?; +/// # Ok(()) +/// # } +/// ``` #[actix::test] async fn ensure_historical_events() -> Result<()> { // Create a WS provider @@ -218,6 +237,24 @@ async fn ensure_historical_events() -> Result<()> { Ok(()) } +/// Verifies that an EVM event reader resumes after a shutdown and that no events are lost during the shutdown window. +/// +/// The test: +/// - Emits events before any reader is attached (historical events). +/// - Attaches an `EvmEventReader` and starts the coordinator so live events are processed. +/// - Emits additional live events, then simulates a reader shutdown by storing a `Shutdown` enclave event. +/// - Emits further events while the reader is down, re-attaches a reader, and asserts that all events +/// (pre-shutdown historical, live before shutdown, events emitted during shutdown, and events after resume) +/// appear in the collected history in the original emission order. +/// +/// # Examples +/// +/// ```no_run +/// # use eyre::Result; +/// # async fn example() -> Result<()> { +/// ensure_resume_after_shutdown().await?; +/// # Ok(()) } +/// ``` #[actix::test] async fn ensure_resume_after_shutdown() -> Result<()> { // Create a WS provider @@ -329,6 +366,20 @@ async fn ensure_resume_after_shutdown() -> Result<()> { Ok(()) } +/// Integration test that verifies a single EVM reader processes both historical events and subsequent live events in order. +/// +/// This test deploys an EmitLogs contract to an Anvil node, stages several historical events by emitting logs +/// before attaching the reader, attaches an EvmEventReader via a HistoricalEventCoordinator, starts the +/// coordinator, and then emits live events to ensure they are processed after the historical ones. +/// +/// # Examples +/// +/// ``` +/// # async fn run() -> anyhow::Result<()> { +/// coordinator_single_reader().await?; +/// # Ok::<(), anyhow::Error>(()) +/// # } +/// ``` #[actix::test] async fn coordinator_single_reader() -> Result<()> { let anvil = Anvil::new().block_time(1).try_spawn()?; @@ -489,6 +540,19 @@ async fn coordinator_multiple_readers() -> Result<()> { Ok(()) } +/// Verifies that an EVM event reader attached with no historical events processes only subsequently emitted (live) events. +/// +/// # Examples +/// +/// ``` +/// // When no historical events are present, the history starts empty and +/// // later contains events emitted after the reader is attached. +/// let msgs_before: Vec<&str> = vec![]; +/// assert_eq!(msgs_before.len(), 0); +/// +/// let msgs_after = vec!["live1", "live2"]; +/// assert_eq!(msgs_after, ["live1", "live2"]); +/// ``` #[actix::test] async fn coordinator_no_historical_events() -> Result<()> { let anvil = Anvil::new().block_time(1).try_spawn()?; @@ -541,4 +605,4 @@ async fn coordinator_no_historical_events() -> Result<()> { assert_eq!(msgs, ["live1", "live2"]); Ok(()) -} +} \ No newline at end of file diff --git a/crates/keyshare/src/threshold_keyshare.rs b/crates/keyshare/src/threshold_keyshare.rs index ccc84e1125..a6e8e973e0 100644 --- a/crates/keyshare/src/threshold_keyshare.rs +++ b/crates/keyshare/src/threshold_keyshare.rs @@ -330,7 +330,35 @@ impl ThresholdKeyshare { Ok(()) } - /// 1. CiphernodeSelected + /// Begin generation of threshold-share material when a cipher node is selected. + /// + /// This updates the internal state to `GeneratingThresholdShare`, ensures the + /// threshold-share collector actor exists, and enqueues computation requests to + /// generate ESI SSS shards and the public-key share / secret-key SSS for the + /// selected cipher node. + /// + /// # Arguments + /// + /// * `msg` - The `CiphernodeSelected` event describing the selected cipher node. + /// * `address` - The actor address to which generation messages will be sent. + /// + /// # Returns + /// + /// `Ok(())` on success, or an error if the state transition or mutation fails. + /// + /// # Examples + /// + /// ``` + /// // Pseudocode example showing typical usage: + /// # use actix::prelude::*; + /// # use anyhow::Result; + /// # // Assume `tk` is a mutable ThresholdKeyshare instance and `addr` is its Addr<_> + /// # let mut tk = /* ThresholdKeyshare::new(...) */ unimplemented!(); + /// # let addr: Addr = unimplemented!(); + /// # let event: CiphernodeSelected = unimplemented!(); + /// // Start threshold-share generation for the selected cipher node + /// let _ = tk.handle_ciphernode_selected(event, addr); + /// ``` pub fn handle_ciphernode_selected( &mut self, msg: CiphernodeSelected, @@ -814,4 +842,4 @@ impl Handler for ThresholdKeyshare { |act, res, _| act.handle_calculate_decryption_share_response(res), ) } -} +} \ No newline at end of file diff --git a/crates/net/src/document_publisher.rs b/crates/net/src/document_publisher.rs index e46527adc9..4fbc61e985 100644 --- a/crates/net/src/document_publisher.rs +++ b/crates/net/src/document_publisher.rs @@ -498,6 +498,35 @@ mod tests { }; use tracing::subscriber::DefaultGuard; + /// Initializes a test environment for DocumentPublisher and returns handles needed by tests. + /// + /// Sets up tracing (capturing logs for the test), creates an EventSystem with a fresh bus, + /// constructs channels for NetCommand/NetEvent, starts history collectors for enclave events, + /// subscribes collectors to the bus, and spawns a DocumentPublisher wired to the test bus and + /// network channels. + /// + /// # Returns + /// + /// A 9-tuple containing: + /// 1. `DefaultGuard` — guard returned by `tracing::subscriber::set_default` to restore previous subscriber; + /// 2. `BusHandle` — handle to the fresh EventSystem bus; + /// 3. `mpsc::Sender` — sender side for issuing net commands in tests; + /// 4. `mpsc::Receiver` — receiver side for observing issued net commands; + /// 5. `broadcast::Sender` — broadcaster for injecting net events into the system; + /// 6. `Arc>` — shared receiver used by the DocumentPublisher under test; + /// 7. `Addr>` — history collector for general enclave events; + /// 8. `Addr>` — history collector for enclave error events; + /// 9. `Addr` — address of the spawned DocumentPublisher actor. + /// + /// # Examples + /// + /// ``` + /// # use anyhow::Result; + /// # fn doc_test() -> Result<()> { + /// let (_guard, bus, net_cmd_tx, mut net_cmd_rx, net_evt_tx, net_evt_rx, history, errors, publisher) = setup_test()?; + /// // Use returned handles to drive and observe the DocumentPublisher in tests. + /// # Ok(()) } + /// ``` fn setup_test() -> Result<( DefaultGuard, BusHandle, @@ -718,6 +747,23 @@ mod tests { Ok(()) } + /// Verifies that a published document for an interested E3 id is fetched and forwarded as DocumentReceived. + /// + /// Sends a CiphernodeSelected to mark interest, emits a DocumentPublishedNotification for a non-matching id + /// (which must be ignored), then emits a matching DocumentPublishedNotification and asserts that the + /// publisher issues a DhtGetRecord, that the DHT response is consumed, and that a DocumentReceived event + /// with the original bytes is published to the enclave event bus. + /// + /// # Examples + /// + /// ```no_run + /// // Test harness sets up bus, net command/event channels, and a DocumentPublisher. + /// // The test covers: ignore non-matching notifications, request DHT get for matching + /// // notification, and publish DocumentReceived after successful DHT get. + /// async fn example_usage() { + /// test_notified_of_document().await.unwrap(); + /// } + /// ``` #[actix::test] async fn test_notified_of_document() -> Result<()> { let (_guard, bus, _net_cmd_tx, mut net_cmd_rx, net_evt_tx, _net_evt_rx, history, _, _) = @@ -814,4 +860,4 @@ mod tests { pub fn days_from_now(days: u64) -> Instant { Instant::now() + Duration::from_secs(60 * 60 * 24 * days) } -} +} \ No newline at end of file diff --git a/crates/sortition/src/ciphernode_selector.rs b/crates/sortition/src/ciphernode_selector.rs index 5ae487dc7b..c88f399a20 100644 --- a/crates/sortition/src/ciphernode_selector.rs +++ b/crates/sortition/src/ciphernode_selector.rs @@ -32,6 +32,22 @@ impl Actor for CiphernodeSelector { } impl CiphernodeSelector { + /// Constructs a new CiphernodeSelector configured with the provided bus handle, + /// sortition actor address, persistent E3 metadata cache, and this node's address. + /// + /// `e3_cache` is moved into the selector and used to persist E3 metadata across requests. + /// `address` is the ciphernode's own address used for selection and committee membership checks. + /// + /// # Returns + /// + /// A configured `CiphernodeSelector` instance. + /// + /// # Examples + /// + /// ``` + /// // assume `bus`, `sortition_addr`, `cache`, and `addr` are available in scope + /// let selector = CiphernodeSelector::new(&bus, &sortition_addr, cache, "0xabc"); + /// ``` pub fn new( bus: &BusHandle, sortition: &Addr, @@ -46,6 +62,27 @@ impl CiphernodeSelector { } } + /// Creates, initializes, and starts a CiphernodeSelector actor, subscribing it to relevant bus events. + /// + /// This loads or initializes the persistent E3 metadata cache from `selector_store`, constructs and starts + /// the actor, subscribes the actor to "E3Requested", "CommitteeFinalized", and "Shutdown" events on the bus, + /// and returns the actor address. + /// + /// # Returns + /// + /// The started actor's address wrapped in `Result::Ok` on success, or an error from loading the cache. + /// + /// # Examples + /// + /// ``` + /// # use actix::Addr; + /// # use std::collections::HashMap; + /// # use my_crate::{CiphernodeSelector, BusHandle, Sortition, E3Meta, E3id, Repository}; + /// # async fn example(bus: &BusHandle, sortition: &Addr, store: Repository>, addr: &str) -> anyhow::Result> { + /// let selector_addr = CiphernodeSelector::attach(bus, sortition, store, addr).await?; + /// Ok(selector_addr) + /// # } + /// ``` pub async fn attach( bus: &BusHandle, sortition: &Addr, @@ -66,6 +103,23 @@ impl CiphernodeSelector { impl Handler for CiphernodeSelector { type Result = (); + /// Forwards an incoming `EnclaveEvent`'s inner data to the actor context as a notification. + /// + /// This method extracts the `EnclaveEventData` from the provided `EnclaveEvent` and notifies the + /// actor context with the contained message for these variants: `E3Requested`, `E3RequestComplete`, + /// `CommitteeFinalized`, and `Shutdown`. Other event variants are ignored. + /// + /// # Examples + /// + /// ```rust + /// // Pseudocode illustrating intended use: + /// // let event: EnclaveEvent = ...; + /// // let mut selector: CiphernodeSelector = ...; + /// // let mut ctx: Context = ...; + /// // selector.handle(event, &mut ctx); + /// // The context will receive a notification for E3Requested, E3RequestComplete, + /// // CommitteeFinalized, or Shutdown variants. + /// ``` fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { match msg.into_data() { EnclaveEventData::E3Requested(data) => ctx.notify(data), @@ -80,6 +134,32 @@ impl Handler for CiphernodeSelector { impl Handler for CiphernodeSelector { type Result = ResponseFuture<()>; + /// Handles an incoming E3Requested event: caches E3 metadata, queries the Sortition + /// actor for a node index for the given seed and size, and publishes a `TicketGenerated` + /// event if this node is selected. + /// + /// The handler first inserts an `E3Meta` entry for the provided `e3_id` into the + /// persistent `e3_cache`. It then requests a node index from the `Sortition` actor + /// using the event seed and threshold, and if a ticket is returned emits a + /// `TicketGenerated` event on the bus with the `e3_id`, ticket id, and node address. + /// + /// # Examples + /// + /// ``` + /// # use futures::executor::block_on; + /// # // `selector` is a mutable CiphernodeSelector, `ctx` is its context, and + /// # // `req` is an E3Requested value prepared for the test. + /// # let mut selector = /* ... */ panic!(); + /// # let mut ctx = /* ... */ panic!(); + /// # let req = /* ... */ panic!(); + /// // Call the handler and wait for it to complete. + /// let fut = selector.handle(req, &mut ctx); + /// block_on(fut); + /// ``` + /// + /// # Returns + /// + /// `()` on completion. fn handle(&mut self, data: E3Requested, _ctx: &mut Self::Context) -> Self::Result { let address = self.address.clone(); let sortition = self.sortition.clone(); @@ -156,6 +236,17 @@ impl Handler for CiphernodeSelector { impl Handler for CiphernodeSelector { type Result = (); + /// Remove cached metadata for the completed E3 request and report any sortition errors to the bus. + /// + /// This notifies the sortition error trap while attempting to remove `msg.e3_id` from the persistent + /// E3 metadata cache; failures during the mutation are reported via the bus. + /// + /// # Examples + /// + /// ``` + /// // Given a mutable CiphernodeSelector `sel`, remove the completed request's cache entry: + /// // sel.handle(E3RequestComplete { e3_id: my_e3_id }, &mut sel_context); + /// ``` fn handle(&mut self, msg: E3RequestComplete, _: &mut Self::Context) -> Self::Result { trap(EType::Sortition, &self.bus.clone(), move || { self.e3_cache.try_mutate(|mut cache| { @@ -169,6 +260,26 @@ impl Handler for CiphernodeSelector { impl Handler for CiphernodeSelector { type Result = (); + /// Handles a finalized committee event by checking local membership and, if a member, publishing a `CiphernodeSelected` event with cached E3 metadata. + /// + /// Retrieves E3 metadata for the finalized E3 id from the persistent cache, verifies whether this node's address is present in the finalized committee, and if so publishes a `CiphernodeSelected` event containing the party id and E3 metadata (thresholds, parameters, seed, etc.). If the cache is unavailable or the E3 metadata is missing, the handler reports the condition through the sortition trap and exits without publishing. + /// + /// # Parameters + /// + /// - `msg`: the `CommitteeFinalized` message containing the finalized committee list and the `e3_id`. + /// + /// # Returns + /// + /// `Ok(())` on success; any reported error is propagated through the sortition trap wrapper. + /// + /// # Examples + /// + /// ```no_run + /// // Sketch: when a CommitteeFinalized message arrives, the actor will check membership + /// // and publish CiphernodeSelected if applicable. + /// let msg = CommitteeFinalized { e3_id: some_e3_id, committee: vec![my_addr.clone(), /* ... */] }; + /// // The actual invocation occurs inside Actix actor runtime as part of Handler. + /// ``` fn handle(&mut self, msg: CommitteeFinalized, _ctx: &mut Self::Context) -> Self::Result { trap(EType::Sortition, &self.bus.clone(), move || { info!("CiphernodeSelector received CommitteeFinalized."); @@ -229,4 +340,4 @@ impl Handler for CiphernodeSelector { info!("Killing CiphernodeSelector"); ctx.stop(); } -} +} \ No newline at end of file diff --git a/crates/sortition/src/repo.rs b/crates/sortition/src/repo.rs index 75a92d0ccb..0b743dece9 100644 --- a/crates/sortition/src/repo.rs +++ b/crates/sortition/src/repo.rs @@ -17,6 +17,17 @@ pub trait SortitionRepositoryFactory { } impl SortitionRepositoryFactory for Repositories { + /// Create a Repository scoped to the sortition store. + /// + /// This repository provides access to the map keyed by `u64` with `SortitionBackend` values + /// stored under the sortition namespace. + /// + /// # Examples + /// + /// ``` + /// // Given a `repositories` value, obtain the sortition repository. + /// let sortition_repo = repositories.sortition(); + /// ``` fn sortition(&self) -> Repository> { Repository::new(self.store.scope(StoreKeys::sortition())) } @@ -27,6 +38,17 @@ pub trait CiphernodeSelectorFactory { } impl CiphernodeSelectorFactory for Repositories { + /// Create a repository scoped to the `ciphernode_selector` store key. + /// + /// The returned repository stores a `HashMap`. + /// + /// # Examples + /// + /// ``` + /// # use your_crate::Repositories; + /// let repos: Repositories = unimplemented!(); + /// let repo = repos.ciphernode_selector(); + /// ``` fn ciphernode_selector(&self) -> Repository> { Repository::new(self.store.scope(StoreKeys::ciphernode_selector())) } @@ -50,4 +72,4 @@ impl FinalizedCommitteesRepositoryFactory for Repositories { fn finalized_committees(&self) -> Repository>> { Repository::new(self.store.scope(StoreKeys::finalized_committees())) } -} +} \ No newline at end of file diff --git a/crates/test-helpers/src/ciphernode_system.rs b/crates/test-helpers/src/ciphernode_system.rs index 61ef319129..8cc2214c96 100644 --- a/crates/test-helpers/src/ciphernode_system.rs +++ b/crates/test-helpers/src/ciphernode_system.rs @@ -200,6 +200,23 @@ mod tests { use e3_data::InMemStore; use e3_events::{EventBus, EventBusConfig}; + /// Constructs a mock CiphernodeHandle for tests using an in-memory store and a test event system. + /// + /// The returned handle is configured with the provided `address`, an in-memory store, an `EventSystem` handle + /// wired to an event bus, and populated `history` and `errors` bus handles when available. + /// + /// # Examples + /// + /// ``` + /// # use anyhow::Result; + /// # async fn __example() -> Result<()> { + /// let handle = mock_setup_node("node_a".to_string()).await?; + /// assert_eq!(handle.address, "node_a"); + /// assert!(handle.history.is_some()); + /// assert!(handle.errors.is_some()); + /// # Ok(()) + /// # } + /// ``` async fn mock_setup_node(address: String) -> Result { // Create mock actors for the test let store = InMemStore::new(true).start(); @@ -236,4 +253,4 @@ mod tests { assert_eq!(nodes[3].address, "node_b"); assert_eq!(nodes[4].address, "node_b"); } -} +} \ No newline at end of file diff --git a/crates/test-helpers/src/lib.rs b/crates/test-helpers/src/lib.rs index 8f7540e14c..865a45d722 100644 --- a/crates/test-helpers/src/lib.rs +++ b/crates/test-helpers/src/lib.rs @@ -69,6 +69,36 @@ pub fn create_crp_bytes_params( (crp_bytes, params) } +/// Sets up an in-memory event environment and cryptographic test fixtures for use in tests. +/// +/// This creates an in-memory `EventBus` and two `HistoryCollector`s (one subscribed to all events, +/// the other subscribed to `EnclaveError`), a deterministic shared RNG and seed, BFV parameters +/// (using the provided `param_set` or a secure default), and a deserialized `CommonRandomPoly`. +/// +/// # Parameters +/// +/// * `param_set` - Optional BFV parameter set to use; when `None`, a default `InsecureSet2048_1032193_1` is used. +/// +/// # Returns +/// +/// A tuple containing: +/// 1. `BusHandle` — an in-memory event system handle bound to the created event bus, +/// 2. `SharedRng` — a thread-safe, seeded ChaCha20 RNG, +/// 3. `Seed` — seed material derived deterministically, +/// 4. `Arc` — the BFV parameters derived from the chosen parameter set, +/// 5. `CommonRandomPoly` — the deserialized common random polynomial created from CRP bytes, +/// 6. `Addr>` — the address of the error history collector, +/// 7. `Addr>` — the address of the general history collector. +/// +/// # Examples +/// +/// ``` +/// let (handle, _rng, _seed, _params, _crp, errors, history) = +/// get_common_setup(None).expect("setup should succeed"); +/// // collectors should be running and addresses valid +/// assert!(errors.connected()); +/// assert!(history.connected()); +/// ``` pub fn get_common_setup( param_set: Option, ) -> Result<( @@ -98,31 +128,18 @@ pub fn get_common_setup( Ok((handle, rng, seed, params, crpoly, errors, history)) } -/// Simulate libp2p by taking output events on each local bus and filter for !is_local_only() and forward remaining events back to the event bus -/// deduplication will remove previously seen events. -/// This sets up a set of cyphernodes without libp2p. -/// The way it works is that it feeds back all events from -/// all nodes filteres by whether they are broadcastible or not -/// ```txt +/// Wire node event buses so broadcastable events are forwarded between distinct nodes to simulate a libp2p-like network. /// -/// ┌─────┐ -/// │ BUS │ -/// └─────┘ -/// │ -/// ┌────────────┼────────────┐ -/// │ │ │ -/// ▼ ▼ ▼ -/// ┌────┐ ┌────┐ ┌────┐ -/// │ B1 │ │ B2 │ │ B3 │◀──┐ -/// └────┘ └────┘ └────┘ │ -/// │ │ │ │ -/// │ │ │ │ -/// └────────────┼────────────┘ │ -/// │ │ -/// ▼ │ -/// ┌─────┐ │ -/// │ FIL │───────────────┘ -/// └─────┘ +/// For each pair of distinct ciphernode handles, this forwards events from the source bus to the destination bus when the event +/// is considered forwardable by `NetEventTranslator` or is a document-publisher event. Events are not forwarded to the same node's bus. +/// This function does not modify nodes or perform network IO; it only connects in-memory event buses so tests can observe cross-node propagation. +/// +/// # Examples +/// +/// ```no_run +/// // Given a list of initialized CiphernodeHandle values, connect their buses so broadcastable events propagate between them. +/// // let nodes: Vec = ...; +/// // simulate_libp2p_net(&nodes); /// ``` pub fn simulate_libp2p_net(nodes: &[CiphernodeHandle]) { for node in nodes.iter() { @@ -204,4 +221,4 @@ pub fn encrypt_ciphertext( }) .collect::>>()?; Ok((ciphertext, plaintext)) -} +} \ No newline at end of file diff --git a/crates/tests/tests/integration.rs b/crates/tests/tests/integration.rs index df0fe66a30..0d89049ff2 100644 --- a/crates/tests/tests/integration.rs +++ b/crates/tests/tests/integration.rs @@ -82,7 +82,28 @@ fn serialize_report(report: &[(&str, Duration)]) -> String { .join("\n") } -/// Test trbfv +/// Runs an end-to-end integration test of the TRBFV workflow on a simulated multi-node ciphernode system, +/// exercising committee setup, threshold keyshare generation, public-key aggregation, application ciphertext +/// generation and publication, decryption share collection, and plaintext aggregation and verification. +/// +/// This test builds an in-process event system with multiple ciphernodes (including a collector), configures +/// a threshold BFV setup (m=2, n=5 in this test), publishes an E3Requested event, finalizes a deterministic +/// committee, generates application ciphertexts, publishes them, and asserts that the final plaintext tally +/// matches the expected results. Timing measurements for major phases are collected and printed for diagnostics. +/// +/// # Returns +/// +/// `Ok(())` if the full workflow completes and all assertions succeed, `Err(_)` on failure. +/// +/// # Examples +/// +/// ``` +/// #[actix::test] +/// async fn run_trbfv_example() -> anyhow::Result<()> { +/// // Execute the full integration test; any failure will be returned as an error. +/// test_trbfv_actor().await +/// } +/// ``` #[actix::test] #[serial_test::serial] async fn test_trbfv_actor() -> Result<()> { @@ -431,4 +452,4 @@ async fn test_trbfv_actor() -> Result<()> { println!("{}", serialize_report(&report)); Ok(()) -} +} \ No newline at end of file diff --git a/crates/tests/tests/integration_legacy.rs b/crates/tests/tests/integration_legacy.rs index 4bfc160924..4e57396788 100644 --- a/crates/tests/tests/integration_legacy.rs +++ b/crates/tests/tests/integration_legacy.rs @@ -48,6 +48,37 @@ use tokio::sync::mpsc; use tokio::sync::{broadcast, Mutex}; use tokio::time::sleep; +/// Constructs and configures a local Ciphernode and returns its handle. +/// +/// The node is created with keyshare, address, test-mode features (forked bus, history, errors), +/// pubkey and plaintext aggregation, and sortition enabled. Optionally attaches an in-memory +/// datastore and enables logging when requested. +/// +/// # Parameters +/// +/// - `addr`: The node's address (used as identifier). +/// - `store`: Optional in-memory datastore address to attach for persistence/hydration. +/// +/// # Returns +/// +/// A configured `CiphernodeHandle` on success. +/// +/// # Examples +/// +/// ```rust +/// # use std::sync::Arc; +/// # use tokio::runtime::Runtime; +/// # async fn _example() -> anyhow::Result<()> { +/// // setup placeholders (create real instances in integration tests) +/// let bus = /* BusHandle */ unimplemented!(); +/// let rng = /* SharedRng */ unimplemented!(); +/// let cipher = Arc::new(/* Cipher */ unimplemented!()); +/// +/// let handle = crate::setup_local_ciphernode(&bus, &rng, true, "node1", None, &cipher).await?; +/// // use `handle`... +/// # Ok(()) +/// # } +/// ``` async fn setup_local_ciphernode( bus: &BusHandle, rng: &SharedRng, @@ -292,6 +323,20 @@ async fn test_public_key_aggregation_and_decryption() -> Result<()> { Ok(()) } +/// Verifies that keyshare state is preserved after shutdown and rehydration. +/// +/// This integration test sets up two local ciphernodes, triggers keyshare creation and +/// aggregation, shuts down the nodes and dumps their in-memory stores, rehydrates new +/// ciphernodes from those dumps, publishes a ciphertext encrypted with the recovered +/// aggregated public key, and asserts that the plaintext recovered by the rehydrated +/// nodes matches the original plaintext. +/// +/// # Examples +/// +/// ```no_run +/// // This is an integration-style async test executed by the test harness. +/// // Run with `cargo test` to execute the test case in the repository. +/// ``` #[actix::test] async fn test_stopped_keyshares_retain_state() -> Result<()> { let e3_id = E3id::new("1234", 1); @@ -441,6 +486,23 @@ async fn test_stopped_keyshares_retain_state() -> Result<()> { Ok(()) } +/// Verifies that NetEventTranslator forwards non-local bus events to the network while keeping local-only events on the local bus. +/// +/// Publishes two `PlaintextAggregated` events and one local `CiphernodeSelected` event, then asserts: +/// - the network output channel receives the two non-local `PlaintextAggregated` messages (and not the local event), +/// - the bus history contains all three local events (including the local-only event), +/// ensuring forwarded events are transmitted to the network but not retransmitted back into the bus. +/// +/// # Examples +/// +/// ```no_run +/// #[actix::test] +/// async fn example_forwarding_behavior() -> anyhow::Result<()> { +/// // Setup EventSystem, NetEventTranslator, publish events and assert network vs bus history behavior. +/// // This test mirrors `test_p2p_actor_forwards_events_to_network` integration assertions. +/// Ok(()) +/// } +/// ``` #[actix::test] async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { // Setup elements in test @@ -656,4 +718,4 @@ async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { ); Ok(()) -} +} \ No newline at end of file diff --git a/crates/utils/src/formatters.rs b/crates/utils/src/formatters.rs index b8094d900b..b1053af1fb 100644 --- a/crates/utils/src/formatters.rs +++ b/crates/utils/src/formatters.rs @@ -22,7 +22,25 @@ pub fn hexf(data: &[u8], f: &mut fmt::Formatter) -> fmt::Result { ) } -/// truncate a string +/// Truncates a hex-like string for compact display, prefixing short values with `0x` and +/// summarizing long values with their length and leading/trailing segments. +/// +/// For input strings with length <= 100, the function returns the input prefixed with `0x`. +/// For longer inputs it returns `"..>"`, where `start` is the +/// first 25 characters and `end` is the last 25 characters of the original string. +/// +/// # Examples +/// +/// ``` +/// // Short input is prefixed with 0x +/// let short = "deadbeef".to_string(); +/// assert_eq!(crate::truncate(short), "0xdeadbeef"); +/// +/// // Long input is summarized with length and leading/trailing segments +/// let long = String::from("a").repeat(120); +/// let expected = format!("", 120, "a".repeat(25), "a".repeat(25)); +/// assert_eq!(crate::truncate(long), expected); +/// ``` pub fn truncate(s: String) -> String { let threshold = 100; // will leave it let limit = 50; @@ -55,6 +73,18 @@ pub enum Color { BrightWhite = 97, } +/// Wraps a Displayable value with ANSI escape codes to apply the specified color. +/// +/// The returned string contains an ANSI sequence that sets the given color, the +/// formatted value, and a reset sequence to restore terminal formatting. +/// +/// # Examples +/// +/// ``` +/// let s = colorize("hello", Color::Red); +/// assert!(s.starts_with("\x1b[31m")); +/// assert!(s.ends_with("\x1b[0m")); +/// ``` pub fn colorize(s: T, color: Color) -> String { format!("\x1b[{}m{}\x1b[0m", color as u8, s) -} +} \ No newline at end of file