feat: libp2p sync [skip-line-limit]#1358
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughIntegrates libp2p into the workspace and rewires networking and sync: CiphernodeHandle now uses Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Syncer as Sync Orchestrator
participant Net as Libp2pNetInterface
participant Store as Event Store
Syncer->>Syncer: await NetReady
Syncer->>Syncer: compute net_hlc (filter active rounds)
Syncer->>Net: HistoricalNetSyncStart(net_hlc)
Net->>Store: FetchEventsSince(aggregate, since, limit)
Store-->>Net: EventBatch(events, next_cursor)
Net-->>Syncer: HistoricalNetSyncEventsReceived(EventBatch...)
Syncer->>Syncer: combine historical + local events
sequenceDiagram
autonumber
participant Requester
participant Net as Libp2pNetInterface
participant Responder
Requester->>Net: OutgoingRequest(payload, target, corr_id)
Net->>Responder: IncomingRequest(request, corr_id)
Responder->>Responder: process & prepare ProtocolResponse
Responder->>Net: IncomingResponse(ProtocolResponse, corr_id)
Net-->>Requester: OutgoingRequestSucceeded(response, corr_id)
Estimated code review effort🎯 5 (Critical) | ⏱️ ~110 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…uest handling - Add DirectResponder helper for responding to incoming libp2p requests - Add ProtocolResponse enum (Ok, BadRequest, Error) for structured responses - Update DirectRequester to handle ProtocolResponse variants - Refactor NetSyncManager to use DirectResponder instead of raw channels - Update net_interface to integrate DirectResponder into request handling
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
crates/events/src/bus_handle.rs (1)
291-303:⚠️ Potential issue | 🟠 Major
wait_forstill leaks subscription on future cancellation.Line 298 subscribes, but if the returned future is dropped/cancelled before
rx.awaitcompletes, Line 300 is never executed, leaving a stale subscription. Please make unsubscribe run on drop as well.🔧 Proposed fix (drop-guard cleanup)
fn wait_for( &self, event_type: EventType, ) -> Pin<Box<dyn Future<Output = Result<EnclaveEvent<Sequenced>>> + Send>> { let bus = self.event_bus.clone(); Box::pin(async move { let (addr, rx) = oneshot::<EnclaveEvent<Sequenced>>(); bus.do_send(Subscribe::new(event_type, addr.clone())); - let received = rx.await; - bus.do_send(Unsubscribe::new(event_type, addr)); - Ok(received?) + + struct UnsubscribeOnDrop { + bus: Addr<EventBus<EnclaveEvent<Sequenced>>>, + event_type: EventType, + addr: Recipient<EnclaveEvent<Sequenced>>, + } + + impl Drop for UnsubscribeOnDrop { + fn drop(&mut self) { + self.bus + .do_send(Unsubscribe::new(self.event_type, self.addr.clone())); + } + } + + let _cleanup = UnsubscribeOnDrop { + bus: bus.clone(), + event_type, + addr, + }; + + Ok(rx.await?) }) }#!/bin/bash set -euo pipefail # Inspect current wait_for implementation and confirm unsubscribe is only post-await. sed -n '286,310p' crates/events/src/bus_handle.rs # Find callsites that may cancel/drop wait_for futures (timeouts/select paths). rg -nP --type rust '\.wait_for\s*\(' -C2 rg -nP --type rust 'tokio::select!\s*|timeout\s*\(' -C2🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/events/src/bus_handle.rs` around lines 291 - 303, wait_for currently subscribes then only unsubscribes after rx.await, leaking if the future is cancelled; fix by introducing a drop-guard that sends Unsubscribe when dropped. Implement a small SubscriptionGuard struct (holding bus: Addr<...>, event_type: EventType, addr: oneshot::Sender/Addr) with a Drop impl that calls bus.do_send(Unsubscribe::new(event_type, addr.clone())), create the guard immediately after subscribing in wait_for, and remove or replace the explicit Unsubscribe::new call so the guard handles cleanup on both normal completion and cancellation; keep the guard alive across rx.await and let it be dropped (or explicitly drop it) after awaiting to perform the unsubscribe.
🧹 Nitpick comments (2)
crates/net/src/direct_requester.rs (1)
127-133: Consider preserving deserialization error details for easier debugging.The deserialization at line 130 discards the original error, which could make debugging type conversion issues harder—especially if the peer sends an unexpected response format.
💡 Suggested improvement
match response { - ProtocolResponse::Ok(data) => Ok(data - .try_into() - .map_err(|_| anyhow!("Could not deserialize ProtocolResponse"))?), + ProtocolResponse::Ok(data) => { + let len = data.len(); + data.try_into().map_err(|_| { + anyhow!( + "Could not deserialize ProtocolResponse (payload size: {} bytes)", + len + ) + }) + } ProtocolResponse::BadRequest(msg) => Err(anyhow!("BadRequest: {}", msg)), ProtocolResponse::Error(msg) => Err(anyhow!("ProtocolError: {}", msg)), }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/net/src/direct_requester.rs` around lines 127 - 133, The ProtocolResponse::Ok branch discards the original deserialization error; change the map_err on the try_into() call to preserve the underlying error (e.g., map_err(|e| anyhow!("Could not deserialize ProtocolResponse: {}", e)) or use .context("Could not deserialize ProtocolResponse")) so the original conversion error from try_into() is included in the returned anyhow error; update the ProtocolResponse::Ok handling to wrap the original error rather than replacing it with a generic message.crates/tests/tests/integration.rs (1)
921-924: Replace rawXXXwith a trackable TODO reference.Line 921 currently uses
// XXX: ENABLE THIS!!. Prefer an issue-linked TODO so this doesn’t become unowned test debt.If you want, I can draft a concise follow-up issue template for re-enabling this test once the trBFV sync port is complete.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/tests/tests/integration.rs` around lines 921 - 924, Replace the raw "XXX" comment above the test_stopped_keyshares_retain_state() with a tracked TODO that references a concrete issue or ticket (e.g. "TODO: re-enable after trBFV sync - issue #<ID>" or a full issue URL); update the comment to use a consistent TODO format (e.g. // TODO(issue-1234): re-enable test after trBFV sync) so the disabled #[ignore = "..."] test is linked to a traceable task and won’t become unowned debt.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/net/src/direct_responder.rs`:
- Around line 123-132: The error message in try_request_into is misleading (it
says "serialize request bytes" while performing a deserialization via T:
TryFrom<Vec<u8>>); update the map_err closure in
DirectResponder::try_request_into to return a correct, descriptive error such as
"Could not deserialize request bytes" (optionally including target type/context)
so failures reflect the actual operation being performed on
self.request.clone().try_into().
In `@crates/tests/tests/integration.rs`:
- Around line 618-633: Replace incorrect second-based durations with
millisecond-based durations in the test timeouts: update calls to
Duration::from_secs(5000), Duration::from_secs(600), and
Duration::from_secs(1000) used with expect_events_with_timeouts and any other
test helpers to Duration::from_millis(5000), Duration::from_millis(600), and
Duration::from_millis(1000) respectively so the timeouts match the intended
millisecond scale (consistent with take_history_with_timeout()); locate these
changes around the expect_events_with_timeouts invocation and related test
helpers.
---
Duplicate comments:
In `@crates/events/src/bus_handle.rs`:
- Around line 291-303: wait_for currently subscribes then only unsubscribes
after rx.await, leaking if the future is cancelled; fix by introducing a
drop-guard that sends Unsubscribe when dropped. Implement a small
SubscriptionGuard struct (holding bus: Addr<...>, event_type: EventType, addr:
oneshot::Sender/Addr) with a Drop impl that calls
bus.do_send(Unsubscribe::new(event_type, addr.clone())), create the guard
immediately after subscribing in wait_for, and remove or replace the explicit
Unsubscribe::new call so the guard handles cleanup on both normal completion and
cancellation; keep the guard alive across rx.await and let it be dropped (or
explicitly drop it) after awaiting to perform the unsubscribe.
---
Nitpick comments:
In `@crates/net/src/direct_requester.rs`:
- Around line 127-133: The ProtocolResponse::Ok branch discards the original
deserialization error; change the map_err on the try_into() call to preserve the
underlying error (e.g., map_err(|e| anyhow!("Could not deserialize
ProtocolResponse: {}", e)) or use .context("Could not deserialize
ProtocolResponse")) so the original conversion error from try_into() is included
in the returned anyhow error; update the ProtocolResponse::Ok handling to wrap
the original error rather than replacing it with a generic message.
In `@crates/tests/tests/integration.rs`:
- Around line 921-924: Replace the raw "XXX" comment above the
test_stopped_keyshares_retain_state() with a tracked TODO that references a
concrete issue or ticket (e.g. "TODO: re-enable after trBFV sync - issue #<ID>"
or a full issue URL); update the comment to use a consistent TODO format (e.g.
// TODO(issue-1234): re-enable test after trBFV sync) so the disabled #[ignore =
"..."] test is linked to a traceable task and won’t become unowned debt.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
crates/events/src/bus_handle.rscrates/net/src/direct_requester.rscrates/net/src/direct_responder.rscrates/tests/tests/integration.rs
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/net/src/net_interface.rs (1)
151-160:⚠️ Potential issue | 🟠 MajorHandle startup dial task failures explicitly.
At Line 151, the spawned task uses
?, but itsJoinHandleis dropped, soErrresults are silently discarded. This can hide initial dial failures and make startup look healthy when it isn’t.Suggested fix
tokio::spawn({ let event_tx = event_tx.clone(); let cmd_tx = cmd_tx.clone(); let peers = self.peers.clone(); async move { - dial_peers(&cmd_tx, &event_tx, &peers).await?; - event_tx.send(NetEvent::AllPeersDialed)?; - return anyhow::Ok(()); + if let Err(e) = dial_peers(&cmd_tx, &event_tx, &peers).await { + error!("Initial peer dial failed: {e}"); + return; + } + if let Err(e) = event_tx.send(NetEvent::AllPeersDialed) { + debug!("No listeners for AllPeersDialed: {e}"); + } } });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/net/src/net_interface.rs` around lines 151 - 160, The spawned startup task swallowing errors because its JoinHandle is dropped: change the tokio::spawn around the async block that calls dial_peers to capture and handle errors explicitly (e.g., store or await the JoinHandle, spawn a wrapper that logs or sends a failure event on Err, or propagate the error over cmd_tx/event_tx). Specifically update the async closure that calls dial_peers(&cmd_tx, &event_tx, &peers).await and currently uses `?` so failures are lost — ensure failures are caught (match or .await the JoinHandle) and send a NetEvent (or log via process logger) describing the dial failure instead of silently dropping the result so startup dial failures are observable.
♻️ Duplicate comments (1)
crates/net/src/net_sync_manager.rs (1)
194-201:⚠️ Potential issue | 🟠 MajorAvoid orphaning
requestsentries when EventStore dispatch fails.On Line 194 the responder is stored before Line 197 dispatch. If
eventstore.try_send(...)fails, the map entry is left behind and the caller gets no immediate failure response.Suggested fix
- let fetch_request: FetchEventsSince = msg.responder.try_request_into()?; - self.requests.insert(id, msg.responder); + let responder = msg.responder; + let fetch_request: FetchEventsSince = responder.try_request_into()?; let query: HashMap<AggregateId, u128> = HashMap::from([(fetch_request.aggregate_id(), fetch_request.since())]); - self.eventstore.try_send(EventStoreQueryBy::<TsAgg>::new( + if let Err(e) = self.eventstore.try_send(EventStoreQueryBy::<TsAgg>::new( id, query, ctx.address().recipient(), - ))?; + )) { + let _ = responder.bad_request(format!("eventstore unavailable: {e}")); + return Ok(()); + } + self.requests.insert(id, responder); Ok(())🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/net/src/net_sync_manager.rs` around lines 194 - 201, The code inserts the responder into self.requests before dispatching the EventStore query, which can orphan entries if eventstore.try_send(...) fails; modify the logic in the handler around self.requests and eventstore.try_send so that you only insert the responder after try_send succeeds, or if you must insert before, ensure you remove the entry and send an immediate failure to the stored responder when EventStoreQueryBy::<TsAgg>::new(...) / eventstore.try_send(...) returns Err; specifically update the block that uses self.requests.insert(id, msg.responder) and the subsequent eventstore.try_send(...) call (and where fetch_request.aggregate_id() / fetch_request.since() are used) to either move the insert to after a successful try_send or add error handling that cleans up self.requests and replies to the responder on failure.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/net/src/net_sync_manager.rs`:
- Around line 45-47: The try_from implementation deserializes untrusted bytes
without size limits; change it to use bincode bounded deserialization by calling
bincode::DefaultOptions::new().with_limit(MAX_SYNC_PAYLOAD_SIZE).deserialize(&value)
instead of bincode::deserialize(&value), and keep the existing context("failed
to deserialize sync response") on the Result; add a clearly named constant
(e.g., MAX_SYNC_PAYLOAD_SIZE) for the maximum allowed bytes (choose an
appropriate u64) and reference that constant in the call so try_from enforces a
safe payload size for untrusted peer sync payloads.
---
Outside diff comments:
In `@crates/net/src/net_interface.rs`:
- Around line 151-160: The spawned startup task swallowing errors because its
JoinHandle is dropped: change the tokio::spawn around the async block that calls
dial_peers to capture and handle errors explicitly (e.g., store or await the
JoinHandle, spawn a wrapper that logs or sends a failure event on Err, or
propagate the error over cmd_tx/event_tx). Specifically update the async closure
that calls dial_peers(&cmd_tx, &event_tx, &peers).await and currently uses `?`
so failures are lost — ensure failures are caught (match or .await the
JoinHandle) and send a NetEvent (or log via process logger) describing the dial
failure instead of silently dropping the result so startup dial failures are
observable.
---
Duplicate comments:
In `@crates/net/src/net_sync_manager.rs`:
- Around line 194-201: The code inserts the responder into self.requests before
dispatching the EventStore query, which can orphan entries if
eventstore.try_send(...) fails; modify the logic in the handler around
self.requests and eventstore.try_send so that you only insert the responder
after try_send succeeds, or if you must insert before, ensure you remove the
entry and send an immediate failure to the stored responder when
EventStoreQueryBy::<TsAgg>::new(...) / eventstore.try_send(...) returns Err;
specifically update the block that uses self.requests.insert(id, msg.responder)
and the subsequent eventstore.try_send(...) call (and where
fetch_request.aggregate_id() / fetch_request.since() are used) to either move
the insert to after a successful try_send or add error handling that cleans up
self.requests and replies to the responder on failure.
be5ec44 to
a4ccf08
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
crates/tests/tests/integration.rs (1)
361-374: Minor naming and simplification suggestions.The parameter
repoinpushis confusing (reads like "repository"). Considerentryor just destructure inline.The
repeathelper can be simplified:✨ Suggested refinements
fn repeat(ch: char, num: usize) -> String { - let mut s = String::new(); - while s.len() < num { - s.push(ch); - } - s + std::iter::repeat(ch).take(num).collect() } impl Report { - pub fn push(&mut self, repo: (&str, Duration)) { - let (label, dur) = repo; + pub fn push(&mut self, (label, dur): (&str, Duration)) { self.show(label); self.inner.push((label.to_owned(), dur)); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/tests/tests/integration.rs` around lines 361 - 374, Rename and simplify: in Report::push, avoid the confusing parameter name repo by destructuring the tuple in the signature (e.g., change pub fn push(&mut self, repo: (&str, Duration)) to pub fn push(&mut self, (label, dur): (&str, Duration)) or rename repo to entry/tuple and use (label, dur) = ... inside the body; update uses of label/dur accordingly. Simplify the helper function repeat by replacing the manual loop with an iterator-based construction such as using std::iter::repeat(ch).take(num).collect::<String>() (keep the function name repeat) to produce the repeated-char String in one expression.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/tests/tests/integration.rs`:
- Line 984: In create_local_ciphernodes, the async call to
simulate_libp2p_net(&result) is missing .await; update the call to
simulate_libp2p_net(&result).await so the future is awaited (ensure
create_local_ciphernodes is async or already returns a future), matching the
correct usage seen at other sites (lines using simulate_libp2p_net.await) to
avoid the compilation error when re-enabling
test_duplicate_e3_id_with_different_chain_id.
---
Nitpick comments:
In `@crates/tests/tests/integration.rs`:
- Around line 361-374: Rename and simplify: in Report::push, avoid the confusing
parameter name repo by destructuring the tuple in the signature (e.g., change
pub fn push(&mut self, repo: (&str, Duration)) to pub fn push(&mut self, (label,
dur): (&str, Duration)) or rename repo to entry/tuple and use (label, dur) = ...
inside the body; update uses of label/dur accordingly. Simplify the helper
function repeat by replacing the manual loop with an iterator-based construction
such as using std::iter::repeat(ch).take(num).collect::<String>() (keep the
function name repeat) to produce the repeated-char String in one expression.
ℹ️ Review info
Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 286df9a9-0b24-4401-ab1d-60530abe4e3e
📒 Files selected for processing (1)
crates/tests/tests/integration.rs
This reverts commit 23fcb76.
Closes #1325
DirectRequesterDirectRequesterTester(see here)TestEventBuilderfor easier and more universal testingDirectResponderadding protocol enumeration signalling Bad requests etc.fetch_all_batched_eventsto sync processLibp2p Simulation Refactor
The idea here is so that we can better test net based logic instead of glossing over some of the libp2p mechanisms such as DHT entries and NetSyncManager logic without setting up an actual libp2p network.
Before:
graph LR subgraph NodeA ["Node A"] A_App["Application Logic"] A_Bus["EventBus"] A_Enc["EnclaveEvent"] end subgraph Sim ["Simulation Layer"] Copy["Direct Copy"] end subgraph NodeB ["Node B"] B_Bus["EventBus"] B_App["Application Logic"] end A_App --> A_Bus --> A_Enc A_Enc -. "EnclaveEvent" .-> Copy Copy -. "EnclaveEvent" .-> B_Bus B_Bus --> B_AppAfter:
graph LR subgraph NodeA ["Node A"] A_App["Application Logic"] A_Bus["EventBus"] A_Plugins["Plugins (e.g. DocumentPublisher)"] A_Net["NetCommand"] end subgraph Sim ["Simulation Layer"] Convert["NetCommand → NetEvent Conversion"] end subgraph NodeB ["Node B"] B_NetEvt["NetEvent"] B_Plugins["Plugins (e.g. DocumentPublisher)"] B_Bus["EventBus"] B_App["Application Logic"] end A_App --> A_Bus --> A_Plugins --> A_Net A_Net -. "NetCommand" .-> Convert Convert -. "NetEvent" .-> B_NetEvt B_NetEvt --> B_Plugins --> B_Bus --> B_AppSummary by CodeRabbit
New Features
Improvements
Bug Fixes
Tests / Tooling