Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions crates/aggregator/src/committee_finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

use actix::prelude::*;
use e3_events::{
prelude::*, trap, BusHandle, CommitteeFinalizeRequested, CommitteeRequested, E3Failed, E3Stage,
E3StageChanged, EType, EnclaveEvent, EnclaveEventData, EventType, Shutdown, TypedEvent,
prelude::*, run_once, trap, BusHandle, CommitteeFinalizeRequested, CommitteeRequested,
E3Failed, E3Stage, E3StageChanged, EType, EffectsEnabled, EnclaveEvent, EnclaveEventData,
EventType, Shutdown, TypedEvent,
};
use e3_utils::{NotifySync, MAILBOX_LIMIT};
use std::collections::HashMap;
Expand All @@ -32,16 +33,31 @@ impl CommitteeFinalizer {
pub fn attach(bus: &BusHandle) -> Addr<Self> {
let addr = CommitteeFinalizer::new(bus).start();

// Subscribe to state-building / cleanup events immediately
bus.subscribe_all(
&[
EventType::CommitteeRequested,
EventType::Shutdown,
EventType::E3Failed,
EventType::E3StageChanged,
],
addr.clone().recipient(),
);

// Gate CommitteeRequested behind EffectsEnabled — finalization should not
// be scheduled during historical event replay.
bus.subscribe(
EventType::EffectsEnabled,
run_once::<EffectsEnabled>({
let bus = bus.clone();
let addr = addr.clone();
move |_| {
bus.subscribe(EventType::CommitteeRequested, addr.into());
Ok(())
}
})
.recipient(),
);

addr
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/evm/src/enclave_sol_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use e3_events::EventType;
use e3_events::Shutdown;
use e3_events::{prelude::*, EffectsEnabled};
use e3_events::{run_once, EnclaveEvent};
use e3_events::{E3Stage, E3StageChanged};
use e3_events::{E3id, EType, PlaintextAggregated};
use e3_events::{E3Stage, E3StageChanged, E3id};
use e3_events::{EType, PlaintextAggregated};
Comment thread
ctrlc03 marked this conversation as resolved.
use e3_utils::NotifySync;
use e3_utils::MAILBOX_LIMIT;
use tracing::info;
Expand Down
27 changes: 25 additions & 2 deletions crates/multithread/src/multithread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ impl Multithread {
) -> Addr<Self> {
let addr = Self::new(bus.clone(), rng.clone(), cipher.clone(), task_pool, report).start();

// Gate ComputeRequest behind EffectsEnabled — proof generation should
// not trigger during historical event replay.
bus.subscribe(
EventType::EffectsEnabled,
run_once::<EffectsEnabled>({
Expand Down Expand Up @@ -165,7 +167,29 @@ impl Multithread {
let actor = Self::new(bus.clone(), rng.clone(), cipher.clone(), task_pool, report)
.with_zk_prover(zk_prover);
let addr = actor.start();
bus.subscribe(EventType::ComputeRequest, addr.clone().recipient());
bus.subscribe_all(
&[
EventType::E3Failed,
EventType::E3StageChanged,
EventType::E3RequestComplete,
],
addr.clone().into(),
);
Comment thread
hmzakhalid marked this conversation as resolved.

bus.subscribe(
EventType::EffectsEnabled,
run_once::<EffectsEnabled>({
let bus = bus.clone();
let addr = addr.clone();
move |_| {
bus.subscribe(EventType::ComputeRequest, addr.clone().recipient());
info!("Multithread actor with ZK listening for events.");
Ok(())
}
})
.recipient(),
);

addr
}

Expand All @@ -184,7 +208,6 @@ impl Actor for Multithread {
impl Handler<EnclaveEvent> for Multithread {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
info!("Multithread received EnclaveEvent!");
let (data, ec) = msg.into_components();
match data {
EnclaveEventData::ComputeRequest(data) => ctx.notify(TypedEvent::new(data, ec)),
Expand Down
16 changes: 12 additions & 4 deletions crates/net/src/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ use e3_utils::{retry_with_backoff, to_retry, OnceTake, RetryError};
const DIAL_DELAY: u64 = 3000;
const DIAL_RETRIES: u32 = 10;

/// Per-connection timeout: if no swarm events arrive within this window during
/// a single dial attempt, we treat it as timed out and retry.
const DIAL_EVENT_TIMEOUT: Duration = Duration::from_secs(60);

/// Dial a single Multiaddr with retries and return an error should those retries not work
async fn dial_multiaddr(
cmd_tx: &mpsc::Sender<NetCommand>,
Expand Down Expand Up @@ -53,18 +57,22 @@ fn trace_error(r: Result<()>) {
/// * `cmd_tx` - Sender for network peer commands
/// * `event_tx` - Broadcast sender for peer events
/// * `peers` - List of peer addresses to connect to
///
/// # Returns
/// The number of peers that were successfully connected to.
pub async fn dial_peers(
cmd_tx: &mpsc::Sender<NetCommand>,
event_tx: &broadcast::Sender<NetEvent>,
peers: &Vec<String>,
) -> Result<()> {
) -> Result<usize> {
let futures: Vec<_> = peers
.iter()
.map(|addr| dial_multiaddr(cmd_tx, event_tx, addr))
.collect();
let results = join_all(futures).await;
let connected = results.iter().filter(|r| r.is_ok()).count();
results.into_iter().for_each(trace_error);
Ok(())
Ok(connected)
}

/// Attempt a connection with retries to a multiaddr.
Expand Down Expand Up @@ -140,8 +148,8 @@ async fn wait_for_connection(
_ => (),
}
}
_ = sleep(Duration::from_secs(60)) => {
warn!("Connection attempt timed out after 60 seconds of no events");
_ = sleep(DIAL_EVENT_TIMEOUT) => {
warn!("Connection attempt timed out after {:?} of no events", DIAL_EVENT_TIMEOUT);
return Err(RetryError::Retry(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"Connection attempt timed out",
Expand Down
12 changes: 7 additions & 5 deletions crates/net/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,6 @@ pub struct OutgoingRequestFailed {
pub error: String,
}

#[derive(Message, Debug, Clone)]
#[rtype("()")]
pub struct AllPeersDialed;

/// Libp2pNetInterface Commands are sent to the network peer over a mspc channel
#[derive(Debug, Clone)]
pub enum NetCommand {
Expand Down Expand Up @@ -259,7 +255,13 @@ pub enum NetEvent {
/// Received response from a peer in response to an outgoing request
OutgoingRequestSucceeded(OutgoingRequestSucceeded),
OutgoingRequestFailed(OutgoingRequestFailed),
AllPeersDialed,
/// All configured peers have been dialed (not all necessarily connected).
AllPeersDialed {
/// Number of peers that successfully connected.
connected: usize,
/// Total number of peers that were dialed.
total: usize,
},
}

#[derive(Clone, Debug)]
Expand Down
Loading
Loading