diff --git a/Cargo.lock b/Cargo.lock index 3dd813c977..991041a2d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2936,11 +2936,13 @@ dependencies = [ "chrono", "derivative", "e3-crypto", + "e3-events", "e3-trbfv", "e3-utils", "futures-util", "once_cell", "proptest", + "rand 0.8.5", "serde", "sha2", "strum", diff --git a/crates/aggregator/src/committee_finalizer.rs b/crates/aggregator/src/committee_finalizer.rs index aef3847ea2..069e8a3886 100644 --- a/crates/aggregator/src/committee_finalizer.rs +++ b/crates/aggregator/src/committee_finalizer.rs @@ -6,8 +6,8 @@ use actix::prelude::*; use e3_events::{ - prelude::*, BusHandle, CommitteeFinalizeRequested, CommitteeRequested, EnclaveEvent, - EnclaveEventData, Shutdown, + prelude::*, trap, BusHandle, CommitteeFinalizeRequested, CommitteeRequested, EType, + EnclaveEvent, EnclaveEventData, Shutdown, }; use std::collections::HashMap; use std::time::Duration; @@ -16,19 +16,19 @@ use tracing::{error, info}; /// CommitteeFinalizer is an actor that listens to CommitteeRequested events and dispatches /// CommitteeFinalizeRequested events after the submission deadline has passed. pub struct CommitteeFinalizer { - bus: BusHandle, + bus: BusHandle, pending_committees: HashMap, } impl CommitteeFinalizer { - pub fn new(bus: &BusHandle) -> Self { + pub fn new(bus: &BusHandle) -> Self { Self { bus: bus.clone(), pending_committees: HashMap::new(), } } - pub fn attach(bus: &BusHandle) -> Addr { + pub fn attach(bus: &BusHandle) -> Addr { let addr = CommitteeFinalizer::new(bus).start(); bus.subscribe_all( @@ -112,8 +112,11 @@ impl Handler for CommitteeFinalizer { move |act, _ctx| { info!(e3_id = %e3_id_clone, "Dispatching CommitteeFinalizeRequested event"); - bus.publish(CommitteeFinalizeRequested { - e3_id: e3_id_clone.clone(), + trap(EType::Sortition, &act.bus.clone(), || { + bus.publish(CommitteeFinalizeRequested { + e3_id: e3_id_clone.clone(), + })?; + Ok(()) }); act.pending_committees.remove(&e3_id_clone.to_string()); diff --git a/crates/aggregator/src/ext.rs b/crates/aggregator/src/ext.rs index abdbbdf489..1e97ab4ac5 100644 --- a/crates/aggregator/src/ext.rs +++ b/crates/aggregator/src/ext.rs @@ -19,7 +19,7 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use e3_data::{AutoPersist, Persistable, RepositoriesFactory}; use e3_events::{prelude::*, E3id}; -use e3_events::{BusHandle, EnclaveErrorType, EnclaveEvent, EnclaveEventData}; +use e3_events::{BusHandle, EType, EnclaveEvent, EnclaveEventData}; use e3_fhe::ext::FHE_KEY; use e3_fhe::Fhe; use e3_multithread::Multithread; @@ -28,12 +28,12 @@ use e3_sortition::Sortition; #[deprecated = "In favour of ThresholdPlaintextAggregatorExtension"] pub struct PlaintextAggregatorExtension { - bus: BusHandle, + bus: BusHandle, sortition: Addr, } impl PlaintextAggregatorExtension { - pub fn create(bus: &BusHandle, sortition: &Addr) -> Box { + pub fn create(bus: &BusHandle, sortition: &Addr) -> Box { Box::new(Self { bus: bus.clone(), sortition: sortition.clone(), @@ -54,7 +54,7 @@ impl E3Extension for PlaintextAggregatorExtension { let Some(fhe) = ctx.get_dependency(FHE_KEY) else { self.bus.err( - EnclaveErrorType::PlaintextAggregation, + EType::PlaintextAggregation, anyhow!(ERROR_PLAINTEXT_FHE_MISSING), ); return; @@ -62,7 +62,7 @@ impl E3Extension for PlaintextAggregatorExtension { let Some(ref meta) = ctx.get_dependency(META_KEY) else { self.bus.err( - EnclaveErrorType::PlaintextAggregation, + EType::PlaintextAggregation, anyhow!(ERROR_PLAINTEXT_META_MISSING), ); return; @@ -74,7 +74,7 @@ impl E3Extension for PlaintextAggregatorExtension { // This is a single ciphertext for the legacy PlaintextAggregator let Some(single_ciphertext) = data.ciphertext_output.first() else { self.bus.err( - EnclaveErrorType::PlaintextAggregation, + EType::PlaintextAggregation, anyhow!("Could not extract ciphertext from array"), ); return; @@ -122,7 +122,7 @@ impl E3Extension for PlaintextAggregatorExtension { // Get deps let Some(fhe) = ctx.get_dependency(FHE_KEY) else { self.bus.err( - EnclaveErrorType::PlaintextAggregation, + EType::PlaintextAggregation, anyhow!(ERROR_PLAINTEXT_FHE_MISSING), ); return Ok(()); @@ -148,11 +148,11 @@ impl E3Extension for PlaintextAggregatorExtension { } pub struct PublicKeyAggregatorExtension { - bus: BusHandle, + bus: BusHandle, } impl PublicKeyAggregatorExtension { - pub fn create(bus: &BusHandle) -> Box { + pub fn create(bus: &BusHandle) -> Box { Box::new(Self { bus: bus.clone() }) } } @@ -170,14 +170,14 @@ impl E3Extension for PublicKeyAggregatorExtension { let Some(fhe) = ctx.get_dependency(FHE_KEY) else { self.bus.err( - EnclaveErrorType::PublickeyAggregation, + EType::PublickeyAggregation, anyhow!(ERROR_PUBKEY_FHE_MISSING), ); return; }; let Some(ref meta) = ctx.get_dependency(META_KEY) else { self.bus.err( - EnclaveErrorType::PublickeyAggregation, + EType::PublickeyAggregation, anyhow!(ERROR_PUBKEY_META_MISSING), ); return; @@ -211,7 +211,7 @@ impl E3Extension for PublicKeyAggregatorExtension { // Get deps let Some(fhe) = ctx.get_dependency(FHE_KEY) else { self.bus.err( - EnclaveErrorType::PublickeyAggregation, + EType::PublickeyAggregation, anyhow!(ERROR_PUBKEY_FHE_MISSING), ); @@ -233,7 +233,7 @@ impl E3Extension for PublicKeyAggregatorExtension { fn create_publickey_aggregator( fhe: Arc, - bus: BusHandle, + bus: BusHandle, e3_id: E3id, sync_state: Persistable, ) -> Recipient { @@ -247,14 +247,14 @@ fn create_publickey_aggregator( } pub struct ThresholdPlaintextAggregatorExtension { - bus: BusHandle, + bus: BusHandle, sortition: Addr, multithread: Addr, } impl ThresholdPlaintextAggregatorExtension { pub fn create( - bus: &BusHandle, + bus: &BusHandle, sortition: &Addr, multithread: &Addr, ) -> Box { @@ -278,7 +278,7 @@ impl E3Extension for ThresholdPlaintextAggregatorExtension { let Some(ref meta) = ctx.get_dependency(META_KEY) else { self.bus.err( - EnclaveErrorType::PlaintextAggregation, + EType::PlaintextAggregation, anyhow!(ERROR_TRBFV_PLAINTEXT_META_MISSING), ); return; diff --git a/crates/aggregator/src/plaintext_aggregator.rs b/crates/aggregator/src/plaintext_aggregator.rs index afc416255b..f78b85508d 100644 --- a/crates/aggregator/src/plaintext_aggregator.rs +++ b/crates/aggregator/src/plaintext_aggregator.rs @@ -72,7 +72,7 @@ struct ComputeAggregate { #[deprecated = "To be replaced by ThresholdPlaintextAggregator"] pub struct PlaintextAggregator { fhe: Arc, - bus: BusHandle, + bus: BusHandle, sortition: Addr, e3_id: E3id, state: Persistable, @@ -80,7 +80,7 @@ pub struct PlaintextAggregator { pub struct PlaintextAggregatorParams { pub fhe: Arc, - pub bus: BusHandle, + pub bus: BusHandle, pub sortition: Addr, pub e3_id: E3id, } @@ -239,7 +239,7 @@ impl Handler for PlaintextAggregator { e3_id: self.e3_id.clone(), }; - self.bus.publish(event); + self.bus.publish(event)?; Ok(()) } diff --git a/crates/aggregator/src/publickey_aggregator.rs b/crates/aggregator/src/publickey_aggregator.rs index 4c66dbd129..1d022b1fec 100644 --- a/crates/aggregator/src/publickey_aggregator.rs +++ b/crates/aggregator/src/publickey_aggregator.rs @@ -12,7 +12,6 @@ use e3_events::{ PublicKeyAggregated, Seed, }; use e3_fhe::{Fhe, GetAggregatePublicKey}; -use e3_sortition::Sortition; use e3_utils::ArcBytes; use std::sync::Arc; use tracing::{error, info}; @@ -56,14 +55,14 @@ struct ComputeAggregate { pub struct PublicKeyAggregator { fhe: Arc, - bus: BusHandle, + bus: BusHandle, e3_id: E3id, state: Persistable, } pub struct PublicKeyAggregatorParams { pub fhe: Arc, - pub bus: BusHandle, + pub bus: BusHandle, pub e3_id: E3id, } @@ -176,7 +175,7 @@ impl Handler for PublicKeyAggregator { impl Handler for PublicKeyAggregator { type Result = Result<()>; - fn handle(&mut self, msg: ComputeAggregate, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: ComputeAggregate, _: &mut Self::Context) -> Self::Result { info!("Computing Aggregate PublicKey..."); let pubkey = self.fhe.get_aggregate_public_key(GetAggregatePublicKey { keyshares: msg.keyshares, @@ -198,7 +197,7 @@ impl Handler for PublicKeyAggregator { e3_id: msg.e3_id, nodes, }; - self.bus.publish(event); + self.bus.publish(event)?; } Ok(()) } diff --git a/crates/aggregator/src/threshold_plaintext_aggregator.rs b/crates/aggregator/src/threshold_plaintext_aggregator.rs index 4471338185..1c5ee9e8b7 100644 --- a/crates/aggregator/src/threshold_plaintext_aggregator.rs +++ b/crates/aggregator/src/threshold_plaintext_aggregator.rs @@ -122,7 +122,7 @@ pub struct ComputeAggregate { pub struct ThresholdPlaintextAggregator { multithread: Addr, - bus: BusHandle, + bus: BusHandle, sortition: Addr, e3_id: E3id, state: Persistable, @@ -130,7 +130,7 @@ pub struct ThresholdPlaintextAggregator { pub struct ThresholdPlaintextAggregatorParams { pub multithread: Addr, - pub bus: BusHandle, + pub bus: BusHandle, pub sortition: Addr, pub e3_id: E3id, } @@ -342,7 +342,7 @@ impl Handler for ThresholdPlaintextAggregator { }; info!("Dispatching plaintext event {:?}", event); - act.bus.publish(event); + act.bus.publish(event)?; Ok(()) }), ) diff --git a/crates/ciphernode-builder/src/ciphernode.rs b/crates/ciphernode-builder/src/ciphernode.rs index 6bf9775d47..2e67d36187 100644 --- a/crates/ciphernode-builder/src/ciphernode.rs +++ b/crates/ciphernode-builder/src/ciphernode.rs @@ -6,13 +6,13 @@ use actix::Addr; use e3_data::{DataStore, InMemStore, StoreAddr}; -use e3_events::{BusHandle, EnclaveEvent, EventBus, HistoryCollector}; +use e3_events::{BusHandle, EnclaveEvent, HistoryCollector}; #[derive(Clone, Debug)] pub struct CiphernodeHandle { pub address: String, pub store: DataStore, - pub bus: BusHandle, + pub bus: BusHandle, pub history: Option>>, pub errors: Option>>, } @@ -21,7 +21,7 @@ impl CiphernodeHandle { pub fn new( address: String, store: DataStore, - bus: BusHandle, + bus: BusHandle, history: Option>>, errors: Option>>, ) -> Self { @@ -34,8 +34,8 @@ impl CiphernodeHandle { } } - pub fn bus(&self) -> Addr> { - self.bus.bus() + pub fn bus(&self) -> &BusHandle { + &self.bus } pub fn history(&self) -> Option>> { diff --git a/crates/ciphernode-builder/src/ciphernode_builder.rs b/crates/ciphernode-builder/src/ciphernode_builder.rs index 861b567a31..0a88d6793a 100644 --- a/crates/ciphernode-builder/src/ciphernode_builder.rs +++ b/crates/ciphernode-builder/src/ciphernode_builder.rs @@ -16,7 +16,7 @@ use e3_aggregator::ext::{ use e3_config::chain_config::ChainConfig; use e3_crypto::Cipher; use e3_data::{DataStore, InMemStore, Repositories, RepositoriesFactory}; -use e3_events::{EnclaveEvent, EventBus, EventBusConfig}; +use e3_events::{BusHandle, EnclaveEvent, EventBus, EventBusConfig}; use e3_evm::{ helpers::{ load_signer_from_repository, ConcreteReadProvider, ConcreteWriteProvider, EthProvider, @@ -299,7 +299,7 @@ impl CiphernodeBuilder { }; // Get a handle from the event bus - let bus = local_bus.into(); + let bus = BusHandle::new_from_consumer(local_bus); let addr = if let Some(addr) = self.address.clone() { info!("Using eth address = {}", addr); diff --git a/crates/data/src/sled_store.rs b/crates/data/src/sled_store.rs index 87f3d87c79..185b30f4f5 100644 --- a/crates/data/src/sled_store.rs +++ b/crates/data/src/sled_store.rs @@ -7,9 +7,7 @@ use crate::{Get, Insert, InsertSync, Remove}; use actix::{Actor, ActorContext, Addr, Handler}; use anyhow::{Context, Result}; -use e3_events::{ - get_enclave_bus_handle, prelude::*, BusHandle, EnclaveErrorType, EnclaveEvent, EnclaveEventData, -}; +use e3_events::{prelude::*, BusHandle, EType, EnclaveEvent, EnclaveEventData}; use once_cell::sync::Lazy; use sled::Db; use std::{ @@ -21,7 +19,7 @@ use tracing::{error, info}; pub struct SledStore { db: Option, - bus: BusHandle, + bus: BusHandle, // Only used for Shutdown } impl Actor for SledStore { @@ -29,7 +27,7 @@ impl Actor for SledStore { } impl SledStore { - pub fn new(bus: &BusHandle, path: &PathBuf) -> Result> { + pub fn new(bus: &BusHandle, path: &PathBuf) -> Result> { info!("Starting SledStore with {:?}", path); let db = SledDb::new(PathBuf::from(path))?; @@ -43,13 +41,6 @@ impl SledStore { Ok(store) } - - pub fn from_db(db: SledDb) -> Result { - Ok(Self { - db: Some(db), - bus: get_enclave_bus_handle(), - }) - } } impl Handler for SledStore { @@ -58,7 +49,7 @@ impl Handler for SledStore { fn handle(&mut self, event: Insert, _: &mut Self::Context) -> Self::Result { if let Some(ref mut db) = &mut self.db { match db.insert(event) { - Err(err) => self.bus.err(EnclaveErrorType::Data, err), + Err(err) => self.bus.err(EType::Data, err), _ => (), } } @@ -83,7 +74,7 @@ impl Handler for SledStore { fn handle(&mut self, event: Remove, _: &mut Self::Context) -> Self::Result { if let Some(ref mut db) = &mut self.db { match db.remove(event) { - Err(err) => self.bus.err(EnclaveErrorType::Data, err), + Err(err) => self.bus.err(EType::Data, err), _ => (), } } @@ -98,7 +89,7 @@ impl Handler for SledStore { return match db.get(event) { Ok(v) => v, Err(err) => { - self.bus.err(EnclaveErrorType::Data, err); + self.bus.err(EType::Data, err); None } }; @@ -108,7 +99,6 @@ impl Handler for SledStore { } } } - impl Handler for SledStore { type Result = (); fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { diff --git a/crates/entrypoint/src/helpers/datastore.rs b/crates/entrypoint/src/helpers/datastore.rs index 39fc5129d0..b186c64cf0 100644 --- a/crates/entrypoint/src/helpers/datastore.rs +++ b/crates/entrypoint/src/helpers/datastore.rs @@ -11,9 +11,9 @@ use anyhow::Result; use e3_config::AppConfig; use e3_data::{DataStore, InMemStore, SledDb, SledStore}; use e3_data::{Repositories, RepositoriesFactory}; -use e3_events::{get_enclave_bus_handle, BusHandle, EnclaveEvent}; +use e3_events::{get_enclave_bus_handle, BusHandle}; -pub fn get_sled_store(bus: &BusHandle, db_file: &PathBuf) -> Result { +pub fn get_sled_store(bus: &BusHandle, db_file: &PathBuf) -> Result { Ok((&SledStore::new(bus, db_file)?).into()) } @@ -21,7 +21,7 @@ pub fn get_in_mem_store() -> DataStore { (&InMemStore::new(true).start()).into() } -pub fn setup_datastore(config: &AppConfig, bus: &BusHandle) -> Result { +pub fn setup_datastore(config: &AppConfig, bus: &BusHandle) -> Result { let store: DataStore = if !config.use_in_mem_store() { get_sled_store(&bus, &config.db_file())? } else { diff --git a/crates/entrypoint/src/helpers/shutdown.rs b/crates/entrypoint/src/helpers/shutdown.rs index ae4e6123b2..1ff0d6748d 100644 --- a/crates/entrypoint/src/helpers/shutdown.rs +++ b/crates/entrypoint/src/helpers/shutdown.rs @@ -5,7 +5,7 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use anyhow::Result; -use e3_events::{prelude::*, BusHandle, EnclaveEvent, Shutdown}; +use e3_events::{prelude::*, BusHandle, Shutdown}; use std::time::Duration; use tokio::{ select, @@ -14,7 +14,7 @@ use tokio::{ }; use tracing::{error, info}; -pub async fn listen_for_shutdown(bus: BusHandle, mut handle: JoinHandle>) { +pub async fn listen_for_shutdown(bus: BusHandle, mut handle: JoinHandle>) { let mut sigterm = signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal stream"); select! { @@ -22,7 +22,10 @@ pub async fn listen_for_shutdown(bus: BusHandle, mut handle: JoinH info!("SIGTERM received, initiating graceful shutdown..."); // Stop the actor system - bus.publish(Shutdown); + match bus.publish(Shutdown){ + Ok(_) => (), + Err(e) => error!("Shutdown failed to publish! {e}") + } // Wait for all events to propagate tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/crates/entrypoint/src/start/aggregator_start.rs b/crates/entrypoint/src/start/aggregator_start.rs index 3789ba2f1b..dfbf9beeb1 100644 --- a/crates/entrypoint/src/start/aggregator_start.rs +++ b/crates/entrypoint/src/start/aggregator_start.rs @@ -9,7 +9,7 @@ use e3_ciphernode_builder::CiphernodeBuilder; use e3_config::AppConfig; use e3_crypto::Cipher; use e3_data::RepositoriesFactory; -use e3_events::{get_enclave_bus_handle, BusHandle, EnclaveEvent}; +use e3_events::{get_enclave_bus_handle, BusHandle}; use e3_net::{NetEventTranslator, NetRepositoryFactory}; use e3_test_helpers::{PlaintextWriter, PublicKeyWriter}; use rand::SeedableRng; @@ -27,7 +27,7 @@ pub async fn execute( pubkey_write_path: Option, plaintext_write_path: Option, experimental_trbfv: bool, -) -> Result<(BusHandle, JoinHandle>, String)> { +) -> Result<(BusHandle, JoinHandle>, String)> { let bus = get_enclave_bus_handle(); let rng = Arc::new(Mutex::new(ChaCha20Rng::from_rng(OsRng)?)); let store = setup_datastore(config, &bus)?; @@ -35,7 +35,7 @@ pub async fn execute( let cipher = Arc::new(Cipher::from_file(config.key_file()).await?); let mut builder = CiphernodeBuilder::new(rng.clone(), cipher.clone()) - .with_source_bus(&bus.bus()) + .with_source_bus(bus.consumer()) .with_datastore(store) .with_chains(&config.chains()) .with_sortition_score() diff --git a/crates/entrypoint/src/start/start.rs b/crates/entrypoint/src/start/start.rs index 9ba3e3ab35..87bd527969 100644 --- a/crates/entrypoint/src/start/start.rs +++ b/crates/entrypoint/src/start/start.rs @@ -10,8 +10,8 @@ use e3_ciphernode_builder::CiphernodeBuilder; use e3_config::AppConfig; use e3_crypto::Cipher; use e3_data::RepositoriesFactory; +use e3_events::get_enclave_bus_handle; use e3_events::BusHandle; -use e3_events::{get_enclave_bus_handle, EnclaveEvent}; use e3_net::{NetEventTranslator, NetRepositoryFactory}; use rand::SeedableRng; use rand_chacha::rand_core::OsRng; @@ -26,7 +26,7 @@ pub async fn execute( config: &AppConfig, address: Address, experimental_trbfv: bool, -) -> Result<(BusHandle, JoinHandle>, String)> { +) -> Result<(BusHandle, JoinHandle>, String)> { let rng = Arc::new(Mutex::new(rand_chacha::ChaCha20Rng::from_rng(OsRng)?)); let bus = get_enclave_bus_handle(); @@ -36,7 +36,7 @@ pub async fn execute( let mut builder = CiphernodeBuilder::new(rng.clone(), cipher.clone()) .with_address(&address.to_string()) - .with_source_bus(&bus.bus()) + .with_source_bus(bus.consumer()) .with_datastore(store) .with_sortition_score() .with_chains(&config.chains()) diff --git a/crates/events/Cargo.toml b/crates/events/Cargo.toml index bbdb604ae7..aa58f52e17 100644 --- a/crates/events/Cargo.toml +++ b/crates/events/Cargo.toml @@ -19,6 +19,7 @@ chrono = { workspace = true } derivative = { workspace = true } futures-util = { workspace = true } once_cell = { workspace = true } +rand = { workspace = true } serde = { workspace = true } sha2 = { workspace = true } strum = { workspace = true } @@ -29,5 +30,10 @@ e3-crypto = { workspace = true } e3-trbfv = { workspace = true } e3-utils = { workspace = true } +[features] +test-helpers = [] # ensure test-helpers is available for integration tests + [dev-dependencies] proptest = { workspace = true } +e3-events = { workspace = true, features = ["test-helpers"] } + diff --git a/crates/events/src/bus_handle.rs b/crates/events/src/bus_handle.rs index b0e2e3135c..44cacd6c9f 100644 --- a/crates/events/src/bus_handle.rs +++ b/crates/events/src/bus_handle.rs @@ -4,84 +4,294 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use actix::{Addr, Recipient}; +use std::sync::Arc; + +use actix::{Actor, Addr, Recipient}; +use anyhow::Result; +use derivative::Derivative; +use tracing::error; use crate::{ + hlc::Hlc, + sequencer::Sequencer, traits::{ - CompositeEvent, ErrorDispatcher, ErrorFactory, Event, EventConstructorWithTimestamp, - EventFactory, EventPublisher, EventSubscriber, + ErrorDispatcher, ErrorFactory, EventConstructorWithTimestamp, EventFactory, EventPublisher, + EventSubscriber, }, - ErrorEvent, EventBus, Subscribe, + EType, EnclaveEvent, EnclaveEventData, ErrorEvent, EventBus, HistoryCollector, Sequenced, + Subscribe, Unsequenced, }; -#[derive(Clone, Debug)] -pub struct BusHandle { - bus: Addr>, +#[derive(Clone, Derivative)] +#[derivative(Debug)] +pub struct BusHandle { + consumer: Addr>>, + producer: Addr, + #[derivative(Debug = "ignore")] + hlc: Arc, } -impl BusHandle { - pub fn new(bus: Addr>) -> Self { - Self { bus } +impl BusHandle { + pub fn new_from_consumer(consumer: Addr>>) -> Self { + let producer = Sequencer::new(&consumer).start(); + let hlc = Hlc::default(); + Self::new(consumer, producer, hlc) + } + + pub fn new( + consumer: Addr>>, + producer: Addr, + hlc: Hlc, + ) -> Self { + Self { + consumer, + producer, + hlc: Arc::new(hlc), + } } - pub fn bus(&self) -> Addr> { - self.bus.clone() + pub fn history(&self) -> Addr>> { + EventBus::>::history(&self.consumer) + } + + pub fn producer(&self) -> &Addr { + &self.producer + } + + pub fn consumer(&self) -> &Addr>> { + &self.consumer } } -impl EventPublisher for BusHandle { - fn publish(&self, data: impl Into) { - let evt = self.event_from(data); - self.bus.do_send(evt); +impl EventPublisher> for BusHandle { + fn publish(&self, data: impl Into) -> Result<()> { + let evt = self.event_from(data)?; + self.producer.do_send(evt); + Ok(()) } - fn publish_from_remote(&self, data: impl Into, ts: u128) { - let evt = self.event_from_remote_source(data, ts); - self.bus.do_send(evt) + fn publish_from_remote(&self, data: impl Into, ts: u128) -> Result<()> { + let evt = self.event_from_remote_source(data, ts)?; + self.producer.do_send(evt); + Ok(()) } - fn naked_dispatch(&self, event: E) { - self.bus.do_send(event); + fn naked_dispatch(&self, event: EnclaveEvent) { + self.producer.do_send(event); } } -impl ErrorDispatcher for BusHandle -where - E: CompositeEvent, -{ - fn err(&self, err_type: E::ErrType, error: impl Into) { - let evt = self.event_from_error(err_type, error); - self.bus.do_send(evt); +impl ErrorDispatcher> for BusHandle { + fn err(&self, err_type: EType, error: impl Into) { + match self.event_from_error(err_type, error) { + Ok(evt) => self.producer.do_send(evt), + Err(e) => error!("{e}"), + } } } -impl EventFactory for BusHandle { - fn event_from(&self, data: impl Into) -> E { - // TODO: add self.hcl.tick() - E::new_with_timestamp(data.into(), 0) +impl EventFactory> for BusHandle { + fn event_from(&self, data: impl Into) -> Result> { + let ts = self.hlc.tick()?; + Ok(EnclaveEvent::::new_with_timestamp( + data.into(), + ts.into(), + )) } - fn event_from_remote_source(&self, data: impl Into, ts: u128) -> E { - // TODO: add self.hcl.receive(ts) - E::new_with_timestamp(data.into(), ts) + fn event_from_remote_source( + &self, + data: impl Into, + ts: u128, + ) -> Result> { + let ts = self.hlc.receive(&ts.into())?; + Ok(EnclaveEvent::::new_with_timestamp( + data.into(), + ts.into(), + )) } } -impl ErrorFactory for BusHandle { - fn event_from_error(&self, err_type: E::ErrType, error: impl Into) -> E { - E::from_error(err_type, error) +impl ErrorFactory> for BusHandle { + fn event_from_error( + &self, + err_type: EType, + error: impl Into, + ) -> Result> { + let ts = self.hlc.tick()?; + EnclaveEvent::::from_error(err_type, error, ts.into()) } } -impl EventSubscriber for BusHandle { - fn subscribe(&self, event_type: &str, recipient: Recipient) { - self.bus.do_send(Subscribe::new(event_type, recipient)) +impl EventSubscriber> for BusHandle { + fn subscribe(&self, event_type: &str, recipient: Recipient>) { + self.consumer.do_send(Subscribe::new(event_type, recipient)) } - fn subscribe_all(&self, event_types: &[&str], recipient: Recipient) { + fn subscribe_all(&self, event_types: &[&str], recipient: Recipient>) { for event_type in event_types.into_iter() { - self.bus + self.consumer .do_send(Subscribe::new(*event_type, recipient.clone())); } } } + +impl Into for Addr> { + fn into(self) -> BusHandle { + BusHandle::new_from_consumer(self) + } +} + +impl Into for &Addr> { + fn into(self) -> BusHandle { + BusHandle::new_from_consumer(self.clone()) + } +} + +#[cfg(test)] +mod tests { + use std::time::{Duration, SystemTime, UNIX_EPOCH}; + + use crate::{ + hlc::Hlc, prelude::*, sequencer::Sequencer, BusHandle, EnclaveEvent, EnclaveEventData, + EventBus, TestEvent, + }; + use actix::{Actor, Handler, Message}; + use tokio::time::sleep; + fn now_micros() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_micros() as u64 + } + + #[actix::test] + async fn test_hlc_events() -> anyhow::Result<()> { + #[derive(Message)] + #[rtype("Vec")] + struct GetEventsOrdered; + + // Setup forwarder + struct Forwarder { + dest: BusHandle, + } + impl Actor for Forwarder { + type Context = actix::Context; + } + + impl Handler for Forwarder { + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + let ts = msg.get_ts(); + self.dest.publish_from_remote(msg.into_data(), ts).unwrap() + } + } + + // Setup saver + struct Saver { + events: Vec, + } + + impl Actor for Saver { + type Context = actix::Context; + } + + impl Handler for Saver { + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + self.events.push(msg); + } + } + + impl Handler for Saver { + type Result = Vec; + fn handle(&mut self, _: GetEventsOrdered, _: &mut Self::Context) -> Self::Result { + self.events.clone() + } + } + + // 1. setup up two separate busses with out of sync clocks A and B. B should be 30 seconds + // faster than A. + let consumer_a = EventBus::::default().start(); + let producer_a = Sequencer::new(&consumer_a).start(); + let clock_a = Hlc::new(1).with_clock(move || now_micros().saturating_sub(30_000_000)); // Late + let bus_a = BusHandle::new(consumer_a, producer_a, clock_a); + + let consumer_b = EventBus::::default().start(); + let producer_b = Sequencer::new(&consumer_b).start(); + let clock_b = Hlc::new(2); // in sync + let bus_b = BusHandle::new(consumer_b, producer_b, clock_b); + + let consumer_c = EventBus::::default().start(); + let producer_c = Sequencer::new(&consumer_c).start(); + let clock_c = Hlc::new(3); // in sync + let bus_c = BusHandle::new(consumer_c, producer_c, clock_c); + + let forwarder = Forwarder { + dest: bus_c.clone(), + } + .start(); + + // pipe all bus_a and bus_b events to bus_c + bus_a.subscribe("*", forwarder.clone().into()); + bus_b.subscribe("*", forwarder.into()); + + // Create and subscribe the Saver to bus_c + let saver = Saver { events: vec![] }.start(); + bus_c.subscribe("*", saver.clone().into()); + + // Publish events in causal order across buses + bus_a.publish(TestEvent::new("one", 1))?; + sleep(Duration::from_millis(5)).await; // next tick + bus_b.publish(TestEvent::new("two", 2))?; + sleep(Duration::from_millis(5)).await; // next tick + bus_a.publish(TestEvent::new("three", 3))?; + sleep(Duration::from_millis(5)).await; // next tick + bus_b.publish(TestEvent::new("four", 4))?; + sleep(Duration::from_millis(50)).await; // next tick + + // Get events + let events = saver.send(GetEventsOrdered).await?; + + // Sort by HLC timestamp + let mut sorted_events = events.clone(); + sorted_events.sort_by_key(|e| e.get_ts()); + + // Extract the payloads/names in HLC-sorted order + let ordered_names: Vec<_> = sorted_events + .iter() + .filter_map(|e| match e.get_data() { + EnclaveEventData::TestEvent(e) => Some(e.msg.clone()), + _ => None, + }) + .collect(); + + // ASSERTION 1: Causal order is preserved despite clock drift + assert_eq!( + ordered_names, + vec!["one", "two", "three", "four"], + "HLC should preserve causal ordering despite 30s clock drift on bus_a" + ); + + // ASSERTION 2: All timestamps are unique (HLC guarantee) + let timestamps: Vec<_> = sorted_events.iter().map(|e| e.get_ts()).collect(); + let unique_timestamps: std::collections::HashSet<_> = timestamps.iter().collect(); + assert_eq!( + timestamps.len(), + unique_timestamps.len(), + "All HLC timestamps should be unique" + ); + + // ASSERTION 3: Timestamps are strictly monotonically increasing when sorted + for window in timestamps.windows(2) { + assert!( + window[0] < window[1], + "HLC timestamps should be strictly increasing: {:?} should be < {:?}", + window[0], + window[1] + ); + } + + Ok(()) + } +} diff --git a/crates/events/src/enclave_event/enclave_error.rs b/crates/events/src/enclave_event/enclave_error.rs index 1487cc7836..e748298cd5 100644 --- a/crates/events/src/enclave_event/enclave_error.rs +++ b/crates/events/src/enclave_event/enclave_error.rs @@ -8,14 +8,16 @@ use actix::Message; use serde::{Deserialize, Serialize}; use std::fmt::{self, Display}; +use crate::{BusHandle, ErrorDispatcher}; + pub trait FromError { - fn from_error(err_type: EnclaveErrorType, error: impl Into) -> Self; + fn from_error(err_type: EType, error: impl Into) -> Self; } #[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] #[rtype(result = "()")] pub struct EnclaveError { - pub err_type: EnclaveErrorType, + pub err_type: EType, pub message: String, } @@ -26,19 +28,21 @@ impl Display for EnclaveError { } #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub enum EnclaveErrorType { +pub enum EType { Evm, KeyGeneration, PublickeyAggregation, IO, + Net, PlaintextAggregation, Decryption, Sortition, Data, + Event, } impl EnclaveError { - pub fn new(err_type: EnclaveErrorType, message: impl Into) -> Self { + pub fn new(err_type: EType, message: impl Into) -> Self { Self { err_type, message: message.into().to_string(), @@ -47,10 +51,22 @@ impl EnclaveError { } impl FromError for EnclaveError { - fn from_error(err_type: EnclaveErrorType, error: impl Into) -> Self { + fn from_error(err_type: EType, error: impl Into) -> Self { Self { err_type, message: error.into(), } } } + +/// Function to run a closure that returns a result. If result is an Err variant it is trapped and +/// sent to the bus as an ErrorEvent +pub fn trap(err_type: EType, bus: &BusHandle, runner: F) +where + F: FnOnce() -> anyhow::Result<()>, +{ + match runner() { + Ok(_) => (), + Err(e) => bus.err(err_type, e), + } +} diff --git a/crates/events/src/enclave_event/mod.rs b/crates/events/src/enclave_event/mod.rs index 0048b85890..0f7f2b07ad 100644 --- a/crates/events/src/enclave_event/mod.rs +++ b/crates/events/src/enclave_event/mod.rs @@ -66,7 +66,7 @@ use crate::{ E3id, EventId, }; use actix::Message; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ fmt::{self}, hash::Hash, @@ -123,14 +123,46 @@ impl EnclaveEventData { } } +pub trait SeqState: Clone + std::fmt::Debug + 'static { + type Seq: Unpin + + Sync + + Send + + Serialize + + DeserializeOwned + + Clone + + PartialEq + + Eq + + Hash + + std::fmt::Debug; +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct Unsequenced; + +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct Sequenced; + +impl SeqState for Unsequenced { + type Seq = (); +} + +impl SeqState for Sequenced { + type Seq = u64; +} + #[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] #[rtype(result = "()")] -pub struct EnclaveEvent { +pub struct EnclaveEvent { id: EventId, payload: EnclaveEventData, + seq: S::Seq, + ts: u128, } -impl EnclaveEvent { +impl EnclaveEvent +where + S: SeqState, +{ pub fn to_bytes(&self) -> Result, bincode::Error> { bincode::serialize(self) } @@ -140,15 +172,57 @@ impl EnclaveEvent { } pub fn get_id(&self) -> EventId { - self.clone().into() + self.into() + } + + pub fn get_ts(&self) -> u128 { + self.ts + } + + pub fn split(self) -> (EnclaveEventData, u128) { + (self.payload, self.ts) } } -impl Event for EnclaveEvent { +impl EnclaveEvent { + pub fn get_seq(&self) -> u64 { + self.seq + } + + pub fn clone_unsequenced(&self) -> EnclaveEvent { + let ts = self.get_ts(); + let data = self.clone().into_data(); + EnclaveEvent::new_with_timestamp(data, ts) + } +} + +impl EnclaveEvent { + pub fn into_sequenced(self, seq: u64) -> EnclaveEvent { + EnclaveEvent:: { + id: self.id, + payload: self.payload, + ts: self.ts, + seq, + } + } +} + +#[cfg(feature = "test-helpers")] +impl EnclaveEvent { + /// test-helpers only utility function to create a new unsequenced event + pub fn new_stored_event(data: EnclaveEventData, time: u128, seq: u64) -> Self { + EnclaveEvent::::new_with_timestamp(data, time).into_sequenced(seq) + } + + /// test-helpers only utility function to remove time information from an event + pub fn strip_ts(&self) -> EnclaveEvent { + EnclaveEvent::new_stored_event(self.get_data().clone(), 0, self.get_seq()) + } +} + +impl Event for EnclaveEvent { type Id = EventId; type Data = EnclaveEventData; - // type FromError = anyhow::Error; - // type ErrType = EnclaveErrorType; fn event_type(&self) -> String { self.payload.event_type() @@ -161,32 +235,45 @@ impl Event for EnclaveEvent { fn get_data(&self) -> &EnclaveEventData { &self.payload } + fn into_data(self) -> EnclaveEventData { self.payload } } -impl ErrorEvent for EnclaveEvent { - type ErrType = EnclaveErrorType; +impl ErrorEvent for EnclaveEvent { + type ErrType = EType; type FromError = anyhow::Error; - fn from_error(err_type: Self::ErrType, msg: impl Into) -> Self { + fn from_error( + err_type: Self::ErrType, + msg: impl Into, + ts: u128, + ) -> anyhow::Result { let payload = EnclaveError::new(err_type, msg); let id = EventId::hash(&payload); - EnclaveEvent { + Ok(EnclaveEvent { payload: payload.into(), id, - } + seq: (), + ts, + }) } } -impl From for EventId { - fn from(value: EnclaveEvent) -> Self { +impl From> for EventId { + fn from(value: EnclaveEvent) -> Self { value.id } } -impl EnclaveEvent { +impl From<&EnclaveEvent> for EventId { + fn from(value: &EnclaveEvent) -> Self { + value.id.clone() + } +} + +impl EnclaveEvent { pub fn get_e3_id(&self) -> Option { match self.payload { EnclaveEventData::KeyshareCreated(ref data) => Some(data.e3_id.clone()), @@ -238,16 +325,16 @@ impl_into_event_data!( ThresholdShareCreated ); -impl TryFrom<&EnclaveEvent> for EnclaveError { +impl TryFrom<&EnclaveEvent> for EnclaveError { type Error = anyhow::Error; fn try_from(value: &EnclaveEvent) -> Result { value.clone().try_into() } } -impl TryFrom for EnclaveError { +impl TryFrom> for EnclaveError { type Error = anyhow::Error; - fn try_from(value: EnclaveEvent) -> Result { + fn try_from(value: EnclaveEvent) -> Result { if let EnclaveEventData::EnclaveError(data) = value.payload.clone() { Ok(data) } else { @@ -256,17 +343,21 @@ impl TryFrom for EnclaveError { } } -impl fmt::Display for EnclaveEvent { +impl fmt::Display for EnclaveEvent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(&format!("{:?}", self)) } } -impl EventConstructorWithTimestamp for EnclaveEvent { - fn new_with_timestamp(data: Self::Data, _ts: u128) -> Self { +impl EventConstructorWithTimestamp for EnclaveEvent { + fn new_with_timestamp(data: Self::Data, ts: u128) -> Self { let payload = data.into(); let id = EventId::hash(&payload); - // hcl.receive(remote_ts)?; - EnclaveEvent { id, payload } + EnclaveEvent { + id, + payload, + seq: (), + ts, + } } } diff --git a/crates/events/src/enclave_event/test_event.rs b/crates/events/src/enclave_event/test_event.rs index a4cbe09a93..ab26068958 100644 --- a/crates/events/src/enclave_event/test_event.rs +++ b/crates/events/src/enclave_event/test_event.rs @@ -14,6 +14,15 @@ pub struct TestEvent { pub entropy: u64, } +impl TestEvent { + pub fn new(msg: &str, entropy: u64) -> Self { + Self { + msg: msg.to_owned(), + entropy, + } + } +} + #[cfg(test)] use std::fmt::{self, Display}; diff --git a/crates/events/src/eventbus.rs b/crates/events/src/eventbus.rs index 883663d9fb..81d748b7ab 100644 --- a/crates/events/src/eventbus.rs +++ b/crates/events/src/eventbus.rs @@ -5,7 +5,7 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use crate::traits::{ErrorEvent, Event}; -use crate::{prelude::*, BusHandle, CompositeEvent}; +use crate::{prelude::*, BusHandle, EnclaveEvent, Sequenced}; use actix::prelude::*; use bloom::{BloomFilter, ASMS}; use std::collections::{HashMap, VecDeque}; @@ -70,7 +70,7 @@ impl EventBus { addr } - pub fn error(source: &Addr>) -> Addr> { + pub fn error(source: &Addr>) -> Addr> { let addr = HistoryCollector::::new().start(); source.do_send(Subscribe::new("EnclaveError", addr.clone().recipient())); addr @@ -133,12 +133,6 @@ impl Handler for EventBus { } } -impl From>> for BusHandle { - fn from(value: Addr>) -> Self { - BusHandle::new(value) - } -} - ////////////////////////////////////////////////////////////////////////////// // Subscribe Message ////////////////////////////////////////////////////////////////////////////// @@ -417,10 +411,10 @@ impl Handler for HistoryCollector { ////////////////////////////////////////////////////////////////////////////// /// Function to help with testing when we want to maintain a vec of events -pub fn new_event_bus_with_history() -> (BusHandle, Addr>) +pub fn new_event_bus_with_history() -> (BusHandle, Addr>>) { - let bus: BusHandle = EventBus::::default().start().into(); - + let consumer = EventBus::>::default().start(); + let bus: BusHandle = BusHandle::new_from_consumer(consumer); let history = HistoryCollector::new().start(); bus.subscribe("*", history.clone().recipient()); (bus, history) diff --git a/crates/events/src/eventbus_factory.rs b/crates/events/src/eventbus_factory.rs index f7312d6f71..f251d86622 100644 --- a/crates/events/src/eventbus_factory.rs +++ b/crates/events/src/eventbus_factory.rs @@ -60,6 +60,7 @@ impl EventBusFactory { event_bus } + pub fn get_error_collector(&self) -> Addr> { let type_id = TypeId::of::(); let mut error_collector_cache = self.error_collector_cache.lock().unwrap(); @@ -92,6 +93,7 @@ pub fn get_error_collector() -> Addr> { EventBusFactory::instance().get_error_collector() } -pub fn get_enclave_bus_handle() -> BusHandle { - get_enclave_event_bus().into() +pub fn get_enclave_bus_handle() -> BusHandle { + let bus = get_enclave_event_bus(); + BusHandle::new_from_consumer(bus) } diff --git a/crates/events/src/hlc.rs b/crates/events/src/hlc.rs index 4cc36f3239..7a8cb3c11a 100644 --- a/crates/events/src/hlc.rs +++ b/crates/events/src/hlc.rs @@ -4,6 +4,8 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use rand::Rng; +use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::{Arc, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; use thiserror::Error; @@ -173,6 +175,13 @@ struct HlcInner { counter: u32, } +impl Default for Hlc { + fn default() -> Self { + let random_id: u32 = rand::thread_rng().gen(); + Self::new(random_id) + } +} + impl Hlc { const DEFAULT_MAX_DRIFT: u64 = 60_000_000; // 60 sec @@ -185,6 +194,13 @@ impl Hlc { } } + pub fn from_str(node: &str) -> Self { + let mut h = DefaultHasher::new(); + node.hash(&mut h); + let id: u64 = h.finish(); + Self::new(id as u32) + } + pub fn with_state(ts: u64, counter: u32, node: u32) -> Self { Self { inner: Mutex::new(HlcInner { ts, counter }), diff --git a/crates/events/src/lib.rs b/crates/events/src/lib.rs index 1f356b3a47..eac423f6e7 100644 --- a/crates/events/src/lib.rs +++ b/crates/events/src/lib.rs @@ -15,6 +15,7 @@ pub mod hlc; mod ordered_set; pub mod prelude; mod seed; +mod sequencer; mod traits; pub use bus_handle::*; diff --git a/crates/events/src/sequencer.rs b/crates/events/src/sequencer.rs new file mode 100644 index 0000000000..1a691e7f53 --- /dev/null +++ b/crates/events/src/sequencer.rs @@ -0,0 +1,74 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use actix::{Actor, Addr, Handler}; + +use crate::{EnclaveEvent, EventBus, Sequenced, Unsequenced}; + +pub struct Sequencer { + bus: Addr>>, + seq: u64, +} + +impl Sequencer { + pub fn new(bus: &Addr>>) -> Self { + Self { + bus: bus.clone(), + seq: 0, + } + } +} + +impl Actor for Sequencer { + type Context = actix::Context; +} + +impl Handler> for Sequencer { + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + // NOTE: FAKE SEQUENCER FOR NOW - JUST SET THE SEQUENCE NUMBER AND UPDATE + self.seq += 1; + self.bus.do_send(msg.into_sequenced(self.seq)) + } +} + +#[cfg(test)] +mod tests { + + use crate::{prelude::*, BusHandle, EnclaveEvent, EventBus, TakeEvents, TestEvent}; + use actix::Actor; + + #[actix::test] + async fn it_adds_seqence_numbers_to_events() -> anyhow::Result<()> { + let bus = BusHandle::new_from_consumer(EventBus::::default().start()); + let history = bus.history(); + + let event_data = vec![ + TestEvent::new("one", 1), + TestEvent::new("two", 2), + TestEvent::new("three", 3), + ]; + + for d in event_data.clone() { + bus.publish(d)?; + } + + let expected = event_data + .into_iter() + .map(|d| EnclaveEvent::new_stored_event(d.clone().into(), 0, d.entropy)) + .collect::>(); + let events = history.send(TakeEvents::new(3)).await?; + + assert_eq!( + events + .iter() + .map(EnclaveEvent::strip_ts) + .collect::>(), + expected + ); + Ok(()) + } +} diff --git a/crates/events/src/traits.rs b/crates/events/src/traits.rs index ba98fda9da..8f2c5fed47 100644 --- a/crates/events/src/traits.rs +++ b/crates/events/src/traits.rs @@ -5,6 +5,7 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use actix::{Message, Recipient}; +use anyhow::Result; use std::fmt::Display; use std::hash::Hash; @@ -29,7 +30,11 @@ pub trait ErrorEvent: Event { type ErrType; type FromError; - fn from_error(err_type: Self::ErrType, error: impl Into) -> Self; + fn from_error( + err_type: Self::ErrType, + error: impl Into, + ts: u128, + ) -> Result; } /// An EventFactory creates events @@ -37,18 +42,18 @@ pub trait EventFactory { /// Create a new event from the given event data, apply a local HLC timestamp. /// /// This method should be used for events that have originated locally. - fn event_from(&self, data: impl Into) -> E; + fn event_from(&self, data: impl Into) -> Result; /// Create a new event from the given event data, apply the given remote HLC time to ensure correct /// event ordering. /// /// This method should be used for events that originated from remote sources. - fn event_from_remote_source(&self, data: impl Into, ts: u128) -> E; + fn event_from_remote_source(&self, data: impl Into, ts: u128) -> Result; } /// An ErrorFactory creates errors. pub trait ErrorFactory { /// Create an error event from the given error. - fn event_from_error(&self, err_type: E::ErrType, error: impl Into) -> E; + fn event_from_error(&self, err_type: E::ErrType, error: impl Into) -> Result; } /// An EventPublisher publishes events on it's internal EventBus @@ -57,12 +62,12 @@ pub trait EventPublisher { /// to the event bus. /// /// This method should be used for events that have originated locally. - fn publish(&self, data: impl Into); + fn publish(&self, data: impl Into) -> Result<()>; /// Create a new event from the given event data, apply the given remote HLC time to ensure correct /// event ordering and publish it. /// /// This method should be used for events that originated from remote sources. - fn publish_from_remote(&self, data: impl Into, ts: u128); + fn publish_from_remote(&self, data: impl Into, ts: u128) -> Result<()>; /// Dispatch the given event without applying any HLC transformation. fn naked_dispatch(&self, event: E); } @@ -87,6 +92,6 @@ pub trait EventConstructorWithTimestamp: Event + Sized { fn new_with_timestamp(data: Self::Data, ts: u128) -> Self; } -pub trait CompositeEvent: ErrorEvent + EventConstructorWithTimestamp {} +pub trait CompositeEvent: EventConstructorWithTimestamp {} -impl CompositeEvent for E where E: Sized + Event + ErrorEvent + EventConstructorWithTimestamp {} +impl CompositeEvent for E where E: Sized + Event + EventConstructorWithTimestamp {} diff --git a/crates/evm/src/bonding_registry_sol.rs b/crates/evm/src/bonding_registry_sol.rs index af199634ae..20652f3554 100644 --- a/crates/evm/src/bonding_registry_sol.rs +++ b/crates/evm/src/bonding_registry_sol.rs @@ -16,7 +16,7 @@ use alloy::{ }; use anyhow::Result; use e3_data::Repository; -use e3_events::{BusHandle, EnclaveEvent, EnclaveEventData}; +use e3_events::{BusHandle, EnclaveEventData}; use tracing::{error, info, trace}; sol!( @@ -145,7 +145,7 @@ pub struct BondingRegistrySolReader; impl BondingRegistrySolReader { pub async fn attach

( processor: &Recipient, - bus: &BusHandle, + bus: &BusHandle, provider: EthProvider

, contract_address: &str, repository: &Repository, @@ -179,7 +179,7 @@ pub struct BondingRegistrySol; impl BondingRegistrySol { pub async fn attach

( processor: &Recipient, - bus: &BusHandle, + bus: &BusHandle, provider: EthProvider

, contract_address: &str, repository: &Repository, diff --git a/crates/evm/src/ciphernode_registry_sol.rs b/crates/evm/src/ciphernode_registry_sol.rs index a6cf0918f4..d02192b0e2 100644 --- a/crates/evm/src/ciphernode_registry_sol.rs +++ b/crates/evm/src/ciphernode_registry_sol.rs @@ -18,7 +18,7 @@ use alloy::{ use anyhow::Result; use e3_data::Repository; use e3_events::{ - prelude::*, BusHandle, CommitteeFinalizeRequested, CommitteeFinalized, E3id, EnclaveErrorType, + prelude::*, BusHandle, CommitteeFinalizeRequested, CommitteeFinalized, E3id, EType, EnclaveEvent, EnclaveEventData, EventSubscriber, OrderedSet, PublicKeyAggregated, Seed, Shutdown, TicketGenerated, TicketId, }; @@ -218,7 +218,7 @@ pub struct CiphernodeRegistrySolReader; impl CiphernodeRegistrySolReader { pub async fn attach

( processor: &Recipient, - bus: &BusHandle, + bus: &BusHandle, provider: EthProvider

, contract_address: &str, repository: &Repository, @@ -250,12 +250,12 @@ impl CiphernodeRegistrySolReader { pub struct CiphernodeRegistrySolWriter

{ provider: EthProvider

, contract_address: Address, - bus: BusHandle, + bus: BusHandle, } impl CiphernodeRegistrySolWriter

{ pub async fn new( - bus: &BusHandle, + bus: &BusHandle, provider: EthProvider

, contract_address: Address, ) -> Result { @@ -267,7 +267,7 @@ impl CiphernodeRegistrySolWriter } pub async fn attach( - bus: &BusHandle, + bus: &BusHandle, provider: EthProvider

, contract_address: &str, is_aggregator: bool, @@ -361,7 +361,7 @@ impl Handler } Err(err) => { error!("Failed to submit ticket: {:?}", err); - bus.err(EnclaveErrorType::Evm, err); + bus.err(EType::Evm, err); } } }) @@ -391,7 +391,7 @@ impl Handler { error!("Failed to finalize committee: {:?}", err); - bus.err(EnclaveErrorType::Evm, err); + bus.err(EType::Evm, err); } } }) @@ -419,7 +419,7 @@ impl Handler { info!(tx=%receipt.transaction_hash, "Committee published to registry"); } - Err(err) => bus.err(EnclaveErrorType::Evm, err), + Err(err) => bus.err(EType::Evm, err), } }) } @@ -511,7 +511,7 @@ pub struct CiphernodeRegistrySol; impl CiphernodeRegistrySol { pub async fn attach

( processor: &Recipient, - bus: &BusHandle, + bus: &BusHandle, provider: EthProvider

, contract_address: &str, repository: &Repository, @@ -535,7 +535,7 @@ impl CiphernodeRegistrySol { } pub async fn attach_writer

( - bus: &BusHandle, + bus: &BusHandle, provider: EthProvider

, contract_address: &str, is_aggregator: bool, diff --git a/crates/evm/src/enclave_sol.rs b/crates/evm/src/enclave_sol.rs index 387116f92c..a8a6bebf30 100644 --- a/crates/evm/src/enclave_sol.rs +++ b/crates/evm/src/enclave_sol.rs @@ -12,14 +12,14 @@ use actix::Recipient; use alloy::providers::{Provider, WalletProvider}; use anyhow::Result; use e3_data::Repository; -use e3_events::{BusHandle, EnclaveEvent}; +use e3_events::BusHandle; pub struct EnclaveSol; impl EnclaveSol { pub async fn attach( processor: &Recipient, - bus: &BusHandle, + bus: &BusHandle, read_provider: EthProvider, write_provider: EthProvider, contract_address: &str, diff --git a/crates/evm/src/enclave_sol_reader.rs b/crates/evm/src/enclave_sol_reader.rs index 3a2a938005..fa1f6f8f67 100644 --- a/crates/evm/src/enclave_sol_reader.rs +++ b/crates/evm/src/enclave_sol_reader.rs @@ -13,7 +13,7 @@ use alloy::providers::Provider; use alloy::{sol, sol_types::SolEvent}; use anyhow::Result; use e3_data::Repository; -use e3_events::{BusHandle, E3id, EnclaveEvent, EnclaveEventData}; +use e3_events::{BusHandle, E3id, EnclaveEventData}; use e3_utils::utility_types::ArcBytes; use num_bigint::BigUint; use tracing::{error, info, trace}; @@ -107,7 +107,7 @@ pub struct EnclaveSolReader; impl EnclaveSolReader { pub async fn attach

( processor: &Recipient, - bus: &BusHandle, + bus: &BusHandle, provider: EthProvider

, contract_address: &str, repository: &Repository, diff --git a/crates/evm/src/enclave_sol_writer.rs b/crates/evm/src/enclave_sol_writer.rs index 35aade7e7f..e6bc7e59da 100644 --- a/crates/evm/src/enclave_sol_writer.rs +++ b/crates/evm/src/enclave_sol_writer.rs @@ -22,7 +22,7 @@ use e3_events::BusHandle; use e3_events::EnclaveEvent; use e3_events::EnclaveEventData; use e3_events::Shutdown; -use e3_events::{E3id, EnclaveErrorType, PlaintextAggregated}; +use e3_events::{E3id, EType, PlaintextAggregated}; use tracing::info; sol!( @@ -35,12 +35,12 @@ sol!( pub struct EnclaveSolWriter

{ provider: EthProvider

, contract_address: Address, - bus: BusHandle, + bus: BusHandle, } impl EnclaveSolWriter

{ pub fn new( - bus: &BusHandle, + bus: &BusHandle, provider: EthProvider

, contract_address: Address, ) -> Result { @@ -52,7 +52,7 @@ impl EnclaveSolWriter

{ } pub async fn attach( - bus: &BusHandle, + bus: &BusHandle, provider: EthProvider

, contract_address: &str, ) -> Result>> { @@ -99,10 +99,7 @@ impl Handler Handler { bus.err( - EnclaveErrorType::Evm, + EType::Evm, anyhow::anyhow!("Error publishing plaintext output: {:?}", err), ); } diff --git a/crates/evm/src/event_reader.rs b/crates/evm/src/event_reader.rs index eec8f1e463..39d2035253 100644 --- a/crates/evm/src/event_reader.rs +++ b/crates/evm/src/event_reader.rs @@ -14,7 +14,7 @@ use alloy::providers::Provider; use alloy::rpc::types::Filter; use anyhow::{anyhow, Result}; use e3_data::{AutoPersist, Persistable, Repository}; -use e3_events::{prelude::*, EnclaveErrorType, EnclaveEvent, EnclaveEventData, EventId}; +use e3_events::{prelude::*, EType, EnclaveEvent, EnclaveEventData, EventId}; use e3_events::{BusHandle, Event}; use futures_util::stream::StreamExt; use std::collections::HashSet; @@ -54,7 +54,7 @@ pub struct EvmEventReaderParams

{ contract_address: Address, start_block: Option, processor: Recipient, - bus: BusHandle, + bus: BusHandle, state: Persistable, rpc_url: String, } @@ -83,7 +83,7 @@ pub struct EvmEventReader

{ /// Processor to forward events an actor processor: Recipient, /// Event bus for error propagation only - bus: BusHandle, + bus: BusHandle, /// The auto persistable state of the event reader state: Persistable, /// The RPC URL for the provider @@ -113,7 +113,7 @@ impl EvmEventReader

{ contract_address: &str, start_block: Option, processor: &Recipient, - bus: &BusHandle, + bus: &BusHandle, repository: &Repository, rpc_url: String, ) -> Result> { @@ -156,7 +156,7 @@ impl Actor for EvmEventReader

{ let extractor = self.extractor; let Some(shutdown) = self.shutdown_rx.take() else { - bus.err(EnclaveErrorType::Evm, anyhow!("shutdown already called")); + bus.err(EType::Evm, anyhow!("shutdown already called")); return; }; @@ -191,7 +191,7 @@ async fn stream_from_evm( extractor: fn(&LogData, Option<&B256>, u64) -> Option, mut shutdown: oneshot::Receiver<()>, start_block: Option, - bus: &BusHandle, + bus: &BusHandle, rpc_url: String, ) { let chain_id = provider.chain_id(); @@ -203,7 +203,7 @@ async fn stream_from_evm( rpc_url ); bus.err( - EnclaveErrorType::Evm, + EType::Evm, anyhow!( "Misconfiguration: Attempted to query historical events from genesis on a non-local node. \ Please specify a `start_block` for contract address {contract_address} on chain {chain_id} using rpc {rpc_url}" @@ -235,7 +235,7 @@ async fn stream_from_evm( } Err(e) => { error!("Failed to fetch historical events: {}", e); - bus.err(EnclaveErrorType::Evm, anyhow!(e)); + bus.err(EType::Evm, anyhow!(e)); return; } } @@ -277,7 +277,7 @@ async fn stream_from_evm( } } Err(e) => { - bus.err(EnclaveErrorType::Evm, anyhow!("{}", e)); + bus.err(EType::Evm, anyhow!("{}", e)); } } @@ -344,7 +344,7 @@ impl Handler for EvmEventReader< Ok(state) }) { Ok(_) => (), - Err(err) => self.bus.err(EnclaveErrorType::Evm, err), + Err(err) => self.bus.err(EType::Evm, err), } } } diff --git a/crates/evm/src/historical_event_coordinator.rs b/crates/evm/src/historical_event_coordinator.rs index 12f1b9e466..5b02e33b89 100644 --- a/crates/evm/src/historical_event_coordinator.rs +++ b/crates/evm/src/historical_event_coordinator.rs @@ -6,7 +6,7 @@ use crate::EnclaveEvmEvent; use actix::prelude::*; -use e3_events::{prelude::*, BusHandle, EnclaveEvent, EnclaveEventData}; +use e3_events::{prelude::*, trap, BusHandle, EType, EnclaveEventData}; use tracing::info; #[derive(Clone)] @@ -30,13 +30,13 @@ pub struct HistoricalEventCoordinator { /// Buffered events during historical sync buffered_events: Vec, /// Target to forward events to (typically EventBus) - target: BusHandle, + target: BusHandle, /// Whether we've started forwarding (after Start message) started: bool, } impl HistoricalEventCoordinator { - pub fn new(target: BusHandle) -> Self { + pub fn new(target: BusHandle) -> Self { Self { registered_count: 0, completed_count: 0, @@ -46,7 +46,7 @@ impl HistoricalEventCoordinator { } } - pub fn setup(target: BusHandle) -> Addr { + pub fn setup(target: BusHandle) -> Addr { Self::new(target).start() } @@ -54,19 +54,20 @@ impl HistoricalEventCoordinator { self.registered_count > 0 && self.registered_count == self.completed_count } - fn flush_buffered_events(&mut self) { + fn flush_buffered_events(&mut self) -> anyhow::Result<()> { // Ordering by block number. But we should also consider the tx_index and log_index. self.buffered_events.sort_by_key(|e| e.block); let count = self.buffered_events.len(); for BufferedEvent { event, .. } in self.buffered_events.drain(..) { - self.target.publish(event); + self.target.publish(event)?; } info!( "HistoricalEventCoordinator: replay complete, published {} ordered events", count ); + Ok(()) } } @@ -82,13 +83,14 @@ impl Handler for HistoricalEventCoordinator { type Result = (); fn handle(&mut self, msg: EnclaveEvmEvent, _ctx: &mut Self::Context) -> Self::Result { - match msg { + trap(EType::Evm, &self.target.clone(), || match msg { EnclaveEvmEvent::RegisterReader => { self.registered_count += 1; info!( total_registered = self.registered_count, "Reader registered with coordinator" ); + Ok(()) } EnclaveEvmEvent::HistoricalSyncComplete => { @@ -101,8 +103,9 @@ impl Handler for HistoricalEventCoordinator { if self.started && self.all_readers_complete() { info!("All readers completed historical sync, flushing buffered events"); - self.flush_buffered_events(); + self.flush_buffered_events()?; } + Ok(()) } EnclaveEvmEvent::Event { event, block } => { @@ -111,10 +114,11 @@ impl Handler for HistoricalEventCoordinator { self.buffered_events.push(BufferedEvent { block, event }); } } else { - self.target.publish(event); + self.target.publish(event)?; } + Ok(()) } - } + }) } } @@ -122,14 +126,17 @@ impl Handler for HistoricalEventCoordinator { type Result = (); fn handle(&mut self, _msg: CoordinatorStart, _ctx: &mut Self::Context) -> Self::Result { - info!( - registered_readers = self.registered_count, - "Starting HistoricalEventCoordinator" - ); - self.started = true; - - if self.all_readers_complete() { - self.flush_buffered_events(); - } + trap(EType::Evm, &self.target.clone(), || { + info!( + registered_readers = self.registered_count, + "Starting HistoricalEventCoordinator" + ); + self.started = true; + + if self.all_readers_complete() { + self.flush_buffered_events()?; + } + Ok(()) + }) } } diff --git a/crates/evm/tests/integration.rs b/crates/evm/tests/integration.rs index a315276c2a..55bd774c09 100644 --- a/crates/evm/tests/integration.rs +++ b/crates/evm/tests/integration.rs @@ -269,7 +269,9 @@ async fn ensure_resume_after_shutdown() -> Result<()> { // Ensure shutdown doesn't cause event to be lost. sleep(Duration::from_millis(10)).await; - addr1.send(bus.event_from(Shutdown)).await?; + addr1 + .send(EnclaveEvent::new_stored_event(Shutdown.into(), 4321, 42)) + .await?; for msg in ["these", "are", "not", "lost"] { contract diff --git a/crates/fhe/src/ext.rs b/crates/fhe/src/ext.rs index 8c568f34f8..4f417a4547 100644 --- a/crates/fhe/src/ext.rs +++ b/crates/fhe/src/ext.rs @@ -8,9 +8,7 @@ use crate::{Fhe, FheRepositoryFactory}; use anyhow::{anyhow, Result}; use async_trait::async_trait; use e3_data::{FromSnapshotWithParams, RepositoriesFactory, Snapshot}; -use e3_events::{ - prelude::*, BusHandle, E3Requested, EnclaveErrorType, EnclaveEvent, EnclaveEventData, -}; +use e3_events::{prelude::*, BusHandle, E3Requested, EType, EnclaveEvent, EnclaveEventData}; use e3_request::{E3Context, E3ContextSnapshot, E3Extension, TypedKey}; use e3_utils::SharedRng; use std::sync::Arc; @@ -20,11 +18,11 @@ pub const FHE_KEY: TypedKey> = TypedKey::new("fhe"); /// TODO: move these to each package with access on MyStruct::launcher() pub struct FheExtension { rng: SharedRng, - bus: BusHandle, + bus: BusHandle, } impl FheExtension { - pub fn create(bus: &BusHandle, rng: &SharedRng) -> Box { + pub fn create(bus: &BusHandle, rng: &SharedRng) -> Box { Box::new(Self { rng: rng.clone(), bus: bus.clone(), @@ -50,10 +48,8 @@ impl E3Extension for FheExtension { } = data.clone(); let Ok(fhe_inner) = Fhe::from_encoded(¶ms, seed, self.rng.clone()) else { - self.bus.err( - EnclaveErrorType::KeyGeneration, - anyhow!(ERROR_FHE_FAILED_TO_DECODE), - ); + self.bus + .err(EType::KeyGeneration, anyhow!(ERROR_FHE_FAILED_TO_DECODE)); return; }; @@ -61,10 +57,8 @@ impl E3Extension for FheExtension { // FHE doesn't implement Checkpoint so we are going to store it manually let Ok(snapshot) = fhe.snapshot() else { - self.bus.err( - EnclaveErrorType::KeyGeneration, - anyhow!("Failed to get snapshot"), - ); + self.bus + .err(EType::KeyGeneration, anyhow!("Failed to get snapshot")); return; }; ctx.repositories().fhe(&e3_id).write(&snapshot); diff --git a/crates/keyshare/src/ext.rs b/crates/keyshare/src/ext.rs index c7bb8c95f4..39b8db852a 100644 --- a/crates/keyshare/src/ext.rs +++ b/crates/keyshare/src/ext.rs @@ -13,20 +13,20 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use e3_crypto::Cipher; use e3_data::{AutoPersist, RepositoriesFactory}; -use e3_events::{prelude::*, BusHandle, EnclaveErrorType, EnclaveEvent, EnclaveEventData}; +use e3_events::{prelude::*, BusHandle, EType, EnclaveEvent, EnclaveEventData}; use e3_fhe::ext::FHE_KEY; use e3_multithread::Multithread; use e3_request::{E3Context, E3ContextSnapshot, E3Extension, META_KEY}; use std::sync::Arc; pub struct KeyshareExtension { - bus: BusHandle, + bus: BusHandle, address: String, cipher: Arc, } impl KeyshareExtension { - pub fn create(bus: &BusHandle, address: &str, cipher: &Arc) -> Box { + pub fn create(bus: &BusHandle, address: &str, cipher: &Arc) -> Box { Box::new(Self { bus: bus.clone(), address: address.to_owned(), @@ -48,10 +48,8 @@ impl E3Extension for KeyshareExtension { // Has the FHE dependency been already setup? (hint: it should have) let Some(fhe) = ctx.get_dependency(FHE_KEY) else { - self.bus.err( - EnclaveErrorType::KeyGeneration, - anyhow!(ERROR_KEYSHARE_FHE_MISSING), - ); + self.bus + .err(EType::KeyGeneration, anyhow!(ERROR_KEYSHARE_FHE_MISSING)); return; }; @@ -91,10 +89,8 @@ impl E3Extension for KeyshareExtension { // Has the FHE dependency been already setup? (hint: it should have) let Some(fhe) = ctx.get_dependency(FHE_KEY) else { - self.bus.err( - EnclaveErrorType::KeyGeneration, - anyhow!(ERROR_KEYSHARE_FHE_MISSING), - ); + self.bus + .err(EType::KeyGeneration, anyhow!(ERROR_KEYSHARE_FHE_MISSING)); return Ok(()); }; @@ -117,7 +113,7 @@ impl E3Extension for KeyshareExtension { } pub struct ThresholdKeyshareExtension { - bus: BusHandle, + bus: BusHandle, cipher: Arc, address: String, multithread: Addr, @@ -125,7 +121,7 @@ pub struct ThresholdKeyshareExtension { impl ThresholdKeyshareExtension { pub fn create( - bus: &BusHandle, + bus: &BusHandle, cipher: &Arc, multithread: &Addr, address: &str, @@ -139,6 +135,9 @@ impl ThresholdKeyshareExtension { } } +const ERROR_KEYSHARE_META_MISSING: &str = + "Could not create ThresholdKeyshare because the meta instance it depends on was not set on the context."; + #[async_trait] impl E3Extension for ThresholdKeyshareExtension { fn on_event(&self, ctx: &mut E3Context, evt: &EnclaveEvent) { @@ -150,10 +149,8 @@ impl E3Extension for ThresholdKeyshareExtension { let e3_id = data.clone().e3_id; let party_id = data.clone().party_id; let Some(meta) = ctx.get_dependency(META_KEY) else { - self.bus.err( - EnclaveErrorType::KeyGeneration, - anyhow!(ERROR_KEYSHARE_FHE_MISSING), - ); + self.bus + .err(EType::KeyGeneration, anyhow!(ERROR_KEYSHARE_META_MISSING)); return; }; let repo = ctx.repositories().threshold_keyshare(&e3_id); diff --git a/crates/keyshare/src/keyshare.rs b/crates/keyshare/src/keyshare.rs index 065009e39e..ff80744861 100644 --- a/crates/keyshare/src/keyshare.rs +++ b/crates/keyshare/src/keyshare.rs @@ -5,12 +5,13 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use actix::prelude::*; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context as AnyhowContext, Result}; use e3_crypto::Cipher; use e3_data::Persistable; use e3_events::{ - prelude::*, BusHandle, CiphernodeSelected, CiphertextOutputPublished, DecryptionshareCreated, - Die, E3RequestComplete, EnclaveErrorType, EnclaveEvent, EnclaveEventData, KeyshareCreated, + prelude::*, trap, BusHandle, CiphernodeSelected, CiphertextOutputPublished, + DecryptionshareCreated, Die, E3RequestComplete, EType, EnclaveEvent, EnclaveEventData, + KeyshareCreated, }; use e3_fhe::{DecryptCiphertext, Fhe}; use e3_utils::utility_types::ArcBytes; @@ -19,7 +20,7 @@ use tracing::warn; pub struct Keyshare { fhe: Arc, - bus: BusHandle, + bus: BusHandle, secret: Persistable>, address: String, cipher: Arc, @@ -30,7 +31,7 @@ impl Actor for Keyshare { } pub struct KeyshareParams { - pub bus: BusHandle, + pub bus: BusHandle, pub secret: Persistable>, pub fhe: Arc, pub address: String, @@ -90,28 +91,27 @@ impl Handler for Keyshare { type Result = (); fn handle(&mut self, event: CiphernodeSelected, _: &mut actix::Context) -> Self::Result { - let CiphernodeSelected { e3_id, .. } = event; - - // generate keyshare - let Ok((secret, pubkey)) = self.fhe.generate_keyshare() else { - self.bus.err( - EnclaveErrorType::KeyGeneration, - anyhow!("Error creating Keyshare for {e3_id}"), - ); - return; - }; - - // Save secret on state - if let Err(err) = self.set_secret(secret) { - self.bus.err(EnclaveErrorType::KeyGeneration, err) - }; - - // Broadcast the KeyshareCreated message - self.bus.publish(KeyshareCreated { - pubkey, - e3_id, - node: self.address.clone(), - }); + trap(EType::KeyGeneration, &self.bus.clone(), || { + let CiphernodeSelected { e3_id, .. } = event; + + // generate keyshare + let (secret, pubkey) = self + .fhe + .generate_keyshare() + .with_context(|| format!("Error creating Keyshare for {}", e3_id))?; + + // Save secret on state + self.set_secret(secret)?; + + // Broadcast the KeyshareCreated message + self.bus.publish(KeyshareCreated { + pubkey, + e3_id, + node: self.address.clone(), + })?; + + Ok(()) + }) } } @@ -123,44 +123,32 @@ impl Handler for Keyshare { event: CiphertextOutputPublished, _: &mut actix::Context, ) -> Self::Result { - let CiphertextOutputPublished { - e3_id, - ciphertext_output, - } = event; - - let Ok(secret) = self.get_secret() else { - self.bus.err( - EnclaveErrorType::Decryption, - anyhow!("Secret not available for Keyshare for e3_id {e3_id}"), - ); - return; - }; - - let Some(ciphertext) = ciphertext_output.first() else { - self.bus.err( - EnclaveErrorType::Decryption, - anyhow!("Ciphernode output array is empty!"), - ); - return; - }; - - let Ok(decryption_share) = self.fhe.decrypt_ciphertext(DecryptCiphertext { - ciphertext: ciphertext.extract_bytes(), - unsafe_secret: secret, - }) else { - self.bus.err( - EnclaveErrorType::Decryption, - anyhow!("error decrypting ciphertext: {:?}", ciphertext_output), - ); - return; - }; - - self.bus.publish(DecryptionshareCreated { - party_id: 0, // Not used - e3_id, - decryption_share: vec![ArcBytes::from_bytes(&decryption_share)], - node: self.address.clone(), - }); + trap(EType::Decryption, &self.bus.clone(), || { + let CiphertextOutputPublished { + e3_id, + ciphertext_output, + } = event; + + let secret = self.get_secret()?; + + let ciphertext = ciphertext_output + .first() + .ok_or(anyhow!("Ciphernode output array is empty!"))?; + + let decryption_share = self.fhe.decrypt_ciphertext(DecryptCiphertext { + ciphertext: ciphertext.extract_bytes(), + unsafe_secret: secret, + })?; + + self.bus.publish(DecryptionshareCreated { + party_id: 0, // Not used + e3_id, + decryption_share: vec![ArcBytes::from_bytes(&decryption_share)], + node: self.address.clone(), + })?; + + Ok(()) + }) } } diff --git a/crates/keyshare/src/threshold_keyshare.rs b/crates/keyshare/src/threshold_keyshare.rs index ccea059247..d79398f13a 100644 --- a/crates/keyshare/src/threshold_keyshare.rs +++ b/crates/keyshare/src/threshold_keyshare.rs @@ -269,14 +269,14 @@ impl TryInto for ThresholdKeyshareState { } pub struct ThresholdKeyshareParams { - pub bus: BusHandle, + pub bus: BusHandle, pub cipher: Arc, pub multithread: Addr, pub state: Persistable, } pub struct ThresholdKeyshare { - bus: BusHandle, + bus: BusHandle, cipher: Arc, decryption_key_collector: Option>, multithread: Addr, @@ -535,7 +535,7 @@ impl ThresholdKeyshare { sk_sss, }), external: false, - }); + })?; Ok(()) } @@ -634,7 +634,7 @@ impl ThresholdKeyshare { pubkey: current.pk_share, e3_id, node: address, - }); + })?; Ok(()) } @@ -697,7 +697,7 @@ impl ThresholdKeyshare { }; // send the decryption share - self.bus.publish(event); + self.bus.publish(event)?; // mark as complete self.state.try_mutate(|s| { diff --git a/crates/logger/src/logger.rs b/crates/logger/src/logger.rs index ea222bbe4f..9367736d9a 100644 --- a/crates/logger/src/logger.rs +++ b/crates/logger/src/logger.rs @@ -5,7 +5,7 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use actix::{Actor, Addr, Context, Handler}; -use e3_events::{prelude::Event, EnclaveEvent, EnclaveEventData, EventBus, Subscribe}; +use e3_events::{prelude::Event, EnclaveEvent, EnclaveEventData, EventBus, SeqState, Subscribe}; use std::marker::PhantomData; use tracing::{error, info}; @@ -46,7 +46,7 @@ impl Handler for SimpleLogger { } } -impl EventLogging for EnclaveEvent { +impl EventLogging for EnclaveEvent { fn log(&self, logger_name: &str) { match self.get_data() { EnclaveEventData::EnclaveError(_) => error!(event=%self, "ERROR!"), diff --git a/crates/net/src/document_publisher.rs b/crates/net/src/document_publisher.rs index d5be42ec57..a0aac9e06f 100644 --- a/crates/net/src/document_publisher.rs +++ b/crates/net/src/document_publisher.rs @@ -15,8 +15,8 @@ use anyhow::Result; use chrono::{DateTime, Utc}; use e3_events::{ prelude::*, BusHandle, CiphernodeSelected, CorrelationId, DocumentKind, DocumentMeta, - DocumentReceived, E3RequestComplete, E3id, EnclaveErrorType, EnclaveEvent, EnclaveEventData, - Event, PartyId, PublishDocumentRequested, ThresholdShareCreated, + DocumentReceived, E3RequestComplete, E3id, EType, EnclaveEvent, EnclaveEventData, Event, + PartyId, PublishDocumentRequested, ThresholdShareCreated, }; use e3_utils::retry::{retry_with_backoff, to_retry}; use e3_utils::ArcBytes; @@ -40,7 +40,7 @@ const KADEMLIA_BROADCAST_TIMEOUT: Duration = Duration::from_secs(30); /// bus pub struct DocumentPublisher { /// Enclave EventBus - bus: BusHandle, + bus: BusHandle, /// NetCommand sender to forward commands to the NetInterface tx: mpsc::Sender, /// NetEvent receiver to resubscribe for events from the NetInterface. This is in an Arc so @@ -55,7 +55,7 @@ pub struct DocumentPublisher { impl DocumentPublisher { /// Create a new NetEventTranslator actor pub fn new( - bus: &BusHandle, + bus: &BusHandle, tx: &mpsc::Sender, rx: &Arc>, topic: impl Into, @@ -81,7 +81,7 @@ impl DocumentPublisher { /// Setup the DocumentPublisher and start listening for GossipEvents pub fn setup( - bus: &BusHandle, + bus: &BusHandle, tx: &mpsc::Sender, rx: &Arc>, topic: impl Into, @@ -155,7 +155,7 @@ impl Handler for DocumentPublisher { Ok(_) => (), Err(e) => { error!(error=?e, "Could not handle publish document requested"); - bus.err(EnclaveErrorType::IO, e) + bus.err(EType::IO, e) } } }) @@ -204,7 +204,7 @@ impl Handler for DocumentPublisher { Ok(_) => (), Err(e) => { error!(error=?e, "Could not handle document published notification"); - bus.err(EnclaveErrorType::IO, e); + bus.err(EType::IO, e); } } }) @@ -253,7 +253,7 @@ pub async fn handle_publish_document_requested( pub async fn handle_document_published_notification( net_cmds: mpsc::Sender, net_events: Arc>, - bus: BusHandle, + bus: BusHandle, ids: HashMap, event: DocumentPublishedNotification, ) -> Result<()> { @@ -283,7 +283,7 @@ pub async fn handle_document_published_notification( bus.publish(DocumentReceived { meta: event.meta, value, - }); + })?; Ok(()) } @@ -374,7 +374,7 @@ async fn broadcast_document_published_notification( /// Convert between ThresholdShareCreated and DocumentPublished events pub struct EventConverter { - bus: BusHandle, + bus: BusHandle, } #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -399,10 +399,10 @@ impl ReceivableDocument { } impl EventConverter { - pub fn new(bus: &BusHandle) -> Self { + pub fn new(bus: &BusHandle) -> Self { Self { bus: bus.clone() } } - pub fn setup(bus: &BusHandle) -> Addr { + pub fn setup(bus: &BusHandle) -> Addr { let addr = Self::new(bus).start(); bus.subscribe("ThresholdShareCreated", addr.clone().into()); bus.subscribe("DocumentReceived", addr.clone().into()); @@ -422,7 +422,8 @@ impl EventConverter { vec![], None, ); - self.bus.publish(PublishDocumentRequested::new(meta, value)); + self.bus + .publish(PublishDocumentRequested::new(meta, value))?; Ok(()) } /// Received document externally @@ -437,7 +438,7 @@ impl EventConverter { }, }; - self.bus.publish(event); + self.bus.publish(event)?; Ok(()) } } @@ -499,7 +500,7 @@ mod tests { fn setup_test() -> ( DefaultGuard, - BusHandle, + BusHandle, mpsc::Sender, mpsc::Receiver, broadcast::Sender, @@ -517,10 +518,8 @@ mod tests { let guard = tracing::subscriber::set_default(subscriber); - let bus: BusHandle = - EventBus::::new(EventBusConfig { deduplicate: true }) - .start() - .into(); + let consumer = EventBus::::new(EventBusConfig { deduplicate: true }).start(); + let bus = BusHandle::new_from_consumer(consumer); let (net_cmd_tx, net_cmd_rx) = mpsc::channel(100); let (net_evt_tx, net_evt_rx) = broadcast::channel(100); let net_evt_rx = Arc::new(net_evt_rx); @@ -547,7 +546,7 @@ mod tests { bus.publish(PublishDocumentRequested { meta: DocumentMeta::new(e3_id, DocumentKind::TrBFV, vec![], expires_at), value: value.clone(), - }); + })?; // 2. Document publisher should have asked the NetInterface to put the doc on Kademlia let Some(NetCommand::DhtPutRecord { @@ -623,7 +622,7 @@ mod tests { threshold_m: 3, threshold_n: 5, ..CiphernodeSelected::default() - }); + })?; net_evt_tx.send(NetEvent::GossipData( GossipData::DocumentPublishedNotification(DocumentPublishedNotification { @@ -683,7 +682,7 @@ mod tests { bus.publish(PublishDocumentRequested { meta: DocumentMeta::new(e3_id, DocumentKind::TrBFV, vec![], expires_at), value: value.clone(), - }); + })?; for _ in 0..4 { // Expect retry @@ -735,7 +734,7 @@ mod tests { threshold_m: 3, threshold_n: 5, ..CiphernodeSelected::default() - }); + })?; // 2. Dispatch a NetEvent from the NetInterface signaling that a document was published net_evt_tx.send(NetEvent::GossipData( diff --git a/crates/net/src/events.rs b/crates/net/src/events.rs index 010e7130be..0218cae8c1 100644 --- a/crates/net/src/events.rs +++ b/crates/net/src/events.rs @@ -6,8 +6,11 @@ use crate::Cid; use actix::Message; -use anyhow::{Context, Result}; -use e3_events::{CorrelationId, DocumentMeta}; +use anyhow::{bail, Context, Result}; +use e3_events::{ + CorrelationId, DocumentMeta, EnclaveEvent, EventConstructorWithTimestamp, Sequenced, + Unsequenced, +}; use e3_utils::ArcBytes; use libp2p::{ gossipsub::{MessageId, PublishError, TopicHash}, @@ -39,6 +42,28 @@ impl GossipData { } } +impl TryFrom> for GossipData { + type Error = anyhow::Error; + fn try_from(value: EnclaveEvent) -> Result { + let bytes = value + .clone_unsequenced() // Note serializing UNSEQUENCED + .to_bytes() + .context("Could not convert event to bytes for serialization!")?; + Ok(GossipData::GossipBytes(bytes)) + } +} + +impl TryFrom for EnclaveEvent { + type Error = anyhow::Error; + fn try_from(value: GossipData) -> Result { + let GossipData::GossipBytes(bytes) = value else { + bail!("GossipData was not the GossipBytes variant"); + }; + + Ok(EnclaveEvent::from_bytes(&bytes)?) + } +} + /// NetInterface Commands are sent to the network peer over a mspc channel #[derive(Debug)] pub enum NetCommand { @@ -220,3 +245,38 @@ where .map_err(|_| anyhow::anyhow!(format!("Timed out waiting for response from {}", debug_cmd)))?; result } + +#[cfg(test)] +mod tests { + use e3_events::{ + EnclaveEvent, EventConstructorWithTimestamp, Sequenced, TestEvent, Unsequenced, + }; + + use super::GossipData; + + #[test] + fn test_enclave_event_gossip_lifecycle() -> anyhow::Result<()> { + // event is created locally + let event: EnclaveEvent = + EnclaveEvent::new_with_timestamp(TestEvent::new("fish", 42).into(), 31415); + + // event is sequenced after bus.publish() adds a sequence number + let event: EnclaveEvent = event.into_sequenced(90210); + + // event is broadcast + let gossip_data: GossipData = event.try_into()?; + + let GossipData::GossipBytes(_) = gossip_data else { + panic!("events must only be serialized to GossipBytes"); + }; + + // received gossip data from libp2p convert to unsequenced event + let event: EnclaveEvent = gossip_data.try_into()?; + let (data, ts) = event.split(); + + assert_eq!(data, TestEvent::new("fish", 42).into()); + assert_eq!(ts, 31415); + + Ok(()) + } +} diff --git a/crates/net/src/net_event_translator.rs b/crates/net/src/net_event_translator.rs index f23905184f..d12abd5793 100644 --- a/crates/net/src/net_event_translator.rs +++ b/crates/net/src/net_event_translator.rs @@ -16,9 +16,12 @@ use anyhow::{bail, Result}; use e3_crypto::Cipher; use e3_data::Repository; use e3_events::prelude::*; +use e3_events::trap; use e3_events::BusHandle; +use e3_events::EType; use e3_events::EnclaveEventData; use e3_events::Event; +use e3_events::Unsequenced; use e3_events::{CorrelationId, EnclaveEvent, EventId}; use libp2p::identity::ed25519; use std::collections::HashSet; @@ -34,7 +37,7 @@ use tracing::{error, info, instrument, trace}; /// NetEventTranslator Actor converts between EventBus events and Libp2p events forwarding them to a /// NetInterface for propagation over the p2p network pub struct NetEventTranslator { - bus: BusHandle, + bus: BusHandle, tx: mpsc::Sender, sent_events: HashSet, topic: String, @@ -47,11 +50,11 @@ impl Actor for NetEventTranslator { /// Libp2pEvent is used to send data to the NetInterface from the NetEventTranslator #[derive(Message, Clone, Debug, PartialEq, Eq)] #[rtype(result = "anyhow::Result<()>")] -struct LibP2pEvent(pub Vec); +struct LibP2pEvent(pub GossipData); impl NetEventTranslator { /// Create a new NetEventTranslator actor - pub fn new(bus: &BusHandle, tx: &mpsc::Sender, topic: &str) -> Self { + pub fn new(bus: &BusHandle, tx: &mpsc::Sender, topic: &str) -> Self { Self { bus: bus.clone(), tx: tx.clone(), @@ -61,7 +64,7 @@ impl NetEventTranslator { } pub fn setup( - bus: &BusHandle, + bus: &BusHandle, tx: &mpsc::Sender, rx: &Arc>, topic: &str, @@ -78,8 +81,8 @@ impl NetEventTranslator { while let Ok(event) = rx.recv().await { match event { NetEvent::GossipData(data) => { - if let GossipData::GossipBytes(payload) = data { - addr.do_send(LibP2pEvent(payload)); + if let GossipData::GossipBytes(_) = data { + addr.do_send(LibP2pEvent(data)); } } _ => (), @@ -108,7 +111,7 @@ impl NetEventTranslator { /// Spawn a Libp2p interface and hook it up to this actor #[instrument(name = "libp2p", skip_all)] pub async fn setup_with_interface( - bus: BusHandle, + bus: BusHandle, peers: Vec, cipher: &Arc, quic_port: u16, @@ -159,57 +162,47 @@ impl NetEventTranslator { impl Handler for NetEventTranslator { type Result = anyhow::Result<()>; fn handle(&mut self, msg: LibP2pEvent, _: &mut Self::Context) -> Self::Result { - let LibP2pEvent(bytes) = msg; - match EnclaveEvent::from_bytes(&bytes) { - Ok(event) => { - self.bus.naked_dispatch(event.clone()); // TODO: convert to receive - self.sent_events.insert(event.into()); - } - Err(err) => error!(error=?err, "Could not create EnclaveEvent from Libp2p Bytes!"), - } + let LibP2pEvent(data) = msg; + let event: EnclaveEvent = data.try_into()?; + self.sent_events.insert(event.get_id()); + let (data, ts) = event.split(); + self.bus.publish_from_remote(data, ts)?; Ok(()) } } impl Handler for NetEventTranslator { - type Result = ResponseFuture<()>; + type Result = (); fn handle(&mut self, event: EnclaveEvent, _: &mut Self::Context) -> Self::Result { - let sent_events = self.sent_events.clone(); - let tx = self.tx.clone(); - let evt = event.clone(); - let topic = self.topic.clone(); - Box::pin(async move { + trap(EType::Net, &self.bus.clone(), || { + let sent_events = self.sent_events.clone(); + let tx = self.tx.clone(); + let evt = event.clone(); + let topic = self.topic.clone(); let id: EventId = evt.clone().into(); // Ignore events that should be considered local if !Self::is_forwardable_event(&evt) { trace!(evt_id=%id,"Local events should not be rebroadcast so ignoring"); - return; + return Ok(()); } // if we have seen this event before dont rebroadcast if sent_events.contains(&id) { trace!(evt_id=%id,"Have seen event before not rebroadcasting!"); - return; + return Ok(()); } + warn!("GossipPublish event: {}", event.event_type()); - match evt.to_bytes() { - Ok(data) => { - if let Err(e) = tx - .send(NetCommand::GossipPublish { - topic, - data: GossipData::GossipBytes(data), - correlation_id: CorrelationId::new(), - }) - .await - { - error!(error=?e, "Error sending bytes to libp2p: {e}"); - }; - } - Err(error) => { - error!(error=?error, "Could not convert event to bytes for serialization!") - } - } + let data: GossipData = evt.try_into()?; + + tx.try_send(NetCommand::GossipPublish { + topic, + data, + correlation_id: CorrelationId::new(), + })?; + + Ok(()) }) } } diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index c130cc25f9..3041c63878 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -74,8 +74,8 @@ impl NetInterface { udp_port: Option, topic: &str, ) -> Result { - let (event_tx, _) = broadcast::channel(100); // TODO : tune this param - let (cmd_tx, cmd_rx) = mpsc::channel(100); // TODO : tune this param + let (event_tx, _) = broadcast::channel(1000); // TODO : tune this param + let (cmd_tx, cmd_rx) = mpsc::channel(1000); // TODO : tune this param let swarm = libp2p::SwarmBuilder::with_existing_identity(id.clone()) .with_tokio() diff --git a/crates/request/src/router.rs b/crates/request/src/router.rs index a3edf20f48..3df1c2e1a9 100644 --- a/crates/request/src/router.rs +++ b/crates/request/src/router.rs @@ -10,7 +10,7 @@ use crate::E3ContextParams; use crate::E3ContextSnapshot; use crate::E3MetaExtension; use crate::RouterRepositoryFactory; -use actix::AsyncContext; +use actix::Message; use actix::{Actor, Addr, Context, Handler}; use anyhow::*; use async_trait::async_trait; @@ -21,16 +21,16 @@ use e3_data::RepositoriesFactory; use e3_data::Repository; use e3_data::Snapshot; use e3_events::prelude::*; +use e3_events::trap; use e3_events::BusHandle; use e3_events::E3RequestComplete; +use e3_events::EType; use e3_events::EnclaveEventData; -use e3_events::Shutdown; use e3_events::{E3id, EnclaveEvent, Event}; use serde::Deserialize; use serde::Serialize; use std::collections::HashSet; use std::{collections::HashMap, sync::Arc}; -use tracing::error; /// Buffers events for downstream instances to handle out-of-order event delivery. /// Events are stored in a HashMap keyed by string identifiers until they are ready @@ -105,19 +105,19 @@ pub struct E3Router { /// A buffer for events to send to the buffer: EventBuffer, /// The EventBus - bus: BusHandle, + bus: BusHandle, /// A repository for storing snapshots store: Repository, } pub struct E3RouterParams { extensions: Arc>>, - bus: BusHandle, + bus: BusHandle, store: Repository, } impl E3Router { - pub fn builder(bus: &BusHandle, store: DataStore) -> E3RouterBuilder { + pub fn builder(bus: &BusHandle, store: DataStore) -> E3RouterBuilder { let repositories = store.repositories(); let builder = E3RouterBuilder { bus: bus.clone(), @@ -149,71 +149,66 @@ impl Actor for E3Router { impl Handler for E3Router { type Result = (); - fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { - // If we are shutting down then bail on anything else - if let EnclaveEventData::Shutdown(data) = msg.get_data() { - ctx.notify(data.clone()); - return; - } - - // Only process events with e3_ids - let Some(e3_id) = msg.get_e3_id() else { - return; - }; - - // If this e3 round has already been completed then we are not going to do anything here - if self.completed.contains(&e3_id) { - error!("Received the following event to E3Id({}) despite already being completed:\n\n{:?}\n\n", e3_id, msg); - return; - } - - let repositories = self.repository().repositories(); - let context = self.contexts.entry(e3_id.clone()).or_insert_with(|| { - E3Context::from_params(E3ContextParams { - e3_id: e3_id.clone(), - repository: repositories.context(&e3_id), - extensions: self.extensions.clone(), - }) - }); + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + trap(EType::Event, &self.bus.clone(), || { + // If we are shutting down then bail on anything else + if let EnclaveEventData::Shutdown(_) = msg.get_data() { + for (_, ctx) in self.contexts.iter() { + ctx.forward_message_now(&msg) + } + + return Ok(()); + } - for extension in self.extensions.iter() { - extension.on_event(context, &msg); - } + // Only process events with e3_ids + let Some(e3_id) = msg.get_e3_id() else { + return Ok(()); + }; - context.forward_message(&msg, &mut self.buffer); + // If this e3 round has already been completed then we are not going to do anything here + if self.completed.contains(&e3_id) { + return Err(anyhow!("Received the following event to E3Id({}) despite already being completed:\n\n{:?}\n\n", e3_id, msg)); + } - match msg.into_data() { - EnclaveEventData::PlaintextAggregated(_) => { - // Here we are detemining that by receiving the PlaintextAggregated event our request is - // complete and we can notify everyone. This might change as we consider other factors - // when determining if the request is complete - let event = E3RequestComplete { + let repositories = self.repository().repositories(); + let context = self.contexts.entry(e3_id.clone()).or_insert_with(|| { + E3Context::from_params(E3ContextParams { e3_id: e3_id.clone(), - }; + repository: repositories.context(&e3_id), + extensions: self.extensions.clone(), + }) + }); - // Send to bus so all other actors can react to a request being complete. - self.bus.publish(event); + for extension in self.extensions.iter() { + extension.on_event(context, &msg); } - EnclaveEventData::E3RequestComplete(_) => { - // Note this will be sent above to the children who can kill themselves based on - // the event - self.contexts.remove(&e3_id); - self.completed.insert(e3_id); - } - _ => (), - } - self.checkpoint(); - } -} + context.forward_message(&msg, &mut self.buffer); -impl Handler for E3Router { - type Result = (); - fn handle(&mut self, msg: Shutdown, _ctx: &mut Self::Context) -> Self::Result { - let shutdown_evt = self.bus.event_from(msg); - for (_, ctx) in self.contexts.iter() { - ctx.forward_message_now(&shutdown_evt) - } + match msg.into_data() { + EnclaveEventData::PlaintextAggregated(_) => { + // Here we are detemining that by receiving the PlaintextAggregated event our request is + // complete and we can notify everyone. This might change as we consider other factors + // when determining if the request is complete + let event = E3RequestComplete { + e3_id: e3_id.clone(), + }; + + // Send to bus so all other actors can react to a request being complete. + self.bus.publish(event)?; + } + EnclaveEventData::E3RequestComplete(_) => { + // Note this will be sent above to the children who can kill themselves based on + // the event + self.contexts.remove(&e3_id); + self.completed.insert(e3_id); + } + _ => (), + } + + self.checkpoint(); + Ok(()) + }); } } @@ -282,7 +277,7 @@ impl FromSnapshotWithParams for E3Router { /// Builder for E3Router pub struct E3RouterBuilder { - pub bus: BusHandle, + pub bus: BusHandle, pub extensions: Vec>, pub store: Repository, } diff --git a/crates/sortition/src/ciphernode_selector.rs b/crates/sortition/src/ciphernode_selector.rs index 0d2d278a57..8de35d2885 100644 --- a/crates/sortition/src/ciphernode_selector.rs +++ b/crates/sortition/src/ciphernode_selector.rs @@ -11,14 +11,14 @@ use actix::prelude::*; use e3_config::StoreKeys; use e3_data::{DataStore, RepositoriesFactory}; use e3_events::{ - prelude::*, BusHandle, CiphernodeSelected, CommitteeFinalized, E3Requested, EnclaveEvent, - EnclaveEventData, Shutdown, TicketGenerated, TicketId, + prelude::*, trap, BusHandle, CiphernodeSelected, CommitteeFinalized, E3Requested, EType, + EnclaveEvent, EnclaveEventData, Shutdown, TicketGenerated, TicketId, }; use e3_request::MetaRepositoryFactory; use tracing::info; pub struct CiphernodeSelector { - bus: BusHandle, + bus: BusHandle, sortition: Addr, address: String, data_store: DataStore, @@ -30,7 +30,7 @@ impl Actor for CiphernodeSelector { impl CiphernodeSelector { pub fn new( - bus: &BusHandle, + bus: &BusHandle, sortition: &Addr, address: &str, data_store: &DataStore, @@ -44,7 +44,7 @@ impl CiphernodeSelector { } pub fn attach( - bus: &BusHandle, + bus: &BusHandle, sortition: &Addr, address: &str, data_store: &DataStore, @@ -109,11 +109,14 @@ impl Handler for CiphernodeSelector { ticket_id = tid, "Ticket generated for score sortition" ); - bus.publish(TicketGenerated { - e3_id: data.e3_id.clone(), - ticket_id: TicketId::Score(tid), - node: address.clone(), - }); + trap(EType::Sortition, &bus.clone(), || { + bus.publish(TicketGenerated { + e3_id: data.e3_id.clone(), + ticket_id: TicketId::Score(tid), + node: address.clone(), + })?; + Ok(()) + }) } } else { info!("This node is not selected"); @@ -165,16 +168,19 @@ impl Handler for CiphernodeSelector { party_id = party_id, "Node is in finalized committee, emitting CiphernodeSelected" ); - bus.publish(CiphernodeSelected { - party_id: party_id as u64, - e3_id, - threshold_m: e3_meta.threshold_m, - threshold_n: e3_meta.threshold_n, - esi_per_ct: e3_meta.esi_per_ct, - error_size: e3_meta.error_size, - params: e3_meta.params, - seed: e3_meta.seed, - }); + trap(EType::Sortition, &bus.clone(), || { + bus.publish(CiphernodeSelected { + party_id: party_id as u64, + e3_id, + threshold_m: e3_meta.threshold_m, + threshold_n: e3_meta.threshold_n, + esi_per_ct: e3_meta.esi_per_ct, + error_size: e3_meta.error_size, + params: e3_meta.params, + seed: e3_meta.seed, + })?; + Ok(()) + }) }) } } diff --git a/crates/sortition/src/sortition.rs b/crates/sortition/src/sortition.rs index 09ecc5beb9..c3770080e1 100644 --- a/crates/sortition/src/sortition.rs +++ b/crates/sortition/src/sortition.rs @@ -11,8 +11,8 @@ use anyhow::Result; use e3_data::{AutoPersist, Persistable, Repository}; use e3_events::{ prelude::*, CiphernodeAdded, CiphernodeRemoved, CommitteeFinalized, CommitteePublished, - ConfigurationUpdated, EnclaveErrorType, EnclaveEvent, OperatorActivationChanged, - PlaintextOutputPublished, Seed, TicketBalanceUpdated, + ConfigurationUpdated, EType, EnclaveEvent, OperatorActivationChanged, PlaintextOutputPublished, + Seed, TicketBalanceUpdated, }; use e3_events::{BusHandle, EnclaveEventData}; use serde::{Deserialize, Serialize}; @@ -139,7 +139,7 @@ pub struct Sortition { /// Persistent map of `chain_id -> NodeStateStore`. node_state: Persistable>, /// Event bus for error reporting and enclave event subscription. - bus: BusHandle, + bus: BusHandle, /// Persistent map of finalized committees per E3 finalized_committees: Persistable>>, } @@ -148,7 +148,7 @@ pub struct Sortition { #[derive(Debug)] pub struct SortitionParams { /// Event bus address. - pub bus: BusHandle, + pub bus: BusHandle, /// Persisted per-chain backend map. pub backends: Persistable>, /// Node state store per chain @@ -169,7 +169,7 @@ impl Sortition { #[instrument(name = "sortition_attach", skip_all)] pub async fn attach( - bus: &BusHandle, + bus: &BusHandle, backends_store: Repository>, node_state_store: Repository>, committees_store: Repository>>, @@ -262,7 +262,7 @@ impl Handler for Sortition { .or_insert_with(NodeState::default); Ok(state_map) }) { - self.bus.err(EnclaveErrorType::Sortition, err); + self.bus.err(EType::Sortition, err); } if let Err(err) = self.backends.try_mutate(move |mut list_map| { @@ -277,7 +277,7 @@ impl Handler for Sortition { .add(addr); Ok(list_map) }) { - self.bus.err(EnclaveErrorType::Sortition, err); + self.bus.err(EType::Sortition, err); } info!(address = %msg.address, chain_id = chain_id, "Node added to sortition state"); @@ -297,7 +297,7 @@ impl Handler for Sortition { } Ok(state_map) }) { - self.bus.err(EnclaveErrorType::Sortition, err); + self.bus.err(EType::Sortition, err); } if let Err(err) = self.backends.try_mutate(move |mut list_map| { @@ -306,7 +306,7 @@ impl Handler for Sortition { } Ok(list_map) }) { - self.bus.err(EnclaveErrorType::Sortition, err); + self.bus.err(EType::Sortition, err); } info!(address = %msg.address, chain_id = chain_id, "Node removed from sortition state"); @@ -336,7 +336,7 @@ impl Handler for Sortition { Ok(state_map) }) { - self.bus.err(EnclaveErrorType::Sortition, err); + self.bus.err(EType::Sortition, err); } } } @@ -363,7 +363,7 @@ impl Handler for Sortition { } Ok(state_map) }) { - self.bus.err(EnclaveErrorType::Sortition, err); + self.bus.err(EType::Sortition, err); } } } @@ -386,7 +386,7 @@ impl Handler for Sortition { ); Ok(state_map) }) { - self.bus.err(EnclaveErrorType::Sortition, err); + self.bus.err(EType::Sortition, err); } } } @@ -425,7 +425,7 @@ impl Handler for Sortition { Ok(state_map) }) { - self.bus.err(EnclaveErrorType::Sortition, err); + self.bus.err(EType::Sortition, err); } } } @@ -474,7 +474,7 @@ impl Handler for Sortition { Ok(state_map) }) { - self.bus.err(EnclaveErrorType::Sortition, err); + self.bus.err(EType::Sortition, err); } } } @@ -493,7 +493,7 @@ impl Handler for Sortition { committees.insert(msg.e3_id.clone(), msg.committee.clone()); Ok(committees) }) { - self.bus.err(EnclaveErrorType::Sortition, err); + self.bus.err(EType::Sortition, err); } } } @@ -514,7 +514,7 @@ impl Handler for Sortition { backend .get_index(msg.seed, msg.size, msg.address.clone(), msg.chain_id, state) .unwrap_or_else(|err| { - bus.err(EnclaveErrorType::Sortition, err); + bus.err(EType::Sortition, err); None }) } else { diff --git a/crates/test-helpers/src/ciphernode_system.rs b/crates/test-helpers/src/ciphernode_system.rs index dd62af9ca5..2772ffac09 100644 --- a/crates/test-helpers/src/ciphernode_system.rs +++ b/crates/test-helpers/src/ciphernode_system.rs @@ -205,7 +205,7 @@ mod tests { let bus = EventBus::::new(EventBusConfig { deduplicate: true }).start(); let history = EventBus::::history(&bus); let errors = EventBus::::error(&bus); - let bus = BusHandle::new(bus); + let bus = BusHandle::new_from_consumer(bus); Ok(CiphernodeHandle { address, diff --git a/crates/test-helpers/src/lib.rs b/crates/test-helpers/src/lib.rs index 2057b52416..c430d24db5 100644 --- a/crates/test-helpers/src/lib.rs +++ b/crates/test-helpers/src/lib.rs @@ -72,7 +72,7 @@ pub fn create_crp_bytes_params( pub fn get_common_setup( param_set: Option, ) -> Result<( - BusHandle, + BusHandle, SharedRng, Seed, Arc, @@ -126,9 +126,9 @@ pub fn get_common_setup( /// ``` pub fn simulate_libp2p_net(nodes: &[CiphernodeHandle]) { for node in nodes.iter() { - let source = &node.bus(); + let source = node.bus().consumer(); for (_, node) in nodes.iter().enumerate() { - let dest = &node.bus(); + let dest = node.bus().consumer(); if source != dest { EventBus::pipe_filter( source, @@ -159,13 +159,13 @@ pub fn create_random_eth_addrs(how_many: u32) -> Vec { /// Test helper to add addresses to the committee by creating events on the event bus #[derive(Clone, Debug)] pub struct AddToCommittee { - bus: BusHandle, + bus: BusHandle, count: usize, chain_id: u64, } impl AddToCommittee { - pub fn new(bus: &BusHandle, chain_id: u64) -> Self { + pub fn new(bus: &BusHandle, chain_id: u64) -> Self { Self { bus: bus.clone(), chain_id, @@ -182,7 +182,7 @@ impl AddToCommittee { self.count += 1; - self.bus.publish(evt.clone()); + self.bus.publish(evt.clone())?; Ok(evt.into()) } diff --git a/crates/test-helpers/src/plaintext_writer.rs b/crates/test-helpers/src/plaintext_writer.rs index 5e77a12a4e..c74a111d3c 100644 --- a/crates/test-helpers/src/plaintext_writer.rs +++ b/crates/test-helpers/src/plaintext_writer.rs @@ -17,7 +17,7 @@ pub struct PlaintextWriter { } impl PlaintextWriter { - pub fn attach(path: &PathBuf, bus: BusHandle) -> Addr { + pub fn attach(path: &PathBuf, bus: BusHandle) -> Addr { let addr = Self { path: path.to_owned(), } diff --git a/crates/test-helpers/src/public_key_writer.rs b/crates/test-helpers/src/public_key_writer.rs index bdd2523725..e2c7bec70f 100644 --- a/crates/test-helpers/src/public_key_writer.rs +++ b/crates/test-helpers/src/public_key_writer.rs @@ -16,7 +16,7 @@ pub struct PublicKeyWriter { } impl PublicKeyWriter { - pub fn attach(path: &PathBuf, bus: BusHandle) -> Addr { + pub fn attach(path: &PathBuf, bus: BusHandle) -> Addr { let addr = Self { path: path.to_owned(), } diff --git a/crates/tests/Cargo.toml b/crates/tests/Cargo.toml index 022576f966..fe94458915 100644 --- a/crates/tests/Cargo.toml +++ b/crates/tests/Cargo.toml @@ -43,3 +43,6 @@ tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } zeroize = { workspace = true } + +[dev-dependencies] +e3-events = { workspace = true, features = ["test-helpers"] } diff --git a/crates/tests/tests/integration.rs b/crates/tests/tests/integration.rs index deb78cd45c..0b13895c1c 100644 --- a/crates/tests/tests/integration.rs +++ b/crates/tests/tests/integration.rs @@ -33,7 +33,7 @@ pub fn save_snapshot(file_name: &str, bytes: &[u8]) { } async fn setup_score_sortition_environment( - bus: &BusHandle, + bus: &BusHandle, eth_addrs: &Vec, chain_id: u64, ) -> Result<()> { @@ -42,7 +42,7 @@ async fn setup_score_sortition_environment( old_value: U256::ZERO, new_value: U256::from(10_000_000u64), chain_id, - }); + })?; let mut adder = AddToCommittee::new(bus, chain_id); for addr in eth_addrs { @@ -54,13 +54,13 @@ async fn setup_score_sortition_environment( new_balance: U256::from(1_000_000_000u64), reason: FixedBytes::ZERO, chain_id, - }); + })?; bus.publish(OperatorActivationChanged { operator: addr.clone(), active: true, chain_id, - }); + })?; } Ok(()) @@ -118,10 +118,9 @@ async fn test_trbfv_actor() -> Result<()> { let rng = create_shared_rng_from_u64(42); // Create "trigger" bus - let bus: BusHandle = - EventBus::::new(EventBusConfig { deduplicate: true }) - .start() - .into(); + let bus: BusHandle = EventBus::::new(EventBusConfig { deduplicate: true }) + .start() + .into(); // Parameters (128bits of security) let (degree, plaintext_modulus, moduli) = ( @@ -189,7 +188,7 @@ async fn test_trbfv_actor() -> Result<()> { .with_pubkey_aggregation() .with_sortition_score() .with_threshold_plaintext_aggregation() - .testmode_with_forked_bus(&bus.bus()) + .testmode_with_forked_bus(bus.consumer()) .with_logging() .build() .await @@ -202,7 +201,7 @@ async fn test_trbfv_actor() -> Result<()> { .with_injected_multithread(multithread.clone()) .with_trbfv() .with_sortition_score() - .testmode_with_forked_bus(&bus.bus()) + .testmode_with_forked_bus(bus.consumer()) .with_logging() .build() .await @@ -247,7 +246,7 @@ async fn test_trbfv_actor() -> Result<()> { params, }; - bus.publish(e3_requested); + bus.publish(e3_requested)?; // For score sortition, we need to wait for nodes to process E3Requested and run sortition // Since TicketGenerated is a local-only event (not shared across network), we can't collect it @@ -273,7 +272,7 @@ async fn test_trbfv_actor() -> Result<()> { e3_id: e3_id.clone(), committee, chain_id, - }); + })?; let committee_finalized_timer = Instant::now(); @@ -370,7 +369,7 @@ async fn test_trbfv_actor() -> Result<()> { e3_id: e3_id.clone(), }; - bus.publish(ciphertext_published_event.clone()); + bus.publish(ciphertext_published_event.clone())?; println!("CiphertextOutputPublished event has been dispatched!"); diff --git a/crates/tests/tests/integration_legacy.rs b/crates/tests/tests/integration_legacy.rs index 0c0ba861c3..be35b47dbe 100644 --- a/crates/tests/tests/integration_legacy.rs +++ b/crates/tests/tests/integration_legacy.rs @@ -16,6 +16,7 @@ use e3_data::InMemStore; use e3_events::BusHandle; use e3_events::EnclaveEventData; use e3_events::GetEvents; +use e3_events::Unsequenced; use e3_events::{ prelude::*, CiphernodeSelected, CiphertextOutputPublished, CommitteeFinalized, ConfigurationUpdated, E3Requested, E3id, EnclaveEvent, EventBus, EventBusConfig, @@ -47,7 +48,7 @@ use tokio::sync::{broadcast, Mutex}; use tokio::time::sleep; async fn setup_local_ciphernode( - bus: &BusHandle, + bus: &BusHandle, rng: &SharedRng, logging: bool, addr: &str, @@ -57,7 +58,7 @@ async fn setup_local_ciphernode( let mut builder = CiphernodeBuilder::new(rng.clone(), cipher.clone()) .with_keyshare() .with_address(addr) - .testmode_with_forked_bus(&bus.bus()) + .testmode_with_forked_bus(bus.consumer()) .testmode_with_history() .testmode_with_errors() .with_pubkey_aggregation() @@ -102,7 +103,7 @@ fn generate_pk_shares( } async fn create_local_ciphernodes( - bus: &BusHandle, + bus: &BusHandle, rng: &SharedRng, count: u32, cipher: &Arc, @@ -120,7 +121,7 @@ async fn create_local_ciphernodes( } async fn setup_score_sortition_environment( - bus: &BusHandle, + bus: &BusHandle, eth_addrs: &Vec, chain_id: u64, ) -> Result<()> { @@ -129,7 +130,7 @@ async fn setup_score_sortition_environment( old_value: U256::ZERO, new_value: U256::from(10_000_000u64), chain_id, - }); + })?; let mut adder = AddToCommittee::new(bus, chain_id); for addr in eth_addrs { @@ -141,13 +142,13 @@ async fn setup_score_sortition_environment( new_balance: U256::from(1_000_000_000u64), reason: FixedBytes::ZERO, chain_id, - }); + })?; bus.publish(OperatorActivationChanged { operator: addr.clone(), active: true, chain_id, - }); + })?; } Ok(()) @@ -202,17 +203,17 @@ async fn test_public_key_aggregation_and_decryption() -> Result<()> { println!("Sending E3 event..."); // Send the computation requested event - bus.publish(e3_request_event.clone()); + bus.publish(e3_request_event.clone())?; // Test that we cannot send the same event twice - bus.publish(e3_request_event.clone()); + bus.publish(e3_request_event.clone())?; // Finalize committee with all available nodes bus.publish(CommitteeFinalized { e3_id: e3_id.clone(), committee: eth_addrs.clone(), chain_id: 1, - }); + })?; // Generate the test shares and pubkey let rng_test = create_shared_rng_from_u64(42); @@ -258,7 +259,7 @@ async fn test_public_key_aggregation_and_decryption() -> Result<()> { e3_id: e3_id.clone(), }; - bus.publish(ciphertext_published_event.clone()); + bus.publish(ciphertext_published_event.clone())?; let history = history_collector .send(TakeEvents::::new(6)) @@ -313,13 +314,13 @@ async fn test_stopped_keyshares_retain_state() -> Result<()> { seed: seed.clone(), params: ArcBytes::from_bytes(&encode_bfv_params(¶ms)), ..E3Requested::default() - }); + })?; bus.publish(CommitteeFinalized { e3_id: e3_id.clone(), committee: eth_addrs.clone(), chain_id: 1, - }); + })?; let history_collector = cn1.history().unwrap(); let error_collector = cn1.errors().unwrap(); @@ -331,7 +332,7 @@ async fn test_stopped_keyshares_retain_state() -> Result<()> { assert_eq!(errors.len(), 0); // SEND SHUTDOWN! - bus.publish(Shutdown); + bus.publish(Shutdown)?; // This is probably overkill but required to ensure that all the data is written sleep(Duration::from_secs(1)).await; @@ -406,7 +407,7 @@ async fn test_stopped_keyshares_retain_state() -> Result<()> { .map(|ct| ArcBytes::from_bytes(&ct.to_bytes())) .collect(), e3_id: e3_id.clone(), - }); + })?; let history = history_collector .send(TakeEvents::::new(5)) @@ -443,10 +444,9 @@ async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { // Setup elements in test let (cmd_tx, mut cmd_rx) = mpsc::channel(100); // Transmit byte events to the network let (event_tx, _) = broadcast::channel(100); // Receive byte events from the network - let bus: BusHandle = - EventBus::::new(EventBusConfig { deduplicate: true }) - .start() - .into(); + let bus: BusHandle = EventBus::::new(EventBusConfig { deduplicate: true }) + .start() + .into(); let history_collector = HistoryCollector::::new().start(); bus.subscribe("*", history_collector.clone().recipient()); let event_rx = Arc::new(event_tx.subscribe()); @@ -454,7 +454,7 @@ async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { NetEventTranslator::setup(&bus, &cmd_tx, &event_rx, "my-topic"); // Capture messages from output on msgs vec - let msgs: Arc>> = Arc::new(Mutex::new(Vec::new())); + let msgs: Arc>> = Arc::new(Mutex::new(Vec::new())); let msgs_loop = msgs.clone(); @@ -468,8 +468,12 @@ async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { e3_net::events::NetCommand::GossipPublish { data, .. } => Some(data), _ => None, } { - msgs_loop.lock().await.push(msg.clone()); - event_tx.send(NetEvent::GossipData(msg))?; + if let GossipData::GossipBytes(_) = msg { + let event: EnclaveEvent = msg.clone().try_into().unwrap(); + let (data, _) = event.split(); + msgs_loop.lock().await.push(data); + event_tx.send(NetEvent::GossipData(msg)).unwrap(); + } } // if this manages to broadcast an event to the // event bus we will expect to see an extra event on @@ -495,9 +499,9 @@ async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { ..CiphernodeSelected::default() }; - bus.publish(evt_1.clone()); - bus.publish(evt_2.clone()); - bus.publish(local_evt_3.clone()); // This is a local event which should not be broadcast to the network + bus.publish(evt_1.clone())?; + bus.publish(evt_2.clone())?; + bus.publish(local_evt_3.clone())?; // This is a local event which should not be broadcast to the network // check the history of the event bus let history = history_collector @@ -506,10 +510,7 @@ async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { assert_eq!( *msgs.lock().await, - vec![ - GossipData::GossipBytes(bus.event_from(evt_1.clone()).to_bytes()?), - GossipData::GossipBytes(bus.event_from(evt_2.clone()).to_bytes()?) - ], // notice no local events + vec![evt_1.clone().into(), evt_2.clone().into()], // notice no local events "NetEventTranslator did not transmit correct events to the network" ); @@ -547,13 +548,13 @@ async fn test_duplicate_e3_id_with_different_chain_id() -> Result<()> { seed: seed.clone(), params: ArcBytes::from_bytes(&encode_bfv_params(¶ms)), ..E3Requested::default() - }); + })?; bus.publish(CommitteeFinalized { e3_id: E3id::new("1234", 1), committee: eth_addrs.clone(), chain_id: 1, - }); + })?; // Generate the test shares and pubkey let rng_test = create_shared_rng_from_u64(42); @@ -584,13 +585,13 @@ async fn test_duplicate_e3_id_with_different_chain_id() -> Result<()> { seed: seed.clone(), params: ArcBytes::from_bytes(&encode_bfv_params(¶ms)), ..E3Requested::default() - }); + })?; bus.publish(CommitteeFinalized { e3_id: E3id::new("1234", 2), committee: eth_addrs.clone(), chain_id: 2, - }); + })?; let test_pubkey = aggregate_public_key(&generate_pk_shares( ¶ms, &crpoly, &rng_test, ð_addrs, @@ -620,10 +621,9 @@ async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { // Setup elements in test let (cmd_tx, _) = mpsc::channel(100); // Transmit byte events to the network let (event_tx, event_rx) = broadcast::channel(100); // Receive byte events from the network - let bus: BusHandle = - EventBus::::new(EventBusConfig { deduplicate: true }) - .start() - .into(); + let bus: BusHandle = EventBus::::new(EventBusConfig { deduplicate: true }) + .start() + .into(); let history_collector = HistoryCollector::::new().start(); bus.subscribe("*", history_collector.clone().recipient()); @@ -641,7 +641,7 @@ async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { // lets send an event from the network let _ = event_tx.send(NetEvent::GossipData(GossipData::GossipBytes( - bus.event_from(event.clone()).to_bytes()?, + bus.event_from(event.clone())?.to_bytes()?, ))); // check the history of the event bus