diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c843ba4e7c..ed3fdb9f92 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -116,7 +116,7 @@ jobs: run: pnpm rust:lint - name: Run Integration Tests - run: 'cargo test --test integration -- --nocapture && cargo test --test integration_legacy -- --nocapture' + run: 'cargo test --test integration -- --nocapture' build_e3_support_risc0: runs-on: ubuntu-latest diff --git a/crates/aggregator/src/ext.rs b/crates/aggregator/src/ext.rs index b605eb9eb7..b87289d50c 100644 --- a/crates/aggregator/src/ext.rs +++ b/crates/aggregator/src/ext.rs @@ -8,11 +8,9 @@ use std::sync::Arc; use crate::keyshare_created_filter_buffer::KeyshareCreatedFilterBuffer; use crate::{ - PlaintextAggregator, PlaintextAggregatorParams, PlaintextAggregatorState, - PlaintextRepositoryFactory, PublicKeyAggregator, PublicKeyAggregatorParams, - PublicKeyAggregatorState, PublicKeyRepositoryFactory, ThresholdPlaintextAggregator, - ThresholdPlaintextAggregatorParams, ThresholdPlaintextAggregatorState, - TrBfvPlaintextRepositoryFactory, + PublicKeyAggregator, PublicKeyAggregatorParams, PublicKeyAggregatorState, + PublicKeyRepositoryFactory, ThresholdPlaintextAggregator, ThresholdPlaintextAggregatorParams, + ThresholdPlaintextAggregatorState, TrBfvPlaintextRepositoryFactory, }; use actix::{Actor, Addr, Recipient}; use anyhow::{anyhow, Result}; @@ -25,127 +23,6 @@ use e3_fhe::Fhe; use e3_request::{E3Context, E3ContextSnapshot, E3Extension, META_KEY}; use e3_sortition::Sortition; -#[deprecated = "In favour of ThresholdPlaintextAggregatorExtension"] -pub struct PlaintextAggregatorExtension { - bus: BusHandle, - sortition: Addr, -} - -impl PlaintextAggregatorExtension { - pub fn create(bus: &BusHandle, sortition: &Addr) -> Box { - Box::new(Self { - bus: bus.clone(), - sortition: sortition.clone(), - }) - } -} - -const ERROR_PLAINTEXT_FHE_MISSING:&str = "Could not create PlaintextAggregator because the fhe instance it depends on was not set on the context."; -const ERROR_PLAINTEXT_META_MISSING:&str = "Could not create PlaintextAggregator because the meta instance it depends on was not set on the context."; - -#[async_trait] -impl E3Extension for PlaintextAggregatorExtension { - fn on_event(&self, ctx: &mut E3Context, evt: &EnclaveEvent) { - // Save plaintext aggregator - let EnclaveEventData::CiphertextOutputPublished(data) = evt.get_data() else { - return; - }; - - let Some(fhe) = ctx.get_dependency(FHE_KEY) else { - self.bus.err( - EType::PlaintextAggregation, - anyhow!(ERROR_PLAINTEXT_FHE_MISSING), - ); - return; - }; - - let Some(ref meta) = ctx.get_dependency(META_KEY) else { - self.bus.err( - EType::PlaintextAggregation, - anyhow!(ERROR_PLAINTEXT_META_MISSING), - ); - return; - }; - - let e3_id = data.e3_id.clone(); - let repo = ctx.repositories().plaintext(&e3_id); - - // This is a single ciphertext for the legacy PlaintextAggregator - let Some(single_ciphertext) = data.ciphertext_output.first() else { - self.bus.err( - EType::PlaintextAggregation, - anyhow!("Could not extract ciphertext from array"), - ); - return; - }; - - let sync_state = repo.send(Some(PlaintextAggregatorState::init( - meta.threshold_m, - meta.threshold_n, - meta.seed, - single_ciphertext.clone(), - ))); - - ctx.set_event_recipient( - "plaintext", - Some( - PlaintextAggregator::new( - PlaintextAggregatorParams { - fhe: fhe.clone(), - bus: self.bus.clone(), - sortition: self.sortition.clone(), - e3_id: e3_id.clone(), - }, - sync_state, - ) - .start() - .into(), - ), - ); - } - - async fn hydrate(&self, ctx: &mut E3Context, snapshot: &E3ContextSnapshot) -> Result<()> { - // No ID on the snapshot -> bail - if !snapshot.contains("plaintext") { - return Ok(()); - } - - let repo = ctx.repositories().plaintext(&snapshot.e3_id); - let sync_state = repo.load().await?; - - // No Snapshot returned from the store -> bail - if !sync_state.has() { - return Ok(()); - }; - - // Get deps - let Some(fhe) = ctx.get_dependency(FHE_KEY) else { - self.bus.err( - EType::PlaintextAggregation, - anyhow!(ERROR_PLAINTEXT_FHE_MISSING), - ); - return Ok(()); - }; - - let value = PlaintextAggregator::new( - PlaintextAggregatorParams { - fhe: fhe.clone(), - bus: self.bus.clone(), - sortition: self.sortition.clone(), - e3_id: ctx.e3_id.clone(), - }, - sync_state, - ) - .start() - .into(); - - // send to context - ctx.set_event_recipient("plaintext", Some(value)); - - Ok(()) - } -} - pub struct PublicKeyAggregatorExtension { bus: BusHandle, } diff --git a/crates/aggregator/src/lib.rs b/crates/aggregator/src/lib.rs index f335f2c737..6695841161 100644 --- a/crates/aggregator/src/lib.rs +++ b/crates/aggregator/src/lib.rs @@ -7,14 +7,10 @@ mod committee_finalizer; pub mod ext; mod keyshare_created_filter_buffer; -mod plaintext_aggregator; mod publickey_aggregator; mod repo; mod threshold_plaintext_aggregator; pub use committee_finalizer::CommitteeFinalizer; -pub use plaintext_aggregator::{ - PlaintextAggregator, PlaintextAggregatorParams, PlaintextAggregatorState, -}; pub use publickey_aggregator::{ PublicKeyAggregator, PublicKeyAggregatorParams, PublicKeyAggregatorState, }; diff --git a/crates/aggregator/src/plaintext_aggregator.rs b/crates/aggregator/src/plaintext_aggregator.rs deleted file mode 100644 index f78b85508d..0000000000 --- a/crates/aggregator/src/plaintext_aggregator.rs +++ /dev/null @@ -1,253 +0,0 @@ -// SPDX-License-Identifier: LGPL-3.0-only -// -// This file is provided WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY -// or FITNESS FOR A PARTICULAR PURPOSE. - -use actix::prelude::*; -use anyhow::Result; -use e3_data::Persistable; -use e3_events::{ - prelude::*, BusHandle, DecryptionshareCreated, Die, E3id, EnclaveEvent, EnclaveEventData, - OrderedSet, PlaintextAggregated, Seed, -}; -use e3_fhe::{Fhe, GetAggregatePlaintext}; -use e3_sortition::{GetNodeIndex, Sortition}; -use e3_utils::ArcBytes; -use std::sync::Arc; -use tracing::error; - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub enum PlaintextAggregatorState { - Collecting { - threshold_m: usize, - threshold_n: usize, - shares: OrderedSet>, - seed: Seed, - ciphertext_output: ArcBytes, - }, - Computing { - shares: OrderedSet>, - ciphertext_output: ArcBytes, - }, - Complete { - decrypted: Vec, - shares: OrderedSet>, - }, -} - -impl PlaintextAggregatorState { - pub fn init( - threshold_m: usize, - threshold_n: usize, - seed: Seed, - ciphertext_output: ArcBytes, - ) -> Self { - PlaintextAggregatorState::Collecting { - threshold_m, - threshold_n, - shares: OrderedSet::new(), - seed, - ciphertext_output, - } - } - - pub fn get_name(&self) -> String { - match self { - PlaintextAggregatorState::Collecting { .. } => "Collecting", - PlaintextAggregatorState::Computing { .. } => "Computing", - PlaintextAggregatorState::Complete { .. } => "Complete", - } - .to_string() - } -} - -#[derive(Message)] -#[rtype(result = "anyhow::Result<()>")] -struct ComputeAggregate { - pub shares: OrderedSet>, - pub ciphertext_output: Vec, -} - -#[deprecated = "To be replaced by ThresholdPlaintextAggregator"] -pub struct PlaintextAggregator { - fhe: Arc, - bus: BusHandle, - sortition: Addr, - e3_id: E3id, - state: Persistable, -} - -pub struct PlaintextAggregatorParams { - pub fhe: Arc, - pub bus: BusHandle, - pub sortition: Addr, - pub e3_id: E3id, -} - -impl PlaintextAggregator { - pub fn new( - params: PlaintextAggregatorParams, - state: Persistable, - ) -> Self { - PlaintextAggregator { - fhe: params.fhe, - bus: params.bus, - sortition: params.sortition, - e3_id: params.e3_id, - state, - } - } - - pub fn add_share(&mut self, share: Vec) -> Result<()> { - self.state.try_mutate(|mut state| { - let PlaintextAggregatorState::Collecting { - // NOTE: In the deprecated PlaintextAggregator we need all shares to - // decrypt so here we set threshold_n - threshold_n, - shares, - ciphertext_output, - .. - } = &mut state - else { - return Err(anyhow::anyhow!("Can only add share in Collecting state")); - }; - - shares.insert(share); - - if shares.len() == *threshold_n { - return Ok(PlaintextAggregatorState::Computing { - shares: shares.clone(), - ciphertext_output: ciphertext_output.clone(), - }); - } - - Ok(state) - }) - } - - pub fn set_decryption(&mut self, decrypted: Vec) -> Result<()> { - self.state.try_mutate(|mut state| { - let PlaintextAggregatorState::Computing { shares, .. } = &mut state else { - return Ok(state.clone()); - }; - let shares = shares.to_owned(); - - Ok(PlaintextAggregatorState::Complete { decrypted, shares }) - }) - } -} - -impl Actor for PlaintextAggregator { - type Context = Context; -} - -impl Handler for PlaintextAggregator { - type Result = (); - fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { - match msg.into_data() { - EnclaveEventData::DecryptionshareCreated(data) => ctx.notify(data), - EnclaveEventData::E3RequestComplete(_) => ctx.notify(Die), - _ => (), - } - } -} - -impl Handler for PlaintextAggregator { - type Result = ResponseActFuture>; - - fn handle(&mut self, event: DecryptionshareCreated, _: &mut Self::Context) -> Self::Result { - let Some(PlaintextAggregatorState::Collecting { - threshold_n, seed, .. - }) = self.state.get() - else { - let name = self.state.get().map(|s| s.get_name()); - error!( - "Aggregator has been closed for collecting. {}", - name.unwrap_or("Unknown".to_string()) - ); - return Box::pin(fut::ready(Ok(()))); - }; - - let size = threshold_n; - let address = event.node; - let chain_id = event.e3_id.chain_id(); - let e3_id = event.e3_id.clone(); - let decryption_share = event.decryption_share.clone(); - - Box::pin( - self.sortition - .send(GetNodeIndex { - chain_id, - address, - size, - seed, - }) - .into_actor(self) - .map(move |res, act, ctx| { - let maybe_found_index = res?; - let Some(_) = maybe_found_index else { - error!("Node not found in committee"); - return Ok(()); - }; - - if e3_id != act.e3_id { - error!("Wrong e3_id sent to aggregator. This should not happen."); - return Ok(()); - } - - // add the keyshare and - let Some(share) = decryption_share.first() else { - error!("Share not found in decryption_share vector"); - return Ok(()); - }; - - act.add_share(share.extract_bytes())?; - - // Check the state and if it has changed to the computing - if let Some(PlaintextAggregatorState::Computing { - shares, - ciphertext_output, - }) = &act.state.get() - { - ctx.notify(ComputeAggregate { - shares: shares.clone(), - ciphertext_output: ciphertext_output.to_vec(), - }) - } - - Ok(()) - }), - ) - } -} - -impl Handler for PlaintextAggregator { - type Result = Result<()>; - fn handle(&mut self, msg: ComputeAggregate, _: &mut Self::Context) -> Self::Result { - let decrypted_output = self.fhe.get_aggregate_plaintext(GetAggregatePlaintext { - decryptions: msg.shares.clone(), - ciphertext_output: msg.ciphertext_output, - })?; - - // Update the local state - self.set_decryption(decrypted_output.clone())?; - - // Dispatch the PlaintextAggregated event - let event = PlaintextAggregated { - decrypted_output: vec![ArcBytes::from_bytes(&decrypted_output)], - e3_id: self.e3_id.clone(), - }; - - self.bus.publish(event)?; - - Ok(()) - } -} - -impl Handler for PlaintextAggregator { - type Result = (); - fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result { - ctx.stop() - } -} diff --git a/crates/aggregator/src/repo.rs b/crates/aggregator/src/repo.rs index 67865654a5..769c7da7d2 100644 --- a/crates/aggregator/src/repo.rs +++ b/crates/aggregator/src/repo.rs @@ -8,19 +8,7 @@ use e3_config::StoreKeys; use e3_data::{Repositories, Repository}; use e3_events::E3id; -use crate::{ - PlaintextAggregatorState, PublicKeyAggregatorState, ThresholdPlaintextAggregatorState, -}; - -pub trait PlaintextRepositoryFactory { - fn plaintext(&self, e3_id: &E3id) -> Repository; -} - -impl PlaintextRepositoryFactory for Repositories { - fn plaintext(&self, e3_id: &E3id) -> Repository { - Repository::new(self.store.scope(StoreKeys::plaintext(e3_id))) - } -} +use crate::{PublicKeyAggregatorState, ThresholdPlaintextAggregatorState}; pub trait TrBfvPlaintextRepositoryFactory { fn trbfv_plaintext(&self, e3_id: &E3id) -> Repository; diff --git a/crates/ciphernode-builder/src/ciphernode_builder.rs b/crates/ciphernode-builder/src/ciphernode_builder.rs index 2c09de7996..9f1d522c21 100644 --- a/crates/ciphernode-builder/src/ciphernode_builder.rs +++ b/crates/ciphernode-builder/src/ciphernode_builder.rs @@ -9,10 +9,7 @@ use actix::{Actor, Addr}; use alloy::signers::{k256::ecdsa::SigningKey, local::LocalSigner}; use anyhow::Result; use derivative::Derivative; -use e3_aggregator::ext::{ - PlaintextAggregatorExtension, PublicKeyAggregatorExtension, - ThresholdPlaintextAggregatorExtension, -}; +use e3_aggregator::ext::{PublicKeyAggregatorExtension, ThresholdPlaintextAggregatorExtension}; use e3_config::chain_config::ChainConfig; use e3_crypto::Cipher; use e3_data::{InMemStore, Repositories, RepositoriesFactory}; @@ -28,7 +25,7 @@ use e3_evm::{ HistoricalEventCoordinator, }; use e3_fhe::ext::FheExtension; -use e3_keyshare::ext::{KeyshareExtension, ThresholdKeyshareExtension}; +use e3_keyshare::ext::ThresholdKeyshareExtension; use e3_multithread::{Multithread, MultithreadReport, TaskPool}; use e3_request::E3Router; use e3_sortition::{ @@ -36,7 +33,6 @@ use e3_sortition::{ NodeStateRepositoryFactory, Sortition, SortitionBackend, SortitionRepositoryFactory, }; use e3_utils::{rand_eth_addr, SharedRng}; -use rayon::ThreadPool; use std::{collections::HashMap, path::PathBuf, sync::Arc}; use tracing::{error, info}; @@ -66,7 +62,6 @@ pub struct CiphernodeBuilder { multithread_concurrent_jobs: Option, multithread_report: Option>, name: String, - plaintext_agg: bool, pubkey_agg: bool, rng: SharedRng, sortition_backend: SortitionBackend, @@ -95,7 +90,6 @@ pub enum BusMode { #[derive(Clone, Debug)] pub enum KeyshareKind { Threshold, - NonThreshold, // Soft Deprecated } impl CiphernodeBuilder { @@ -118,7 +112,6 @@ impl CiphernodeBuilder { multithread_concurrent_jobs: None, multithread_report: None, name: name.to_owned(), - plaintext_agg: false, pubkey_agg: false, rng, sortition_backend: SortitionBackend::score(), @@ -150,13 +143,6 @@ impl CiphernodeBuilder { self } - /// Use the Deprecated Keyshare feature - #[deprecated = "in future versions we will migrate to with_trbfv()"] - pub fn with_keyshare(mut self) -> Self { - self.keyshare = Some(KeyshareKind::NonThreshold); - self - } - /// Use the given in-mem datastore. This is useful for injecting a store dump. pub fn with_in_mem_datastore(mut self, store: &Addr) -> Self { self.in_mem_store = Some(store.to_owned()); @@ -212,12 +198,6 @@ impl CiphernodeBuilder { self } - /// Do plaintext aggregation - pub fn with_plaintext_aggregation(mut self) -> Self { - self.plaintext_agg = true; - self - } - /// Connect rayon work to the given threadpool pub fn with_shared_taskpool(mut self, pool: &TaskPool) -> Self { self.task_pool = Some(pool.clone()); @@ -485,10 +465,7 @@ impl CiphernodeBuilder { )) } - if matches!(self.keyshare, Some(KeyshareKind::NonThreshold)) - || self.pubkey_agg - || self.plaintext_agg - { + if self.pubkey_agg { info!("Setting up FheExtension"); e3_builder = e3_builder.with(FheExtension::create(&bus, &self.rng)) } @@ -498,23 +475,13 @@ impl CiphernodeBuilder { e3_builder = e3_builder.with(PublicKeyAggregatorExtension::create(&bus)) } - if self.plaintext_agg { - info!("Setting up PlaintextAggregationExtension (legacy)"); - e3_builder = e3_builder.with(PlaintextAggregatorExtension::create(&bus, &sortition)) - } - if self.threshold_plaintext_agg { - info!("Setting up ThresholdPlaintextAggregatorExtension NEW!"); + info!("Setting up ThresholdPlaintextAggregatorExtension"); let _ = self.ensure_multithread(&bus); e3_builder = e3_builder.with(ThresholdPlaintextAggregatorExtension::create( &bus, &sortition, )) } - - if matches!(self.keyshare, Some(KeyshareKind::NonThreshold)) { - info!("Setting up KeyshareExtension (legacy)!"); - e3_builder = e3_builder.with(KeyshareExtension::create(&bus, &addr, &self.cipher)) - } info!("building..."); e3_builder.build().await?; diff --git a/crates/keyshare/src/ext.rs b/crates/keyshare/src/ext.rs index ab2a47d908..11ff88e46d 100644 --- a/crates/keyshare/src/ext.rs +++ b/crates/keyshare/src/ext.rs @@ -5,8 +5,8 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use crate::{ - Keyshare, KeyshareParams, KeyshareRepositoryFactory, KeyshareState, ThresholdKeyshare, - ThresholdKeyshareParams, ThresholdKeyshareRepositoryFactory, ThresholdKeyshareState, + ThresholdKeyshare, ThresholdKeyshareParams, ThresholdKeyshareRepositoryFactory, + ThresholdKeyshareState, }; use actix::Actor; use anyhow::{anyhow, Result}; @@ -14,102 +14,10 @@ use async_trait::async_trait; use e3_crypto::Cipher; use e3_data::{AutoPersist, RepositoriesFactory}; use e3_events::{prelude::*, BusHandle, EType, EnclaveEvent, EnclaveEventData}; -use e3_fhe::ext::FHE_KEY; use e3_request::{E3Context, E3ContextSnapshot, E3Extension, META_KEY}; use std::sync::Arc; -pub struct KeyshareExtension { - bus: BusHandle, - address: String, - cipher: Arc, -} - -impl KeyshareExtension { - pub fn create(bus: &BusHandle, address: &str, cipher: &Arc) -> Box { - Box::new(Self { - bus: bus.clone(), - address: address.to_owned(), - cipher: cipher.to_owned(), - }) - } -} - -const ERROR_KEYSHARE_FHE_MISSING: &str = - "Could not create Keyshare because the fhe instance it depends on was not set on the context."; - -#[async_trait] -impl E3Extension for KeyshareExtension { - fn on_event(&self, ctx: &mut E3Context, evt: &EnclaveEvent) { - // if this is NOT a CiphernodeSelected event then ignore - let EnclaveEventData::CiphernodeSelected(data) = evt.get_data() else { - return; - }; - - // Has the FHE dependency been already setup? (hint: it should have) - let Some(fhe) = ctx.get_dependency(FHE_KEY) else { - self.bus - .err(EType::KeyGeneration, anyhow!(ERROR_KEYSHARE_FHE_MISSING)); - return; - }; - - let e3_id = data.clone().e3_id; - let repo = ctx.repositories().keyshare(&e3_id); - let container = repo.send(None); // New container with None - - ctx.set_event_recipient( - "keyshare", - Some( - Keyshare::new(KeyshareParams { - bus: self.bus.clone(), - secret: container, - fhe: fhe.clone(), - address: self.address.clone(), - cipher: self.cipher.clone(), - }) - .start() - .into(), - ), - ); - } - - async fn hydrate(&self, ctx: &mut E3Context, snapshot: &E3ContextSnapshot) -> Result<()> { - // No keyshare on the snapshot -> bail - if !snapshot.contains("keyshare") { - return Ok(()); - }; - - // Get the saved state as a persistable - let sync_secret = ctx.repositories().keyshare(&snapshot.e3_id).load().await?; - - // No Snapshot returned from the sync_secret -> bail - if !sync_secret.has() { - return Ok(()); - }; - - // Has the FHE dependency been already setup? (hint: it should have) - let Some(fhe) = ctx.get_dependency(FHE_KEY) else { - self.bus - .err(EType::KeyGeneration, anyhow!(ERROR_KEYSHARE_FHE_MISSING)); - return Ok(()); - }; - - // Construct from snapshot - let value = Keyshare::new(KeyshareParams { - fhe: fhe.clone(), - bus: self.bus.clone(), - secret: sync_secret, - address: self.address.clone(), - cipher: self.cipher.clone(), - }) - .start() - .into(); - - // send to context - ctx.set_event_recipient("keyshare", Some(value)); - - Ok(()) - } -} +use crate::KeyshareState; pub struct ThresholdKeyshareExtension { bus: BusHandle, diff --git a/crates/keyshare/src/keyshare.rs b/crates/keyshare/src/keyshare.rs deleted file mode 100644 index ff80744861..0000000000 --- a/crates/keyshare/src/keyshare.rs +++ /dev/null @@ -1,169 +0,0 @@ -// SPDX-License-Identifier: LGPL-3.0-only -// -// This file is provided WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY -// or FITNESS FOR A PARTICULAR PURPOSE. - -use actix::prelude::*; -use anyhow::{anyhow, Context as AnyhowContext, Result}; -use e3_crypto::Cipher; -use e3_data::Persistable; -use e3_events::{ - prelude::*, trap, BusHandle, CiphernodeSelected, CiphertextOutputPublished, - DecryptionshareCreated, Die, E3RequestComplete, EType, EnclaveEvent, EnclaveEventData, - KeyshareCreated, -}; -use e3_fhe::{DecryptCiphertext, Fhe}; -use e3_utils::utility_types::ArcBytes; -use std::sync::Arc; -use tracing::warn; - -pub struct Keyshare { - fhe: Arc, - bus: BusHandle, - secret: Persistable>, - address: String, - cipher: Arc, -} - -impl Actor for Keyshare { - type Context = actix::Context; -} - -pub struct KeyshareParams { - pub bus: BusHandle, - pub secret: Persistable>, - pub fhe: Arc, - pub address: String, - pub cipher: Arc, -} - -impl Keyshare { - pub fn new(params: KeyshareParams) -> Self { - Self { - bus: params.bus, - fhe: params.fhe, - secret: params.secret, - address: params.address, - cipher: params.cipher, - } - } - - fn set_secret(&mut self, mut data: Vec) -> Result<()> { - let encrypted = self.cipher.encrypt_data(&mut data)?; - - self.secret.set(encrypted); - - Ok(()) - } - - fn get_secret(&self) -> Result> { - let encrypted = self - .secret - .get() - .ok_or(anyhow!("State was not stored on keyshare"))?; - - let decrypted = self.cipher.decrypt_data(&encrypted)?; - - Ok(decrypted) - } - - fn clear_secret(&mut self) { - self.secret.clear(); - } -} - -impl Handler for Keyshare { - type Result = (); - - fn handle(&mut self, event: EnclaveEvent, ctx: &mut actix::Context) -> Self::Result { - match event.into_data() { - EnclaveEventData::CiphernodeSelected(data) => ctx.notify(data), - EnclaveEventData::CiphertextOutputPublished(data) => ctx.notify(data), - EnclaveEventData::E3RequestComplete(data) => ctx.notify(data), - EnclaveEventData::Shutdown(_) => ctx.notify(Die), - _ => (), - } - } -} - -impl Handler for Keyshare { - type Result = (); - - fn handle(&mut self, event: CiphernodeSelected, _: &mut actix::Context) -> Self::Result { - trap(EType::KeyGeneration, &self.bus.clone(), || { - let CiphernodeSelected { e3_id, .. } = event; - - // generate keyshare - let (secret, pubkey) = self - .fhe - .generate_keyshare() - .with_context(|| format!("Error creating Keyshare for {}", e3_id))?; - - // Save secret on state - self.set_secret(secret)?; - - // Broadcast the KeyshareCreated message - self.bus.publish(KeyshareCreated { - pubkey, - e3_id, - node: self.address.clone(), - })?; - - Ok(()) - }) - } -} - -impl Handler for Keyshare { - type Result = (); - - fn handle( - &mut self, - event: CiphertextOutputPublished, - _: &mut actix::Context, - ) -> Self::Result { - trap(EType::Decryption, &self.bus.clone(), || { - let CiphertextOutputPublished { - e3_id, - ciphertext_output, - } = event; - - let secret = self.get_secret()?; - - let ciphertext = ciphertext_output - .first() - .ok_or(anyhow!("Ciphernode output array is empty!"))?; - - let decryption_share = self.fhe.decrypt_ciphertext(DecryptCiphertext { - ciphertext: ciphertext.extract_bytes(), - unsafe_secret: secret, - })?; - - self.bus.publish(DecryptionshareCreated { - party_id: 0, // Not used - e3_id, - decryption_share: vec![ArcBytes::from_bytes(&decryption_share)], - node: self.address.clone(), - })?; - - Ok(()) - }) - } -} - -impl Handler for Keyshare { - type Result = (); - fn handle(&mut self, _: E3RequestComplete, ctx: &mut Self::Context) -> Self::Result { - self.clear_secret(); - ctx.notify(Die); - } -} - -impl Handler for Keyshare { - type Result = (); - fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result { - warn!("Keyshare is shutting down now"); - ctx.stop() - } -} diff --git a/crates/keyshare/src/lib.rs b/crates/keyshare/src/lib.rs index 98f52ab4a5..c9d9c80cd9 100644 --- a/crates/keyshare/src/lib.rs +++ b/crates/keyshare/src/lib.rs @@ -6,11 +6,9 @@ mod encryption_key_collector; pub mod ext; -mod keyshare; mod repo; mod threshold_keyshare; mod threshold_share_collector; pub use encryption_key_collector::{AllEncryptionKeysCollected, EncryptionKeyCollector}; -pub use keyshare::*; pub use repo::*; pub use threshold_keyshare::*; diff --git a/crates/keyshare/src/repo.rs b/crates/keyshare/src/repo.rs index b129f98490..e7d17eb618 100644 --- a/crates/keyshare/src/repo.rs +++ b/crates/keyshare/src/repo.rs @@ -10,16 +10,6 @@ use e3_events::E3id; use crate::ThresholdKeyshareState; -pub trait KeyshareRepositoryFactory { - fn keyshare(&self, e3_id: &E3id) -> Repository>; -} - -impl KeyshareRepositoryFactory for Repositories { - fn keyshare(&self, e3_id: &E3id) -> Repository> { - Repository::new(self.store.scope(StoreKeys::keyshare(e3_id))) - } -} - pub trait ThresholdKeyshareRepositoryFactory { fn threshold_keyshare(&self, e3_id: &E3id) -> Repository; } diff --git a/crates/tests/tests/integration.rs b/crates/tests/tests/integration.rs index 5fbae9fd85..4e3e11f883 100644 --- a/crates/tests/tests/integration.rs +++ b/crates/tests/tests/integration.rs @@ -440,3 +440,562 @@ async fn test_trbfv_actor() -> Result<()> { Ok(()) } + +// ============================================================================ +// Networking and P2P Tests +// ============================================================================ + +#[actix::test] +async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { + use e3_events::{CiphernodeSelected, EnclaveEvent, TakeEvents, Unsequenced}; + use e3_net::events::GossipData; + use e3_net::{events::NetEvent, NetEventTranslator}; + use std::sync::Arc; + use tokio::sync::mpsc; + use tokio::sync::{broadcast, Mutex}; + + // Setup elements in test + let (cmd_tx, mut cmd_rx) = mpsc::channel(100); // Transmit byte events to the network + let (event_tx, _) = broadcast::channel(100); // Receive byte events from the network + let system = EventSystem::new("test"); + let bus = system.handle()?; + let history_collector = bus.history(); + let event_rx = Arc::new(event_tx.subscribe()); + // Pas cmd and event channels to NetEventTranslator + NetEventTranslator::setup(&bus, &cmd_tx, &event_rx, "my-topic"); + + // Capture messages from output on msgs vec + let msgs: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let msgs_loop = msgs.clone(); + + tokio::spawn(async move { + // Pull events from command channel + while let Some(cmd) = cmd_rx.recv().await { + // If the command is a GossipPublish then extract it and save it whilst sending it to + // the event bus as if it was gossiped from the network and ended up as an external + // message this simulates a rebroadcast message + if let Some(msg) = match cmd { + e3_net::events::NetCommand::GossipPublish { data, .. } => Some(data), + _ => None, + } { + if let GossipData::GossipBytes(_) = msg { + let event: EnclaveEvent = msg.clone().try_into().unwrap(); + let (data, _) = event.split(); + msgs_loop.lock().await.push(data); + event_tx.send(NetEvent::GossipData(msg)).unwrap(); + } + } + // if this manages to broadcast an event to the + // event bus we will expect to see an extra event on + // the bus but we don't because we handle this + } + anyhow::Ok(()) + }); + + let evt_1 = PlaintextAggregated { + e3_id: E3id::new("1235", 1), + decrypted_output: vec![ArcBytes::from_bytes(&[1, 2, 3, 4])], + }; + + let evt_2 = PlaintextAggregated { + e3_id: E3id::new("1236", 1), + decrypted_output: vec![ArcBytes::from_bytes(&[1, 2, 3, 4])], + }; + + let local_evt_3 = CiphernodeSelected { + e3_id: E3id::new("1235", 1), + threshold_m: 2, + threshold_n: 5, + ..CiphernodeSelected::default() + }; + + bus.publish(evt_1.clone())?; + bus.publish(evt_2.clone())?; + bus.publish(local_evt_3.clone())?; // This is a local event which should not be broadcast to the network + + // check the history of the event bus + let history = history_collector + .send(TakeEvents::::new(3)) + .await?; + + assert_eq!( + *msgs.lock().await, + vec![evt_1.clone().into(), evt_2.clone().into()], // notice no local events + "NetEventTranslator did not transmit correct events to the network" + ); + + assert_eq!( + history + .into_iter() + .map(|e| e.into_data()) + .collect::>(), + vec![evt_1.into(), evt_2.into(), local_evt_3.into()], // all local events that have been broadcast but no + // events from the loopback + "NetEventTranslator must not retransmit forwarded event to event bus" + ); + + Ok(()) +} + +#[actix::test] +async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { + use e3_events::{EnclaveEvent, TakeEvents}; + use e3_net::events::GossipData; + use e3_net::{events::NetEvent, NetEventTranslator}; + use rand::SeedableRng; + use rand_chacha::ChaCha20Rng; + use std::sync::Arc; + use tokio::sync::broadcast; + use tokio::sync::mpsc; + + let seed = e3_events::Seed(ChaCha20Rng::seed_from_u64(123).get_seed()); + + // Setup elements in test + let (cmd_tx, _) = mpsc::channel(100); // Transmit byte events to the network + let (event_tx, event_rx) = broadcast::channel(100); // Receive byte events from the network + let system = EventSystem::new("test").with_fresh_bus(); + let bus = system.handle()?; + let history_collector = bus.history(); + + NetEventTranslator::setup(&bus, &cmd_tx, &Arc::new(event_rx), "mytopic"); + + // Capture messages from output on msgs vec + let event = E3Requested { + e3_id: E3id::new("1235", 1), + threshold_m: 2, + threshold_n: 5, + seed: seed.clone(), + params: ArcBytes::from_bytes(&[1, 2, 3, 4]), + ..E3Requested::default() + }; + + // lets send an event from the network + let _ = event_tx.send(NetEvent::GossipData(GossipData::GossipBytes( + bus.event_from(event.clone())?.to_bytes()?, + ))); + + // check the history of the event bus + let history = history_collector + .send(TakeEvents::::new(1)) + .await?; + + assert_eq!( + history + .into_iter() + .map(|e| e.into_data()) + .collect::>(), + vec![event.into()] + ); + + Ok(()) +} + +// ============================================================================ +// Legacy Tests Pending Port to trBFV +// ============================================================================ + +/// Test that stopped keyshares retain their state after restart. +/// This test needs to be ported to the new trBFV system once Sync is completed. +#[actix::test] +#[ignore = "Needs to be ported to trBFV system after Sync is completed"] +async fn test_stopped_keyshares_retain_state() -> Result<()> { + use e3_bfv_client::{decode_bytes_to_vec_u64, decode_plaintext_to_vec_u64}; + use e3_data::{GetDump, InMemStore}; + use e3_events::{EventBus, EventBusConfig, GetEvents, Shutdown, TakeEvents}; + use e3_test_helpers::{create_random_eth_addrs, get_common_setup, simulate_libp2p_net}; + use fhe::{ + bfv::{PublicKey, SecretKey}, + mbfv::{AggregateIter, PublicKeyShare}, + }; + use fhe_traits::Serialize; + use std::time::Duration; + use tokio::time::sleep; + + type PkSkShareTuple = (PublicKeyShare, SecretKey, String); + + async fn setup_local_ciphernode( + bus: &BusHandle, + rng: &e3_utils::SharedRng, + logging: bool, + addr: &str, + store: Option>, + cipher: &Arc, + ) -> Result { + let mut builder = CiphernodeBuilder::new(&addr, rng.clone(), cipher.clone()) + .with_trbfv() + .with_address(addr) + .testmode_with_forked_bus(bus.consumer()) + .testmode_with_history() + .testmode_with_errors() + .with_pubkey_aggregation() + .with_threshold_plaintext_aggregation() + .with_sortition_score(); + + if let Some(ref in_mem_store) = store { + builder = builder.with_in_mem_datastore(in_mem_store); + } + + if logging { + builder = builder.with_logging() + } + + let node = builder.build().await?; + Ok(node) + } + + async fn create_local_ciphernodes( + bus: &BusHandle, + rng: &e3_utils::SharedRng, + count: u32, + cipher: &Arc, + ) -> Result> { + let eth_addrs = create_random_eth_addrs(count); + let mut result = vec![]; + for addr in ð_addrs { + println!("Setting up eth addr: {}", addr); + let tuple = setup_local_ciphernode(&bus, &rng, true, addr, None, cipher).await?; + result.push(tuple); + } + simulate_libp2p_net(&result); + Ok(result) + } + + let e3_id = E3id::new("1234", 1); + let (rng, cn1_address, cn1_data, cn2_address, cn2_data, cipher, history, params, crpoly) = { + let (bus, rng, seed, params, crpoly, _, _) = get_common_setup(None)?; + let cipher = Arc::new(Cipher::from_password("Don't tell anyone my secret").await?); + let ciphernodes = create_local_ciphernodes(&bus, &rng, 2, &cipher).await?; + let eth_addrs = ciphernodes.iter().map(|n| n.address()).collect::>(); + + setup_score_sortition_environment(&bus, ð_addrs, 1).await?; + + let [cn1, cn2] = &ciphernodes.as_slice() else { + panic!("Not enough elements") + }; + + // Send e3request + bus.publish(E3Requested { + e3_id: e3_id.clone(), + threshold_m: 2, + threshold_n: 2, + seed: seed.clone(), + params: ArcBytes::from_bytes(&encode_bfv_params(¶ms)), + ..E3Requested::default() + })?; + + bus.publish(CommitteeFinalized { + e3_id: e3_id.clone(), + committee: eth_addrs.clone(), + chain_id: 1, + })?; + + let history_collector = cn1.history().unwrap(); + let error_collector = cn1.errors().unwrap(); + let history = history_collector + .send(TakeEvents::::new(14)) + .await?; + let errors = error_collector.send(GetEvents::new()).await?; + + assert_eq!(errors.len(), 0); + + // SEND SHUTDOWN! + bus.publish(Shutdown)?; + + // This is probably overkill but required to ensure that all the data is written + sleep(Duration::from_secs(1)).await; + + // Unwrap does not matter as we are in a test + let cn1_dump = cn1.in_mem_store().unwrap().send(GetDump).await??; + let cn2_dump = cn2.in_mem_store().unwrap().send(GetDump).await??; + + ( + rng, + cn1.address(), + cn1_dump, + cn2.address(), + cn2_dump, + cipher, + history, + params, + crpoly, + ) + }; + + let bus = EventSystem::in_mem("cn2") + .with_event_bus( + EventBus::::new(EventBusConfig { deduplicate: true }).start(), + ) + .handle()?; + let cn1 = setup_local_ciphernode( + &bus, + &rng, + true, + &cn1_address, + Some(InMemStore::from_dump(cn1_data, true)?.start()), + &cipher, + ) + .await?; + let cn2 = setup_local_ciphernode( + &bus, + &rng, + true, + &cn2_address, + Some(InMemStore::from_dump(cn2_data, true)?.start()), + &cipher, + ) + .await?; + let history_collector = cn1.history().unwrap(); + simulate_libp2p_net(&[cn1, cn2]); + + println!("getting collector from cn1.6"); + + // get the public key from history. + let pubkey: PublicKey = history + .iter() + .filter_map(|evt| match evt.get_data() { + EnclaveEventData::KeyshareCreated(data) => { + PublicKeyShare::deserialize(&data.pubkey, ¶ms, crpoly.clone()).ok() + } + _ => None, + }) + .aggregate()?; + + // Publish the ciphertext + use e3_test_helpers::encrypt_ciphertext; + let raw_plaintext = vec![vec![4, 5]]; + let (ciphertext, expected) = encrypt_ciphertext(¶ms, pubkey, raw_plaintext)?; + bus.publish(CiphertextOutputPublished { + ciphertext_output: ciphertext + .iter() + .map(|ct| ArcBytes::from_bytes(&ct.to_bytes())) + .collect(), + e3_id: e3_id.clone(), + })?; + + let history = history_collector + .send(TakeEvents::::new(5)) + .await?; + + let actual = history + .into_iter() + .filter_map(|e| match e.into_data() { + EnclaveEventData::PlaintextAggregated(data) => Some(data), + _ => None, + }) + .collect::>() + .first() + .unwrap() + .clone(); + + assert_eq!( + actual + .decrypted_output + .iter() + .map(|b| decode_bytes_to_vec_u64(b).unwrap()) + .collect::>>(), + expected + .iter() + .map(|p| decode_plaintext_to_vec_u64(p).unwrap()) + .collect::>>() + ); + + Ok(()) +} + +/// Test that duplicate E3 IDs work correctly with different chain IDs. +/// This test needs to be ported to use trBFV instead of legacy keyshare. +#[actix::test] +#[ignore = "Needs to be ported to trBFV system"] +async fn test_duplicate_e3_id_with_different_chain_id() -> Result<()> { + use e3_bfv_client::compute_pk_commitment; + use e3_events::{OrderedSet, PublicKeyAggregated, TakeEvents}; + use e3_test_helpers::{ + create_random_eth_addrs, create_shared_rng_from_u64, get_common_setup, simulate_libp2p_net, + }; + use fhe::{ + bfv::{BfvParameters, PublicKey, SecretKey}, + mbfv::{AggregateIter, CommonRandomPoly, PublicKeyShare}, + }; + use fhe_traits::Serialize; + + type PkSkShareTuple = (PublicKeyShare, SecretKey, String); + + async fn setup_local_ciphernode( + bus: &BusHandle, + rng: &e3_utils::SharedRng, + logging: bool, + addr: &str, + store: Option>, + cipher: &Arc, + ) -> Result { + let mut builder = CiphernodeBuilder::new(&addr, rng.clone(), cipher.clone()) + .with_trbfv() + .with_address(addr) + .testmode_with_forked_bus(bus.consumer()) + .testmode_with_history() + .testmode_with_errors() + .with_pubkey_aggregation() + .with_threshold_plaintext_aggregation() + .with_sortition_score(); + + if let Some(ref in_mem_store) = store { + builder = builder.with_in_mem_datastore(in_mem_store); + } + + if logging { + builder = builder.with_logging() + } + + let node = builder.build().await?; + Ok(node) + } + + async fn create_local_ciphernodes( + bus: &BusHandle, + rng: &e3_utils::SharedRng, + count: u32, + cipher: &Arc, + ) -> Result> { + let eth_addrs = create_random_eth_addrs(count); + let mut result = vec![]; + for addr in ð_addrs { + println!("Setting up eth addr: {}", addr); + let tuple = setup_local_ciphernode(&bus, &rng, true, addr, None, cipher).await?; + result.push(tuple); + } + simulate_libp2p_net(&result); + Ok(result) + } + + fn generate_pk_share( + params: &Arc, + crp: &CommonRandomPoly, + rng: &e3_utils::SharedRng, + addr: &str, + ) -> Result { + let sk = SecretKey::random(¶ms, &mut *rng.lock().unwrap()); + let pk = PublicKeyShare::new(&sk, crp.clone(), &mut *rng.lock().unwrap())?; + Ok((pk, sk, addr.to_owned())) + } + + fn generate_pk_shares( + params: &Arc, + crp: &CommonRandomPoly, + rng: &e3_utils::SharedRng, + eth_addrs: &Vec, + ) -> Result> { + let mut result = vec![]; + for addr in eth_addrs { + result.push(generate_pk_share(params, crp, rng, addr)?); + } + Ok(result) + } + + fn aggregate_public_key(shares: &Vec) -> Result { + Ok(shares + .clone() + .into_iter() + .map(|(pk, _, _)| pk) + .aggregate()?) + } + + // Setup + let (bus, rng, seed, params, crpoly, _, _) = get_common_setup(None)?; + let cipher = Arc::new(Cipher::from_password("Don't tell anyone my secret").await?); + + // Setup actual ciphernodes and dispatch add events + let ciphernodes = create_local_ciphernodes(&bus, &rng, 3, &cipher).await?; + let eth_addrs = ciphernodes.iter().map(|tup| tup.address()).collect(); + + setup_score_sortition_environment(&bus, ð_addrs, 1).await?; + setup_score_sortition_environment(&bus, ð_addrs, 2).await?; + + // Send the computation requested event + bus.publish(E3Requested { + e3_id: E3id::new("1234", 1), + threshold_m: 2, + threshold_n: 5, + seed: seed.clone(), + params: ArcBytes::from_bytes(&encode_bfv_params(¶ms)), + ..E3Requested::default() + })?; + + bus.publish(CommitteeFinalized { + e3_id: E3id::new("1234", 1), + committee: eth_addrs.clone(), + chain_id: 1, + })?; + + // Generate the test shares and pubkey + let rng_test = create_shared_rng_from_u64(42); + let test_pubkey = aggregate_public_key(&generate_pk_shares( + ¶ms, &crpoly, &rng_test, ð_addrs, + )?)?; + let public_key_hash = compute_pk_commitment( + test_pubkey.to_bytes(), + params.degree(), + params.plaintext(), + params.moduli().to_vec(), + )?; + + let history_collector = ciphernodes.last().unwrap().history().unwrap(); + let history = history_collector + .send(TakeEvents::::new(28)) + .await?; + + assert_eq!( + history.last().cloned().unwrap().into_data(), + PublicKeyAggregated { + pubkey: test_pubkey.to_bytes(), + public_key_hash, + e3_id: E3id::new("1234", 1), + nodes: OrderedSet::from(eth_addrs.clone()), + } + .into() + ); + + // Send the computation requested event + bus.publish(E3Requested { + e3_id: E3id::new("1234", 2), + threshold_m: 2, + threshold_n: 5, + seed: seed.clone(), + params: ArcBytes::from_bytes(&encode_bfv_params(¶ms)), + ..E3Requested::default() + })?; + + bus.publish(CommitteeFinalized { + e3_id: E3id::new("1234", 2), + committee: eth_addrs.clone(), + chain_id: 2, + })?; + + let test_pubkey = aggregate_public_key(&generate_pk_shares( + ¶ms, &crpoly, &rng_test, ð_addrs, + )?)?; + + let public_key_hash = compute_pk_commitment( + test_pubkey.to_bytes(), + params.degree(), + params.plaintext(), + params.moduli().to_vec(), + )?; + + let history = history_collector + .send(TakeEvents::::new(8)) + .await?; + + assert_eq!( + history.last().cloned().unwrap().into_data(), + PublicKeyAggregated { + pubkey: test_pubkey.to_bytes(), + public_key_hash, + e3_id: E3id::new("1234", 2), + nodes: OrderedSet::from(eth_addrs.clone()), + } + .into() + ); + + Ok(()) +} diff --git a/crates/tests/tests/integration_legacy.rs b/crates/tests/tests/integration_legacy.rs deleted file mode 100644 index d3f5eb5604..0000000000 --- a/crates/tests/tests/integration_legacy.rs +++ /dev/null @@ -1,684 +0,0 @@ -// SPDX-License-Identifier: LGPL-3.0-only -// -// This file is provided WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY -// or FITNESS FOR A PARTICULAR PURPOSE. - -use actix::prelude::*; -use actix::Actor; -use alloy::primitives::{FixedBytes, I256, U256}; -use anyhow::*; -use e3_bfv_client::client::compute_pk_commitment; -use e3_bfv_client::{decode_bytes_to_vec_u64, decode_plaintext_to_vec_u64}; -use e3_ciphernode_builder::CiphernodeBuilder; -use e3_ciphernode_builder::CiphernodeHandle; -use e3_ciphernode_builder::EventSystem; -use e3_crypto::Cipher; -use e3_data::GetDump; -use e3_data::InMemStore; -use e3_events::BusHandle; -use e3_events::EnclaveEventData; -use e3_events::GetEvents; -use e3_events::Unsequenced; -use e3_events::{ - prelude::*, CiphernodeSelected, CiphertextOutputPublished, CommitteeFinalized, - ConfigurationUpdated, E3Requested, E3id, EnclaveEvent, EventBus, EventBusConfig, - OperatorActivationChanged, OrderedSet, PlaintextAggregated, PublicKeyAggregated, Seed, - Shutdown, TakeEvents, TicketBalanceUpdated, -}; -use e3_fhe_params::encode_bfv_params; -use e3_net::events::GossipData; -use e3_net::{events::NetEvent, NetEventTranslator}; -use e3_test_helpers::encrypt_ciphertext; -use e3_test_helpers::{ - create_random_eth_addrs, create_shared_rng_from_u64, get_common_setup, simulate_libp2p_net, - AddToCommittee, -}; -use e3_utils::utility_types::ArcBytes; -use e3_utils::SharedRng; -use fhe::{ - bfv::{BfvParameters, PublicKey, SecretKey}, - mbfv::{AggregateIter, CommonRandomPoly, PublicKeyShare}, -}; -use fhe_traits::Serialize; -use rand::SeedableRng; -use rand_chacha::ChaCha20Rng; -use std::{sync::Arc, time::Duration}; -use tokio::sync::mpsc; -use tokio::sync::{broadcast, Mutex}; -use tokio::time::sleep; - -async fn setup_local_ciphernode( - bus: &BusHandle, - rng: &SharedRng, - logging: bool, - addr: &str, - store: Option>, - cipher: &Arc, -) -> Result { - let mut builder = CiphernodeBuilder::new(&addr, rng.clone(), cipher.clone()) - .with_keyshare() - .with_address(addr) - .testmode_with_forked_bus(bus.consumer()) - .testmode_with_history() - .testmode_with_errors() - .with_pubkey_aggregation() - .with_plaintext_aggregation() - .with_sortition_score(); - - if let Some(ref in_mem_store) = store { - builder = builder.with_in_mem_datastore(in_mem_store); - } - - if logging { - builder = builder.with_logging() - } - - let node = builder.build().await?; - - Ok(node) -} - -fn generate_pk_share( - params: &Arc, - crp: &CommonRandomPoly, - rng: &SharedRng, - addr: &str, -) -> Result { - let sk = SecretKey::random(¶ms, &mut *rng.lock().unwrap()); - let pk = PublicKeyShare::new(&sk, crp.clone(), &mut *rng.lock().unwrap())?; - Ok((pk, sk, addr.to_owned())) -} - -fn generate_pk_shares( - params: &Arc, - crp: &CommonRandomPoly, - rng: &SharedRng, - eth_addrs: &Vec, -) -> Result> { - let mut result = vec![]; - for addr in eth_addrs { - result.push(generate_pk_share(params, crp, rng, addr)?); - } - Ok(result) -} - -async fn create_local_ciphernodes( - bus: &BusHandle, - rng: &SharedRng, - count: u32, - cipher: &Arc, -) -> Result> { - let eth_addrs = create_random_eth_addrs(count); - let mut result = vec![]; - for addr in ð_addrs { - println!("Setting up eth addr: {}", addr); - let tuple = setup_local_ciphernode(&bus, &rng, true, addr, None, cipher).await?; - result.push(tuple); - } - simulate_libp2p_net(&result); - - Ok(result) -} - -async fn setup_score_sortition_environment( - bus: &BusHandle, - eth_addrs: &Vec, - chain_id: u64, -) -> Result<()> { - bus.publish(ConfigurationUpdated { - parameter: "ticketPrice".to_string(), - old_value: U256::ZERO, - new_value: U256::from(10_000_000u64), - chain_id, - })?; - - let mut adder = AddToCommittee::new(bus, chain_id); - for addr in eth_addrs { - adder.add(addr).await?; - - bus.publish(TicketBalanceUpdated { - operator: addr.clone(), - delta: I256::try_from(1_000_000_000u64).unwrap(), - new_balance: U256::from(1_000_000_000u64), - reason: FixedBytes::ZERO, - chain_id, - })?; - - bus.publish(OperatorActivationChanged { - operator: addr.clone(), - active: true, - chain_id, - })?; - } - - Ok(()) -} - -// Type for our tests to test against -type PkSkShareTuple = (PublicKeyShare, SecretKey, String); - -fn aggregate_public_key(shares: &Vec) -> Result { - Ok(shares - .clone() - .into_iter() - .map(|(pk, _, _)| pk) - .aggregate()?) -} - -#[actix::test] -#[ignore = "Needs to be deleted after #1999 is merged"] -async fn test_public_key_aggregation_and_decryption() -> Result<()> { - use tracing_subscriber::{fmt, EnvFilter}; - - let subscriber = fmt() - .with_env_filter(EnvFilter::new("info")) - .with_test_writer() - .finish(); - - let _guard = tracing::subscriber::set_default(subscriber); - - // Setup - let (bus, rng, seed, params, crpoly, _, _) = get_common_setup(None)?; - let e3_id = E3id::new("1234", 1); - let cipher = Arc::new(Cipher::from_password("Don't tell anyone my secret").await?); - - // Setup actual ciphernodes and dispatch add events - let ciphernodes = create_local_ciphernodes(&bus, &rng, 3, &cipher).await?; - let eth_addrs = ciphernodes - .iter() - .map(|tup| tup.address().to_owned()) - .collect::>(); - - println!("Adding ciphernodes..."); - - setup_score_sortition_environment(&bus, ð_addrs, 1).await?; - - let e3_request_event = E3Requested { - e3_id: e3_id.clone(), - params: ArcBytes::from_bytes(&encode_bfv_params(¶ms)), - seed: seed.clone(), - threshold_m: 2, - threshold_n: 5, // Need to use n now to suggest committee size - ..E3Requested::default() - }; - - println!("Sending E3 event..."); - // Send the computation requested event - bus.publish(e3_request_event.clone())?; - - // Test that we cannot send the same event twice - bus.publish(e3_request_event.clone())?; - - // Finalize committee with all available nodes - bus.publish(CommitteeFinalized { - e3_id: e3_id.clone(), - committee: eth_addrs.clone(), - chain_id: 1, - })?; - - // Generate the test shares and pubkey - let rng_test = create_shared_rng_from_u64(42); - let test_shares = generate_pk_shares(¶ms, &crpoly, &rng_test, ð_addrs)?; - let test_pubkey = aggregate_public_key(&test_shares)?; - let public_key_hash = compute_pk_commitment( - test_pubkey.to_bytes(), - params.degree(), - params.plaintext(), - params.moduli().to_vec(), - )?; - - let expected_aggregated_event = PublicKeyAggregated { - pubkey: test_pubkey.to_bytes(), - public_key_hash, - e3_id: e3_id.clone(), - nodes: OrderedSet::from(eth_addrs.clone()), - }; - - let history_collector = ciphernodes.get(2).unwrap().history().unwrap(); - let history = history_collector - .send(TakeEvents::::new(18)) - .await?; - - let aggregated_event: Vec<_> = history - .into_iter() - .filter_map(|e| match e.into_data() { - EnclaveEventData::PublicKeyAggregated(data) => Some(data), - _ => None, - }) - .collect(); - - assert!( - !aggregated_event.is_empty(), - "No PublicKeyAggregated event found" - ); - assert_eq!(aggregated_event.last().unwrap(), &expected_aggregated_event); - println!("Aggregating decryption..."); - // Aggregate decryption - - let raw_plaintext = vec![vec![4, 5]]; - let (ciphertext, expected) = encrypt_ciphertext(¶ms, test_pubkey, raw_plaintext)?; - - // Setup Ciphertext Published Event - let ciphertext_published_event = CiphertextOutputPublished { - ciphertext_output: ciphertext - .iter() - .map(|ct| ArcBytes::from_bytes(&ct.to_bytes())) - .collect(), - e3_id: e3_id.clone(), - }; - - bus.publish(ciphertext_published_event.clone())?; - - let history = history_collector - .send(TakeEvents::::new(6)) - .await?; - - let actual = history - .into_iter() - .filter_map(|e| match e.into_data() { - EnclaveEventData::PlaintextAggregated(data) => Some(data), - _ => None, - }) - .collect::>() - .first() - .unwrap() - .clone(); - - assert_eq!( - actual - .decrypted_output - .iter() - .map(|b| decode_bytes_to_vec_u64(b).unwrap()) - .collect::>>(), - expected - .iter() - .map(|p| decode_plaintext_to_vec_u64(p).unwrap()) - .collect::>>() - ); - - Ok(()) -} - -#[actix::test] -#[ignore = "Needs to be ported to trBFV system after Sync is completed"] -async fn test_stopped_keyshares_retain_state() -> Result<()> { - let e3_id = E3id::new("1234", 1); - let (rng, cn1_address, cn1_data, cn2_address, cn2_data, cipher, history, params, crpoly) = { - let (bus, rng, seed, params, crpoly, _, _) = get_common_setup(None)?; - let cipher = Arc::new(Cipher::from_password("Don't tell anyone my secret").await?); - let ciphernodes = create_local_ciphernodes(&bus, &rng, 2, &cipher).await?; - let eth_addrs = ciphernodes.iter().map(|n| n.address()).collect::>(); - - setup_score_sortition_environment(&bus, ð_addrs, 1).await?; - - let [cn1, cn2] = &ciphernodes.as_slice() else { - panic!("Not enough elements") - }; - - // Send e3request - bus.publish(E3Requested { - e3_id: e3_id.clone(), - threshold_m: 2, - threshold_n: 5, - seed: seed.clone(), - params: ArcBytes::from_bytes(&encode_bfv_params(¶ms)), - ..E3Requested::default() - })?; - - bus.publish(CommitteeFinalized { - e3_id: e3_id.clone(), - committee: eth_addrs.clone(), - chain_id: 1, - })?; - - let history_collector = cn1.history().unwrap(); - let error_collector = cn1.errors().unwrap(); - let history = history_collector - .send(TakeEvents::::new(14)) - .await?; - let errors = error_collector.send(GetEvents::new()).await?; - - assert_eq!(errors.len(), 0); - - // SEND SHUTDOWN! - bus.publish(Shutdown)?; - - // This is probably overkill but required to ensure that all the data is written - sleep(Duration::from_secs(1)).await; - - // Unwrap does not matter as we are in a test - let cn1_dump = cn1.in_mem_store().unwrap().send(GetDump).await??; - let cn2_dump = cn2.in_mem_store().unwrap().send(GetDump).await??; - - ( - rng, - cn1.address(), - cn1_dump, - cn2.address(), - cn2_dump, - cipher, - history, - params, - crpoly, - ) - }; - - // Reset history - - // Get the address and the data actor from the two ciphernodes - // and rehydrate them to new actors - - // Apply the address and data node to two new actors - // Here we test that hydration occurred sucessfully - - let bus = EventSystem::in_mem("cn2") - .with_event_bus(EventBus::::new(EventBusConfig { deduplicate: true }).start()) - .handle()?; - let cn1 = setup_local_ciphernode( - &bus, - &rng, - true, - &cn1_address, - Some(InMemStore::from_dump(cn1_data, true)?.start()), - &cipher, - ) - .await?; - let cn2 = setup_local_ciphernode( - &bus, - &rng, - true, - &cn2_address, - Some(InMemStore::from_dump(cn2_data, true)?.start()), - &cipher, - ) - .await?; - let history_collector = cn1.history().unwrap(); - simulate_libp2p_net(&[cn1, cn2]); - - println!("getting collector from cn1.6"); - - // get the public key from history. - let pubkey: PublicKey = history - .iter() - .filter_map(|evt| match evt.get_data() { - EnclaveEventData::KeyshareCreated(data) => { - PublicKeyShare::deserialize(&data.pubkey, ¶ms, crpoly.clone()).ok() - } - _ => None, - }) - .aggregate()?; - - // Publish the ciphertext - let raw_plaintext = vec![vec![4, 5]]; - let (ciphertext, expected) = encrypt_ciphertext(¶ms, pubkey, raw_plaintext)?; - bus.publish(CiphertextOutputPublished { - ciphertext_output: ciphertext - .iter() - .map(|ct| ArcBytes::from_bytes(&ct.to_bytes())) - .collect(), - e3_id: e3_id.clone(), - })?; - - let history = history_collector - .send(TakeEvents::::new(5)) - .await?; - - let actual = history - .into_iter() - .filter_map(|e| match e.into_data() { - EnclaveEventData::PlaintextAggregated(data) => Some(data), - _ => None, - }) - .collect::>() - .first() - .unwrap() - .clone(); - - assert_eq!( - actual - .decrypted_output - .iter() - .map(|b| decode_bytes_to_vec_u64(b).unwrap()) - .collect::>>(), - expected - .iter() - .map(|p| decode_plaintext_to_vec_u64(p).unwrap()) - .collect::>>() - ); - - Ok(()) -} - -#[actix::test] -async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { - // Setup elements in test - let (cmd_tx, mut cmd_rx) = mpsc::channel(100); // Transmit byte events to the network - let (event_tx, _) = broadcast::channel(100); // Receive byte events from the network - let system = EventSystem::new("test"); - let bus = system.handle()?; - let history_collector = bus.history(); - let event_rx = Arc::new(event_tx.subscribe()); - // Pas cmd and event channels to NetEventTranslator - NetEventTranslator::setup(&bus, &cmd_tx, &event_rx, "my-topic"); - - // Capture messages from output on msgs vec - let msgs: Arc>> = Arc::new(Mutex::new(Vec::new())); - - let msgs_loop = msgs.clone(); - - tokio::spawn(async move { - // Pull events from command channel - while let Some(cmd) = cmd_rx.recv().await { - // If the command is a GossipPublish then extract it and save it whilst sending it to - // the event bus as if it was gossiped from the network and ended up as an external - // message this simulates a rebroadcast message - if let Some(msg) = match cmd { - e3_net::events::NetCommand::GossipPublish { data, .. } => Some(data), - _ => None, - } { - if let GossipData::GossipBytes(_) = msg { - let event: EnclaveEvent = msg.clone().try_into().unwrap(); - let (data, _) = event.split(); - msgs_loop.lock().await.push(data); - event_tx.send(NetEvent::GossipData(msg)).unwrap(); - } - } - // if this manages to broadcast an event to the - // event bus we will expect to see an extra event on - // the bus but we don't because we handle this - } - anyhow::Ok(()) - }); - - let evt_1 = PlaintextAggregated { - e3_id: E3id::new("1235", 1), - decrypted_output: vec![ArcBytes::from_bytes(&[1, 2, 3, 4])], - }; - - let evt_2 = PlaintextAggregated { - e3_id: E3id::new("1236", 1), - decrypted_output: vec![ArcBytes::from_bytes(&[1, 2, 3, 4])], - }; - - let local_evt_3 = CiphernodeSelected { - e3_id: E3id::new("1235", 1), - threshold_m: 2, - threshold_n: 5, - ..CiphernodeSelected::default() - }; - - bus.publish(evt_1.clone())?; - bus.publish(evt_2.clone())?; - bus.publish(local_evt_3.clone())?; // This is a local event which should not be broadcast to the network - - // check the history of the event bus - let history = history_collector - .send(TakeEvents::::new(3)) - .await?; - - assert_eq!( - *msgs.lock().await, - vec![evt_1.clone().into(), evt_2.clone().into()], // notice no local events - "NetEventTranslator did not transmit correct events to the network" - ); - - assert_eq!( - history - .into_iter() - .map(|e| e.into_data()) - .collect::>(), - vec![evt_1.into(), evt_2.into(), local_evt_3.into()], // all local events that have been broadcast but no - // events from the loopback - "NetEventTranslator must not retransmit forwarded event to event bus" - ); - - Ok(()) -} - -#[actix::test] -#[ignore = "Needs to be ported to trBFV system"] -async fn test_duplicate_e3_id_with_different_chain_id() -> Result<()> { - // Setup - let (bus, rng, seed, params, crpoly, _, _) = get_common_setup(None)?; - let cipher = Arc::new(Cipher::from_password("Don't tell anyone my secret").await?); - - // Setup actual ciphernodes and dispatch add events - let ciphernodes = create_local_ciphernodes(&bus, &rng, 3, &cipher).await?; - let eth_addrs = ciphernodes.iter().map(|tup| tup.address()).collect(); - - setup_score_sortition_environment(&bus, ð_addrs, 1).await?; - setup_score_sortition_environment(&bus, ð_addrs, 2).await?; - - // Send the computation requested event - bus.publish(E3Requested { - e3_id: E3id::new("1234", 1), - threshold_m: 2, - threshold_n: 5, - seed: seed.clone(), - params: ArcBytes::from_bytes(&encode_bfv_params(¶ms)), - ..E3Requested::default() - })?; - - bus.publish(CommitteeFinalized { - e3_id: E3id::new("1234", 1), - committee: eth_addrs.clone(), - chain_id: 1, - })?; - - // Generate the test shares and pubkey - let rng_test = create_shared_rng_from_u64(42); - let test_pubkey = aggregate_public_key(&generate_pk_shares( - ¶ms, &crpoly, &rng_test, ð_addrs, - )?)?; - let public_key_hash = compute_pk_commitment( - test_pubkey.to_bytes(), - params.degree(), - params.plaintext(), - params.moduli().to_vec(), - )?; - - let history_collector = ciphernodes.last().unwrap().history().unwrap(); - let history = history_collector - .send(TakeEvents::::new(28)) - .await?; - - assert_eq!( - history.last().cloned().unwrap().into_data(), - PublicKeyAggregated { - pubkey: test_pubkey.to_bytes(), - public_key_hash, - e3_id: E3id::new("1234", 1), - nodes: OrderedSet::from(eth_addrs.clone()), - } - .into() - ); - - // Send the computation requested event - bus.publish(E3Requested { - e3_id: E3id::new("1234", 2), - threshold_m: 2, - threshold_n: 5, - seed: seed.clone(), - params: ArcBytes::from_bytes(&encode_bfv_params(¶ms)), - ..E3Requested::default() - })?; - - bus.publish(CommitteeFinalized { - e3_id: E3id::new("1234", 2), - committee: eth_addrs.clone(), - chain_id: 2, - })?; - - let test_pubkey = aggregate_public_key(&generate_pk_shares( - ¶ms, &crpoly, &rng_test, ð_addrs, - )?)?; - - let public_key_hash = compute_pk_commitment( - test_pubkey.to_bytes(), - params.degree(), - params.plaintext(), - params.moduli().to_vec(), - )?; - - let history = history_collector - .send(TakeEvents::::new(8)) - .await?; - - assert_eq!( - history.last().cloned().unwrap().into_data(), - PublicKeyAggregated { - pubkey: test_pubkey.to_bytes(), - public_key_hash, - e3_id: E3id::new("1234", 2), - nodes: OrderedSet::from(eth_addrs.clone()), - } - .into() - ); - - Ok(()) -} - -#[actix::test] -async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { - let seed = Seed(ChaCha20Rng::seed_from_u64(123).get_seed()); - - // Setup elements in test - let (cmd_tx, _) = mpsc::channel(100); // Transmit byte events to the network - let (event_tx, event_rx) = broadcast::channel(100); // Receive byte events from the network - let system = EventSystem::new("test").with_fresh_bus(); - let bus = system.handle()?; - let history_collector = bus.history(); - - NetEventTranslator::setup(&bus, &cmd_tx, &Arc::new(event_rx), "mytopic"); - - // Capture messages from output on msgs vec - let event = E3Requested { - e3_id: E3id::new("1235", 1), - threshold_m: 2, - threshold_n: 5, - seed: seed.clone(), - params: ArcBytes::from_bytes(&[1, 2, 3, 4]), - ..E3Requested::default() - }; - - // lets send an event from the network - let _ = event_tx.send(NetEvent::GossipData(GossipData::GossipBytes( - bus.event_from(event.clone())?.to_bytes()?, - ))); - - // check the history of the event bus - let history = history_collector - .send(TakeEvents::::new(1)) - .await?; - - assert_eq!( - history - .into_iter() - .map(|e| e.into_data()) - .collect::>(), - vec![event.into()] - ); - - Ok(()) -}