feat: sync mode preparation [skip-line-limit]#1153
📝 Walkthrough
This pull request introduces major infrastructure improvements for event synchronization, EVM integration, and data persistence across the ciphernode system. It creates a new
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Had a chat with folks today about attempting to merge this PR as is and completing the Sync step in a separate PR.
I am thinking it would be good to connect any feedback to a list of things to do on #1050. I can then work through that list in the next PR instead of making changes here.
Actionable comments posted: 15
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
crates/keyshare/src/threshold_keyshare.rs (1)
410-424: Fix move/borrow conflict in response dispatch.
Line 411 borrows `msg` for the match and then moves `msg` into handler calls, which will fail borrow checking. Extract or clone the response before dispatch.
🔧 Proposed fix
     pub fn handle_compute_response(&mut self, msg: TypedEvent<ComputeResponse>) -> Result<()> {
    -    match &msg.response {
    +    let response = msg.response.clone();
    +    match response {
             TrBFVResponse::GenEsiSss(_) => self.handle_gen_esi_sss_response(msg),
             TrBFVResponse::GenPkShareAndSkSss(_) => {
                 self.handle_gen_pk_share_and_sk_sss_response(msg)
             }
             TrBFVResponse::CalculateDecryptionKey(_) => {
                 self.handle_calculate_decryption_key_response(msg)
             }
             TrBFVResponse::CalculateDecryptionShare(_) => {
                 self.handle_calculate_decryption_share_response(msg)
             }
             _ => Ok(()),
         }
     }
crates/entrypoint/src/start/start.rs (1)
9-19: Remove unused imports.
The following imports are not used in the function and should be removed:
- `RepositoriesFactory` (line 12)
- `BusHandle` (line 13)
- `NetEventTranslator`, `NetRepositoryFactory` (line 14)
- `JoinHandle` (line 18)
crates/events/src/enclave_event/mod.rs (2)
372-389: Error events should inherit `aggregate_id` from their cause.
Even when `caused_by` is present, `aggregate_id` is forced to `0`, which routes error events to the default aggregate. That can break per-aggregate sequencing (and commitlog grouping) for errors tied to a specific aggregate.
🔧 Suggested fix
    -    let aggregate_id = AggregateId::new(0); // Error events use default aggregate_id
    +    let aggregate_id = caused_by
    +        .as_ref()
    +        .map(|cause| cause.aggregate_id())
    +        .unwrap_or_else(|| AggregateId::new(0)); // Fallback for uncaused errors
405-438: Add `ComputeRequest` and `ComputeRequestError` to the `get_e3_id` match statement.
Both variants carry `e3_id` information but aren't mapped in `get_e3_id`, causing them to incorrectly default to aggregate ID 0, which breaks per-aggregate event ordering.
🤖 Fix all issues with AI agents
In `@crates/ciphernode-builder/src/ciphernode_builder.rs`:
- Around line 293-306: create_aggregate_config currently iterates self.chains
and calls provider_cache.ensure_read_provider for every chain, which attempts to
validate providers for disabled chains; change it to only process enabled chains
(the same enabled filter used in setup_evm_system) before calling
ensure_read_provider so disabled chains are skipped, i.e., build chain_providers
from self.chains.iter().filter(|c| c.enabled()) (or the project's equivalent
predicate) and push only those (chain.clone(), provider.chain_id()) entries,
then compute delays and return AggregateConfig as before.
In `@crates/ciphernode-builder/src/ciphernode.rs`:
- Around line 13-16: Doc still claims Clone support for CiphernodeHandle but the
struct no longer derives Clone; either reintroduce Clone for CiphernodeHandle by
adding #[derive(Clone, Debug)] if cloneable behavior is intended, or update the
doc comment above struct CiphernodeHandle to remove the sentence "but they
cannot await the task" about clones and any mention that clones are available
(rephrase to "The handle is sharable across tasks via shared references" or
similar) so the docs reflect the actual implementation.
- Around line 67-69: The split(self) method consumes the Ciphernode and returns
(BusHandle, JoinHandle<Result<()>>), which drops the only Addr references to the
history and errors actors (stopping them); update the API or docs to make this
explicit and/or provide a non-consuming alternative: add a doc comment on
Ciphernode::split explaining that calling split will drop the last actor Addrs
(history, errors) and stop those actors, and if callers need to keep those
actors expose their handles instead (e.g. provide an additional method like
split_preserve or a method that returns the BusHandle plus cloned Addr handles
for history/errors) so callers can retain actor lifetimes; reference the split
function, BusHandle, JoinHandle<Result<()>>, and the history/errors actor Addrs
in the doc or alternative API.
In `@crates/config/src/rpc.rs`:
- Around line 77-90: The host_with_port() helper currently formats
"{hostname}:{port}" which breaks IPv6 addresses (e.g., "::1:8545"); update
host_with_port() to detect IPv6 by checking hostname returned from hostname()
for ':' (and not already wrapped) and wrap it in square brackets before
appending the port so IPv6 becomes "[::1]:8545"; keep using the existing
hostname(), port() accessors and ensure you only add brackets when necessary to
avoid double-bracketing.
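The bracketing rule described above can be sketched as follows. This is a minimal illustration, not the crate's code: it assumes a free function taking the hostname and port as arguments (the real helper is a method that reads them from `hostname()` and `port()`), and the `u16` port type is an assumption.

```rust
/// Join a host and port into a `host:port` string.
/// Bare IPv6 literals (which contain ':') are wrapped in square brackets;
/// hosts that are already bracketed are left untouched.
/// NOTE: free-function signature and `u16` port are assumptions for this sketch.
fn host_with_port(hostname: &str, port: u16) -> String {
    let already_bracketed = hostname.starts_with('[') && hostname.ends_with(']');
    if hostname.contains(':') && !already_bracketed {
        // IPv6 literal such as "::1" becomes "[::1]:8545"
        format!("[{hostname}]:{port}")
    } else {
        // Hostnames, IPv4 addresses, and pre-bracketed IPv6 pass through
        format!("{hostname}:{port}")
    }
}
```

Checking for an existing bracket pair before wrapping is what keeps the helper safe to call on hosts that were already normalized.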
In `@crates/data/src/data_store.rs`:
- Around line 122-125: The docstring for scope_bytes is wrong: it says "Get a
clone of the scope bytes" but the function signature pub fn scope_bytes(&self)
-> &[u8] returns a borrow; either update the comment to state it returns a
borrowed slice or change the API to return an owned Vec<u8> by changing the
signature to pub fn scope_bytes(&self) -> Vec<u8> and returning
self.scope.clone(); update any call sites accordingly. Ensure to reference the
scope_bytes method and the self.scope field when making the fix.
In `@crates/events/src/eventstore_router.rs`:
- Around line 31-46: The current handle_store_event_requested uses
unwrap_or_else with expect on self.stores.get(&AggregateId::new(0)) which can
panic if the default EventStore is not configured and also always returns
Ok(()), making Result<()> misleading; fix by validating the invariant in the
event router constructor (e.g., Router::new or the type that builds self.stores)
to ensure self.stores contains AggregateId::new(0) and return a construction
error if missing, or modify handle_store_event_requested to return an Err when
neither the specific aggregate_id nor AggregateId::new(0) exist (replace the
unwrap_or_else/expect with a match that returns Err on missing default), and
update callers/tests as needed to handle the error path instead of
unconditionally returning Ok(()).
In `@crates/evm/src/enclave_sol.rs`:
- Around line 20-34: The attach function can leave the reader actor running if
EnclaveSolWriter::attach fails; after creating the reader via
EnclaveSolReader::setup, ensure you clean it up on writer attach failure by
stopping or terminating the reader before returning the error (or alternatively
start the writer before the reader if writer has no dependency on the reader).
Concretely, update the attach function: call EnclaveSolWriter::attach only after
ensuring writer prerequisites, or if you keep the current order, catch the error
from EnclaveSolWriter::attach and call the reader actor shutdown/stop method on
the Addr returned by EnclaveSolReader::setup (the Addr<EvmParser> named addr)
before propagating the error.
In `@crates/evm/src/evm_chain_gateway.rs`:
- Around line 132-143: In handle_evm_event replace the panic on unexpected
EnclaveEvmEvent variants with returning an error via bail! so the error is
propagated through the function's Result and handled by the existing trap()
wrapper; specifically update the match arm in handle_evm_event (which currently
panics) to call bail! with a descriptive message that includes the unexpected
variant (or its debug display) so process_evm_event and
forward_historical_sync_complete remain unchanged and the caller receives a
proper error instead of crashing.
In `@crates/evm/src/evm_router.rs`:
- Around line 13-69: Docstring mismatch: the EvmRouter comment states non-log
events are dropped but Handler<EnclaveEvmEvent> actually forwards non-log events
to the fallback (set via add_fallback / field fallback). Fix by updating the
top-level doc comment for struct EvmRouter (the /// comment above pub struct
EvmRouter) to state that non-Log events are forwarded to the optional fallback
EvmEventProcessor when present (and dropped only if fallback is None);
alternatively, if you prefer dropping, change the
Handler<EnclaveEvmEvent>::handle branch to not call self.fallback and instead
drop the message—make the change consistently for add_fallback, fallback, and
the handler so behavior and documentation align.
In `@crates/evm/src/lib.rs`:
- Line 20: The module declaration and filename have a typo: change the module
name used in the source from one_shot_runnner to one_shot_runner and rename the
file one_shot_runnner.rs to one_shot_runner.rs; update the module declaration
line (currently "mod one_shot_runnner;") to "mod one_shot_runner;" so the
declaration (mod one_shot_runnner) and the file name match the corrected
spelling.
In `@crates/evm/src/one_shot_runnner.rs`:
- Around line 1-6: The file is misspelled as one_shot_runnner.rs (three "n"s);
rename it to one_shot_runner.rs and update any module declarations, use/imports,
and references to match the corrected module name (e.g., change occurrences of
one_shot_runnner to one_shot_runner in mod declarations, use statements, and
tests), and update any Cargo or workspace references that mention the old
filename so builds and imports continue to resolve correctly.
In `@crates/net/src/net_interface.rs`:
- Around line 608-621: In handle_sync_response, don't ignore errors from
swarm.behaviour_mut().sync.send_response(channel, value): capture the Err(res)
and call warn! with a descriptive message and the error details (e.g.
warn!("failed to send sync response for channel {:?}: {:?}", channel, res) or
similar) so failures are observable; keep the existing try_take() usage and
ensure the warn! call references the channel / response context and the error
returned by send_response.
In `@crates/net/src/net_sync_manager.rs`:
- Around line 61-77: The current forwarder loop uses "while let Ok(event) =
events.recv().await" which silently exits on both RecvError::Lagged and
RecvError::Closed; change it to an explicit loop that matches
events.recv().await and on Ok(event) continues to match NetEvent variants
(NetEvent::OutgoingSyncRequestSucceeded, NetEvent::SyncRequestReceived) and call
addr.do_send(value), on Err(tokio::sync::broadcast::error::RecvError::Lagged(_))
continue (log debug if desired), and on Err(RecvError::Closed) break to stop the
loop; update the block where events is created from rx.resubscribe() and the
async move spawn to use this match-based handling so lagging receivers don't
terminate the forwarder.
- Around line 148-167: The handler for ReceiveEvents in NetSyncManager currently
uses self.requests.get(&msg.id()), leaving entries in the requests map and
causing unbounded growth; change the lookup to remove the entry (use
self.requests.remove(&msg.id())) so the request is deleted once handled, but
still clone the channel for sending (channel.to_owned()) before/after removal as
needed to preserve the OnceTake value, and then proceed to send the
NetCommand::SyncResponse via self.tx.try_send as before.
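The `get` versus `remove` point can be illustrated with a small stand-in. The `PendingRequests` type, the `u64` request id, and the `String` placeholder for the response channel are all hypothetical; only the use of `HashMap::remove` to take ownership and shrink the map mirrors the suggestion.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the manager's table of in-flight sync requests.
/// The value type is a placeholder for the real response channel.
struct PendingRequests {
    requests: HashMap<u64, String>,
}

impl PendingRequests {
    /// Take the channel for a handled request, deleting the entry so the
    /// map does not grow without bound. Returns `None` for unknown ids.
    fn complete(&mut self, id: u64) -> Option<String> {
        self.requests.remove(&id)
    }
}
```

Because `remove` returns the owned value, no clone is needed in this sketch; whether the real handler still needs one depends on how the channel is consumed.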
In `@crates/sync/src/sync.rs`:
- Around line 59-70: handle_sync_end currently sorts only by timestamp via
self.evm_buffer.sort_by_key(|i| i.ts()), which is unstable when timestamps
collide; update the sort key on evm_buffer in handle_sync_end to include
deterministic tie-breaker fields (for example use a tuple of i.ts(),
i.chain_id(), and i.block) so events with identical ts() are consistently
ordered across runs and nodes before draining and publishing.
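A note on the tie-breaker: `slice::sort_by_key` is itself a stable sort, so events with equal timestamps keep their insertion order, and it is that insertion order which may differ between runs and nodes. A tuple key removes the dependence. The sketch below uses a hypothetical `BufferedEvent` struct with plain fields; the real buffer items expose `ts()`, `chain_id()`, and `block` as described above.

```rust
/// Hypothetical, simplified buffered event; field types are assumptions.
#[derive(Debug, Clone, PartialEq)]
struct BufferedEvent {
    ts: u128,
    chain_id: u64,
    block: u64,
    label: &'static str,
}

/// Sort by timestamp, then chain id, then block, so that events with
/// identical timestamps end up in the same order regardless of arrival order.
fn sort_deterministically(buffer: &mut [BufferedEvent]) {
    buffer.sort_by_key(|e| (e.ts, e.chain_id, e.block));
}
```

If two events can share all three fields, a further field (for example a log index) would be needed for a total order.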
🧹 Nitpick comments (32)
crates/multithread/src/multithread.rs (1)
36-36: Import added for future sync migration.
This `NotifySync` import is not currently used within this file. Based on the PR objectives, this appears to be scaffolding for the upcoming sync implementation where `ctx.notify()` calls may be migrated to `notify_sync()`. The existing `ctx.notify(data.clone())` on line 103 is a likely candidate for this migration.
If this is indeed preparatory, consider completing the migration in this file or removing the import until it's needed to keep the diff minimal.
crates/utils/src/helpers.rs (1)
48-51: Consider handling mutex poisoning for robustness.
The `.unwrap()` on the mutex lock will panic if the mutex is poisoned (a thread panicked while holding it). While this is a common Rust idiom since poisoning typically indicates a bug, in a sync/distributed system context you may want more graceful handling.
♻️ Optional: Handle poisoned mutex gracefully
     /// Takes the item, returning `None` if already taken.
     pub fn take(&self) -> Option<T> {
    -    self.0.lock().unwrap().take()
    +    self.0.lock().ok()?.take()
     }
This would return `None` on poisoning rather than panicking, treating it the same as "already taken". Alternatively, keep the current behavior if panicking on poisoning is preferred.
crates/data/src/write_buffer.rs (2)
26-29: Minor: Typo and non-idiomatic pattern.
Line 26 has a typo: "AggregatId" should be "AggregateId". Also, `if let None = ...` is non-idiomatic; prefer `is_none()`.
Suggested fix
    -    // Always handle AggregatId of 0 with a delay of 0
    -    if let None = delays.get(&AggregateId::new(0)) {
    +    // Always handle AggregateId of 0 with a delay of 0
    +    if delays.get(&AggregateId::new(0)).is_none() {
             delays.insert(AggregateId::new(0), 0);
         }
80-92: Unnecessary clone of owned value.
`msg` is already owned (moved into the function). The `.clone()` on line 91 is unnecessary since `msg` can be consumed directly.
Suggested fix
     fn handle_insert(&mut self, msg: Insert) {
         let aggregate_id = if let Some(event_ctx) = msg.ctx() {
    -        event_ctx.aggregate_id().clone()
    +        event_ctx.aggregate_id()
         } else {
             AggregateId::new(0)
         };
         let agg_buffer = self
             .aggregate_buffers
             .entry(aggregate_id)
             .or_insert_with(|| AggregateBuffer::new());
    -    agg_buffer.buffer.push(msg.clone());
    +    agg_buffer.buffer.push(msg);
     }
crates/utils/src/path.rs (3)
15-15: Consider accepting `&Path` instead of `&PathBuf` for broader compatibility.
Using `&Path` is more idiomatic in Rust since `PathBuf` derefs to `Path`, and this allows callers to pass either type without explicit conversion.
Proposed signature change
    -pub fn enumerate_path(path: &PathBuf, index: usize) -> PathBuf {
    +pub fn enumerate_path(path: &Path, index: usize) -> PathBuf {
You'll also need to update the import:
    -use std::path::PathBuf;
    +use std::path::{Path, PathBuf};
16-61: Opportunity to reduce duplication with a helper function.
The filename enumeration logic is duplicated between the parent and no-parent branches (lines 18-33 vs 40-56). Consider extracting a helper.
Refactored implementation
    +fn enumerate_filename(file_name: &std::ffi::OsStr, index: usize) -> String {
    +    if let Some(file_name_str) = file_name.to_str() {
    +        if let Some(dot_pos) = file_name_str.rfind('.') {
    +            let (stem, extension) = file_name_str.split_at(dot_pos);
    +            format!("{}.{}{}", stem, index, extension)
    +        } else {
    +            format!("{}.{}", file_name_str, index)
    +        }
    +    } else {
    +        format!("{}.{}", file_name.to_string_lossy(), index)
    +    }
    +}
    +
     pub fn enumerate_path(path: &PathBuf, index: usize) -> PathBuf {
    -    if let Some(parent) = path.parent() {
    -        if let Some(file_name) = path.file_name() {
    -            if let Some(file_name_str) = file_name.to_str() {
    -                if let Some(dot_pos) = file_name_str.rfind('.') {
    -                    // Has extension
    -                    let (stem, extension) = file_name_str.split_at(dot_pos);
    -                    let new_name = format!("{}.{}{}", stem, index, extension);
    -                    parent.join(new_name)
    -                } else {
    -                    // No extension
    -                    let new_name = format!("{}.{}", file_name_str, index);
    -                    parent.join(new_name)
    -                }
    -            } else {
    -                // Invalid UTF-8 in filename, append index directly
    -                let new_name = format!("{}.{}", file_name.to_string_lossy(), index);
    -                parent.join(new_name)
    -            }
    -        } else {
    -            // Path ends with '/', just append index
    -            path.join(format!("{}", index))
    -        }
    -    } else {
    -        // No parent, just modify the filename directly
    -        if let Some(file_name) = path.file_name() {
    -            if let Some(file_name_str) = file_name.to_str() {
    -                if let Some(dot_pos) = file_name_str.rfind('.') {
    -                    // Has extension
    -                    let (stem, extension) = file_name_str.split_at(dot_pos);
    -                    let new_name = format!("{}.{}{}", stem, index, extension);
    -                    PathBuf::from(new_name)
    -                } else {
    -                    // No extension
    -                    let new_name = format!("{}.{}", file_name_str, index);
    -                    PathBuf::from(new_name)
    -                }
    -            } else {
    -                // Invalid UTF-8 in filename, append index directly
    -                let new_name = format!("{}.{}", file_name.to_string_lossy(), index);
    -                PathBuf::from(new_name)
    -            }
    -        } else {
    -            // Empty path, just return the index as a path
    -            PathBuf::from(format!("{}", index))
    -        }
    -    }
    +    match (path.parent(), path.file_name()) {
    +        (Some(parent), Some(file_name)) if !parent.as_os_str().is_empty() => {
    +            parent.join(enumerate_filename(file_name, index))
    +        }
    +        (_, Some(file_name)) => PathBuf::from(enumerate_filename(file_name, index)),
    +        _ => PathBuf::from(format!("{}", index)),
    +    }
     }
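64-95: Tests look good, consider adding trailing slash case.
The test coverage is solid for common cases. The trailing slash handling (line 36) is implemented but untested. Note that `Path::file_name` ignores a trailing slash (`"/foo/bar/"` yields `Some("bar")`), so a trailing-slash input goes through the no-extension branch rather than the line 36 branch.
Optional additional test
    #[test]
    fn test_enumerate_path_trailing_slash() {
        let path = PathBuf::from("/foo/bar/");
        let result = enumerate_path(&path, 3);
        // file_name() is Some("bar"), so the index is appended to the name
        assert_eq!(result, PathBuf::from("/foo/bar.3"));
    }
crates/events/src/enclave_event/sync_end.rs (1)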
14-20: Consider implementing `Default` for the unit struct.
Since `SyncEnd` is a unit struct and `new()` takes no arguments, implementing `Default` would be more idiomatic and enable use with `..Default::default()` patterns.
♻️ Proposed change
     #[derive(Message, Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
     #[rtype(result = "()")]
     pub struct SyncEnd;

    -impl SyncEnd {
    -    pub fn new() -> Self {
    -        Self {}
    -    }
    -}
    +impl Default for SyncEnd {
    +    fn default() -> Self {
    +        Self
    +    }
    +}
    +
    +impl SyncEnd {
    +    pub fn new() -> Self {
    +        Self::default()
    +    }
    +}
crates/events/src/enclave_event/outgoing_sync_requested.rs (1)
15-18: Address TODO: Clarify the EVM sync trigger requirement.
The TODO indicates this event should also trigger EVM sync. Consider documenting the intended behavior or creating a tracking issue for the follow-up work.
Would you like me to open a new issue to track implementing the EVM sync trigger functionality?
crates/config/src/contract.rs (1)
35-41: Minor: Unnecessary `.clone()` on `Copy` type.
`Option<u64>` implements `Copy`, so the `.clone()` on line 38 is redundant.
Suggested fix
     pub fn deploy_block(&self) -> Option<u64> {
         use Contract::*;
         match self {
    -        Full { deploy_block, .. } => deploy_block.clone(),
    +        Full { deploy_block, .. } => *deploy_block,
             AddressOnly(_) => None,
         }
     }
crates/sync/src/sync.rs (1)
138-140: Consider shortening the test settle delay.
A shorter sleep reduces test time while remaining reliable.
Based on learnings: In the Rust test function `test_logs` in `packages/ciphernode/evm/tests/evm_reader.rs`, a sleep duration of 1 millisecond is sufficient for reliable event processing, even in CI environments.
♻️ Suggested tweak
     async fn settle() {
    -    sleep(Duration::from_millis(100)).await;
    +    sleep(Duration::from_millis(1)).await;
     }
crates/ciphernode-builder/src/event_system.rs (3)
46-50: Consider adding documentation to the new public enum.
`EventStoreAddrs` is a public enum that callers may need to match against. Brief documentation explaining its purpose and when each variant is used would help consumers.
📝 Suggested documentation
    +/// Addresses for per-aggregate EventStores, differentiated by backend type.
     #[derive(Clone)]
     pub enum EventStoreAddrs {
    +    /// In-memory event stores keyed by aggregate index.
         InMem(HashMap<usize, Addr<EventStore<InMemSequenceIndex, InMemEventLog>>>),
    +    /// Persistent (sled-backed) event stores keyed by aggregate index.
         Persisted(HashMap<usize, Addr<EventStore<SledSequenceIndex, CommitLogEventLog>>>),
     }
266-299: Router instances are not cached; a new router is created on each call.
`in_mem_eventstore_router()` and `persisted_eventstore_router()` create and start a new `EventStoreRouter` actor on every invocation. This could lead to multiple router actors if called repeatedly. Consider caching the router address similar to how `eventstore_addrs` is cached.
♻️ Potential caching approach
Add a `OnceCell` for each router type in `EventSystem`:
    eventstore_router_inmem: OnceCell<Addr<EventStoreRouter<InMemSequenceIndex, InMemEventLog>>>,
    eventstore_router_persisted: OnceCell<Addr<EventStoreRouter<SledSequenceIndex, CommitLogEventLog>>>,
Then use `get_or_try_init` in the router methods to ensure only one router is created per backend.
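The caching idea can be sketched with the standard library alone. Everything here is a hypothetical stand-in: `RouterAddr` replaces the real actor address, the `starts` counter exists only to show how often the initializer runs, and the sketch uses the infallible `get_or_init` on `std::sync::OnceLock`. A fallible initializer would need something like the `once_cell` crate's `get_or_try_init`, which is the name the suggestion refers to.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Hypothetical stand-in for an actor address such as `Addr<EventStoreRouter<..>>`.
#[derive(Clone, Debug, PartialEq)]
struct RouterAddr(usize);

/// Simplified event system that caches its router after the first call.
#[derive(Default)]
struct EventSystem {
    router: OnceLock<RouterAddr>,
    /// Counts how many times a router was actually started (illustration only).
    starts: AtomicUsize,
}

impl EventSystem {
    /// Return the cached router, starting it only on the first invocation.
    fn eventstore_router(&self) -> RouterAddr {
        self.router
            .get_or_init(|| {
                let n = self.starts.fetch_add(1, Ordering::SeqCst);
                RouterAddr(n)
            })
            .clone()
    }
}
```

Repeated calls return clones of the same address, so only one router actor would exist per backend under this scheme.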
552-593: Test validates multiple in-memory aggregates but only single persisted aggregate.
The test creates an `AggregateConfig` with 3 aggregates for in-memory, but then creates a new persisted system with an empty config (`AggregateConfig::new(HashMap::new())`). This tests different configurations, which is fine, but consider adding a test case that validates multiple persisted aggregates as well to ensure the `enumerate_path` logic works correctly across multiple sled trees.
Cargo.toml (1)
56-66: Ensure release automation has signing keys available.
With commit/tag signing enabled, any automated release flow will fail unless signing keys are configured in the release environment.
crates/evm/src/evm_parser.rs (1)
66-74: Avoid silent truncation when mapping `chain_id` into a `u32`.
If `chain_id` ever exceeds `u32::MAX`, the current cast will silently wrap and could cause node collisions.
🔧 Safer conversion
    -    let node = chain_id as u32;
    +    let node = u32::try_from(chain_id)
    +        .expect("chain_id must fit into u32 for HLC node id");
crates/aggregator/src/publickey_aggregator.rs (1)
144-151: Redundant `.clone()` on `&self.bus`.
`&self.bus.clone()` creates a clone and then immediately borrows it. If `BusHandle` is already `Clone` and the `trap` function takes a reference, you can simplify to `&self.bus`.
Suggested simplification
    -    trap(EType::KeyGeneration, &self.bus.clone(), || {
    +    trap(EType::KeyGeneration, &self.bus, || {
crates/config/src/chain_config.rs (1)
57-57: Consider adding a brief comment for the min computation.
The `[lowest_block, deploy_block].into_iter().flatten().min()` pattern is compact but may be non-obvious to future readers. A brief inline comment would improve clarity.
Suggested comment
    +    // Track the minimum deploy_block across all contracts
         lowest_block = [lowest_block, deploy_block].into_iter().flatten().min();
crates/evm/src/enclave_sol_reader.rs (1)
140-143: Consider keeping a thin `attach` wrapper for compatibility.
If downstream code still references `EnclaveSolReader::attach`, removing it can be a breaking change. Consider reintroducing `attach` as a simple wrapper around `setup` (and deprecate later if needed). Based on learnings, the `EnclaveSolReader::attach` method should be retained even if it directly calls the underlying reader.
♻️ Suggested compatibility shim
     impl EnclaveSolReader {
         pub fn setup(next: &EvmEventProcessor) -> Addr<EvmParser> {
             EvmParser::new(next, extractor).start()
         }
    +
    +    pub fn attach(next: &EvmEventProcessor) -> Addr<EvmParser> {
    +        Self::setup(next)
    +    }
     }
crates/events/src/event_context.rs (1)
84-90: Consider borrowing the cause context to avoid moves.
`from_cause` only reads the cause; taking `&EventContext<Sequenced>` avoids unnecessary moves and makes call sites more flexible.
♻️ Possible adjustment
     pub fn from_cause(
         id: EventId,
    -    cause: EventContext<Sequenced>,
    +    cause: &EventContext<Sequenced>,
         ts: u128,
         aggregate_id: AggregateId,
     ) -> Self {
         EventContext::new(id, cause.id(), cause.origin_id(), ts, aggregate_id)
     }
    -    let two =
    -        EventContext::from_cause(EventId::hash(2), one, 2, AggregateId::new(1)).sequence(2);
    +    let two =
    +        EventContext::from_cause(EventId::hash(2), &one, 2, AggregateId::new(1)).sequence(2);
     ...
    -    let three =
    -        EventContext::from_cause(EventId::hash(3), two, 3, AggregateId::new(1)).sequence(3);
    +    let three =
    +        EventContext::from_cause(EventId::hash(3), &two, 3, AggregateId::new(1)).sequence(3);
Also applies to: 154-160
crates/evm/src/evm_hub.rs (2)
30-39: Avoid cloning the processor list on every event.
Cloning `self.nexts` allocates per message; iterating by reference keeps the same behavior with less churn.
Suggested change
    -    for next in self.nexts.clone() {
    -        next.do_send(msg.clone());
    -    }
    +    for next in &self.nexts {
    +        next.do_send(msg.clone());
    +    }
57-85: Trim the test sleep to keep CI fast.
10ms is likely more than needed here.
Suggested change
    -    sleep(Duration::from_millis(10)).await;
    +    sleep(Duration::from_millis(1)).await;
Based on learnings: In the Rust test function `test_logs` in `packages/ciphernode/evm/tests/evm_reader.rs`, a sleep duration of 1 millisecond is sufficient for reliable event processing, even in CI environments.
crates/evm/src/evm_router.rs (1)
101-138: Trim the test sleep to keep CI fast.
10ms is likely more than needed for this mailbox flush.
Suggested change
    -    sleep(Duration::from_millis(10)).await;
    +    sleep(Duration::from_millis(1)).await;
Based on learnings: In the Rust test function `test_logs` in `packages/ciphernode/evm/tests/evm_reader.rs`, a sleep duration of 1 millisecond is sufficient for reliable event processing, even in CI environments.
crates/events/src/sync.rs (1)
12-29: Prefer a single `ChainId` alias to avoid API confusion.
Having both `Chainid` and `ChainId` in the public surface is easy to misread.
Suggested change
    -type Chainid = u64;
    +type ChainId = u64;
    @@
    -type ChainId = u64;
     type DeployBlock = u64;
crates/events/src/enclave_event/sync_start.rs (1)
44-46: Consider documenting or reordering the `split()` return tuple.
The `split()` method returns `(data, ts, block)` but the struct fields are ordered `data, block, chain_id, ts, id`. This mismatch could lead to confusion at call sites. Consider either:
- Adding a doc comment clarifying the return order, or
- Returning a named struct instead of a tuple for clarity
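The second option might look like the sketch below. The type names `HistoricalEvent` and `SplitParts` and all field types are hypothetical; only the field names and the `(data, ts, block)` selection come from the comment above.

```rust
/// Hypothetical, simplified stand-in for the event being split.
/// Field order follows the review comment; the types are assumptions.
struct HistoricalEvent {
    data: Vec<u8>,
    block: u64,
    chain_id: u64,
    ts: u128,
    id: u64,
}

/// Named return type so call sites read `parts.ts` and `parts.block`
/// instead of relying on tuple positions.
#[derive(Debug, PartialEq)]
struct SplitParts {
    data: Vec<u8>,
    ts: u128,
    block: u64,
}

impl HistoricalEvent {
    /// Consume the event and return the parts previously exposed as a tuple.
    fn split(self) -> SplitParts {
        SplitParts {
            data: self.data,
            ts: self.ts,
            block: self.block,
        }
    }
}
```

With named fields, swapping `ts` and `block` at a call site becomes a visible mistake rather than a silent one, even when both are integers.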
crates/ciphernode-builder/src/evm_system.rs (1)
50-78: Consider using `std::mem::take` instead of `replace`.
`std::mem::take(&mut self.route_factories)` is more idiomatic than `replace(&mut self.route_factories, Vec::new())` for taking ownership while leaving an empty default.
Also note that `build()` is not idempotent: calling it twice will result in an empty `route_factories` on the second call, silently producing a router with no routes. Consider either consuming `self` or documenting this single-use behavior.
♻️ Suggested improvement
    -use std::mem::replace;
    +use std::mem::take;
    -    let route_factories = replace(&mut self.route_factories, Vec::new());
    +    let route_factories = take(&mut self.route_factories);
crates/evm/tests/integration.rs (1)
100-116: Semicolon after `Ok(())` creates unit return in match arm.
Inside the `trap` closure at Line 113, the semicolon after `self.bus.publish(SyncEnd::new())?;` followed by nothing before the closing brace means the match arm's expression evaluates to `()`. This works but is subtle: the `Ok(())` at Line 114 is the closure's return, not the match arm's value. Consider adding a brief comment or restructuring for clarity.
crates/evm/src/evm_read_interface.rs (1)
246-268: `TimestampTracker` caches only single block; consider edge cases.
The `TimestampTracker` caches only the most recent `(block_number, timestamp)` pair. This works well for sequential log processing within the same block, but if logs arrive out of order or from different blocks interleaved, it will re-fetch timestamps unnecessarily.
Also, returning `0` as a fallback timestamp (Lines 249, 264) could cause issues downstream if timestamp ordering matters. Consider logging at `warn` level when using fallback values to aid debugging.
crates/data/src/persistable.rs (2)
162-164: Consider documenting the `[0]` sentinel value convention.
The check `if bytes == [0]` treats a single zero byte as "not found". This is a sentinel value pattern that could be surprising to maintainers. Consider adding a comment explaining this convention or extracting it to a named constant.
📝 Suggested documentation
    +    // A single zero byte [0] is used as a sentinel to indicate "no data"
    +    // This distinguishes between "key not found" (None) and "empty value stored"
         if bytes == [0] {
             return Ok(None);
         }
168-187: Serialization errors are silently dropped.
When `bincode::serialize` fails (line 176), the error is logged but the method returns without persisting. While logging is good, callers have no way to know the write failed. Consider whether this is acceptable for your use case.
crates/sortition/src/ciphernode_selector.rs (1)
98-98: Minor: Consider if clones are necessary for params and error_size.
The `.clone()` calls on `data.params` and `data.error_size` may be necessary if `E3Meta` requires owned values, but worth verifying they can't be moved or borrowed instead.
Also applies to: 100-100
crates/events/src/enclave_event/mod.rs (1)
441-451: Prefer `ctx.aggregate_id()` for `EnclaveEvent`.
Now that `EventContext` is authoritative (and persisted), using payload-derived aggregate ids can drift from stored context.
♻️ Suggested refactor
     impl<S: SeqState> WithAggregateId for EnclaveEvent<S> {
         fn get_aggregate_id(&self) -> AggregateId {
    -        self.payload.get_aggregate_id()
    +        self.ctx.aggregate_id()
         }
     }
ctrlc03
left a comment
other than checking Coderabbit comments, looks fine to merge to me.
Made a note of the CodeRabbit stuff; will address it in future PRs.
part of #1050
This PR
We are part of the way through getting sync together. Much of this is preparatory. The next PR will be Sync implementation.
Event writing system and snapshot:
- `EventContext` to attach context to `EnclaveEvent`. Why? So that we can have provenance between events for state reconstruction. Also, we need to have the aggregate_id on the snapshots.
- `TypedEvent<T>`. Why? So that we can disambiguate destructured events from the ctx in order to pass the context through to dispatchers.
- `Persistable` by removing unnecessary DataStore and Repository flow. Why? Persistable was far more complex than it needed to be, and we now understand much better the actual reason we need it, which is to stream state to snapshots.
- `Persistable<T>` to be constructed directly off `StoreConnector`. Why? Using a StoreConnector in order to aid factory functions helps with our API.
- `agid -> timestamps`. Why? Because we are using multiple aggregates, we need to ask for the timestamps for each aggregate.
- (`self.notify_sync(ctx, evt)`) Why? This is good practice in general as it will aid in auditing, and it may also help by allowing us to set the context as we handle the EnclaveEvent. We will still need to forward context between actors (see TypedEvent), but this will reduce how often we must do this.
Event system
Synchronization
Next PR
Event writing and snapshotting
Event system
Synchronization
General
Summary by CodeRabbit
New Features
Refactor