Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/net/src/net_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ impl Libp2pNetInterface {
self.swarm.listen_on(addr.parse()?)?;

trace!("Peers to dial: {:?}", self.peers);
if self.peers.is_empty() {
info!("Found 0 peers to dial");
} else {
info!("Found {} peer(s) to dial:", self.peers.len());
for peer in &self.peers {
info!(" -> {}", peer);
}
}
tokio::spawn({
let event_tx = event_tx.clone();
let cmd_tx = cmd_tx.clone();
Expand Down
52 changes: 37 additions & 15 deletions crates/net/src/net_sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,32 +360,54 @@ async fn handle_sync_request_event(

let mut all_events: Vec<EnclaveEvent<Unsequenced>> = Vec::new();
let mut latest_timestamp: u128 = 0;
let mut failed_aggregates: Vec<AggregateId> = Vec::new();

for (aggregate_id, since) in event.since.iter() {
info!(
"Requesting batched events for aggregate_id={} since={}",
aggregate_id, since
);
let requester = DirectRequester::builder(net_cmds.clone(), net_events.clone()).build();
let events: Vec<EnclaveEvent<Unsequenced>> =
fetch_all_batched_events(requester, PeerTarget::Random, *aggregate_id, *since, 100)
.await?;

info!(
"Received {} events for aggregate_id={}",
events.len(),
aggregate_id
);

for enclave_event in events {
let ts = enclave_event.ts();
if ts > latest_timestamp {
latest_timestamp = ts;
match fetch_all_batched_events::<EnclaveEvent<Unsequenced>>(
requester,
PeerTarget::Random,
*aggregate_id,
*since,
100,
)
.await
{
Ok(events) => {
info!(
"Received {} events for aggregate_id={}",
events.len(),
aggregate_id
);
for enclave_event in events {
let ts = enclave_event.ts();
if ts > latest_timestamp {
latest_timestamp = ts;
}
all_events.push(enclave_event);
}
}
Err(e) => {
warn!(
"Failed to fetch events for aggregate_id={}: {e}. Continuing with available events.",
aggregate_id
);
failed_aggregates.push(*aggregate_id);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
all_events.push(enclave_event);
}
}

if !failed_aggregates.is_empty() {
bail!(
"failed to fetch historical net events for aggregates: {:?}",
failed_aggregates
);
}

info!(
"Sync complete: collected {} events across {} aggregates, latest_timestamp={}",
all_events.len(),
Expand Down
51 changes: 40 additions & 11 deletions crates/sortition/src/ciphernode_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ use e3_utils::MAILBOX_LIMIT;
use std::collections::HashMap;
use tracing::info;

/// Build an `E3Meta` from an `E3Requested` event's fields.
fn e3_meta_from(req: &E3Requested) -> E3Meta {
E3Meta {
seed: req.seed,
threshold_n: req.threshold_n,
threshold_m: req.threshold_m,
params: req.params.clone(),
esi_per_ct: req.esi_per_ct,
error_size: req.error_size.clone(),
}
}

/// CiphernodeSelector is an actor that determines if a ciphernode is part of a committee and if so
/// emits a TicketGenerated event (score sortition) to the event bus
pub struct CiphernodeSelector {
Expand Down Expand Up @@ -57,6 +69,7 @@ impl CiphernodeSelector {
let e3_cache = selector_store.load_or_default(HashMap::new()).await?;
let addr = CiphernodeSelector::new(bus, e3_cache, address).start();

bus.subscribe(EventType::E3Requested, addr.clone().recipient());
bus.subscribe(EventType::E3RequestComplete, addr.clone().recipient());
bus.subscribe(EventType::CommitteeFinalized, addr.clone().recipient());
bus.subscribe(EventType::Shutdown, addr.clone().recipient());
Expand All @@ -71,6 +84,7 @@ impl Handler<EnclaveEvent> for CiphernodeSelector {
fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
let (msg, ec) = msg.into_components();
match msg {
EnclaveEventData::E3Requested(data) => self.notify_sync(ctx, TypedEvent::new(data, ec)),
EnclaveEventData::E3RequestComplete(data) => {
self.notify_sync(ctx, TypedEvent::new(data, ec))
}
Expand All @@ -83,6 +97,31 @@ impl Handler<EnclaveEvent> for CiphernodeSelector {
}
}

/// Handles `E3Requested` events received directly from the EventBus.
///
/// This handler populates `e3_cache` during sync replay, when `Sortition` gates its
/// `E3Requested` subscription behind `EffectsEnabled` and therefore does NOT forward
/// `WithSortitionTicket` messages to us. Without this handler the cache would be empty
/// when `CommitteeFinalized` arrives during replay, causing a missing-meta error.
///
/// During live operation both this handler AND the `WithSortitionTicket` handler fire for
/// the same E3. `or_insert` ensures the first write wins; the `WithSortitionTicket`
/// handler then overwrites with identical data via `insert`.
impl Handler<TypedEvent<E3Requested>> for CiphernodeSelector {
type Result = ();

fn handle(&mut self, msg: TypedEvent<E3Requested>, _: &mut Self::Context) -> Self::Result {
trap(EType::Sortition, &self.bus.with_ec(msg.get_ctx()), || {
self.e3_cache.try_mutate(msg.get_ctx(), |mut cache| {
cache
.entry(msg.e3_id.clone())
.or_insert_with(|| e3_meta_from(&msg));
Ok(cache)
})
})
}
}

impl Handler<WithSortitionTicket<TypedEvent<E3Requested>>> for CiphernodeSelector {
type Result = ();

Expand All @@ -97,17 +136,7 @@ impl Handler<WithSortitionTicket<TypedEvent<E3Requested>>> for CiphernodeSelecto
"Mutating e3_cache: appending data: {:?}",
data.e3_id.clone()
);
cache.insert(
data.e3_id.clone(),
E3Meta {
seed: data.seed,
threshold_n: data.threshold_n,
threshold_m: data.threshold_m,
params: data.params.clone(),
esi_per_ct: data.esi_per_ct,
error_size: data.error_size.clone(),
},
);
cache.insert(data.e3_id.clone(), e3_meta_from(&data));
Ok(cache)
})?;

Expand Down
1 change: 1 addition & 0 deletions crates/sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ fn is_infrastructure_event(event: &EnclaveEvent) -> bool {
EnclaveEventData::SyncEnded(_)
| EnclaveEventData::EffectsEnabled(_)
| EnclaveEventData::HistoricalEvmSyncStart(_)
| EnclaveEventData::HistoricalNetSyncStart(_)
)
}

Expand Down
Loading