Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 10 additions & 7 deletions crates/aggregator/src/committee_finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

use actix::prelude::*;
use e3_events::{
prelude::*, BusHandle, CommitteeFinalizeRequested, CommitteeRequested, EnclaveEvent,
EnclaveEventData, Shutdown,
prelude::*, trap, BusHandle, CommitteeFinalizeRequested, CommitteeRequested, EType,
EnclaveEvent, EnclaveEventData, Shutdown,
};
use std::collections::HashMap;
use std::time::Duration;
Expand All @@ -16,19 +16,19 @@ use tracing::{error, info};
/// CommitteeFinalizer is an actor that listens to CommitteeRequested events and dispatches
/// CommitteeFinalizeRequested events after the submission deadline has passed.
pub struct CommitteeFinalizer {
bus: BusHandle<EnclaveEvent>,
bus: BusHandle,
pending_committees: HashMap<String, SpawnHandle>,
}

impl CommitteeFinalizer {
pub fn new(bus: &BusHandle<EnclaveEvent>) -> Self {
pub fn new(bus: &BusHandle) -> Self {
Self {
bus: bus.clone(),
pending_committees: HashMap::new(),
}
}

pub fn attach(bus: &BusHandle<EnclaveEvent>) -> Addr<Self> {
pub fn attach(bus: &BusHandle) -> Addr<Self> {
let addr = CommitteeFinalizer::new(bus).start();

bus.subscribe_all(
Expand Down Expand Up @@ -112,8 +112,11 @@ impl Handler<CommitteeRequested> for CommitteeFinalizer {
move |act, _ctx| {
info!(e3_id = %e3_id_clone, "Dispatching CommitteeFinalizeRequested event");

bus.publish(CommitteeFinalizeRequested {
e3_id: e3_id_clone.clone(),
trap(EType::Sortition, &act.bus.clone(), || {
bus.publish(CommitteeFinalizeRequested {
e3_id: e3_id_clone.clone(),
})?;
Ok(())
});

act.pending_committees.remove(&e3_id_clone.to_string());
Expand Down
32 changes: 16 additions & 16 deletions crates/aggregator/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use anyhow::{anyhow, Result};
use async_trait::async_trait;
use e3_data::{AutoPersist, Persistable, RepositoriesFactory};
use e3_events::{prelude::*, E3id};
use e3_events::{BusHandle, EnclaveErrorType, EnclaveEvent, EnclaveEventData};
use e3_events::{BusHandle, EType, EnclaveEvent, EnclaveEventData};
use e3_fhe::ext::FHE_KEY;
use e3_fhe::Fhe;
use e3_multithread::Multithread;
Expand All @@ -28,12 +28,12 @@ use e3_sortition::Sortition;

#[deprecated = "In favour of ThresholdPlaintextAggregatorExtension"]
pub struct PlaintextAggregatorExtension {
bus: BusHandle<EnclaveEvent>,
bus: BusHandle,
sortition: Addr<Sortition>,
}

impl PlaintextAggregatorExtension {
pub fn create(bus: &BusHandle<EnclaveEvent>, sortition: &Addr<Sortition>) -> Box<Self> {
pub fn create(bus: &BusHandle, sortition: &Addr<Sortition>) -> Box<Self> {
Box::new(Self {
bus: bus.clone(),
sortition: sortition.clone(),
Expand All @@ -54,15 +54,15 @@ impl E3Extension for PlaintextAggregatorExtension {

let Some(fhe) = ctx.get_dependency(FHE_KEY) else {
self.bus.err(
EnclaveErrorType::PlaintextAggregation,
EType::PlaintextAggregation,
anyhow!(ERROR_PLAINTEXT_FHE_MISSING),
);
return;
};

let Some(ref meta) = ctx.get_dependency(META_KEY) else {
self.bus.err(
EnclaveErrorType::PlaintextAggregation,
EType::PlaintextAggregation,
anyhow!(ERROR_PLAINTEXT_META_MISSING),
);
return;
Expand All @@ -74,7 +74,7 @@ impl E3Extension for PlaintextAggregatorExtension {
// This is a single ciphertext for the legacy PlaintextAggregator
let Some(single_ciphertext) = data.ciphertext_output.first() else {
self.bus.err(
EnclaveErrorType::PlaintextAggregation,
EType::PlaintextAggregation,
anyhow!("Could not extract ciphertext from array"),
);
return;
Expand Down Expand Up @@ -122,7 +122,7 @@ impl E3Extension for PlaintextAggregatorExtension {
// Get deps
let Some(fhe) = ctx.get_dependency(FHE_KEY) else {
self.bus.err(
EnclaveErrorType::PlaintextAggregation,
EType::PlaintextAggregation,
anyhow!(ERROR_PLAINTEXT_FHE_MISSING),
);
return Ok(());
Expand All @@ -148,11 +148,11 @@ impl E3Extension for PlaintextAggregatorExtension {
}

pub struct PublicKeyAggregatorExtension {
bus: BusHandle<EnclaveEvent>,
bus: BusHandle,
}

impl PublicKeyAggregatorExtension {
pub fn create(bus: &BusHandle<EnclaveEvent>) -> Box<Self> {
pub fn create(bus: &BusHandle) -> Box<Self> {
Box::new(Self { bus: bus.clone() })
}
}
Expand All @@ -170,14 +170,14 @@ impl E3Extension for PublicKeyAggregatorExtension {

let Some(fhe) = ctx.get_dependency(FHE_KEY) else {
self.bus.err(
EnclaveErrorType::PublickeyAggregation,
EType::PublickeyAggregation,
anyhow!(ERROR_PUBKEY_FHE_MISSING),
);
return;
};
let Some(ref meta) = ctx.get_dependency(META_KEY) else {
self.bus.err(
EnclaveErrorType::PublickeyAggregation,
EType::PublickeyAggregation,
anyhow!(ERROR_PUBKEY_META_MISSING),
);
return;
Expand Down Expand Up @@ -211,7 +211,7 @@ impl E3Extension for PublicKeyAggregatorExtension {
// Get deps
let Some(fhe) = ctx.get_dependency(FHE_KEY) else {
self.bus.err(
EnclaveErrorType::PublickeyAggregation,
EType::PublickeyAggregation,
anyhow!(ERROR_PUBKEY_FHE_MISSING),
);

Expand All @@ -233,7 +233,7 @@ impl E3Extension for PublicKeyAggregatorExtension {

fn create_publickey_aggregator(
fhe: Arc<Fhe>,
bus: BusHandle<EnclaveEvent>,
bus: BusHandle,
e3_id: E3id,
sync_state: Persistable<PublicKeyAggregatorState>,
) -> Recipient<EnclaveEvent> {
Expand All @@ -247,14 +247,14 @@ fn create_publickey_aggregator(
}

pub struct ThresholdPlaintextAggregatorExtension {
bus: BusHandle<EnclaveEvent>,
bus: BusHandle,
sortition: Addr<Sortition>,
multithread: Addr<Multithread>,
}

impl ThresholdPlaintextAggregatorExtension {
pub fn create(
bus: &BusHandle<EnclaveEvent>,
bus: &BusHandle,
sortition: &Addr<Sortition>,
multithread: &Addr<Multithread>,
) -> Box<Self> {
Expand All @@ -278,7 +278,7 @@ impl E3Extension for ThresholdPlaintextAggregatorExtension {

let Some(ref meta) = ctx.get_dependency(META_KEY) else {
self.bus.err(
EnclaveErrorType::PlaintextAggregation,
EType::PlaintextAggregation,
anyhow!(ERROR_TRBFV_PLAINTEXT_META_MISSING),
);
return;
Expand Down
6 changes: 3 additions & 3 deletions crates/aggregator/src/plaintext_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ struct ComputeAggregate {
#[deprecated = "To be replaced by ThresholdPlaintextAggregator"]
pub struct PlaintextAggregator {
fhe: Arc<Fhe>,
bus: BusHandle<EnclaveEvent>,
bus: BusHandle,
sortition: Addr<Sortition>,
e3_id: E3id,
state: Persistable<PlaintextAggregatorState>,
}

pub struct PlaintextAggregatorParams {
pub fhe: Arc<Fhe>,
pub bus: BusHandle<EnclaveEvent>,
pub bus: BusHandle,
pub sortition: Addr<Sortition>,
pub e3_id: E3id,
}
Expand Down Expand Up @@ -239,7 +239,7 @@ impl Handler<ComputeAggregate> for PlaintextAggregator {
e3_id: self.e3_id.clone(),
};

self.bus.publish(event);
self.bus.publish(event)?;

Ok(())
}
Expand Down
9 changes: 4 additions & 5 deletions crates/aggregator/src/publickey_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use e3_events::{
PublicKeyAggregated, Seed,
};
use e3_fhe::{Fhe, GetAggregatePublicKey};
use e3_sortition::Sortition;
use e3_utils::ArcBytes;
use std::sync::Arc;
use tracing::{error, info};
Expand Down Expand Up @@ -56,14 +55,14 @@ struct ComputeAggregate {

pub struct PublicKeyAggregator {
fhe: Arc<Fhe>,
bus: BusHandle<EnclaveEvent>,
bus: BusHandle,
e3_id: E3id,
state: Persistable<PublicKeyAggregatorState>,
}

pub struct PublicKeyAggregatorParams {
pub fhe: Arc<Fhe>,
pub bus: BusHandle<EnclaveEvent>,
pub bus: BusHandle,
pub e3_id: E3id,
}

Expand Down Expand Up @@ -176,7 +175,7 @@ impl Handler<KeyshareCreated> for PublicKeyAggregator {
impl Handler<ComputeAggregate> for PublicKeyAggregator {
type Result = Result<()>;

fn handle(&mut self, msg: ComputeAggregate, ctx: &mut Self::Context) -> Self::Result {
fn handle(&mut self, msg: ComputeAggregate, _: &mut Self::Context) -> Self::Result {
info!("Computing Aggregate PublicKey...");
let pubkey = self.fhe.get_aggregate_public_key(GetAggregatePublicKey {
keyshares: msg.keyshares,
Expand All @@ -198,7 +197,7 @@ impl Handler<ComputeAggregate> for PublicKeyAggregator {
e3_id: msg.e3_id,
nodes,
};
self.bus.publish(event);
self.bus.publish(event)?;
}
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions crates/aggregator/src/threshold_plaintext_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ pub struct ComputeAggregate {

pub struct ThresholdPlaintextAggregator {
multithread: Addr<Multithread>,
bus: BusHandle<EnclaveEvent>,
bus: BusHandle,
sortition: Addr<Sortition>,
e3_id: E3id,
state: Persistable<ThresholdPlaintextAggregatorState>,
}

pub struct ThresholdPlaintextAggregatorParams {
pub multithread: Addr<Multithread>,
pub bus: BusHandle<EnclaveEvent>,
pub bus: BusHandle,
pub sortition: Addr<Sortition>,
pub e3_id: E3id,
}
Expand Down Expand Up @@ -342,7 +342,7 @@ impl Handler<ComputeAggregate> for ThresholdPlaintextAggregator {
};

info!("Dispatching plaintext event {:?}", event);
act.bus.publish(event);
act.bus.publish(event)?;
Ok(())
}),
)
Expand Down
10 changes: 5 additions & 5 deletions crates/ciphernode-builder/src/ciphernode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

use actix::Addr;
use e3_data::{DataStore, InMemStore, StoreAddr};
use e3_events::{BusHandle, EnclaveEvent, EventBus, HistoryCollector};
use e3_events::{BusHandle, EnclaveEvent, HistoryCollector};

#[derive(Clone, Debug)]
pub struct CiphernodeHandle {
pub address: String,
pub store: DataStore,
pub bus: BusHandle<EnclaveEvent>,
pub bus: BusHandle,
pub history: Option<Addr<HistoryCollector<EnclaveEvent>>>,
pub errors: Option<Addr<HistoryCollector<EnclaveEvent>>>,
}
Expand All @@ -21,7 +21,7 @@ impl CiphernodeHandle {
pub fn new(
address: String,
store: DataStore,
bus: BusHandle<EnclaveEvent>,
bus: BusHandle,
history: Option<Addr<HistoryCollector<EnclaveEvent>>>,
errors: Option<Addr<HistoryCollector<EnclaveEvent>>>,
) -> Self {
Expand All @@ -34,8 +34,8 @@ impl CiphernodeHandle {
}
}

pub fn bus(&self) -> Addr<EventBus<EnclaveEvent>> {
self.bus.bus()
pub fn bus(&self) -> &BusHandle {
&self.bus
}

pub fn history(&self) -> Option<Addr<HistoryCollector<EnclaveEvent>>> {
Expand Down
4 changes: 2 additions & 2 deletions crates/ciphernode-builder/src/ciphernode_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use e3_aggregator::ext::{
use e3_config::chain_config::ChainConfig;
use e3_crypto::Cipher;
use e3_data::{DataStore, InMemStore, Repositories, RepositoriesFactory};
use e3_events::{EnclaveEvent, EventBus, EventBusConfig};
use e3_events::{BusHandle, EnclaveEvent, EventBus, EventBusConfig};
use e3_evm::{
helpers::{
load_signer_from_repository, ConcreteReadProvider, ConcreteWriteProvider, EthProvider,
Expand Down Expand Up @@ -299,7 +299,7 @@ impl CiphernodeBuilder {
};

// Get a handle from the event bus
let bus = local_bus.into();
let bus = BusHandle::new_from_consumer(local_bus);

let addr = if let Some(addr) = self.address.clone() {
info!("Using eth address = {}", addr);
Expand Down
Loading
Loading