Backlog handling of disconnected #88
| for (_, c) in &mut self.to_be_reconnected { | ||
| let ConnectionVariant::Outbound(tcp) = c else { | ||
| unreachable!("only outbound should be auto reconnected"); | ||
| }; | ||
| tcp.backlog_push_shared(&self.bcast_header, &self.bcast_payload); | ||
| } |
🔴 Broadcast sends duplicate messages to streams that disconnect during drain_backlog
When broadcast() calls write_or_enqueue_shared() on a stream with a non-empty backlog and drain_backlog() returns Disconnected, the new code at stream.rs:400-401 pushes the broadcast frame into the stream's send_backlog. Then broadcast() calls disconnect_at_index_pending(i) which moves the stream (with its backlog already containing the frame) into self.to_be_reconnected via connector.rs:128. Immediately after the first loop, the second loop at connector.rs:184-189 iterates over ALL entries in to_be_reconnected and calls backlog_push_shared on each — pushing the same frame a second time into the same stream's backlog. When the connection is later re-established, the duplicate message will be sent to the peer.
Prompt for agents
The broadcast method has two mechanisms that both push the same frame to a disconnected stream's backlog:
1. In stream.rs write_or_enqueue_shared (lines 399-402): when drain_backlog returns Disconnected, the new code enqueues the current frame into the stream's send_backlog.
2. In connector.rs broadcast (lines 184-189): after the main loop, a second loop pushes the frame to ALL entries in to_be_reconnected, including streams that just transitioned there from the first loop.
This causes the message to be queued twice for outbound streams that disconnect during the drain_backlog path.
Possible fixes:
- Track which streams were already in to_be_reconnected before the broadcast loop started (e.g., record the initial length), and only iterate over those in the second loop.
- Or, remove the enqueue_back from write_or_enqueue_shared's disconnect path, and instead let the broadcast loop handle it. But this would break the Single send path in connector.rs write_or_enqueue_with, which relies on the stream-level enqueue to preserve the message. So the first approach (limiting the second loop to pre-existing entries) is cleaner.
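The first fix above (limiting the second loop to pre-existing entries) can be sketched as follows. This is a minimal illustration, not the code in connector.rs or stream.rs: `Stream`, `Manager`, `fail_next_write`, and `write_or_enqueue` are hypothetical stand-ins, and the sketch assumes that newly disconnected streams are appended to the end of `to_be_reconnected`, so the entries that existed before the broadcast are exactly the first `already_pending` ones.

```rust
// Hypothetical, simplified stand-ins for the real connection types.
#[derive(Default)]
struct Stream {
    send_backlog: Vec<Vec<u8>>,
    // Test hook (assumption): simulates drain_backlog returning Disconnected.
    fail_next_write: bool,
}

impl Stream {
    /// Returns false on disconnect. As described in the review, the frame is
    /// enqueued on the stream's own backlog before the disconnect is reported.
    fn write_or_enqueue(&mut self, frame: &[u8]) -> bool {
        if self.fail_next_write {
            self.send_backlog.push(frame.to_vec());
            return false;
        }
        true
    }
}

#[derive(Default)]
struct Manager {
    conns: Vec<Stream>,
    to_be_reconnected: Vec<Stream>,
}

impl Manager {
    fn broadcast(&mut self, frame: &[u8]) {
        // Record how many streams were already waiting for reconnection
        // before this broadcast started.
        let already_pending = self.to_be_reconnected.len();

        let mut i = 0;
        while i < self.conns.len() {
            if self.conns[i].write_or_enqueue(frame) {
                i += 1;
            } else {
                // The stream already holds the frame in its backlog; move it
                // to the END of the reconnect list (assumed ordering).
                let stream = self.conns.swap_remove(i);
                self.to_be_reconnected.push(stream);
            }
        }

        // Only streams that were pending before the loop still need the frame.
        for stream in self.to_be_reconnected.iter_mut().take(already_pending) {
            stream.send_backlog.push(frame.to_vec());
        }
    }
}
```

With one healthy stream, one stream that disconnects during the broadcast, and one stream that was already pending, each of the two disconnected streams ends up with exactly one copy of the frame. If the real reconnect list does not preserve insertion order, a set of already-served tokens would be needed instead of a length snapshot.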
| } else if let Some(c) = | ||
| self.to_be_reconnected.iter_mut().find_map(|(t, c)| (*t == token).then_some(c)) | ||
| { | ||
| let ConnectionVariant::Outbound(c) = c else { | ||
| unreachable!("only outbounds can be in to be reconnected"); | ||
| }; | ||
| c.backlog_push(serialise); | ||
| error!("tcp sending: unknown token {token:?}"); | ||
| } |
🟡 Error log fires on valid backlog-push path and silent drop for truly unknown tokens
The error!("tcp sending: unknown token {token:?}") at connector.rs:222 is inside the else if block that successfully finds the token in to_be_reconnected and pushes to its backlog. This emits a noisy ERROR-level log on every single-target send to a disconnected-but-reconnecting stream, with an incorrect "unknown token" message. Additionally, by converting the unconditional else to else if, the case where a token is not found in either self.conns or self.to_be_reconnected is now silently dropped with no diagnostic logging at all — a regression from the original code that logged an error for truly unknown tokens.
| - } else if let Some(c) = | |
| - self.to_be_reconnected.iter_mut().find_map(|(t, c)| (*t == token).then_some(c)) | |
| - { | |
| - let ConnectionVariant::Outbound(c) = c else { | |
| - unreachable!("only outbounds can be in to be reconnected"); | |
| - }; | |
| - c.backlog_push(serialise); | |
| - error!("tcp sending: unknown token {token:?}"); | |
| - } | |
| + } else if let Some(c) = | |
| + self.to_be_reconnected.iter_mut().find_map(|(t, c)| (*t == token).then_some(c)) | |
| + { | |
| + let ConnectionVariant::Outbound(c) = c else { | |
| + unreachable!("only outbounds can be in to be reconnected"); | |
| + }; | |
| + c.backlog_push(serialise); | |
| + } else { | |
| + error!("tcp sending: unknown token {token:?}"); | |
| + } |
| for (_, c) in &mut self.to_be_reconnected { | ||
| let ConnectionVariant::Outbound(tcp) = c else { | ||
| unreachable!("only outbound should be auto reconnected"); | ||
| }; | ||
| tcp.backlog_push_shared(&self.bcast_header, &self.bcast_payload); | ||
| } |
🚩 Unbounded backlog growth for disconnected streams
The new code queues messages into send_backlog for streams in to_be_reconnected without any size limits. While ConnectionManager has a max_backlog field (connector.rs:70-71) that enforces backlog limits for connected streams, there's no equivalent check for disconnected streams. If a stream remains disconnected for a long time while broadcasts or single sends continue, the backlog could grow unboundedly, consuming significant memory. The max_backlog enforcement (presumably done in a flush/check pass not shown in this diff) would only apply once the stream reconnects and moves back to conns. This could be an issue in production scenarios where a peer is down for extended periods.
Preserve the current frame exactly once when an outbound stream disconnects during broadcast or single-token sends, then queue it on the reconnect backlog after the stream has moved into the reconnect set. Apply the existing max-backlog threshold and timeout to both active streams and disconnected outbound streams so reconnect backlogs cannot grow indefinitely while a peer is down.
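One way to picture the shared limit check is the sketch below. It is illustrative only: `Backlog`, `exceeds`, and `over_limit` are hypothetical names, the threshold is counted in queued frames (the real `max_backlog` may use a different unit), and the sketch only reports which tokens are over the limit, since the description does not say what the connector does with such streams.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical backlog that remembers when each frame was queued.
struct Backlog {
    frames: VecDeque<(Instant, Vec<u8>)>,
}

impl Backlog {
    fn new() -> Self {
        Backlog { frames: VecDeque::new() }
    }

    fn push(&mut self, now: Instant, frame: Vec<u8>) {
        self.frames.push_back((now, frame));
    }

    /// True when the backlog holds more than `max_backlog` frames (assumed
    /// unit) or its oldest frame has waited longer than `timeout`.
    fn exceeds(&self, now: Instant, max_backlog: usize, timeout: Duration) -> bool {
        let too_big = self.frames.len() > max_backlog;
        let too_old = self
            .frames
            .front()
            .is_some_and(|(queued_at, _)| now.duration_since(*queued_at) > timeout);
        too_big || too_old
    }
}

/// Runs the same check over active and disconnected streams and returns the
/// tokens that are over the limit; the caller decides what to do with them.
fn over_limit(
    active: &[(u32, Backlog)],
    pending: &[(u32, Backlog)],
    now: Instant,
    max_backlog: usize,
    timeout: Duration,
) -> Vec<u32> {
    active
        .iter()
        .chain(pending.iter())
        .filter(|(_, backlog)| backlog.exceeds(now, max_backlog, timeout))
        .map(|(token, _)| *token)
        .collect()
}
```

Passing both collections through one function keeps the policy identical for connected and reconnecting streams, which is the property the description asks for.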
Write to backlog for a disconnected connection