feat(xmtp_api_d14n): BidiConnection actor — own both wire halves, internal liveness#3772
Claude finished @tylerhawkes's task
Code Review (June 26, 2026, 19:35 UTC)
Review Feedback
Overall, this is excellent code: well-documented, properly abstracted, and thoroughly tested. The ownership-based teardown design is sound. A few observations:
Minor Issues
Code Quality
Performance
Security
Test Coverage
Recommendation: Approve after addressing the minor test gap (malformed |
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #3772 +/- ##
==========================================
+ Coverage 84.40% 84.54% +0.13%
==========================================
Files 409 410 +1
Lines 60138 60716 +578
==========================================
+ Hits 50759 51331 +572
- Misses 9379 9385 +6
…#3769) **Stack 1/4** of the XIP-83 bidi client lane: #3769 → #3770 → #3771 → #3772. Regenerated `xmtp.mls.api.v1` from xmtp/proto#337: the bidirectional `Subscribe` RPC with versioned `SubscribeRequest`/`SubscribeResponse`, id-based `Mutate` (cursors, `history_only`), `Ping`/`Pong`, `TopicsLive`, `CATCHUP_COMPLETE`, and STARTED `capabilities`. Purely additive (+1,896 generated lines); `proto_version` pinned to that branch's sha — draft until the proto PR merges.
**Stack 2/4** of the XIP-83 bidi client lane: #3769 → #3770 → #3771 → #3772. Adds one primitive to the protocol-agnostic `Client` trait: `bidi_stream(request, path, BoxDynStream<Bytes>) -> http::Response<BytesStream>` — outbound stream of encoded protobuf messages in, inbound message stream out. Default implementation errors ("not supported by this transport"), so gRPC-Web/wasm and mocks are untouched; forwarded through the `&T`/`Box`/`Arc`/boxed-client impls. `GrpcClient` overrides it natively via tonic `Grpc::streaming` + the existing `TransparentCodec`, reusing the NonBlocking stream machinery verbatim. `build_tonic_request` made generic over the body type. 🤖 Generated with [Claude Code](https://claude.com/claude-code)
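The "default implementation errors, native transport overrides" shape described above can be sketched with a plain trait. This is a simplified, synchronous stand-in and not the crate's actual signature: `TransportError`, `WebClient`, `NativeClient`, and the `Vec<Vec<u8>>` frame type are hypothetical names used only for illustration (the real primitive takes and returns streams).

```rust
/// Hypothetical error type; the real crate's error type is not shown in this PR.
#[derive(Debug, PartialEq)]
pub enum TransportError {
    Unsupported(&'static str),
}

/// Simplified stand-in for the protocol-agnostic `Client` trait.
pub trait Client {
    /// Encoded outbound frames in, inbound frames out.
    /// The default errors, so transports that cannot full-duplex need no changes.
    fn bidi_stream(
        &self,
        _path: &str,
        _outbound: Vec<Vec<u8>>,
    ) -> Result<Vec<Vec<u8>>, TransportError> {
        Err(TransportError::Unsupported("not supported by this transport"))
    }
}

/// A transport that keeps the default (for example a gRPC-Web or mock client).
pub struct WebClient;
impl Client for WebClient {}

/// A native transport overrides the primitive. This toy version echoes frames.
pub struct NativeClient;
impl Client for NativeClient {
    fn bidi_stream(
        &self,
        _path: &str,
        outbound: Vec<Vec<u8>>,
    ) -> Result<Vec<Vec<u8>>, TransportError> {
        Ok(outbound)
    }
}

/// Forwarding impl so a boxed client reaches the override, not the default.
impl<T: Client + ?Sized> Client for Box<T> {
    fn bidi_stream(
        &self,
        path: &str,
        outbound: Vec<Vec<u8>>,
    ) -> Result<Vec<Vec<u8>>, TransportError> {
        (**self).bidi_stream(path, outbound)
    }
}
```

Without the forwarding impl, a `Box<dyn Client>` wrapper would fall back to the erroring default, which is presumably why the PR forwards the method through each wrapper impl.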
Approvability
Verdict: Needs human review
3 blocking correctness issues found. This PR introduces a substantial new BidiConnection actor (~1000 lines) with complex async networking logic for bidirectional stream management, liveness probing, and backpressure handling. New features of this scope introducing significant runtime behavior warrant human review, and there's an unresolved comment about potential blocking behavior in the actor loop.
one thing we'll want to figure out with BiDi is how we handle any potential difference in It could make the delineation between MLS and Bidi stream handling easier than if raw Bidi stream handling is implemented in |
            }
        }
        Some(Response::Messages(messages)) => {
            if !messages.group_messages.is_empty()
these message types are where we will need to be careful about the v3/d14n differences. based on client type we can re-use the d14n or v3 extractor schemes here. namely, d14n messages returned from the D14nClientStreams include logic for iceboxing and dependency resolution based on depends_on, which v3 does not include. if we are going to use this bidi stream across many clients, we will need a new cursor store implementation that is decoupled from MLS (and potentially includes its own on-disk storage solution or sqlite db?) in order to keep the icebox consistent and avoid depending on any single client store
but maybe i'm speaking too soon and need more info/design on how individual clients will interact with the bidi actor, maybe we can pass along the proto and do the per-version processing later on?
Agree — taking your "pass the proto along, process per-version later" path, with the extractor scheme + MLS-decoupled cursor store designed into the d14n follow-up after this lands.
@insipx +1 — pushing bidi handling down behind a small unified-types interface to |
    /// The single writer: sole reader of the wire, sole producer of events. Ends
    /// when the wire ends/errors or the consumer goes away; ending drops its
    /// half of the channels, which is the close signal in both directions.
    async fn run_actor<S, E>(
        mut inbound: S,
        outbound: mpsc::Sender<SubscribeRequest>,
        events: mpsc::Sender<BidiEvent>,
    ) where
        S: futures::Stream<Item = Result<SubscribeResponse, E>> + Unpin,
        E: std::fmt::Display,
    {
🟡 Medium subscriptions/bidi.rs:186
When the server closes the bidi stream first, run_actor stops and next() returns None, but BidiSubscription.outbound is still alive so later mutate()/ping() calls can still return Ok(()) and enqueue frames into the dead request stream. Those requests are silently lost even though the API implies send failure means the stream is dead. Consider detecting when the actor stops and failing subsequent sends, or document that callers must stop using the subscription after next() returns None.
    -async fn run_actor<S, E>(
    +use tokio::sync::mpsc::error::SendError;
    +
    +async fn run_actor<S, E>(
         mut inbound: S,
         outbound: mpsc::Sender<SubscribeRequest>,
         events: mpsc::Sender<BidiEvent>,
    +    outbound_closed: tokio::sync::watch::Sender<bool>,
     ) where
         S: futures::Stream<Item = Result<SubscribeResponse, E>> + Unpin,
         E: std::fmt::Display,
     {
#3771) **Stack 3/4** of the XIP-83 bidi client lane: #3769 → #3770 → #3771 → #3772. Native-only `XmtpMlsBidiStreams` trait in `xmtp_proto::api_client` — `subscribe_bidi(BoxStream<SubscribeRequest>) -> Stream<Result<SubscribeResponse>>` — and its `V3Client` implementation: prost-encode outbound frames over `bidi_stream("/xmtp.mls.api.v1.MlsApi/Subscribe")`, decode inbound via the existing `XmtpStream`. Browsers stay on `XmtpMlsStreams` + watchdog (gRPC-Web cannot full-duplex). 🤖 Generated with [Claude Code](https://claude.com/claude-code)
    Some(Response::TopicsLive(live)) => {
        let event = BidiEvent::TopicsLive {
            topics: live.topics.into_iter().map(Topic::from_bytes).collect(),
        };
        if emit(&events, event).await {
            break;
        }
    }
🟡 Medium subscriptions/bidi.rs:257
TopicsLive at line 259 uses Topic::from_bytes without validation, which its docs state can cause undefined behavior on invalid layouts. A server can send an empty topic or unknown kind byte, and this code will emit it as a BidiEvent::TopicsLive. Any later use of that Topic (e.g., Debug, Display, or kind()) will panic because those paths call self.inner[0] or TopicKind::try_from(..).expect(..) on unvalidated data. Consider filtering out or erroring on invalid topics instead of passing them through.
    -    Some(Response::TopicsLive(live)) => {
    -        let event = BidiEvent::TopicsLive {
    -            topics: live.topics.into_iter().map(Topic::from_bytes).collect(),
    -        };
    +    Some(Response::TopicsLive(live)) => {
    +        let topics: Vec<Topic> = live
    +            .topics
    +            .into_iter()
    +            .filter_map(|bytes| Topic::try_from_bytes(&bytes).ok())
    +            .collect();
    +        let event = BidiEvent::TopicsLive { topics };
             if emit(&events, event).await {
                 break;
             }
         }
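A minimal sketch of the "validate, then skip malformed topics" approach suggested above. The `Topic` layout (one kind byte followed by an id), the two `TopicKind` variants, and their byte values are assumptions for illustration; the real `Topic` type and its validating constructor are not shown in this thread.

```rust
use std::convert::TryFrom;

/// Hypothetical topic kinds and byte values, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TopicKind {
    GroupMessages,
    WelcomeMessages,
}

#[derive(Debug, PartialEq)]
pub struct Topic {
    pub kind: TopicKind,
    pub id: Vec<u8>,
}

#[derive(Debug, PartialEq)]
pub enum TopicError {
    Empty,
    UnknownKind(u8),
}

impl TryFrom<&[u8]> for Topic {
    type Error = TopicError;

    /// Rejects empty input and unknown kind bytes instead of indexing blindly.
    fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
        let (&kind_byte, id) = bytes.split_first().ok_or(TopicError::Empty)?;
        let kind = match kind_byte {
            0 => TopicKind::GroupMessages,
            1 => TopicKind::WelcomeMessages,
            other => return Err(TopicError::UnknownKind(other)),
        };
        Ok(Topic { kind, id: id.to_vec() })
    }
}

/// Keeps only well-formed topics; malformed ones are logged and dropped.
pub fn parse_live_topics(raw: &[Vec<u8>]) -> Vec<Topic> {
    raw.iter()
        .filter_map(|bytes| match Topic::try_from(bytes.as_slice()) {
            Ok(topic) => Some(topic),
            Err(err) => {
                eprintln!("skipping malformed topic: {:?}", err);
                None
            }
        })
        .collect()
}
```

Skipping keeps one bad topic from discarding the whole `TopicsLive` frame; returning an error for the frame is the stricter alternative the review also mentions.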
…ernal liveness Owns one bidi stream end-to-end at the transport layer as the sole writer of the request half: auto-answers server Pings, correlates client liveness probes internally (neither surfaces to the consumer), and forwards only real subscription events upward. Because the actor owns both halves, closing the inbound side tears the connection down in one place — a later mutate/probe returns Closed by channel ownership instead of silently enqueueing into a dead request stream. Relocated from xmtp_mls, where the actor held an outbound sender purely to auto-pong — which decoupled the request half's lifetime from the stream's and let post-close sends succeed silently.
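The "Closed by channel ownership" idea can be illustrated with standard-library channels and a thread standing in for the async actor. This is a sketch under assumptions, not the PR's code: `Connection`, `open`, and the `String` frame type are hypothetical, and the real actor multiplexes with `select!` rather than draining commands only when an inbound frame arrives.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

#[derive(Debug, PartialEq)]
pub enum Command {
    Mutate(u64),
}

#[derive(Debug, PartialEq)]
pub struct Closed;

/// Consumer-side handle: it can only talk to the actor, never to the wire.
pub struct Connection {
    commands: Sender<Command>,
    events: Receiver<String>,
}

impl Connection {
    /// Fails once the actor has exited, because the actor owns the receiver.
    pub fn mutate(&self, id: u64) -> Result<(), Closed> {
        self.commands.send(Command::Mutate(id)).map_err(|_| Closed)
    }

    /// Returns `None` once the actor has exited and all events are drained.
    pub fn next(&self) -> Option<String> {
        self.events.recv().ok()
    }
}

/// The actor owns both wire halves plus its ends of the in-memory channels.
fn run_actor(
    wire_in: Receiver<String>,
    wire_out: Sender<String>,
    commands: Receiver<Command>,
    events: Sender<String>,
) {
    'wire: for frame in wire_in {
        // Simplification: queued commands are flushed when a frame arrives.
        while let Ok(command) = commands.try_recv() {
            if wire_out.send(format!("{:?}", command)).is_err() {
                break 'wire;
            }
        }
        if events.send(frame).is_err() {
            break;
        }
    }
    // One place, every teardown. Commands close before the event stream ends,
    // so a caller that has seen `next() == None` can no longer enqueue.
    drop(commands);
    drop(wire_out);
    drop(events);
}

pub fn open(wire_in: Receiver<String>, wire_out: Sender<String>) -> Connection {
    let (command_tx, command_rx) = mpsc::channel();
    let (event_tx, event_rx) = mpsc::channel();
    thread::spawn(move || run_actor(wire_in, wire_out, command_rx, event_tx));
    Connection { commands: command_tx, events: event_rx }
}
```

Because no second outbound sender exists outside the actor, there is nothing left alive that could accept a frame after the server closes the inbound side.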
    /// Send an event to the consumer; awaits a free slot (the backpressure that
    /// stalls wire reads once [`EVENT_BUFFER`] fills) and returns true when the
    /// consumer is gone and the actor should shut down.
    async fn emit(events: &mpsc::Sender<BidiEvent>, event: BidiEvent) -> bool {
🟡 Medium v3/connection.rs:486
emit() awaits events.send(), which blocks the entire actor loop when EVENT_BUFFER fills. While blocked, the actor stops processing commands.recv(), so a Command::Probe times out even on a healthy connection—particularly after a resume that queues many events before the consumer drains them. Consider decoupling event emission from command processing, e.g. via a select! that concurrently handles commands while awaiting event sends.
    // concurrently with everything else. This is what keeps a busy wire
    // from blocking the loop. Gated on a non-empty queue so we only
    // contend for a permit when there's something to flush.
    permit = wire_out.reserve(), if !pending.is_empty() => {
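The pending-queue pattern in the snippet above (queue outbound frames, flush only when the wire has capacity, give up past a cap) can be approximated without an async runtime. The sketch below uses `std::sync::mpsc::sync_channel` and `try_send` in place of tokio's `reserve()`; `Outbox`, `Flush`, `BacklogExceeded`, and the cap value of 4 are hypothetical. The PR names `MAX_PENDING_FRAMES` but its value is not shown in this thread.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical cap, chosen small so the example is easy to follow.
pub const MAX_PENDING_FRAMES: usize = 4;

#[derive(Debug, PartialEq)]
pub enum Flush {
    /// Every queued frame reached the wire.
    Drained,
    /// The wire is full; remaining frames stay queued in order.
    Backlogged,
    /// The wire is gone; the connection should be torn down.
    Closed,
}

#[derive(Debug, PartialEq)]
pub struct BacklogExceeded;

#[derive(Default)]
pub struct Outbox {
    pending: VecDeque<Vec<u8>>,
}

impl Outbox {
    /// Queues a frame, or reports that the backlog cap was hit.
    pub fn enqueue(&mut self, frame: Vec<u8>) -> Result<(), BacklogExceeded> {
        if self.pending.len() >= MAX_PENDING_FRAMES {
            return Err(BacklogExceeded);
        }
        self.pending.push_back(frame);
        Ok(())
    }

    /// Sends as many frames as the wire accepts right now, never blocking.
    pub fn flush(&mut self, wire_out: &SyncSender<Vec<u8>>) -> Flush {
        while let Some(frame) = self.pending.pop_front() {
            match wire_out.try_send(frame) {
                Ok(()) => {}
                Err(TrySendError::Full(frame)) => {
                    // Put the frame back at the front to preserve FIFO order.
                    self.pending.push_front(frame);
                    return Flush::Backlogged;
                }
                Err(TrySendError::Disconnected(_)) => return Flush::Closed,
            }
        }
        Flush::Drained
    }
}
```

The point of the design is that a full wire never parks the loop: the caller keeps handling other work and retries the flush later, and a backlog that keeps growing is treated as a dead connection.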
    let actor = xmtp_common::spawn(
        None,
        run_actor(
            Box::pin(inbound),
small nit: shouldn't "inbound" be "wire_in" for consistency -- afaiu the inbound = messages we're getting and wire_out is messages we're sending off into bidi, where wire terminology is differentiating it from the in-mem message channels
Builds on the merged BidiConnection actor (#3772): - TrackedStatsClient forwards XmtpMlsBidiStreams so the feature-switched test client can open a BidiConnection. - BidiConnection::finish() half-closes the request half for bounded-sync. - Live v3 integration tests: handshake+welcome, catch-up/marker/live ordering, history_only, and bounded-sync half-close.
…3789)

## What

Builds on the merged XIP-83 `BidiConnection` actor (#3772). Adds the bounded-sync **half-close** API and the first **live integration tests** that exercise a real `BidiConnection` against a running node, plus the small forward that lets the standard test client open one.

## Changes

1. **`TrackedStatsClient` forwards `XmtpMlsBidiStreams`** (`xmtp_api_d14n/src/queries/api_stats.rs`, native-only). The stats wrapper forwarded every client trait except bidi, so the feature-switched `TestClient` (`TrackedStatsClient<V3Client>`) could not open a `BidiConnection`. One delegating impl, no per-call stat (the bidi stream is opened once and mutated in place, not counted per RPC).
2. **`BidiConnection::finish()`: half-close / bounded sync** (XIP-83 server req 9 / client req 4). A new `Command::Finish` makes the actor drop its outbound (request) half so the stream half-closes; the server finishes the in-flight catch-up wave, emits its `TopicsLive` / `CatchUpComplete` markers, and closes its side, so the consumer drains `next()` to `None`. After `finish()`, `mutate`/`probe` return `Closed`. Opened with `Mutate::history_only`, this is the bounded catch-up ("sync") flow over the same connection. The response→event translation is factored into a shared `emit_response_events` so the live loop and the post-finish drain deliver identical events.
3. **Live v3 integration tests** (`xmtp_mls/src/subscriptions/bidi_tests.rs`). v3-only and native-only (the bidi transport is implemented for the v3 client and needs full-duplex HTTP/2; `TestClient` is only bidi-capable under the v3 config). They open a real `BidiConnection` against whichever backend the test feature switch selects and derive every assertion from the stream itself, using each frame's `is_commit` flag to count application messages independently of the MLS commits a membership change produces, rather than a side query:
   - `bidi_connection_delivers_live_welcome_over_the_wire`: handshake, live welcome after a membership change, ping/pong probe.
   - `bidi_catch_up_precedes_live_marker_then_streams_live`: the happy path in one test. Pre-subscription history is caught up strictly before the `TopicsLive` marker; messages published while the stream starts arrive exactly once on either side of the marker; post-marker sends stream live and are strictly newer than every catch-up cursor; `CatchUpComplete` echoes the `mutate_id`.
   - `bidi_history_only_catches_up_then_delivers_nothing_live`: `history_only` catches up and emits the markers but registers nothing for live delivery.
   - `bidi_history_only_half_close_drains_then_server_closes`: bounded sync. `history_only` + `finish()`, the server closes after the wave, and `mutate` then returns `Closed`.
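The half-close flow described under `BidiConnection::finish()` can be sketched with standard-library channels: dropping the request sender is the half-close, and the consumer drains events until the server closes its side. Everything here is a toy under assumptions: `spawn_server`, `Connection`, `Event`, and the marker order are illustrative, and in the PR `finish()` is a `Command::Finish` handled by the actor rather than a direct drop on the handle.

```rust
use std::sync::mpsc::{Receiver, Sender};
use std::thread;

#[derive(Debug, PartialEq)]
pub enum Event {
    Message(u32),
    TopicsLive,
    CatchUpComplete,
}

#[derive(Debug, PartialEq)]
pub struct Closed;

/// Toy server: waits for the client to half-close, finishes the catch-up
/// wave, emits its markers, then closes by dropping `responses`.
pub fn spawn_server(
    requests: Receiver<&'static str>,
    responses: Sender<Event>,
    history: Vec<u32>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for _request in requests {} // ends when the request half is dropped
        for id in history {
            if responses.send(Event::Message(id)).is_err() {
                return;
            }
        }
        let _ = responses.send(Event::TopicsLive);
        let _ = responses.send(Event::CatchUpComplete);
    })
}

pub struct Connection {
    requests: Option<Sender<&'static str>>,
    events: Receiver<Event>,
}

impl Connection {
    pub fn new(requests: Sender<&'static str>, events: Receiver<Event>) -> Self {
        Connection { requests: Some(requests), events }
    }

    pub fn mutate(&self) -> Result<(), Closed> {
        match &self.requests {
            Some(tx) => tx.send("mutate").map_err(|_| Closed),
            None => Err(Closed),
        }
    }

    /// Half-close: drop the request half; inbound events keep flowing.
    pub fn finish(&mut self) {
        self.requests = None;
    }

    pub fn next(&self) -> Option<Event> {
        self.events.recv().ok()
    }
}
```

A bounded sync is then "mutate, finish, drain `next()` to `None`", after which any further `mutate` fails with `Closed`.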
## What

A bidirectional subscription connection actor (XIP-83), living at the transport layer in `xmtp_api_d14n` next to the `subscribe_bidi` transport. It owns a single bidi stream end-to-end and is the sole writer of the request half.

This supersedes the original per-subscription actor that lived in `xmtp_mls` (`BidiSubscription`): that design held an outbound sender purely to auto-pong, which decoupled the request half's lifetime from the stream's and let a post-close `mutate`/`ping` succeed silently (Macroscope flagged exactly this). Pushing the actor down to where the wire lives fixes it by ownership rather than with a side-signal.

## Design

- `Command::{Mutate, Probe}` over a channel; the actor multiplexes them with auto-pongs onto the one outbound FIFO. No second outbound sender exists anywhere.
- `Ping` → auto-`Pong`, never surfaced. `probe()` sends a client `Ping` and the actor correlates the returning `Pong` via a oneshot. `BidiEvent` has no `Ping`/`Pong` arm, so consumers deal with zero keepalive.
- … `wire_out` (request half closes, HTTP/2 torn down both ways), the command receiver (→ `mutate`/`probe` return `Closed`), the event sender (→ `next()` ends), and any pending probe acks. One place, every teardown: the "enqueue into a dead stream" gap can't exist.
- `probe()` self-bounds at `N × keepalive_interval_ms` (N=3; 30s fallback until `Started`, whose cadence the actor captures). `probe_within(Duration)` lets a latency-sensitive caller (e.g. a push-notification handler) bound to seconds. A timeout returns the distinct `ProbeTimedOut` → drop + re-open; tight bounds are safe because resume-from-cursors dedupes.

## Notes

- … `#[cfg(not(target_arch = "wasm32"))]`), generic over `XmtpMlsBidiStreams`; mock-tested, no deployed node required.
- `Topic` parsing uses the validating `TryFrom` (not the test-only `from_bytes`); malformed `TopicsLive` topics are skipped with a warn.

Note

Add `BidiConnection` actor for bidirectional subscription streams in `xmtp_api_d14n`

- … `BidiConnection`, a handle that owns both inbound and outbound halves of a gRPC bidi stream.
- … `Mutate` first, auto-responds to server `Ping` frames, and correlates client `probe()`/`probe_within()` calls to `Pong` responses for liveness checking.
- … `VecDeque` when the transport is not draining; if the backlog exceeds `MAX_PENDING_FRAMES`, the actor tears down the connection.
- `BidiEvent` surfaces structured events (`Started`, `CatchUpComplete`, `TopicsLive`, `GroupMessages`, `WelcomeMessages`) while hiding keepalive frames from callers.
- … `BidiConnection` aborts the actor and closes the stream; the module is excluded from wasm32 targets.

Macroscope summarized 893718b.
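The probe bound from the Design section (`N × keepalive_interval_ms` with N=3, and a 30s fallback until `Started` supplies the cadence) reduces to a small pure function. The function and constant names below are hypothetical; only the multiplier and the fallback come from the description.

```rust
use std::time::Duration;

/// N = 3, as stated in the PR description.
pub const PROBE_INTERVAL_MULTIPLIER: u32 = 3;

/// Fallback used until `Started` reports the server's keepalive cadence.
pub const FALLBACK_PROBE_TIMEOUT: Duration = Duration::from_secs(30);

/// Default deadline for `probe()`. `keepalive_interval_ms` is `None` until
/// the `Started` frame has been seen.
pub fn probe_timeout(keepalive_interval_ms: Option<u64>) -> Duration {
    match keepalive_interval_ms {
        Some(ms) => Duration::from_millis(ms).saturating_mul(PROBE_INTERVAL_MULTIPLIER),
        None => FALLBACK_PROBE_TIMEOUT,
    }
}
```

A caller using `probe_within(Duration)` would bypass this default and supply its own, tighter bound.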