
feat: sync mode preparation [skip-line-limit] #1153

Merged
ryardley merged 113 commits into main from ry/1050-event-ids on Jan 30, 2026

Conversation

@ryardley

@ryardley ryardley commented Jan 8, 2026

Contributor

part of #1050

This PR

We are part of the way through getting sync together. Much of this is preparatory. The next PR will be Sync implementation.

Event writing system and snapshot:

  • Setup EventContext to attach context to EnclaveEvent. Why? so that we can have provenance between events for state reconstruction. Also we need to have the aggregate_id on the snapshots.
  • Setup TypedEvent<T>. Why? so that we can disambiguate destructured events from the ctx in order to pass the context through to dispatchers.
  • Simplify Persistable by removing unnecessary DataStore and Repository flow. Why? persistable was far more complex than it needed to be and we much better understand the actual reasons we need it now which is to stream state to snapshots.
  • Enable Persistable<T> to be constructed directly off StoreConnector. Why? Using a StoreConnector to aid factory functions helps with our API.
  • Setup aggregate-based delay buffer so that aggregates are delayed based on age (this will be configured based on chain_id finalization times). Why? Eventually we will want event sourcing aggregates that correspond to the blockchains that originated the events we are processing, because if one chain has a reorg we will want to treat its events differently. Ideally we set each snapshot delay to the finalization time of the chain represented by the aggregate, so snapshots can be delayed by different amounts based on configuration. We cannot yet change our eventlog retrospectively, and doing so would add a whole lot of complexity, but we are ready to handle this.
  • Store sequence number with aggregate in snapshot. Why? so when the inserts are flushed to disk the disk contains the last seq and aggregate_id that the state knew so we can synchronize them
  • Add one commitlog per aggregate_id when sequencing events. Why? so that we hold separate streams of events and can manipulate them separately whilst reconstructing aggregate bound snapshots separately.
  • When requesting events we need to request with a hashmap of agid -> timestamps. Why? because we are using multiple aggregates we need to ask for the timestamps for each aggregate.
  • Make handlers run strictly sequentially to prepare for easily attaching context to publishers (self.notify_sync(ctx, evt)) Why? This is a good practice in general as it will aid in auditing but may help by allowing us to set the context as we handle the EnclaveEvent - we will need to forward context between actors still (see TypedEvent) but this will simplify how often we must do this.
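
The context-passing idea behind EventContext and TypedEvent<T> can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: the field names, the non-generic EventContext, the `root`/`child` constructors, and the `Ping`/`Pong` payloads are all hypothetical.

```rust
use std::ops::Deref;

/// Simplified, hypothetical stand-in for the PR's `EventContext`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventContext {
    pub id: u64,           // id of the event this context belongs to
    pub causation_id: u64, // id of the event that directly caused it
    pub origin_id: u64,    // id of the first event in the causal chain
    pub aggregate_id: u64, // aggregate the event is sequenced under
}

impl EventContext {
    /// Context for an event with no cause: it is its own cause and origin.
    pub fn root(id: u64, aggregate_id: u64) -> Self {
        Self { id, causation_id: id, origin_id: id, aggregate_id }
    }

    /// Context for a new event caused by the event that owns `self`.
    pub fn child(&self, id: u64) -> Self {
        Self {
            id,
            causation_id: self.id,
            origin_id: self.origin_id,
            aggregate_id: self.aggregate_id,
        }
    }
}

/// A destructured payload that still carries the context it arrived with.
#[derive(Debug, Clone)]
pub struct TypedEvent<T> {
    inner: T,
    ctx: EventContext,
}

impl<T> TypedEvent<T> {
    pub fn new(inner: T, ctx: EventContext) -> Self {
        Self { inner, ctx }
    }
    pub fn ctx(&self) -> &EventContext {
        &self.ctx
    }
    pub fn into_components(self) -> (T, EventContext) {
        (self.inner, self.ctx)
    }
}

/// Deref lets handlers read payload fields directly from the wrapper.
impl<T> Deref for TypedEvent<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.inner
    }
}

// Hypothetical payloads used only for this illustration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Ping { pub n: u32 }
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Pong { pub n: u32 }

/// A handler that forwards provenance to the event it emits.
pub fn handle_ping(msg: TypedEvent<Ping>, next_id: u64) -> TypedEvent<Pong> {
    let (ping, ctx) = msg.into_components();
    TypedEvent::new(Pong { n: ping.n + 1 }, ctx.child(next_id))
}
```

In this sketch, an event handled under aggregate 7 produces a follow-up event that keeps aggregate 7 and records the handled event as its cause, which is the kind of provenance the bullets above describe.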

Event system

  • Add net request framework to query nodes for events. Why? To prepare for requesting libp2p events from the network.
  • Investigate what is required for evm refactor for syncing
  • Refactor evm into EvmSystem. Why? Previously we had one read provider per contract per chain listening for events, and we had to coordinate between all the providers to know when historical events had finished syncing. Now we have one read provider per chain that reads logs and sends them up to be routed and then parsed, before they are sent either to the bus or to the sync actor. But why? We had to do some major refactoring of the evm component to organize sync, ensure blockchain events carry the correct timestamps, and draw a clearer distinction between historical and live event management. I have added to this by setting up a kind of event pipeline that is neat and extensible moving forward. This will also help with sync to blockchain time #1020
  • Moved the localhost check in the evm reader to a config parsing check. Why? Because it is a better design. There are questions about the wisdom of checking it this way, as you might run a local node on localhost, but we can look at this later if it becomes a problem.
  • Get tests passing as before

Synchronization

  • Add sync scaffold process on startup - This is rough and does basically what was happening before.
  • Create basic test for Sync doing what it is currently doing - just to check

Next PR

Event writing and snapshotting

  • force context on all events and snapshots
    • inject context into bus and persistence at the start of all sequential handlers or use TypedEvents for notified handlers.
  • Ensure blockheight watermark is saved on each aggregate write
  • Setup second block based subscription
  • Query snapshot information
  • Delay snapshot information by finalization time

Event system

  • Delay live evm processing by two blocks

Synchronization

  • Fetch initial information for Sync from eventsystem
  • Fetch historical events from evm
  • Fetch historical events from net
  • Sort events by timestamp before publishing them
  • Open request edgecase detect in complete events

General

  • Test a sync in the middle of aggregation events

Summary by CodeRabbit

  • New Features

    • Added network synchronization for coordinating events across peers.
    • Implemented multi-chain EVM event handling with historical and live event support.
    • Added provider caching for efficient blockchain interactions.
  • Refactor

    • Redesigned event architecture to support context tracking and causation chains.
    • Restructured EVM event processing pipeline for improved ordering and reliability.
    • Extended network protocol for peer-to-peer synchronization.


@vercel

vercel Bot commented Jan 8, 2026


The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Review Updated (UTC)
crisp Ready Ready Preview, Comment Jan 29, 2026 9:17am
enclave-docs Ready Ready Preview, Comment Jan 29, 2026 9:17am


@coderabbitai

coderabbitai Bot commented Jan 8, 2026

Contributor
Walkthrough

This pull request introduces major infrastructure improvements for event synchronization, EVM integration, and data persistence across the ciphernode system. It creates a new sync crate with a Synchronizer actor, refactors EVM event handling with new components (EvmParser, EvmReadInterface, EvmChainGateway), implements comprehensive event context tracking, adds peer network synchronization, and replaces the Repository pattern with StoreConnector/Persistable abstractions. Significant updates to CiphernodeBuilder, BusHandle, and event storage systems enable multi-chain EVM event ordering and causation tracking.

Changes

Cohort / File(s) Summary
Sync Infrastructure
crates/sync/*, crates/events/src/sync.rs, crates/events/src/event_context.rs
New Synchronizer actor and EvmEventConfig system for coordinating multi-chain EVM event ordering and historical sync completion. Introduces EventContext<S> generic context tracking with causation IDs, origin tracking, and per-aggregate sequencing.
EVM Event Pipeline Refactor
crates/evm/src/{events.rs, evm_parser.rs, evm_read_interface.rs, evm_chain_gateway.rs, evm_hub.rs, evm_router.rs, fix_historical_order.rs, one_shot_runnner.rs, sync_start_extractor.rs}
Complete rewrite of EVM event ingestion, replacing EvmEventReader with an EvmParser, EvmReadInterface, and EvmRouter/EvmHub pipeline. Adds EvmChainGateway for state-based event flow coordination and FixHistoricalOrder for reordering guarantees. Removes 352-line event_reader.rs module.
Event System Core Changes
crates/events/src/{enclave_event/mod.rs, bus_handle.rs, hlc.rs, eventstore_router.rs, events.rs, traits.rs}
Restructures EnclaveEvent from separate fields (id, seq, ts) to unified EventContext<S>. Updates BusHandle with context management and routing. Adds eventstore_router for multi-aggregate event routing. Introduces EventContextAccessors, EventContextSeq, WithAggregateId traits. Adds new sync event types.
New Sync Event Types
crates/events/src/enclave_event/{evm_sync_events_received.rs, net_sync_events_received.rs, outgoing_sync_requested.rs, sync_effect.rs, sync_end.rs, sync_start.rs, typed_event.rs}
Seven new message types for event synchronization flow. SyncStart carries EvmEventConfig; EvmEvent and SyncEvmEvent for EVM-specific routing; TypedEvent<T> for context-carrying payloads.
Data Persistence Overhaul
crates/data/src/{persistable.rs, data_store.rs, events.rs, repository.rs, write_buffer.rs, in_mem_event_log.rs, commit_log_event_log.rs}
Replaces Repository pattern with StoreConnector abstraction. Introduces Persistable<T> with staging/commit support and context management. Adds AggregateConfig and per-aggregate WriteBuffer with expiration delays. Updates Insert struct with optional EventContext. Exposes DataStore accessor methods.
CiphernodeBuilder Enhancement
crates/ciphernode-builder/src/{ciphernode_builder.rs, ciphernode.rs, event_system.rs, provider_caches.rs, evm_system.rs, lib.rs}
Major builder refactoring: adds with_net() for network config, ProviderCache<State> typestate pattern for read/write provider setup, EventStoreRouter for per-aggregate routing, EvmSystemChainBuilder for per-chain EVM setup. CiphernodeHandle gains peer_id and join_handle fields with split() method. EventSystem now supports per-aggregate EventStore maps.
Aggregator & Network Event Handling
crates/aggregator/src/*.rs, crates/keyshare/src/threshold_keyshare.rs, crates/net/src/{events.rs, document_publisher.rs, net_sync_manager.rs, net_interface.rs}
Replace ctx.notify() with notify_sync() mechanism for synchronous dispatch. Add NotifySync trait implementation across aggregators. Introduce network sync protocol with SyncRequestValue, SyncResponseValue, and NetSyncManager for peer synchronization. Update net_interface with CBOR-based sync request/response.
Configuration & Contract Address Handling
crates/config/src/{contract.rs, chain_config.rs, rpc.rs}, crates/cli/src/{print_env.rs, ciphernode/context.rs}
Refactor address handling: split address() into address_str() and address() returning Result<Address>. Add RpcProtocol enum with is_websocket(), is_secure(), is_local() checks. Enhance ChainConfig with finalization_ms, chain_id, and TryFrom<ChainConfig> for EvmEventConfigChain. Update all callsites to use new accessors.
Startup & Entrypoint
crates/entrypoint/src/{helpers/shutdown.rs, start/{start.rs, aggregator_start.rs}}, crates/cli/src/start.rs
Consolidate return values: functions now return unified CiphernodeHandle instead of (BusHandle, JoinHandle<Result<()>>, String). Update listen_for_shutdown() signature to accept CiphernodeHandle and extract components via split(). Builder patterns in startup simplified with integrated net/EVM setup.
Utility Extensions
crates/utils/src/{actix.rs, helpers.rs, path.rs, lib.rs}
Add NotifySync<M> trait with blanket impl for actor message dispatch. Introduce OnceTake<T> for single-use shared value passing. Add enumerate_path() utility for indexed path generation. Update to_ordered_vec() with Ord + Copy constraints.
Sortition & Selection
crates/sortition/src/{sortition.rs, ciphernode_selector.rs}
Remove Sortition actor dependency from CiphernodeSelector. Introduce WithSortitionPartyTicket<T> wrapper to carry party ticket alongside message. Update to use E3RequestComplete instead of E3Requested. Simplify selection logic with direct is_selected() checks.
Test Infrastructure & CI
.husky/pre-commit, Cargo.toml, crates/Dockerfile, crates/evm/tests/integration.rs, crates/test-helpers/src/ciphernode_system.rs, crates/tests/tests/integration.rs, scripts/run-crisp-test.sh, examples/CRISP/server/Dockerfile
Remove pre-commit lint-staged hook. Add crates/sync to workspace. Update Dockerfiles to include sync crate. Refactor EVM integration tests to use actor-based FakeSyncActor and EvmSystemChainBuilder. Update test helpers to use &CiphernodeHandle references. Add cargo build step to CI script.

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly Related PRs

  • gnosisguild/enclave#1086: Introduces parallel sync crate and synchronizer infrastructure with overlapping EventSystem and CiphernodeBuilder modifications.
  • gnosisguild/enclave#698: Modifies sortition subsystem with changes to public APIs and actor composition patterns.
  • gnosisguild/enclave#1073: Extensive overlapping changes to events subsystem including EnclaveEvent context refactoring, BusHandle restructuring, and event sequencing.

Suggested Labels

ciphernode, events, evm, network, data-persistence

Suggested Reviewers

  • ctrlc03
  • hmzakhalid

Poem

🐰 Behold the grand refactor's might,
Events now dance with context bright,
Sync across the chains we glide,
With TypedEvents as our guide,
Multi-aggregate buffers flow,
Watch the ciphernode's new glow! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 26.54% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'feat: sync mode preparation' clearly describes the main objective of preparatory work toward implementing Sync mode, which aligns with the PR's core purpose of event context, persistence, and synchronization infrastructure changes. |


@ryardley ryardley changed the title from "Ry/1050 event ids" to "feat: event context with event provenance" on Jan 8, 2026
@ryardley

Contributor Author

Had a chat with folks today about attempting to merge this PR as is and complete the Sync step in a separate PR.

@ryardley

Contributor Author

I am thinking it would be good to connect any feedback to a list of things to do on #1050 - I can then work through that list in the next PR instead of making changes here.

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 15

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
crates/keyshare/src/threshold_keyshare.rs (1)

410-424: Fix move/borrow conflict in response dispatch.

Line 411 borrows msg for the match and then moves msg into handler calls, which will fail borrow checking. Extract or clone the response before dispatch.

🔧 Proposed fix
 pub fn handle_compute_response(&mut self, msg: TypedEvent<ComputeResponse>) -> Result<()> {
-    match &msg.response {
+    let response = msg.response.clone();
+    match response {
         TrBFVResponse::GenEsiSss(_) => self.handle_gen_esi_sss_response(msg),
         TrBFVResponse::GenPkShareAndSkSss(_) => {
             self.handle_gen_pk_share_and_sk_sss_response(msg)
         }
         TrBFVResponse::CalculateDecryptionKey(_) => {
             self.handle_calculate_decryption_key_response(msg)
         }
         TrBFVResponse::CalculateDecryptionShare(_) => {
             self.handle_calculate_decryption_share_response(msg)
         }
         _ => Ok(()),
     }
 }
crates/entrypoint/src/start/start.rs (1)

9-19: Remove unused imports.

The following imports are not used in the function and should be removed:

  • RepositoriesFactory (line 12)
  • BusHandle (line 13)
  • NetEventTranslator, NetRepositoryFactory (line 14)
  • JoinHandle (line 18)
crates/events/src/enclave_event/mod.rs (2)

372-389: Error events should inherit aggregate_id from their cause.

Even when caused_by is present, aggregate_id is forced to 0, which routes error events to the default aggregate. That can break per-aggregate sequencing (and commitlog grouping) for errors tied to a specific aggregate.

🔧 Suggested fix
-        let aggregate_id = AggregateId::new(0); // Error events use default aggregate_id
+        let aggregate_id = caused_by
+            .as_ref()
+            .map(|cause| cause.aggregate_id())
+            .unwrap_or_else(|| AggregateId::new(0)); // Fallback for uncaused errors

405-438: Add ComputeRequest and ComputeRequestError to the get_e3_id match statement.

Both variants carry e3_id information but aren't mapped in get_e3_id, causing them to incorrectly default to aggregate ID 0, which breaks per-aggregate event ordering.

🤖 Fix all issues with AI agents
In `@crates/ciphernode-builder/src/ciphernode_builder.rs`:
- Around line 293-306: create_aggregate_config currently iterates self.chains
and calls provider_cache.ensure_read_provider for every chain, which attempts to
validate providers for disabled chains; change it to only process enabled chains
(the same enabled filter used in setup_evm_system) before calling
ensure_read_provider so disabled chains are skipped, i.e., build chain_providers
from self.chains.iter().filter(|c| c.enabled()) (or the project's equivalent
predicate) and push only those (chain.clone(), provider.chain_id()) entries,
then compute delays and return AggregateConfig as before.

In `@crates/ciphernode-builder/src/ciphernode.rs`:
- Around line 13-16: Doc still claims Clone support for CiphernodeHandle but the
struct no longer derives Clone; either reintroduce Clone for CiphernodeHandle by
adding #[derive(Clone, Debug)] if cloneable behavior is intended, or update the
doc comment above struct CiphernodeHandle to remove the sentence "but they
cannot await the task" about clones and any mention that clones are available
(rephrase to "The handle is sharable across tasks via shared references" or
similar) so the docs reflect the actual implementation.
- Around line 67-69: The split(self) method consumes the Ciphernode and returns
(BusHandle, JoinHandle<Result<()>>), which drops the only Addr references to the
history and errors actors (stopping them); update the API or docs to make this
explicit and/or provide a non-consuming alternative: add a doc comment on
Ciphernode::split explaining that calling split will drop the last actor Addrs
(history, errors) and stop those actors, and if callers need to keep those
actors expose their handles instead (e.g. provide an additional method like
split_preserve or a method that returns the BusHandle plus cloned Addr handles
for history/errors) so callers can retain actor lifetimes; reference the split
function, BusHandle, JoinHandle<Result<()>>, and the history/errors actor Addrs
in the doc or alternative API.

In `@crates/config/src/rpc.rs`:
- Around line 77-90: The host_with_port() helper currently formats
"{hostname}:{port}" which breaks IPv6 addresses (e.g., "::1:8545"); update
host_with_port() to detect IPv6 by checking hostname returned from hostname()
for ':' (and not already wrapped) and wrap it in square brackets before
appending the port so IPv6 becomes "[::1]:8545"; keep using the existing
hostname(), port() accessors and ensure you only add brackets when necessary to
avoid double-bracketing.

In `@crates/data/src/data_store.rs`:
- Around line 122-125: The docstring for scope_bytes is wrong: it says "Get a
clone of the scope bytes" but the function signature pub fn scope_bytes(&self)
-> &[u8] returns a borrow; either update the comment to state it returns a
borrowed slice or change the API to return an owned Vec<u8> by changing the
signature to pub fn scope_bytes(&self) -> Vec<u8> and returning
self.scope.clone(); update any call sites accordingly. Ensure to reference the
scope_bytes method and the self.scope field when making the fix.

In `@crates/events/src/eventstore_router.rs`:
- Around line 31-46: The current handle_store_event_requested uses
unwrap_or_else with expect on self.stores.get(&AggregateId::new(0)) which can
panic if the default EventStore is not configured and also always returns
Ok(()), making Result<()> misleading; fix by validating the invariant in the
event router constructor (e.g., Router::new or the type that builds self.stores)
to ensure self.stores contains AggregateId::new(0) and return a construction
error if missing, or modify handle_store_event_requested to return an Err when
neither the specific aggregate_id nor AggregateId::new(0) exist (replace the
unwrap_or_else/expect with a match that returns Err on missing default), and
update callers/tests as needed to handle the error path instead of
unconditionally returning Ok(()).

In `@crates/evm/src/enclave_sol.rs`:
- Around line 20-34: The attach function can leave the reader actor running if
EnclaveSolWriter::attach fails; after creating the reader via
EnclaveSolReader::setup, ensure you clean it up on writer attach failure by
stopping or terminating the reader before returning the error (or alternatively
start the writer before the reader if writer has no dependency on the reader).
Concretely, update the attach function: call EnclaveSolWriter::attach only after
ensuring writer prerequisites, or if you keep the current order, catch the error
from EnclaveSolWriter::attach and call the reader actor shutdown/stop method on
the Addr returned by EnclaveSolReader::setup (the Addr<EvmParser> named addr)
before propagating the error.

In `@crates/evm/src/evm_chain_gateway.rs`:
- Around line 132-143: In handle_evm_event replace the panic on unexpected
EnclaveEvmEvent variants with returning an error via bail! so the error is
propagated through the function's Result and handled by the existing trap()
wrapper; specifically update the match arm in handle_evm_event (which currently
panics) to call bail! with a descriptive message that includes the unexpected
variant (or its debug display) so process_evm_event and
forward_historical_sync_complete remain unchanged and the caller receives a
proper error instead of crashing.

In `@crates/evm/src/evm_router.rs`:
- Around line 13-69: Docstring mismatch: the EvmRouter comment states non-log
events are dropped but Handler<EnclaveEvmEvent> actually forwards non-log events
to the fallback (set via add_fallback / field fallback). Fix by updating the
top-level doc comment for struct EvmRouter (the /// comment above pub struct
EvmRouter) to state that non-Log events are forwarded to the optional fallback
EvmEventProcessor when present (and dropped only if fallback is None);
alternatively, if you prefer dropping, change the
Handler<EnclaveEvmEvent>::handle branch to not call self.fallback and instead
drop the message—make the change consistently for add_fallback, fallback, and
the handler so behavior and documentation align.

In `@crates/evm/src/lib.rs`:
- Line 20: The module declaration and filename have a typo: change the module
name used in the source from one_shot_runnner to one_shot_runner and rename the
file one_shot_runnner.rs to one_shot_runner.rs; update the module declaration
line (currently "mod one_shot_runnner;") to "mod one_shot_runner;" so the
declaration (mod one_shot_runnner) and the file name match the corrected
spelling.

In `@crates/evm/src/one_shot_runnner.rs`:
- Around line 1-6: The file is misspelled as one_shot_runnner.rs (three "n"s);
rename it to one_shot_runner.rs and update any module declarations, use/imports,
and references to match the corrected module name (e.g., change occurrences of
one_shot_runnner to one_shot_runner in mod declarations, use statements, and
tests), and update any Cargo or workspace references that mention the old
filename so builds and imports continue to resolve correctly.

In `@crates/net/src/net_interface.rs`:
- Around line 608-621: In handle_sync_response, don't ignore errors from
swarm.behaviour_mut().sync.send_response(channel, value): capture the Err(res)
and call warn! with a descriptive message and the error details (e.g.
warn!("failed to send sync response for channel {:?}: {:?}", channel, res) or
similar) so failures are observable; keep the existing try_take() usage and
ensure the warn! call references the channel / response context and the error
returned by send_response.

In `@crates/net/src/net_sync_manager.rs`:
- Around line 61-77: The current forwarder loop uses "while let Ok(event) =
events.recv().await" which silently exits on both RecvError::Lagged and
RecvError::Closed; change it to an explicit loop that matches
events.recv().await and on Ok(event) continues to match NetEvent variants
(NetEvent::OutgoingSyncRequestSucceeded, NetEvent::SyncRequestReceived) and call
addr.do_send(value), on Err(tokio::sync::broadcast::error::RecvError::Lagged(_))
continue (log debug if desired), and on Err(RecvError::Closed) break to stop the
loop; update the block where events is created from rx.resubscribe() and the
async move spawn to use this match-based handling so lagging receivers don't
terminate the forwarder.
- Around line 148-167: The handler for ReceiveEvents in NetSyncManager currently
uses self.requests.get(&msg.id()), leaving entries in the requests map and
causing unbounded growth; change the lookup to remove the entry (use
self.requests.remove(&msg.id())) so the request is deleted once handled, but
still clone the channel for sending (channel.to_owned()) before/after removal as
needed to preserve the OnceTake value, and then proceed to send the
NetCommand::SyncResponse via self.tx.try_send as before.

In `@crates/sync/src/sync.rs`:
- Around line 59-70: handle_sync_end currently sorts only by timestamp via
self.evm_buffer.sort_by_key(|i| i.ts()); the sort itself is stable, so events
whose timestamps collide keep their arrival order, which can differ between runs
and nodes. Update the sort key on evm_buffer in handle_sync_end to include
deterministic tie-breaker fields (for example use a tuple of i.ts(),
i.chain_id(), and i.block) so events with identical ts() are consistently
ordered across runs and nodes before draining and publishing.
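
The tie-breaker suggestion for handle_sync_end can be illustrated with a small standalone sketch. The `BufferedEvent` struct and `sort_for_publish` function are hypothetical stand-ins for the items in evm_buffer; only the tuple key (timestamp, chain id, block) comes from the comment above. Rust's `sort_by_key` is a stable sort, so the point of the composite key is to make the result independent of the order in which events arrived.

```rust
/// Hypothetical stand-in for an item held in `evm_buffer`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BufferedEvent {
    pub ts: u128,      // event timestamp
    pub chain_id: u64, // chain the event originated from
    pub block: u64,    // block number within that chain
}

/// Sort by timestamp, breaking ties by chain id and then block number,
/// so that two buffers holding the same events end up in the same order
/// regardless of arrival order.
pub fn sort_for_publish(buf: &mut Vec<BufferedEvent>) {
    buf.sort_by_key(|e| (e.ts, e.chain_id, e.block));
}
```

If two events can share timestamp, chain id, and block, a further field (for example a log index, if one is available) would be needed for a total order.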
🧹 Nitpick comments (32)
crates/multithread/src/multithread.rs (1)

36-36: Import added for future sync migration.

This NotifySync import is not currently used within this file. Based on the PR objectives, this appears to be scaffolding for the upcoming sync implementation where ctx.notify() calls may be migrated to notify_sync(). The existing ctx.notify(data.clone()) on line 103 is a likely candidate for this migration.

If this is indeed preparatory, consider completing the migration in this file or removing the import until it's needed to keep the diff minimal.

crates/utils/src/helpers.rs (1)

48-51: Consider handling mutex poisoning for robustness.

The .unwrap() on the mutex lock will panic if the mutex is poisoned (a thread panicked while holding it). While this is a common Rust idiom since poisoning typically indicates a bug, in a sync/distributed system context you may want more graceful handling.

♻️ Optional: Handle poisoned mutex gracefully
     /// Takes the item, returning `None` if already taken.
     pub fn take(&self) -> Option<T> {
-        self.0.lock().unwrap().take()
+        self.0.lock().ok()?.take()
     }

This would return None on poisoning rather than panicking, treating it the same as "already taken". Alternatively, keep the current behavior if panicking on poisoning is preferred.
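
To make the trade-off concrete, here is a minimal sketch of both options. It assumes OnceTake<T> wraps an Arc<Mutex<Option<T>>>, which is a guess based on the snippet above rather than the actual definition, and `take_recovering` is a hypothetical name introduced for illustration.

```rust
use std::sync::{Arc, Mutex};

/// Assumed shape of `OnceTake<T>`: a shared slot that can be emptied once.
pub struct OnceTake<T>(Arc<Mutex<Option<T>>>);

impl<T> Clone for OnceTake<T> {
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}

impl<T> OnceTake<T> {
    pub fn new(value: T) -> Self {
        Self(Arc::new(Mutex::new(Some(value))))
    }

    /// Option A: treat a poisoned lock like "already taken" and return None.
    pub fn take(&self) -> Option<T> {
        self.0.lock().ok()?.take()
    }

    /// Option B: recover the guard from a poisoned lock and still hand
    /// out the value if it is present.
    pub fn take_recovering(&self) -> Option<T> {
        self.0
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take()
    }
}
```

Option A hides the poisoning from the caller, while Option B keeps the value reachable after a panic elsewhere; which is preferable depends on whether a half-finished holder could have left the slot in a state callers should not see.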

crates/data/src/write_buffer.rs (2)

26-29: Minor: Typo and non-idiomatic pattern.

Line 26 has a typo: "AggregatId" should be "AggregateId". Also, if let None = ... is non-idiomatic; prefer is_none().

Suggested fix
-        // Always handle AggregatId of 0 with a delay of 0
-        if let None = delays.get(&AggregateId::new(0)) {
+        // Always handle AggregateId of 0 with a delay of 0
+        if delays.get(&AggregateId::new(0)).is_none() {
             delays.insert(AggregateId::new(0), 0);
         }
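
For context, a per-aggregate delay buffer of the kind this constructor configures might look like the sketch below. It is an assumption-laden illustration, not the actual WriteBuffer: `DelayBuffer`, `drain_expired`, the `received_ms` field, and the shape of `AggregateId` and `Insert` are hypothetical. It also uses the `entry` API as another way to express the "default aggregate gets a delay of 0" rule.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the crate's `AggregateId`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AggregateId(usize);

impl AggregateId {
    pub fn new(value: usize) -> Self {
        Self(value)
    }
}

/// A buffered write plus the time (in ms) at which it was received.
#[derive(Debug, Clone, PartialEq)]
pub struct Insert {
    pub key: String,
    pub received_ms: u64,
}

pub struct DelayBuffer {
    delays: HashMap<AggregateId, u64>,
    buffers: HashMap<AggregateId, Vec<Insert>>,
}

impl DelayBuffer {
    pub fn new(mut delays: HashMap<AggregateId, u64>) -> Self {
        // The default aggregate gets a delay of 0 unless one is configured.
        delays.entry(AggregateId::new(0)).or_insert(0);
        Self { delays, buffers: HashMap::new() }
    }

    pub fn insert(&mut self, id: AggregateId, item: Insert) {
        self.buffers.entry(id).or_default().push(item);
    }

    /// Remove and return every insert whose aggregate delay has elapsed
    /// at `now_ms`; younger inserts stay buffered.
    pub fn drain_expired(&mut self, now_ms: u64) -> Vec<Insert> {
        let mut ready = Vec::new();
        for (id, buf) in self.buffers.iter_mut() {
            let delay = self.delays.get(id).copied().unwrap_or(0);
            let (expired, pending): (Vec<Insert>, Vec<Insert>) = buf
                .drain(..)
                .partition(|i| now_ms.saturating_sub(i.received_ms) >= delay);
            ready.extend(expired);
            *buf = pending;
        }
        ready
    }
}
```

With a delay of 1000 ms on aggregate 1 and none on aggregate 0, an insert for aggregate 0 drains immediately while an insert for aggregate 1 is held until it is at least 1000 ms old.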

80-92: Unnecessary clone of owned value.

msg is already owned (moved into the function). The .clone() on line 91 is unnecessary since msg can be consumed directly.

Suggested fix
     fn handle_insert(&mut self, msg: Insert) {
         let aggregate_id = if let Some(event_ctx) = msg.ctx() {
-            event_ctx.aggregate_id().clone()
+            event_ctx.aggregate_id()
         } else {
             AggregateId::new(0)
         };
 
         let agg_buffer = self
             .aggregate_buffers
             .entry(aggregate_id)
             .or_insert_with(|| AggregateBuffer::new());
-        agg_buffer.buffer.push(msg.clone());
+        agg_buffer.buffer.push(msg);
     }
crates/utils/src/path.rs (3)

15-15: Consider accepting &Path instead of &PathBuf for broader compatibility.

Using &Path is more idiomatic in Rust since PathBuf derefs to Path, and this allows callers to pass either type without explicit conversion.

Proposed signature change
-pub fn enumerate_path(path: &PathBuf, index: usize) -> PathBuf {
+pub fn enumerate_path(path: &Path, index: usize) -> PathBuf {

You'll also need to update the import:

-use std::path::PathBuf;
+use std::path::{Path, PathBuf};

16-61: Opportunity to reduce duplication with a helper function.

The filename enumeration logic is duplicated between the parent and no-parent branches (lines 18-33 vs 40-56). Consider extracting a helper.

Refactored implementation
+fn enumerate_filename(file_name: &std::ffi::OsStr, index: usize) -> String {
+    if let Some(file_name_str) = file_name.to_str() {
+        if let Some(dot_pos) = file_name_str.rfind('.') {
+            let (stem, extension) = file_name_str.split_at(dot_pos);
+            format!("{}.{}{}", stem, index, extension)
+        } else {
+            format!("{}.{}", file_name_str, index)
+        }
+    } else {
+        format!("{}.{}", file_name.to_string_lossy(), index)
+    }
+}
+
 pub fn enumerate_path(path: &PathBuf, index: usize) -> PathBuf {
-    if let Some(parent) = path.parent() {
-        if let Some(file_name) = path.file_name() {
-            if let Some(file_name_str) = file_name.to_str() {
-                if let Some(dot_pos) = file_name_str.rfind('.') {
-                    // Has extension
-                    let (stem, extension) = file_name_str.split_at(dot_pos);
-                    let new_name = format!("{}.{}{}", stem, index, extension);
-                    parent.join(new_name)
-                } else {
-                    // No extension
-                    let new_name = format!("{}.{}", file_name_str, index);
-                    parent.join(new_name)
-                }
-            } else {
-                // Invalid UTF-8 in filename, append index directly
-                let new_name = format!("{}.{}", file_name.to_string_lossy(), index);
-                parent.join(new_name)
-            }
-        } else {
-            // Path ends with '/', just append index
-            path.join(format!("{}", index))
-        }
-    } else {
-        // No parent, just modify the filename directly
-        if let Some(file_name) = path.file_name() {
-            if let Some(file_name_str) = file_name.to_str() {
-                if let Some(dot_pos) = file_name_str.rfind('.') {
-                    // Has extension
-                    let (stem, extension) = file_name_str.split_at(dot_pos);
-                    let new_name = format!("{}.{}{}", stem, index, extension);
-                    PathBuf::from(new_name)
-                } else {
-                    // No extension
-                    let new_name = format!("{}.{}", file_name_str, index);
-                    PathBuf::from(new_name)
-                }
-            } else {
-                // Invalid UTF-8 in filename, append index directly
-                let new_name = format!("{}.{}", file_name.to_string_lossy(), index);
-                PathBuf::from(new_name)
-            }
-        } else {
-            // Empty path, just return the index as a path
-            PathBuf::from(format!("{}", index))
-        }
-    }
+    match (path.parent(), path.file_name()) {
+        (Some(parent), Some(file_name)) if !parent.as_os_str().is_empty() => {
+            parent.join(enumerate_filename(file_name, index))
+        }
+        (_, Some(file_name)) => PathBuf::from(enumerate_filename(file_name, index)),
+        _ => PathBuf::from(format!("{}", index)),
+    }
 }

64-95: Tests look good, consider adding trailing slash case.

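The test coverage is solid for common cases, but a trailing-slash input is untested. Note that Path::file_name() ignores a trailing separator, so "/foo/bar/" yields Some("bar") and takes the same branch as "/foo/bar": the index is appended to the last component rather than joined beneath it.

Optional additional test
#[test]
fn test_enumerate_path_trailing_slash() {
    let path = PathBuf::from("/foo/bar/");
    let result = enumerate_path(&path, 3);
    assert_eq!(result, PathBuf::from("/foo/bar.3"));
}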
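Before adopting a trailing-slash test, it may help to check how std treats that input. The following is a self-contained sketch, not the crate's code: the start of enumerate_filename is not visible in the diff, so its body is reconstructed from the removed lines, and the signature takes &Path instead of &PathBuf as an assumption.

```rust
use std::ffi::OsStr;
use std::path::{Path, PathBuf};

// Reconstructed for illustration: insert the index before the extension,
// or append it when the name has no extension.
fn enumerate_filename(file_name: &OsStr, index: usize) -> String {
    match file_name.to_str() {
        Some(name) => match name.rfind('.') {
            Some(dot_pos) => {
                let (stem, extension) = name.split_at(dot_pos);
                format!("{}.{}{}", stem, index, extension)
            }
            None => format!("{}.{}", name, index),
        },
        // Non-UTF-8 file name: append the index to the lossy conversion.
        None => format!("{}.{}", file_name.to_string_lossy(), index),
    }
}

pub fn enumerate_path(path: &Path, index: usize) -> PathBuf {
    match (path.parent(), path.file_name()) {
        // Non-empty parent: keep the directory and enumerate the last component.
        (Some(parent), Some(file_name)) if !parent.as_os_str().is_empty() => {
            parent.join(enumerate_filename(file_name, index))
        }
        // Bare file name such as "db": no directory to preserve.
        (_, Some(file_name)) => PathBuf::from(enumerate_filename(file_name, index)),
        // No file name at all (for example "/" or a path ending in ".."): return the index.
        _ => PathBuf::from(format!("{}", index)),
    }
}
```

Under this sketch "/foo/bar/" behaves exactly like "/foo/bar", because Path::file_name() skips the trailing separator. The final arm is only reached for inputs with no file name, such as "/" or a path ending in "..".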
crates/events/src/enclave_event/sync_end.rs (1)

14-20: Consider implementing Default for the unit struct.

Since SyncEnd is a unit struct and new() takes no arguments, implementing Default would be more idiomatic and enable use with ..Default::default() patterns.

♻️ Proposed change
 #[derive(Message, Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 #[rtype(result = "()")]
-pub struct SyncEnd;
+pub struct SyncEnd;
 
-impl SyncEnd {
-    pub fn new() -> Self {
-        Self {}
-    }
-}
+impl Default for SyncEnd {
+    fn default() -> Self {
+        Self
+    }
+}
+
+impl SyncEnd {
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
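A shorter alternative to the manual impl is to derive Default, which is allowed on unit structs. This is a minimal sketch with a stand-in type: the Message, Serialize and Deserialize derives and the rtype attribute are omitted so it compiles with std alone.

```rust
// Stand-in for the real SyncEnd; only std derives are used here.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct SyncEnd;

impl SyncEnd {
    // Kept so existing `SyncEnd::new()` call sites continue to compile.
    pub fn new() -> Self {
        Self::default()
    }
}
```

Both SyncEnd::new() and SyncEnd::default() then produce the same value, and the type can be used anywhere a Default bound is required.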
crates/events/src/enclave_event/outgoing_sync_requested.rs (1)

15-18: Address TODO: Clarify the EVM sync trigger requirement.

The TODO indicates this event should also trigger EVM sync. Consider documenting the intended behavior or creating a tracking issue for the follow-up work.

Would you like me to open a new issue to track implementing the EVM sync trigger functionality?

crates/config/src/contract.rs (1)

35-41: Minor: Unnecessary .clone() on Copy type.

Option<u64> implements Copy, so the .clone() on line 38 is redundant.

Suggested fix
     pub fn deploy_block(&self) -> Option<u64> {
         use Contract::*;
         match self {
-            Full { deploy_block, .. } => deploy_block.clone(),
+            Full { deploy_block, .. } => *deploy_block,
             AddressOnly(_) => None,
         }
     }
crates/sync/src/sync.rs (1)

138-140: Consider shortening the test settle delay.

A shorter sleep reduces test time while remaining reliable.

♻️ Suggested tweak
 async fn settle() {
-    sleep(Duration::from_millis(100)).await;
+    sleep(Duration::from_millis(1)).await;
 }
Based on learnings: In the Rust test function `test_logs` in `packages/ciphernode/evm/tests/evm_reader.rs`, a sleep duration of 1 millisecond is sufficient for reliable event processing, even in CI environments.
crates/ciphernode-builder/src/event_system.rs (3)

46-50: Consider adding documentation to the new public enum.

EventStoreAddrs is a public enum that callers may need to match against. Brief documentation explaining its purpose and when each variant is used would help consumers.

📝 Suggested documentation
+/// Addresses for per-aggregate EventStores, differentiated by backend type.
 #[derive(Clone)]
 pub enum EventStoreAddrs {
+    /// In-memory event stores keyed by aggregate index.
     InMem(HashMap<usize, Addr<EventStore<InMemSequenceIndex, InMemEventLog>>>),
+    /// Persistent (sled-backed) event stores keyed by aggregate index.
     Persisted(HashMap<usize, Addr<EventStore<SledSequenceIndex, CommitLogEventLog>>>),
 }

266-299: Router instances are not cached—new router created on each call.

in_mem_eventstore_router() and persisted_eventstore_router() create and start a new EventStoreRouter actor on every invocation. This could lead to multiple router actors if called repeatedly. Consider caching the router address similar to how eventstore_addrs is cached.

♻️ Potential caching approach

Add a OnceCell for each router type in EventSystem:

eventstore_router_inmem: OnceCell<Addr<EventStoreRouter<InMemSequenceIndex, InMemEventLog>>>,
eventstore_router_persisted: OnceCell<Addr<EventStoreRouter<SledSequenceIndex, CommitLogEventLog>>>,

Then use get_or_try_init in the router methods to ensure only one router is created per backend.
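The caching idea can be sketched with std alone. The types below are hypothetical stand-ins (RouterAddr replaces Addr<EventStoreRouter<..>>, and the counter exists only to show how many routers were started). The sketch uses get_or_init because, at the time of writing, the fallible get_or_try_init is not stable on std's OnceCell; the once_cell crate provides it.

```rust
use std::cell::{Cell, OnceCell};
use std::rc::Rc;

// Hypothetical stand-in for an actor address.
#[derive(Debug)]
pub struct RouterAddr {
    pub id: usize,
}

pub struct EventSystem {
    router: OnceCell<Rc<RouterAddr>>,
    // Counts how many routers were started; for demonstration only.
    started: Cell<usize>,
}

impl EventSystem {
    pub fn new() -> Self {
        EventSystem {
            router: OnceCell::new(),
            started: Cell::new(0),
        }
    }

    // Returns the cached router, starting it on the first call only.
    pub fn eventstore_router(&self) -> Rc<RouterAddr> {
        let addr = self.router.get_or_init(|| {
            self.started.set(self.started.get() + 1);
            Rc::new(RouterAddr {
                id: self.started.get(),
            })
        });
        Rc::clone(addr)
    }

    pub fn routers_started(&self) -> usize {
        self.started.get()
    }
}
```

Repeated calls return handles to the same router, so the initializer runs once regardless of how many callers ask for it.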


552-593: Test validates multiple in-memory aggregates but only single persisted aggregate.

The test creates an AggregateConfig with 3 aggregates for in-memory, but then creates a new persisted system with an empty config (AggregateConfig::new(HashMap::new())). This tests different configurations, which is fine, but consider adding a test case that validates multiple persisted aggregates as well to ensure the enumerate_path logic works correctly across multiple sled trees.

Cargo.toml (1)

56-66: Ensure release automation has signing keys available.

With commit/tag signing enabled, any automated release flow will fail unless signing keys are configured in the release environment.

crates/evm/src/evm_parser.rs (1)

66-74: Avoid silent truncation when mapping chain_id into a u32.

If chain_id ever exceeds u32::MAX, the current cast silently truncates to the low 32 bits, so two distinct chain ids could map to the same node id.

🔧 Safer conversion
-    let node = chain_id as u32;
+    let node = u32::try_from(chain_id)
+        .expect("chain_id must fit into u32 for HLC node id");
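If panicking is undesirable, the conversion error can be returned to the caller instead. A minimal sketch, assuming a hypothetical helper named node_id_from_chain_id; the truncating variant is included only to show the difference.

```rust
use std::convert::TryFrom;
use std::num::TryFromIntError;

// Checked conversion: fails instead of discarding the high bits.
pub fn node_id_from_chain_id(chain_id: u64) -> Result<u32, TryFromIntError> {
    u32::try_from(chain_id)
}

// The cast under review: keeps only the low 32 bits.
pub fn truncating_node_id(chain_id: u64) -> u32 {
    chain_id as u32
}
```

With the checked version a chain id of u32::MAX + 1 yields an error, while the cast maps it to 0, the same node id a chain id of 0 would get.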
crates/aggregator/src/publickey_aggregator.rs (1)

144-151: Redundant .clone() on &self.bus.

&self.bus.clone() creates a clone and then immediately borrows it. If BusHandle is already Clone and the trap function takes a reference, you can simplify to &self.bus.

Suggested simplification
-        trap(EType::KeyGeneration, &self.bus.clone(), || {
+        trap(EType::KeyGeneration, &self.bus, || {
crates/config/src/chain_config.rs (1)

57-57: Consider adding a brief comment for the min computation.

The [lowest_block, deploy_block].into_iter().flatten().min() pattern is compact but may be non-obvious to future readers. A brief inline comment would improve clarity.

Suggested comment
+            // Track the minimum deploy_block across all contracts
             lowest_block = [lowest_block, deploy_block].into_iter().flatten().min();
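To make the behavior of the pattern concrete, here is a small sketch with hypothetical helper names (min_block, lowest_deploy_block). It also shows why a plain Option::min would not work: Option orders None below every Some, so it would return None as soon as either side is missing.

```rust
// Smaller of two optional block numbers, ignoring `None`.
pub fn min_block(lowest_block: Option<u64>, deploy_block: Option<u64>) -> Option<u64> {
    // Same as `[lowest_block, deploy_block].into_iter().flatten().min()` in
    // edition 2021; the explicit call iterates by value in every edition.
    IntoIterator::into_iter([lowest_block, deploy_block])
        .flatten()
        .min()
}

// Folds the pairwise minimum over a list of contracts' deploy blocks.
pub fn lowest_deploy_block(blocks: &[Option<u64>]) -> Option<u64> {
    blocks
        .iter()
        .fold(None, |lowest, block| min_block(lowest, *block))
}
```

The result is None only when no contract specifies a deploy block; otherwise it is the smallest block that was specified.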
crates/evm/src/enclave_sol_reader.rs (1)

140-143: Consider keeping a thin attach wrapper for compatibility.

If downstream code still references EnclaveSolReader::attach, removing it can be a breaking change. Consider reintroducing attach as a simple wrapper around setup (and deprecate later if needed). Based on learnings, the EnclaveSolReader::attach method should be retained even if it directly calls the underlying reader.

♻️ Suggested compatibility shim
 impl EnclaveSolReader {
     pub fn setup(next: &EvmEventProcessor) -> Addr<EvmParser> {
         EvmParser::new(next, extractor).start()
     }
+
+    pub fn attach(next: &EvmEventProcessor) -> Addr<EvmParser> {
+        Self::setup(next)
+    }
 }
crates/events/src/event_context.rs (1)

84-90: Consider borrowing the cause context to avoid moves.

from_cause only reads the cause; taking &EventContext<Sequenced> avoids unnecessary moves and makes call sites more flexible.

♻️ Possible adjustment
 pub fn from_cause(
     id: EventId,
-    cause: EventContext<Sequenced>,
+    cause: &EventContext<Sequenced>,
     ts: u128,
     aggregate_id: AggregateId,
 ) -> Self {
     EventContext::new(id, cause.id(), cause.origin_id(), ts, aggregate_id)
 }
-        let two =
-            EventContext::from_cause(EventId::hash(2), one, 2, AggregateId::new(1)).sequence(2);
+        let two =
+            EventContext::from_cause(EventId::hash(2), &one, 2, AggregateId::new(1)).sequence(2);
 ...
-        let three =
-            EventContext::from_cause(EventId::hash(3), two, 3, AggregateId::new(1)).sequence(3);
+        let three =
+            EventContext::from_cause(EventId::hash(3), &two, 3, AggregateId::new(1)).sequence(3);

Also applies to: 154-160
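The practical effect of borrowing is that a cause can still be used after deriving a child from it. The sketch below uses a simplified, hypothetical EventContext: plain u64 ids replace EventId, the Sequenced type state and AggregateId are left out, and treating an origin event as its own cause is an assumption.

```rust
// Simplified stand-in; the real type carries a sequence state and aggregate id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventContext {
    id: u64,
    causation_id: u64,
    origin_id: u64,
    ts: u128,
}

impl EventContext {
    // Assumption: an origin event is recorded as its own cause and origin.
    pub fn new_origin(id: u64, ts: u128) -> Self {
        EventContext { id, causation_id: id, origin_id: id, ts }
    }

    // Only reads from `cause`, so a shared borrow is sufficient.
    pub fn from_cause(id: u64, cause: &EventContext, ts: u128) -> Self {
        EventContext {
            id,
            causation_id: cause.id,
            origin_id: cause.origin_id,
            ts,
        }
    }

    pub fn id(&self) -> u64 { self.id }
    pub fn causation_id(&self) -> u64 { self.causation_id }
    pub fn origin_id(&self) -> u64 { self.origin_id }
    pub fn ts(&self) -> u128 { self.ts }
}
```

With the by-value signature, a caller that needs the cause afterwards has to clone it first; with the borrow, the same context can seed several children.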

crates/evm/src/evm_hub.rs (2)

30-39: Avoid cloning the processor list on every event.
Cloning self.nexts allocates per message; iterating by reference keeps the same behavior with less churn.

Suggested change
-        for next in self.nexts.clone() {
-            next.do_send(msg.clone());
-        }
+        for next in &self.nexts {
+            next.do_send(msg.clone());
+        }

57-85: Trim the test sleep to keep CI fast.
10ms is likely more than needed here.

Suggested change
-        sleep(Duration::from_millis(10)).await;
+        sleep(Duration::from_millis(1)).await;

Based on learnings: In the Rust test function test_logs in packages/ciphernode/evm/tests/evm_reader.rs, a sleep duration of 1 millisecond is sufficient for reliable event processing, even in CI environments.

crates/evm/src/evm_router.rs (1)

101-138: Trim the test sleep to keep CI fast.
10ms is likely more than needed for this mailbox flush.

Suggested change
-        sleep(Duration::from_millis(10)).await;
+        sleep(Duration::from_millis(1)).await;

Based on learnings: In the Rust test function test_logs in packages/ciphernode/evm/tests/evm_reader.rs, a sleep duration of 1 millisecond is sufficient for reliable event processing, even in CI environments.

crates/events/src/sync.rs (1)

12-29: Prefer a single ChainId alias to avoid API confusion.
Having both Chainid and ChainId in the public surface is easy to misread.

Suggested change
-type Chainid = u64;
+type ChainId = u64;
@@
-type ChainId = u64;
 type DeployBlock = u64;
crates/events/src/enclave_event/sync_start.rs (1)

44-46: Consider documenting or reordering the split() return tuple.

The split() method returns (data, ts, block) but the struct fields are ordered data, block, chain_id, ts, id. This mismatch could lead to confusion at call sites. Consider either:

  1. Adding a doc comment clarifying the return order, or
  2. Returning a named struct instead of a tuple for clarity
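Option 2 could look roughly like the following. This is a sketch with assumed field types and a hypothetical SyncStartParts name; the chain_id and id fields of the real struct are omitted for brevity.

```rust
// Simplified stand-in for SyncStart; field types are assumptions.
pub struct SyncStart {
    data: Vec<u8>,
    block: Option<u64>,
    ts: u128,
}

// Named parts: call sites bind by field name, so ordering cannot be confused.
#[derive(Debug, PartialEq, Eq)]
pub struct SyncStartParts {
    pub data: Vec<u8>,
    pub ts: u128,
    pub block: Option<u64>,
}

impl SyncStart {
    pub fn new(data: Vec<u8>, block: Option<u64>, ts: u128) -> Self {
        SyncStart { data, block, ts }
    }

    // Consumes the event and hands back its parts by name.
    pub fn split(self) -> SyncStartParts {
        SyncStartParts {
            data: self.data,
            ts: self.ts,
            block: self.block,
        }
    }
}
```

A caller can then destructure with `let SyncStartParts { data, ts, block } = event.split();`, and swapping two fields of the same type becomes a compile-visible naming choice rather than a silent positional mistake.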
crates/ciphernode-builder/src/evm_system.rs (1)

50-78: Consider using std::mem::take instead of replace.

std::mem::take(&mut self.route_factories) is more idiomatic than replace(&mut self.route_factories, Vec::new()) for taking ownership while leaving an empty default.

Also note that build() is not idempotent—calling it twice will result in an empty route_factories on the second call, silently producing a router with no routes. Consider either consuming self or documenting this single-use behavior.

♻️ Suggested improvement
-use std::mem::replace;
+use std::mem::take;
-            let route_factories = replace(&mut self.route_factories, Vec::new());
+            let route_factories = take(&mut self.route_factories);
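The single-use concern can be illustrated with a simplified, hypothetical builder (the route factories here just return strings). The sketch shows both shapes side by side: a &mut self build using take, which silently yields an empty router on the second call, and a consuming build, which makes a second call a compile error.

```rust
// Hypothetical factory type; the real factories build actor routes.
type RouteFactory = Box<dyn Fn() -> String>;

pub struct Router {
    pub routes: Vec<String>,
}

#[derive(Default)]
pub struct EvmSystemBuilder {
    route_factories: Vec<RouteFactory>,
}

impl EvmSystemBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_route(mut self, factory: RouteFactory) -> Self {
        self.route_factories.push(factory);
        self
    }

    // Takes the factories and leaves an empty Vec behind, so a second
    // call produces a router with no routes.
    pub fn build_in_place(&mut self) -> Router {
        let route_factories = std::mem::take(&mut self.route_factories);
        Router {
            routes: route_factories.iter().map(|f| f()).collect(),
        }
    }

    // Consumes the builder; calling it twice does not compile.
    pub fn build(self) -> Router {
        Router {
            routes: self.route_factories.iter().map(|f| f()).collect(),
        }
    }
}
```

Whether consuming self is practical depends on how the builder is held by its callers, which is not visible in this diff.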
crates/evm/tests/integration.rs (1)

100-116: Semicolon after Ok(()) creates unit return in match arm.

Inside the trap closure at Line 113, the semicolon after self.bus.publish(SyncEnd::new())?; followed by nothing before the closing brace means the match arm's expression evaluates to (). This works but is subtle—the Ok(()) at Line 114 is the closure's return, not the match arm's value. Consider adding a brief comment or restructuring for clarity.

crates/evm/src/evm_read_interface.rs (1)

246-268: TimestampTracker caches only single block; consider edge cases.

The TimestampTracker caches only the most recent (block_number, timestamp) pair. This works well for sequential log processing within the same block, but if logs arrive out of order or from different blocks interleaved, it will re-fetch timestamps unnecessarily.

Also, returning 0 as a fallback timestamp (Lines 249, 264) could cause issues downstream if timestamp ordering matters. Consider logging at warn level when using fallback values to aid debugging.
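One way to avoid the 0 fallback is to surface the missing timestamp as None and let the caller decide. The sketch below is a hypothetical, synchronous reduction of the idea: the fetch closure stands in for the provider lookup, and the single-entry cache matches the behavior described above.

```rust
// Caches the most recent (block_number, timestamp) pair only.
pub struct TimestampTracker {
    last: Option<(u64, u64)>,
}

impl TimestampTracker {
    pub fn new() -> Self {
        TimestampTracker { last: None }
    }

    // Returns the cached timestamp on a hit, otherwise calls `fetch`.
    // A failed fetch yields None instead of a fabricated 0 and leaves
    // the cache untouched.
    pub fn get<F>(&mut self, block_number: u64, mut fetch: F) -> Option<u64>
    where
        F: FnMut(u64) -> Option<u64>,
    {
        if let Some((cached_block, ts)) = self.last {
            if cached_block == block_number {
                return Some(ts);
            }
        }
        let ts = fetch(block_number)?;
        self.last = Some((block_number, ts));
        Some(ts)
    }
}
```

Because only one entry is kept, interleaved blocks (5, 6, 5) trigger a fetch each time the block changes. A small map keyed by block number would avoid that at the cost of bounded extra memory.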

crates/data/src/persistable.rs (2)

162-164: Consider documenting the [0] sentinel value convention.

The check if bytes == [0] treats a single zero byte as "not found". This is a sentinel value pattern that could be surprising to maintainers. Consider adding a comment explaining this convention or extracting it to a named constant.

📝 Suggested documentation
+    // A single zero byte [0] is used as a sentinel to indicate "no data"
+    // This distinguishes between "key not found" (None) and "empty value stored"
     if bytes == [0] {
         return Ok(None);
     }
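Extracting the sentinel into a named constant could look like this. The names NO_DATA_SENTINEL and decode_stored are hypothetical, and the function works on raw bytes only, leaving deserialization out.

```rust
// A single zero byte marks "no data" in the store.
pub const NO_DATA_SENTINEL: [u8; 1] = [0];

// Maps the sentinel to None and passes every other byte string through.
pub fn decode_stored(bytes: &[u8]) -> Option<&[u8]> {
    if bytes == &NO_DATA_SENTINEL[..] {
        None
    } else {
        Some(bytes)
    }
}
```

One consequence worth documenting next to the constant: any value whose serialized form is exactly one zero byte is indistinguishable from "no data" under this convention.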

168-187: Serialization errors are silently dropped.

When bincode::serialize fails (line 176), the error is logged but the method returns without persisting. While logging is good, callers have no way to know the write failed. Consider whether this is acceptable for your use case.

crates/sortition/src/ciphernode_selector.rs (1)

98-98: Minor: Consider if clones are necessary for params and error_size.

The .clone() calls on data.params and data.error_size may be necessary if E3Meta requires owned values, but worth verifying they can't be moved or borrowed instead.

Also applies to: 100-100

crates/events/src/enclave_event/mod.rs (1)

441-451: Prefer ctx.aggregate_id() for EnclaveEvent.

Now that EventContext is authoritative (and persisted), using payload-derived aggregate ids can drift from stored context.

♻️ Suggested refactor
 impl<S: SeqState> WithAggregateId for EnclaveEvent<S> {
     fn get_aggregate_id(&self) -> AggregateId {
-        self.payload.get_aggregate_id()
+        self.ctx.aggregate_id()
     }
 }

Comment thread crates/ciphernode-builder/src/ciphernode_builder.rs
Comment thread crates/ciphernode-builder/src/ciphernode.rs
Comment thread crates/ciphernode-builder/src/ciphernode.rs
Comment thread crates/config/src/rpc.rs
Comment thread crates/data/src/data_store.rs
Comment thread crates/evm/src/one_shot_runnner.rs
Comment thread crates/net/src/net_interface.rs
Comment thread crates/net/src/net_sync_manager.rs
Comment thread crates/net/src/net_sync_manager.rs
Comment thread crates/sync/src/sync.rs

@ctrlc03 ctrlc03 left a comment

Collaborator

other than checking Coderabbit comments, looks fine to merge to me.

@ryardley

Contributor Author

Made note of coderabbit stuff will address in future PRs
