Skip to content

feat: do not replay unnecessary actions on sync#1442

Merged
cedoor merged 7 commits into
mainfrom
feat/do-not-replay-certain-actions-on-sync
Mar 18, 2026
Merged

feat: do not replay unnecessary actions on sync#1442
cedoor merged 7 commits into
mainfrom
feat/do-not-replay-certain-actions-on-sync

Conversation

@ctrlc03

@ctrlc03 ctrlc03 commented Mar 17, 2026

Copy link
Copy Markdown
Collaborator

fix #1292 and ensure we mark E3s as completed/failed so that we do not replay actions we don't need to whenever we restart a node

Summary by CodeRabbit

  • Improvements

    • Gates multiple event subscriptions so they do not run during historical replay; enables them once effects are active.
    • Networking enhancements: loopback address avoidance, backpressure warnings, per-peer failure tracking, and richer connect/disconnect logging.
  • Bug Fixes

    • Prevents replay-time scheduling and reduces noisy logs.
  • Tests

    • Added tests validating gated vs immediate event subscription behavior.

@vercel

vercel Bot commented Mar 17, 2026

Copy link
Copy Markdown

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
crisp Ready Ready Preview, Comment Mar 18, 2026 9:32am
enclave-docs Ready Ready Preview, Comment Mar 18, 2026 9:32am

Request Review

@coderabbitai

coderabbitai Bot commented Mar 17, 2026

Copy link
Copy Markdown
Contributor

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Subscriptions for CommitteeRequested, ComputeRequest, and E3Requested are delayed until an EffectsEnabled event fires; tests validate gated vs immediate subscriptions; net changes add loopback avoidance, backpressure warnings, and a per-peer failure tracker.

Changes

Cohort / File(s) Summary
Subscription gating
crates/committee_finalizer.rs, crates/multithread/src/multithread.rs, crates/sortition/src/sortition.rs
Deferred subscriptions for CommitteeRequested, ComputeRequest, and E3Requested behind an EffectsEnabled run_once handler so these events are not processed during historical replay; other state/event subscriptions retained.
Tests (gating semantics)
crates/sync/src/sync.rs
Adds async Actix-based tests verifying gated subscriptions ignore pre-EffectsEnabled events while immediate subscriptions receive events before and after EffectsEnabled.
Networking & peer tracking
crates/net/src/net_interface.rs
Adds loopback-address checks (avoid adding loopback to Kademlia), configurable channel sizes, runtime backpressure warnings with rate limiting, richer swarm event logging, and a PeerFailureTracker with TTL-based cleanup.
Import reorg (cosmetic)
crates/evm/src/enclave_sol_writer.rs
Reorganized imports only; no API or behavior changes.

Sequence Diagram(s)

sequenceDiagram
    participant Module as Component (Committee/Multithread/Sortition)
    participant Bus as Event Bus
    participant Effects as EffectsEnabled
    participant Target as Target Event (E3Requested / ComputeRequest / CommitteeRequested)

    rect rgba(100,150,200,0.5)
    Note over Module,Bus: Attach / startup
    Module->>Bus: Subscribe to EffectsEnabled
    Target-->>Bus: Target Event published (during replay)
    Note over Bus: Target ignored (no subscription)
    end

    rect rgba(150,100,200,0.5)
    Note over Bus,Effects: EffectsEnabled fires
    Effects-->>Bus: EffectsEnabled event
    Bus->>Module: Deliver EffectsEnabled (run_once)
    Module->>Bus: Subscribe to Target (enable normal handling)
    end

    rect rgba(100,200,150,0.5)
    Note over Bus,Target: Normal operation
    Target-->>Bus: Target Event published
    Bus->>Module: Target delivered & processed
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

ciphernode

Suggested reviewers

  • ryardley
  • 0xjei

Poem

🐇
I sat through replay's gentle hush and wait,
No early calls to rouse the sleeping state.
When EffectsEnabled rang its single chime,
I hopped to life — subscriptions wake in time.
Hooray — events now dance in proper rhyme! ✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main objective of the pull request: preventing unnecessary action replays during synchronization by gating event subscriptions behind EffectsEnabled.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/do-not-replay-certain-actions-on-sync
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/aggregator/src/committee_finalizer.rs`:
- Around line 89-92: Re-check the terminal state after the async timestamp
lookup: inside the future's continuation (the closure passed to .then(...) that
currently calls run_later), query act.terminal_e3s again for e3_id and bail out
if it's now terminal before scheduling the timer. Concretely, after
get_current_timestamp() completes and within the .then closure (the continuation
that calls run_later), add the same guard that checks
self/act.terminal_e3s.contains(&e3_id) and return early to avoid scheduling
timers for E3s that transitioned to E3Failed/E3StageChanged while the RPC was in
flight.

In `@crates/evm/src/ciphernode_registry_sol.rs`:
- Around line 314-316: The terminal-event filter in the EventType list is
missing E3Failed, so failed rounds still allow replayed events; update the
terminal set to include EventType::E3Failed and ensure the event writer/handler
that records terminal state (the code building the terminal EventType list and
the logic handling E3StageChanged) records/marks E3Failed as terminal when
E3StageChanged::Failed is observed (even if it arrives later) so subsequent
replayed TicketGenerated/CommitteeFinalizeRequested/PublicKeyAggregated for that
request are skipped; locate references to EventType (the enum) and the handling
code around E3RequestComplete and E3StageChanged to add E3Failed to the terminal
list and to record/short-circuit on E3StageChanged::Failed.
- Around line 354-374: The branches currently check terminal_e3s before calling
ctx.notify, but you must also perform the same terminal guard at the start of
each concrete handler to avoid executing queued transactions if the E3 became
terminal after notify; add an early-return guard in the Handler implementations
(Handler<TicketGenerated>::handle, Handler<CommitteeFinalizeRequested>::handle,
Handler<PublicKeyAggregated>::handle) that checks self.provider.chain_id() ==
data.e3_id.chain_id() and returns immediately if
self.terminal_e3s.contains(&data.e3_id) (i.e., skip processing/transaction
execution when the e3_id is terminal).

In `@crates/evm/src/enclave_sol_writer.rs`:
- Around line 96-118: The code currently inserts into terminal_e3s prematurely
in the EnclaveEventData::E3RequestComplete and the E3StageChanged (Failed)
handling, causing events to be suppressed before on-chain writes actually
succeed; change the logic so terminal_e3s is only updated after the
corresponding write completes successfully. Concretely: in the handlers for
EnclaveEventData::E3RequestComplete and the E3StageChanged Failed branch, remove
the immediate self.terminal_e3s.insert(...) and instead trigger/notify the write
flow (e.g., publishPlaintextOutput or processE3Failure) as you already do with
ctx.notify(data) and then insert the e3_id into terminal_e3s from the
success/completion callback or the code path that confirms
publishPlaintextOutput/processE3Failure finished without retries left; keep the
PlaintextAggregated check that guards ctx.notify but do not mark terminal until
write confirmation.

In `@crates/multithread/src/multithread.rs`:
- Around line 182-190: attach_with_zk currently subscribes to ComputeRequest
immediately via bus.subscribe_all([...ComputeRequest..., E3Failed,
E3StageChanged, E3RequestComplete], ...) which bypasses the replay-safe ordering
used by attach(); change attach_with_zk to perform the same two-phase
subscription: first subscribe only to terminal/marker events (E3Failed,
E3StageChanged, E3RequestComplete and any EffectsEnabled marker), wait until
EffectsEnabled is observed (or confirm effects are enabled), and only then
subscribe to EventType::ComputeRequest (or call subscribe for ComputeRequest in
the second phase). Update the subscription calls around
bus.subscribe_all/subscribe to mirror attach()’s order so stale compute jobs
cannot be consumed before EffectsEnabled.
- Around line 222-228: The branch handling EnclaveEventData::ComputeRequest
currently calls ctx.notify(TypedEvent::new(data, ec)) without re-checking
terminal_e3s, allowing a queued compute to run after the E3 became terminal;
change this so you only call ctx.notify when the E3 is still non-terminal at the
moment of enqueue: in the EnclaveEventData::ComputeRequest arm re-check
self.terminal_e3s.contains(&data.e3_id) (or otherwise acquire the same
guard/lock used for terminal_e3s) and skip calling
ctx.notify(TypedEvent::new(data, ec)) if it is now terminal, ensuring the queued
message is never posted for terminal E3s and avoiding relying on the handler to
re-check.

In `@crates/sortition/src/sortition.rs`:
- Around line 460-463: The handler currently checks terminal_e3s in sortition
but terminal_e3s is only populated later by E3Failed / E3StageChanged events, so
E3Requested can still trigger on replay; fix by hydrating terminal_e3s before
subscribing to E3Requested (or by deferring/buffering E3Requested processing
until after terminal stream ingestion). Concretely, ensure the initialization
path that registers the E3Requested handler (e.g., the subscribe_to_e3_requested
/ start_subscriptions call or the function that attaches the E3Requested
listener) is moved to after a new hydrate_terminal_e3s() step that scans past
events and populates the terminal_e3s set, or change the E3Requested handler
(the sortition entry point) to detect an "terminals_loaded" flag and queue
incoming E3Requested events until terminal_e3s is populated (flip the flag after
processing E3Failed / E3StageChanged replays).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 13ca9957-6559-4c68-b866-4182133c3c0a

📥 Commits

Reviewing files that changed from the base of the PR and between d053ae8 and 993b092.

📒 Files selected for processing (5)
  • crates/aggregator/src/committee_finalizer.rs
  • crates/evm/src/ciphernode_registry_sol.rs
  • crates/evm/src/enclave_sol_writer.rs
  • crates/multithread/src/multithread.rs
  • crates/sortition/src/sortition.rs

Comment thread crates/aggregator/src/committee_finalizer.rs Outdated
Comment thread crates/evm/src/ciphernode_registry_sol.rs Outdated
Comment thread crates/evm/src/ciphernode_registry_sol.rs
Comment thread crates/evm/src/enclave_sol_writer.rs Outdated
Comment thread crates/multithread/src/multithread.rs Outdated
Comment thread crates/sortition/src/sortition.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/multithread/src/multithread.rs (1)

158-172: ⚠️ Potential issue | 🟠 Major

attach_with_zk bypasses the replay-safe subscription gating used by attach.

The attach() method gates ComputeRequest subscription behind the EffectsEnabled event to prevent proof generation during historical event replay (lines 139–146). However, attach_with_zk() directly subscribes to ComputeRequest at line 170 without this gating, creating an inconsistency. When a ZK backend is configured, stale compute jobs can be replayed before terminal markers are ingested, undermining replay safety.

Apply the same EffectsEnabled gating in attach_with_zk().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/multithread/src/multithread.rs` around lines 158 - 172, attach_with_zk
currently subscribes to EventType::ComputeRequest immediately, bypassing the
replay-safety gating used in attach; modify attach_with_zk to mirror attach by
subscribing to EventType::EffectsEnabled (using the actor address recipient) and
only after receiving/handling that event subscribe to EventType::ComputeRequest
so proof generation is blocked during historical replay; preserve creation of
ZkProver/actor via ZkProver::new and with_zk_prover but move or add the
EffectsEnabled gating step around the bus.subscribe for
EventType::ComputeRequest to match attach's behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@crates/multithread/src/multithread.rs`:
- Around line 158-172: attach_with_zk currently subscribes to
EventType::ComputeRequest immediately, bypassing the replay-safety gating used
in attach; modify attach_with_zk to mirror attach by subscribing to
EventType::EffectsEnabled (using the actor address recipient) and only after
receiving/handling that event subscribe to EventType::ComputeRequest so proof
generation is blocked during historical replay; preserve creation of
ZkProver/actor via ZkProver::new and with_zk_prover but move or add the
EffectsEnabled gating step around the bus.subscribe for
EventType::ComputeRequest to match attach's behavior.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: b2077bc3-ade7-46d1-90e3-8b12e9ea0ed8

📥 Commits

Reviewing files that changed from the base of the PR and between 993b092 and 34035a6.

📒 Files selected for processing (6)
  • crates/aggregator/src/committee_finalizer.rs
  • crates/evm/src/ciphernode_registry_sol.rs
  • crates/evm/src/enclave_sol_writer.rs
  • crates/multithread/src/multithread.rs
  • crates/sortition/src/sortition.rs
  • crates/sync/src/sync.rs
💤 Files with no reviewable changes (1)
  • crates/evm/src/ciphernode_registry_sol.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
crates/sync/src/sync.rs (1)

526-537: Consider extracting Counter actor to module level.

The Counter actor struct is defined identically in both test functions. Extracting it to the test module's top level would reduce duplication.

♻️ Optional refactor

Add at the top of the mod tests block (around line 316):

// Test helper actor that counts received events
struct Counter(Arc<AtomicU32>);
impl actix::Actor for Counter {
    type Context = actix::Context<Self>;
}
impl actix::Handler<EnclaveEvent> for Counter {
    type Result = ();
    fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result {
        if matches!(msg.get_data(), EnclaveEventData::TestEvent(_)) {
            self.0.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        }
    }
}

Then remove the inline definitions in both test functions.

Also applies to: 606-617

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/sync/src/sync.rs` around lines 526 - 537, The Counter actor is
duplicated in two tests; extract it to the test module top level to remove
duplication: move the struct Counter(Arc<AtomicU32>) and its impls (impl Actor
for Counter and impl Handler<EnclaveEvent> for Counter) into the mod tests scope
(above the test functions) and update the tests to use that shared Counter;
ensure the Handler implementation still matches on EnclaveEventData::TestEvent
and uses Ordering::SeqCst so behavior remains identical.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/evm/src/enclave_sol_writer.rs`:
- Around line 27-28: Remove the duplicate E3id import by editing the use
statement that reads use e3_events::{E3Stage, E3StageChanged, E3id}; — either
drop E3id from that brace list or consolidate the two use lines into a single
import that includes BusHandle, E3id, EnclaveEventData, E3Stage, and
E3StageChanged so E3id is only imported once (refer to the existing import
containing BusHandle, E3id, EnclaveEventData).

---

Nitpick comments:
In `@crates/sync/src/sync.rs`:
- Around line 526-537: The Counter actor is duplicated in two tests; extract it
to the test module top level to remove duplication: move the struct
Counter(Arc<AtomicU32>) and its impls (impl Actor for Counter and impl
Handler<EnclaveEvent> for Counter) into the mod tests scope (above the test
functions) and update the tests to use that shared Counter; ensure the Handler
implementation still matches on EnclaveEventData::TestEvent and uses
Ordering::SeqCst so behavior remains identical.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 6738b344-0042-4277-97af-7edf2a522209

📥 Commits

Reviewing files that changed from the base of the PR and between 34035a6 and c13f636.

📒 Files selected for processing (5)
  • crates/aggregator/src/committee_finalizer.rs
  • crates/evm/src/enclave_sol_writer.rs
  • crates/multithread/src/multithread.rs
  • crates/sortition/src/sortition.rs
  • crates/sync/src/sync.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • crates/multithread/src/multithread.rs
  • crates/aggregator/src/committee_finalizer.rs

Comment thread crates/evm/src/enclave_sol_writer.rs
Comment thread crates/multithread/src/multithread.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
crates/net/src/net_interface.rs (2)

940-946: Minor: Double Instant::now() call.

Instant::now() is called twice — once in or_insert and once for the timestamp update. While the overhead is negligible, a single capture would be cleaner.

♻️ Optional: Single timestamp capture
 fn record_failure(&mut self, peer_id: &PeerId) -> u32 {
     self.cleanup_stale();
-    let entry = self.failures.entry(*peer_id).or_insert((0, Instant::now()));
+    let now = Instant::now();
+    let entry = self.failures.entry(*peer_id).or_insert((0, now));
     entry.0 += 1;
-    entry.1 = Instant::now();
+    entry.1 = now;
     entry.0
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/net/src/net_interface.rs` around lines 940 - 946, The record_failure
function calls Instant::now() twice; capture a single timestamp at the start
(e.g., let ts = Instant::now()) and use that ts for both or_insert((0, ts)) and
for updating entry.1 so failures.entry(*peer_id).or_insert((0, ts)) and entry.1
= ts reuse the same Instant; keep the rest of the logic (cleanup_stale,
increment entry.0, return) unchanged.

198-201: Good observability, consider rate-limiting the warning.

The backpressure warning provides valuable operational insight. However, if the channel remains above 75% capacity for an extended period, this could generate excessive log noise. Consider rate-limiting the warning (e.g., once per minute or after N consecutive occurrences).

♻️ Optional: Rate-limited backpressure warning
+// Add field to track last warning time (or use a counter approach)
+let mut last_backpressure_warn = Instant::now() - Duration::from_secs(60);
+
 // In the event processing branch:
 let queued = event_tx.len();
-if queued > EVENT_CHANNEL_SIZE * 3 / 4 {
+if queued > EVENT_CHANNEL_SIZE * 3 / 4 && last_backpressure_warn.elapsed() > Duration::from_secs(10) {
     warn!("Event broadcast channel backpressure: {queued}/{EVENT_CHANNEL_SIZE} queued");
+    last_backpressure_warn = Instant::now();
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/net/src/net_interface.rs` around lines 198 - 201, The backpressure
warn! around event_tx.len() and EVENT_CHANNEL_SIZE can spam logs; add
rate-limiting by tracking state (e.g., a last_backpressure_warn: Instant or
backpressure_count: u32 on the same struct that contains the code) and only call
warn! when elapsed since last_backpressure_warn > Duration::from_secs(60) or
when backpressure_count % N == 0, resetting the timer/counter when queued drops
below the threshold; use std::time::Instant (or incrementing counter) in the
function that checks event_tx.len() to gate the warn! macro so warnings are
emitted at most once per minute (or every N occurrences).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/multithread/src/multithread.rs`:
- Around line 170-177: The subscriptions to EventType::E3Failed,
EventType::E3StageChanged, and EventType::E3RequestComplete via
bus.subscribe_all are currently unused because the Handler<EnclaveEvent>
implementation ignores all terminal events; either remove those subscribe_all
calls or implement terminal-event handling: add a tracked set (e.g.,
completed_e3_ids) in the actor, update it in the Handler for
E3Failed/E3StageChanged/E3RequestComplete to record terminal E3 IDs, and consult
that set at the start of the Handler for EnclaveEvent::ComputeRequest (or
equivalent) to skip stale requests; if this is intended future work, add a TODO
comment near bus.subscribe_all explaining the plan.

---

Nitpick comments:
In `@crates/net/src/net_interface.rs`:
- Around line 940-946: The record_failure function calls Instant::now() twice;
capture a single timestamp at the start (e.g., let ts = Instant::now()) and use
that ts for both or_insert((0, ts)) and for updating entry.1 so
failures.entry(*peer_id).or_insert((0, ts)) and entry.1 = ts reuse the same
Instant; keep the rest of the logic (cleanup_stale, increment entry.0, return)
unchanged.
- Around line 198-201: The backpressure warn! around event_tx.len() and
EVENT_CHANNEL_SIZE can spam logs; add rate-limiting by tracking state (e.g., a
last_backpressure_warn: Instant or backpressure_count: u32 on the same struct
that contains the code) and only call warn! when elapsed since
last_backpressure_warn > Duration::from_secs(60) or when backpressure_count % N
== 0, resetting the timer/counter when queued drops below the threshold; use
std::time::Instant (or incrementing counter) in the function that checks
event_tx.len() to gate the warn! macro so warnings are emitted at most once per
minute (or every N occurrences).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 8244a971-497c-4c67-8bdd-19e09ee6993a

📥 Commits

Reviewing files that changed from the base of the PR and between c13f636 and 28d47a3.

📒 Files selected for processing (2)
  • crates/multithread/src/multithread.rs
  • crates/net/src/net_interface.rs

Comment thread crates/multithread/src/multithread.rs
@cedoor cedoor merged commit 208d2f0 into main Mar 18, 2026
27 checks passed
@github-actions github-actions Bot deleted the feat/do-not-replay-certain-actions-on-sync branch March 26, 2026 03:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Prevent Duplicate Committee Finalization on Aggregator Restart

3 participants