feat: event sync [skip-line-limit] #1239
📝 Walkthrough
Threads EventContext through events via TypedEvent, converts many handlers to context-aware variants, replaces WriteBuffer with SnapshotBuffer/BatchRouter/TimelockQueue, expands BusHandle publish APIs, introduces historical sync types and a new async sync orchestration, and adds mailbox capacity limits across actors.
Sequence Diagram(s)
sequenceDiagram
participant Sync as Sync::sync()
participant EventStore as EventStore
participant EVM as Historical EVM
participant Bus as EventBus
participant Snapshot as SnapshotBuffer
Sync->>EventStore: EventStoreQueryBy<TsAgg/SeqAgg> (since snapshot)
EventStore-->>Sync: EventStoreQueryResponse (sequenced events)
Sync->>EVM: HistoricalEvmSyncStart (request historical EVM events)
EVM-->>Sync: HistoricalEvmEvents (EnclaveEvent<Unsequenced>)
Sync->>Sync: sort events by ts
Sync->>Bus: publish EffectsEnabled (with context)
Bus-->>Snapshot: SnapshotBuffer Start (wires BatchRouter & TimelockQueue)
Sync->>Bus: publish SyncEnded
Bus-->>All: consumers receive sequenced events
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks | ✅ 3 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
- Update TypedEvent handlers to preserve event context across the system
- Add publish_with_ctx method to maintain event causality chains
- Refactor aggregator and keyshare components to use new context-aware publishing
- Update bus handle to support both origin and context-aware event publishing
- Ensure proper event correlation across distributed components
Update Persistable trait to support EventContext in try_mutate method for proper event synchronization. Add context tracking in persistable state and modify all callers to pass EventContext when mutating state.
I am now just trying to get the tests to pass and get this merged. Net events don't sync, but blockchain events do.
Actionable comments posted: 6
Note
Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
crates/evm/src/evm_chain_gateway.rs (1)
149-161: ⚠️ Potential issue | 🟠 Major
`panic!` in `handle_evm_event` will crash the actor on unexpected variants.
Line 159 uses `panic!` for an unexpected `EnclaveEvmEvent` variant. While the match arms cover the expected cases, a `panic!` bypasses the `trap` error handler in `Handler<EnclaveEvmEvent>` (line 208) and will terminate the actor. Consider using `bail!` or returning an error instead:
Suggested fix
- _ => panic!("EvmChainGateway is only designed to receive EnclaveEvmEvent::HistoricalSyncComplete or EnclaveEvmEvent::Event events"),
+ _ => bail!("EvmChainGateway received unexpected EnclaveEvmEvent variant"),
crates/events/src/eventstore.rs (1)
26-45: ⚠️ Potential issue | 🟡 Minor
`storage_errors` counter never resets: duplicates accumulate over the process lifetime.
Once `storage_errors` exceeds `MAX_STORAGE_ERRORS` (10), the next duplicate causes a `bail!`, which propagates to the handler and triggers a panic via `major_issue`. This counter only ever increments and is never reset. If duplicate timestamps are expected during snapshot recovery (as the warn message suggests), 10 may not be enough slack for larger snapshots, and the counter has no way to reset after recovery completes.
Consider either resetting the counter after sync completes (e.g., via a message) or evaluating whether this threshold is sufficient for your expected snapshot sizes.
crates/multithread/src/multithread.rs (1)
122-136: ⚠️ Potential issue | 🟠 Major
`attach_with_zk` bypasses `EffectsEnabled` synchronization, creating inconsistent behavior with `attach`.
`attach()` defers the `ComputeRequest` subscription until `EffectsEnabled` fires, but `attach_with_zk()` subscribes immediately (line 134). This means compute requests could be processed before the system finishes syncing when ZK is enabled, potentially causing incorrect behavior if `EffectsEnabled` synchronization is critical to system correctness during replay or initialization.
🤖 Fix all issues with AI agents
In `@crates/ciphernode-builder/src/ciphernode_builder.rs`:
- Line 75: The CiphernodeBuilder field start_buffer is never applied during
build(): ensure the builder's testmode_start_buffer_immediately() choice is
propagated to the created EventSystem by reading self.start_buffer inside
build() and either setting EventSystem::start_buffer to that value or calling
EventSystem::start_buffer_immediately() when self.start_buffer is true; update
build() to transfer the builder's start_buffer into the EventSystem instance
(referencing CiphernodeBuilder::start_buffer, CiphernodeBuilder::build,
EventSystem and its start_buffer_immediately() method).
In `@crates/events/src/bus_handle.rs`:
- Around line 432-439: The BusHandle::handle implementation currently drops
block metadata by calling publish_from_remote(data, ts, None, source) which
loses the original event's block height; preserve block information by passing
the event's block (e.g., msg.block()) into publish_from_remote instead of None
(or explicitly verify and document that erasing block data is acceptable for all
downstream uses like BatchRouter.get_highest_block and AggregateState
snapshot/recovery). Locate the handle method handling EnclaveEvent<Sequenced>
and update the publish_from_remote call to forward the block from msg, ensuring
downstream watermark and recovery logic receive the original block data.
In `@crates/events/src/snapshot_buffer/batch_router.rs`:
- Around line 125-178: The code in Handler<EnclaveEvent<Sequenced>>::handle
computes prev_seq with ec.seq() - 1 which can underflow when ec.seq() == 0;
change this to use a checked or saturating subtraction (e.g.,
ec.seq().checked_sub(1) or ec.seq().saturating_sub(1)) and branch on the checked
result so you only call self.batches.contains_key(&prev_seq) and access
self.aggregates.get(&prev_seq) when a valid prev_seq exists; update the logic
around prev_seq, the timelock StartTimelock creation, and related debug logs
(references: handle, ec.seq(), prev_seq, self.batches.contains_key,
self.aggregates.get, StartTimelock::new) to use the non-underflowing value or
skip the prev-step entirely when seq == 0.
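A minimal sketch of the non-underflowing predecessor lookup described above, assuming the sequence number is a `u64` (the PR text does not show its type) and using a hypothetical helper name `prev_seq`:

```rust
/// Hypothetical helper: returns the previous sequence number,
/// or `None` when `seq` is 0 and there is no predecessor.
fn prev_seq(seq: u64) -> Option<u64> {
    seq.checked_sub(1)
}

fn main() {
    // seq == 0 has no predecessor, so the "previous batch" step is skipped.
    assert_eq!(prev_seq(0), None);
    assert_eq!(prev_seq(5), Some(4));

    // saturating_sub clamps instead of signalling, so 0 stays 0.
    assert_eq!(0u64.saturating_sub(1), 0);
}
```

`checked_sub` makes the "no previous batch" case explicit in the type, whereas `saturating_sub` would make sequence 0 look like its own predecessor; which behaviour is wanted depends on how the router treats the first batch.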
In `@crates/events/src/snapshot_buffer/timelock_queue.rs`:
- Around line 150-173: The handler for Tick in TimelockQueue currently pops a
timelock (self.timelocks.pop()) before calling
batch_router.try_send(FlushSeq(seq)), so any failure from try_send will drop the
timelock permanently; change the logic to either (a) peek at the next timelock
(without popping) and only pop after a successful send, or (b) if you must pop
first, catch send failures and re-insert the timelock back into self.timelocks
(preserving ordering) when batch_router.try_send(FlushSeq(seq)) fails, or (c)
replace try_send with a fire-and-forget do_send if losing timelocks is
unacceptable—apply this change inside the Tick handler (referencing
TimelockQueue::handle, self.timelocks.pop(), next_timelock_lt, and
batch_router.try_send).
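Option (a) above, peeking first and popping only after a successful send, could look roughly like this. The `Entry` alias, the `flush_expired` helper and the closure standing in for `batch_router.try_send` are all hypothetical; the real `Timelock` type and queue ordering are not shown in the review.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical stand-in for a timelock: (expiry, seq), earliest expiry first.
type Entry = Reverse<(u64, u64)>;

/// Flushes every entry whose expiry is <= `now`, popping an entry only
/// after `send` has succeeded for it. Returns how many were flushed.
fn flush_expired<F>(queue: &mut BinaryHeap<Entry>, now: u64, mut send: F) -> usize
where
    F: FnMut(u64) -> Result<(), String>,
{
    let mut flushed = 0;
    while let Some(&Reverse((expiry, seq))) = queue.peek() {
        if expiry > now {
            break; // nothing else has expired yet
        }
        if send(seq).is_err() {
            break; // keep the entry queued and retry on the next tick
        }
        queue.pop();
        flushed += 1;
    }
    flushed
}

fn main() {
    let mut queue: BinaryHeap<Entry> = BinaryHeap::new();
    queue.push(Reverse((10, 1)));
    queue.push(Reverse((20, 2)));

    // A failing send leaves both entries in the queue.
    let n = flush_expired(&mut queue, 15, |_| Err("mailbox full".to_string()));
    assert_eq!((n, queue.len()), (0, 2));

    // A later successful send flushes only the expired entry.
    let n = flush_expired(&mut queue, 15, |_| Ok(()));
    assert_eq!((n, queue.len()), (1, 1));
}
```

Stopping at the first failed send preserves ordering at the cost of delaying later entries until the next tick.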
In `@crates/net/src/net_event_buffer.rs`:
- Around line 89-107: The NetEventBuffer Actor::started method currently spawns
a task that calls addr.do_send(...) but does not set a mailbox capacity, risking
silent drops; update NetEventBuffer::started to call
ctx.set_mailbox_capacity(MAILBOX_LIMIT) (the same MAILBOX_LIMIT used by
Batch/BatchRouter/ThresholdShareCollector/EncryptionKeyCollector) before
spawning the async task so the actor mailbox can accommodate bursts during sync.
In `@crates/sync/src/sync.rs`:
- Around line 140-147: Replace the eprintln! call with tracing's structured
logger: in the timeout branch where you iterate over
expected.difference(&received) (variables expected, received, chain_id) use
tracing::error! (or add use tracing::error) to emit "Timeout waiting for
historical events from chain {}" including the chain_id value so logs are
captured by tracing rather than printed to stderr; ensure the tracing crate is
in scope for the module if not already.
🟡 Minor comments (17)
crates/utils/src/helpers.rs-11-12 (1)
11-12: ⚠️ Potential issue | 🟡 Minor
Remove debug logging artifact from generic utility.
`"take has been called!"` at `info!` level in a general-purpose utility will add noise to every consumer's logs. This reads like a temporary debug aid (the PR description itself mentions "debug multithread issues"). Remove it before merging; if you need to keep it for future diagnostics, drop it to `trace!` and include the type name or caller context.
Proposed fix
-use tracing::info;
-
+pub fn take(&self) -> Option<T> {
-    info!("take has been called!");
     self.0.lock().unwrap().take()
 }
Also applies to: 52-52
crates/evm/src/fix_historical_order.rs-12-12 (1)
12-12: ⚠️ Potential issue | 🟡 Minor
Unused `info` import.
All `info!` calls have been replaced with `debug!`, but `info` is still imported. This will produce a compiler warning.
Proposed fix
-use tracing::{debug, info};
+use tracing::debug;
crates/ciphernode-builder/src/evm_system.rs-89-89 (1)
89-89: ⚠️ Potential issue | 🟡 Minor
Typo: "Finaly" → "Finally".
- // Finaly subscribe to the bus and wait for HistoricalEvmSyncStart
+ // Finally subscribe to the bus and wait for HistoricalEvmSyncStart
crates/utils/src/retry.rs-50-50 (1)
50-50: ⚠️ Potential issue | 🟡 Minor
Debug artifact: unconditional `error!` log before the match is noisy and misleading.
This fires at `error` level for every retry attempt, including retryable ones that are already logged at `warn` on line 62. Retryable errors appearing as `error!` will pollute logs and potentially trigger monitoring alerts for expected/recoverable situations. For actual `Failure` variants, it duplicates the `error!` on line 71.
Consider removing this line or downgrading to `trace!`/`debug!` if you want it for development.
Proposed fix
 Err(re) => {
-    tracing::error!("RETRY FAILED {:?}", re);
     match re {
crates/ciphernode-builder/src/ciphernode_builder.rs-193-198 (1)
193-198: ⚠️ Potential issue | 🟡 Minor
Incomplete doc comment: sentence is cut off.
Line 194: "for tests that don't specifically" ends abruptly. Please complete the sentence.
crates/data/src/data_store.rs-179-179 (1)
179-179: ⚠️ Potential issue | 🟡 Minor
Remove debug `println!` left in production code.
This appears to be a leftover debug statement. Use `tracing::debug!` or `tracing::trace!` if logging is desired here.
🧹 Proposed fix
 pub fn from_sled_store_with_buffer(
     addr: &Addr<SledStore>,
     snapshot_buffer: impl Into<Recipient<Insert>>,
 ) -> Self {
-    println!("from_sled_store_with_buffer...");
     Self {
         addr: StoreAddr::Sled(addr.clone()),
         get: addr.clone().recipient(),
crates/events/src/sequencer.rs-62-72 (1)
62-72: ⚠️ Potential issue | 🟡 Minor
Same panic concern applies to `StoreEventResponse` handling.
If `self.buffer.try_send(event.clone())` fails because the snapshot buffer's mailbox is temporarily full, the node panics. Since this path also feeds the bus, a brief queue overflow would be fatal. Consider whether a bounded retry or `do_send` (which queues without backpressure) would be more appropriate for the buffer, given that `MAILBOX_LIMIT_LARGE` is already set on the sequencer itself.
crates/events/src/sequencer.rs-50-59 (1)
50-59: ⚠️ Potential issue | 🟡 Minor
Panic on `try_send` failure is intentional but aggressive.
`try_send` can fail both when the recipient is stopped and when its mailbox is full (`SendError::Full`). A transient mailbox-full condition on the event store (e.g., during a burst of events) would crash the entire node. If this is acceptable for the critical sequencing path, a comment documenting that decision would help future readers.
crates/net/src/events.rs-300-300 (1)
300-300: ⚠️ Potential issue | 🟡 Minor
Typo: "RECEIVED and event" → "RECEIVED an event".
crates/net/src/net_sync_manager.rs-108-127 (1)
108-127: ⚠️ Potential issue | 🟡 Minor
Potential race between `AllPeersDialed` broadcast and `await_event` resubscribe.
If `AllPeersDialed` is broadcast on the `NetEvent` channel (net_interface.rs line 149) before `await_event` calls `resubscribe()` (events.rs line 336), but after the `HistoricalNetSyncStart` handler reads `self.peers_ready` as `false` (line 124), the `await_event` will miss the `AllPeersDialed` signal and wait the full 30-second timeout.
This can happen if the actor's mailbox receives `HistoricalNetSyncStart` before the forwarded `AllPeersDialed` from the `tokio::spawn` loop (lines 67–81), even though the broadcast already fired.
The 30-second timeout prevents a hang, so the worst case is a delayed startup. Not a blocker, but worth noting for future refinement (e.g., re-checking `peers_ready` or using a shared `Notify`).
crates/events/src/event_context.rs-34-52 (1)
34-52: ⚠️ Potential issue | 🟡 Minor
`from_chain_id` silently maps overflow to `AggregateId(0)` on 32-bit targets.
Line 40: `id.try_into().unwrap_or(0)` silently maps any `u64` that doesn't fit in `usize` to `AggregateId(0)`, which is the system-wide default aggregate. On 64-bit platforms this is a non-issue, but if this code ever runs on 32-bit (e.g., WASM32), chain IDs above `u32::MAX` would silently route to the default aggregate.
Unlikely to matter in practice; just flagging for awareness.
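One way to make the overflow visible rather than silent is to return the conversion error to the caller. The sketch below uses a hypothetical local `AggregateId` newtype and a hypothetical function name; it is not the PR's `from_chain_id`.

```rust
use std::num::TryFromIntError;

/// Hypothetical newtype standing in for the `AggregateId` named in the review.
#[derive(Debug, PartialEq, Eq)]
struct AggregateId(usize);

/// Converts a chain id, surfacing overflow instead of falling back to 0.
fn aggregate_id_from_chain_id(id: u64) -> Result<AggregateId, TryFromIntError> {
    usize::try_from(id).map(AggregateId)
}

fn main() {
    // Small ids convert on every target width.
    assert_eq!(aggregate_id_from_chain_id(31337).unwrap(), AggregateId(31337));

    // On a 32-bit target a value above u32::MAX would yield Err here,
    // instead of being routed to the default aggregate.
    let big = aggregate_id_from_chain_id(u64::from(u32::MAX) + 1);
    println!("{:?}", big);
}
```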
crates/sync/src/sync.rs-32-32 (1)
32-32: ⚠️ Potential issue | 🟡 Minor
Typo: "snapsshot" → "snapshot".
- // 1. Load snapsshot metadata
+ // 1. Load snapshot metadata
crates/events/src/enclave_event/sync_start.rs-64-72 (1)
64-72: ⚠️ Potential issue | 🟡 Minor
Stale doc comment on `HistoricalNetSyncStart.sender`.
Line 67: the comment says "so that the evm can communicate" but this is the net sync start, not evm. Copy-paste from the evm variant above.
Suggested fix
 #[serde(skip)]
- /// We include the sender here so that the evm can communicate directly with the sync actor
+ /// We include the sender here so that the net layer can communicate directly with the sync actor
 pub sender: Option<Recipient<HistoricalNetEventsReceived>>,
crates/events/src/eventstore.rs-67-81 (1)
67-81: ⚠️ Potential issue | 🟡 Minor
Copy-paste comment on the Seq handler.
Line 68: the comment says "if there are no events after the timestamp" but this handler queries by sequence number, not timestamp. Also, unlike the Ts handler, there's no early-return for an empty result; `read_from` on a non-existent sequence will just return an empty iterator, so it still works correctly.
Suggested comment fix
 pub fn handle_event_store_query_seq(&mut self, msg: EventStoreQueryBy<Seq>) -> Result<()> {
-    // if there are no events after the timestamp return an empty vector
+    // Read events from the given sequence number and return them
     let id = msg.id();
crates/sortition/src/sortition.rs-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor
Invalid SPDX license identifier: `LGPL-4.0-only` does not exist.
GNU LGPL versions are 2.0, 2.1, and 3.0. All other files in this PR use `LGPL-3.0-only`. This appears to be a typo.
Proposed fix
-// SPDX-License-Identifier: LGPL-4.0-only
+// SPDX-License-Identifier: LGPL-3.0-only
crates/aggregator/src/threshold_plaintext_aggregator.rs-293-318 (1)
293-318: ⚠️ Potential issue | 🟡 Minor
Wrong error type: `EType::PublickeyAggregation` used in the Plaintext aggregator's decryption share handler.
Lines 300–301 and 329–331 use `EType::PublickeyAggregation`, but this is the `ThresholdPlaintextAggregator`, not the `PublicKeyAggregator`. The `ComputeAggregate` and `ComputeResponse` handlers on lines 387 and 398 correctly use `EType::PlaintextAggregation`. This looks like a copy-paste from the public key aggregator.
Proposed fix
 impl Handler<TypedEvent<DecryptionshareCreated>> for ThresholdPlaintextAggregator {
     type Result = ();
     fn handle(
         &mut self,
         msg: TypedEvent<DecryptionshareCreated>,
         ctx: &mut Self::Context,
     ) -> Self::Result {
         trap(
-            EType::PublickeyAggregation,
+            EType::PlaintextAggregation,
             &self.bus.with_ec(msg.get_ctx()),

 impl Handler<E3CommitteeContainsResponse<TypedEvent<DecryptionshareCreated>>> for ThresholdPlaintextAggregator {
     type Result = ();
     fn handle(
         &mut self,
         msg: E3CommitteeContainsResponse<TypedEvent<DecryptionshareCreated>>,
         ctx: &mut Self::Context,
     ) -> Self::Result {
         trap(
-            EType::PublickeyAggregation,
+            EType::PlaintextAggregation,
             &self.bus.with_ec(msg.get_ctx()),
crates/keyshare/src/threshold_keyshare.rs-988-996 (1)
988-996: ⚠️ Potential issue | 🟡 Minor
Errors silently discarded in `ThresholdShareCreated` and `EncryptionKeyCreated` dispatch.
Lines 990–991 and 994–995 use `let _ = self.handle_...(...)` which silently swallows any errors returned from these handlers. Other event paths in this file use `trap` to report errors to the bus. If forwarding to a collector fails (e.g., collector not initialized), the error will be lost.
Consider wrapping these in `trap` consistent with the other dispatch paths, or at minimum logging the error:
Proposed fix
 EnclaveEventData::ThresholdShareCreated(data) => {
-    let _ =
-        self.handle_threshold_share_created(TypedEvent::new(data, ec), ctx.address());
+    trap(
+        EType::KeyGeneration,
+        &self.bus.with_ec(&ec),
+        || self.handle_threshold_share_created(TypedEvent::new(data, ec), ctx.address()),
+    )
 }
 EnclaveEventData::EncryptionKeyCreated(data) => {
-    let _ =
-        self.handle_encryption_key_created(TypedEvent::new(data, ec), ctx.address());
+    trap(
+        EType::KeyGeneration,
+        &self.bus.with_ec(&ec),
+        || self.handle_encryption_key_created(TypedEvent::new(data, ec), ctx.address()),
+    )
 }
Note: this fix has a borrow issue since `ec` is moved into `TypedEvent::new` and also borrowed by `with_ec`. You'd need to clone `ec` or restructure slightly. But the current `let _ =` pattern should at least be addressed.
🧹 Nitpick comments (50)
crates/ciphernode-builder/src/evm_system.rs (3)
70-70: Prefer `std::mem::take` over `replace` with default value.
`Vec<T>` implements `Default`, so `std::mem::take(&mut self.route_factories)` is the idiomatic shorthand for `replace(&mut self.route_factories, Vec::new())`.
♻️ Suggested diff
- let route_factories = replace(&mut self.route_factories, Vec::new());
+ let route_factories = std::mem::take(&mut self.route_factories);
You can then also simplify the import on line 7:
-use std::mem::replace;
+use std::mem::take;
or just inline `std::mem::take`.
53-92: Calling `build()` more than once will silently create a broken subscription.
Since `build` takes `&mut self` and drains `route_factories` via `replace`, a second call would subscribe a new pipeline to the bus that has an empty router (fallback only, no contract routes). There's no guard or consumed-self pattern preventing this. If `build` is intended to be called exactly once, consider taking `self` by value to enforce it at compile time.
♻️ Suggested signature change
- pub fn build(&mut self) {
+ pub fn build(mut self) {
This consumes the builder, preventing accidental double-invocation. If the current `&mut self` is required for integration reasons, a runtime guard (e.g., a `built: bool` flag) would also work.
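The runtime-guard alternative can be sketched as follows. `Builder`, its `routes` field and the error string are hypothetical stand-ins; the real builder holds route factories and subscribes to the bus, which is omitted here.

```rust
/// Hypothetical builder; `routes` stands in for `route_factories`.
struct Builder {
    routes: Vec<String>,
    built: bool,
}

impl Builder {
    fn new(routes: Vec<String>) -> Self {
        Self { routes, built: false }
    }

    /// Drains the routes exactly once. A second call is rejected instead of
    /// silently wiring up a pipeline with an empty router.
    fn build(&mut self) -> Result<Vec<String>, &'static str> {
        if self.built {
            return Err("build() was already called");
        }
        self.built = true;
        Ok(std::mem::take(&mut self.routes))
    }
}

fn main() {
    let mut builder = Builder::new(vec!["contract-a".to_string()]);
    assert_eq!(builder.build(), Ok(vec!["contract-a".to_string()]));
    assert!(builder.build().is_err());
}
```

Taking `self` by value gives the same guarantee at compile time and needs no flag; the guard is only worth its extra state when `&mut self` has to stay.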
96-98: Use the `RouteFactory` alias for consistency.
The parameter type `Box<dyn RouteFn>` is exactly `RouteFactory` (line 23). Using the alias keeps the vocabulary consistent.
 fn configure_router(
     next: impl Into<EvmEventProcessor>,
-    route_factories: Vec<(Address, Box<dyn RouteFn>)>,
+    route_factories: Vec<(Address, RouteFactory)>,
 ) -> EvmRouter {
crates/net/src/document_publisher.rs (2)
540-542: `EventConverter` is missing `MAILBOX_LIMIT` on startup, unlike `DocumentPublisher`.
`DocumentPublisher` sets `ctx.set_mailbox_capacity(MAILBOX_LIMIT)` in its `started` hook (Line 138), but `EventConverter`'s `Actor` impl is empty. Since `EventConverter` also receives `EnclaveEvent` messages (subscribed to three event types), it should be consistent.
Proposed fix
 impl Actor for EventConverter {
     type Context = actix::Context<Self>;
+    fn started(&mut self, ctx: &mut Self::Context) {
+        ctx.set_mailbox_capacity(MAILBOX_LIMIT)
+    }
 }
222-228: Unnecessary `.clone()` on an already-owned `msg`.
`msg` is owned (`DocumentPublishedNotification` is passed by value into the handler). The `.clone()` on Line 222 is redundant since `msg` can be moved directly into the async block captured by `trap_fut`.
Proposed fix
- let msg = msg.clone();
  trap_fut(
      EType::IO,
      &bus,
-     handle_document_published_notification(tx, rx, bus.clone(), ids, msg),
+     handle_document_published_notification(tx, rx, bus.clone(), ids, msg.clone()),
  )
Actually, looking again: `msg` is already owned and not used after the `trap_fut` call, so the clone is unnecessary entirely:
- let msg = msg.clone();
  trap_fut(
      EType::IO,
      &bus,
      handle_document_published_notification(tx, rx, bus.clone(), ids, msg),
  )
crates/evm/tests/integration.rs (1)
100-110: Inconsistent `Arc` wrapping of `EthProvider` between the two tests.
In `evm_reader` (line 102), the provider is wrapped in `Arc::new(...)`, while in `ensure_historical_events` (line 172) it is not. Both are passed to `EvmSystemChainBuilder::new` by reference. If `Arc` isn't required by the API, consider removing it from `evm_reader` for consistency; otherwise, add it to `ensure_historical_events`.
Also applies to: 170-178
crates/ciphernode-builder/src/event_system.rs (6)
32-55: Debug `println!` statements should use `tracing::info` for consistency.
Lines 50, 353, and 367 use `println!` while the rest of the file uses `tracing::info`. These look like debug leftovers.
🔧 Suggested fix
 fn get_or_init_store(&self, handle: &BusHandle) -> Result<Addr<SledStore>> {
-    println!("get_or_init_store in {:?} ...", self.sled_path);
+    info!("get_or_init_store in {:?} ...", self.sled_path);
     self.store
Similarly for lines 353 and 367:
- println!("handle");
+ info!("handle");
- println!("store()...");
+ info!("store()...");
69-84: Stale doc comment: references `WriteBuffer` but implementation uses `SnapshotBuffer`.
Line 76 mentions "WriteBuffer for batching inserts from actors into a snapshot" but the field on line 84 is now `Addr<SnapshotBuffer>`. The inline comment on line 83 also says "WriteBuffer for batching…".
📝 Suggested fix
-/// - **WriteBuffer** for batching inserts from actors into a snapshot
+/// - **SnapshotBuffer** for batching inserts from actors into a snapshot
- /// WriteBuffer for batching inserts from actors into a snapshot
+ /// SnapshotBuffer for batching inserts from actors into a snapshot
317-342: Copy-paste log messages: all three methods log `"eventstore_reader..."`; they should be distinct.
`eventstore_router()` (line 318), `eventstore_getter_seq()` (line 327), and `eventstore_getter_ts()` (line 336) all log `"eventstore_reader..."`. At minimum the first should say `"eventstore_router..."`, and ideally all three are distinguishable for debugging.
375-384: Redundant `get_or_init_store` + `handle()` calls in the Persisted branch.
`base` is moved into `UpdateDestination` on line 378, then `get_or_init_store(&self.handle()?)` is called again on line 381 just to get the same `Addr`. Clone `base` before moving it (same pattern used in the InMem branch on line 372) to avoid the extra calls.
🔧 Suggested fix
 EventSystemBackend::Persisted(b) => {
     let base = b.get_or_init_store(&self.handle()?)?;
     let buffer = self.buffer()?;
-    buffer.try_send(UpdateDestination::new(base))?;
-
-    DataStore::from_sled_store_with_buffer(
-        &b.get_or_init_store(&self.handle()?)?,
-        self.buffer()?,
-    )
+    buffer.try_send(UpdateDestination::new(base.clone()))?;
+    DataStore::from_sled_store_with_buffer(&base, buffer)
 }
507-507: Remove commented-out code.
Line 507 has `// system.buffer()?.send(Start).await?;`, which appears to be a debug artifact from the old `WriteBuffer`/`Start` message flow. Remove to keep the test clean.
288-342: Each call to `in_mem_eventstore_router()` and `persisted_eventstore_router()` spawns a new router actor without caching.
The three public getters (`eventstore_router`, `eventstore_getter_seq`, `eventstore_getter_ts`) each delegate to these methods, creating a fresh actor on every invocation. While `eventstore_addrs` properly caches results via `OnceCell`, the router methods do not.
Current usage shows single invocation per getter during system initialization (lines 226, 394-395), so this is not a practical issue today. However, consider caching the router `Addr` in a `OnceCell` to prevent accidental multiple invocations if the access patterns change.
crates/utils/src/constants.rs (1)
10-12: Incomplete comment on line 10.
`// Max message` appears truncated. Consider clarifying, e.g., `// Max mailbox capacity for actor message queues`.
crates/utils/src/error.rs (1)
10-12: Nit: prefer `String::from` or `.to_string()` over `format!` with no interpolation args.
Suggested fix
- format!("System has crashed. Nothing personal. Goodbye.")
+ "System has crashed. Nothing personal. Goodbye.".to_string()
crates/events/src/enclave_event/enclave_error.rs (1)
104-145: Dispatchers look clean; consider deriving `Default`.
All three dispatchers are unit structs with trivial `new()` constructors. You could derive `Default` to get the standard trait for free, but this is purely optional.
Example for PanicDispatcher (same for the others)
+#[derive(Default)]
 pub struct PanicDispatcher;
-
-impl PanicDispatcher {
-    pub fn new() -> Self {
-        Self {}
-    }
-}
crates/events/src/snapshot_buffer/aggregate_config.rs (3)
24-32: Typo in comment and non-idiomatic `None` check.
Line 27: "AggregatId" → "AggregateId". Also, `if let None = ...` is non-idiomatic; prefer the `entry` API which is both cleaner and avoids a double lookup.
♻️ Proposed fix
-    /// Create a new AggregateConfig with the specified delays
-    pub fn new(mut delays: HashMap<AggregateId, Duration>) -> Self {
-        // Always handle AggregatId of 0 with a delay of 0
-        if let None = delays.get(&AggregateId::new(0)) {
-            delays.insert(AggregateId::new(0), Duration::from_micros(0));
-        }
-        Self { delays }
+    /// Create a new AggregateConfig with the specified delays
+    pub fn new(mut delays: HashMap<AggregateId, Duration>) -> Self {
+        // Always handle AggregateId of 0 with a delay of 0
+        delays
+            .entry(AggregateId::new(0))
+            .or_insert(Duration::from_micros(0));
+        Self { delays }
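As a self-contained illustration of the `entry` pattern, here is a sketch with hypothetical local stand-ins for `AggregateId` and `AggregateConfig` (a tuple struct and a `delay` accessor that are assumptions, not the PR's definitions):

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical stand-in for the real `AggregateId`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct AggregateId(usize);

struct AggregateConfig {
    // Kept non-`pub` here so code outside this module cannot remove the default entry.
    delays: HashMap<AggregateId, Duration>,
}

impl AggregateConfig {
    fn new(mut delays: HashMap<AggregateId, Duration>) -> Self {
        // One lookup: insert a zero delay for id 0 only if the caller gave none.
        delays.entry(AggregateId(0)).or_insert(Duration::ZERO);
        Self { delays }
    }

    /// Hypothetical read-only accessor.
    fn delay(&self, id: AggregateId) -> Option<Duration> {
        self.delays.get(&id).copied()
    }
}

fn main() {
    let cfg = AggregateConfig::new(HashMap::new());
    assert_eq!(cfg.delay(AggregateId(0)), Some(Duration::ZERO));

    // A caller-supplied delay for id 0 is preserved, not overwritten.
    let mut custom = HashMap::new();
    custom.insert(AggregateId(0), Duration::from_millis(5));
    let cfg = AggregateConfig::new(custom);
    assert_eq!(cfg.delay(AggregateId(0)), Some(Duration::from_millis(5)));
}
```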
15-42: Two `impl` blocks for the same struct can be merged.
There's no trait-related reason to split `AggregateConfig` across two `impl` blocks. Merging them improves readability.
11-12: `delays` field is `pub`, allowing bypass of the constructor invariant.
The `new()` constructor guarantees `AggregateId(0)` exists, but since `delays` is `pub`, callers can directly mutate the map and remove it. Consider making the field private if you want to preserve that invariant.
♻️ Proposed fix
 pub struct AggregateConfig {
-    pub delays: HashMap<AggregateId, Duration>,
+    delays: HashMap<AggregateId, Duration>,
 }
crates/aggregator/src/committee_finalizer.rs (1)
127-131: Minor formatting nit: missing space after comma.
Line 130: `},ec)?;` → `}, ec)?;`
Proposed fix
- },ec)?;
+ }, ec)?;
crates/utils/src/formatters.rs (1)
78-88: Consider guarding against `hue_max <= hue_min`.
If `hue_max - hue_min` is zero or negative, line 83 would produce a division-by-zero (NaN) or negative modulus. Currently the only call site uses `(30.0, 330.0)` which is safe, but a `debug_assert!` would protect against future misuse.
♻️ Proposed guard
 fn hash_str_to_ansi_color_in_hue_range(s: &str, hue_min: f32, hue_max: f32) -> u8 {
+    debug_assert!(hue_max > hue_min, "hue_max must be greater than hue_min");
     let hash: u32 = s
crates/utils/src/actix/channel.rs (1)
60-67: Use `tracing::error!` instead of `eprintln!` for consistency.
The rest of the codebase uses the `tracing` crate for logging. Using `eprintln!` here bypasses structured logging and won't appear in log collectors.
♻️ Proposed fix
+use tracing::error;
+
 // ...
 fn handle(&mut self, m: M, _: &mut Context<Self>) -> Self::Result {
     let s = self.0.clone();
     actix::spawn(async move {
         if let Err(e) = s.send(m).await {
-            eprintln!("Failed to send message: {}", e);
+            error!("Failed to send message: {}", e);
         }
     });
 }
crates/events/src/snapshot_buffer/timelock_queue.rs (2)
61-83: `Ord` and `PartialEq` are inconsistent for `Timelock`.
`Eq`/`PartialEq` are derived (comparing all fields), but `Ord::cmp` only considers `expiry`. Two `Timelock` values with the same `expiry` but different `seq` would satisfy `a.cmp(b) == Equal` while `a != b`, violating the `Ord` trait contract. While `BinaryHeap` likely works fine in practice, this is technically unsound.
Either derive `Ord`/`PartialOrd` too (comparing all fields), or implement `PartialEq` manually to match `Ord`:
Option A: include seq as a tiebreaker in Ord
 impl Ord for Timelock {
     fn cmp(&self, other: &Self) -> Ordering {
-        self.expiry.cmp(&other.expiry)
+        self.expiry.cmp(&other.expiry).then_with(|| self.seq.cmp(&other.seq))
     }
 }
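A compact, standalone check of the consistency property Option A restores. The `Timelock` below is a hypothetical two-field version with `u64` fields; the real struct's field types are not shown in the review.

```rust
use std::cmp::Ordering;

/// Hypothetical two-field timelock; field types are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Timelock {
    expiry: u64,
    seq: u64,
}

impl Ord for Timelock {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare by expiry first, then by seq, so that
        // cmp == Equal exactly when the derived `==` holds.
        self.expiry
            .cmp(&other.expiry)
            .then_with(|| self.seq.cmp(&other.seq))
    }
}

impl PartialOrd for Timelock {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let a = Timelock { expiry: 10, seq: 1 };
    let b = Timelock { expiry: 10, seq: 2 };

    // Same expiry, different seq: unequal, and ordering agrees.
    assert_ne!(a, b);
    assert_eq!(a.cmp(&b), Ordering::Less);

    // Identical values compare Equal.
    assert_eq!(a.cmp(&a), Ordering::Equal);
}
```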
163-163: Prefer `!self.timelocks.is_empty()` over `self.timelocks.len() > 0`.
Idiomatic Rust uses `is_empty()` for checking emptiness.
Suggested fix
- while self.timelocks.len() > 0 && self.next_timelock_lt(now_time) {
+ while !self.timelocks.is_empty() && self.next_timelock_lt(now_time) {
crates/data/src/repositories.rs (1)
32-34: `in_mem()` requires a running actix runtime; consider documenting this constraint.
`InMemStore::new(false).start()` will panic if called outside an actix system context. If this is intended only for tests (where a runtime is typically set up), a brief doc comment or naming convention (e.g., `in_mem_for_test`) would make that expectation clear to future callers.
crates/net/src/lib.rs (2)
44-44: Hardcoded gossip topic appears intentional but temporary.
`"tmp-enclave-gossip-topic"` reads like a placeholder. If this is meant to be configurable or derived from network identity, consider adding a TODO or passing it as a parameter.
47-53: Keypair bytes remain in memory after use.
After constructing the `ed25519::Keypair` on Line 57, the decrypted `bytes` Vec stays in memory until dropped. For key material, consider zeroing the buffer after use (e.g., via `zeroize`). This is a defense-in-depth measure: not critical, but good hygiene for private keys.
crates/events/src/snapshot_buffer/batch.rs (1)
49-61: Minor idiomatic Rust improvements.
Two small nits:
- Line 52: `replace(&mut self.inserts, Vec::new())` can be simplified to `std::mem::take(&mut self.inserts)`, which is more idiomatic and avoids the explicit `Vec::new()`.
- Line 54: `inserts.len() > 0` is conventionally written as `!inserts.is_empty()` (Clippy `len_zero` lint).
Proposed diff
- let inserts = replace(&mut self.inserts, Vec::new());
+ let inserts = std::mem::take(&mut self.inserts);
  trap(EType::IO, &PanicDispatcher::new(), || {
-     if inserts.len() > 0 {
+     if !inserts.is_empty() {
          self.db.try_send(InsertBatch::new(inserts))?;
      }
crates/net/src/net_event_buffer.rs (1)
97-105: Silently dropping lagged events could lose data during sync.
On `RecvError::Lagged`, the task just `continue`s, meaning events that arrived while the receiver fell behind are permanently lost. If the input broadcast channel is producing events faster than the actor can process them, this could silently drop sync-relevant events. Consider logging a warning on lag so it's observable.
Proposed fix
 Ok(event) => addr.do_send(IncomingNetEvent(event)),
-Err(RecvError::Lagged(_)) => continue,
+Err(RecvError::Lagged(n)) => {
+    tracing::warn!("NetEventBuffer: broadcast receiver lagged, dropped {n} events");
+    continue;
+}
 Err(RecvError::Closed) => break,
crates/data/src/persistable.rs (1)
270-275: Redundant `.clone()` after `.into()`.
`value.into()` already produces an owned `EventContext<Sequenced>`, making the subsequent `.clone()` unnecessary.
♻️ Proposed fix
 fn set_ctx<C>(&mut self, value: C)
 where
     C: Into<EventContext<Sequenced>>,
 {
-    self.ctx = Some(value.into().clone())
+    self.ctx = Some(value.into())
 }
crates/evm/src/events.rs (1)
66-68: Consider returning a named struct or documenting the tuple order for `split()`.
`split()` returns `(EnclaveEventData, u128, u64)` which silently drops `chain_id` and `id`, and the positional meaning of the `u128` (ts) vs `u64` (block) isn't self-documenting. Compare with `EnclaveEvent::into_components` (at `crates/events/src/enclave_event/mod.rs:296-298`) which returns semantically clearer types. This is low-priority since callers are presumably limited to the Sync actor.
crates/data/src/data_store.rs (1)
204-224: from_in_mem and from_sled_store duplicate the existing From impls.
These named constructors (lines 204-224) produce the exact same DataStore as the From<&Addr<InMemStore>> and From<&Addr<SledStore>> implementations (lines 227-250). Consider having the From impls delegate to the named methods (or vice versa) to avoid the duplication.
crates/sortition/src/ciphernode_selector.rs (1)
187-201: contains + position iterates the committee list twice.
Since contains (line 188) already confirms membership, the position call (line 194) will always return Some. You could eliminate the contains check and use position alone, letting the None arm handle "not in committee."
♻️ Proposed simplification
- // Check if this node is in the finalized committee
- if !msg.committee.contains(&self.address) {
-     info!(node = self.address, "Node not in finalized committee");
-     return Ok(());
- }
-
- // Retrieve E3 metadata from repository
- let Some(party_id) = msg.committee.iter().position(|addr| addr == &self.address)
- else {
-     info!(
-         node = self.address,
-         "Node address not found in committee list (should not happen)"
-     );
+ let Some(party_id) = msg.committee.iter().position(|addr| addr == &self.address) else {
+     info!(node = self.address, "Node not in finalized committee");
      return Ok(());
  };
crates/net/src/net_interface.rs (2)
396-396: Verbose debug-style logging with *** markers.
Lines 396, 415, 434, 452, 463, 651 use info! with *** delimiters. These are clearly temporary debugging aids. Consider downgrading to debug! or trace! before merging to avoid noisy production logs.
700-719: Correlator key collision risk when storing both Kademlia QueryId and request-response RequestId in the same map.
Both Kademlia QueryId (used for PUT/GET record operations) and request-response RequestId (used for sync operations) flow through the single Correlator map and are converted to String keys via Display. If their representations both produce identical numeric strings (e.g., both display as "42"), a later track() or expire() call from one subsystem could shadow or consume an entry from the other. Prefixing the stringified ID with a subsystem identifier (e.g., "kad:42" vs "rr:42") eliminates this ambiguity.
Example prefix approach
- pub fn track(&mut self, query_id: impl Display, correlation_id: CorrelationId) {
-     self.inner.insert(format!("{query_id}"), correlation_id);
+ pub fn track(&mut self, query_id: impl Display, correlation_id: CorrelationId) {
+     self.inner.insert(format!("kad:{query_id}"), correlation_id);
  }
Alternatively, use separate maps per subsystem or a newtype wrapper.
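The newtype alternative mentioned above could look roughly like the sketch below. It is only an illustration: CorrelationId here is a stand-in for the crate's type, and IdSource and CorrelatorKey are hypothetical names that do not exist in the PR.

```rust
use std::collections::HashMap;
use std::fmt::Display;

/// Stand-in for the crate's `CorrelationId` (assumed shape, not the real type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CorrelationId(pub u64);

/// Hypothetical tag naming the subsystem that issued an ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IdSource {
    Kademlia,
    RequestResponse,
}

/// Hypothetical map key: the subsystem is kept apart from the stringified ID,
/// so "42" from Kademlia and "42" from request-response never compare equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct CorrelatorKey(IdSource, String);

#[derive(Default)]
pub struct Correlator {
    inner: HashMap<CorrelatorKey, CorrelationId>,
}

impl Correlator {
    /// Records a correlation under the issuing subsystem's namespace.
    pub fn track(&mut self, source: IdSource, id: impl Display, correlation_id: CorrelationId) {
        self.inner
            .insert(CorrelatorKey(source, id.to_string()), correlation_id);
    }

    /// Removes and returns the entry for this subsystem only.
    pub fn expire(&mut self, source: IdSource, id: impl Display) -> Option<CorrelationId> {
        self.inner.remove(&CorrelatorKey(source, id.to_string()))
    }
}
```

Compared with string prefixes, the enum makes the set of subsystems explicit at the type level, at the cost of changing every track()/expire() call site.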
crates/events/src/eventstore_router.rs (1)
116-201: Near-identical logic in handle_event_store_query_ts and handle_event_store_query_seq.
These two methods share the same structure: extract parent ID → build sub-queries → handle empty case → spawn aggregator → dispatch. The only differences are the query type parameters (Ts/TsAgg vs Seq/SeqAgg) and the value type (u128 vs u64). Consider extracting a generic helper to reduce duplication if more query types are added.
crates/net/src/events.rs (2)
290-313: Overly verbose info!-level logging in call_and_await_response.
Every command send, every received event, every correlation mismatch, and every matcher failure is logged at info! level. In a production system with many concurrent events, this will produce excessive noise. Consider:
- Line 290–293: debug! for command sends
- Line 300: trace! for all received events
- Lines 306–312: trace! for correlation mismatches (these will fire for every non-matching event)
328-354: await_event: clean utility, minor nit on error message.
Line 352: anyhow::anyhow!(format!("Timed out waiting for event")). The format!() wrapper is unnecessary since anyhow! already supports format strings directly.
Simplify
- .map_err(|_| anyhow::anyhow!(format!("Timed out waiting for event")))?;
+ .map_err(|_| anyhow::anyhow!("Timed out waiting for event"))?;
crates/net/src/net_sync_manager.rs (1)
238-250: Cloning every NetEvent in the matcher closure is unnecessary.
Line 240: |e| match e.clone() clones every event received on the broadcast channel, not just matching ones. Match on the reference instead and only clone on match.
Avoid unnecessary clones
- |e| match e.clone() {
-     NetEvent::OutgoingSyncRequestSucceeded(value) => Some(Ok(value)),
-     NetEvent::OutgoingSyncRequestFailed(error) => {
+ |e| match e {
+     NetEvent::OutgoingSyncRequestSucceeded(value) => Some(Ok(value.clone())),
+     NetEvent::OutgoingSyncRequestFailed(error) => {
          Some(Err(anyhow!("Outgoing sync request failed: {:?}", error)))
      }
      _ => None,
  },
crates/events/src/snapshot_buffer/snapshot_buffer.rs (1)
46-70: Significant amount of commented-out code and unused parameter.
new(_: bool) ignores its start_buffering parameter, and ~40 lines of commented-out code remain (SnapshotBufferState enum, EffectsEnabled handler, state-based routing logic in multiple handlers). This appears to be incomplete state-machine logic that was deferred.
If this is intentional WIP, consider adding a TODO comment explaining when the state machine will be re-enabled. If it's abandoned, remove the dead code and the unused parameter.
crates/net/src/net_event_translator.rs (1)
121-140: warn! on every gossip publish is too noisy for normal operation.
Line 129 logs at warn! level for every forwardable event published to gossip. This will produce significant log noise under normal operation. Consider using debug! or trace! instead, reserving warn! for unexpected conditions.
Suggested fix
- warn!("GossipPublish event: {}", msg.event_type());
+ debug!("GossipPublish event: {}", msg.event_type());
crates/events/src/snapshot_buffer/batch_router.rs (1)
21-21: Local type alias Seq shadows a crate-level public type.
type Seq = u64 here shadows crate::Seq (used in eventstore.rs). While this works because Seq isn't imported from the crate in this file, it may confuse readers navigating between files. Consider renaming to something like SeqNum or just using u64 directly.
crates/sync/src/sync.rs (2)
46-61: EventStore query via oneshot has no timeout; it could hang if the eventstore is unresponsive.
Line 54: rx.await? will block indefinitely if the eventstore actor never responds (e.g., it panicked or its mailbox is full and try_send on line 49 failed silently after being enqueued). Consider wrapping with a timeout similar to the EVM event collection on line 72.
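The bounded-wait idea can be sketched with a synchronous stand-in. In the async code under review, the equivalent would presumably be wrapping the oneshot receiver in tokio::time::timeout. The example below uses std channels so it stays self-contained, and await_reply is a hypothetical helper name.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Waits for a single reply, giving up after `timeout` instead of hanging.
/// The two failure modes are reported separately so logs can tell a slow
/// responder from one that went away.
fn await_reply<T>(rx: &mpsc::Receiver<T>, timeout: Duration) -> Result<T, String> {
    rx.recv_timeout(timeout).map_err(|e| match e {
        RecvTimeoutError::Timeout => format!("no reply within {timeout:?}"),
        RecvTimeoutError::Disconnected => "responder dropped without replying".to_string(),
    })
}
```

Distinguishing a timeout from a dropped sender matters here because the comment's failure scenarios (actor panicked vs. mailbox backlog) would call for different follow-up.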
254-388: Large block of commented-out test code.
This commented-out test should either be restored (once bugs are fixed) or removed and tracked via an issue. Leaving it in the codebase adds noise.
Would you like me to open an issue to track re-enabling this test?
crates/events/src/bus_handle.rs (1)
236-241: Redundant .clone() on value.into().
value.into() already produces an owned EventContext<Sequenced>, so the trailing .clone() is unnecessary.
Suggested fix
  fn set_ctx<C>(&mut self, value: C)
  where
      C: Into<EventContext<Sequenced>>,
  {
-     self.ctx = Some(value.into().clone());
+     self.ctx = Some(value.into());
  }
crates/multithread/src/multithread.rs (1)
179-212: Variable ctx shadows Actix context convention; consider renaming to ec for consistency.
Throughout the rest of this PR (and this very file on line 154), ec is the convention for EventContext. Here on line 190, ctx is used instead, which could be confused with the Actix actor context parameter common in handler methods.
Proposed fix
- let (msg, ctx) = msg.into_components();
- // We spawn a thread on rayon moving to "sync"-land
+ let (msg, ec) = msg.into_components();
+ // We spawn a thread on rayon moving to "sync"-land
  ...
- Ok(val) => bus.publish(val, ctx)?,
+ Ok(val) => bus.publish(val, ec)?,
  ...
- bus.publish(e, ctx)?
+ bus.publish(e, ec)?
crates/sortition/src/sortition.rs (2)
334-353: Typo in method name + minor style nits.
- get_committe → get_committee
- unwrap_or_else(|| Vec::new()) → unwrap_or_default()
- committee.len() == 0 → committee.is_empty()
Proposed fix
- fn get_committe(&self, e3_id: &E3id) -> Vec<String> {
+ fn get_committee(&self, e3_id: &E3id) -> Vec<String> {
      self.finalized_committees
          .get()
          .and_then(|committees| committees.get(e3_id).cloned())
-         .unwrap_or_else(|| Vec::new())
+         .unwrap_or_default()
  }

  fn committee_contains(&mut self, e3_id: E3id, node: String) -> bool {
-     let committee = self.get_committe(&e3_id);
+     let committee = self.get_committee(&e3_id);
-     if committee.len() == 0 {
+     if committee.is_empty() {
680-698: Unnecessary clone() on bus reference.
Line 690 uses &self.bus.clone() but since trap only borrows the bus, &self.bus suffices. Unlike other handlers, there's no ec available here so with_ec can't be used; the plain bus reference is correct for error context.
Proposed fix
- trap(EType::Sortition, &self.bus.clone(), || {
+ trap(EType::Sortition, &self.bus, || {
crates/events/src/events.rs (1)
175-183: sender() consumes self: intentional but worth noting.
sender(self) takes ownership, meaning id() and query() must be called before sender(). This is fine as a deliberate API choice that prevents accidental use-after-move, but callers need to be aware of the ordering constraint.
If multiple accesses are needed in the same scope, consider returning &Recipient<EventStoreQueryResponse> or extracting all fields together (e.g., a destructure() method). Not urgent; the current API is usable.
crates/keyshare/src/threshold_keyshare.rs (2)
459-470: Remove commented-out code block.
This dead code was replaced by the EncryptionKeyPending publish on lines 471–478. Commented-out code should be cleaned up before merge.
Proposed fix
- // let state = self.state.try_get()?;
- // self.bus.publish(
- //     EncryptionKeyCreated {
- //         e3_id: state.e3_id.clone(),
- //         key: Arc::new(EncryptionKey {
- //             party_id: state.party_id,
- //             pk_bfv: pk_bfv_bytes,
- //         }),
- //         external: false,
- //     },
- //     ec,
- // )?;
  self.bus.publish(
1100-1153: Unnecessary self.bus.clone(); &self.bus suffices.
Lines 1107 and 1135 clone the bus only to borrow the result. Since trap takes &impl ErrorDispatcher, a direct reference works. Same pattern as noted in sortition.rs.
Proposed fix
- trap(EType::KeyGeneration, &self.bus.clone(), || {
+ trap(EType::KeyGeneration, &self.bus, || {
(Apply to both handlers at lines 1107 and 1135)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@crates/events/src/eventstore_router.rs`:
- Around line 45-76: QueryAggregator currently waits indefinitely for sub-query
responses in Handler<EventStoreQueryResponse>::handle because self.pending may
never empty; add a timeout guard in the actor so a non-responding EventStore
doesn't leak it. In the QueryAggregator actor (look for impl Actor for
QueryAggregator and the handle method for EventStoreQueryResponse) schedule a
ctx.run_later(deadline, move |act, ctx| {
act.sender.do_send(error_response_or_partial(EventStoreQueryResponse::new(act.parent_id,
std::mem::take(&mut act.collected_events)))); ctx.notify(Die); }) when the query
is started (or when the first sub-query is sent) so that if pending is non-empty
after the deadline it logs the timeout, sends an error/partial
EventStoreQueryResponse to sender, and calls ctx.notify(Die); cancel or avoid
rescheduling if pending becomes empty in handle (i.e., clear or ignore the
timeout if all responses arrived).
🧹 Nitpick comments (3)
crates/events/src/eventstore_router.rs (3)
116-201: handle_event_store_query_ts and handle_event_store_query_seq are nearly identical.
The two methods differ only in the generic type parameter (TsAgg/Ts vs SeqAgg/Seq). If the trait bounds allow it, consider extracting the common logic into a generic helper to reduce duplication.
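A reduced sketch of what such a helper might look like, assuming the shared step is "split one aggregate query into per-aggregate sub-queries". AggQuery, SubQuery, and build_sub_queries are hypothetical names; the real router also spawns an aggregator actor, which is omitted here.

```rust
use std::collections::HashMap;

/// Hypothetical aggregate query: one starting point per aggregate, generic over
/// the value type (`u128` timestamps or `u64` sequence numbers in the PR).
struct AggQuery<V> {
    id: u64,
    since: HashMap<usize, V>,
}

/// Hypothetical per-aggregate sub-query produced by the router.
#[derive(Debug, PartialEq)]
struct SubQuery<V> {
    parent_id: u64,
    aggregate_id: usize,
    since: V,
}

/// Shared routing step written once for both value types.
/// Returns an empty list when there is nothing to dispatch.
fn build_sub_queries<V: Copy>(query: &AggQuery<V>) -> Vec<SubQuery<V>> {
    let mut subs: Vec<SubQuery<V>> = query
        .since
        .iter()
        .map(|(&aggregate_id, &since)| SubQuery {
            parent_id: query.id,
            aggregate_id,
            since,
        })
        .collect();
    // HashMap iteration order is unspecified; sort for deterministic dispatch.
    subs.sort_by_key(|s| s.aggregate_id);
    subs
}
```

The timestamp and sequence handlers would then differ only in the type they instantiate the helper with.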
232-239: Error messages for TsAgg and SeqAgg query failures are identical.
Both handlers log "Failed to route get events after request". Consider differentiating them (e.g., "timestamp query" vs "sequence query") to aid debugging.
Suggested diff
  fn handle(&mut self, msg: EventStoreQueryBy<SeqAgg>, ctx: &mut Self::Context) -> Self::Result {
      if let Err(e) = self.handle_event_store_query_seq(msg, ctx) {
-         error!("Failed to route get events after request: {}", e);
+         error!("Failed to route sequence query request: {}", e);
      }
  }
And similarly for the TsAgg handler:
  fn handle(&mut self, msg: EventStoreQueryBy<TsAgg>, ctx: &mut Self::Context) -> Self::Result {
      if let Err(e) = self.handle_event_store_query_ts(msg, ctx) {
-         error!("Failed to route get events after request: {}", e);
+         error!("Failed to route timestamp query request: {}", e);
      }
  }
40-42: Remove unused pending_aggregates() method or add annotation if intentionally kept for future use.
The method at lines 40-42 is not called anywhere in the codebase. If it's not part of a public API contract, remove it to keep the code clean.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@crates/ciphernode-builder/src/ciphernode_builder.rs`:
- Around line 196-201: The doc comment for the builder method
testmode_start_buffer_immediately is cut off mid-sentence; update the doc
comment to be a complete, clear sentence explaining that this forces
SnapshotBuffer to start immediately instead of waiting for SyncEnded, and why
that is useful (e.g., for tests that don't explicitly trigger SyncEnded or start
buffering). Locate the method named testmode_start_buffer_immediately and
replace the incomplete comment with a full sentence (or two) that describes
behavior, intended use in tests, and any side-effects.
🧹 Nitpick comments (4)
crates/zk-prover/src/actors/proof_request.rs (1)
130-141: Inconsistent: ec is not extracted and the TypedEvent wrapper is accessed directly.
Every other handler in this file and in proof_verification.rs destructures with let (msg, ec) = msg.into_components() before operating on the inner data. Here, msg.get_err() and msg.correlation_id() are called directly on the TypedEvent<ComputeRequestError>, presumably via Deref. This works but is inconsistent with the established pattern.
Since the error path doesn't need to republish (so ec is unused), this is purely a consistency concern, but it would make future changes easier if the pattern is uniform.
♻️ Suggested refactor for consistency
  fn handle_compute_request_error(&mut self, msg: TypedEvent<ComputeRequestError>) {
-     let ComputeRequestErrorKind::Zk(err) = msg.get_err() else {
+     let (msg, _ec) = msg.into_components();
+     let ComputeRequestErrorKind::Zk(err) = msg.get_err() else {
          return;
      };
-     if let Some(pending) = self.pending.remove(msg.correlation_id()) {
+     if let Some(pending) = self.pending.remove(msg.correlation_id()) {
crates/zk-prover/src/actors/proof_verification.rs (1)
220-225: Unnecessary ec.clone() in the success branch.
Since the if and else branches are mutually exclusive, the success branch is the last (and only) consumer of ec here. You can pass ec directly instead of ec.clone().
♻️ Minor optimization
- self.publish_key_created(msg.e3_id, msg.key, ec.clone());
+ self.publish_key_created(msg.e3_id, msg.key, ec);
crates/events/src/enclave_event/mod.rs (2)
588-598: Display formatting uses Debug output, which can be very verbose.
colorize_event_ids(self) formats via Debug, which for EnclaveEvent includes the full payload. In high-throughput logging, this could produce very large log lines. If this is intentional for debugging/development, it's fine, but consider a more compact representation for production use.
297-303: Consider deprecating split() in favor of into_components().
split() discards all context except ts, while into_components() preserves the full EventContext<S>. The method has three active callers in the codebase (integration tests, crates/net/src/events.rs, and crates/events/src/bus_handle.rs), all of which could migrate to into_components() with minimal overhead.
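One way to stage that migration is Rust's built-in #[deprecated] attribute, which keeps existing callers compiling while flagging each call site with a warning. The types below are heavily reduced, hypothetical stand-ins for EnclaveEvent and EventContext, not the crate's definitions.

```rust
/// Hypothetical, reduced context; the real `EventContext<S>` carries more fields.
#[derive(Debug, Clone, PartialEq)]
struct Context {
    ts: u128,
    seq: u64,
}

/// Hypothetical stand-in for `EnclaveEvent`.
struct Event {
    data: String,
    ctx: Context,
}

impl Event {
    /// Preferred accessor: hands back the payload together with the whole context.
    fn into_components(self) -> (String, Context) {
        (self.data, self.ctx)
    }

    /// Legacy accessor kept for existing callers; only `ts` survives.
    #[deprecated(note = "use `into_components`, which preserves the full context")]
    fn split(self) -> (String, u128) {
        let (data, ctx) = self.into_components();
        (data, ctx.ts)
    }
}
```

Having split() delegate to into_components() keeps a single source of truth while the three callers are moved over.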
hmzakhalid
left a comment
Thanks a lot for this, Rudi. I went through it yesterday and followed the sync event flow; everything seems good overall. Left a few comments.
Will take another look later today to see if I find something.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/ciphernode-builder/src/event_system.rs (1)
317-342: ⚠️ Potential issue | 🟠 Major
Each call to eventstore_router() / eventstore_getter_seq() / eventstore_getter_ts() spawns a new EventStoreRouter actor.
Unlike other components (sequencer, buffer, eventbus) which are cached in OnceCell, every invocation of these three methods calls in_mem_eventstore_router() or persisted_eventstore_router(), each of which creates and .start()s a new router actor. If called more than once, this results in orphaned router actors.
Additionally, all three methods log "eventstore_reader...", a copy-paste artifact that makes debugging harder.
#!/bin/bash
# Check how many times these methods are called
rg -n 'eventstore_router\(\)|eventstore_getter_seq\(\)|eventstore_getter_ts\(\)' --type rust -C1
🤖 Fix all issues with AI agents
In `@crates/events/src/sequencer.rs`:
- Around line 61-65: The panic message inside Sequencer where
handle_store_event_response(msg) returns Err still mentions "snapshot_buffer or
bus."; update that panic call to remove the stale "snapshot_buffer" reference so
it only says the event could not be sent to the bus (e.g., change the
major_issue message passed to panic in the error branch of
Sequencer::handle_store_event_response to reference only the bus). Ensure you
adjust the string passed to major_issue and keep the error variable e included.
In `@crates/events/src/snapshot_buffer/snapshot_buffer.rs`:
- Around line 54-60: SnapshotBuffer::new currently drops the start_buffering
boolean so EventSystem::start_buffer_immediately has no effect; modify
SnapshotBuffer (add a field like start_buffering: bool) and have
SnapshotBuffer::new(start_buffering: bool) store it, then use that flag when
initializing or starting the router/tickable (e.g., pass it into BatchRouter
creation or call the router start method immediately when start_buffering is
true) so the buffer begins processing immediately when requested; update any
call sites (spawn()/with_clock()) that pass the flag to match the new
constructor signature.
🧹 Nitpick comments (7)
crates/events/src/snapshot_buffer/snapshot_buffer.rs (2)
149-159: Unused ctx parameter will trigger a compiler warning.
All other handlers use _ for the unused context parameter, but this one names it ctx.
- fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
+ fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result {
215-217: Prefer std::mem::take over replace(..., Vec::new()).
take is the idiomatic shorthand for replacing with default.
- replace(&mut self.evts, Vec::new())
+ std::mem::take(&mut self.evts)
crates/ciphernode-builder/src/event_system.rs (3)
49-55: Replace println! with tracing macros for consistency.
Lines 50, 353, and 367 use println! while the rest of the file uses tracing::info!. println! bypasses structured logging, won't appear in log collectors, and can't be filtered by level.
- println!("get_or_init_store in {:?} ...", self.sled_path);
+ info!("get_or_init_store in {:?} ...", self.sled_path);

- println!("handle");
+ info!("handle");

- println!("store()...");
+ info!("store()...");
Also applies to: 353-353, 367-367
375-384: get_or_init_store is called twice in the Persisted branch.
Lines 376 and 381 both call b.get_or_init_store(&self.handle()?). While OnceCell makes the second call cheap, this is redundant and harder to read. Reuse the base variable from line 376.
  EventSystemBackend::Persisted(b) => {
      let base = b.get_or_init_store(&self.handle()?)?;
      let buffer = self.buffer()?;
      buffer.try_send(UpdateDestination::new(base))?;
      DataStore::from_sled_store_with_buffer(
-         &b.get_or_init_store(&self.handle()?)?,
+         &b.get_or_init_store(&self.handle()?)?, // base was moved above
          self.buffer()?,
      )
Note: base was moved into UpdateDestination::new(base) on line 378, so it can't be reused as-is. Either clone base before the move or restructure:
Suggested fix
  EventSystemBackend::Persisted(b) => {
      let base = b.get_or_init_store(&self.handle()?)?;
      let buffer = self.buffer()?;
-     buffer.try_send(UpdateDestination::new(base))?;
-
-     DataStore::from_sled_store_with_buffer(
-         &b.get_or_init_store(&self.handle()?)?,
-         self.buffer()?,
-     )
+     buffer.try_send(UpdateDestination::new(base.clone()))?;
+     DataStore::from_sled_store_with_buffer(&base, buffer)
  }
This also avoids the second call to self.buffer() and self.handle().
369-374: self.buffer()? called twice in InMem branch.
buffer is already bound on line 371 but line 373 calls self.buffer()? again.
  EventSystemBackend::InMem(b) => {
      let base = b.get_or_init_store();
      let buffer = self.buffer()?;
      buffer.try_send(UpdateDestination::new(base.clone()))?;
-     DataStore::from_in_mem_with_buffer(&base, self.buffer()?)
+     DataStore::from_in_mem_with_buffer(&base, buffer)
  }
crates/sync/src/sync.rs (2)
25-114: Sync orchestration flow looks solid overall.
The step-by-step approach with logging at each phase is clear and well-structured. A couple of minor observations:
- Line 32: typo "snapsshot" → "snapshot" in the comment.
- Line 105: bus.naked_dispatch(event) is fire-and-forget with no error propagation. If dispatch can fail silently, historical events could be lost without notice. Consider whether this should return/propagate errors like the other try_send calls above.
221-229: to_sequence_map: consider using .collect() instead of manual fold.
Minor readability nit: .map(|s| (s.aggregate_id, s.seq)).collect() achieves the same result more concisely.
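A minimal sketch of the collect() form, assuming the items expose only the two fields named in the comment. AggregateSeq is a hypothetical stand-in for whatever type to_sequence_map actually iterates over, and the key and value types are guesses.

```rust
use std::collections::HashMap;

/// Hypothetical entry type; only the two fields named in the review are modeled.
struct AggregateSeq {
    aggregate_id: usize,
    seq: u64,
}

/// Builds the aggregate-to-sequence map in one pass.
/// `collect()` picks `HashMap` from the return type; if an aggregate id
/// repeats, the later entry overwrites the earlier one.
fn to_sequence_map(items: &[AggregateSeq]) -> HashMap<usize, u64> {
    items.iter().map(|s| (s.aggregate_id, s.seq)).collect()
}
```

If the existing fold resolves duplicate aggregate ids differently (for example, by keeping the maximum), that behavior would need to be preserved explicitly.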
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@crates/ciphernode-builder/src/event_system.rs`:
- Line 304: The three misleading info! logs all read "eventstore_reader..." —
update the log messages in the functions eventstore_router(),
eventstore_getter_seq(), and eventstore_getter_ts() so each reflects its actual
role (e.g., "eventstore_router...", "eventstore_getter_seq...",
"eventstore_getter_ts...") using the existing info! macro calls; locate the
info! invocation inside each named function and change the string literal to a
clear, unique message for that function.
- Around line 49-50: Replace leftover println!/print! debug artifacts in
get_or_init_store (and the other occurrences at lines referenced) with the
crate's tracing macros: use trace! or debug! (choose trace! for very verbose
internals, debug! for higher-level diagnostics) and include contextual data like
self.sled_path and any relevant variables in the macro call; if the output was
purely temporary and not needed, remove the statement instead. Locate the calls
in the get_or_init_store function and the two other spots (previously using
print!/println!) and convert them to tracing::{trace!, debug!} with structured
fields (e.g., trace!("get_or_init_store", sled_path = ?self.sled_path)) or drop
them if redundant.
- Around line 200-206: get_enclave_bus_handle() currently calls handle() but
never calls store(), leaving SnapshotBuffer wired to NoopBatchReceiver and
dropping InsertBatch messages; update get_enclave_bus_handle() to enforce the
required initialization order by invoking the store() path after obtaining the
handle (e.g., call self.store(...) or otherwise send the UpdateDestination to
SnapshotBuffer) so the buffer's UpdateDestination is set before returning the
handle, and reference the existing methods get_enclave_bus_handle(), handle(),
store(), buffer(), and SnapshotBuffer/UpdateDestination when making the change.
🧹 Nitpick comments (6)
crates/events/src/snapshot_buffer/snapshot_buffer.rs (3)
141-151: Unused ctx parameter.
ctx is bound but unused in Handler<EnclaveEvent>. Prefix with _ to match the convention used in all other handlers in this file.
Proposed fix
- fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
+ fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result {
94-103: Silent message dropping when dependencies are None.
All forwarding handlers (FlushSeq, StartTimelock, Insert, EnclaveEvent, Tick, UpdateDestination) silently discard messages if the corresponding dependency hasn't been set yet. While the initialization window is extremely narrow (SetDependencies is the first message), consider adding a warn! or debug! log when a message is dropped due to a missing dependency. This would aid debugging if the wiring order ever changes or SetDependencies delivery fails.
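The shape of that suggestion, reduced to std types: Forwarder is a hypothetical stand-in for SnapshotBuffer, a std channel stands in for the actor address, and eprintln! stands in for tracing::warn!.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical forwarder whose destination is wired after construction.
struct Forwarder<T> {
    dest: Option<Sender<T>>,
    dropped: usize,
}

impl<T> Forwarder<T> {
    fn new() -> Self {
        Self { dest: None, dropped: 0 }
    }

    /// Plays the role of `SetDependencies` in this sketch.
    fn set_dependency(&mut self, dest: Sender<T>) {
        self.dest = Some(dest);
    }

    /// Forwards `msg` if wired; otherwise records and reports the drop
    /// instead of discarding it silently. Returns whether it was delivered.
    fn forward(&mut self, name: &str, msg: T) -> bool {
        match &self.dest {
            Some(dest) => dest.send(msg).is_ok(),
            None => {
                self.dropped += 1;
                // Stand-in for `tracing::warn!` in the real crate.
                eprintln!("dropping {name}: dependencies not set yet");
                false
            }
        }
    }
}
```

Counting drops in addition to logging them gives tests something to assert on if the wiring order ever regresses.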
205-210: Prefer std::mem::take over replace with a new empty Vec.
std::mem::take is the idiomatic shorthand here.
Proposed fix
- use std::mem::replace;
  ...
- replace(&mut self.evts, Vec::new())
+ std::mem::take(&mut self.evts)
crates/ciphernode-builder/src/event_system.rs (3)
312-328: eventstore_getter_seq and eventstore_getter_ts are near-identical.
The two methods differ only in the generic parameter (SeqAgg vs TsAgg). If more aggregation types are added, this pattern will proliferate. A generic helper could reduce duplication:
Possible refactor sketch
// Where M is the query message type and EventStoreRouter impls the right Actix Handler
fn eventstore_getter<M>(&self) -> Result<Recipient<M>>
where
    M: actix::Message + Send + 'static,
    M::Result: Send,
    EventStoreRouter<InMemSequenceIndex, InMemEventLog>: Handler<M>,
    EventStoreRouter<SledSequenceIndex, CommitLogEventLog>: Handler<M>,
{
    let eventstores = self.eventstore_addrs()?;
    match &eventstores {
        EventStoreAddrs::InMem(_) => Ok(self.in_mem_eventstore_router()?.recipient()),
        EventStoreAddrs::Persisted(_) => Ok(self.persisted_eventstore_router()?.recipient()),
    }
}
This becomes more valuable once the router-caching issue above is addressed.
354-374: Redundant self.buffer() and b.get_or_init_store() calls in store().
In the InMem branch, self.buffer() is called on line 357 and again on line 359. In the Persisted branch, b.get_or_init_store(&self.handle()?) is called on line 362 and again on line 367. While idempotent via OnceCell, this is unnecessarily verbose and harder to follow.
Proposed fix for the Persisted branch
  EventSystemBackend::Persisted(b) => {
-     let base = b.get_or_init_store(&self.handle()?)?;
+     let handle = self.handle()?;
+     let base = b.get_or_init_store(&handle)?;
      let buffer = self.buffer()?;
      buffer.try_send(UpdateDestination::new(base.clone()))?;
-     DataStore::from_sled_store_with_buffer(
-         &b.get_or_init_store(&self.handle()?)?,
-         self.buffer()?,
-     )
+     DataStore::from_sled_store_with_buffer(&base, buffer)
  }
302-328: Cache router instances to avoid creating multiple stateless proxies at initialization.
in_mem_eventstore_router() and persisted_eventstore_router() each spawn a new EventStoreRouter actor without caching. Since eventstore_router(), eventstore_getter_seq(), and eventstore_getter_ts() each call one of these methods during system initialization, you end up with three separate stateless router instances. While the underlying EventStore actors are cached via OnceCell, the routers should be cached similarly to avoid unnecessary proxy actor creation.
Also remove debug println! statements on lines 50, 339, and 353 (use logging macros instead), and fix misleading log messages that say "eventstore_reader..." on methods that are routers/getters.
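The caching pattern being asked for can be sketched with std::cell::OnceCell. Everything below is a hypothetical reduction: RouterAddr stands in for an actor address, and the spawned counter exists only to make the "started once" property observable.

```rust
use std::cell::{Cell, OnceCell};

/// Hypothetical stand-in for an actor address.
#[derive(Debug, Clone, PartialEq)]
struct RouterAddr(u32);

struct EventSystem {
    router: OnceCell<RouterAddr>,
    /// Counts how many routers were started, for illustration only.
    spawned: Cell<u32>,
}

impl EventSystem {
    fn new() -> Self {
        Self { router: OnceCell::new(), spawned: Cell::new(0) }
    }

    /// Stand-in for creating and `.start()`-ing a router actor.
    fn spawn_router(&self) -> RouterAddr {
        let n = self.spawned.get() + 1;
        self.spawned.set(n);
        RouterAddr(n)
    }

    /// First call spawns the router; later calls reuse the cached address.
    fn eventstore_router(&self) -> RouterAddr {
        self.router.get_or_init(|| self.spawn_router()).clone()
    }

    /// Both getters go through the cached router instead of spawning their own.
    fn eventstore_getter_seq(&self) -> RouterAddr {
        self.eventstore_router()
    }

    fn eventstore_getter_ts(&self) -> RouterAddr {
        self.eventstore_router()
    }
}
```

The real constructors return Result, so the actual change would need a fallible initialization path rather than the infallible get_or_init shown here.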
hmzakhalid
left a comment
Thanks for all the fixes
Progresses #1050
This does not fetch libp2p events from nodes. The infrastructure is set up, but some issues need further work. This is currently disabled in order to integrate.
Done
- trap and trap_fut
- GetEventsAfterResponse → EventStoreQueryResponse, EventStoreQueryBy<SeqAgg>, EventStoreQueryBy<TimeAgg>)
- net and evm both use event buffering until SyncEnd
- EnableEffects event (evm writers, net commands, multithread, persistable/snapshotbuffer)
Example of color coded events:
To do as a follow up PR
Summary by CodeRabbit