Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
63dd7a5
prefactor enclave event
Nov 28, 2025
a376534
formatting
Nov 28, 2025
067a4b5
compiling
Nov 28, 2025
5778792
ensure compilation and fix errors
Nov 28, 2025
aebcd86
fix unused var
Nov 28, 2025
4755e33
use strum for event type
Nov 29, 2025
e0a55a8
fix shutdown pattern
Nov 29, 2025
8635caf
update core types
Nov 29, 2025
ae886a3
manager types and refactor wip
Nov 29, 2025
8678fc1
stable abstraction
Nov 29, 2025
49e8e24
fix up traits and types
Nov 30, 2025
61c09a7
major refactor of eventing
Nov 30, 2025
810711f
add license
Nov 30, 2025
2b5dc43
fix test compilation
Nov 30, 2025
5eb29d0
rename event manager
Nov 30, 2025
cf54412
format
Nov 30, 2025
49425da
ManagedEvent -> CompositeEvent
Nov 30, 2025
78453b2
remove import
Nov 30, 2025
30710a0
create_local -> event_from
Nov 30, 2025
299ac15
fix api design
Nov 30, 2025
840fc2c
update api design to use pubsub terminology
Nov 30, 2025
fd3b2a2
event_manager.rs -> bus_handle.rs
Nov 30, 2025
c27fc40
event_manager.rs -> bus_handle.rs
Nov 30, 2025
4500220
event_manager.rs -> bus_handle.rs
Nov 30, 2025
876a1be
add comments
Nov 30, 2025
bf7f2e5
remove FromError
Nov 30, 2025
b230565
i know this sounds crazy but hear me out...
Nov 30, 2025
851f856
remove unused imports
Nov 30, 2025
e36a9c0
add required type constraints
Nov 30, 2025
55c0860
Merge branch 'main' into ry/1050-eventsourcing-clocks
ryardley Dec 1, 2025
75abb34
Merge branch 'main' into ry/1050-eventsourcing-clocks
ryardley Dec 1, 2025
14ea698
add wait for poseidon
Dec 1, 2025
5e96380
remove logging
Dec 1, 2025
a860e87
add logging
Dec 1, 2025
10cb525
add logging
Dec 1, 2025
f9be99e
try buffering keyshare created events
Dec 1, 2025
71fc523
headers
Dec 1, 2025
af374f0
public key aggregator does not need sortition
Dec 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 13 additions & 13 deletions crates/aggregator/src/committee_finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

use actix::prelude::*;
use e3_events::{
CommitteeFinalizeRequested, CommitteeRequested, EnclaveEvent, EventBus, Shutdown, Subscribe,
prelude::*, BusHandle, CommitteeFinalizeRequested, CommitteeRequested, EnclaveEvent,
EnclaveEventData, Shutdown,
};
use std::collections::HashMap;
use std::time::Duration;
Expand All @@ -15,26 +16,25 @@ use tracing::{error, info};
/// CommitteeFinalizer is an actor that listens to CommitteeRequested events and dispatches
/// CommitteeFinalizeRequested events after the submission deadline has passed.
pub struct CommitteeFinalizer {
bus: Addr<EventBus<EnclaveEvent>>,
bus: BusHandle<EnclaveEvent>,
pending_committees: HashMap<String, SpawnHandle>,
}

impl CommitteeFinalizer {
pub fn new(bus: &Addr<EventBus<EnclaveEvent>>) -> Self {
pub fn new(bus: &BusHandle<EnclaveEvent>) -> Self {
Self {
bus: bus.clone(),
pending_committees: HashMap::new(),
}
}

pub fn attach(bus: &Addr<EventBus<EnclaveEvent>>) -> Addr<Self> {
pub fn attach(bus: &BusHandle<EnclaveEvent>) -> Addr<Self> {
let addr = CommitteeFinalizer::new(bus).start();

bus.do_send(Subscribe::new(
"CommitteeRequested",
bus.subscribe_all(
&["CommitteeRequested", "Shutdown"],
addr.clone().recipient(),
));
bus.do_send(Subscribe::new("Shutdown", addr.clone().recipient()));
);

addr
}
Expand All @@ -47,9 +47,9 @@ impl Actor for CommitteeFinalizer {
impl Handler<EnclaveEvent> for CommitteeFinalizer {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
match msg {
EnclaveEvent::CommitteeRequested { data, .. } => ctx.notify(data),
EnclaveEvent::Shutdown { data, .. } => ctx.notify(data),
match msg.into_data() {
EnclaveEventData::CommitteeRequested(data) => ctx.notify(data),
EnclaveEventData::Shutdown(data) => ctx.notify(data),
_ => (),
}
}
Expand Down Expand Up @@ -112,9 +112,9 @@ impl Handler<CommitteeRequested> for CommitteeFinalizer {
move |act, _ctx| {
info!(e3_id = %e3_id_clone, "Dispatching CommitteeFinalizeRequested event");

bus.do_send(EnclaveEvent::from(CommitteeFinalizeRequested {
bus.publish(CommitteeFinalizeRequested {
e3_id: e3_id_clone.clone(),
}));
});

act.pending_committees.remove(&e3_id_clone.to_string());
},
Expand Down
86 changes: 42 additions & 44 deletions crates/aggregator/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,36 @@
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

use std::sync::Arc;

use crate::keyshare_created_filter_buffer::KeyshareCreatedFilterBuffer;
use crate::{
PlaintextAggregator, PlaintextAggregatorParams, PlaintextAggregatorState,
PlaintextRepositoryFactory, PublicKeyAggregator, PublicKeyAggregatorParams,
PublicKeyAggregatorState, PublicKeyRepositoryFactory, ThresholdPlaintextAggregator,
ThresholdPlaintextAggregatorParams, ThresholdPlaintextAggregatorState,
TrBfvPlaintextRepositoryFactory,
};
use actix::{Actor, Addr};
use actix::{Actor, Addr, Recipient};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use e3_data::{AutoPersist, RepositoriesFactory};
use e3_events::{BusError, EnclaveErrorType, EnclaveEvent, EventBus};
use e3_data::{AutoPersist, Persistable, RepositoriesFactory};
use e3_events::{prelude::*, E3id};
use e3_events::{BusHandle, EnclaveErrorType, EnclaveEvent, EnclaveEventData};
use e3_fhe::ext::FHE_KEY;
use e3_fhe::Fhe;
use e3_multithread::Multithread;
use e3_request::{E3Context, E3ContextSnapshot, E3Extension, META_KEY};
use e3_sortition::Sortition;

#[deprecated = "In favour of ThresholdPlaintextAggregatorExtension"]
pub struct PlaintextAggregatorExtension {
bus: Addr<EventBus<EnclaveEvent>>,
bus: BusHandle<EnclaveEvent>,
sortition: Addr<Sortition>,
}

impl PlaintextAggregatorExtension {
pub fn create(bus: &Addr<EventBus<EnclaveEvent>>, sortition: &Addr<Sortition>) -> Box<Self> {
pub fn create(bus: &BusHandle<EnclaveEvent>, sortition: &Addr<Sortition>) -> Box<Self> {
Box::new(Self {
bus: bus.clone(),
sortition: sortition.clone(),
Expand All @@ -43,7 +48,7 @@ const ERROR_PLAINTEXT_META_MISSING:&str = "Could not create PlaintextAggregator
impl E3Extension for PlaintextAggregatorExtension {
fn on_event(&self, ctx: &mut E3Context, evt: &EnclaveEvent) {
// Save plaintext aggregator
let EnclaveEvent::CiphertextOutputPublished { data, .. } = evt else {
let EnclaveEventData::CiphertextOutputPublished(data) = evt.get_data() else {
return;
};

Expand Down Expand Up @@ -143,16 +148,12 @@ impl E3Extension for PlaintextAggregatorExtension {
}

pub struct PublicKeyAggregatorExtension {
bus: Addr<EventBus<EnclaveEvent>>,
sortition: Addr<Sortition>,
bus: BusHandle<EnclaveEvent>,
}

impl PublicKeyAggregatorExtension {
pub fn create(bus: &Addr<EventBus<EnclaveEvent>>, sortition: &Addr<Sortition>) -> Box<Self> {
Box::new(Self {
bus: bus.clone(),
sortition: sortition.clone(),
})
pub fn create(bus: &BusHandle<EnclaveEvent>) -> Box<Self> {
Box::new(Self { bus: bus.clone() })
}
}

Expand All @@ -163,7 +164,7 @@ const ERROR_PUBKEY_META_MISSING:&str = "Could not create PublicKeyAggregator bec
impl E3Extension for PublicKeyAggregatorExtension {
fn on_event(&self, ctx: &mut E3Context, evt: &EnclaveEvent) {
// Saving the publickey aggregator with deps on E3Requested
let EnclaveEvent::E3Requested { data, .. } = evt else {
let EnclaveEventData::E3Requested(data) = evt.get_data() else {
return;
};

Expand All @@ -187,22 +188,10 @@ impl E3Extension for PublicKeyAggregatorExtension {
meta.threshold_n,
meta.seed,
)));
ctx.set_event_recipient(
"publickey",
Some(
PublicKeyAggregator::new(
PublicKeyAggregatorParams {
fhe: fhe.clone(),
bus: self.bus.clone(),
sortition: self.sortition.clone(),
e3_id,
},
sync_state,
)
.start()
.into(),
),
);

let value = create_publickey_aggregator(fhe.clone(), self.bus.clone(), e3_id, sync_state);

ctx.set_event_recipient("publickey", Some(value));
}

async fn hydrate(&self, ctx: &mut E3Context, snapshot: &E3ContextSnapshot) -> Result<()> {
Expand All @@ -228,18 +217,12 @@ impl E3Extension for PublicKeyAggregatorExtension {

return Ok(());
};

let value = PublicKeyAggregator::new(
PublicKeyAggregatorParams {
fhe: fhe.clone(),
bus: self.bus.clone(),
sortition: self.sortition.clone(),
e3_id: ctx.e3_id.clone(),
},
let value = create_publickey_aggregator(
fhe.clone(),
self.bus.clone(),
ctx.e3_id.clone(),
sync_state,
)
.start()
.into();
);

// send to context
ctx.set_event_recipient("publickey", Some(value));
Expand All @@ -248,15 +231,30 @@ impl E3Extension for PublicKeyAggregatorExtension {
}
}

fn create_publickey_aggregator(
fhe: Arc<Fhe>,
bus: BusHandle<EnclaveEvent>,
e3_id: E3id,
sync_state: Persistable<PublicKeyAggregatorState>,
) -> Recipient<EnclaveEvent> {
KeyshareCreatedFilterBuffer::new(
PublicKeyAggregator::new(PublicKeyAggregatorParams { fhe, bus, e3_id }, sync_state)
.start()
.into(),
)
.start()
.into()
}

pub struct ThresholdPlaintextAggregatorExtension {
bus: Addr<EventBus<EnclaveEvent>>,
bus: BusHandle<EnclaveEvent>,
sortition: Addr<Sortition>,
multithread: Addr<Multithread>,
}

impl ThresholdPlaintextAggregatorExtension {
pub fn create(
bus: &Addr<EventBus<EnclaveEvent>>,
bus: &BusHandle<EnclaveEvent>,
sortition: &Addr<Sortition>,
multithread: &Addr<Multithread>,
) -> Box<Self> {
Expand All @@ -274,7 +272,7 @@ const ERROR_TRBFV_PLAINTEXT_META_MISSING:&str = "Could not create ThresholdPlain
impl E3Extension for ThresholdPlaintextAggregatorExtension {
fn on_event(&self, ctx: &mut E3Context, evt: &EnclaveEvent) {
// Save plaintext aggregator
let EnclaveEvent::CiphertextOutputPublished { data, .. } = evt else {
let EnclaveEventData::CiphertextOutputPublished(data) = evt.get_data() else {
return;
};

Expand Down
74 changes: 74 additions & 0 deletions crates/aggregator/src/keyshare_created_filter_buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// SPDX-License-Identifier: LGPL-3.0-only
//
// This file is provided WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

use actix::prelude::*;

use e3_events::{prelude::*, EnclaveEvent, EnclaveEventData};
use std::collections::HashSet;

use crate::PublicKeyAggregator;

/// Buffer KeyshareCreated events until CommitteeFinalized has been published
pub struct KeyshareCreatedFilterBuffer {
dest: Addr<PublicKeyAggregator>,
committee: Option<HashSet<String>>,
buffer: Vec<EnclaveEvent>,
}

impl KeyshareCreatedFilterBuffer {
pub fn new(dest: Addr<PublicKeyAggregator>) -> Self {
Self {
dest,
committee: None,
buffer: Vec::new(),
}
}

fn process_buffered_events(&mut self) {
if let Some(ref committee) = self.committee {
for event in self.buffer.drain(..) {
if let EnclaveEventData::KeyshareCreated(data) = event.get_data() {
if committee.contains(&data.node) {
self.dest.do_send(event);
}
}
}
}
}
}
Comment thread
ryardley marked this conversation as resolved.

impl Actor for KeyshareCreatedFilterBuffer {
type Context = Context<Self>;
}

impl Handler<EnclaveEvent> for KeyshareCreatedFilterBuffer {
type Result = ();

fn handle(&mut self, msg: EnclaveEvent, _ctx: &mut Self::Context) -> Self::Result {
match msg.get_data() {
EnclaveEventData::KeyshareCreated(data) => match &self.committee {
Some(committee) if committee.contains(&data.node) => {
// if the committee is ready then process
self.dest.do_send(msg);
}
None => {
// if not buffer
self.buffer.push(msg);
}
_ => {}
},
EnclaveEventData::CommitteeFinalized(data) => {
self.dest.do_send(msg.clone()); // forward committee first
self.committee = Some(data.committee.iter().cloned().collect());
self.process_buffered_events();
}
_ => {
// forward all other events
self.dest.do_send(msg);
}
}
}
}
1 change: 1 addition & 0 deletions crates/aggregator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

mod committee_finalizer;
pub mod ext;
mod keyshare_created_filter_buffer;
mod plaintext_aggregator;
mod publickey_aggregator;
mod repo;
Expand Down
20 changes: 10 additions & 10 deletions crates/aggregator/src/plaintext_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use actix::prelude::*;
use anyhow::Result;
use e3_data::Persistable;
use e3_events::{
DecryptionshareCreated, Die, E3id, EnclaveEvent, EventBus, OrderedSet, PlaintextAggregated,
Seed,
prelude::*, BusHandle, DecryptionshareCreated, Die, E3id, EnclaveEvent, EnclaveEventData,
OrderedSet, PlaintextAggregated, Seed,
};
use e3_fhe::{Fhe, GetAggregatePlaintext};
use e3_sortition::{GetNodeIndex, Sortition};
Expand Down Expand Up @@ -72,15 +72,15 @@ struct ComputeAggregate {
#[deprecated = "To be replaced by ThresholdPlaintextAggregator"]
pub struct PlaintextAggregator {
fhe: Arc<Fhe>,
bus: Addr<EventBus<EnclaveEvent>>,
bus: BusHandle<EnclaveEvent>,
sortition: Addr<Sortition>,
e3_id: E3id,
state: Persistable<PlaintextAggregatorState>,
}

pub struct PlaintextAggregatorParams {
pub fhe: Arc<Fhe>,
pub bus: Addr<EventBus<EnclaveEvent>>,
pub bus: BusHandle<EnclaveEvent>,
pub sortition: Addr<Sortition>,
pub e3_id: E3id,
}
Expand Down Expand Up @@ -145,9 +145,9 @@ impl Actor for PlaintextAggregator {
impl Handler<EnclaveEvent> for PlaintextAggregator {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
match msg {
EnclaveEvent::DecryptionshareCreated { data, .. } => ctx.notify(data),
EnclaveEvent::E3RequestComplete { .. } => ctx.notify(Die),
match msg.into_data() {
EnclaveEventData::DecryptionshareCreated(data) => ctx.notify(data),
EnclaveEventData::E3RequestComplete(_) => ctx.notify(Die),
_ => (),
}
}
Expand Down Expand Up @@ -234,12 +234,12 @@ impl Handler<ComputeAggregate> for PlaintextAggregator {
self.set_decryption(decrypted_output.clone())?;

// Dispatch the PlaintextAggregated event
let event = EnclaveEvent::from(PlaintextAggregated {
let event = PlaintextAggregated {
decrypted_output: vec![ArcBytes::from_bytes(&decrypted_output)],
e3_id: self.e3_id.clone(),
});
};

self.bus.do_send(event);
self.bus.publish(event);

Ok(())
}
Expand Down
Loading
Loading