Skip to content

feat(xmtp_api_d14n): BidiConnection actor — own both wire halves, internal liveness#3772

Merged
tylerhawkes merged 1 commit into
mainfrom
tyler/xip83-4-bidi-actor
Jun 26, 2026
Merged

feat(xmtp_api_d14n): BidiConnection actor — own both wire halves, internal liveness#3772
tylerhawkes merged 1 commit into
mainfrom
tyler/xip83-4-bidi-actor

Conversation

@tylerhawkes

@tylerhawkes tylerhawkes commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

What

A bidirectional subscription connection actor (XIP-83), living at the transport layer in xmtp_api_d14n next to the subscribe_bidi transport. It owns a single bidi stream end-to-end and is the sole writer of the request half.

This supersedes the original per-subscription actor that lived in xmtp_mls (BidiSubscription): that design held an outbound sender purely to auto-pong, which decoupled the request half's lifetime from the stream's and let a post-close mutate/ping succeed silently (Macroscope flagged exactly this). Pushing the actor down to where the wire lives fixes it by ownership rather than with a side-signal.

Design

  • Sole owner of both halves. Callers submit Command::{Mutate, Probe} over a channel; the actor multiplexes them with auto-pongs onto the one outbound FIFO. No second outbound sender exists anywhere.
  • Liveness is internal. Server Ping → auto-Pong, never surfaced. probe() sends a client Ping and the actor correlates the returning Pong via a oneshot. BidiEvent has no Ping/Pong arm — consumers deal with zero keepalive.
  • Teardown by ownership. Inbound end/error → the actor drops wire_out (request half closes, HTTP/2 torn down both ways), the command receiver (→ mutate/probe return Closed), the event sender (→ next() ends), and any pending probe acks. One place, every teardown — the "enqueue into a dead stream" gap can't exist.
  • Bounded probe. probe() self-bounds at N × keepalive_interval_ms (N=3; 30s fallback until Started, whose cadence the actor captures). probe_within(Duration) lets a latency-sensitive caller — e.g. a push-notification handler — bound to seconds. A timeout returns the distinct ProbeTimedOut → drop + re-open; tight bounds are safe because resume-from-cursors dedupes.

Notes

  • Native-only (#[cfg(not(target_arch = "wasm32"))]), generic over XmtpMlsBidiStreams — mock-tested, no deployed node required.
  • This is the Phase-1 primitive the herald connection-multiplexer (Phase 2) drives per shard; durability / reconnection stays at the mux / consumer layer, not coupled into the actor.
  • Production Topic parsing uses the validating TryFrom (not the test-only from_bytes); malformed TopicsLive topics are skipped with a warn.
  • 11 unit tests: ownership-teardown, auto-pong-not-surfaced, probe round-trip / timeout / keepalive-derived default, wire-order, unknown-version tolerance.

Note

Add BidiConnection actor for bidirectional subscription streams in xmtp_api_d14n

  • Adds connection.rs, a native-only module exposing BidiConnection, a handle that owns both inbound and outbound halves of a gRPC bidi stream.
  • The actor sends an initial Mutate first, auto-responds to server Ping frames, and correlates client probe()/probe_within() calls to Pong responses for liveness checking.
  • Outbound frames are buffered in a VecDeque when the transport is not draining; if the backlog exceeds MAX_PENDING_FRAMES, the actor tears down the connection.
  • BidiEvent surfaces structured events (Started, CatchUpComplete, TopicsLive, GroupMessages, WelcomeMessages) while hiding keepalive frames from callers.
  • Dropping BidiConnection aborts the actor and closes the stream; the module is excluded from wasm32 targets.

Macroscope summarized 893718b.

@claude

claude Bot commented Jun 12, 2026

Copy link
Copy Markdown

Claude finished @tylerhawkes's task —— View job


Code Review — June 26, 2026, 19:35 UTC

  • Read and analyze the new connection.rs file
  • Review code quality, potential bugs, performance, security, and test coverage
  • Post review feedback

Review Feedback

Overall, this is excellent code — well-documented, properly abstracted, and thoroughly tested. The ownership-based teardown design is sound. A few observations:

Minor Issues

  1. Potential atomicity gap in probe_nonce (connection.rs:194-197)

    • fetch_add(1, Ordering::Relaxed).wrapping_add(1) results in nonces starting at 1 (since 0.fetch_add(1) returns 0, then +11). If this is intentional to avoid nonce 0, consider a comment. If not, just use fetch_add(1, ...) directly.
    • Using Relaxed ordering is fine here since nonce uniqueness doesn't require synchronization, but consider AcqRel if there are subtle ordering dependencies I missed.
  2. Event buffer sizing rationale (connection.rs:37)

    • EVENT_BUFFER: usize = 1024 is 16× larger than WIRE_BUFFER. The comment says "large enough that a brief consumer stall doesn't stall wire reads," which is good. Consider whether 1024 is empirically derived or could be tuned down (e.g., 256) if memory pressure becomes a concern at scale. Not urgent, just worth documenting if there's a specific reason for this ratio.
  3. Test coverage for malformed TopicsLive (connection.rs:438-448)

    • The code correctly validates topics via Topic::try_from and skips invalid ones with a warn. However, I don't see a test for this path (e.g., a server sending an empty topic byte array or invalid kind byte). Consider adding a regression test like skips_malformed_topics_in_topics_live to verify the filter_map logic and ensure the warning fires.
  4. Drop documentation (connection.rs:226-234)

    • The Drop impl correctly aborts the actor to prevent zombie keepalives. Minor: the comment "abort the actor so it cannot keep auto-ponging" could clarify that this also closes both wire directions immediately, which is critical for connection pool management in a multi-shard setup (Phase 2). Readers might miss that detail.

Code Quality

  • Excellent: The actor's ownership-based design eliminates the "send into closed stream" class of bugs that Macroscope caught in earlier revisions.
  • Excellent: Test suite is comprehensive (11 tests covering teardown, concurrency, backpressure, timeouts, wire-order preservation).
  • Good: Inline comments explain non-obvious choices (e.g., why try_send for the initial mutate, why reserve() branch prevents blocking).

Performance

  • The reserve() pattern (connection.rs:324-341) correctly prevents inbound stalls during wire backpressure. This is critical for keepalive correctness.
  • Bounded buffers are appropriately sized. The MAX_PENDING_FRAMES cap (connection.rs:52) provides backpressure without unbounded growth.

Security

  • No secrets are logged or exposed.
  • The actor aborts on Drop, preventing resource leaks (server-side subscription holds).
  • Malformed server frames (unknown versions, invalid topics) are handled defensively without panicking.

Test Coverage

  • 95.67% patch coverage is strong. The 21 missing lines (per Codecov) are likely edge cases in error paths or defensive branches (e.g., the None arm at line 471).
  • All critical paths (ownership teardown, probe correlation, auto-pong, wire-order) are tested.
  • Mock-based tests are clean and don't require a deployed node.

Recommendation: Approve after addressing the minor test gap (malformed TopicsLive). The nonce atomicity and buffer sizing are minor polish items, not blockers.


@codecov

codecov Bot commented Jun 12, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 94.46367% with 32 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.54%. Comparing base (82a7c3b) to head (893718b).

Files with missing lines Patch % Lines
crates/xmtp_api_d14n/src/queries/v3/connection.rs 94.46% 32 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3772      +/-   ##
==========================================
+ Coverage   84.40%   84.54%   +0.13%     
==========================================
  Files         409      410       +1     
  Lines       60138    60716     +578     
==========================================
+ Hits        50759    51331     +572     
- Misses       9379     9385       +6     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Comment thread crates/xmtp_mls/src/subscriptions/bidi.rs Outdated
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-3-bidi-api branch from 22b2c8b to a9ffc28 Compare June 12, 2026 21:33
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from e06c6e2 to a9b8444 Compare June 12, 2026 21:33
Comment thread crates/xmtp_mls/src/subscriptions/bidi.rs Outdated
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from a9b8444 to 1993216 Compare June 12, 2026 21:59
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-3-bidi-api branch from a9ffc28 to 249cf99 Compare June 12, 2026 21:59
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from 1993216 to 294f32b Compare June 15, 2026 20:20
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-3-bidi-api branch from 249cf99 to 13bce00 Compare June 15, 2026 20:20
tylerhawkes added a commit that referenced this pull request Jun 16, 2026
…#3769)

**Stack 1/4** of the XIP-83 bidi client lane: #3769#3770#3771#3772.

Regenerated `xmtp.mls.api.v1` from xmtp/proto#337: the bidirectional
`Subscribe` RPC with versioned `SubscribeRequest`/`SubscribeResponse`,
id-based `Mutate` (cursors, `history_only`), `Ping`/`Pong`,
`TopicsLive`, `CATCHUP_COMPLETE`, and STARTED `capabilities`. Purely
additive (+1,896 generated lines); `proto_version` pinned to that
branch's sha — draft until the proto PR merges.
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from 294f32b to 630c62a Compare June 16, 2026 21:04
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-3-bidi-api branch from 13bce00 to 0f1cead Compare June 16, 2026 21:04
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from 630c62a to b1273c9 Compare June 24, 2026 20:29
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-3-bidi-api branch from 0f1cead to 8bd3ec0 Compare June 24, 2026 20:29
tylerhawkes added a commit that referenced this pull request Jun 24, 2026
**Stack 2/4** of the XIP-83 bidi client lane: #3769#3770#3771#3772.

Adds one primitive to the protocol-agnostic `Client` trait:
`bidi_stream(request, path, BoxDynStream<Bytes>) ->
http::Response<BytesStream>` — outbound stream of encoded protobuf
messages in, inbound message stream out. Default implementation errors
("not supported by this transport"), so gRPC-Web/wasm and mocks are
untouched; forwarded through the `&T`/`Box`/`Arc`/boxed-client impls.
`GrpcClient` overrides it natively via tonic `Grpc::streaming` + the
existing `TransparentCodec`, reusing the NonBlocking stream machinery
verbatim. `build_tonic_request` made generic over the body type.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
@tylerhawkes tylerhawkes marked this pull request as ready for review June 24, 2026 22:20
@tylerhawkes tylerhawkes requested a review from a team as a code owner June 24, 2026 22:20
@macroscopeapp

macroscopeapp Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Approvability

Verdict: Needs human review

3 blocking correctness issues found. This PR introduces a substantial new BidiConnection actor (~1000 lines) with complex async networking logic for bidirectional stream management, liveness probing, and backpressure handling. New features of this scope introducing significant runtime behavior warrant human review, and there's an unresolved comment about potential blocking behavior in the actor loop.

You can customize Macroscope's approvability policy. Learn more.

@insipx

insipx commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

one thing we'll want to figure out with BiDi is how we handle any potential difference in V3/d14n APIs while also handling MLS logic. Mostly v3 and d14n differ in how they handle their proto types/what they do with those types handled by the Extractor system in d14n. That's where we define common Stream types for each version.

It could make the delineation between MLS and Bidi stream handling easier than if raw Bidi stream handling is implemented in xmtp_api_d14n or even its own stream crate and xmtp_mls is given only a small interface surface to interact with it, then we can test both the mls and stream parts separately, and also compose the interface xmtp_mls receives with the xmtp_proto::types unified GroupMessage/Group etc types to avoid dealing with raw protobuf types in the more easily complicated MLS code, and also avoid worrying about manually reconciling cursor schemes or other differences between v3/d14n (which would then be handled by the extractors)

}
}
Some(Response::Messages(messages)) => {
if !messages.group_messages.is_empty()

@insipx insipx Jun 25, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these message types are where we will need to be careful about the v3/d14n differences. based on client type we can re-use the d14n or v3 extractor schemes here. namely, d14n message return from the D14nClientStreams include logic for the iceboxing & dependency resolution based on depends_on that v3 does not include. If we are going to use this bidi stream across many clients, we will need a new cursor store implementation that is decoupled from MLS (and potentially includes its own on-disk storage solution or sqlite db?) in order to keep the icebox consistent and avoid depending on any single client store

but maybe i'm speaking too soon and need more info/design on how individual clients will interact with the bidi actor, maybe we can pass along the proto and do the per-version processing later on?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree — taking your "pass the proto along, process per-version later" path, with the extractor scheme + MLS-decoupled cursor store designed into the d14n follow-up after this lands.

Comment thread crates/xmtp_mls/src/subscriptions/bidi.rs Outdated
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-3-bidi-api branch from 8bd3ec0 to b65f00e Compare June 25, 2026 17:53
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from b1273c9 to 75a4393 Compare June 25, 2026 17:53
Comment thread crates/xmtp_mls/src/subscriptions/bidi.rs Outdated
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-3-bidi-api branch from b65f00e to c793213 Compare June 25, 2026 19:13
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from 75a4393 to 35d3d95 Compare June 25, 2026 19:18
@tylerhawkes

Copy link
Copy Markdown
Contributor Author

@insipx +1 — pushing bidi handling down behind a small unified-types interface to xmtp_mls is the right shape, and I'll do it as part of the d14n lane rather than this PR.

Comment on lines +186 to +196
/// The single writer: sole reader of the wire, sole producer of events. Ends
/// when the wire ends/errors or the consumer goes away; ending drops its
/// half of the channels, which is the close signal in both directions.
async fn run_actor<S, E>(
mut inbound: S,
outbound: mpsc::Sender<SubscribeRequest>,
events: mpsc::Sender<BidiEvent>,
) where
S: futures::Stream<Item = Result<SubscribeResponse, E>> + Unpin,
E: std::fmt::Display,
{

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium subscriptions/bidi.rs:186

When the server closes the bidi stream first, run_actor stops and next() returns None, but BidiSubscription.outbound is still alive so later mutate()/ping() calls can still return Ok(()) and enqueue frames into the dead request stream. Those requests are silently lost even though the API implies send failure means the stream is dead. Consider detecting when the actor stops and failing subsequent sends, or document that callers must stop using the subscription after next() returns None.

-async fn run_actor<S, E>(
+use tokio::sync::mpsc::error::SendError;
+
+async fn run_actor<S, E>(
     mut inbound: S,
     outbound: mpsc::Sender<SubscribeRequest>,
     events: mpsc::Sender<BidiEvent>,
+    outbound_closed: tokio::sync::watch::Sender<bool>,
 ) where
     S: futures::Stream<Item = Result<SubscribeResponse, E>> + Unpin,
     E: std::fmt::Display,
 {
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @crates/xmtp_mls/src/subscriptions/bidi.rs around lines 186-196:

When the server closes the bidi stream first, `run_actor` stops and `next()` returns `None`, but `BidiSubscription.outbound` is still alive so later `mutate()`/`ping()` calls can still return `Ok(())` and enqueue frames into the dead request stream. Those requests are silently lost even though the API implies send failure means the stream is dead. Consider detecting when the actor stops and failing subsequent sends, or document that callers must stop using the subscription after `next()` returns `None`.

Base automatically changed from tyler/xip83-3-bidi-api to main June 25, 2026 19:45
tylerhawkes added a commit that referenced this pull request Jun 25, 2026
#3771)

**Stack 3/4** of the XIP-83 bidi client lane: #3769#3770#3771#3772.

Native-only `XmtpMlsBidiStreams` trait in `xmtp_proto::api_client` —
`subscribe_bidi(BoxStream<SubscribeRequest>) ->
Stream<Result<SubscribeResponse>>` — and its `V3Client` implementation:
prost-encode outbound frames over
`bidi_stream("/xmtp.mls.api.v1.MlsApi/Subscribe")`, decode inbound via
the existing `XmtpStream`. Browsers stay on `XmtpMlsStreams` + watchdog
(gRPC-Web cannot full-duplex).

🤖 Generated with [Claude Code](https://claude.com/claude-code)
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from 35d3d95 to 7aad79f Compare June 25, 2026 21:49
Comment on lines +257 to +264
Some(Response::TopicsLive(live)) => {
let event = BidiEvent::TopicsLive {
topics: live.topics.into_iter().map(Topic::from_bytes).collect(),
};
if emit(&events, event).await {
break;
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium subscriptions/bidi.rs:257

TopicsLive at line 259 uses Topic::from_bytes without validation, which its docs state can cause undefined behavior on invalid layouts. A server can send an empty topic or unknown kind byte, and this code will emit it as a BidiEvent::TopicsLive. Any later use of that Topic (e.g., Debug, Display, or kind()) will panic because those paths call self.inner[0] or TopicKind::try_from(..).expect(..) on unvalidated data. Consider filtering out or erroring on invalid topics instead of passing them through.

-            Some(Response::TopicsLive(live)) => {
-                let event = BidiEvent::TopicsLive {
-                    topics: live.topics.into_iter().map(Topic::from_bytes).collect(),
-                };
+            Some(Response::TopicsLive(live)) => {
+                let topics: Vec<Topic> = live
+                    .topics
+                    .into_iter()
+                    .filter_map(|bytes| Topic::try_from_bytes(&bytes).ok())
+                    .collect();
+                let event = BidiEvent::TopicsLive { topics };
                 if emit(&events, event).await {
                     break;
                 }
             }
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @crates/xmtp_mls/src/subscriptions/bidi.rs around lines 257-264:

`TopicsLive` at line 259 uses `Topic::from_bytes` without validation, which its docs state can cause undefined behavior on invalid layouts. A server can send an empty topic or unknown kind byte, and this code will emit it as a `BidiEvent::TopicsLive`. Any later use of that `Topic` (e.g., `Debug`, `Display`, or `kind()`) will panic because those paths call `self.inner[0]` or `TopicKind::try_from(..).expect(..)` on unvalidated data. Consider filtering out or erroring on invalid topics instead of passing them through.

@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from 7aad79f to 0232759 Compare June 26, 2026 18:32
@tylerhawkes tylerhawkes changed the title feat(xmtp_mls): BidiSubscription actor (mutate-in-place, auto-pong liveness) feat(xmtp_api_d14n): BidiConnection actor — own both wire halves, internal liveness Jun 26, 2026
…ernal liveness

Owns one bidi stream end-to-end at the transport layer as the sole writer of the request half: auto-answers server Pings, correlates client liveness probes internally (neither surfaces to the consumer), and forwards only real subscription events upward. Because the actor owns both halves, closing the inbound side tears the connection down in one place — a later mutate/probe returns Closed by channel ownership instead of silently enqueueing into a dead request stream.

Relocated from xmtp_mls, where the actor held an outbound sender purely to auto-pong — which decoupled the request half's lifetime from the stream's and let post-close sends succeed silently.
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from 0232759 to 893718b Compare June 26, 2026 19:34
/// Send an event to the consumer; awaits a free slot (the backpressure that
/// stalls wire reads once [`EVENT_BUFFER`] fills) and returns true when the
/// consumer is gone and the actor should shut down.
async fn emit(events: &mpsc::Sender<BidiEvent>, event: BidiEvent) -> bool {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium v3/connection.rs:486

emit() awaits events.send(), which blocks the entire actor loop when EVENT_BUFFER fills. While blocked, the actor stops processing commands.recv(), so a Command::Probe times out even on a healthy connection—particularly after a resume that queues many events before the consumer drains them. Consider decoupling event emission from command processing, e.g. via a select! that concurrently handles commands while awaiting event sends.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @crates/xmtp_api_d14n/src/queries/v3/connection.rs around line 486:

`emit()` awaits `events.send()`, which blocks the entire actor loop when `EVENT_BUFFER` fills. While blocked, the actor stops processing `commands.recv()`, so a `Command::Probe` times out even on a healthy connection—particularly after a resume that queues many events before the consumer drains them. Consider decoupling event emission from command processing, e.g. via a `select!` that concurrently handles commands while awaiting event sends.

// concurrently with everything else. This is what keeps a busy wire
// from blocking the loop. Gated on a non-empty queue so we only
// contend for a permit when there's something to flush.
permit = wire_out.reserve(), if !pending.is_empty() => {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL abt the reserve() api

@tylerhawkes tylerhawkes enabled auto-merge (squash) June 26, 2026 20:08
let actor = xmtp_common::spawn(
None,
run_actor(
Box::pin(inbound),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit: shouldn't "inbound" be "wire_in" for consistency -- afaiu the inbound = messages we're getting and wire_out is messages we're sending off into bidi, where wire terminology is differentiating it from the in-mem message channels

@tylerhawkes tylerhawkes merged commit 7a0f50f into main Jun 26, 2026
48 checks passed
@tylerhawkes tylerhawkes deleted the tyler/xip83-4-bidi-actor branch June 26, 2026 20:21
tylerhawkes added a commit that referenced this pull request Jun 29, 2026
Builds on the merged BidiConnection actor (#3772):
- TrackedStatsClient forwards XmtpMlsBidiStreams so the feature-switched test
  client can open a BidiConnection.
- BidiConnection::finish() half-closes the request half for bounded-sync.
- Live v3 integration tests: handshake+welcome, catch-up/marker/live ordering,
  history_only, and bounded-sync half-close.
tylerhawkes added a commit that referenced this pull request Jun 29, 2026
Builds on the merged BidiConnection actor (#3772):
- TrackedStatsClient forwards XmtpMlsBidiStreams so the feature-switched test
  client can open a BidiConnection.
- BidiConnection::finish() half-closes the request half for bounded-sync.
- Live v3 integration tests: handshake+welcome, catch-up/marker/live ordering,
  history_only, and bounded-sync half-close.
tylerhawkes added a commit that referenced this pull request Jun 29, 2026
Builds on the merged BidiConnection actor (#3772):
- TrackedStatsClient forwards XmtpMlsBidiStreams so the feature-switched test
  client can open a BidiConnection.
- BidiConnection::finish() half-closes the request half for bounded-sync.
- Live v3 integration tests: handshake+welcome, catch-up/marker/live ordering,
  history_only, and bounded-sync half-close.
tylerhawkes added a commit that referenced this pull request Jun 29, 2026
Builds on the merged BidiConnection actor (#3772):
- TrackedStatsClient forwards XmtpMlsBidiStreams so the feature-switched test
  client can open a BidiConnection.
- BidiConnection::finish() half-closes the request half for bounded-sync.
- Live v3 integration tests: handshake+welcome, catch-up/marker/live ordering,
  history_only, and bounded-sync half-close.
tylerhawkes added a commit that referenced this pull request Jun 29, 2026
Builds on the merged BidiConnection actor (#3772):
- TrackedStatsClient forwards XmtpMlsBidiStreams so the feature-switched test
  client can open a BidiConnection.
- BidiConnection::finish() half-closes the request half for bounded-sync.
- Live v3 integration tests: handshake+welcome, catch-up/marker/live ordering,
  history_only, and bounded-sync half-close.
tylerhawkes added a commit that referenced this pull request Jun 29, 2026
…3789)

## What

Builds on the merged XIP-83 `BidiConnection` actor (#3772). Adds the
bounded-sync **half-close** API and the first **live integration tests**
that exercise a real `BidiConnection` against a running node, plus the
small forward that lets the standard test client open one.

## Changes

1. **`TrackedStatsClient` forwards `XmtpMlsBidiStreams`**
(`xmtp_api_d14n/src/queries/api_stats.rs`, native-only). The stats
wrapper forwarded every client trait except bidi, so the
feature-switched `TestClient` (`TrackedStatsClient<V3Client>`) could not
open a `BidiConnection`. One delegating impl, no per-call stat (the bidi
stream is opened once and mutated in place, not counted per RPC).

2. **`BidiConnection::finish()` — half-close / bounded sync** (XIP-83
server req 9 / client req 4). A new `Command::Finish` makes the actor
drop its outbound (request) half so the stream half-closes; the server
finishes the in-flight catch-up wave, emits its `TopicsLive` /
`CatchUpComplete` markers, and closes its side, so the consumer drains
`next()` to `None`. After `finish()`, `mutate`/`probe` return `Closed`.
Opened with `Mutate::history_only`, this is the bounded catch-up
("sync") flow over the same connection. The response→event translation
is factored into a shared `emit_response_events` so the live loop and
the post-finish drain deliver identical events.

3. **Live v3 integration tests**
(`xmtp_mls/src/subscriptions/bidi_tests.rs`). v3-only and native-only
(the bidi transport is implemented for the v3 client and needs
full-duplex HTTP/2; `TestClient` is only bidi-capable under the v3
config). They open a real `BidiConnection` against whichever backend the
test feature switch selects and derive every assertion from the stream
itself — using each frame's `is_commit` flag to count application
messages independently of the MLS commits a membership change produces,
rather than a side query:
- `bidi_connection_delivers_live_welcome_over_the_wire` — handshake,
live welcome after a membership change, ping/pong probe.
- `bidi_catch_up_precedes_live_marker_then_streams_live` — the happy
path in one test: pre-subscription history is caught up strictly before
the `TopicsLive` marker; messages published while the stream starts
arrive exactly once on either side of the marker; post-marker sends
stream live and are strictly newer than every catch-up cursor;
`CatchUpComplete` echoes the `mutate_id`.
- `bidi_history_only_catches_up_then_delivers_nothing_live` —
`history_only` catches up and emits the markers but registers nothing
for live delivery.
- `bidi_history_only_half_close_drains_then_server_closes` — bounded
sync: `history_only` + `finish()`, the server closes after the wave, and
`mutate` then returns `Closed`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants