feat: libp2p sync [skip-line-limit] #1358

Merged
ryardley merged 66 commits into main from ry/1325-net-event-sync
Mar 9, 2026

Conversation

@ryardley

@ryardley ryardley commented Feb 24, 2026

Contributor

Closes #1325

  • Refactor sync -> request_response
  • EventStore querying is now a thing - you can query event stores with limits and filters.
  • Simplify direct requesting over libp2p with DirectRequester
  • Simplify testing direct requesting with DirectRequesterTester (see here)
  • Added a TestEventBuilder for easier and more universal testing
  • Tidied up the HistoryCollector significantly by splitting it into two actors, getting more information from failures, and adding expectations.
  • Better abstractions around the place and more unit tests
  • Simplify direct responding using DirectResponder, adding a protocol enumeration that signals bad requests etc.
  • apply fetch_all_batched_events to sync process
  • Only request events for active E3 rounds during sync (before request)
  • Extend the testing simulation down to net events so that sync can be tested. Because of this, the test no longer has access to all the events it used to: the Aggregator never sees the nodes' internal events, so I had to delete the waits for those and just wait for the events we actually do see.
  • Enable sync on startup
  • Created a rough Report abstraction for the tests which indicates various test phases in the test output as well as compiling the report
  • Re-enable state synchronization test (could do in a followup PR due to timelines?)
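
For readers unfamiliar with the querying bullet above, here is a minimal sketch of what "limits and filters" can look like over an in-memory store. `StoredEvent`, `Filter`, and `query_since` are hypothetical names chosen for illustration; they are not the PR's `EventStore` API.

```rust
/// Hypothetical stored event: a sequence number plus the aggregate it belongs to.
#[derive(Debug, Clone, PartialEq)]
struct StoredEvent {
    seq: u64,
    aggregate_id: u32,
    payload: String,
}

/// Hypothetical filter: keep only events for the listed aggregates (None keeps all).
#[derive(Default)]
struct Filter {
    aggregate_ids: Option<Vec<u32>>,
}

impl Filter {
    fn matches(&self, event: &StoredEvent) -> bool {
        match &self.aggregate_ids {
            Some(ids) => ids.contains(&event.aggregate_id),
            None => true,
        }
    }
}

/// Return events with `seq > since` that pass the filter, capped at `limit` if given.
fn query_since(
    store: &[StoredEvent],
    since: u64,
    filter: &Filter,
    limit: Option<usize>,
) -> Vec<StoredEvent> {
    store
        .iter()
        .filter(|e| e.seq > since && filter.matches(e))
        .take(limit.unwrap_or(usize::MAX))
        .cloned()
        .collect()
}
```

Applying the filter before the limit means a capped query still returns up to `limit` matching events rather than `limit` events of which only some match.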

Libp2p Simulation Refactor

The idea here is to test net-based logic properly, including libp2p mechanisms such as DHT entries and the NetSyncManager logic, rather than glossing over them, while still not having to set up an actual libp2p network.

Before:

graph LR
    subgraph NodeA ["Node A"]
        A_App["Application Logic"]
        A_Bus["EventBus"]
        A_Enc["EnclaveEvent"]
    end

    subgraph Sim ["Simulation Layer"]
        Copy["Direct Copy"]
    end

    subgraph NodeB ["Node B"]
        B_Bus["EventBus"]
        B_App["Application Logic"]
    end

    A_App --> A_Bus --> A_Enc
    A_Enc -. "EnclaveEvent" .-> Copy
    Copy -. "EnclaveEvent" .-> B_Bus
    B_Bus --> B_App

After:

graph LR
    subgraph NodeA ["Node A"]
        A_App["Application Logic"]
        A_Bus["EventBus"]
        A_Plugins["Plugins (e.g. DocumentPublisher)"]
        A_Net["NetCommand"]
    end

    subgraph Sim ["Simulation Layer"]
        Convert["NetCommand → NetEvent Conversion"]
    end

    subgraph NodeB ["Node B"]
        B_NetEvt["NetEvent"]
        B_Plugins["Plugins (e.g. DocumentPublisher)"]
        B_Bus["EventBus"]
        B_App["Application Logic"]
    end

    A_App --> A_Bus --> A_Plugins --> A_Net
    A_Net -. "NetCommand" .-> Convert
    Convert -. "NetEvent" .-> B_NetEvt
    B_NetEvt --> B_Plugins --> B_Bus --> B_App
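
A rough sketch of the simulation layer in the "After" diagram: commands that a node would have handed to libp2p are converted into the events its peers would have received. The enum variants and the `convert` and `pump` functions are simplified assumptions, not the PR's actual `NetCommand`/`NetEvent` definitions.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Simplified stand-in for a command a node sends to its network layer.
#[derive(Debug, Clone, PartialEq)]
enum NetCommand {
    GossipPublish { topic: String, data: Vec<u8> },
}

/// Simplified stand-in for an event a node receives from its network layer.
#[derive(Debug, Clone, PartialEq)]
enum NetEvent {
    GossipData { topic: String, data: Vec<u8> },
}

/// Translate an outgoing command into the event other nodes would observe.
fn convert(cmd: NetCommand) -> NetEvent {
    match cmd {
        NetCommand::GossipPublish { topic, data } => NetEvent::GossipData { topic, data },
    }
}

/// Drain every queued command and deliver the converted event to each peer.
/// Returns the number of successful deliveries.
fn pump(from: &Receiver<NetCommand>, peers: &[Sender<NetEvent>]) -> usize {
    let mut delivered = 0;
    while let Ok(cmd) = from.try_recv() {
        let event = convert(cmd);
        for peer in peers {
            if peer.send(event.clone()).is_ok() {
                delivered += 1;
            }
        }
    }
    delivered
}
```

Because the receiving node gets a `NetEvent` rather than a ready-made `EnclaveEvent`, its own plugins still have to run, which is the point of the refactor.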

Summary by CodeRabbit

  • New Features

    • Libp2p-backed network interface with direct peer-to-peer request/response API and channel bridge testing utilities
    • Batched event fetching with cursor-based pagination and end-to-end batched sync
  • Improvements

    • Event store queries: filtering, pagination limits and per-call timeouts
    • Network readiness signaling with delayed historical net loading
    • More flexible timeouts for event retrieval and test utilities
    • Simplified graceful shutdown sequence
  • Bug Fixes

    • Improved duplicate event handling and storage error reporting
  • Tests / Tooling

    • Enhanced test helpers, libp2p mock, and event-builder utilities
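
The cursor-based pagination mentioned in the summary can be sketched as a loop that keeps requesting pages until no cursor comes back. Everything here is an assumption made for illustration: events are reduced to bare timestamps, the source is a sorted slice of unique values, and `fetch_events_since` / `fetch_all_batched` are hypothetical helpers rather than the PR's functions.

```rust
/// Hypothetical batch: a page of events plus the cursor to resume from, if any.
struct EventBatch {
    events: Vec<u64>,
    next_cursor: Option<u64>,
}

/// Hypothetical source: return up to `limit` timestamps strictly after `since`.
/// Assumes `all` is sorted ascending and contains no duplicates.
fn fetch_events_since(all: &[u64], since: u64, limit: usize) -> EventBatch {
    let events: Vec<u64> = all
        .iter()
        .copied()
        .filter(|ts| *ts > since)
        .take(limit)
        .collect();
    // Only hand out a cursor when the page was full, i.e. more may remain.
    let next_cursor = if events.len() == limit {
        events.last().copied()
    } else {
        None
    };
    EventBatch { events, next_cursor }
}

/// Keep requesting pages until the source stops returning a cursor.
fn fetch_all_batched(all: &[u64], mut since: u64, limit: usize) -> Vec<u64> {
    let mut out = Vec::new();
    loop {
        let batch = fetch_events_since(all, since, limit);
        out.extend(batch.events);
        match batch.next_cursor {
            Some(cursor) => since = cursor,
            None => return out,
        }
    }
}
```

When the total is an exact multiple of `limit`, this costs one extra round trip that returns an empty page; that keeps the termination rule simple.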

@vercel

vercel Bot commented Feb 24, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

| Project | Deployment | Actions | Updated (UTC) |
|---|---|---|---|
| crisp | Ready | Preview, Comment | Mar 9, 2026 9:41pm |
| enclave-docs | Ready | Preview, Comment | Mar 9, 2026 9:41pm |

@coderabbitai

coderabbitai Bot commented Feb 24, 2026

Contributor

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Integrates libp2p into the workspace and rewires networking and sync: CiphernodeHandle now uses PeerId and NetChannelBridge; adds DirectRequester/DirectResponder and channel-bridge testing; implements cursor-based batched net sync and EventStore pagination/filtering; refactors net interface, events, and test-helpers.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Ciphernode: crates/ciphernode-builder/Cargo.toml, crates/ciphernode-builder/src/ciphernode.rs, crates/ciphernode-builder/src/ciphernode_builder.rs | Adds libp2p workspace dep; CiphernodeHandle now stores libp2p::PeerId and Option<NetChannelBridge> (replacing string/join-handle); builder wiring updated to create libp2p keypair/interface and pass channel_bridge. |
| Net interface & handles: crates/net/src/lib.rs, crates/net/src/net_interface.rs, crates/net/src/net_interface_handle.rs | Introduce Libp2pKeypair, Libp2pNetInterface, setup_libp2p_keypair, setup_net_interface; add NetInterfaceHandle and NetChannelBridge test bridging utilities; setup_net signature changed to accept interface handle. |
| Direct request/response: crates/net/src/direct_requester.rs, crates/net/src/direct_responder.rs | New DirectRequester/DirectResponder modules: builder, retry/timeouts, PeerTarget, test harness (DirectRequesterTester), and unit tests. |
| Net events & commands: crates/net/src/events.rs, crates/net/src/dialer.rs | Replace Sync-specific model with generic OutgoingRequest/IncomingRequest/ProtocolResponse flow; add PeerTarget; change NetCommand::Dial to OnceTake<DialOpts>; update NetEvent/NetCommand variants. |
| Net sync & batching: crates/net/src/net_sync_manager.rs, crates/net/src/net_event_batch.rs, crates/net/src/net_event_translator.rs | Add cursor-based EventBatch/FetchEventsSince and batched fetch helpers; NetSyncManager switches to DirectResponder and batched aggregation; emits NetReady and integrates historical sync; translator adds BloomFilter/topic state. |
| Event model & test-helpers: crates/events/src/enclave_event/..., crates/test-helpers/src/libp2p_mock.rs, crates/test-helpers/src/ciphernode_system.rs, crates/test-helpers/src/lib.rs, crates/test-helpers/Cargo.toml | Rename/rework net-sync events (NetReady, HistoricalNetSyncEventsReceived), add TestEventBuilder, make several correlation_id fields private, add Libp2pMock and async simulate_libp2p_net, update ciphernode test helpers to use PeerId/NetChannelBridge. |
| Event store, bus & traits: crates/events/src/eventstore.rs, crates/events/src/events.rs, crates/events/src/eventstore_router.rs, crates/events/src/eventbus.rs, crates/events/src/bus_handle.rs, crates/events/src/traits.rs | Add EventStoreFilter and limit/filter options; refactor EventStore with store_event, query_by_ts/seq, and collect_events; TakeEvents returns TakeEventsResult with timeout semantics via HistoryCollectorWaiter; add async wait_for on EventSubscriber/BusHandle. |
| Sync orchestration & entrypoint: crates/sync/src/sync.rs, crates/entrypoint/src/helpers/shutdown.rs | Sync now waits for NetReady, computes net-HLC to select active rounds, publishes HistoricalNetSyncStart and awaits results; shutdown simplified to publish + sleep/log. |
| Tests & utilities: crates/tests/tests/integration.rs, crates/evm/src/evm_chain_gateway.rs, crates/utils/src/retry.rs, crates/zk-prover/tests/backend_tests.rs, crates/sync/Cargo.toml | Test updates to use TestEventBuilder and TakeEventsResult.events; integration reporting refactor; increased BACKOFF_DELAY constant; optional test skipping via env var; added e3-events dev-dep with test-helpers. |
| Docs & minor: crates/net/src/document_publisher.rs, crates/net/src/net_event_batch.rs | Doc/comments updated to Libp2p naming; added net_event_batch module for batched fetching; small test adjustments. |

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Syncer as Sync Orchestrator
    participant Net as Libp2pNetInterface
    participant Store as Event Store

    Syncer->>Syncer: await NetReady
    Syncer->>Syncer: compute net_hlc (filter active rounds)
    Syncer->>Net: HistoricalNetSyncStart(net_hlc)
    Net->>Store: FetchEventsSince(aggregate, since, limit)
    Store-->>Net: EventBatch(events, next_cursor)
    Net-->>Syncer: HistoricalNetSyncEventsReceived(EventBatch...)
    Syncer->>Syncer: combine historical + local events
Loading
sequenceDiagram
    autonumber
    participant Requester
    participant Net as Libp2pNetInterface
    participant Responder

    Requester->>Net: OutgoingRequest(payload, target, corr_id)
    Net->>Responder: IncomingRequest(request, corr_id)
    Responder->>Responder: process & prepare ProtocolResponse
    Responder->>Net: IncomingResponse(ProtocolResponse, corr_id)
    Net-->>Requester: OutgoingRequestSucceeded(response, corr_id)
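
The `corr_id` threaded through the request/response diagram is what lets a requester match a reply to the call that is waiting for it. A minimal sketch of that bookkeeping follows; `PendingRequests` and its methods are hypothetical and only illustrate the matching step, not how `DirectRequester` is actually implemented.

```rust
use std::collections::HashMap;

/// Hypothetical table of in-flight requests keyed by correlation ID.
struct PendingRequests {
    next_id: u64,
    /// corr_id -> label of the caller awaiting a reply.
    waiting: HashMap<u64, String>,
}

impl PendingRequests {
    fn new() -> Self {
        Self { next_id: 0, waiting: HashMap::new() }
    }

    /// Register an outgoing request and return the correlation ID to send with it.
    fn register(&mut self, caller: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.waiting.insert(id, caller.to_owned());
        id
    }

    /// Match a response to its request; unknown or already-resolved IDs yield None.
    fn resolve(&mut self, corr_id: u64) -> Option<String> {
        self.waiting.remove(&corr_id)
    }
}
```

Removing the entry on resolution means a duplicated or late response for the same ID is ignored rather than delivered twice.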

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~110 minutes

Possibly related PRs

Suggested labels

ciphernode

Suggested reviewers

  • hmzakhalid
  • ctrlc03

Poem

🐰
I hopped through crates with nimble paws,
Bridged peers and packed batchy laws,
PeerId donned, channels in tow,
Requests leap, responses glow,
Sync hums softly — ready, go!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 48.06% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'feat: libp2p sync' clearly describes the main feature being added - libp2p synchronization implementation. |
| Linked Issues check | ✅ Passed | The PR comprehensively implements all core requirements from #1325: cursor-based batched sync with event HLCs, filtering for active E3 rounds, generic request/response behavior, and remote sync management. |
| Out of Scope Changes check | ✅ Passed | All changes align with #1325 objectives. Supporting additions (DirectRequester/Responder, test helpers, event store filtering, HistoryCollector refactoring) are necessary dependencies for the core sync implementation. |

ryardley added 2 commits February 26, 2026 06:01
…uest handling

- Add DirectResponder helper for responding to incoming libp2p requests
- Add ProtocolResponse enum (Ok, BadRequest, Error) for structured responses
- Update DirectRequester to handle ProtocolResponse variants
- Refactor NetSyncManager to use DirectResponder instead of raw channels
- Update net_interface to integrate DirectResponder into request handling
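
To make the three `ProtocolResponse` variants concrete, here is a small sketch of a responder choosing between them. The variant names come from the commit message; the payload types and the `respond` handler (which doubles an 8-byte big-endian number) are assumptions made up for this example.

```rust
use std::convert::TryFrom;

/// Mirrors the three variants named in the commit message; payload types are assumed.
#[derive(Debug, PartialEq)]
enum ProtocolResponse {
    Ok(Vec<u8>),
    BadRequest(String),
    Error(String),
}

/// Hypothetical handler: the request is 8 big-endian bytes encoding a number to double.
fn respond(request: &[u8]) -> ProtocolResponse {
    // A malformed request is the caller's fault, so it maps to BadRequest.
    let bytes = match <[u8; 8]>::try_from(request) {
        Ok(b) => b,
        Err(_) => {
            return ProtocolResponse::BadRequest(format!(
                "expected 8 bytes, got {}",
                request.len()
            ))
        }
    };
    // A well-formed request that cannot be served maps to Error.
    match u64::from_be_bytes(bytes).checked_mul(2) {
        Some(doubled) => ProtocolResponse::Ok(doubled.to_be_bytes().to_vec()),
        None => ProtocolResponse::Error("overflow while doubling".to_owned()),
    }
}
```

Separating `BadRequest` from `Error` lets the requester decide whether retrying makes sense: resending the same malformed bytes never will.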

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 2

♻️ Duplicate comments (1)
crates/events/src/bus_handle.rs (1)

291-303: ⚠️ Potential issue | 🟠 Major

wait_for still leaks subscription on future cancellation.

Line 298 subscribes, but if the returned future is dropped/cancelled before rx.await completes, Line 300 is never executed, leaving a stale subscription. Please make unsubscribe run on drop as well.

🔧 Proposed fix (drop-guard cleanup)
 fn wait_for(
     &self,
     event_type: EventType,
 ) -> Pin<Box<dyn Future<Output = Result<EnclaveEvent<Sequenced>>> + Send>> {
     let bus = self.event_bus.clone();
     Box::pin(async move {
         let (addr, rx) = oneshot::<EnclaveEvent<Sequenced>>();
         bus.do_send(Subscribe::new(event_type, addr.clone()));
-        let received = rx.await;
-        bus.do_send(Unsubscribe::new(event_type, addr));
-        Ok(received?)
+
+        struct UnsubscribeOnDrop {
+            bus: Addr<EventBus<EnclaveEvent<Sequenced>>>,
+            event_type: EventType,
+            addr: Recipient<EnclaveEvent<Sequenced>>,
+        }
+
+        impl Drop for UnsubscribeOnDrop {
+            fn drop(&mut self) {
+                self.bus
+                    .do_send(Unsubscribe::new(self.event_type, self.addr.clone()));
+            }
+        }
+
+        let _cleanup = UnsubscribeOnDrop {
+            bus: bus.clone(),
+            event_type,
+            addr,
+        };
+
+        Ok(rx.await?)
     })
 }
#!/bin/bash
set -euo pipefail

# Inspect current wait_for implementation and confirm unsubscribe is only post-await.
sed -n '286,310p' crates/events/src/bus_handle.rs

# Find callsites that may cancel/drop wait_for futures (timeouts/select paths).
rg -nP --type rust '\.wait_for\s*\(' -C2
rg -nP --type rust 'tokio::select!\s*|timeout\s*\(' -C2
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/events/src/bus_handle.rs` around lines 291 - 303, wait_for currently
subscribes then only unsubscribes after rx.await, leaking if the future is
cancelled; fix by introducing a drop-guard that sends Unsubscribe when dropped.
Implement a small SubscriptionGuard struct (holding bus: Addr<...>, event_type:
EventType, addr: oneshot::Sender/Addr) with a Drop impl that calls
bus.do_send(Unsubscribe::new(event_type, addr.clone())), create the guard
immediately after subscribing in wait_for, and remove or replace the explicit
Unsubscribe::new call so the guard handles cleanup on both normal completion and
cancellation; keep the guard alive across rx.await and let it be dropped (or
explicitly drop it) after awaiting to perform the unsubscribe.
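
The drop-guard idea can be tried out without actix or an async runtime. The sketch below uses a plain `Vec` behind `Rc<RefCell<..>>` as a stand-in for the event bus; `Registry`, `SubscriptionGuard`, and `subscribe` are hypothetical names. The same mechanism applies to futures because dropping a future drops the locals it holds, guards included.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical subscriber registry standing in for the event bus.
type Registry = Rc<RefCell<Vec<u32>>>;

/// Guard that removes its subscription when dropped, however the scope is left.
struct SubscriptionGuard {
    registry: Registry,
    id: u32,
}

impl Drop for SubscriptionGuard {
    fn drop(&mut self) {
        let id = self.id;
        self.registry.borrow_mut().retain(|existing| *existing != id);
    }
}

/// Register `id` and return a guard tied to that registration.
fn subscribe(registry: &Registry, id: u32) -> SubscriptionGuard {
    registry.borrow_mut().push(id);
    SubscriptionGuard {
        registry: Rc::clone(registry),
        id,
    }
}
```

Binding the guard to a named variable such as `_guard` matters: binding it to a bare `_` would drop it immediately and unsubscribe before the wait begins.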
🧹 Nitpick comments (2)
crates/net/src/direct_requester.rs (1)

127-133: Consider preserving deserialization error details for easier debugging.

The deserialization at line 130 discards the original error, which could make debugging type conversion issues harder—especially if the peer sends an unexpected response format.

💡 Suggested improvement
         match response {
-            ProtocolResponse::Ok(data) => Ok(data
-                .try_into()
-                .map_err(|_| anyhow!("Could not deserialize ProtocolResponse"))?),
+            ProtocolResponse::Ok(data) => {
+                let len = data.len();
+                data.try_into().map_err(|_| {
+                    anyhow!(
+                        "Could not deserialize ProtocolResponse (payload size: {} bytes)",
+                        len
+                    )
+                })
+            }
             ProtocolResponse::BadRequest(msg) => Err(anyhow!("BadRequest: {}", msg)),
             ProtocolResponse::Error(msg) => Err(anyhow!("ProtocolError: {}", msg)),
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/net/src/direct_requester.rs` around lines 127 - 133, The
ProtocolResponse::Ok branch discards the original deserialization error; change
the map_err on the try_into() call to preserve the underlying error (e.g.,
map_err(|e| anyhow!("Could not deserialize ProtocolResponse: {}", e)) or use
.context("Could not deserialize ProtocolResponse")) so the original conversion
error from try_into() is included in the returned anyhow error; update the
ProtocolResponse::Ok handling to wrap the original error rather than replacing
it with a generic message.
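
A dependency-free sketch of the difference this makes. It uses `String` errors in place of `anyhow`, and `Height` and `DecodeError` are hypothetical types invented for the example.

```rust
use std::convert::TryFrom;
use std::fmt;

/// Hypothetical conversion error carrying the detail worth preserving.
#[derive(Debug)]
struct DecodeError {
    expected: usize,
    actual: usize,
}

impl fmt::Display for DecodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "expected {} bytes, got {}", self.expected, self.actual)
    }
}

/// Hypothetical response type decoded from 4 big-endian bytes.
#[derive(Debug)]
struct Height(u32);

impl TryFrom<Vec<u8>> for Height {
    type Error = DecodeError;

    fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
        if bytes.len() != 4 {
            return Err(DecodeError { expected: 4, actual: bytes.len() });
        }
        Ok(Height(u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])))
    }
}

/// Discards the underlying error, as in the code under review.
fn decode_generic(bytes: Vec<u8>) -> Result<Height, String> {
    Height::try_from(bytes).map_err(|_| "Could not deserialize ProtocolResponse".to_owned())
}

/// Keeps the underlying error in the message.
fn decode_detailed(bytes: Vec<u8>) -> Result<Height, String> {
    Height::try_from(bytes)
        .map_err(|e| format!("Could not deserialize ProtocolResponse: {}", e))
}
```

This only helps when the conversion's error type implements `Display`; if it does not, reporting the payload size as in the suggested diff is a reasonable fallback.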
crates/tests/tests/integration.rs (1)

921-924: Replace raw XXX with a trackable TODO reference.

Line 921 currently uses // XXX: ENABLE THIS!!. Prefer an issue-linked TODO so this doesn’t become unowned test debt.

If you want, I can draft a concise follow-up issue template for re-enabling this test once the trBFV sync port is complete.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/tests/tests/integration.rs` around lines 921 - 924, Replace the raw
"XXX" comment above the test_stopped_keyshares_retain_state() with a tracked
TODO that references a concrete issue or ticket (e.g. "TODO: re-enable after
trBFV sync - issue #<ID>" or a full issue URL); update the comment to use a
consistent TODO format (e.g. // TODO(issue-1234): re-enable test after trBFV
sync) so the disabled #[ignore = "..."] test is linked to a traceable task and
won’t become unowned debt.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/net/src/direct_responder.rs`:
- Around line 123-132: The error message in try_request_into is misleading (it
says "serialize request bytes" while performing a deserialization via T:
TryFrom<Vec<u8>>); update the map_err closure in
DirectResponder::try_request_into to return a correct, descriptive error such as
"Could not deserialize request bytes" (optionally including target type/context)
so failures reflect the actual operation being performed on
self.request.clone().try_into().

In `@crates/tests/tests/integration.rs`:
- Around line 618-633: Replace incorrect second-based durations with
millisecond-based durations in the test timeouts: update calls to
Duration::from_secs(5000), Duration::from_secs(600), and
Duration::from_secs(1000) used with expect_events_with_timeouts and any other
test helpers to Duration::from_millis(5000), Duration::from_millis(600), and
Duration::from_millis(1000) respectively so the timeouts match the intended
millisecond scale (consistent with take_history_with_timeout()); locate these
changes around the expect_events_with_timeouts invocation and related test
helpers.
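
To put numbers on the unit mix-up, the snippet below compares the two readings of the same literal. It takes the review comment's premise that milliseconds were intended at face value; `as_written` and `as_intended` are illustrative helpers, not functions from the test suite.

```rust
use std::time::Duration;

/// The timeout as the test currently spells it: the number is read as seconds.
fn as_written(n: u64) -> Duration {
    Duration::from_secs(n)
}

/// The timeout if the number was meant as milliseconds.
fn as_intended(n: u64) -> Duration {
    Duration::from_millis(n)
}
```

For `n = 5000` the first is 83 minutes 20 seconds and the second is 5 seconds, so a hung test would take far longer to fail than the author probably expects.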

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8f4a599 and 65489ad.

📒 Files selected for processing (4)
  • crates/events/src/bus_handle.rs
  • crates/net/src/direct_requester.rs
  • crates/net/src/direct_responder.rs
  • crates/tests/tests/integration.rs

Comment thread crates/net/src/direct_responder.rs
Comment thread crates/tests/tests/integration.rs

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/net/src/net_interface.rs (1)

151-160: ⚠️ Potential issue | 🟠 Major

Handle startup dial task failures explicitly.

At Line 151, the spawned task uses ?, but its JoinHandle is dropped, so Err results are silently discarded. This can hide initial dial failures and make startup look healthy when it isn’t.

Suggested fix
         tokio::spawn({
             let event_tx = event_tx.clone();
             let cmd_tx = cmd_tx.clone();
             let peers = self.peers.clone();
             async move {
-                dial_peers(&cmd_tx, &event_tx, &peers).await?;
-                event_tx.send(NetEvent::AllPeersDialed)?;
-                return anyhow::Ok(());
+                if let Err(e) = dial_peers(&cmd_tx, &event_tx, &peers).await {
+                    error!("Initial peer dial failed: {e}");
+                    return;
+                }
+                if let Err(e) = event_tx.send(NetEvent::AllPeersDialed) {
+                    debug!("No listeners for AllPeersDialed: {e}");
+                }
             }
         });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/net/src/net_interface.rs` around lines 151 - 160, The spawned startup
task swallows errors because its JoinHandle is dropped: change the
tokio::spawn around the async block that calls dial_peers to capture and handle
errors explicitly (e.g., store or await the JoinHandle, spawn a wrapper that
logs or sends a failure event on Err, or propagate the error over
cmd_tx/event_tx). Specifically update the async closure that calls
dial_peers(&cmd_tx, &event_tx, &peers).await and currently uses `?` so failures
are lost — ensure failures are caught (match or .await the JoinHandle) and send
a NetEvent (or log via process logger) describing the dial failure instead of
silently dropping the result so startup dial failures are observable.
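
The same principle can be shown with `std::thread` in place of tokio, so the sketch runs without dependencies: rather than returning a `Result` that nobody joins on, the background task reports its outcome on a channel. `StartupEvent`, `spawn_dial`, and this simplified `dial_peers` are hypothetical stand-ins, not the code in `net_interface.rs`.

```rust
use std::sync::mpsc::Sender;
use std::thread;

/// Hypothetical startup events, loosely modelled on the review comment.
#[derive(Debug, PartialEq)]
enum StartupEvent {
    AllPeersDialed,
    DialFailed(String),
}

/// Stand-in for the fallible dial step; fails when any peer address is empty.
fn dial_peers(peers: &[String]) -> Result<(), String> {
    match peers.iter().position(|p| p.is_empty()) {
        Some(index) => Err(format!("peer {} has no address", index)),
        None => Ok(()),
    }
}

/// Run the dial step in the background and report the outcome on `events`,
/// so a failure is observable even if the JoinHandle is never joined.
fn spawn_dial(peers: Vec<String>, events: Sender<StartupEvent>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let outcome = match dial_peers(&peers) {
            Ok(()) => StartupEvent::AllPeersDialed,
            Err(e) => StartupEvent::DialFailed(e),
        };
        // A send error only means the receiver is gone; there is nobody left to notify.
        let _ = events.send(outcome);
    })
}
```

Whether a dial failure should be an event, a log line, or both is a design decision for the PR author; the sketch only shows that the error path needs an explicit destination.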
♻️ Duplicate comments (1)
crates/net/src/net_sync_manager.rs (1)

194-201: ⚠️ Potential issue | 🟠 Major

Avoid orphaning requests entries when EventStore dispatch fails.

On Line 194 the responder is stored before Line 197 dispatch. If eventstore.try_send(...) fails, the map entry is left behind and the caller gets no immediate failure response.

Suggested fix
-            let fetch_request: FetchEventsSince = msg.responder.try_request_into()?;
-            self.requests.insert(id, msg.responder);
+            let responder = msg.responder;
+            let fetch_request: FetchEventsSince = responder.try_request_into()?;
             let query: HashMap<AggregateId, u128> =
                 HashMap::from([(fetch_request.aggregate_id(), fetch_request.since())]);
-            self.eventstore.try_send(EventStoreQueryBy::<TsAgg>::new(
+            if let Err(e) = self.eventstore.try_send(EventStoreQueryBy::<TsAgg>::new(
                 id,
                 query,
                 ctx.address().recipient(),
-            ))?;
+            )) {
+                let _ = responder.bad_request(format!("eventstore unavailable: {e}"));
+                return Ok(());
+            }
+            self.requests.insert(id, responder);
             Ok(())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/net/src/net_sync_manager.rs` around lines 194 - 201, The code inserts
the responder into self.requests before dispatching the EventStore query, which
can orphan entries if eventstore.try_send(...) fails; modify the logic in the
handler around self.requests and eventstore.try_send so that you only insert the
responder after try_send succeeds, or if you must insert before, ensure you
remove the entry and send an immediate failure to the stored responder when
EventStoreQueryBy::<TsAgg>::new(...) / eventstore.try_send(...) returns Err;
specifically update the block that uses self.requests.insert(id, msg.responder)
and the subsequent eventstore.try_send(...) call (and where
fetch_request.aggregate_id() / fetch_request.since() are used) to either move
the insert to after a successful try_send or add error handling that cleans up
self.requests and replies to the responder on failure.
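
A minimal sketch of the "insert only after a successful dispatch" ordering, using a bounded std channel as a stand-in for the EventStore mailbox. `Handler` and its fields are hypothetical; responders are reduced to strings.

```rust
use std::collections::HashMap;
use std::sync::mpsc::SyncSender;

/// Hypothetical handler state: responders awaiting a store reply, keyed by request id.
struct Handler {
    requests: HashMap<u64, String>,
    store: SyncSender<u64>,
}

impl Handler {
    /// Dispatch first; record the responder only once the dispatch has succeeded.
    fn handle(&mut self, id: u64, responder: String) -> Result<(), String> {
        self.store
            .try_send(id)
            .map_err(|e| format!("eventstore unavailable: {}", e))?;
        self.requests.insert(id, responder);
        Ok(())
    }
}
```

With this ordering a failed dispatch leaves the map untouched, so there is no cleanup path to forget; the caller still needs to turn the returned error into a reply to the peer.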
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/net/src/net_sync_manager.rs`:
- Around line 45-47: The try_from implementation deserializes untrusted bytes
without size limits; change it to use bincode bounded deserialization by calling
bincode::DefaultOptions::new().with_limit(MAX_SYNC_PAYLOAD_SIZE).deserialize(&value)
instead of bincode::deserialize(&value), and keep the existing context("failed
to deserialize sync response") on the Result; add a clearly named constant
(e.g., MAX_SYNC_PAYLOAD_SIZE) for the maximum allowed bytes (choose an
appropriate u64) and reference that constant in the call so try_from enforces a
safe payload size for untrusted peer sync payloads.
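
The idea of bounding untrusted input can be illustrated without bincode. The sketch below uses a hand-rolled framing (4-byte big-endian length followed by the body), which is an assumption for this example and not the wire format used by the PR; the limit value is likewise arbitrary.

```rust
/// Assumed cap for this sketch; the review comment leaves the real value open.
const MAX_SYNC_PAYLOAD_SIZE: usize = 1024;

/// Decode a payload framed as a 4-byte big-endian length followed by that many
/// bytes, rejecting oversized or inconsistent input before copying anything.
fn decode_bounded(raw: &[u8]) -> Result<Vec<u8>, String> {
    if raw.len() > MAX_SYNC_PAYLOAD_SIZE {
        return Err(format!("payload of {} bytes exceeds limit", raw.len()));
    }
    if raw.len() < 4 {
        return Err("missing length prefix".to_owned());
    }
    let declared = u32::from_be_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize;
    let body = &raw[4..];
    // Never trust the declared length on its own: it is peer-controlled.
    if declared != body.len() {
        return Err(format!(
            "declared {} bytes but received {}",
            declared,
            body.len()
        ));
    }
    Ok(body.to_vec())
}
```

If the real code goes the bincode route, it may be worth checking that the chosen options still match the encoding used when the payload was serialized; as far as I know, in bincode 1.x `DefaultOptions::new()` defaults to variable-length integers while the free `serialize`/`deserialize` functions use fixed-length ones.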

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 65489ad and be5ec44.

📒 Files selected for processing (2)
  • crates/net/src/net_interface.rs
  • crates/net/src/net_sync_manager.rs

Comment thread crates/net/src/net_sync_manager.rs

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

🧹 Nitpick comments (1)
crates/tests/tests/integration.rs (1)

361-374: Minor naming and simplification suggestions.

The parameter repo in push is confusing (reads like "repository"). Consider entry or just destructure inline.

The repeat helper can be simplified:

✨ Suggested refinements
 fn repeat(ch: char, num: usize) -> String {
-    let mut s = String::new();
-    while s.len() < num {
-        s.push(ch);
-    }
-    s
+    std::iter::repeat(ch).take(num).collect()
 }

 impl Report {
-    pub fn push(&mut self, repo: (&str, Duration)) {
-        let (label, dur) = repo;
+    pub fn push(&mut self, (label, dur): (&str, Duration)) {
         self.show(label);
         self.inner.push((label.to_owned(), dur));
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/tests/tests/integration.rs` around lines 361 - 374, Rename and
simplify: in Report::push, avoid the confusing parameter name repo by
destructuring the tuple in the signature (e.g., change pub fn push(&mut self,
repo: (&str, Duration)) to pub fn push(&mut self, (label, dur): (&str,
Duration)) or rename repo to entry/tuple and use (label, dur) = ... inside the
body; update uses of label/dur accordingly. Simplify the helper function repeat
by replacing the manual loop with an iterator-based construction such as using
std::iter::repeat(ch).take(num).collect::<String>() (keep the function name
repeat) to produce the repeated-char String in one expression.
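
One thing worth checking before applying the `repeat` simplification: the two versions agree for ASCII characters but not for multi-byte ones, because the loop compares against `String::len()`, which counts bytes. The side-by-side below is a sketch; the function names are made up to distinguish the two variants.

```rust
/// The loop from the original code: stops once the byte length reaches `num`.
fn repeat_loop(ch: char, num: usize) -> String {
    let mut s = String::new();
    while s.len() < num {
        s.push(ch);
    }
    s
}

/// The suggested replacement: always yields exactly `num` characters.
fn repeat_iter(ch: char, num: usize) -> String {
    std::iter::repeat(ch).take(num).collect()
}
```

For a rule drawn with `'='` nothing changes. For a box-drawing character such as `'\u{2500}'` (3 bytes in UTF-8), the loop produces a third as many characters as the iterator version, so report lines could change width.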
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/tests/tests/integration.rs`:
- Line 984: In create_local_ciphernodes, the async call to
simulate_libp2p_net(&result) is missing .await; update the call to
simulate_libp2p_net(&result).await so the future is awaited (ensure
create_local_ciphernodes is async or already returns a future), matching the
correct usage seen at other sites (lines using simulate_libp2p_net.await) to
avoid the compilation error when re-enabling
test_duplicate_e3_id_with_different_chain_id.

ℹ️ Review info
Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 286df9a9-0b24-4401-ab1d-60530abe4e3e

📥 Commits

Reviewing files that changed from the base of the PR and between be5ec44 and a4ccf08.

📒 Files selected for processing (1)
  • crates/tests/tests/integration.rs

Comment thread crates/tests/tests/integration.rs
Comment thread crates/sync/src/sync.rs Outdated

@ctrlc03 ctrlc03 left a comment

Collaborator

great work!

@coderabbitai coderabbitai Bot mentioned this pull request Apr 6, 2026
7 tasks

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

net sync

2 participants