From bb19855a4d3fdbb9269f7687031cc7b30e0fbb48 Mon Sep 17 00:00:00 2001 From: Ben Hagen Date: Tue, 16 Jun 2026 10:09:19 +0200 Subject: [PATCH] fix(netwatch): drain AF_ROUTE socket to avoid macOS route monitor stall The BSD/macOS route monitor read the AF_ROUTE socket with one AsyncRead::read per await and never drained. On an edge-triggered reactor that can drop the next readiness notification, so under sustained route churn the reader stalled with data still queued and stopped delivering NetworkMessage::Change for the rest of the process. Read via readable() + try_read() drained to WouldBlock instead. The UnixStream and its construction are unchanged. --- netwatch/src/netmon/bsd.rs | 212 ++++++++++++++++++++++++++++--------- 1 file changed, 163 insertions(+), 49 deletions(-) diff --git a/netwatch/src/netmon/bsd.rs b/netwatch/src/netmon/bsd.rs index 54b0ebea..b1a3653a 100644 --- a/netwatch/src/netmon/bsd.rs +++ b/netwatch/src/netmon/bsd.rs @@ -1,3 +1,5 @@ +use std::ops::ControlFlow; + #[cfg(any(target_os = "macos", target_os = "ios"))] use libc::{RTAX_DST, RTAX_IFP}; use n0_error::stack_error; @@ -5,13 +7,16 @@ use n0_future::{ task::AbortOnDropHandle, time::{self, Duration}, }; -use tokio::{io::AsyncReadExt, sync::mpsc}; +use tokio::sync::mpsc; use tracing::{trace, warn}; use super::actor::NetworkMessage; #[cfg(any(target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))] use crate::interfaces::bsd::{RTAX_DST, RTAX_IFP}; -use crate::{interfaces::bsd::WireMessage, ip::is_link_local}; +use crate::{ + interfaces::bsd::{WireMessage, parse_rib}, + ip::is_link_local, +}; #[derive(Debug)] pub(super) struct RouteMonitor { @@ -25,6 +30,12 @@ pub enum Error { Io { source: std::io::Error }, } +/// Maximum backoff between socket recreation attempts. +const MAX_BACKOFF: Duration = Duration::from_secs(30); + +/// Initial backoff, grown exponentially toward [`MAX_BACKOFF`] on repeated errors. +const INITIAL_BACKOFF: Duration = Duration::from_millis(50); + fn create_socket() -> std::io::Result { let socket = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?; socket.set_nonblocking(true)?; @@ -38,60 +49,105 @@ fn create_socket() -> std::io::Result { impl RouteMonitor { pub(super) fn new(sender: mpsc::Sender) -> Result { - let mut socket = create_socket()?; - let handle = tokio::task::spawn(async move { - trace!("AF_ROUTE monitor started"); - - let mut buffer = vec![0u8; 2048]; - let mut backoff = Duration::from_secs(1); - const MAX_BACKOFF: Duration = Duration::from_secs(30); - - loop { - match socket.read(&mut buffer).await { - Ok(read) => { - // Grow buffer if the read filled it, up to 64KiB - if read == buffer.len() && buffer.len() < 65536 { - buffer.resize(buffer.len() * 2, 0); - } - backoff = Duration::from_secs(1); - trace!("AF_ROUTE: read {} bytes", read); - match super::super::interfaces::bsd::parse_rib( - libc::NET_RT_DUMP, - &buffer[..read], - ) { - Ok(msgs) => { - if contains_interesting_message(&msgs) - && sender.send(NetworkMessage::Change).await.is_err() - { - break; - } - } - Err(err) => { - warn!("AF_ROUTE: failed to parse rib: {:?}", err); - } + let socket = create_socket()?; + let handle = tokio::task::spawn(run(socket, sender)); + + Ok(RouteMonitor { + _handle: AbortOnDropHandle::new(handle), + }) + } +} + +/// Reads routing messages and forwards interesting changes. +/// +/// Recreates the socket with backoff on error. Returns when the receiver is +/// gone. +async fn run(mut socket: tokio::net::UnixStream, sender: mpsc::Sender) { + trace!("AF_ROUTE monitor started"); + + let mut buffer = vec![0u8; 2048]; + let mut backoff = INITIAL_BACKOFF; + + loop { + if let Err(err) = socket.readable().await { + warn!("AF_ROUTE: error awaiting readable: {:?}", err); + socket = recreate_socket(&mut backoff).await; + continue; + } + + match read_available(&socket, &mut buffer, &sender).await { + ControlFlow::Break(()) => break, + ControlFlow::Continue(Ok(_read)) => backoff = INITIAL_BACKOFF, + ControlFlow::Continue(Err(err)) => { + warn!("AF_ROUTE: error reading: {:?}", err); + socket = recreate_socket(&mut backoff).await; + } + } + } +} + +/// Drains all currently queued routing messages. +/// +/// Sends a [`NetworkMessage::Change`] for each batch with an interesting message +/// and returns the number of messages read, or [`ControlFlow::Break`] once the +/// receiver is gone. +/// +/// Drains with `try_read` until `WouldBlock`. Do not read via `AsyncRead::read` +/// one message per await: the fd is registered edge-triggered, so leaving data +/// queued can lose the next readiness notification and permanently stall the +/// monitor. See mio's `Poll` docs on draining edge-triggered readiness. +async fn read_available( + socket: &tokio::net::UnixStream, + buffer: &mut Vec, + sender: &mpsc::Sender, +) -> ControlFlow<(), std::io::Result> { + let mut read_count = 0; + loop { + match socket.try_read(buffer) { + Ok(0) => return ControlFlow::Continue(Ok(read_count)), + Ok(read) => { + read_count += 1; + // Grow buffer if the read filled it, up to 64KiB. + if read == buffer.len() && buffer.len() < 65536 { + buffer.resize(buffer.len() * 2, 0); + } + trace!("AF_ROUTE: read {} bytes", read); + match parse_rib(libc::NET_RT_DUMP, &buffer[..read]) { + Ok(msgs) => { + if contains_interesting_message(&msgs) + && sender.send(NetworkMessage::Change).await.is_err() + { + return ControlFlow::Break(()); } } Err(err) => { - warn!("AF_ROUTE: error reading: {:?}", err); - time::sleep(backoff).await; - match create_socket() { - Ok(new_socket) => { - socket = new_socket; - backoff = Duration::from_secs(1); - } - Err(err) => { - warn!("AF_ROUTE: unable to recreate socket: {:?}", err); - backoff = (backoff * 2).min(MAX_BACKOFF); - } - } + warn!("AF_ROUTE: failed to parse rib: {:?}", err); } } } - }); + // Fully drained; readiness is re-armed for the next message. + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + return ControlFlow::Continue(Ok(read_count)); + } + Err(err) => return ControlFlow::Continue(Err(err)), + } + } +} - Ok(RouteMonitor { - _handle: AbortOnDropHandle::new(handle), - }) +/// Recreates the socket, retrying with exponential backoff until it succeeds. +async fn recreate_socket(backoff: &mut Duration) -> tokio::net::UnixStream { + loop { + time::sleep(*backoff).await; + match create_socket() { + Ok(socket) => { + *backoff = INITIAL_BACKOFF; + return socket; + } + Err(err) => { + warn!("AF_ROUTE: unable to recreate socket: {:?}", err); + *backoff = (*backoff * 2).min(MAX_BACKOFF); + } + } } } @@ -135,3 +191,61 @@ pub(crate) fn is_interesting_interface(name: &str) -> bool { true } + +#[cfg(test)] +mod tests { + use super::*; + + /// A message [`parse_rib`] skips (wrong `rtm_version`), so it is read but + /// yields no `Change`. + fn skipped_message() -> Vec { + let mut m = vec![0u8; 8]; + m[..2].copy_from_slice(&8u16.to_ne_bytes()); // rtm_msglen + m[2] = 0xff; // rtm_version: deliberately not the real one + m + } + + /// [`read_available`] must drain every queued message in one readiness + /// episode, not just one per wakeup. Uses an `AF_UNIX` datagram socketpair + /// so it is deterministic and independent of the routing subsystem. + #[tokio::test] + async fn read_available_drains_all_queued_messages() { + let (writer, reader) = + socket2::Socket::pair(socket2::Domain::UNIX, socket2::Type::DGRAM, None) + .expect("socketpair"); + reader.set_nonblocking(true).expect("nonblocking"); + + let msg = skipped_message(); + let mut sent = 0; + for _ in 0..64 { + if writer.send(&msg).is_ok() { + sent += 1; + } else { + break; + } + } + assert!( + sent > 1, + "expected to queue several datagrams, queued {sent}" + ); + + let reader_std: std::os::unix::net::UnixStream = reader.into(); + let reader = tokio::net::UnixStream::from_std(reader_std).expect("unixstream"); + let (tx, mut rx) = mpsc::channel(8); + let mut buffer = vec![0u8; 2048]; + + reader.readable().await.expect("readable"); + match read_available(&reader, &mut buffer, &tx).await { + ControlFlow::Continue(Ok(read)) => assert_eq!( + read, sent, + "read_available must drain all {sent} queued datagrams in one episode, drained {read}" + ), + other => panic!("unexpected control flow: {other:?}"), + } + // Skipped messages are not interesting, so no Change is sent. + assert!( + rx.try_recv().is_err(), + "no Change expected for skipped messages" + ); + } +}