don't drop receiver error when implementing net request #1237

Description

@ryardley

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's read the file at the specified lines
cd crates/net/src && head -100 net_sync_manager.rs | tail -50

Repository: gnosisguild/enclave

Length of output: 1593


🏁 Script executed:

# Also get the full context around lines 61-77
cat -n crates/net/src/net_sync_manager.rs | sed -n '50,90p'

Repository: gnosisguild/enclave

Length of output: 1548


🏁 Script executed:

# Check if there are imports related to broadcast
rg "broadcast" crates/net/src/net_sync_manager.rs -A 2 -B 2

Repository: gnosisguild/enclave

Length of output: 1517


🏁 Script executed:

# Search for other event receiver patterns in the codebase
rg "events\.recv\(\)" --type rust -B 2 -A 5

Repository: gnosisguild/enclave

Length of output: 1453


🏁 Script executed:

# Search for broadcast error handling patterns in the codebase
rg "RecvError|broadcast.*error" --type rust -B 2 -A 2

Repository: gnosisguild/enclave

Length of output: 1554


🏁 Script executed:

# Check if there are any places where Lagged is explicitly handled
rg "Lagged" --type rust -B 3 -A 3

Repository: gnosisguild/enclave

Length of output: 537


🏁 Script executed:

# Look for loop patterns with broadcast receivers
rg "loop \{" -A 10 --type rust | grep -A 10 "recv()"

Repository: gnosisguild/enclave

Length of output: 3642


🏁 Script executed:

# Check documentation or comments about broadcast channel behavior
rg "broadcast|Lagged|Closed" crates/net/src/ --type rust -B 1 -A 1 | head -60

Repository: gnosisguild/enclave

Length of output: 3572


Handle broadcast receiver errors to prevent silent forwarder exit.
The while let Ok(...) loop exits on any Err, so both RecvError::Lagged and RecvError::Closed end it, and a single lag under load silently stops sync forwarding. Continue on Lagged (the receiver fell behind and missed some events, but the channel is still usable) and break only on Closed (all senders have been dropped).
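
To illustrate the control-flow difference, here is a minimal, dependency-free sketch. It assumes a stand-in `RecvError` enum that only mirrors the shape of tokio's `broadcast::error::RecvError`, and it replays a fixed list of results instead of awaiting a real channel. The function names `forward_while_let` and `forward_loop_match` are hypothetical and do not exist in the repository.

```rust
// Stand-in for tokio::sync::broadcast::error::RecvError (assumption: same
// two variants), so this sketch compiles without any dependencies.
#[derive(Debug, Clone, PartialEq)]
enum RecvError {
    Lagged(u64),
    Closed,
}

/// Mimics `while let Ok(event) = events.recv().await`:
/// forwarding stops at the first error of any kind.
fn forward_while_let(results: &[Result<u32, RecvError>]) -> Vec<u32> {
    let mut forwarded = Vec::new();
    let mut iter = results.iter().cloned();
    while let Some(Ok(event)) = iter.next() {
        forwarded.push(event);
    }
    forwarded
}

/// Mimics the explicit `loop { match ... }`:
/// skips over Lagged and stops only on Closed.
/// Returns the forwarded events and the total number of skipped events.
fn forward_loop_match(results: &[Result<u32, RecvError>]) -> (Vec<u32>, u64) {
    let mut forwarded = Vec::new();
    let mut skipped = 0;
    for result in results.iter().cloned() {
        match result {
            Ok(event) => forwarded.push(event),
            Err(RecvError::Lagged(n)) => {
                skipped += n;
                continue;
            }
            Err(RecvError::Closed) => break,
        }
    }
    (forwarded, skipped)
}

fn main() {
    // Simulated sequence of recv() results: one lag in the middle,
    // then the channel closes.
    let results = vec![
        Ok(1),
        Err(RecvError::Lagged(3)),
        Ok(5),
        Err(RecvError::Closed),
        Ok(9),
    ];
    println!("{:?}", forward_while_let(&results));
    println!("{:?}", forward_loop_match(&results));
}
```

With this input the while-let variant forwards only the first event, while the match-based variant also forwards the event that arrives after the lag and stops at Closed.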

Suggested change
-            while let Ok(event) = events.recv().await {
-                debug!("Received event {:?}", event);
-                match event {
-                    NetEvent::OutgoingSyncRequestSucceeded(value) => addr.do_send(value),
-                    NetEvent::SyncRequestReceived(value) => addr.do_send(value),
-                    _ => (),
-                }
-            }
+            loop {
+                match events.recv().await {
+                    Ok(event) => {
+                        debug!("Received event {:?}", event);
+                        match event {
+                            NetEvent::OutgoingSyncRequestSucceeded(value) => addr.do_send(value),
+                            NetEvent::SyncRequestReceived(value) => addr.do_send(value),
+                            _ => (),
+                        }
+                    }
+                    Err(broadcast::error::RecvError::Lagged(_)) => continue,
+                    Err(broadcast::error::RecvError::Closed) => break,
+                }
+            }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

        let mut events = rx.resubscribe();
        let addr = Self::new(bus, tx, rx, eventstore).start();

        // Forward from NetEvent
        tokio::spawn({
            debug!("Spawning event receive loop!");
            let addr = addr.clone();
            async move {
                loop {
                    match events.recv().await {
                        Ok(event) => {
                            debug!("Received event {:?}", event);
                            match event {
                                NetEvent::OutgoingSyncRequestSucceeded(value) => addr.do_send(value),
                                NetEvent::SyncRequestReceived(value) => addr.do_send(value),
                                _ => (),
                            }
                        }
                        Err(broadcast::error::RecvError::Lagged(_)) => continue,
                        Err(broadcast::error::RecvError::Closed) => break,
                    }
                }
            }
🤖 Prompt for AI Agents
In `@crates/net/src/net_sync_manager.rs` around lines 61 - 77, The current
forwarder loop uses "while let Ok(event) = events.recv().await" which silently
exits on both RecvError::Lagged and RecvError::Closed; change it to an explicit
loop that matches events.recv().await and on Ok(event) continues to match
NetEvent variants (NetEvent::OutgoingSyncRequestSucceeded,
NetEvent::SyncRequestReceived) and call addr.do_send(value), on
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) continue (log debug if
desired), and on Err(RecvError::Closed) break to stop the loop; update the block
where events is created from rx.resubscribe() and the async move spawn to use
this match-based handling so lagging receivers don't terminate the forwarder.

Originally posted by @coderabbitai[bot] in #1153 (comment)
