From 44fc7473847df86f42afc14d798f7e698b084936 Mon Sep 17 00:00:00 2001 From: ctrlc03 <93448202+ctrlc03@users.noreply.github.com> Date: Sat, 21 Mar 2026 10:22:16 +0000 Subject: [PATCH 1/9] chore: run persist tests with aggregation enabled --- tests/integration/persist.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/persist.sh b/tests/integration/persist.sh index 3e0f96a1d7..f75326601d 100755 --- a/tests/integration/persist.sh +++ b/tests/integration/persist.sh @@ -69,7 +69,7 @@ pnpm committee:new \ --input-window-end "$INPUT_WINDOW_END" \ --e3-params "$ENCODED_PARAMS" \ --committee-size 0 \ - --proof-aggregation-enabled false + --proof-aggregation-enabled true waiton "$SCRIPT_DIR/output/pubkey.bin" From 7d077d65c9d85d8366902d3e7bd2eac8409a6b7e Mon Sep 17 00:00:00 2001 From: ctrlc03 <93448202+ctrlc03@users.noreply.github.com> Date: Sat, 21 Mar 2026 10:28:10 +0000 Subject: [PATCH 2/9] fix: remove skip_serializing_if from bincode-persisted ProofFoldState --- crates/aggregator/src/proof_fold.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/aggregator/src/proof_fold.rs b/crates/aggregator/src/proof_fold.rs index 90a469235e..5a10858404 100644 --- a/crates/aggregator/src/proof_fold.rs +++ b/crates/aggregator/src/proof_fold.rs @@ -24,12 +24,10 @@ pub struct ProofFoldState { accumulated: Option, remaining: Vec, /// Total fold steps (for progress logging). Set when fold starts. - #[serde(default, skip_serializing_if = "Option::is_none")] total_steps: Option, /// Set when all fold steps have completed. pub result: Option, /// `start` was called with zero proofs — folding is complete with no aggregate. - #[serde(default)] pub fold_input_was_empty: bool, } From ae757d1d0c6ff3f7c0dc738a44fab3341df6d8bd Mon Sep 17 00:00:00 2001 From: ctrlc03 <93448202+ctrlc03@users.noreply.github.com> Date: Sat, 21 Mar 2026 11:06:42 +0000 Subject: [PATCH 3/9] chore: better error matching and retry with no aggregation --- crates/utils/src/retry.rs | 46 +++++++++++++++++------------------- tests/integration/persist.sh | 2 +- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/crates/utils/src/retry.rs b/crates/utils/src/retry.rs index f8bcd1c046..693a8edab2 100644 --- a/crates/utils/src/retry.rs +++ b/crates/utils/src/retry.rs @@ -46,33 +46,31 @@ where loop { match operation().await { Ok(value) => return Ok(value), - Err(re) => { - tracing::error!("RETRY FAILED {:?}", re); - match re { - RetryError::Retry(e) => { - if current_attempt >= max_attempts { - return Err(anyhow::anyhow!( - "Operation failed after {} attempts. Last error: {}", - max_attempts, - e - )); - } + Err(re) => match re { + RetryError::Retry(e) => { + if current_attempt >= max_attempts { + error!("Operation failed after {} attempts: {}", max_attempts, e); + return Err(anyhow::anyhow!( + "Operation failed after {} attempts. Last error: {}", + max_attempts, + e + )); + } - warn!( - "Attempt {}/{} failed, retrying in {}ms: {}", - current_attempt, max_attempts, delay_ms, e - ); + warn!( + "Attempt {}/{} failed, retrying in {}ms: {}", + current_attempt, max_attempts, delay_ms, e + ); - sleep(Duration::from_millis(delay_ms)).await; - current_attempt += 1; - delay_ms *= 2; // Exponential backoff - } - RetryError::Failure(e) => { - error!("FAILURE!: returning to caller."); - return Err(e); - } + sleep(Duration::from_millis(delay_ms)).await; + current_attempt += 1; + delay_ms *= 2; // Exponential backoff + } + RetryError::Failure(e) => { + error!("Non-retryable failure: {}", e); + return Err(e); } - } + }, } } } diff --git a/tests/integration/persist.sh b/tests/integration/persist.sh index f75326601d..3e0f96a1d7 100755 --- a/tests/integration/persist.sh +++ b/tests/integration/persist.sh @@ -69,7 +69,7 @@ pnpm committee:new \ --input-window-end "$INPUT_WINDOW_END" \ --e3-params "$ENCODED_PARAMS" \ --committee-size 0 \ - --proof-aggregation-enabled true + --proof-aggregation-enabled false waiton "$SCRIPT_DIR/output/pubkey.bin" From 916132e41049272b9cab7151f07b561fb1dd34cc Mon Sep 17 00:00:00 2001 From: ctrlc03 <93448202+ctrlc03@users.noreply.github.com> Date: Sat, 21 Mar 2026 12:10:01 +0000 Subject: [PATCH 4/9] test: run aggregation with more logging --- .github/workflows/ci.yml | 36 ++++++++++++++++++++---------- crates/events/src/eventstore.rs | 10 ++++++++- crates/net/src/net_sync_manager.rs | 10 +++++++-- tests/integration/persist.sh | 2 +- 4 files changed, 42 insertions(+), 16 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ead13f3975..2934df9009 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -119,7 +119,8 @@ jobs: rust_unit_tests: needs: [detect_changes] - if: needs.detect_changes.outputs.rust_unit_tests == 'true' + # if: needs.detect_changes.outputs.rust_unit_tests == 'true' + if: false timeout-minutes: 20 runs-on: 'ubuntu-latest' permissions: @@ -173,7 +174,8 @@ jobs: rust_integration_tests: needs: [detect_changes] - if: needs.detect_changes.outputs.rust_integration_tests == 'true' + # if: needs.detect_changes.outputs.rust_integration_tests == 'true' + if: false timeout-minutes: 45 runs-on: group: enclave-ci @@ -216,7 +218,8 @@ jobs: zk_prover_integration: needs: [detect_changes] - if: needs.detect_changes.outputs.zk_prover_integration == 'true' + # if: needs.detect_changes.outputs.zk_prover_integration == 'true' + if: false timeout-minutes: 30 runs-on: 'ubuntu-latest' steps: @@ -300,7 +303,8 @@ jobs: build_ciphernode_image: needs: [detect_changes] - if: needs.detect_changes.outputs.docker_ciphernode == 'true' + # if: needs.detect_changes.outputs.docker_ciphernode == 'true' + if: false timeout-minutes: 30 runs-on: 'ubuntu-latest' steps: @@ -343,7 +347,8 @@ jobs: test_contracts: needs: [detect_changes] - if: needs.detect_changes.outputs.contracts == 'true' + # if: needs.detect_changes.outputs.contracts == 'true' + if: false timeout-minutes: 15 runs-on: 'ubuntu-latest' permissions: @@ -390,7 +395,8 @@ jobs: test_net: needs: [detect_changes] - if: needs.detect_changes.outputs.net == 'true' + # if: needs.detect_changes.outputs.net == 'true' + if: false timeout-minutes: 15 runs-on: 'ubuntu-latest' steps: @@ -578,7 +584,8 @@ jobs: crisp_unit: needs: [detect_changes, build_crisp_sdk] - if: needs.detect_changes.outputs.crisp == 'true' + # if: needs.detect_changes.outputs.crisp == 'true' + if: false timeout-minutes: 30 runs-on: 'ubuntu-latest' steps: @@ -768,7 +775,8 @@ jobs: build_circuits: needs: [detect_changes] - if: needs.detect_changes.outputs.build_circuits == 'true' + # if: needs.detect_changes.outputs.build_circuits == 'true' + if: false timeout-minutes: 30 runs-on: 'ubuntu-latest' permissions: @@ -927,7 +935,8 @@ jobs: build_e3_support_dev: needs: [detect_changes] - if: needs.detect_changes.outputs.build_e3_support_dev == 'true' + # if: needs.detect_changes.outputs.build_e3_support_dev == 'true' + if: false timeout-minutes: 20 runs-on: 'ubuntu-latest' steps: @@ -1018,7 +1027,8 @@ jobs: build_crisp_sdk: needs: [detect_changes] - if: needs.detect_changes.outputs.crisp == 'true' + # if: needs.detect_changes.outputs.crisp == 'true' + if: false timeout-minutes: 20 runs-on: 'ubuntu-latest' steps: @@ -1068,7 +1078,8 @@ jobs: template_integration: needs: [detect_changes, build_enclave_cli, build_e3_support_dev, build_sdk] - if: needs.detect_changes.outputs.templates == 'true' + # if: needs.detect_changes.outputs.templates == 'true' + if: false timeout-minutes: 30 runs-on: group: enclave-ci @@ -1144,7 +1155,8 @@ jobs: test_enclave_init: needs: [detect_changes, build_enclave_cli, build_e3_support_dev] - if: needs.detect_changes.outputs.init == 'true' + # if: needs.detect_changes.outputs.init == 'true' + if: false timeout-minutes: 10 runs-on: 'ubuntu-latest' steps: diff --git a/crates/events/src/eventstore.rs b/crates/events/src/eventstore.rs index 93751e3aa5..11ea71715d 100644 --- a/crates/events/src/eventstore.rs +++ b/crates/events/src/eventstore.rs @@ -76,9 +76,17 @@ impl EventStore { limit: Option, ) -> Result>> { let Some(seq) = self.index.seek(query)? else { + tracing::debug!("query_by_ts: no index entry at or after ts={}", query); return Ok(vec![]); }; - Ok(self.collect_events(self.log.read_from(seq), filter, limit)) + let events: Vec<_> = self.log.read_from(seq).collect(); + let total = events.len(); + let result = self.collect_events(Box::new(events.into_iter()), filter, limit); + tracing::debug!( + "query_by_ts: ts={} -> seq={}, {} raw events, {} after filter", + query, seq, total, result.len() + ); + Ok(result) } /// Query events by sequence number. Returns events at or after the given sequence. diff --git a/crates/net/src/net_sync_manager.rs b/crates/net/src/net_sync_manager.rs index 5e43bf5e00..8a8821652d 100644 --- a/crates/net/src/net_sync_manager.rs +++ b/crates/net/src/net_sync_manager.rs @@ -249,13 +249,19 @@ impl Handler for NetSyncManager { return Ok(()); } let aggregate_id = fetch_request.aggregate_id(); - let events: Vec> = msg - .into_events() + let since = fetch_request.since(); + let all_events: Vec<_> = msg.into_events(); + let total_before_filter = all_events.len(); + let events: Vec> = all_events .into_iter() .filter(|e| e.source() == EventSource::Net) .take(limit) .map(|ev| ev.clone_unsequenced()) .collect(); + info!( + "Sync response for aggregate={}: {} total events from store (since={}), {} after source=Net filter", + aggregate_id, total_before_filter, since, events.len() + ); let next = if events.len() == limit { let last_event_ts = events.last().map(|e| e.ts()).unwrap_or(0); diff --git a/tests/integration/persist.sh b/tests/integration/persist.sh index 3e0f96a1d7..f75326601d 100755 --- a/tests/integration/persist.sh +++ b/tests/integration/persist.sh @@ -69,7 +69,7 @@ pnpm committee:new \ --input-window-end "$INPUT_WINDOW_END" \ --e3-params "$ENCODED_PARAMS" \ --committee-size 0 \ - --proof-aggregation-enabled false + --proof-aggregation-enabled true waiton "$SCRIPT_DIR/output/pubkey.bin" From a6a4ec1a3e0bb9976cd9a0bba244238af2c94355 Mon Sep 17 00:00:00 2001 From: ctrlc03 <93448202+ctrlc03@users.noreply.github.com> Date: Sat, 21 Mar 2026 12:51:40 +0000 Subject: [PATCH 5/9] chore: build cli in larger runner and mroe logging --- .github/workflows/ci.yml | 7 +++++-- crates/events/src/eventstore_router.rs | 8 ++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2934df9009..b6e53aa716 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -543,7 +543,9 @@ jobs: needs: [detect_changes] if: needs.detect_changes.outputs.build_enclave_cli == 'true' timeout-minutes: 20 - runs-on: 'ubuntu-latest' + runs-on: + group: enclave-ci + labels: [enclave-ci-runner] permissions: contents: read actions: write @@ -660,7 +662,8 @@ jobs: crisp_e2e: needs: [detect_changes, build_enclave_cli, build_crisp_sdk] - if: needs.detect_changes.outputs.crisp == 'true' + # if: needs.detect_changes.outputs.crisp == 'true' + if: false timeout-minutes: 45 runs-on: group: enclave-ci diff --git a/crates/events/src/eventstore_router.rs b/crates/events/src/eventstore_router.rs index 83253746f3..19c9ecf85d 100644 --- a/crates/events/src/eventstore_router.rs +++ b/crates/events/src/eventstore_router.rs @@ -101,6 +101,14 @@ impl EventStoreRouter { pub fn handle_store_event_requested(&mut self, msg: StoreEventRequested) { debug!("Handling store event requested...."); let aggregate_id = msg.event.aggregate_id(); + let has_store = self.stores.contains_key(&aggregate_id); + if !has_store { + tracing::warn!( + "EventStoreRouter: no store for aggregate={}, falling back to 0 (source={:?})", + aggregate_id, + msg.event.source() + ); + } let store_addr = self.stores.get(&aggregate_id).unwrap_or_else(|| { self.stores .get(&AggregateId::new(0)) From 53848830f417811da0c78c82b7a77f657d5d35d1 Mon Sep 17 00:00:00 2001 From: ctrlc03 <93448202+ctrlc03@users.noreply.github.com> Date: Sat, 21 Mar 2026 13:22:27 +0000 Subject: [PATCH 6/9] chore: more logging --- crates/events/src/eventstore.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/events/src/eventstore.rs b/crates/events/src/eventstore.rs index 11ea71715d..e02f0fe33d 100644 --- a/crates/events/src/eventstore.rs +++ b/crates/events/src/eventstore.rs @@ -42,6 +42,12 @@ impl EventStore { } let seq = self.log.append(&event)?; self.index.insert(ts, seq)?; + if event.source() == crate::EventSource::Net { + tracing::info!( + "EventStore: stored Net event ts={} seq={} aggregate={}", + ts, seq, event.aggregate_id() + ); + } Ok(Some(event.into_sequenced(seq))) } From 6821a49ca427d5c5f6b7b3d5461300859c6a8fe9 Mon Sep 17 00:00:00 2001 From: ctrlc03 <93448202+ctrlc03@users.noreply.github.com> Date: Sat, 21 Mar 2026 14:47:36 +0000 Subject: [PATCH 7/9] fix: correct timestamps on sync --- crates/events/src/eventstore.rs | 13 ++++++++---- crates/events/src/eventstore_router.rs | 19 ++++++++++++----- crates/sync/src/sync.rs | 28 +++++++++++++++++++++++--- 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/crates/events/src/eventstore.rs b/crates/events/src/eventstore.rs index e02f0fe33d..ab611a4798 100644 --- a/crates/events/src/eventstore.rs +++ b/crates/events/src/eventstore.rs @@ -45,7 +45,9 @@ impl EventStore { if event.source() == crate::EventSource::Net { tracing::info!( "EventStore: stored Net event ts={} seq={} aggregate={}", - ts, seq, event.aggregate_id() + ts, + seq, + event.aggregate_id() ); } Ok(Some(event.into_sequenced(seq))) @@ -82,15 +84,18 @@ impl EventStore { limit: Option, ) -> Result>> { let Some(seq) = self.index.seek(query)? else { - tracing::debug!("query_by_ts: no index entry at or after ts={}", query); + tracing::info!("query_by_ts: no index entry at or after ts={}", query); return Ok(vec![]); }; let events: Vec<_> = self.log.read_from(seq).collect(); let total = events.len(); let result = self.collect_events(Box::new(events.into_iter()), filter, limit); - tracing::debug!( + tracing::info!( "query_by_ts: ts={} -> seq={}, {} raw events, {} after filter", - query, seq, total, result.len() + query, + seq, + total, + result.len() ); Ok(result) } diff --git a/crates/events/src/eventstore_router.rs b/crates/events/src/eventstore_router.rs index 19c9ecf85d..09b4b8b71b 100644 --- a/crates/events/src/eventstore_router.rs +++ b/crates/events/src/eventstore_router.rs @@ -131,17 +131,23 @@ impl EventStoreRouter { let filter = msg.filter().cloned(); let sender = msg.sender(); + let available_stores: Vec<_> = self.stores.keys().collect(); let sub_queries: Vec<_> = query .into_iter() .filter_map(|(aggregate_id, ts)| { - self.stores - .get(&aggregate_id) - .map(|store_addr| (aggregate_id, ts, CorrelationId::new(), store_addr.clone())) + let store = self.stores.get(&aggregate_id); + if store.is_none() { + warn!( + "EventStoreRouter: no store for aggregate={} during TS query (available: {:?})", + aggregate_id, available_stores + ); + } + store.map(|store_addr| (aggregate_id, ts, CorrelationId::new(), store_addr.clone())) }) .collect(); if sub_queries.is_empty() { - debug!("No valid stores to query, sending empty response immediately"); + info!("EventStoreRouter: no valid stores to query, sending empty response immediately"); let response = EventStoreQueryResponse::new(parent_id, Vec::new()); sender.do_send(response); return Ok(()); @@ -157,7 +163,10 @@ impl EventStoreRouter { let get_events_msg = EventStoreQueryBy::::new(sub_query_id, ts, aggregator_addr.clone().recipient()) .with_options(limit, filter.clone()); - debug!("Sending query for aggregate {:?}", aggregate_id); + info!( + "EventStoreRouter: sending TS query for aggregate={} ts={}", + aggregate_id, ts + ); store_addr.do_send(get_events_msg); } diff --git a/crates/sync/src/sync.rs b/crates/sync/src/sync.rs index e976039a60..71de892740 100644 --- a/crates/sync/src/sync.rs +++ b/crates/sync/src/sync.rs @@ -55,7 +55,7 @@ pub async fn sync( // 2. Determine the evm blocks to read from based on the SnapshotMeta let evm_config = snapshot.to_evm_config(); - let _net_config = snapshot.to_net_config(); + let snapshot_net_config = snapshot.to_net_config(); // 3. Load EventStore events since the sequence number found in the snapshot into memory. info!("Loading EventStore events..."); @@ -96,13 +96,35 @@ pub async fn sync( "{} historical blockchain events loaded.", historical_evm_events.len() ); - let net_config = find_net_hlc(&historical_evm_events); + // Build the net sync cursor using snapshot timestamps (the original HLC timestamps + // from before the restart). We cannot use find_net_hlc(&historical_evm_events) because + // re-read EVM events get NEW HLC timestamps from hlc.receive() — the HLC is fresh on + // restart so it uses the current wallclock, producing timestamps that are later than what + // ciphernodes stored. This makes the sync query return 0 events. + // + // We still use find_net_hlc to determine WHICH aggregates need syncing (filtering out + // closed E3s), but replace the timestamps with the original ones from the snapshot. + let open_aggregates = find_net_hlc(&historical_evm_events); + let net_config: BTreeMap = open_aggregates + .into_iter() + .map(|(id, _)| { + let ts = snapshot_net_config.get(&id).copied().unwrap_or(0); + (id, ts) + }) + .collect(); + info!( + "Net sync config: {:?}", + net_config + .iter() + .map(|(id, ts)| format!("{}={}", id, ts)) + .collect::>() + ); + // 6. Load the historical libp2p events to memory info!("Waiting until NetReady..."); net_ready.await?; info!("NetReady!"); info!("Loading historical libp2p events..."); - // let (addr, rx) = actix_toolbox::oneshot::(); let events_received = bus.wait_for(EventType::HistoricalNetSyncEventsReceived); bus.publish_without_context(HistoricalNetSyncStart::new(net_config.clone()))?; let EnclaveEventData::HistoricalNetSyncEventsReceived(event) = From 9121f1ec39c835570bd4ef06eb66e5773accf268 Mon Sep 17 00:00:00 2001 From: ctrlc03 <93448202+ctrlc03@users.noreply.github.com> Date: Sat, 21 Mar 2026 15:21:42 +0000 Subject: [PATCH 8/9] chore: cleanup --- .github/workflows/ci.yml | 39 +++++++++----------------- crates/events/src/eventstore.rs | 17 ----------- crates/events/src/eventstore_router.rs | 27 ++++-------------- crates/net/src/net_sync_manager.rs | 6 ---- crates/sync/src/sync.rs | 7 ----- 5 files changed, 18 insertions(+), 78 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b6e53aa716..32fb8710d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -119,8 +119,7 @@ jobs: rust_unit_tests: needs: [detect_changes] - # if: needs.detect_changes.outputs.rust_unit_tests == 'true' - if: false + if: needs.detect_changes.outputs.rust_unit_tests == 'true' timeout-minutes: 20 runs-on: 'ubuntu-latest' permissions: @@ -174,8 +173,7 @@ jobs: rust_integration_tests: needs: [detect_changes] - # if: needs.detect_changes.outputs.rust_integration_tests == 'true' - if: false + if: needs.detect_changes.outputs.rust_integration_tests == 'true' timeout-minutes: 45 runs-on: group: enclave-ci @@ -218,8 +216,7 @@ jobs: zk_prover_integration: needs: [detect_changes] - # if: needs.detect_changes.outputs.zk_prover_integration == 'true' - if: false + if: needs.detect_changes.outputs.zk_prover_integration == 'true' timeout-minutes: 30 runs-on: 'ubuntu-latest' steps: @@ -303,8 +300,7 @@ jobs: build_ciphernode_image: needs: [detect_changes] - # if: needs.detect_changes.outputs.docker_ciphernode == 'true' - if: false + if: needs.detect_changes.outputs.docker_ciphernode == 'true' timeout-minutes: 30 runs-on: 'ubuntu-latest' steps: @@ -347,8 +343,7 @@ jobs: test_contracts: needs: [detect_changes] - # if: needs.detect_changes.outputs.contracts == 'true' - if: false + if: needs.detect_changes.outputs.contracts == 'true' timeout-minutes: 15 runs-on: 'ubuntu-latest' permissions: @@ -395,8 +390,7 @@ jobs: test_net: needs: [detect_changes] - # if: needs.detect_changes.outputs.net == 'true' - if: false + if: needs.detect_changes.outputs.net == 'true' timeout-minutes: 15 runs-on: 'ubuntu-latest' steps: @@ -586,8 +580,7 @@ jobs: crisp_unit: needs: [detect_changes, build_crisp_sdk] - # if: needs.detect_changes.outputs.crisp == 'true' - if: false + if: needs.detect_changes.outputs.crisp == 'true' timeout-minutes: 30 runs-on: 'ubuntu-latest' steps: @@ -662,8 +655,7 @@ jobs: crisp_e2e: needs: [detect_changes, build_enclave_cli, build_crisp_sdk] - # if: needs.detect_changes.outputs.crisp == 'true' - if: false + if: needs.detect_changes.outputs.crisp == 'true' timeout-minutes: 45 runs-on: group: enclave-ci @@ -778,8 +770,7 @@ jobs: build_circuits: needs: [detect_changes] - # if: needs.detect_changes.outputs.build_circuits == 'true' - if: false + if: needs.detect_changes.outputs.build_circuits == 'true' timeout-minutes: 30 runs-on: 'ubuntu-latest' permissions: @@ -938,8 +929,7 @@ jobs: build_e3_support_dev: needs: [detect_changes] - # if: needs.detect_changes.outputs.build_e3_support_dev == 'true' - if: false + if: needs.detect_changes.outputs.build_e3_support_dev == 'true' timeout-minutes: 20 runs-on: 'ubuntu-latest' steps: @@ -1030,8 +1020,7 @@ jobs: build_crisp_sdk: needs: [detect_changes] - # if: needs.detect_changes.outputs.crisp == 'true' - if: false + if: needs.detect_changes.outputs.crisp == 'true' timeout-minutes: 20 runs-on: 'ubuntu-latest' steps: @@ -1081,8 +1070,7 @@ jobs: template_integration: needs: [detect_changes, build_enclave_cli, build_e3_support_dev, build_sdk] - # if: needs.detect_changes.outputs.templates == 'true' - if: false + if: needs.detect_changes.outputs.templates == 'true' timeout-minutes: 30 runs-on: group: enclave-ci @@ -1158,8 +1146,7 @@ jobs: test_enclave_init: needs: [detect_changes, build_enclave_cli, build_e3_support_dev] - # if: needs.detect_changes.outputs.init == 'true' - if: false + if: needs.detect_changes.outputs.init == 'true' timeout-minutes: 10 runs-on: 'ubuntu-latest' steps: diff --git a/crates/events/src/eventstore.rs b/crates/events/src/eventstore.rs index ab611a4798..b4c999d3a9 100644 --- a/crates/events/src/eventstore.rs +++ b/crates/events/src/eventstore.rs @@ -42,14 +42,6 @@ impl EventStore { } let seq = self.log.append(&event)?; self.index.insert(ts, seq)?; - if event.source() == crate::EventSource::Net { - tracing::info!( - "EventStore: stored Net event ts={} seq={} aggregate={}", - ts, - seq, - event.aggregate_id() - ); - } Ok(Some(event.into_sequenced(seq))) } @@ -84,19 +76,10 @@ impl EventStore { limit: Option, ) -> Result>> { let Some(seq) = self.index.seek(query)? else { - tracing::info!("query_by_ts: no index entry at or after ts={}", query); return Ok(vec![]); }; let events: Vec<_> = self.log.read_from(seq).collect(); - let total = events.len(); let result = self.collect_events(Box::new(events.into_iter()), filter, limit); - tracing::info!( - "query_by_ts: ts={} -> seq={}, {} raw events, {} after filter", - query, - seq, - total, - result.len() - ); Ok(result) } diff --git a/crates/events/src/eventstore_router.rs b/crates/events/src/eventstore_router.rs index 09b4b8b71b..83253746f3 100644 --- a/crates/events/src/eventstore_router.rs +++ b/crates/events/src/eventstore_router.rs @@ -101,14 +101,6 @@ impl EventStoreRouter { pub fn handle_store_event_requested(&mut self, msg: StoreEventRequested) { debug!("Handling store event requested...."); let aggregate_id = msg.event.aggregate_id(); - let has_store = self.stores.contains_key(&aggregate_id); - if !has_store { - tracing::warn!( - "EventStoreRouter: no store for aggregate={}, falling back to 0 (source={:?})", - aggregate_id, - msg.event.source() - ); - } let store_addr = self.stores.get(&aggregate_id).unwrap_or_else(|| { self.stores .get(&AggregateId::new(0)) @@ -131,23 +123,17 @@ impl EventStoreRouter { let filter = msg.filter().cloned(); let sender = msg.sender(); - let available_stores: Vec<_> = self.stores.keys().collect(); let sub_queries: Vec<_> = query .into_iter() .filter_map(|(aggregate_id, ts)| { - let store = self.stores.get(&aggregate_id); - if store.is_none() { - warn!( - "EventStoreRouter: no store for aggregate={} during TS query (available: {:?})", - aggregate_id, available_stores - ); - } - store.map(|store_addr| (aggregate_id, ts, CorrelationId::new(), store_addr.clone())) + self.stores + .get(&aggregate_id) + .map(|store_addr| (aggregate_id, ts, CorrelationId::new(), store_addr.clone())) }) .collect(); if sub_queries.is_empty() { - info!("EventStoreRouter: no valid stores to query, sending empty response immediately"); + debug!("No valid stores to query, sending empty response immediately"); let response = EventStoreQueryResponse::new(parent_id, Vec::new()); sender.do_send(response); return Ok(()); @@ -163,10 +149,7 @@ impl EventStoreRouter { let get_events_msg = EventStoreQueryBy::::new(sub_query_id, ts, aggregator_addr.clone().recipient()) .with_options(limit, filter.clone()); - info!( - "EventStoreRouter: sending TS query for aggregate={} ts={}", - aggregate_id, ts - ); + debug!("Sending query for aggregate {:?}", aggregate_id); store_addr.do_send(get_events_msg); } diff --git a/crates/net/src/net_sync_manager.rs b/crates/net/src/net_sync_manager.rs index 8a8821652d..5a0f7c35e9 100644 --- a/crates/net/src/net_sync_manager.rs +++ b/crates/net/src/net_sync_manager.rs @@ -249,19 +249,13 @@ impl Handler for NetSyncManager { return Ok(()); } let aggregate_id = fetch_request.aggregate_id(); - let since = fetch_request.since(); let all_events: Vec<_> = msg.into_events(); - let total_before_filter = all_events.len(); let events: Vec> = all_events .into_iter() .filter(|e| e.source() == EventSource::Net) .take(limit) .map(|ev| ev.clone_unsequenced()) .collect(); - info!( - "Sync response for aggregate={}: {} total events from store (since={}), {} after source=Net filter", - aggregate_id, total_before_filter, since, events.len() - ); let next = if events.len() == limit { let last_event_ts = events.last().map(|e| e.ts()).unwrap_or(0); diff --git a/crates/sync/src/sync.rs b/crates/sync/src/sync.rs index 71de892740..18052df76c 100644 --- a/crates/sync/src/sync.rs +++ b/crates/sync/src/sync.rs @@ -112,13 +112,6 @@ pub async fn sync( (id, ts) }) .collect(); - info!( - "Net sync config: {:?}", - net_config - .iter() - .map(|(id, ts)| format!("{}={}", id, ts)) - .collect::>() - ); // 6. Load the historical libp2p events to memory info!("Waiting until NetReady..."); From eea0f2e6bf25df0df5b45d851e5f1338347afdd1 Mon Sep 17 00:00:00 2001 From: ctrlc03 <93448202+ctrlc03@users.noreply.github.com> Date: Sat, 21 Mar 2026 16:14:06 +0000 Subject: [PATCH 9/9] chore: increase max read size --- crates/data/src/commit_log_event_log.rs | 12 +++++++++--- crates/evm-helpers/tests/integration.rs | 20 ++++++++++++-------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/crates/data/src/commit_log_event_log.rs b/crates/data/src/commit_log_event_log.rs index 39fa684e6b..5f74cb8acf 100644 --- a/crates/data/src/commit_log_event_log.rs +++ b/crates/data/src/commit_log_event_log.rs @@ -12,6 +12,9 @@ use e3_events::{EnclaveEvent, EventLog, Unsequenced}; use std::path::PathBuf; use tracing::error; +/// Maximum message size for both reads and writes (32 MB). +const MAX_MESSAGE_BYTES: usize = 32 * 1024 * 1024; + pub struct CommitLogEventLog { log: CommitLog, } @@ -19,8 +22,8 @@ pub struct CommitLogEventLog { impl CommitLogEventLog { pub fn new(path: &PathBuf) -> Result { let mut opts = LogOptions::new(path); - // TODO: drive this from config - currently set high to be permissive - opts.message_max_bytes(32 * 1024 * 1024); + // TODO: derive this from config - currently set high to be permissive + opts.message_max_bytes(MAX_MESSAGE_BYTES); let log = CommitLog::new(opts)?; Ok(Self { log }) } @@ -47,7 +50,10 @@ impl EventLog for CommitLogEventLog { let mut events = Vec::new(); loop { - let message_buf = match self.log.read(current_offset, ReadLimit::default()) { + let message_buf = match self + .log + .read(current_offset, ReadLimit::max_bytes(MAX_MESSAGE_BYTES)) + { Ok(msgs) => msgs, Err(_) => break, }; diff --git a/crates/evm-helpers/tests/integration.rs b/crates/evm-helpers/tests/integration.rs index 746490ea56..b7bdfef192 100644 --- a/crates/evm-helpers/tests/integration.rs +++ b/crates/evm-helpers/tests/integration.rs @@ -130,8 +130,10 @@ async fn test_overlapping_listener_handlers() -> Result<()> { println!("PublishMessage '{}' ({} since sent)", msg, time_diff); let _ = tx.try_send("waiting".to_string()); - // Wait 200ms before publishing to simulate long running handlers - sleep(Duration::from_millis(200)).await; + // Wait long enough to simulate a long-running handler. Must be large + // enough that "two" (sent 100ms later) arrives before this completes, + // even on slow CI runners with ~50ms event delivery latency. + sleep(Duration::from_millis(1000)).await; println!("Sending message: '{msg}'"); let _ = tx.try_send(msg); Ok(()) @@ -155,11 +157,11 @@ async fn test_overlapping_listener_handlers() -> Result<()> { let _ = tokio::spawn(async move { spawn_event_listener.listen().await }); // Events should be returned roughly in this order: - // 0ms : one - // 0ms : waiting - // 100ms : two - // 200ms : three - // 300ms : four + // 0ms : one + // 0ms : waiting + // 100ms : two + // 1000ms : three (after long-running handler completes) + // 1300ms : four let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis(); contract @@ -187,7 +189,9 @@ async fn test_overlapping_listener_handlers() -> Result<()> { .watch() .await?; - sleep(Duration::from_millis(300)).await; + // Wait for the long-running PublishMessage handler (1000ms) to complete + // before sending "four", so "three" arrives before "four". + sleep(Duration::from_millis(1200)).await; let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis(); contract