feat: event sync [skip-line-limit]#1239

Merged
hmzakhalid merged 79 commits into
mainfrom
ry/1050-event-sync
Feb 15, 2026

@ryardley ryardley commented Jan 30, 2026

Contributor

Progresses #1050

This does not fetch libp2p events from nodes. The infra is set up, but there are some issues that need further work. Currently this is disabled in order to integrate.

Done

  • Weaves event context through the system
  • Passes to bus.publish() so that events have provenance
  • Flatten out async calling to sortition
  • Passes event context to Persistable so that inserts know the aggregate id of the events
  • Added better error handling where appropriate with trap and trap_fut
  • Pass event context to errors so that errors have event provenance
  • EventIds now have colors in output to aid better debugging
  • Fix regex conflicts in ./examples/CRISP/Cargo.lock
  • Force context on all events and snapshots
  • Inject context into bus and persistence at the start of all sequential handlers or use TypedEvents for notified handlers
  • Ensure blockheight watermark is saved on each aggregate write
  • Query snapshot information in Sync
  • Ensure that evm package only emits EnclaveEvents and not custom events
  • Fetch initial information for Sync from eventsystem
  • Fetch historical events from evm
  • Fetch historical events from net
  • Sort events by timestamp before publishing them
  • Fix up event store query events (GetEventsAfterResponseEventStoreQueryResponse, EventStoreQueryBy<SeqAgg>, EventStoreQueryBy<TimeAgg>)
  • Add logging to sync process
  • Have net and evm both use event buffering until SyncEnd
  • Hookup effects to EnableEffects event (evm writers, net commands, multithread, persistable/snapshotbuffer)
  • Ensure EnclaveEvent must have source field and this must be filtered on for net fetch
  • Debug multithread not receiving events

Example of color coded events:

image

To do as a follow up PR

  • Set up a second block-based subscription
  • Delay snapshot information by some time (and see what breaks)
  • Delay live evm processing by two blocks
  • Detect the open-request edge case in complete events
  • Test a sync in the middle of aggregation events
  • Review use of persistable in E3Router where aggregates are difficult to manage
  • Debug net fetch not working

Summary by CodeRabbit

  • New Features
    • Historical EVM/Net sync, snapshot buffering with timelock-driven batching, event extractor and net setup helper returning runtime handle + peer id.
  • Improvements
    • Context-aware events (source, block, timestamps) threaded across components; richer event querying by time/sequence/aggregate; mailbox sizing for actors; enhanced logging and colored event id formatting.
  • Bug Fixes
    • More robust buffering, storage and publish error handling; safer replay and sync flows; improved test helpers and tracing.

@vercel

vercel Bot commented Jan 30, 2026


The latest updates on your projects. Learn more about Vercel for GitHub.

| Project | Deployment | Actions | Updated (UTC) |
|---|---|---|---|
| crisp | Ready | Preview, Comment | Feb 14, 2026 2:12pm |
| enclave-docs | Ready | Preview, Comment | Feb 14, 2026 2:12pm |

@coderabbitai

coderabbitai Bot commented Jan 30, 2026

Contributor

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

Threads EventContext through events via TypedEvent, converts many handlers to context-aware variants, replaces WriteBuffer with SnapshotBuffer/BatchRouter/TimelockQueue, expands BusHandle publish APIs, introduces historical sync types and a new async sync orchestration, and adds mailbox capacity limits across actors.
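The walkthrough's central idea, a payload that carries its EventContext and is unpacked with into_components(), can be sketched in a few lines. This is only an illustration: the field names and the `seq`/`block` fields of `EventContext` here are assumptions, and `handle_share` is a hypothetical handler, not code from the PR.

```rust
/// Hypothetical stand-in for the PR's EventContext; the real fields are not shown in this thread.
#[derive(Clone, Debug, PartialEq)]
pub struct EventContext {
    pub seq: u64,
    pub block: Option<u64>,
}

/// A payload paired with the context of the event that produced it.
#[derive(Clone, Debug, PartialEq)]
pub struct TypedEvent<T> {
    inner: T,
    ctx: EventContext,
}

impl<T> TypedEvent<T> {
    pub fn new(inner: T, ctx: EventContext) -> Self {
        TypedEvent { inner, ctx }
    }

    /// Borrow the context without consuming the event.
    pub fn get_ctx(&self) -> &EventContext {
        &self.ctx
    }

    /// Split the event into its payload and its context.
    pub fn into_components(self) -> (T, EventContext) {
        (self.inner, self.ctx)
    }
}

/// Hypothetical handler: unpack the message, do some work, and wrap the
/// result with the same context so provenance is carried forward.
pub fn handle_share(msg: TypedEvent<u32>) -> TypedEvent<String> {
    let (share, ec) = msg.into_components();
    TypedEvent::new(format!("aggregated:{share}"), ec)
}
```

Calling `handle_share(TypedEvent::new(3, ec.clone()))` yields an event whose context equals `ec`, which is the property the migration is meant to guarantee.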

Changes

| Cohort / File(s) | Summary |
|---|---|
| TypedEvent & context migration: `crates/aggregator/src/committee_finalizer.rs`, `crates/aggregator/src/publickey_aggregator.rs`, `crates/aggregator/src/threshold_plaintext_aggregator.rs`, `crates/keyshare/src/*`, `crates/multithread/src/multithread.rs`, `crates/net/src/document_publisher.rs`, `crates/sortition/src/*`, `crates/zk-prover/src/actors/*`, `crates/evm/src/*` | Many Handler<T> implementations changed to Handler<TypedEvent<T>>. Handlers now call into_components() to extract (msg, ec); state mutations and publishes are threaded with EventContext. Audit handler signatures, notify/do_send sites, trap usage, and event-context propagation. |
| EventContext, TypedEvent & Bus API: `crates/events/src/event_context.rs`, `crates/events/src/enclave_event/typed_event.rs`, `crates/events/src/traits.rs`, `crates/events/src/bus_handle.rs`, `crates/events/src/enclave_event/mod.rs` | EventContext gains block and EventSource; TypedEvent adds into_components and source accessors; BusHandle adds with_ec, publish_without_context, publish_from_remote*, and generic set_ctx. Update trait signatures and all bus.publish call sites to pass/propagate context. |
| SnapshotBuffer / batching stack (WriteBuffer removed): `crates/events/src/snapshot_buffer/*`, `crates/ciphernode-builder/src/event_system.rs`, `crates/data/src/write_buffer.rs` (deleted), `crates/data/src/data_store.rs` | Removed old WriteBuffer; added SnapshotBuffer, Batch, BatchRouter, TimelockQueue, and AggregateConfig. EventSystem and DataStore adjusted to accept SnapshotBuffer recipients and new getters. Review UpdateDestination/FlushSeq wiring and tests updated accordingly. |
| Event store query & router (generic queries): `crates/events/src/events.rs`, `crates/events/src/eventstore.rs`, `crates/events/src/eventstore_router.rs` | Replaced Get/Receive flows with generic EventStoreQueryBy<Q: QueryKind> and EventStoreQueryResponse. Added QueryAggregator actor to fan-out per-aggregate sub-queries and aggregate results. Validate query kinds (Seq/Ts/SeqAgg/TsAgg) and aggregator correctness. |
| Sync orchestration & historical EVM flow: `crates/sync/src/sync.rs`, `crates/sync/src/repo.rs`, `crates/sync/src/lib.rs`, `crates/evm/src/*`, `crates/net/src/*` | Introduces async sync(...) orchestration, SnapshotMeta/AggregateState, collect_historical_evm_events, HistoricalEvmSyncStart/HistoricalNetSyncStart variants, NetEventBuffer, and setup_net helper. Review snapshot load, eventstore queries, historical EVM collection (timestamp sort), EffectsEnabled emission, and SyncEnded ordering. |
| Mailbox limits & actor lifecycle: `crates/utils/src/constants.rs`, many started() additions across actors | Adds MAILBOX_LIMIT and MAILBOX_LIMIT_LARGE; many actors set mailbox capacity in started(). Check capacity choices and potential ordering/backpressure effects. |
| Persistable & repositories: `crates/data/src/persistable.rs`, `crates/data/src/repositories.rs`, `crates/data/src/*` | Persistable gains try_mutate/try_mutate_without_context and a generic set_ctx; Repositories derives Clone and adds in_mem() helper. Confirm uses of try_mutate(&ec, ...) and persisted snapshot semantics. |
| Sequencer, Bus & EventStore: `crates/events/src/sequencer.rs`, `crates/events/src/bus_handle.rs`, `crates/events/src/eventstore.rs` | Sequencer now uses StoreEventRequested/StoreEventResponse; EventStore adds storage_errors tracking and query handlers for EventStoreQueryBy<Ts/Seq>. BusHandle extended for contextual publishing. Review try_send usage and storage-error panic paths. |
| Net / EVM integration & correlation: `crates/net/src/lib.rs`, `crates/net/src/net_event_buffer.rs`, `crates/net/src/net_interface.rs`, `crates/evm/src/*` | Adds setup_net API, NetEventBuffer actor, correlation_id threading for sync requests/responses, EventSource attribution when converting to EnclaveEvent, and NetEventTranslator refactor. Verify correlation handling, buffering during sync, and source tagging. |
| Utilities, test helpers & infra: `crates/utils/src/*`, `crates/test-helpers/*`, `crates/tests/*`, `crates/ciphernode-builder/*` | Adds actix channel adapters, oneshot runner mailbox sizing, major_issue helper, colorized formatter, with_tracing test helper, testmode_start_buffer_immediately builder flag, and test updates using publish_without_context. Confirm test harness and builder test hooks. |
| Removals, renames & API shifts: `crates/events/src/enclave_event/*`, `crates/evm/src/enclave_sol.rs` (removed), `crates/data/src/write_buffer.rs` (removed), `crates/events/src/lib.rs` (new modules) | Multiple event type renames and new variants (e.g., SyncStart/SyncEnd → Historical*/SyncEnded, EffectsEnabled), module exports added/removed, and some constructor signatures changed. Ensure call sites updated to new names and parameters. |

Sequence Diagram(s)

sequenceDiagram
    participant Sync as Sync::sync()
    participant EventStore as EventStore
    participant EVM as Historical EVM
    participant Bus as EventBus
    participant Snapshot as SnapshotBuffer

    Sync->>EventStore: EventStoreQueryBy<TsAgg/SeqAgg> (since snapshot)
    EventStore-->>Sync: EventStoreQueryResponse (sequenced events)
    Sync->>EVM: HistoricalEvmSyncStart (request historical EVM events)
    EVM-->>Sync: HistoricalEvmEvents (EnclaveEvent<Unsequenced>)
    Sync->>Sync: sort events by ts
    Sync->>Bus: publish EffectsEnabled (with context)
    Bus-->>Snapshot: SnapshotBuffer Start (wires BatchRouter & TimelockQueue)
    Sync->>Bus: publish SyncEnded
    Bus-->>All: consumers receive sequenced events
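
The "sort events by ts" step in the diagram amounts to merging the historical EVM and net batches and ordering them before publishing. A minimal sketch, assuming a hypothetical `HistoricalEvent` struct with a numeric `ts` field (the PR's real event and timestamp types are not shown here):

```rust
/// Hypothetical flattened view of a historical event; not the PR's real type.
#[derive(Clone, Debug, PartialEq)]
pub struct HistoricalEvent {
    pub ts: u128,
    pub source: &'static str,
    pub name: &'static str,
}

/// Merge both historical batches and order them by timestamp.
pub fn merge_by_timestamp(
    evm: Vec<HistoricalEvent>,
    net: Vec<HistoricalEvent>,
) -> Vec<HistoricalEvent> {
    let mut all: Vec<HistoricalEvent> = evm.into_iter().chain(net).collect();
    // sort_by_key is a stable sort, so events that share a timestamp keep
    // their incoming order (EVM before net in this sketch).
    all.sort_by_key(|e| e.ts);
    all
}
```

Using a stable sort matters when two sources report the same timestamp: the relative order is then decided by the order in which the batches were chained rather than being arbitrary.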

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Suggested labels

ciphernode

Suggested reviewers

  • hmzakhalid
  • ctrlc03

"🐇 I nibble at events, wrap them neat in TypedEvent,
Context threaded through each hop, mailboxes set, not spent.
Buffers sleep then wake to batch, timelocks tick and sing,
Historical sync rolls out its dance, and rabbits hop for spring. 🥕"

🚥 Pre-merge checks | ✅ 3 | ❌ 1
❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 18.69% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (3 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'feat: event sync' clearly and concisely summarizes the main feature being added - event synchronization infrastructure. It directly reflects the primary change throughout the changeset. |
| Merge Conflict Detection | ✅ Passed | ✅ No merge conflicts detected when merging into main |


- Update TypedEvent handlers to preserve event context across the system
- Add publish_with_ctx method to maintain event causality chains
- Refactor aggregator and keyshare components to use new context-aware publishing
- Update bus handle to support both origin and context-aware event publishing
- Ensure proper event correlation across distributed components
Update Persistable trait to support EventContext in try_mutate method for
proper event synchronization. Add context tracking in persistable state
and modify all callers to pass EventContext when mutating state.
@vercel vercel Bot temporarily deployed to Preview – crisp February 1, 2026 05:01 Inactive
@vercel vercel Bot temporarily deployed to Preview – enclave-docs February 1, 2026 05:01 Inactive
@ryardley

Contributor Author

I am now just trying to get the tests to pass and get this merged. Net events don't sync but blockchain events do.

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 6

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
crates/evm/src/evm_chain_gateway.rs (1)

149-161: ⚠️ Potential issue | 🟠 Major

panic! in handle_evm_event will crash the actor on unexpected variants.

Line 159 uses panic! for an unexpected EnclaveEvmEvent variant. While the match arms cover the expected cases, a panic! bypasses the trap error handler in Handler<EnclaveEvmEvent> (line 208) and will terminate the actor. Consider using bail! or returning an error instead:

Suggested fix
-            _ => panic!("EvmChainGateway is only designed to receive EnclaveEvmEvent::HistoricalSyncComplete or EnclaveEvmEvent::Event events"),
+            _ => bail!("EvmChainGateway received unexpected EnclaveEvmEvent variant"),
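
To illustrate why returning an error is friendlier than panicking here, the sketch below uses a plain `Result<_, String>` in place of anyhow's `bail!`, and a simplified `trap` that records the error instead of publishing it to the bus. The `Other` variant, the return values, and this `trap` signature are assumptions made for the example.

```rust
/// Simplified stand-in for the PR's EnclaveEvmEvent; `Other` is hypothetical.
#[derive(Debug)]
pub enum EnclaveEvmEvent {
    HistoricalSyncComplete,
    Event(String),
    Other,
}

/// Returns an error for unexpected variants instead of panicking.
pub fn handle_evm_event(event: EnclaveEvmEvent) -> Result<String, String> {
    match event {
        EnclaveEvmEvent::HistoricalSyncComplete => Ok("sync-complete".to_string()),
        EnclaveEvmEvent::Event(name) => Ok(format!("event:{name}")),
        other => Err(format!("unexpected EnclaveEvmEvent variant: {other:?}")),
    }
}

/// Simplified error trap: records the error and lets the caller carry on.
pub fn trap(result: Result<String, String>, reported: &mut Vec<String>) -> Option<String> {
    match result {
        Ok(value) => Some(value),
        Err(e) => {
            reported.push(e);
            None
        }
    }
}
```

With this shape, an unexpected variant ends up in `reported` and the caller keeps running, whereas a `panic!` would unwind past the trap entirely.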
crates/events/src/eventstore.rs (1)

26-45: ⚠️ Potential issue | 🟡 Minor

storage_errors counter never resets — duplicates accumulate over the process lifetime.

Once storage_errors exceeds MAX_STORAGE_ERRORS (10), the next duplicate causes a bail!, which propagates to the handler and triggers a panic via major_issue. This counter only ever increments and is never reset. If duplicate timestamps are expected during snapshot recovery (as the warn message suggests), 10 may not be enough slack for larger snapshots, and the counter has no way to reset after recovery completes.

Consider either resetting the counter after sync completes (e.g., via a message) or evaluating whether this threshold is sufficient for your expected snapshot sizes.
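
One way to picture the "reset after sync" option is a small error budget that can be cleared by a message handler. This is a sketch under assumptions: `StorageErrorBudget` and its methods are hypothetical names, and the exact threshold comparison in the real EventStore may differ.

```rust
/// Threshold taken from the review comment above.
pub const MAX_STORAGE_ERRORS: u32 = 10;

/// Hypothetical helper tracking tolerated duplicate-storage errors.
#[derive(Default)]
pub struct StorageErrorBudget {
    errors: u32,
}

impl StorageErrorBudget {
    /// Record one duplicate. Fails once the budget is exceeded.
    pub fn record_duplicate(&mut self) -> Result<(), String> {
        self.errors += 1;
        if self.errors > MAX_STORAGE_ERRORS {
            Err(format!(
                "storage error budget exceeded: {} > {}",
                self.errors, MAX_STORAGE_ERRORS
            ))
        } else {
            Ok(())
        }
    }

    /// Intended to be called when sync completes, so that duplicates seen
    /// during snapshot recovery do not count against later operation.
    pub fn reset(&mut self) {
        self.errors = 0;
    }
}
```

In an actor, `reset` would be wired to whatever message marks the end of sync; that wiring is left out here.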

crates/multithread/src/multithread.rs (1)

122-136: ⚠️ Potential issue | 🟠 Major

attach_with_zk bypasses EffectsEnabled synchronization, creating inconsistent behavior with attach.

attach() defers the ComputeRequest subscription until EffectsEnabled fires, but attach_with_zk() subscribes immediately (line 134). This means compute requests could be processed before the system finishes syncing when ZK is enabled, potentially causing incorrect behavior if EffectsEnabled synchronization is critical to system correctness during replay or initialization.

🤖 Fix all issues with AI agents
In `@crates/ciphernode-builder/src/ciphernode_builder.rs`:
- Line 75: The CiphernodeBuilder field start_buffer is never applied during
build(): ensure the builder's testmode_start_buffer_immediately() choice is
propagated to the created EventSystem by reading self.start_buffer inside
build() and either setting EventSystem::start_buffer to that value or calling
EventSystem::start_buffer_immediately() when self.start_buffer is true; update
build() to transfer the builder's start_buffer into the EventSystem instance
(referencing CiphernodeBuilder::start_buffer, CiphernodeBuilder::build,
EventSystem and its start_buffer_immediately() method).

In `@crates/events/src/bus_handle.rs`:
- Around line 432-439: The BusHandle::handle implementation currently drops
block metadata by calling publish_from_remote(data, ts, None, source) which
loses the original event's block height; preserve block information by passing
the event's block (e.g., msg.block()) into publish_from_remote instead of None
(or explicitly verify and document that erasing block data is acceptable for all
downstream uses like BatchRouter.get_highest_block and AggregateState
snapshot/recovery). Locate the handle method handling EnclaveEvent<Sequenced>
and update the publish_from_remote call to forward the block from msg, ensuring
downstream watermark and recovery logic receive the original block data.

In `@crates/events/src/snapshot_buffer/batch_router.rs`:
- Around line 125-178: The code in Handler<EnclaveEvent<Sequenced>>::handle
computes prev_seq with ec.seq() - 1 which can underflow when ec.seq() == 0;
change this to use a checked or saturating subtraction (e.g.,
ec.seq().checked_sub(1) or ec.seq().saturating_sub(1)) and branch on the checked
result so you only call self.batches.contains_key(&prev_seq) and access
self.aggregates.get(&prev_seq) when a valid prev_seq exists; update the logic
around prev_seq, the timelock StartTimelock creation, and related debug logs
(references: handle, ec.seq(), prev_seq, self.batches.contains_key,
self.aggregates.get, StartTimelock::new) to use the non-underflowing value or
skip the prev-step entirely when seq == 0.

In `@crates/events/src/snapshot_buffer/timelock_queue.rs`:
- Around line 150-173: The handler for Tick in TimelockQueue currently pops a
timelock (self.timelocks.pop()) before calling
batch_router.try_send(FlushSeq(seq)), so any failure from try_send will drop the
timelock permanently; change the logic to either (a) peek at the next timelock
(without popping) and only pop after a successful send, or (b) if you must pop
first, catch send failures and re-insert the timelock back into self.timelocks
(preserving ordering) when batch_router.try_send(FlushSeq(seq)) fails, or (c)
replace try_send with a fire-and-forget do_send if losing timelocks is
unacceptable—apply this change inside the Tick handler (referencing
TimelockQueue::handle, self.timelocks.pop(), next_timelock_lt, and
batch_router.try_send).

In `@crates/net/src/net_event_buffer.rs`:
- Around line 89-107: The NetEventBuffer Actor::started method currently spawns
a task that calls addr.do_send(...) but does not set a mailbox capacity, risking
silent drops; update NetEventBuffer::started to call
ctx.set_mailbox_capacity(MAILBOX_LIMIT) (the same MAILBOX_LIMIT used by
Batch/BatchRouter/ThresholdShareCollector/EncryptionKeyCollector) before
spawning the async task so the actor mailbox can accommodate bursts during sync.

In `@crates/sync/src/sync.rs`:
- Around line 140-147: Replace the eprintln! call with tracing's structured
logger: in the timeout branch where you iterate over
expected.difference(&received) (variables expected, received, chain_id) use
tracing::error! (or add use tracing::error) to emit "Timeout waiting for
historical events from chain {}" including the chain_id value so logs are
captured by tracing rather than printed to stderr; ensure the tracing crate is
in scope for the module if not already.
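
For the timelock_queue.rs item above, option (a), peeking and only popping after a successful send, could look roughly like the following. It is a simplified sketch: the queue is modelled as a min-heap of `(deadline, seq)` pairs and the send to the batch router is an injected closure, both of which are assumptions rather than the PR's actual types.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Simplified timelock queue: a min-heap of (deadline, seq) pairs.
pub struct TimelockQueue {
    timelocks: BinaryHeap<Reverse<(u64, u64)>>,
}

impl TimelockQueue {
    pub fn new() -> Self {
        TimelockQueue { timelocks: BinaryHeap::new() }
    }

    pub fn push(&mut self, deadline: u64, seq: u64) {
        self.timelocks.push(Reverse((deadline, seq)));
    }

    pub fn len(&self) -> usize {
        self.timelocks.len()
    }

    /// Flush every timelock whose deadline has passed. `send` stands in for
    /// the try_send of FlushSeq(seq) and returns Err when delivery fails.
    pub fn tick<F>(&mut self, now: u64, mut send: F) -> Vec<u64>
    where
        F: FnMut(u64) -> Result<(), ()>,
    {
        let mut flushed = Vec::new();
        // Peek first: the entry stays in the heap until the send succeeds.
        while let Some(&Reverse((deadline, seq))) = self.timelocks.peek() {
            if deadline > now {
                break; // nothing else is due yet
            }
            if send(seq).is_err() {
                break; // keep the timelock and retry on the next tick
            }
            self.timelocks.pop();
            flushed.push(seq);
        }
        flushed
    }
}
```

A failed send leaves the queue untouched, so the same timelock is offered again on the next tick instead of being dropped.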
🟡 Minor comments (17)
crates/utils/src/helpers.rs-11-12 (1)

11-12: ⚠️ Potential issue | 🟡 Minor

Remove debug logging artifact from generic utility.

"take has been called!" at info! level in a general-purpose utility will add noise to every consumer's logs. This reads like a temporary debug aid (the PR description itself mentions "debug multithread issues"). Remove it before merging; if you need to keep it for future diagnostics, drop it to trace! and include the type name or caller context.

Proposed fix
-use tracing::info;
-
+
     pub fn take(&self) -> Option<T> {
-        info!("take has been called!");
         self.0.lock().unwrap().take()
     }

Also applies to: 52-52

crates/evm/src/fix_historical_order.rs-12-12 (1)

12-12: ⚠️ Potential issue | 🟡 Minor

Unused info import.

All info! calls have been replaced with debug!, but info is still imported. This will produce a compiler warning.

Proposed fix
-use tracing::{debug, info};
+use tracing::debug;
crates/ciphernode-builder/src/evm_system.rs-89-89 (1)

89-89: ⚠️ Potential issue | 🟡 Minor

Typo: "Finaly" → "Finally".

-        // Finaly subscribe to the bus and wait for HistoricalEvmSyncStart
+        // Finally subscribe to the bus and wait for HistoricalEvmSyncStart
crates/utils/src/retry.rs-50-50 (1)

50-50: ⚠️ Potential issue | 🟡 Minor

Debug artifact: unconditional error! log before the match is noisy and misleading.

This fires at error level for every retry attempt, including retryable ones that are already logged at warn on line 62. Retryable errors appearing as error! will pollute logs and potentially trigger monitoring alerts for expected/recoverable situations. For actual Failure variants, it duplicates the error! on line 71.

Consider removing this line or downgrading to trace!/debug! if you want it for development.

Proposed fix
             Err(re) => {
-                tracing::error!("RETRY FAILED {:?}", re);
                 match re {
crates/ciphernode-builder/src/ciphernode_builder.rs-193-198 (1)

193-198: ⚠️ Potential issue | 🟡 Minor

Incomplete doc comment — sentence is cut off.

Line 194: "for tests that don't specifically" ends abruptly. Please complete the sentence.

crates/data/src/data_store.rs-179-179 (1)

179-179: ⚠️ Potential issue | 🟡 Minor

Remove debug println! left in production code.

This appears to be a leftover debug statement. Use tracing::debug! or tracing::trace! if logging is desired here.

🧹 Proposed fix
     pub fn from_sled_store_with_buffer(
         addr: &Addr<SledStore>,
         snapshot_buffer: impl Into<Recipient<Insert>>,
     ) -> Self {
-        println!("from_sled_store_with_buffer...");
         Self {
             addr: StoreAddr::Sled(addr.clone()),
             get: addr.clone().recipient(),
crates/events/src/sequencer.rs-62-72 (1)

62-72: ⚠️ Potential issue | 🟡 Minor

Same panic concern applies to StoreEventResponse handling.

If self.buffer.try_send(event.clone()) fails because the snapshot buffer's mailbox is temporarily full, the node panics. Since this path also feeds the bus, a brief queue overflow would be fatal. Consider whether a bounded retry or do_send (which queues without backpressure) would be more appropriate for the buffer, given that MAILBOX_LIMIT_LARGE is already set on the sequencer itself.

crates/events/src/sequencer.rs-50-59 (1)

50-59: ⚠️ Potential issue | 🟡 Minor

Panic on try_send failure is intentional but aggressive.

try_send can fail both when the recipient is stopped and when its mailbox is full (SendError::Full). A transient mailbox-full condition on the event store—e.g., during a burst of events—would crash the entire node. If this is acceptable for the critical sequencing path, a comment documenting that decision would help future readers.
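
The two failure modes can be told apart at the call site. The sketch below uses the standard library's bounded channel as a stand-in for an actor mailbox (actix is not used here), and `SendOutcome` and `classify_send` are hypothetical names; the point is only that "full" and "gone" are distinct cases that could be handled differently.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical classification of a non-blocking send attempt.
#[derive(Debug, PartialEq)]
pub enum SendOutcome {
    Delivered,
    /// Transient: the bounded queue is full right now.
    MailboxFull,
    /// Permanent: the receiving side no longer exists.
    RecipientGone,
}

/// Attempt a non-blocking send and report which case occurred.
pub fn classify_send(tx: &SyncSender<u64>, seq: u64) -> SendOutcome {
    match tx.try_send(seq) {
        Ok(()) => SendOutcome::Delivered,
        Err(TrySendError::Full(_)) => SendOutcome::MailboxFull,
        Err(TrySendError::Disconnected(_)) => SendOutcome::RecipientGone,
    }
}
```

A caller could retry or back off on `MailboxFull` and reserve the panic for `RecipientGone`, if crashing on a transient burst is not the intended behavior.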

crates/net/src/events.rs-300-300 (1)

300-300: ⚠️ Potential issue | 🟡 Minor

Typo: "RECEIVED and event" → "RECEIVED an event".

crates/net/src/net_sync_manager.rs-108-127 (1)

108-127: ⚠️ Potential issue | 🟡 Minor

Potential race between AllPeersDialed broadcast and await_event resubscribe.

If AllPeersDialed is broadcast on the NetEvent channel (net_interface.rs line 149) before await_event calls resubscribe() (events.rs line 336), but after the HistoricalNetSyncStart handler reads self.peers_ready as false (line 124), the await_event will miss the AllPeersDialed signal and wait the full 30-second timeout.

This can happen if the actor's mailbox receives HistoricalNetSyncStart before the forwarded AllPeersDialed from the tokio::spawn loop (lines 67–81), even though the broadcast already fired.

The 30-second timeout prevents a hang, so the worst case is a delayed startup. Not a blocker, but worth noting for future refinement (e.g., re-checking peers_ready or using a shared Notify).

crates/events/src/event_context.rs-34-52 (1)

34-52: ⚠️ Potential issue | 🟡 Minor

from_chain_id silently maps overflow to AggregateId(0) on 32-bit targets.

Line 40: id.try_into().unwrap_or(0) silently maps any u64 that doesn't fit in usize to AggregateId(0), which is the system-wide default aggregate. On 64-bit platforms this is a non-issue, but if this code ever runs on 32-bit (e.g., WASM32), chain IDs above u32::MAX would silently route to the default aggregate.

Unlikely to matter in practice; just flagging for awareness.
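
The difference between the lossy and the checked conversion is easy to demonstrate if `u32` is used as a stand-in for a 32-bit `usize`. Both function names below are hypothetical, and the sketch is not the PR's `from_chain_id`.

```rust
use std::convert::TryFrom;

/// Mirrors the `unwrap_or(0)` pattern: values that do not fit collapse to 0.
/// `u32` stands in for `usize` on a 32-bit target.
pub fn narrow_lossy(chain_id: u64) -> u32 {
    u32::try_from(chain_id).unwrap_or(0)
}

/// Checked alternative: the overflow is surfaced to the caller.
pub fn narrow_checked(chain_id: u64) -> Result<u32, String> {
    u32::try_from(chain_id)
        .map_err(|_| format!("chain id {chain_id} does not fit in 32 bits"))
}
```

With the lossy version, a chain ID just above `u32::MAX` becomes indistinguishable from the default value 0; the checked version reports it instead.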

crates/sync/src/sync.rs-32-32 (1)

32-32: ⚠️ Potential issue | 🟡 Minor

Typo: "snapsshot" → "snapshot".

-    // 1. Load snapsshot metadata
+    // 1. Load snapshot metadata
crates/events/src/enclave_event/sync_start.rs-64-72 (1)

64-72: ⚠️ Potential issue | 🟡 Minor

Stale doc comment on HistoricalNetSyncStart.sender.

Line 67: the comment says "so that the evm can communicate" but this is the net sync start, not evm. Copy-paste from the evm variant above.

Suggested fix
     #[serde(skip)]
-    /// We include the sender here so that the evm can communicate directly with the sync actor
+    /// We include the sender here so that the net layer can communicate directly with the sync actor
     pub sender: Option<Recipient<HistoricalNetEventsReceived>>,
crates/events/src/eventstore.rs-67-81 (1)

67-81: ⚠️ Potential issue | 🟡 Minor

Copy-paste comment on the Seq handler.

Line 68: the comment says "if there are no events after the timestamp" but this handler queries by sequence number, not timestamp. Also, unlike the Ts handler, there's no early-return for an empty result — read_from on a non-existent sequence will just return an empty iterator, so it still works correctly.

Suggested comment fix
     pub fn handle_event_store_query_seq(&mut self, msg: EventStoreQueryBy<Seq>) -> Result<()> {
-        // if there are no events after the timestamp return an empty vector
+        // Read events from the given sequence number and return them
         let id = msg.id();
crates/sortition/src/sortition.rs-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Invalid SPDX license identifier: LGPL-4.0-only does not exist.

GNU LGPL versions are 2.0, 2.1, and 3.0. All other files in this PR use LGPL-3.0-only. This appears to be a typo.

Proposed fix
-// SPDX-License-Identifier: LGPL-4.0-only
+// SPDX-License-Identifier: LGPL-3.0-only
crates/aggregator/src/threshold_plaintext_aggregator.rs-293-318 (1)

293-318: ⚠️ Potential issue | 🟡 Minor

Wrong error type: EType::PublickeyAggregation used in the Plaintext aggregator's decryption share handler.

Lines 300–301 and 329–331 use EType::PublickeyAggregation, but this is the ThresholdPlaintextAggregator, not the PublicKeyAggregator. The ComputeAggregate and ComputeResponse handlers on lines 387 and 398 correctly use EType::PlaintextAggregation. This looks like a copy-paste from the public key aggregator.

Proposed fix
 impl Handler<TypedEvent<DecryptionshareCreated>> for ThresholdPlaintextAggregator {
     type Result = ();
     fn handle(
         &mut self,
         msg: TypedEvent<DecryptionshareCreated>,
         ctx: &mut Self::Context,
     ) -> Self::Result {
         trap(
-            EType::PublickeyAggregation,
+            EType::PlaintextAggregation,
             &self.bus.with_ec(msg.get_ctx()),
 impl Handler<E3CommitteeContainsResponse<TypedEvent<DecryptionshareCreated>>>
     for ThresholdPlaintextAggregator
 {
     type Result = ();
     fn handle(
         &mut self,
         msg: E3CommitteeContainsResponse<TypedEvent<DecryptionshareCreated>>,
         ctx: &mut Self::Context,
     ) -> Self::Result {
         trap(
-            EType::PublickeyAggregation,
+            EType::PlaintextAggregation,
             &self.bus.with_ec(msg.get_ctx()),
crates/keyshare/src/threshold_keyshare.rs-988-996 (1)

988-996: ⚠️ Potential issue | 🟡 Minor

Errors silently discarded in ThresholdShareCreated and EncryptionKeyCreated dispatch.

Lines 990–991 and 994–995 use let _ = self.handle_...(...) which silently swallows any errors returned from these handlers. Other event paths in this file use trap to report errors to the bus. If forwarding to a collector fails (e.g., collector not initialized), the error will be lost.

Consider wrapping these in trap consistent with the other dispatch paths, or at minimum logging the error:

Proposed fix
             EnclaveEventData::ThresholdShareCreated(data) => {
-                let _ =
-                    self.handle_threshold_share_created(TypedEvent::new(data, ec), ctx.address());
+                trap(
+                    EType::KeyGeneration,
+                    &self.bus.with_ec(&ec),
+                    || self.handle_threshold_share_created(TypedEvent::new(data, ec), ctx.address()),
+                )
             }
             EnclaveEventData::EncryptionKeyCreated(data) => {
-                let _ =
-                    self.handle_encryption_key_created(TypedEvent::new(data, ec), ctx.address());
+                trap(
+                    EType::KeyGeneration,
+                    &self.bus.with_ec(&ec),
+                    || self.handle_encryption_key_created(TypedEvent::new(data, ec), ctx.address()),
+                )
             }

Note: this fix has a borrow issue since ec is moved into TypedEvent::new and also borrowed by with_ec. You'd need to clone ec or restructure slightly. But the current let _ = pattern should at least be addressed.
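
The clone-based restructuring mentioned in the note can be sketched with stand-in types. Everything here is an assumption made for illustration: `ScopedBus` models a handle that borrows the context, `trap` is reduced to returning the reported error, and the handler body is invented. Whether the real `with_ec` borrows or copies the context is not visible in this thread.

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct EventContext {
    pub seq: u64,
}

pub struct TypedEvent<T> {
    pub data: T,
    pub ec: EventContext,
}

impl<T> TypedEvent<T> {
    pub fn new(data: T, ec: EventContext) -> Self {
        TypedEvent { data, ec }
    }
}

/// Stand-in for a bus handle that borrows the event context.
pub struct ScopedBus<'a> {
    pub ec: &'a EventContext,
}

/// Reduced trap: runs the closure and returns the error report, if any.
pub fn trap<F>(bus: &ScopedBus, f: F) -> Option<String>
where
    F: FnOnce() -> Result<(), String>,
{
    f().err().map(|e| format!("seq {}: {}", bus.ec.seq, e))
}

/// Invented handler that fails when the payload is zero.
fn handle_threshold_share_created(msg: TypedEvent<u32>) -> Result<(), String> {
    if msg.data == 0 {
        Err(format!("no collector for event seq {}", msg.ec.seq))
    } else {
        Ok(())
    }
}

pub fn dispatch(data: u32, ec: EventContext) -> Option<String> {
    // Clone first: one copy moves into the TypedEvent inside the closure,
    // while the original stays borrowed by the bus handle.
    let ec_for_event = ec.clone();
    let bus = ScopedBus { ec: &ec };
    trap(&bus, move || {
        handle_threshold_share_created(TypedEvent::new(data, ec_for_event))
    })
}
```

The extra clone costs one copy of the context per dispatch, which is the trade-off for no longer discarding the handler's error.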

🧹 Nitpick comments (50)
crates/ciphernode-builder/src/evm_system.rs (3)

70-70: Prefer std::mem::take over replace with default value.

Vec<T> implements Default, so std::mem::take(&mut self.route_factories) is the idiomatic shorthand for replace(&mut self.route_factories, Vec::new()).

♻️ Suggested diff
-            let route_factories = replace(&mut self.route_factories, Vec::new());
+            let route_factories = std::mem::take(&mut self.route_factories);

You can then also simplify the import on line 7:

-use std::mem::replace;
+use std::mem::take;

or just inline std::mem::take.


53-92: Calling build() more than once will silently create a broken subscription.

Since build takes &mut self and drains route_factories via replace, a second call would subscribe a new pipeline to the bus that has an empty router (fallback only, no contract routes). There's no guard or consumed-self pattern preventing this. If build is intended to be called exactly once, consider taking self by value to enforce it at compile time.

♻️ Suggested signature change
-    pub fn build(&mut self) {
+    pub fn build(mut self) {

This consumes the builder, preventing accidental double-invocation. If the current &mut self is required for integration reasons, a runtime guard (e.g., a built: bool flag) would also work.
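
A minimal sketch of the consuming-builder pattern, using hypothetical `RouterBuilder` and `Router` types with string routes in place of the real route factories:

```rust
/// Hypothetical builder; routes are plain strings for illustration.
pub struct RouterBuilder {
    route_factories: Vec<String>,
}

/// Hypothetical product of the builder.
pub struct Router {
    pub routes: Vec<String>,
}

impl RouterBuilder {
    pub fn new() -> Self {
        RouterBuilder { route_factories: Vec::new() }
    }

    /// Chainable setter that takes and returns the builder by value.
    pub fn with_route(mut self, route: &str) -> Self {
        self.route_factories.push(route.to_string());
        self
    }

    /// Takes `self` by value, so the builder cannot be used again.
    pub fn build(self) -> Router {
        Router { routes: self.route_factories }
    }
}
```

After `let router = builder.build();`, a second `builder.build()` is rejected by the compiler as a use of a moved value, so the empty-router case cannot arise at runtime.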


96-98: Use the RouteFactory alias for consistency.

The parameter type Box<dyn RouteFn> is exactly RouteFactory (line 23). Using the alias keeps the vocabulary consistent.

 fn configure_router(
     next: impl Into<EvmEventProcessor>,
-    route_factories: Vec<(Address, Box<dyn RouteFn>)>,
+    route_factories: Vec<(Address, RouteFactory)>,
 ) -> EvmRouter {
crates/net/src/document_publisher.rs (2)

540-542: EventConverter is missing MAILBOX_LIMIT on startup, unlike DocumentPublisher.

DocumentPublisher sets ctx.set_mailbox_capacity(MAILBOX_LIMIT) in its started hook (Line 138), but EventConverter's Actor impl is empty. Since EventConverter also receives EnclaveEvent messages (subscribed to three event types), it should be consistent.

Proposed fix
 impl Actor for EventConverter {
     type Context = actix::Context<Self>;
+    fn started(&mut self, ctx: &mut Self::Context) {
+        ctx.set_mailbox_capacity(MAILBOX_LIMIT)
+    }
 }

222-228: Unnecessary .clone() on an already-owned msg.

msg is owned (DocumentPublishedNotification is passed by value into the handler). The .clone() on Line 222 is redundant since msg can be moved directly into the async block captured by trap_fut.

Proposed fix
-        let msg = msg.clone();
         trap_fut(
             EType::IO,
             &bus,
             handle_document_published_notification(tx, rx, bus.clone(), ids, msg),
         )
crates/evm/tests/integration.rs (1)

100-110: Inconsistent Arc wrapping of EthProvider between the two tests.

In evm_reader (line 102), the provider is wrapped in Arc::new(...), while in ensure_historical_events (line 172) it is not. Both are passed to EvmSystemChainBuilder::new by reference. If Arc isn't required by the API, consider removing it from evm_reader for consistency; otherwise, add it to ensure_historical_events.

Also applies to: 170-178

crates/ciphernode-builder/src/event_system.rs (6)

32-55: Debug println! statements should use tracing::info for consistency.

Lines 50, 353, and 367 use println! while the rest of the file uses tracing::info. These look like debug leftovers.

🔧 Suggested fix
 fn get_or_init_store(&self, handle: &BusHandle) -> Result<Addr<SledStore>> {
-    println!("get_or_init_store in {:?} ...", self.sled_path);
+    info!("get_or_init_store in {:?} ...", self.sled_path);
     self.store

Similarly for lines 353 and 367:

-    println!("handle");
+    info!("handle");
-    println!("store()...");
+    info!("store()...");

69-84: Stale doc comment: references WriteBuffer but implementation uses SnapshotBuffer.

Line 76 mentions "WriteBuffer for batching inserts from actors into a snapshot" but the field on line 84 is now Addr<SnapshotBuffer>. The inline comment on line 83 also says "WriteBuffer for batching…".

📝 Suggested fix
-/// - **WriteBuffer** for batching inserts from actors into a snapshot
+/// - **SnapshotBuffer** for batching inserts from actors into a snapshot
-    /// WriteBuffer for batching inserts from actors into a snapshot
+    /// SnapshotBuffer for batching inserts from actors into a snapshot

317-342: Copy-paste log messages: all three methods log "eventstore_reader..." — should be distinct.

eventstore_router() (line 318), eventstore_getter_seq() (line 327), and eventstore_getter_ts() (line 336) all log "eventstore_reader...". At minimum the first should say "eventstore_router...", and ideally all three are distinguishable for debugging.


375-384: Redundant get_or_init_store + handle() calls in the Persisted branch.

base is moved into UpdateDestination on line 378, then get_or_init_store(&self.handle()?) is called again on line 381 just to get the same Addr. Clone base before moving it (same pattern used in the InMem branch on line 372) to avoid the extra calls.

🔧 Suggested fix
 EventSystemBackend::Persisted(b) => {
     let base = b.get_or_init_store(&self.handle()?)?;
     let buffer = self.buffer()?;
-    buffer.try_send(UpdateDestination::new(base))?;
-
-    DataStore::from_sled_store_with_buffer(
-        &b.get_or_init_store(&self.handle()?)?,
-        self.buffer()?,
-    )
+    buffer.try_send(UpdateDestination::new(base.clone()))?;
+    DataStore::from_sled_store_with_buffer(&base, buffer)
 }

507-507: Remove commented-out code.

Line 507 has // system.buffer()?.send(Start).await?; — this appears to be a debug artifact from the old WriteBuffer / Start message flow. Remove to keep the test clean.


288-342: Each call to in_mem_eventstore_router() and persisted_eventstore_router() spawns a new router actor without caching.

The three public getters (eventstore_router, eventstore_getter_seq, eventstore_getter_ts) each delegate to these methods, creating a fresh actor on every invocation. While eventstore_addrs properly caches results via OnceCell, the router methods do not.

Current usage shows single invocation per getter during system initialization (lines 226, 394-395), so this is not a practical issue today. However, consider caching the router Addr in a OnceCell to prevent accidental multiple invocations if the access patterns change.
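
A rough sketch of the caching idea, assuming the standard library's `std::cell::OnceCell` (the PR may use a different `OnceCell` implementation). `RouterAddr`, `spawn_router` and the `spawned` counter are hypothetical stand-ins for the real actor address and start-up logic.

```rust
use std::cell::{Cell, OnceCell};

// Hypothetical stand-in for an actor address such as `Addr<EventStoreRouter>`.
#[derive(Clone, Debug, PartialEq)]
struct RouterAddr(u32);

struct EventSystem {
    router: OnceCell<RouterAddr>,
    spawned: Cell<u32>, // counts how many routers were started
}

impl EventSystem {
    fn new() -> Self {
        Self { router: OnceCell::new(), spawned: Cell::new(0) }
    }

    // Stands in for creating and starting a router actor.
    fn spawn_router(&self) -> RouterAddr {
        self.spawned.set(self.spawned.get() + 1);
        RouterAddr(self.spawned.get())
    }

    // The closure runs only on the first call; later calls clone the cached address.
    fn eventstore_router(&self) -> RouterAddr {
        self.router.get_or_init(|| self.spawn_router()).clone()
    }
}

fn main() {
    let sys = EventSystem::new();
    let first = sys.eventstore_router();
    let second = sys.eventstore_router();
    println!("{first:?} {second:?} spawned={}", sys.spawned.get());
}
```

If the real spawn step is fallible, the initialisation would need a fallible variant; this sketch ignores that for brevity.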

crates/utils/src/constants.rs (1)

10-12: Incomplete comment on line 10.

// Max message appears truncated. Consider clarifying, e.g., // Max mailbox capacity for actor message queues.

crates/utils/src/error.rs (1)

10-12: Nit: prefer String::from or .to_string() over format! with no interpolation args.

Suggested fix
-    format!("System has crashed. Nothing personal. Goodbye.")
+    "System has crashed. Nothing personal. Goodbye.".to_string()
crates/events/src/enclave_event/enclave_error.rs (1)

104-145: Dispatchers look clean; consider deriving Default.

All three dispatchers are unit structs with trivial new() constructors. You could derive Default to get the standard trait for free, but this is purely optional.

Example for PanicDispatcher (same for the others)
+#[derive(Default)]
 pub struct PanicDispatcher;
-
-impl PanicDispatcher {
-    pub fn new() -> Self {
-        Self {}
-    }
-}
crates/events/src/snapshot_buffer/aggregate_config.rs (3)

24-32: Typo in comment and non-idiomatic None check.

Line 27: "AggregatId" → "AggregateId". Also, if let None = ... is non-idiomatic; prefer the entry API which is both cleaner and avoids a double lookup.

♻️ Proposed fix
-    /// Create a new AggregateConfig with the specified delays
-    pub fn new(mut delays: HashMap<AggregateId, Duration>) -> Self {
-        // Always handle AggregatId of 0 with a delay of 0
-        if let None = delays.get(&AggregateId::new(0)) {
-            delays.insert(AggregateId::new(0), Duration::from_micros(0));
-        }
-        Self { delays }
+    /// Create a new AggregateConfig with the specified delays
+    pub fn new(mut delays: HashMap<AggregateId, Duration>) -> Self {
+        // Always handle AggregateId of 0 with a delay of 0
+        delays
+            .entry(AggregateId::new(0))
+            .or_insert(Duration::from_micros(0));
+        Self { delays }
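
For illustration, here is a self-contained sketch of the entry-API pattern. `AggregateId` is a hypothetical stand-in newtype and `with_default_delay` is an illustrative helper, not a function from the PR. The point is that `entry(..).or_insert(..)` performs a single lookup and leaves an existing value untouched.

```rust
use std::collections::HashMap;
use std::time::Duration;

// Hypothetical stand-in for the real `AggregateId` newtype.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct AggregateId(usize);

// Ensures aggregate 0 always has a delay, without overwriting a caller-supplied one.
fn with_default_delay(
    mut delays: HashMap<AggregateId, Duration>,
) -> HashMap<AggregateId, Duration> {
    delays.entry(AggregateId(0)).or_insert(Duration::ZERO);
    delays
}

fn main() {
    let defaults = with_default_delay(HashMap::new());
    println!("{:?}", defaults.get(&AggregateId(0)));

    let mut custom = HashMap::new();
    custom.insert(AggregateId(0), Duration::from_millis(5));
    let kept = with_default_delay(custom);
    println!("{:?}", kept.get(&AggregateId(0)));
}
```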

15-42: Two impl blocks for the same struct can be merged.

There's no trait-related reason to split AggregateConfig across two impl blocks. Merging them improves readability.


11-12: delays field is pub, allowing bypass of the constructor invariant.

The new() constructor guarantees AggregateId(0) exists, but since delays is pub, callers can directly mutate the map and remove it. Consider making the field private if you want to preserve that invariant.

♻️ Proposed fix
 pub struct AggregateConfig {
-    pub delays: HashMap<AggregateId, Duration>,
+    delays: HashMap<AggregateId, Duration>,
 }
crates/aggregator/src/committee_finalizer.rs (1)

127-131: Minor formatting nit: missing space after comma.

Line 130: },ec)?; → }, ec)?;

Proposed fix
-                                    },ec)?;
+                                    }, ec)?;
crates/utils/src/formatters.rs (1)

78-88: Consider guarding against hue_max <= hue_min.

If hue_max - hue_min is zero or negative, line 83 would produce a division-by-zero (NaN) or negative modulus. Currently the only call site uses (30.0, 330.0) which is safe, but a debug_assert! would protect against future misuse.

♻️ Proposed guard
 fn hash_str_to_ansi_color_in_hue_range(s: &str, hue_min: f32, hue_max: f32) -> u8 {
+    debug_assert!(hue_max > hue_min, "hue_max must be greater than hue_min");
     let hash: u32 = s
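
To illustrate why the guard matters, here is a small sketch with a hypothetical helper (`hue_in_range` is not the PR's function, and the real hashing logic is not reproduced). It assumes the hue is derived with a floating-point remainder over the range width, which yields NaN when the width is zero.

```rust
// Hypothetical helper: maps a hash onto [hue_min, hue_max).
fn hue_in_range(hash: u32, hue_min: f32, hue_max: f32) -> f32 {
    // In debug builds this catches an empty or inverted range early.
    debug_assert!(hue_max > hue_min, "hue_max must be greater than hue_min");
    hue_min + (hash as f32 % (hue_max - hue_min))
}

fn main() {
    println!("{}", hue_in_range(400, 30.0, 330.0));
    // A float remainder by zero is NaN rather than a panic, so without the
    // assertion a bad range would silently propagate NaN.
    println!("{}", (7.0f32 % 0.0f32).is_nan());
}
```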
crates/utils/src/actix/channel.rs (1)

60-67: Use tracing::error! instead of eprintln! for consistency.

The rest of the codebase uses the tracing crate for logging. Using eprintln! here bypasses structured logging and won't appear in log collectors.

♻️ Proposed fix
+use tracing::error;
+
 // ...
 
     fn handle(&mut self, m: M, _: &mut Context<Self>) -> Self::Result {
         let s = self.0.clone();
         actix::spawn(async move {
             if let Err(e) = s.send(m).await {
-                eprintln!("Failed to send message: {}", e);
+                error!("Failed to send message: {}", e);
             }
         });
     }
crates/events/src/snapshot_buffer/timelock_queue.rs (2)

61-83: Ord and PartialEq are inconsistent for Timelock.

Eq/PartialEq are derived (comparing all fields), but Ord::cmp only considers expiry. Two Timelock values with the same expiry but different seq would satisfy a.cmp(b) == Equal while a != b, violating the Ord trait contract. While BinaryHeap likely works fine in practice, this is technically unsound.

Either derive Ord/PartialOrd too (comparing all fields), or implement PartialEq manually to match Ord:

Option A: include seq as a tiebreaker in Ord
 impl Ord for Timelock {
     fn cmp(&self, other: &Self) -> Ordering {
-        self.expiry.cmp(&other.expiry)
+        self.expiry.cmp(&other.expiry).then_with(|| self.seq.cmp(&other.seq))
     }
 }
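
A minimal sketch of Option A follows, assuming a simplified `Timelock` with only `expiry` and `seq` fields (the real struct may carry more fields or different integer types). With the tiebreaker, `cmp` returns `Equal` only when the derived `PartialEq` also reports equality.

```rust
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;

// Simplified, hypothetical version of the struct under review.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Timelock {
    expiry: u64,
    seq: u64,
}

impl Ord for Timelock {
    fn cmp(&self, other: &Self) -> Ordering {
        // Order by expiry first, then by seq so that Equal implies ==.
        self.expiry
            .cmp(&other.expiry)
            .then_with(|| self.seq.cmp(&other.seq))
    }
}

impl PartialOrd for Timelock {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Pops timelocks earliest-first by wrapping them in `Reverse` (min-heap).
fn drain_in_order(items: Vec<Timelock>) -> Vec<Timelock> {
    let mut heap: BinaryHeap<Reverse<Timelock>> = items.into_iter().map(Reverse).collect();
    let mut out = Vec::new();
    while let Some(Reverse(t)) = heap.pop() {
        out.push(t);
    }
    out
}

fn main() {
    let a = Timelock { expiry: 10, seq: 1 };
    let b = Timelock { expiry: 10, seq: 2 };
    println!("{:?}", a.cmp(&b));
    println!("{:?}", drain_in_order(vec![b, a]));
}
```

A side benefit of the tiebreaker is that entries with identical expiry come out of the heap in a deterministic order.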

163-163: Prefer !self.timelocks.is_empty() over self.timelocks.len() > 0.

Idiomatic Rust uses is_empty() for checking emptiness.

Suggested fix
-            while self.timelocks.len() > 0 && self.next_timelock_lt(now_time) {
+            while !self.timelocks.is_empty() && self.next_timelock_lt(now_time) {
crates/data/src/repositories.rs (1)

32-34: in_mem() requires a running actix runtime — consider documenting this constraint.

InMemStore::new(false).start() will panic if called outside an actix system context. If this is intended only for tests (where a runtime is typically set up), a brief doc comment or naming convention (e.g., in_mem_for_test) would make that expectation clear to future callers.

crates/net/src/lib.rs (2)

44-44: Hardcoded gossip topic appears intentional but temporary.

"tmp-enclave-gossip-topic" reads like a placeholder. If this is meant to be configurable or derived from network identity, consider adding a TODO or passing it as a parameter.


47-53: Keypair bytes remain in memory after use.

After constructing the ed25519::Keypair on Line 57, the decrypted bytes Vec stays in memory until dropped. For key material, consider zeroing the buffer after use (e.g., via zeroize). This is a defense-in-depth measure — not critical, but good hygiene for private keys.

crates/events/src/snapshot_buffer/batch.rs (1)

49-61: Minor idiomatic Rust improvements.

Two small nits:

  1. Line 52: replace(&mut self.inserts, Vec::new()) can be simplified to std::mem::take(&mut self.inserts), which is more idiomatic and avoids the explicit Vec::new().
  2. Line 54: inserts.len() > 0 is conventionally written as !inserts.is_empty() (Clippy len_zero lint).
Proposed diff
-        let inserts = replace(&mut self.inserts, Vec::new());
+        let inserts = std::mem::take(&mut self.inserts);
         trap(EType::IO, &PanicDispatcher::new(), || {
-            if inserts.len() > 0 {
+            if !inserts.is_empty() {
                 self.db.try_send(InsertBatch::new(inserts))?;
             }
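
As a short illustration of the first nit, the sketch below uses a hypothetical `Batch` with a plain `Vec<String>` in place of the real insert type. `std::mem::take` swaps in `Vec::default()` and returns the previous contents, which is exactly what `replace(&mut v, Vec::new())` does.

```rust
use std::mem;

// Hypothetical stand-in for the real batch actor state.
struct Batch {
    inserts: Vec<String>,
}

impl Batch {
    // Hands back the buffered inserts and leaves an empty Vec behind.
    fn flush(&mut self) -> Vec<String> {
        mem::take(&mut self.inserts)
    }
}

fn main() {
    let mut batch = Batch { inserts: vec!["x".to_string(), "y".to_string()] };
    let taken = batch.flush();
    println!("taken={taken:?} remaining={:?}", batch.inserts);
}
```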
crates/net/src/net_event_buffer.rs (1)

97-105: Silently dropping lagged events could lose data during sync.

On RecvError::Lagged, the task just continues, meaning events that arrived while the receiver fell behind are permanently lost. If the input broadcast channel is producing events faster than the actor can process them, this could silently drop sync-relevant events. Consider logging a warning on lag so it's observable.

Proposed fix
                     Ok(event) => addr.do_send(IncomingNetEvent(event)),
-                    Err(RecvError::Lagged(_)) => continue,
+                    Err(RecvError::Lagged(n)) => {
+                        tracing::warn!("NetEventBuffer: broadcast receiver lagged, dropped {n} events");
+                        continue;
+                    }
                     Err(RecvError::Closed) => break,
crates/data/src/persistable.rs (1)

270-275: Redundant .clone() after .into().

value.into() already produces an owned EventContext<Sequenced>, making the subsequent .clone() unnecessary.

♻️ Proposed fix
     fn set_ctx<C>(&mut self, value: C)
     where
         C: Into<EventContext<Sequenced>>,
     {
-        self.ctx = Some(value.into().clone())
+        self.ctx = Some(value.into())
     }
crates/evm/src/events.rs (1)

66-68: Consider returning a named struct or documenting the tuple order for split().

split() returns (EnclaveEventData, u128, u64) which silently drops chain_id and id, and the positional meaning of the u128 (ts) vs u64 (block) isn't self-documenting. Compare with EnclaveEvent::into_components (at crates/events/src/enclave_event/mod.rs:296-298) which returns semantically clearer types. This is low-priority since callers are presumably limited to the Sync actor.

crates/data/src/data_store.rs (1)

204-224: from_in_mem and from_sled_store duplicate the existing From impls.

These named constructors (lines 204-224) produce the exact same DataStore as the From<&Addr<InMemStore>> and From<&Addr<SledStore>> implementations (lines 227-250). Consider having the From impls delegate to the named methods (or vice versa) to avoid the duplication.

crates/sortition/src/ciphernode_selector.rs (1)

187-201: contains + position iterates the committee list twice.

Since contains (line 188) already confirms membership, the position call (line 194) will always return Some. You could eliminate the contains check and use position alone, letting the None arm handle "not in committee."

♻️ Proposed simplification
-                // Check if this node is in the finalized committee
-                if !msg.committee.contains(&self.address) {
-                    info!(node = self.address, "Node not in finalized committee");
-                    return Ok(());
-                }
-
-                // Retrieve E3 metadata from repository
-                let Some(party_id) = msg.committee.iter().position(|addr| addr == &self.address)
-                else {
-                    info!(
-                        node = self.address,
-                        "Node address not found in committee list (should not happen)"
-                    );
+                let Some(party_id) = msg.committee.iter().position(|addr| addr == &self.address) else {
+                    info!(node = self.address, "Node not in finalized committee");
                     return Ok(());
                 };
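
The single-pass version can be sketched in isolation as follows. `describe` is a hypothetical function that returns strings instead of logging, and the committee is assumed to be a slice of address strings. `position` both confirms membership and yields the index, so the `else` arm covers the "not in committee" case.

```rust
// Hypothetical helper: one scan of the committee gives membership and index.
fn describe(committee: &[String], address: &str) -> String {
    let Some(party_id) = committee.iter().position(|addr| addr == address) else {
        return format!("{address} not in finalized committee");
    };
    format!("{address} has party_id {party_id}")
}

fn main() {
    let committee = vec!["0xaaa".to_string(), "0xbbb".to_string()];
    println!("{}", describe(&committee, "0xbbb"));
    println!("{}", describe(&committee, "0xccc"));
}
```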
crates/net/src/net_interface.rs (2)

396-396: Verbose debug-style logging with *** markers.

Lines 396, 415, 434, 452, 463, 651 use info! with *** delimiters. These are clearly temporary debugging aids. Consider downgrading to debug! or trace! before merging to avoid noisy production logs.


700-719: Correlator key collision risk when storing both Kademlia QueryId and request-response RequestId in the same map.

Both Kademlia QueryId (used for PUT/GET record operations) and request-response RequestId (used for sync operations) flow through the single Correlator map and are converted to String keys via Display. If their representations both produce identical numeric strings (e.g., both display as "42"), a later track() or expire() call from one subsystem could shadow or consume an entry from the other. Prefixing the stringified ID with a subsystem identifier (e.g., "kad:42" vs "rr:42") eliminates this ambiguity.

Example prefix approach
-    pub fn track(&mut self, query_id: impl Display, correlation_id: CorrelationId) {
-        self.inner.insert(format!("{query_id}"), correlation_id);
+    pub fn track(&mut self, query_id: impl Display, correlation_id: CorrelationId) {
+        self.inner.insert(format!("kad:{query_id}"), correlation_id);
     }

Alternatively, use separate maps per subsystem or a newtype wrapper.
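
One way the newtype or enum alternative could look is sketched below. `TrackedId` is a hypothetical key type and `u64` stands in for `CorrelationId`; neither is the PR's actual definition. Tagging the key by subsystem means two IDs that both display as "42" can no longer collide.

```rust
use std::collections::HashMap;

// Hypothetical key type: the variant records which subsystem issued the ID.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum TrackedId {
    Kad(String),
    RequestResponse(String),
}

#[derive(Default)]
struct Correlator {
    inner: HashMap<TrackedId, u64>, // u64 stands in for CorrelationId
}

impl Correlator {
    fn track(&mut self, id: TrackedId, correlation_id: u64) {
        self.inner.insert(id, correlation_id);
    }

    fn expire(&mut self, id: &TrackedId) -> Option<u64> {
        self.inner.remove(id)
    }
}

fn main() {
    let mut c = Correlator::default();
    c.track(TrackedId::Kad("42".into()), 1);
    c.track(TrackedId::RequestResponse("42".into()), 2);
    println!("{:?}", c.expire(&TrackedId::Kad("42".into())));
    println!("{:?}", c.expire(&TrackedId::RequestResponse("42".into())));
}
```

Compared with string prefixes, the enum makes the namespace part of the type, so a caller cannot forget to add it.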

crates/events/src/eventstore_router.rs (1)

116-201: Near-identical logic in handle_event_store_query_ts and handle_event_store_query_seq.

These two methods share the same structure: extract parent ID → build sub-queries → handle empty case → spawn aggregator → dispatch. The only differences are the query type parameters (Ts/TsAgg vs Seq/SeqAgg) and the value type (u128 vs u64). Consider extracting a generic helper to reduce duplication if more query types are added.
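
A very rough sketch of the generic-helper idea is shown below. It assumes a query is a map from aggregate ID to a cursor value and that only the cursor type differs between the two paths; `build_sub_queries` and its parameters are hypothetical and do not mirror the real `EventStoreQueryBy` types.

```rust
use std::collections::HashMap;

// Hypothetical helper shared by both paths: V is u128 for timestamp
// queries and u64 for sequence queries.
fn build_sub_queries<V: Copy>(query: &HashMap<usize, V>, known: &[usize]) -> Vec<(usize, V)> {
    let mut subs: Vec<(usize, V)> = query
        .iter()
        .filter(|(id, _)| known.contains(*id)) // skip aggregates with no store
        .map(|(id, v)| (*id, *v))
        .collect();
    subs.sort_by_key(|(id, _)| *id); // deterministic dispatch order
    subs
}

fn main() {
    let mut by_ts: HashMap<usize, u128> = HashMap::new();
    by_ts.insert(0, 100);
    by_ts.insert(1, 200);
    println!("{:?}", build_sub_queries(&by_ts, &[0, 1]));

    let mut by_seq: HashMap<usize, u64> = HashMap::new();
    by_seq.insert(1, 7);
    println!("{:?}", build_sub_queries(&by_seq, &[0, 1]));
}
```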

crates/net/src/events.rs (2)

290-313: Overly verbose info!-level logging in call_and_await_response.

Every command send, every received event, every correlation mismatch, and every matcher failure is logged at info! level. In a production system with many concurrent events, this will produce excessive noise. Consider:

  • Line 290–293: debug! for command sends
  • Line 300: trace! for all received events
  • Lines 306–312: trace! for correlation mismatches (these will fire for every non-matching event)

328-354: await_event — clean utility, minor nit on error message.

Line 352: anyhow::anyhow!(format!("Timed out waiting for event")) — the format!() wrapper is unnecessary since anyhow! already supports format strings directly.

Simplify
-    .map_err(|_| anyhow::anyhow!(format!("Timed out waiting for event")))?;
+    .map_err(|_| anyhow::anyhow!("Timed out waiting for event"))?;
crates/net/src/net_sync_manager.rs (1)

238-250: Cloning every NetEvent in the matcher closure is unnecessary.

Line 240: |e| match e.clone() clones every event received on the broadcast channel, not just matching ones. Match on the reference instead and only clone on match.

Avoid unnecessary clones
-        |e| match e.clone() {
-            NetEvent::OutgoingSyncRequestSucceeded(value) => Some(Ok(value)),
+        |e| match e {
+            NetEvent::OutgoingSyncRequestSucceeded(value) => Some(Ok(value.clone())),
             NetEvent::OutgoingSyncRequestFailed(error) => {
                 Some(Err(anyhow!("Outgoing sync request failed: {:?}", error)))
             }
             _ => None,
         },
crates/events/src/snapshot_buffer/snapshot_buffer.rs (1)

46-70: Significant amount of commented-out code and unused parameter.

new(_: bool) ignores its start_buffering parameter, and ~40 lines of commented-out code remain (SnapshotBufferState enum, EffectsEnabled handler, state-based routing logic in multiple handlers). This appears to be incomplete state-machine logic that was deferred.

If this is intentional WIP, consider adding a TODO comment explaining when the state machine will be re-enabled. If it's abandoned, remove the dead code and the unused parameter.

crates/net/src/net_event_translator.rs (1)

121-140: warn! on every gossip publish is too noisy for normal operation.

Line 129 logs at warn! level for every forwardable event published to gossip. This will produce significant log noise under normal operation. Consider using debug! or trace! instead, reserving warn! for unexpected conditions.

Suggested fix
-        warn!("GossipPublish event: {}", msg.event_type());
+        debug!("GossipPublish event: {}", msg.event_type());
crates/events/src/snapshot_buffer/batch_router.rs (1)

21-21: Local type alias Seq shadows a crate-level public type.

type Seq = u64 here shadows crate::Seq (used in eventstore.rs). While this works because Seq isn't imported from the crate in this file, it may confuse readers navigating between files. Consider renaming to something like SeqNum or just using u64 directly.

crates/sync/src/sync.rs (2)

46-61: EventStore query via oneshot has no timeout — could hang if eventstore is unresponsive.

Line 54: rx.await? will block indefinitely if the eventstore actor never responds (e.g., it panicked or its mailbox is full and try_send on line 49 failed silently after being enqueued). Consider wrapping with a timeout similar to the EVM event collection on line 72.


254-388: Large block of commented-out test code.

This commented-out test should either be restored (once bugs are fixed) or removed and tracked via an issue. Leaving it in the codebase adds noise.

Would you like me to open an issue to track re-enabling this test?

crates/events/src/bus_handle.rs (1)

236-241: Redundant .clone() on value.into().

value.into() already produces an owned EventContext<Sequenced>, so the trailing .clone() is unnecessary.

Suggested fix
     fn set_ctx<C>(&mut self, value: C)
     where
         C: Into<EventContext<Sequenced>>,
     {
-        self.ctx = Some(value.into().clone());
+        self.ctx = Some(value.into());
     }
crates/multithread/src/multithread.rs (1)

179-212: Variable ctx shadows Actix context convention — consider renaming to ec for consistency.

Throughout the rest of this PR (and this very file on line 154), ec is the convention for EventContext. Here on line 190, ctx is used instead, which could be confused with the Actix actor context parameter common in handler methods.

Proposed fix
-    let (msg, ctx) = msg.into_components();
-    // We spawn a thread on rayon moving to "sync"-land
+    let (msg, ec) = msg.into_components();
+    // We spawn a thread on rayon moving to "sync"-land
...
-        Ok(val) => bus.publish(val, ctx)?,
+        Ok(val) => bus.publish(val, ec)?,
...
-            bus.publish(e, ctx)?
+            bus.publish(e, ec)?
crates/sortition/src/sortition.rs (2)

334-353: Typo in method name + minor style nits.

  • get_committe → get_committee
  • unwrap_or_else(|| Vec::new()) → unwrap_or_default()
  • committee.len() == 0 → committee.is_empty()
Proposed fix
-    fn get_committe(&self, e3_id: &E3id) -> Vec<String> {
+    fn get_committee(&self, e3_id: &E3id) -> Vec<String> {
         self.finalized_committees
             .get()
             .and_then(|committees| committees.get(e3_id).cloned())
-            .unwrap_or_else(|| Vec::new())
+            .unwrap_or_default()
     }
 
     fn committee_contains(&mut self, e3_id: E3id, node: String) -> bool {
-        let committee = self.get_committe(&e3_id);
+        let committee = self.get_committee(&e3_id);
 
-        if committee.len() == 0 {
+        if committee.is_empty() {

680-698: Unnecessary clone() on bus reference.

Line 690 uses &self.bus.clone() but since trap only borrows the bus, &self.bus suffices. Unlike other handlers, there's no ec available here so with_ec can't be used — the plain bus reference is correct for error context.

Proposed fix
-        trap(EType::Sortition, &self.bus.clone(), || {
+        trap(EType::Sortition, &self.bus, || {
crates/events/src/events.rs (1)

175-183: sender() consumes self — intentional but worth noting.

sender(self) takes ownership, meaning id() and query() must be called before sender(). This is fine as a deliberate API choice that prevents accidental use-after-move, but callers need to be aware of the ordering constraint.

If multiple accesses are needed in the same scope, consider returning &Recipient<EventStoreQueryResponse> or extracting all fields together (e.g., a destructure() method). Not urgent — the current API is usable.

crates/keyshare/src/threshold_keyshare.rs (2)

459-470: Remove commented-out code block.

This dead code was replaced by the EncryptionKeyPending publish on lines 471–478. Commented-out code should be cleaned up before merge.

Proposed fix
-        // let state = self.state.try_get()?;
-        // self.bus.publish(
-        //     EncryptionKeyCreated {
-        //         e3_id: state.e3_id.clone(),
-        //         key: Arc::new(EncryptionKey {
-        //             party_id: state.party_id,
-        //             pk_bfv: pk_bfv_bytes,
-        //         }),
-        //         external: false,
-        //     },
-        //     ec,
-        // )?;
         self.bus.publish(

1100-1153: Unnecessary self.bus.clone(): &self.bus suffices.

Lines 1107 and 1135 clone the bus only to borrow the result. Since trap takes &impl ErrorDispatcher, a direct reference works. Same pattern as noted in sortition.rs.

Proposed fix
-        trap(EType::KeyGeneration, &self.bus.clone(), || {
+        trap(EType::KeyGeneration, &self.bus, || {

(Apply to both handlers at lines 1107 and 1135)

Comment thread crates/ciphernode-builder/src/ciphernode_builder.rs
Comment thread crates/events/src/bus_handle.rs
Comment thread crates/events/src/snapshot_buffer/batch_router.rs
Comment thread crates/events/src/snapshot_buffer/timelock_queue.rs
Comment thread crates/net/src/net_event_buffer.rs
Comment thread crates/sync/src/sync.rs

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@crates/events/src/eventstore_router.rs`:
- Around line 45-76: QueryAggregator currently waits indefinitely for sub-query
responses in Handler<EventStoreQueryResponse>::handle because self.pending may
never empty; add a timeout guard in the actor so a non-responding EventStore
doesn't leak it. In the QueryAggregator actor (look for impl Actor for
QueryAggregator and the handle method for EventStoreQueryResponse) schedule a
ctx.run_later(deadline, move |act, ctx| {
act.sender.do_send(error_response_or_partial(EventStoreQueryResponse::new(act.parent_id,
std::mem::take(&mut act.collected_events)))); ctx.notify(Die); }) when the query
is started (or when the first sub-query is sent) so that if pending is non-empty
after the deadline it logs the timeout, sends an error/partial
EventStoreQueryResponse to sender, and calls ctx.notify(Die); cancel or avoid
rescheduling if pending becomes empty in handle (i.e., clear or ignore the
timeout if all responses arrived).
🧹 Nitpick comments (3)
crates/events/src/eventstore_router.rs (3)

116-201: handle_event_store_query_ts and handle_event_store_query_seq are nearly identical.

The two methods differ only in the generic type parameter (TsAgg/Ts vs SeqAgg/Seq). If the trait bounds allow it, consider extracting the common logic into a generic helper to reduce duplication.


232-239: Error messages for TsAgg and SeqAgg query failures are identical.

Both handlers log "Failed to route get events after request". Consider differentiating them (e.g., "timestamp query" vs "sequence query") to aid debugging.

Suggested diff
     fn handle(&mut self, msg: EventStoreQueryBy<SeqAgg>, ctx: &mut Self::Context) -> Self::Result {
         if let Err(e) = self.handle_event_store_query_seq(msg, ctx) {
-            error!("Failed to route get events after request: {}", e);
+            error!("Failed to route sequence query request: {}", e);
         }
     }

And similarly for the TsAgg handler:

     fn handle(&mut self, msg: EventStoreQueryBy<TsAgg>, ctx: &mut Self::Context) -> Self::Result {
         if let Err(e) = self.handle_event_store_query_ts(msg, ctx) {
-            error!("Failed to route get events after request: {}", e);
+            error!("Failed to route timestamp query request: {}", e);
         }
     }

40-42: Remove unused pending_aggregates() method or add annotation if intentionally kept for future use.

The method at lines 40-42 is not called anywhere in the codebase. If it's not part of a public API contract, remove it to keep the code clean.

Comment thread crates/events/src/eventstore_router.rs

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@crates/ciphernode-builder/src/ciphernode_builder.rs`:
- Around line 196-201: The doc comment for the builder method
testmode_start_buffer_immediately is cut off mid-sentence; update the doc
comment to be a complete, clear sentence explaining that this forces
SnapshotBuffer to start immediately instead of waiting for SyncEnded, and why
that is useful (e.g., for tests that don't explicitly trigger SyncEnded or start
buffering). Locate the method named testmode_start_buffer_immediately and
replace the incomplete comment with a full sentence (or two) that describes
behavior, intended use in tests, and any side-effects.
🧹 Nitpick comments (4)
crates/zk-prover/src/actors/proof_request.rs (1)

130-141: Inconsistent: ec is not extracted and the TypedEvent wrapper is accessed directly.

Every other handler in this file and in proof_verification.rs destructures with let (msg, ec) = msg.into_components() before operating on the inner data. Here, msg.get_err() and msg.correlation_id() are called directly on the TypedEvent<ComputeRequestError>, presumably via Deref. This works but is inconsistent with the established pattern.

Since the error path doesn't need to republish (so ec is unused), this is purely a consistency concern, but it would make future changes easier if the pattern is uniform.

♻️ Suggested refactor for consistency
 fn handle_compute_request_error(&mut self, msg: TypedEvent<ComputeRequestError>) {
-    let ComputeRequestErrorKind::Zk(err) = msg.get_err() else {
+    let (msg, _ec) = msg.into_components();
+    let ComputeRequestErrorKind::Zk(err) = msg.get_err() else {
         return;
     };
 
-    if let Some(pending) = self.pending.remove(msg.correlation_id()) {
+    if let Some(pending) = self.pending.remove(msg.correlation_id()) {
crates/zk-prover/src/actors/proof_verification.rs (1)

220-225: Unnecessary ec.clone() in the success branch.

Since the if and else branches are mutually exclusive, the success branch is the last (and only) consumer of ec here. You can pass ec directly instead of ec.clone().

♻️ Minor optimization
-            self.publish_key_created(msg.e3_id, msg.key, ec.clone());
+            self.publish_key_created(msg.e3_id, msg.key, ec);
crates/events/src/enclave_event/mod.rs (2)

588-598: Display formatting uses Debug output — can be very verbose.

colorize_event_ids(self) formats via Debug, which for EnclaveEvent includes the full payload. In high-throughput logging, this could produce very large log lines. If this is intentional for debugging/development, it's fine, but consider a more compact representation for production use.


297-303: Consider deprecating split() in favor of into_components().

split() discards all context except ts, while into_components() preserves the full EventContext<S>. The method has three active callers in the codebase (integration tests, crates/net/src/events.rs, and crates/events/src/bus_handle.rs), all of which could migrate to into_components() with minimal overhead.

Comment thread crates/ciphernode-builder/src/ciphernode_builder.rs

@hmzakhalid hmzakhalid left a comment

Collaborator

Thanks a lot for this Rudi, I went through this yesterday and followed the sync event flow; everything seems good overall. Left a few comments.
Will take another look later today to see if I find something.

Comment thread crates/events/src/eventstore.rs
Comment thread crates/events/src/enclave_event/mod.rs
Comment thread crates/events/src/snapshot_buffer/batch_router.rs
Comment thread crates/sync/src/sync.rs
Comment thread crates/events/src/bus_handle.rs

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/ciphernode-builder/src/event_system.rs (1)

317-342: ⚠️ Potential issue | 🟠 Major

Each call to eventstore_router() / eventstore_getter_seq() / eventstore_getter_ts() spawns a new EventStoreRouter actor.

Unlike other components (sequencer, buffer, eventbus) which are cached in OnceCell, every invocation of these three methods calls in_mem_eventstore_router() or persisted_eventstore_router(), each of which creates and .start()s a new router actor. If called more than once, this results in orphaned router actors.

Additionally, all three methods log "eventstore_reader..." — a copy-paste artifact that makes debugging harder.

#!/bin/bash
# Check how many times these methods are called
rg -n 'eventstore_router\(\)|eventstore_getter_seq\(\)|eventstore_getter_ts\(\)' --type rust -C1
🤖 Fix all issues with AI agents
In `@crates/events/src/sequencer.rs`:
- Around line 61-65: The panic message inside Sequencer where
handle_store_event_response(msg) returns Err still mentions "snapshot_buffer or
bus."; update that panic call to remove the stale "snapshot_buffer" reference so
it only says the event could not be sent to the bus (e.g., change the
major_issue message passed to panic in the error branch of
Sequencer::handle_store_event_response to reference only the bus). Ensure you
adjust the string passed to major_issue and keep the error variable e included.

In `@crates/events/src/snapshot_buffer/snapshot_buffer.rs`:
- Around line 54-60: SnapshotBuffer::new currently drops the start_buffering
boolean so EventSystem::start_buffer_immediately has no effect; modify
SnapshotBuffer (add a field like start_buffering: bool) and have
SnapshotBuffer::new(start_buffering: bool) store it, then use that flag when
initializing or starting the router/tickable (e.g., pass it into BatchRouter
creation or call the router start method immediately when start_buffering is
true) so the buffer begins processing immediately when requested; update any
call sites (spawn()/with_clock()) that pass the flag to match the new
constructor signature.
🧹 Nitpick comments (7)
crates/events/src/snapshot_buffer/snapshot_buffer.rs (2)

149-159: Unused ctx parameter will trigger a compiler warning.

All other handlers use _ for the unused context parameter, but this one names it ctx.

-    fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
+    fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result {

215-217: Prefer std::mem::take over replace(..., Vec::new()).

take is the idiomatic shorthand for replacing with default.

-            replace(&mut self.evts, Vec::new())
+            std::mem::take(&mut self.evts)
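To illustrate why the two forms are interchangeable, here is a small self-contained sketch. `EventBuffer` and its `u64` events are hypothetical and only mimic the `evts` field mentioned above; they are not the types used in `snapshot_buffer.rs`.

```rust
use std::mem;

/// Hypothetical buffer holding pending events (u64 used as a placeholder event type).
pub struct EventBuffer {
    evts: Vec<u64>,
}

impl EventBuffer {
    pub fn new() -> Self {
        Self { evts: Vec::new() }
    }

    pub fn push(&mut self, e: u64) {
        self.evts.push(e);
    }

    /// Original style: swap in an explicitly constructed empty Vec.
    pub fn drain_with_replace(&mut self) -> Vec<u64> {
        mem::replace(&mut self.evts, Vec::new())
    }

    /// Suggested style: `take` swaps in `Default::default()`, which is an empty Vec.
    pub fn drain_with_take(&mut self) -> Vec<u64> {
        mem::take(&mut self.evts)
    }

    pub fn len(&self) -> usize {
        self.evts.len()
    }
}
```

Both methods return the accumulated events and leave an empty `Vec` behind; `take` simply states that intent more directly.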
crates/ciphernode-builder/src/event_system.rs (3)

49-55: Replace println! with tracing macros for consistency.

Lines 50, 353, and 367 use println! while the rest of the file uses tracing::info!. println! bypasses structured logging, won't appear in log collectors, and can't be filtered by level.

-        println!("get_or_init_store in {:?} ...", self.sled_path);
+        info!("get_or_init_store in {:?} ...", self.sled_path);
-        println!("handle");
+        info!("handle");
-        println!("store()...");
+        info!("store()...");

Also applies to: 353-353, 367-367


375-384: get_or_init_store is called twice in the Persisted branch.

Lines 376 and 381 both call b.get_or_init_store(&self.handle()?). While OnceCell makes the second call cheap, this is redundant and harder to read. Reuse the base variable from line 376.

             EventSystemBackend::Persisted(b) => {
                 let base = b.get_or_init_store(&self.handle()?)?;
                 let buffer = self.buffer()?;
                 buffer.try_send(UpdateDestination::new(base))?;
 
                 DataStore::from_sled_store_with_buffer(
-                    &b.get_or_init_store(&self.handle()?)?,
+                    &b.get_or_init_store(&self.handle()?)?, // base was moved above
                     self.buffer()?,
                 )

Note: base was moved into UpdateDestination::new(base) on line 378, so it can't be reused as-is. Either clone base before the move or restructure:

Suggested fix
             EventSystemBackend::Persisted(b) => {
                 let base = b.get_or_init_store(&self.handle()?)?;
                 let buffer = self.buffer()?;
-                buffer.try_send(UpdateDestination::new(base))?;
-
-                DataStore::from_sled_store_with_buffer(
-                    &b.get_or_init_store(&self.handle()?)?,
-                    self.buffer()?,
-                )
+                buffer.try_send(UpdateDestination::new(base.clone()))?;
+                DataStore::from_sled_store_with_buffer(&base, buffer)
             }

This also avoids the second call to self.buffer() and self.handle().


369-374: self.buffer()? called twice in InMem branch.

buffer is already bound on line 371 but line 373 calls self.buffer()? again.

             EventSystemBackend::InMem(b) => {
                 let base = b.get_or_init_store();
                 let buffer = self.buffer()?;
                 buffer.try_send(UpdateDestination::new(base.clone()))?;
-                DataStore::from_in_mem_with_buffer(&base, self.buffer()?)
+                DataStore::from_in_mem_with_buffer(&base, buffer)
             }
crates/sync/src/sync.rs (2)

25-114: Sync orchestration flow looks solid overall.

The step-by-step approach with logging at each phase is clear and well-structured. A couple of minor observations:

  • Line 32: typo "snapsshot" → "snapshot" in the comment.
  • Line 105: bus.naked_dispatch(event) is fire-and-forget with no error propagation. If dispatch can fail silently, historical events could be lost without notice. Consider whether this should return/propagate errors like the other try_send calls above.

221-229: to_sequence_map: consider using .collect() instead of manual fold.

Minor readability nit — .map(|s| (s.aggregate_id, s.seq)).collect() achieves the same result more concisely.
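As a rough sketch of the suggestion, the two versions below build the same map. `SnapshotSeq` and the `usize`/`u64` field types are assumptions made for illustration; the actual types in `crates/sync/src/sync.rs` are not shown in this review.

```rust
use std::collections::HashMap;

/// Hypothetical snapshot record; field types are assumed for this sketch.
pub struct SnapshotSeq {
    pub aggregate_id: usize,
    pub seq: u64,
}

/// Manual fold: inserts each pair into an accumulator map.
pub fn to_sequence_map_fold(items: &[SnapshotSeq]) -> HashMap<usize, u64> {
    items.iter().fold(HashMap::new(), |mut acc, s| {
        acc.insert(s.aggregate_id, s.seq);
        acc
    })
}

/// Equivalent `collect`: HashMap implements FromIterator<(K, V)>.
pub fn to_sequence_map_collect(items: &[SnapshotSeq]) -> HashMap<usize, u64> {
    items.iter().map(|s| (s.aggregate_id, s.seq)).collect()
}
```

In both versions a later entry with the same `aggregate_id` overwrites an earlier one, so the change should not alter behavior.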

Comment thread crates/events/src/sequencer.rs
Comment thread crates/events/src/snapshot_buffer/snapshot_buffer.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@crates/ciphernode-builder/src/event_system.rs`:
- Line 304: The three misleading info! logs all read "eventstore_reader..." —
update the log messages in the functions eventstore_router(),
eventstore_getter_seq(), and eventstore_getter_ts() so each reflects its actual
role (e.g., "eventstore_router...", "eventstore_getter_seq...",
"eventstore_getter_ts...") using the existing info! macro calls; locate the
info! invocation inside each named function and change the string literal to a
clear, unique message for that function.
- Around line 49-50: Replace leftover println!/print! debug artifacts in
get_or_init_store (and the other occurrences at lines referenced) with the
crate's tracing macros: use trace! or debug! (choose trace! for very verbose
internals, debug! for higher-level diagnostics) and include contextual data like
self.sled_path and any relevant variables in the macro call; if the output was
purely temporary and not needed, remove the statement instead. Locate the calls
in the get_or_init_store function and the two other spots (previously using
print!/println!) and convert them to tracing::{trace!, debug!} with structured
fields (e.g., trace!("get_or_init_store", sled_path = ?self.sled_path)) or drop
them if redundant.
- Around line 200-206: get_enclave_bus_handle() currently calls handle() but
never calls store(), leaving SnapshotBuffer wired to NoopBatchReceiver and
dropping InsertBatch messages; update get_enclave_bus_handle() to enforce the
required initialization order by invoking the store() path after obtaining the
handle (e.g., call self.store(...) or otherwise send the UpdateDestination to
SnapshotBuffer) so the buffer's UpdateDestination is set before returning the
handle, and reference the existing methods get_enclave_bus_handle(), handle(),
store(), buffer(), and SnapshotBuffer/UpdateDestination when making the change.
🧹 Nitpick comments (6)
crates/events/src/snapshot_buffer/snapshot_buffer.rs (3)

141-151: Unused ctx parameter.

ctx is bound but unused in Handler<EnclaveEvent>. Prefix with _ to match the convention used in all other handlers in this file.

Proposed fix
-    fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
+    fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result {

94-103: Silent message dropping when dependencies are None.

All forwarding handlers (FlushSeq, StartTimelock, Insert, EnclaveEvent, Tick, UpdateDestination) silently discard messages if the corresponding dependency hasn't been set yet. While the initialization window is extremely narrow (SetDependencies is the first message), consider adding a warn! or debug! log when a message is dropped due to a missing dependency. This would aid debugging if the wiring order ever changes or SetDependencies delivery fails.
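The pattern suggested above might look roughly like the following. This is a simplified, std-only sketch: `Forwarder` is hypothetical, the destination is modeled as an in-memory `Vec<String>` rather than an actor recipient, and `eprintln!` stands in for the `warn!` or `debug!` macro that the real handlers would use.

```rust
/// Hypothetical forwarder whose destination may not be wired up yet.
pub struct Forwarder {
    dest: Option<Vec<String>>,
    dropped: usize,
}

impl Forwarder {
    pub fn new() -> Self {
        Self { dest: None, dropped: 0 }
    }

    /// Stand-in for receiving the SetDependencies message.
    pub fn set_destination(&mut self) {
        self.dest = Some(Vec::new());
    }

    /// Forwards the message if possible; otherwise logs and counts the drop
    /// instead of discarding it silently. Returns whether it was delivered.
    pub fn forward(&mut self, msg: &str) -> bool {
        match self.dest.as_mut() {
            Some(dest) => {
                dest.push(msg.to_string());
                true
            }
            None => {
                self.dropped += 1;
                // In the real code this would be a tracing warn!/debug! call.
                eprintln!("dropping message {msg:?}: destination not set");
                false
            }
        }
    }

    pub fn dropped(&self) -> usize {
        self.dropped
    }

    pub fn delivered(&self) -> Vec<String> {
        self.dest.clone().unwrap_or_default()
    }
}
```

The log line costs nothing in the normal path and makes a wiring-order regression visible immediately.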


205-210: Prefer std::mem::take over replace with a new empty Vec.

std::mem::take is the idiomatic shorthand here.

Proposed fix
-    use std::mem::replace;
...
-            replace(&mut self.evts, Vec::new())
+            std::mem::take(&mut self.evts)
crates/ciphernode-builder/src/event_system.rs (3)

312-328: eventstore_getter_seq and eventstore_getter_ts are near-identical.

The two methods differ only in the generic parameter (SeqAgg vs TsAgg). If more aggregation types are added, this pattern will proliferate. A generic helper could reduce duplication:

Possible refactor sketch
// Where M is the query message type and EventStoreRouter impls the right Actix Handler
fn eventstore_getter<M>(&self) -> Result<Recipient<M>>
where
    M: actix::Message + Send + 'static,
    M::Result: Send,
    EventStoreRouter<InMemSequenceIndex, InMemEventLog>: Handler<M>,
    EventStoreRouter<SledSequenceIndex, CommitLogEventLog>: Handler<M>,
{
    let eventstores = self.eventstore_addrs()?;
    match &eventstores {
        EventStoreAddrs::InMem(_) => Ok(self.in_mem_eventstore_router()?.recipient()),
        EventStoreAddrs::Persisted(_) => Ok(self.persisted_eventstore_router()?.recipient()),
    }
}

This becomes more valuable once the router-caching issue above is addressed.


354-374: Redundant self.buffer() and b.get_or_init_store() calls in store().

In the InMem branch, self.buffer() is called on line 357 and again on line 359. In the Persisted branch, b.get_or_init_store(&self.handle()?) is called on line 362 and again on line 367. While idempotent via OnceCell, this is unnecessarily verbose and harder to follow.

Proposed fix for the Persisted branch
             EventSystemBackend::Persisted(b) => {
-                let base = b.get_or_init_store(&self.handle()?)?;
+                let handle = self.handle()?;
+                let base = b.get_or_init_store(&handle)?;
                 let buffer = self.buffer()?;
                 buffer.try_send(UpdateDestination::new(base.clone()))?;
 
-                DataStore::from_sled_store_with_buffer(
-                    &b.get_or_init_store(&self.handle()?)?,
-                    self.buffer()?,
-                )
+                DataStore::from_sled_store_with_buffer(&base, buffer)
             }

302-328: Cache router instances to avoid creating multiple stateless proxies at initialization.

in_mem_eventstore_router() and persisted_eventstore_router() each spawn a new EventStoreRouter actor without caching. Since eventstore_router(), eventstore_getter_seq(), and eventstore_getter_ts() each call one of these methods during system initialization, you end up with three separate stateless router instances. While the underlying EventStore actors are cached via OnceCell, the routers should be cached similarly to avoid unnecessary proxy actor creation.

Also remove debug println! statements on lines 50, 339, and 353 (use logging macros instead), and fix misleading log messages that say "eventstore_reader..." on methods that are routers/getters.

Comment thread crates/ciphernode-builder/src/event_system.rs Outdated
Comment thread crates/ciphernode-builder/src/event_system.rs
Comment thread crates/ciphernode-builder/src/event_system.rs Outdated

@hmzakhalid hmzakhalid left a comment

Collaborator

Thanks for all the fixes

2 participants