New crate `arcbox-xnu-net` wraps platform-specific batch datagram syscalls for high-throughput SOCK_DGRAM I/O:

- macOS: XNU's recvmsg_x(2) / sendmsg_x(2) (syscalls 480/481)
- Linux: recvmmsg(2) / sendmmsg(2)

Provides BatchDgram (sync) and AsyncBatchDgram (tokio feature) that batch up to 256 datagrams per syscall, reducing syscall overhead by ~99% for workloads like L2 frame exchange over AF_UNIX socketpairs.
13 tests covering:

- Single/batch/roundtrip send and receive
- WouldBlock on empty fd, partial batch, zero-length datagrams
- MAX_BATCH cap enforcement, varying datagram sizes
- Threaded producer/consumer correctness (200 datagrams)
- Async roundtrip and sequential send/recv via tokio
Benchmarks on AF_UNIX SOCK_DGRAM socketpair (10K × 4000B datagrams):
- single_rw: baseline read()/write() per datagram
- batch_rw/{64,128,256}: send_batch/recv_batch at varying batch sizes
Greptile Summary

This PR introduces the `arcbox-xnu-net` crate: platform-specific batch datagram I/O (sync `BatchDgram`, async `AsyncBatchDgram`) for macOS and Linux, with unit tests and a Criterion throughput benchmark.

Confidence Score: 4/5

Three P1 defects (errno TOCTOU, missing RxEntry accessor in the async API, benchmark deadlock) should be resolved before merge. The FFI bindings and overall architecture are sound, but the errno double-read is a present defect in error reporting, the async API exposes an incomplete contract vs. the sync API, and the benchmark will hang, leaving the stated test-plan item unchecked in practice. All three are straightforward to fix.

Affected files: src/batch.rs (all four platform methods), src/async_batch.rs (recv_batch return type / entries accessor), benches/throughput.rs (send/recv interleaving).
Sequence Diagram

```mermaid
sequenceDiagram
    participant Caller
    participant BatchDgram
    participant AsyncBatchDgram
    participant AsyncFd
    participant Kernel
    Note over Caller,Kernel: Sync path (BatchDgram)
    Caller->>BatchDgram: send_batch(fd, bufs)
    BatchDgram->>BatchDgram: setup_send (fill iovecs & hdrs)
    BatchDgram->>Kernel: sendmsg_x / sendmmsg (batch syscall)
    Kernel-->>BatchDgram: n datagrams sent (or -1 + errno)
    BatchDgram-->>Caller: Ok(n)
    Caller->>BatchDgram: recv_batch(fd, bufs)
    BatchDgram->>BatchDgram: setup_recv (fill iovecs & hdrs)
    BatchDgram->>Kernel: recvmsg_x / recvmmsg (batch syscall)
    Kernel-->>BatchDgram: n datagrams + msg_datalen per slot
    BatchDgram->>BatchDgram: collect_recv → entries[..n]
    BatchDgram-->>Caller: Ok((&entries[..n], n))
    Note over Caller,Kernel: Async path (AsyncBatchDgram)
    Caller->>AsyncBatchDgram: recv_batch(bufs)
    AsyncBatchDgram->>AsyncFd: readable().await
    AsyncFd-->>AsyncBatchDgram: ReadyGuard
    AsyncBatchDgram->>BatchDgram: recv_batch(fd, bufs)
    BatchDgram->>Kernel: recvmsg_x / recvmmsg
    Kernel-->>BatchDgram: n datagrams
    BatchDgram-->>AsyncBatchDgram: Ok((&entries, n))
    AsyncBatchDgram-->>Caller: Ok(n) ⚠️ entries not exposed
```
Reviews (1): Last reviewed commit: "perf(xnu-net): add criterion benchmarks ..."
```rust
match io::Error::last_os_error().kind() {
    io::ErrorKind::Interrupted => {}
    io::ErrorKind::WouldBlock => return Ok(0),
    _ => return Err(io::Error::last_os_error()),
}
```
Double-call TOCTOU on `last_os_error`
io::Error::last_os_error() is called twice: once to get the ErrorKind for the match, then again in the wildcard arm to construct the returned error. Between the two calls a signal handler could invoke a failing syscall and overwrite errno, causing the error returned to the caller to differ from the one that was matched. Capture the error once:
```diff
-match io::Error::last_os_error().kind() {
-    io::ErrorKind::Interrupted => {}
-    io::ErrorKind::WouldBlock => return Ok(0),
-    _ => return Err(io::Error::last_os_error()),
-}
+let err = io::Error::last_os_error();
+match err.kind() {
+    io::ErrorKind::Interrupted => {}
+    io::ErrorKind::WouldBlock => return Ok(0),
+    _ => return Err(err),
+}
```
The same pattern appears in platform_send (macOS, line ~190), platform_recv (Linux, line ~245), and platform_send (Linux, line ~269).
```rust
/// Waits for the FD to become readable, then receives a batch.
///
/// Returns the number of datagrams received. Per-datagram lengths are
/// accessible via the returned count and the internal entries.
///
/// # Errors
///
/// Propagates I/O errors from the underlying batch receive.
#[allow(clippy::future_not_send)] // intentional: contains raw pointers from iovec arrays
pub async fn recv_batch(&mut self, bufs: &mut [&mut [u8]]) -> io::Result<usize> {
    loop {
        let mut guard = self.inner.readable().await?;
        match guard.try_io(|inner| {
            let fd = inner.get_ref().as_raw_fd();
            self.batch.recv_batch(fd, bufs).map(|(_, n)| n)
        }) {
            Ok(result) => return result,
            Err(_would_block) => {}
        }
    }
}
```
RxEntry metadata inaccessible after async recv
The doc comment states "Per-datagram lengths are accessible via the returned count and the internal entries", but AsyncBatchDgram::recv_batch only returns the datagram count as usize. The batch: BatchDgram field is private and there is no public entries() accessor method. Callers with variable-length datagrams have no way to determine how many bytes were received per slot — they are forced to scan the raw buffer contents themselves.
Either expose the entries slice (e.g. pub fn last_entries(&self) -> &[RxEntry]), change the return type to mirror BatchDgram::recv_batch (returning (Vec<RxEntry>, usize)), or at minimum fix the doc comment to reflect the actual API.
```rust
let mut remaining = TOTAL_DATAGRAMS;

// Send all datagrams in batches.
while remaining > 0 {
    let this_batch = remaining.min(bs);
    let bufs = &send_payloads[..this_batch];
    match batch.send_batch(fd_a, bufs) {
        Ok(n) if n > 0 => remaining -= n,
        _ => {}
    }
}

// Receive all datagrams in batches.
let mut received = 0;
while received < TOTAL_DATAGRAMS {
    let mut bufs: Vec<&mut [u8]> =
        recv_buffers.iter_mut().map(|b| b.as_mut_slice()).collect();
    if let Ok((_, n)) = batch.recv_batch(fd_b, &mut bufs) {
        received += n;
    }
}
```
Benchmark will deadlock when socket buffer fills
The benchmark tries to send TOTAL_DATAGRAMS × DATAGRAM_SIZE = 10 000 × 4 000 = 40 MB into an 8 MB socket buffer before the receive loop begins. Once the buffer fills, send_batch returns Ok(0) (WouldBlock) and the _ => {} arm silently retries — but the receive loop is never reached, so the benchmark spins forever.
The send/receive loops need to be interleaved (ping-pong) or the total payload size must fit within the socket buffer. For example, reduce TOTAL_DATAGRAMS to ≤ 2 000 (keeping total payload ≤ 8 MB), or split each iteration into smaller send-then-receive chunks.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d3e06bfa5e
```rust
while remaining > 0 {
    let this_batch = remaining.min(bs);
    let bufs = &send_payloads[..this_batch];
    match batch.send_batch(fd_a, bufs) {
        Ok(n) if n > 0 => remaining -= n,
        _ => {}
    }
}
```
Interleave send and receive in batch benchmark
This loop sends TOTAL_DATAGRAMS before doing any reads on a non-blocking socketpair. Once the kernel queue fills (10_000 * 4000 bytes far exceeds the configured 8 MB buffers), send_batch starts returning Ok(0)/EAGAIN and remaining stops decreasing; because the _ => {} branch ignores that state, the while remaining > 0 loop can run forever, and cargo bench -p arcbox-xnu-net may hang instead of producing throughput results.
```rust
let mut guard = self.inner.readable().await?;
match guard.try_io(|inner| {
    let fd = inner.get_ref().as_raw_fd();
    self.batch.recv_batch(fd, bufs).map(|(_, n)| n)
```
Preserve per-datagram lengths in async receive path
The async wrapper drops RxEntry metadata by mapping BatchDgram::recv_batch to only n, so callers cannot recover each datagram's true byte length for variable-size packets and may treat trailing bytes in the provided buffers as valid payload. This makes AsyncBatchDgram::recv_batch incorrect for general datagram workloads unless all frames are fixed-size.
Pull request overview
Adds a new arcbox-xnu-net crate to provide platform-specific batch datagram I/O (macOS sendmsg_x/recvmsg_x, Linux sendmmsg/recvmmsg) with sync and optional tokio-based async wrappers, plus tests and a Criterion throughput benchmark.
Changes:

- Introduce `BatchDgram` (sync) and `AsyncBatchDgram` (tokio) APIs for batched datagram send/recv.
- Add Darwin FFI bindings for `msghdr_x`, `recvmsg_x`, and `sendmsg_x`.
- Add unit tests and a Criterion benchmark to compare batch vs single-syscall throughput.
Reviewed changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| common/arcbox-xnu-net/src/lib.rs | New crate entrypoint exporting sync + optional async batch APIs. |
| common/arcbox-xnu-net/src/ffi.rs | Darwin-only FFI declarations for XNU batch syscalls and msghdr_x. |
| common/arcbox-xnu-net/src/batch.rs | Implements BatchDgram sync batch send/recv + unit tests. |
| common/arcbox-xnu-net/src/async_batch.rs | Implements AsyncBatchDgram using tokio::io::unix::AsyncFd. |
| common/arcbox-xnu-net/benches/throughput.rs | Criterion benchmark comparing single vs batched I/O on socketpairs. |
| common/arcbox-xnu-net/Cargo.toml | New crate manifest with optional tokio feature and benches. |
| Cargo.toml | Adds new crate to workspace members and adds Criterion to workspace deps. |
| Cargo.lock | Locks new dependencies pulled in by Criterion and the new crate. |
```rust
pub async fn send_batch(&mut self, bufs: &[&[u8]]) -> io::Result<usize> {
    loop {
        let mut guard = self.inner.writable().await?;
        match guard.try_io(|inner| {
            let fd = inner.get_ref().as_raw_fd();
            self.batch.send_batch(fd, bufs)
        }) {
            Ok(result) => return result,
            Err(_would_block) => {}
        }
    }
}
```
Same issue as recv_batch: BatchDgram::send_batch returns Ok(0) on WouldBlock, so try_io will not clear writable readiness and the method may return 0 repeatedly without awaiting. Either clear readiness and continue when the send result is 0, or plumb through WouldBlock as an error for try_io to handle.
```rust
/// Newtype so `AsyncFd` can track a raw `OwnedFd`.
pub struct FdWrapper(OwnedFd);

impl AsRawFd for FdWrapper {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}
```
FdWrapper looks unnecessary and leaks an implementation-detail type into the public API via inner() -> &AsyncFd<FdWrapper>. The codebase already uses AsyncFd<OwnedFd> directly (e.g., rpc/arcbox-transport/src/vsock/stream.rs:20-46), so consider switching inner to AsyncFd<OwnedFd> and adjusting inner() accordingly to avoid exposing FdWrapper.
```rust
impl AsyncBatchDgram {
    /// Creates a new async batch context, registering `fd` with the tokio
    /// reactor.
    ///
    /// # Errors
    ///
    /// Returns an error if the tokio reactor registration fails.
    pub fn new(fd: OwnedFd) -> io::Result<Self> {
        Ok(Self {
            inner: AsyncFd::new(FdWrapper(fd))?,
            batch: BatchDgram::new(),
        })
    }
}
```
AsyncBatchDgram::new requires the FD to already be O_NONBLOCK, but it doesn't enforce or set it. Elsewhere in the codebase constructors set non-blocking internally (e.g., VsockStream::from_fd in rpc/arcbox-transport/src/vsock/stream.rs:26-46). Consider setting O_NONBLOCK inside new (and returning an error if fcntl fails) to prevent accidental blocking syscalls inside the async reactor.
```rust
#[cfg(feature = "tokio")]
pub mod async_batch;

pub use batch::{BatchDgram, MAX_BATCH, RxEntry};

#[cfg(feature = "tokio")]
pub use async_batch::AsyncBatchDgram;
```
Async wrapper unit tests in async_batch.rs are only compiled when the tokio feature is enabled (#[cfg(feature = "tokio")] pub mod async_batch;). CI runs cargo test --workspace without --all-features (see .github/workflows/ci.yml), so these async tests won’t run by default despite the PR description claiming coverage for async. Consider compiling the module/tests under cfg(test) as well, or adding a CI/test target that runs cargo test -p arcbox-xnu-net --features tokio.
```rust
libc::setsockopt(
    fd,
    libc::SOL_SOCKET,
    libc::SO_SNDBUF,
    std::ptr::from_ref(&buf_size).cast(),
    std::mem::size_of_val(&buf_size) as libc::socklen_t,
);
libc::setsockopt(
    fd,
    libc::SOL_SOCKET,
    libc::SO_RCVBUF,
    std::ptr::from_ref(&buf_size).cast(),
    std::mem::size_of_val(&buf_size) as libc::socklen_t,
);
```
In socketpair(), the return values from setsockopt are ignored. If these calls fail (e.g., due to platform limits or permission quirks), later test failures can become non-obvious/flaky. Please capture and assert/check the return codes (and/or errno) so failures are surfaced at the point of configuration.
```diff
-libc::setsockopt(
-    fd,
-    libc::SOL_SOCKET,
-    libc::SO_SNDBUF,
-    std::ptr::from_ref(&buf_size).cast(),
-    std::mem::size_of_val(&buf_size) as libc::socklen_t,
-);
-libc::setsockopt(
-    fd,
-    libc::SOL_SOCKET,
-    libc::SO_RCVBUF,
-    std::ptr::from_ref(&buf_size).cast(),
-    std::mem::size_of_val(&buf_size) as libc::socklen_t,
-);
+let sndbuf_ret = libc::setsockopt(
+    fd,
+    libc::SOL_SOCKET,
+    libc::SO_SNDBUF,
+    std::ptr::from_ref(&buf_size).cast(),
+    std::mem::size_of_val(&buf_size) as libc::socklen_t,
+);
+assert_eq!(
+    sndbuf_ret,
+    0,
+    "setsockopt(SO_SNDBUF) failed for fd {}: {}",
+    fd,
+    io::Error::last_os_error()
+);
+let rcvbuf_ret = libc::setsockopt(
+    fd,
+    libc::SOL_SOCKET,
+    libc::SO_RCVBUF,
+    std::ptr::from_ref(&buf_size).cast(),
+    std::mem::size_of_val(&buf_size) as libc::socklen_t,
+);
+assert_eq!(
+    rcvbuf_ret,
+    0,
+    "setsockopt(SO_RCVBUF) failed for fd {}: {}",
+    fd,
+    io::Error::last_os_error()
+);
```
```rust
    std::mem::size_of_val(&buf_size) as libc::socklen_t,
);
let flags = libc::fcntl(fd, libc::F_GETFL);
libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
```
fcntl(F_GETFL) / fcntl(F_SETFL, ...) return values aren’t checked here. If either call fails, tests may silently proceed with a blocking FD and hang intermittently. Consider checking for < 0 and failing fast with io::Error::last_os_error()/assert messages.
```diff
-libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
+assert!(
+    flags >= 0,
+    "fcntl(F_GETFL) failed for fd {fd}: {}",
+    io::Error::last_os_error()
+);
+let ret = libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
+assert!(
+    ret >= 0,
+    "fcntl(F_SETFL, O_NONBLOCK) failed for fd {fd}: {}",
+    io::Error::last_os_error()
+);
```
```rust
bencher.iter(|| {
    let send_payloads: Vec<&[u8]> = (0..bs).map(|_| payload.as_slice()).collect();
    let mut remaining = TOTAL_DATAGRAMS;

    // Send all datagrams in batches.
    while remaining > 0 {
        let this_batch = remaining.min(bs);
        let bufs = &send_payloads[..this_batch];
        match batch.send_batch(fd_a, bufs) {
            Ok(n) if n > 0 => remaining -= n,
            _ => {}
        }
    }

    // Receive all datagrams in batches.
    let mut received = 0;
    while received < TOTAL_DATAGRAMS {
        let mut bufs: Vec<&mut [u8]> =
            recv_buffers.iter_mut().map(|b| b.as_mut_slice()).collect();
        if let Ok((_, n)) = batch.recv_batch(fd_b, &mut bufs) {
```
Inside the measured iter closure, this benchmark allocates send_payloads (and repeatedly rebuilds Vec<&mut [u8]> from recv_buffers) on each iteration. That overhead can dominate and blur syscall throughput differences. Consider moving these allocations outside the iter loop (or using Criterion’s batched APIs) so the benchmark primarily measures I/O behavior.
```rust
pub async fn recv_batch(&mut self, bufs: &mut [&mut [u8]]) -> io::Result<usize> {
    loop {
        let mut guard = self.inner.readable().await?;
        match guard.try_io(|inner| {
            let fd = inner.get_ref().as_raw_fd();
            self.batch.recv_batch(fd, bufs).map(|(_, n)| n)
        }) {
            Ok(result) => return result,
            Err(_would_block) => {}
        }
    }
```
BatchDgram::recv_batch maps WouldBlock to Ok(0), so AsyncFd::try_io will never return Err(_would_block) here. That means readiness is never cleared and callers can end up in a tight loop repeatedly getting Ok(0) after a spurious readiness event. Consider treating n == 0 as WouldBlock in the async wrapper (e.g., clear readiness and continue), or expose a variant of recv_batch that returns Err(WouldBlock) so try_io can manage readiness correctly.
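The second option can be sketched as a small adapter that turns the sync layer's Ok(0)-on-WouldBlock convention back into an `Err(WouldBlock)` so `try_io` clears readiness (the helper name and its placement inside the `try_io` closure are assumptions, not the crate's actual API):

```rust
use std::io;

// Hypothetical helper: convert BatchDgram's "Ok(0) means WouldBlock"
// convention back into an error, so AsyncFd::try_io sees Err(WouldBlock),
// clears readiness, and the loop awaits instead of spinning.
fn zero_as_would_block(res: io::Result<usize>) -> io::Result<usize> {
    match res {
        Ok(0) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
        other => other,
    }
}

fn main() {
    // Nonzero counts and real errors pass through untouched.
    assert_eq!(zero_as_would_block(Ok(5)).unwrap(), 5);
    // Ok(0) becomes WouldBlock, which try_io treats as "clear readiness".
    assert_eq!(
        zero_as_would_block(Ok(0)).unwrap_err().kind(),
        io::ErrorKind::WouldBlock
    );
    println!("ok");
}
```

Inside the wrapper this would look like `guard.try_io(|inner| zero_as_would_block(self.batch.recv_batch(fd, bufs).map(|(_, n)| n)))`, with the caveat that a genuinely received empty batch would also need distinguishing if zero-length datagrams must be reported to callers.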
```rust
fn single_write(fd: RawFd, data: &[u8]) {
    // SAFETY: valid fd and buffer.
    unsafe { libc::write(fd, data.as_ptr().cast(), data.len()) };
}

fn single_read(fd: RawFd, buf: &mut [u8]) -> usize {
    // SAFETY: valid fd and buffer.
    let n = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), buf.len()) };
    if n < 0 { 0 } else { n as usize }
}
```
The single-syscall benchmark path ignores write/read errors and does not handle EAGAIN for the non-blocking socket. This can drop datagrams and make the benchmark measure inconsistent work (e.g., single_read returning 0 on error). Consider looping/retrying on EINTR/EAGAIN and validating that exactly TOTAL_DATAGRAMS were transferred so the comparison against batching is meaningful.
Summary

- New `arcbox-xnu-net` crate (`common/arcbox-xnu-net`) providing batch datagram I/O primitives on top of Darwin's `sendmsg_x`/`recvmsg_x` syscalls. Exposes `BatchDgram` (sync) and `AsyncBatchDgram` (tokio-aware) wrappers with FFI bindings under `src/ffi.rs`.
- Criterion benchmark (`benches/throughput.rs`) comparing batch vs single-syscall throughput so the speedup is measurable and tracked.

This is foundation work for the network throughput target (>50 Gbps). The crate is self-contained — no existing call sites changed. Follow-up PRs will wire `BatchDgram` into the vmnet/utun relays where VM-exits currently dominate RX/TX cost.

Test plan

- `cargo test -p arcbox-xnu-net` passes
- `cargo clippy -p arcbox-xnu-net --all-targets -- -D warnings` clean
- `cargo fmt -p arcbox-xnu-net --check` clean
- Run `cargo bench -p arcbox-xnu-net` locally on Apple Silicon to confirm batch gives >2x throughput vs single on small-packet workloads