diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ead13f3975..32fb8710d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -537,7 +537,9 @@ jobs: needs: [detect_changes] if: needs.detect_changes.outputs.build_enclave_cli == 'true' timeout-minutes: 20 - runs-on: 'ubuntu-latest' + runs-on: + group: enclave-ci + labels: [enclave-ci-runner] permissions: contents: read actions: write @@ -653,7 +655,7 @@ jobs: crisp_e2e: needs: [detect_changes, build_enclave_cli, build_crisp_sdk] - if: needs.detect_changes.outputs.crisp == 'true' + if: needs.detect_changes.outputs.crisp == 'true' timeout-minutes: 45 runs-on: group: enclave-ci diff --git a/crates/aggregator/src/proof_fold.rs b/crates/aggregator/src/proof_fold.rs index 90a469235e..5a10858404 100644 --- a/crates/aggregator/src/proof_fold.rs +++ b/crates/aggregator/src/proof_fold.rs @@ -24,12 +24,10 @@ pub struct ProofFoldState { accumulated: Option, remaining: Vec, /// Total fold steps (for progress logging). Set when fold starts. - #[serde(default, skip_serializing_if = "Option::is_none")] total_steps: Option, /// Set when all fold steps have completed. pub result: Option, /// `start` was called with zero proofs — folding is complete with no aggregate. - #[serde(default)] pub fold_input_was_empty: bool, } diff --git a/crates/data/src/commit_log_event_log.rs b/crates/data/src/commit_log_event_log.rs index 39fa684e6b..5f74cb8acf 100644 --- a/crates/data/src/commit_log_event_log.rs +++ b/crates/data/src/commit_log_event_log.rs @@ -12,6 +12,9 @@ use e3_events::{EnclaveEvent, EventLog, Unsequenced}; use std::path::PathBuf; use tracing::error; +/// Maximum message size for both reads and writes (32 MB). +const MAX_MESSAGE_BYTES: usize = 32 * 1024 * 1024; + pub struct CommitLogEventLog { log: CommitLog, } @@ -19,8 +22,8 @@ pub struct CommitLogEventLog { impl CommitLogEventLog { pub fn new(path: &PathBuf) -> Result { let mut opts = LogOptions::new(path); - // TODO: drive this from config - currently set high to be permissive - opts.message_max_bytes(32 * 1024 * 1024); + // TODO: derive this from config - currently set high to be permissive + opts.message_max_bytes(MAX_MESSAGE_BYTES); let log = CommitLog::new(opts)?; Ok(Self { log }) } @@ -47,7 +50,10 @@ impl EventLog for CommitLogEventLog { let mut events = Vec::new(); loop { - let message_buf = match self.log.read(current_offset, ReadLimit::default()) { + let message_buf = match self + .log + .read(current_offset, ReadLimit::max_bytes(MAX_MESSAGE_BYTES)) + { Ok(msgs) => msgs, Err(_) => break, }; diff --git a/crates/events/src/eventstore.rs b/crates/events/src/eventstore.rs index 93751e3aa5..b4c999d3a9 100644 --- a/crates/events/src/eventstore.rs +++ b/crates/events/src/eventstore.rs @@ -78,7 +78,9 @@ impl EventStore { let Some(seq) = self.index.seek(query)? else { return Ok(vec![]); }; - Ok(self.collect_events(self.log.read_from(seq), filter, limit)) + let events: Vec<_> = self.log.read_from(seq).collect(); + let result = self.collect_events(Box::new(events.into_iter()), filter, limit); + Ok(result) } /// Query events by sequence number. Returns events at or after the given sequence. diff --git a/crates/evm-helpers/tests/integration.rs b/crates/evm-helpers/tests/integration.rs index 746490ea56..b7bdfef192 100644 --- a/crates/evm-helpers/tests/integration.rs +++ b/crates/evm-helpers/tests/integration.rs @@ -130,8 +130,10 @@ async fn test_overlapping_listener_handlers() -> Result<()> { println!("PublishMessage '{}' ({} since sent)", msg, time_diff); let _ = tx.try_send("waiting".to_string()); - // Wait 200ms before publishing to simulate long running handlers - sleep(Duration::from_millis(200)).await; + // Wait long enough to simulate a long-running handler. Must be large + // enough that "two" (sent 100ms later) arrives before this completes, + // even on slow CI runners with ~50ms event delivery latency. + sleep(Duration::from_millis(1000)).await; println!("Sending message: '{msg}'"); let _ = tx.try_send(msg); Ok(()) @@ -155,11 +157,11 @@ async fn test_overlapping_listener_handlers() -> Result<()> { let _ = tokio::spawn(async move { spawn_event_listener.listen().await }); // Events should be returned roughly in this order: - // 0ms : one - // 0ms : waiting - // 100ms : two - // 200ms : three - // 300ms : four + // 0ms : one + // 0ms : waiting + // 100ms : two + // 1000ms : three (after long-running handler completes) + // 1300ms : four let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis(); contract @@ -187,7 +189,9 @@ async fn test_overlapping_listener_handlers() -> Result<()> { .watch() .await?; - sleep(Duration::from_millis(300)).await; + // Wait for the long-running PublishMessage handler (1000ms) to complete + // before sending "four", so "three" arrives before "four". + sleep(Duration::from_millis(1200)).await; let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis(); contract diff --git a/crates/net/src/net_sync_manager.rs b/crates/net/src/net_sync_manager.rs index 5e43bf5e00..5a0f7c35e9 100644 --- a/crates/net/src/net_sync_manager.rs +++ b/crates/net/src/net_sync_manager.rs @@ -249,8 +249,8 @@ impl Handler for NetSyncManager { return Ok(()); } let aggregate_id = fetch_request.aggregate_id(); - let events: Vec> = msg - .into_events() + let all_events: Vec<_> = msg.into_events(); + let events: Vec> = all_events .into_iter() .filter(|e| e.source() == EventSource::Net) .take(limit) diff --git a/crates/sync/src/sync.rs b/crates/sync/src/sync.rs index e976039a60..18052df76c 100644 --- a/crates/sync/src/sync.rs +++ b/crates/sync/src/sync.rs @@ -55,7 +55,7 @@ pub async fn sync( // 2. Determine the evm blocks to read from based on the SnapshotMeta let evm_config = snapshot.to_evm_config(); - let _net_config = snapshot.to_net_config(); + let snapshot_net_config = snapshot.to_net_config(); // 3. Load EventStore events since the sequence number found in the snapshot into memory. info!("Loading EventStore events..."); @@ -96,13 +96,28 @@ pub async fn sync( "{} historical blockchain events loaded.", historical_evm_events.len() ); - let net_config = find_net_hlc(&historical_evm_events); + // Build the net sync cursor using snapshot timestamps (the original HLC timestamps + // from before the restart). We cannot use find_net_hlc(&historical_evm_events) because + // re-read EVM events get NEW HLC timestamps from hlc.receive() — the HLC is fresh on + // restart so it uses the current wallclock, producing timestamps that are later than what + // ciphernodes stored. This makes the sync query return 0 events. + // + // We still use find_net_hlc to determine WHICH aggregates need syncing (filtering out + // closed E3s), but replace the timestamps with the original ones from the snapshot. + let open_aggregates = find_net_hlc(&historical_evm_events); + let net_config: BTreeMap = open_aggregates + .into_iter() + .map(|(id, _)| { + let ts = snapshot_net_config.get(&id).copied().unwrap_or(0); + (id, ts) + }) + .collect(); + // 6. Load the historical libp2p events to memory info!("Waiting until NetReady..."); net_ready.await?; info!("NetReady!"); info!("Loading historical libp2p events..."); - // let (addr, rx) = actix_toolbox::oneshot::(); let events_received = bus.wait_for(EventType::HistoricalNetSyncEventsReceived); bus.publish_without_context(HistoricalNetSyncStart::new(net_config.clone()))?; let EnclaveEventData::HistoricalNetSyncEventsReceived(event) = diff --git a/crates/utils/src/retry.rs b/crates/utils/src/retry.rs index f8bcd1c046..693a8edab2 100644 --- a/crates/utils/src/retry.rs +++ b/crates/utils/src/retry.rs @@ -46,33 +46,31 @@ where loop { match operation().await { Ok(value) => return Ok(value), - Err(re) => { - tracing::error!("RETRY FAILED {:?}", re); - match re { - RetryError::Retry(e) => { - if current_attempt >= max_attempts { - return Err(anyhow::anyhow!( - "Operation failed after {} attempts. Last error: {}", - max_attempts, - e - )); - } + Err(re) => match re { + RetryError::Retry(e) => { + if current_attempt >= max_attempts { + error!("Operation failed after {} attempts: {}", max_attempts, e); + return Err(anyhow::anyhow!( + "Operation failed after {} attempts. Last error: {}", + max_attempts, + e + )); + } - warn!( - "Attempt {}/{} failed, retrying in {}ms: {}", - current_attempt, max_attempts, delay_ms, e - ); + warn!( + "Attempt {}/{} failed, retrying in {}ms: {}", + current_attempt, max_attempts, delay_ms, e + ); - sleep(Duration::from_millis(delay_ms)).await; - current_attempt += 1; - delay_ms *= 2; // Exponential backoff - } - RetryError::Failure(e) => { - error!("FAILURE!: returning to caller."); - return Err(e); - } + sleep(Duration::from_millis(delay_ms)).await; + current_attempt += 1; + delay_ms *= 2; // Exponential backoff + } + RetryError::Failure(e) => { + error!("Non-retryable failure: {}", e); + return Err(e); } - } + }, } } } diff --git a/tests/integration/persist.sh b/tests/integration/persist.sh index 3e0f96a1d7..f75326601d 100755 --- a/tests/integration/persist.sh +++ b/tests/integration/persist.sh @@ -69,7 +69,7 @@ pnpm committee:new \ --input-window-end "$INPUT_WINDOW_END" \ --e3-params "$ENCODED_PARAMS" \ --committee-size 0 \ - --proof-aggregation-enabled false + --proof-aggregation-enabled true waiton "$SCRIPT_DIR/output/pubkey.bin"