Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
e2846cc
draft refactor to generic reqres
Feb 23, 2026
3062690
tidy up debug comments
Feb 24, 2026
d8bf5ed
update abstraction
Feb 24, 2026
063f139
update direct requester
Feb 24, 2026
ed2e0ed
formatting
Feb 24, 2026
675447b
headers
Feb 24, 2026
afa2568
make it easy to test with direct requester tester
Feb 24, 2026
9730fe2
Merge branch 'main' into ry/1325-net-event-sync
ryardley Feb 24, 2026
7281088
add failure test
Feb 24, 2026
42931a0
fix bad type
Feb 24, 2026
c2a9eb8
formatting
Feb 24, 2026
3d4d60c
Merge branch 'main' into ry/1325-net-event-sync
ryardley Feb 26, 2026
fbb75cc
refactor: add DirectResponder and ProtocolResponse for structured req…
Feb 26, 2026
1ec5d44
headers
Feb 26, 2026
d272971
tidy up imports
Feb 26, 2026
2dcf48c
first attempt at batch query
Feb 26, 2026
c9417a1
fix type
Feb 26, 2026
70d6aad
feat: add limit and filter to EventStoreQueryBy, make EventLog and St…
Feb 26, 2026
d6c5076
refactor: simplify EventLog trait by removing type parameter
Feb 26, 2026
c7346fb
tidy up formatting
Feb 26, 2026
8008052
fix: improve doc examples and add error context in DirectResponder
Feb 26, 2026
83e4336
Merge branch 'main' into ry/1325-net-event-sync
ryardley Feb 26, 2026
37f4aff
feat: propagate limit and filter options in EventStoreRouter
Feb 26, 2026
090abb4
avoid spurious intermediate event states
Feb 26, 2026
76e8b90
add test event builder for testing sync events
Feb 27, 2026
85c0e1e
add tests and use feature flag
Feb 27, 2026
795a7dc
Merge branch 'main' into ry/1325-net-event-sync
ryardley Feb 27, 2026
5918599
Merge branch 'main' into ry/1325-net-event-sync
ryardley Feb 28, 2026
0b72951
update HistoricalSyncNetEventsReceived
Feb 28, 2026
5c62fca
Merge branch 'main' into ry/1325-net-event-sync
ryardley Feb 28, 2026
6dad25e
remove generic
Mar 1, 2026
3712e31
refactor to allow simuating libp2p at the net_interface level
Mar 1, 2026
e4ac1b4
fix not awaiting async funnction
Mar 1, 2026
725a452
fix pnpm lock
Mar 1, 2026
691bfad
update pnpm lock file
Mar 1, 2026
ccd379d
fix headers
Mar 1, 2026
1fd7972
Merge branch 'main' into ry/1325-net-event-sync
Mar 1, 2026
db06cee
fix tests
Mar 1, 2026
52e1469
update names
Mar 1, 2026
36fc707
rename and tidy up
Mar 1, 2026
186d572
fix up bad field
Mar 1, 2026
19df585
Merge branch 'main' into ry/1325-net-event-sync
ryardley Mar 2, 2026
238d0fe
--wip-- [skip ci]
Mar 2, 2026
e4b3fde
tidy up libp2p simulation to actually use net components
Mar 2, 2026
73eaada
add todo comment for tomorrow
Mar 2, 2026
f103b2a
make all tests pass locally
Mar 2, 2026
470d088
remove old history collector
Mar 2, 2026
281466d
ensure timeouts are long enough so that tests pass
Mar 3, 2026
b13edbd
Merge branch 'main' into ry/1325-net-event-sync
ryardley Mar 3, 2026
b1ed650
unsubscribe on wait_for
Mar 3, 2026
8f4a599
Merge branch 'main' into ry/1325-net-event-sync
Mar 3, 2026
a4ccf08
fix bad merge
Mar 4, 2026
23fcb76
fix up unlikely error condition
Mar 4, 2026
cbfae20
fix up bugs in requester
Mar 4, 2026
19bd161
Revert "fix up unlikely error condition"
Mar 4, 2026
dfecba9
add other code fixes
Mar 4, 2026
0b6ebfa
Merge branch 'main' into ry/1325-net-event-sync
ryardley Mar 4, 2026
94e6e13
Merge branch 'main' into ry/1325-net-event-sync
ryardley Mar 6, 2026
5a74145
add comma
Mar 6, 2026
accc8ea
fix up output and use arcbytes for large keys
Mar 6, 2026
6179f2e
add public key aggregated event
Mar 6, 2026
fd72e35
Merge branch 'main' into ry/1325-net-event-sync
ryardley Mar 7, 2026
a449bf3
Merge branch 'main' into ry/1325-net-event-sync
ryardley Mar 8, 2026
8707413
Merge branch 'main' into ry/1325-net-event-sync
ryardley Mar 9, 2026
c5deb83
use iter() over to_vec().into_iter()
Mar 9, 2026
3554b13
Merge branch 'main' into ry/1325-net-event-sync
ryardley Mar 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions crates/aggregator/src/publickey_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ pub enum PublicKeyAggregatorState {
no_proof_parties: Vec<u64>,
},
GeneratingC5Proof {
public_key: Vec<u8>,
public_key: ArcBytes,
public_key_hash: [u8; 32],
keyshare_bytes: Vec<ArcBytes>,
nodes: OrderedSet<String>,
},
Complete {
public_key: Vec<u8>,
public_key: ArcBytes,
keyshares: OrderedSet<ArcBytes>,
nodes: OrderedSet<String>,
},
Expand Down Expand Up @@ -293,12 +293,13 @@ impl PublicKeyAggregator {
let committee_h = honest_keyshares.len();

info!("Publishing PkAggregationProofPending for C5 proof generation...");
let pubkey = ArcBytes::from_bytes(&pubkey);
self.bus.publish(
PkAggregationProofPending {
e3_id: self.e3_id.clone(),
proof_request: PkAggregationProofRequest {
keyshare_bytes: honest_keyshares.clone(),
aggregated_pk_bytes: ArcBytes::from_bytes(&pubkey),
aggregated_pk_bytes: pubkey.clone(),
params_preset: self.params_preset.clone(),
// this field is not really used in the circuit, we only use H
committee_n: committee_h,
Expand All @@ -316,7 +317,7 @@ impl PublicKeyAggregator {
// Transition to GeneratingC5Proof
self.state.try_mutate(&ec, |_| {
Ok(PublicKeyAggregatorState::GeneratingC5Proof {
public_key: pubkey,
public_key: pubkey.clone(),
public_key_hash,
keyshare_bytes: honest_keyshares,
nodes: honest_nodes_set,
Expand Down
1 change: 1 addition & 0 deletions crates/ciphernode-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ e3-sortition.workspace = true
e3-sync.workspace = true
e3-trbfv.workspace = true
e3-utils.workspace = true
libp2p.workspace = true
rayon.workspace = true
tempfile.workspace = true
tokio.workspace = true
Expand Down
27 changes: 19 additions & 8 deletions crates/ciphernode-builder/src/ciphernode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use actix::Addr;
use anyhow::Result;
use e3_data::{DataStore, InMemStore, StoreAddr};
use e3_events::{BusHandle, EnclaveEvent, HistoryCollector};
use tokio::task::JoinHandle;
use e3_net::NetChannelBridge;
use libp2p::PeerId;

/// A Sharable handle to a Ciphernode. NOTE: clones are available for use in the CiphernodeSystem
/// but they cannot await the task.
Expand All @@ -19,19 +20,27 @@ pub struct CiphernodeHandle {
pub bus: BusHandle,
pub history: Option<Addr<HistoryCollector<EnclaveEvent>>>,
pub errors: Option<Addr<HistoryCollector<EnclaveEvent>>>,
pub peer_id: String,
pub join_handle: JoinHandle<Result<()>>,
pub peer_id: PeerId,
pub channel_bridge: Option<NetChannelBridge>,
}

impl PartialEq for CiphernodeHandle {
fn eq(&self, other: &Self) -> bool {
self.address == other.address && self.peer_id == other.peer_id
}
}

impl Eq for CiphernodeHandle {}

impl CiphernodeHandle {
pub fn new(
address: String,
store: DataStore,
bus: BusHandle,
history: Option<Addr<HistoryCollector<EnclaveEvent>>>,
errors: Option<Addr<HistoryCollector<EnclaveEvent>>>,
peer_id: String,
join_handle: JoinHandle<Result<()>>,
peer_id: PeerId,
channel_bridge: Option<NetChannelBridge>,
) -> Self {
Self {
address,
Expand All @@ -40,7 +49,7 @@ impl CiphernodeHandle {
history,
errors,
peer_id,
join_handle,
channel_bridge,
}
}

Expand All @@ -64,8 +73,10 @@ impl CiphernodeHandle {
&self.store
}

pub fn split(self) -> (BusHandle, JoinHandle<Result<()>>) {
(self.bus, self.join_handle)
pub fn channel_bridge(&self) -> Result<NetChannelBridge> {
Ok(self.channel_bridge.clone().ok_or(anyhow::anyhow!(
"No channel bridge exists. We are likely not in test mode"
))?)
}

pub fn in_mem_store(&self) -> Option<&Addr<InMemStore>> {
Expand Down
40 changes: 22 additions & 18 deletions crates/ciphernode-builder/src/ciphernode_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use e3_fhe::ext::FheExtension;
use e3_fhe_params::{BfvPreset, DEFAULT_BFV_PRESET};
use e3_keyshare::ext::ThresholdKeyshareExtension;
use e3_multithread::{Multithread, MultithreadReport, TaskPool};
use e3_net::{setup_net, NetRepositoryFactory};
use e3_net::{
create_channel_bridge, setup_libp2p_keypair, setup_net, setup_net_interface,
NetRepositoryFactory,
};
use e3_request::E3Router;
use e3_sortition::{
CiphernodeSelector, CiphernodeSelectorFactory, FinalizedCommitteesRepositoryFactory,
Expand All @@ -32,6 +35,7 @@ use e3_sortition::{
use e3_sync::sync;
use e3_utils::SharedRng;
use e3_zk_prover::{setup_zk_actors, ZkBackend};
use libp2p::PeerId;
use std::time::Duration;
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tracing::{error, info};
Expand Down Expand Up @@ -496,28 +500,28 @@ impl CiphernodeBuilder {
))
}

info!("building...");

info!("E3Router building...");
e3_builder.build().await?;

let (join_handle, peer_id) = if let Some(net_config) = self.net_config {
let topic = "enclave-gossip";
let (peer_id, interface, channel_bridge) = if let Some(net_config) = self.net_config {
// Setup real net interface
let repositories = store.repositories();
setup_net(
bus.clone(),
net_config.peers,
&self.cipher,
net_config.quic_port,
repositories.libp2p_keypair(),
eventstore_ts,
)
.await?
let keypair = setup_libp2p_keypair(repositories.libp2p_keypair(), &self.cipher).await?;
let peer_id = keypair.peer_id();
let interface =
setup_net_interface(topic, keypair, net_config.peers, net_config.quic_port)?;
(peer_id, interface, None)
} else {
(
tokio::spawn(std::future::ready(Ok(()))),
"-not set-".to_string(),
)
// Setup test net interface with random PeerId
let (interface, channel_bridge) = create_channel_bridge();
let peer_id = PeerId::random();
let channel_bridge = Some(channel_bridge);
(peer_id, interface, channel_bridge)
};

setup_net(topic, bus.clone(), eventstore_ts, interface)?;

// Run the sync routine
sync(
&bus,
Expand All @@ -535,7 +539,7 @@ impl CiphernodeBuilder {
history,
errors,
peer_id,
join_handle,
channel_bridge,
))
}

Expand Down
51 changes: 9 additions & 42 deletions crates/entrypoint/src/helpers/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,53 +7,20 @@
use e3_ciphernode_builder::CiphernodeHandle;
use e3_events::{prelude::*, Shutdown};
use std::time::Duration;
use tokio::{
select,
signal::unix::{signal, SignalKind},
};
use tokio::signal::unix::{signal, SignalKind};
use tracing::{error, info};

pub async fn listen_for_shutdown(node: CiphernodeHandle) {
let (bus, mut handle) = node.split();
let bus = node.bus;
let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal stream");
select! {
_ = sigterm.recv() => {
info!("SIGTERM received, initiating graceful shutdown...");
sigterm.recv().await;
info!("SIGTERM received, initiating graceful shutdown...");

// Stop the actor system
match bus.publish_without_context(Shutdown){
Ok(_) => (),
Err(e) => error!("Shutdown failed to publish! {e}")
}

// Wait for all events to propagate
tokio::time::sleep(Duration::from_secs(2)).await;

// Abort the spawned task
handle.abort();

// Wait for all actor processes to disconnect
tokio::time::sleep(Duration::from_secs(2)).await;

// Wait for the task to finish
let _ = handle.await;

info!("Graceful shutdown complete");

}
result = &mut handle => {
match result {
Ok(Ok(_)) => {
info!("Completed");
}
Ok(Err(e)) => {
error!("Failed: {}", e);
}
Err(e) => {
error!("Panicked: {}", e);
}
}
}
if let Err(e) = bus.publish_without_context(Shutdown) {
error!("Shutdown failed to publish! {e}");
}

tokio::time::sleep(Duration::from_secs(2)).await;
info!("Graceful shutdown complete");
}
18 changes: 16 additions & 2 deletions crates/events/src/bus_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
use actix::{Actor, Addr, Handler, Recipient};
use anyhow::Result;
use derivative::Derivative;
use e3_utils::MAILBOX_LIMIT;
use std::marker::PhantomData;
use e3_utils::{actix::channel::oneshot, MAILBOX_LIMIT};
use std::{future::Future, marker::PhantomData, pin::Pin};
use tracing::error;

use crate::{
Expand Down Expand Up @@ -287,6 +287,20 @@ impl<S> EventSubscriber<EnclaveEvent<Sequenced>> for BusHandle<S> {
self.event_bus
.do_send(Unsubscribe::new(event_type, recipient));
}

fn wait_for(
&self,
event_type: EventType,
) -> Pin<Box<dyn Future<Output = Result<EnclaveEvent<Sequenced>>> + Send>> {
let (addr, rx) = oneshot::<EnclaveEvent<Sequenced>>();
self.subscribe(event_type, addr.clone());
let bus = self.event_bus.clone();
Box::pin(async move {
let r = rx.await?;
bus.do_send(Unsubscribe::new(event_type, addr));
Ok(r)
})
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

impl<S> EventContextManager for BusHandle<S> {
Expand Down
2 changes: 1 addition & 1 deletion crates/events/src/enclave_event/enable_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::fmt::{self, Display};
#[derive(Message, Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[rtype(result = "()")]
pub struct EffectsEnabled {
pub correlation_id: CorrelationId,
correlation_id: CorrelationId,
}

impl EffectsEnabled {
Expand Down
Loading
Loading