Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,9 @@ jobs:
needs: [detect_changes]
if: needs.detect_changes.outputs.build_enclave_cli == 'true'
timeout-minutes: 20
runs-on: 'ubuntu-latest'
runs-on:
group: enclave-ci
labels: [enclave-ci-runner]
Comment thread
ctrlc03 marked this conversation as resolved.
permissions:
contents: read
actions: write
Expand Down Expand Up @@ -653,7 +655,7 @@ jobs:

crisp_e2e:
needs: [detect_changes, build_enclave_cli, build_crisp_sdk]
if: needs.detect_changes.outputs.crisp == 'true'
if: needs.detect_changes.outputs.crisp == 'true'
timeout-minutes: 45
runs-on:
group: enclave-ci
Expand Down
2 changes: 0 additions & 2 deletions crates/aggregator/src/proof_fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ pub struct ProofFoldState {
accumulated: Option<Proof>,
remaining: Vec<Proof>,
/// Total fold steps (for progress logging). Set when fold starts.
#[serde(default, skip_serializing_if = "Option::is_none")]
total_steps: Option<usize>,
/// Set when all fold steps have completed.
pub result: Option<Proof>,
/// `start` was called with zero proofs — folding is complete with no aggregate.
#[serde(default)]
pub fold_input_was_empty: bool,
}

Expand Down
12 changes: 9 additions & 3 deletions crates/data/src/commit_log_event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ use e3_events::{EnclaveEvent, EventLog, Unsequenced};
use std::path::PathBuf;
use tracing::error;

/// Maximum message size for both reads and writes (32 MB).
const MAX_MESSAGE_BYTES: usize = 32 * 1024 * 1024;

pub struct CommitLogEventLog {
log: CommitLog,
}

impl CommitLogEventLog {
pub fn new(path: &PathBuf) -> Result<Self> {
let mut opts = LogOptions::new(path);
// TODO: drive this from config - currently set high to be permissive
opts.message_max_bytes(32 * 1024 * 1024);
// TODO: derive this from config - currently set high to be permissive
opts.message_max_bytes(MAX_MESSAGE_BYTES);
let log = CommitLog::new(opts)?;
Ok(Self { log })
}
Expand All @@ -47,7 +50,10 @@ impl EventLog for CommitLogEventLog {
let mut events = Vec::new();

loop {
let message_buf = match self.log.read(current_offset, ReadLimit::default()) {
let message_buf = match self
.log
.read(current_offset, ReadLimit::max_bytes(MAX_MESSAGE_BYTES))
{
Ok(msgs) => msgs,
Err(_) => break,
};
Expand Down
4 changes: 3 additions & 1 deletion crates/events/src/eventstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ impl<I: SequenceIndex, L: EventLog> EventStore<I, L> {
let Some(seq) = self.index.seek(query)? else {
return Ok(vec![]);
};
Ok(self.collect_events(self.log.read_from(seq), filter, limit))
let events: Vec<_> = self.log.read_from(seq).collect();
let result = self.collect_events(Box::new(events.into_iter()), filter, limit);
Ok(result)
Comment thread
ctrlc03 marked this conversation as resolved.
}

/// Query events by sequence number. Returns events at or after the given sequence.
Expand Down
20 changes: 12 additions & 8 deletions crates/evm-helpers/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,10 @@ async fn test_overlapping_listener_handlers() -> Result<()> {
println!("PublishMessage '{}' ({} since sent)", msg, time_diff);

let _ = tx.try_send("waiting".to_string());
// Wait 200ms before publishing to simulate long running handlers
sleep(Duration::from_millis(200)).await;
// Wait long enough to simulate a long-running handler. Must be large
// enough that "two" (sent 100ms later) arrives before this completes,
// even on slow CI runners with ~50ms event delivery latency.
sleep(Duration::from_millis(1000)).await;
println!("Sending message: '{msg}'");
let _ = tx.try_send(msg);
Ok(())
Expand All @@ -155,11 +157,11 @@ async fn test_overlapping_listener_handlers() -> Result<()> {
let _ = tokio::spawn(async move { spawn_event_listener.listen().await });

// Events should be returned roughly in this order:
// 0ms : one
// 0ms : waiting
// 100ms : two
// 200ms : three
// 300ms : four
// 0ms : one
// 0ms : waiting
// 100ms : two
// 1000ms : three (after long-running handler completes)
// 1300ms : four

let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
contract
Expand Down Expand Up @@ -187,7 +189,9 @@ async fn test_overlapping_listener_handlers() -> Result<()> {
.watch()
.await?;

sleep(Duration::from_millis(300)).await;
// Wait for the long-running PublishMessage handler (1000ms) to complete
// before sending "four", so "three" arrives before "four".
sleep(Duration::from_millis(1200)).await;

let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
contract
Expand Down
4 changes: 2 additions & 2 deletions crates/net/src/net_sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ impl Handler<EventStoreQueryResponse> for NetSyncManager {
return Ok(());
}
let aggregate_id = fetch_request.aggregate_id();
let events: Vec<EnclaveEvent<Unsequenced>> = msg
.into_events()
let all_events: Vec<_> = msg.into_events();
let events: Vec<EnclaveEvent<Unsequenced>> = all_events
.into_iter()
.filter(|e| e.source() == EventSource::Net)
.take(limit)
Expand Down
21 changes: 18 additions & 3 deletions crates/sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub async fn sync(

// 2. Determine the evm blocks to read from based on the SnapshotMeta
let evm_config = snapshot.to_evm_config();
let _net_config = snapshot.to_net_config();
let snapshot_net_config = snapshot.to_net_config();

// 3. Load EventStore events since the sequence number found in the snapshot into memory.
info!("Loading EventStore events...");
Expand Down Expand Up @@ -96,13 +96,28 @@ pub async fn sync(
"{} historical blockchain events loaded.",
historical_evm_events.len()
);
let net_config = find_net_hlc(&historical_evm_events);
// Build the net sync cursor using snapshot timestamps (the original HLC timestamps
// from before the restart). We cannot use find_net_hlc(&historical_evm_events) because
// re-read EVM events get NEW HLC timestamps from hlc.receive() — the HLC is fresh on
// restart so it uses the current wallclock, producing timestamps that are later than what
// ciphernodes stored. This makes the sync query return 0 events.
//
// We still use find_net_hlc to determine WHICH aggregates need syncing (filtering out
// closed E3s), but replace the timestamps with the original ones from the snapshot.
let open_aggregates = find_net_hlc(&historical_evm_events);
let net_config: BTreeMap<AggregateId, u128> = open_aggregates
.into_iter()
.map(|(id, _)| {
let ts = snapshot_net_config.get(&id).copied().unwrap_or(0);
(id, ts)
})
.collect();

// 6. Load the historical libp2p events to memory
info!("Waiting until NetReady...");
net_ready.await?;
info!("NetReady!");
info!("Loading historical libp2p events...");
// let (addr, rx) = actix_toolbox::oneshot::<HistoricalNetSyncEventsReceived>();
let events_received = bus.wait_for(EventType::HistoricalNetSyncEventsReceived);
bus.publish_without_context(HistoricalNetSyncStart::new(net_config.clone()))?;
let EnclaveEventData::HistoricalNetSyncEventsReceived(event) =
Expand Down
46 changes: 22 additions & 24 deletions crates/utils/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,31 @@ where
loop {
match operation().await {
Ok(value) => return Ok(value),
Err(re) => {
tracing::error!("RETRY FAILED {:?}", re);
match re {
RetryError::Retry(e) => {
if current_attempt >= max_attempts {
return Err(anyhow::anyhow!(
"Operation failed after {} attempts. Last error: {}",
max_attempts,
e
));
}
Err(re) => match re {
RetryError::Retry(e) => {
if current_attempt >= max_attempts {
error!("Operation failed after {} attempts: {}", max_attempts, e);
return Err(anyhow::anyhow!(
"Operation failed after {} attempts. Last error: {}",
max_attempts,
e
));
}

warn!(
"Attempt {}/{} failed, retrying in {}ms: {}",
current_attempt, max_attempts, delay_ms, e
);
warn!(
"Attempt {}/{} failed, retrying in {}ms: {}",
current_attempt, max_attempts, delay_ms, e
);

sleep(Duration::from_millis(delay_ms)).await;
current_attempt += 1;
delay_ms *= 2; // Exponential backoff
}
RetryError::Failure(e) => {
error!("FAILURE!: returning to caller.");
return Err(e);
}
sleep(Duration::from_millis(delay_ms)).await;
current_attempt += 1;
delay_ms *= 2; // Exponential backoff
}
RetryError::Failure(e) => {
error!("Non-retryable failure: {}", e);
return Err(e);
}
}
},
}
}
}
2 changes: 1 addition & 1 deletion tests/integration/persist.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pnpm committee:new \
--input-window-end "$INPUT_WINDOW_END" \
--e3-params "$ENCODED_PARAMS" \
--committee-size 0 \
--proof-aggregation-enabled false
--proof-aggregation-enabled true

waiton "$SCRIPT_DIR/output/pubkey.bin"

Expand Down
Loading