Skip to content

Backlog handling of disconnected#88

Merged
louisponet merged 3 commits into
mainfrom
lopo/backlog_handling_of_disconnected
Jun 7, 2026
Merged

Backlog handling of disconnected#88
louisponet merged 3 commits into
mainfrom
lopo/backlog_handling_of_disconnected

Conversation

@louisponet

Copy link
Copy Markdown
Contributor

Write to backlog for a disconnected connection

@louisponet louisponet requested a review from a team June 7, 2026 16:02

@devin-ai-integration devin-ai-integration Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 3 potential issues.

View 2 additional findings in Devin Review.

Open in Devin Review

Comment on lines +184 to +189
for (_, c) in &mut self.to_be_reconnected {
let ConnectionVariant::Outbound(tcp) = c else {
unreachable!("only outbound should be auto reconnected");
};
tcp.backlog_push_shared(&self.bcast_header, &self.bcast_payload);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Broadcast sends duplicate messages to streams that disconnect during drain_backlog

When broadcast() calls write_or_enqueue_shared() on a stream with a non-empty backlog and drain_backlog() returns Disconnected, the new code at stream.rs:400-401 pushes the broadcast frame into the stream's send_backlog. Then broadcast() calls disconnect_at_index_pending(i) which moves the stream (with its backlog already containing the frame) into self.to_be_reconnected via connector.rs:128. Immediately after the first loop, the second loop at connector.rs:184-189 iterates over ALL entries in to_be_reconnected and calls backlog_push_shared on each — pushing the same frame a second time into the same stream's backlog. When the connection is later re-established, the duplicate message will be sent to the peer.

Prompt for agents
The broadcast method has two mechanisms that both push the same frame to a disconnected stream's backlog:

1. In stream.rs write_or_enqueue_shared (lines 399-402): when drain_backlog returns Disconnected, the new code enqueues the current frame into the stream's send_backlog.
2. In connector.rs broadcast (lines 184-189): after the main loop, a second loop pushes the frame to ALL entries in to_be_reconnected, including streams that just transitioned there from the first loop.

This causes the message to be queued twice for outbound streams that disconnect during the drain_backlog path.

Possible fixes:
- Track which streams were already in to_be_reconnected before the broadcast loop started (e.g., record the initial length), and only iterate over those in the second loop.
- Or, remove the enqueue_back from write_or_enqueue_shared's disconnect path, and instead let the broadcast loop handle it. But this would break the Single send path in connector.rs write_or_enqueue_with, which relies on the stream-level enqueue to preserve the message. So the first approach (limiting the second loop to pre-existing entries) is cleaner.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines 215 to 223
} else if let Some(c) =
self.to_be_reconnected.iter_mut().find_map(|(t, c)| (*t == token).then_some(c))
{
let ConnectionVariant::Outbound(c) = c else {
unreachable!("only outbounds can be in to be reconnected");
};
c.backlog_push(serialise);
error!("tcp sending: unknown token {token:?}");
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Error log fires on valid backlog-push path and silent drop for truly unknown tokens

The error!("tcp sending: unknown token {token:?}") at connector.rs:222 is inside the else if block that successfully finds the token in to_be_reconnected and pushes to its backlog. This emits a noisy ERROR-level log on every single-target send to a disconnected-but-reconnecting stream, with an incorrect "unknown token" message. Additionally, by converting the unconditional else to else if, the case where a token is not found in either self.conns or self.to_be_reconnected is now silently dropped with no diagnostic logging at all — a regression from the original code that logged an error for truly unknown tokens.

Suggested change
} else if let Some(c) =
self.to_be_reconnected.iter_mut().find_map(|(t, c)| (*t == token).then_some(c))
{
let ConnectionVariant::Outbound(c) = c else {
unreachable!("only outbounds can be in to be reconnected");
};
c.backlog_push(serialise);
error!("tcp sending: unknown token {token:?}");
}
} else if let Some(c) =
self.to_be_reconnected.iter_mut().find_map(|(t, c)| (*t == token).then_some(c))
{
let ConnectionVariant::Outbound(c) = c else {
unreachable!("only outbounds can be in to be reconnected");
};
c.backlog_push(serialise);
} else {
error!("tcp sending: unknown token {token:?}");
}
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +184 to +189
for (_, c) in &mut self.to_be_reconnected {
let ConnectionVariant::Outbound(tcp) = c else {
unreachable!("only outbound should be auto reconnected");
};
tcp.backlog_push_shared(&self.bcast_header, &self.bcast_payload);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Unbounded backlog growth for disconnected streams

The new code queues messages into send_backlog for streams in to_be_reconnected without any size limits. While ConnectionManager has a max_backlog field (connector.rs:70-71) that enforces backlog limits for connected streams, there's no equivalent check for disconnected streams. If a stream remains disconnected for a long time while broadcasts or single sends continue, the backlog could grow unboundedly, consuming significant memory. The max_backlog enforcement (presumably done in a flush/check pass not shown in this diff) would only apply once the stream reconnects and moves back to conns. This could be an issue in production scenarios where a peer is down for extended periods.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Preserve the current frame exactly once when an outbound stream disconnects during broadcast or single-token sends, then queue it on the reconnect backlog after the stream has moved into the reconnect set.

Apply the existing max-backlog threshold and timeout to both active streams and disconnected outbound streams so reconnect backlogs cannot grow indefinitely while a peer is down.
@louisponet louisponet merged commit 89d5a0e into main Jun 7, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants