From c0b2e65e5e0b3b63f1f76f8d2e239b3d5024d26b Mon Sep 17 00:00:00 2001 From: Ben Hagen Date: Fri, 12 Jun 2026 12:25:33 +0200 Subject: [PATCH 1/2] fix(netwatch): reconcile interface state periodically The netmon actor only re-enumerated interfaces in response to events from the OS route monitor (or a detected wall-time jump). Those events can be missed or stop entirely with no error: - Routing sockets are best-effort. On receive-buffer overflow the kernel drops the overflowing RTM_* messages, and on macOS (XNU raw_input/sbappendaddr) the drop is silent: so_error is not set and there is no SO_RERROR option, so the reader cannot tell a message was lost. A change can be missed on an otherwise-working socket. - In production we additionally observed the macOS AF_ROUTE read stop delivering events entirely (no further reads, no error) for the rest of the process lifetime. macOS PF_ROUTE is independently documented to be unreliable (it can fail to emit some interface-change events at all), and the socket is read through a tokio UnixStream over a SOCK_RAW fd, so a lost edge-triggered readiness wakeup is also possible; the exact cause was not determined and the fix does not depend on it. With no fallback the actor never recomputes State, so the interface state freezes at whatever it was when the last event was processed. In practice a brief interface change (a DHCP renew or link blip) made an endpoint permanently lose an interface and its addresses even though the interface was up and its address returned immediately; a fresh enumeration reflected the correct state the whole time. This matches iroh#3449. Re-enumerate on the existing periodic wake-up in addition to wall-time jumps, so a missed or absent event self-heals within one interval (15s on desktop). handle_potential_change already diffs against the current state, so this only emits a change when something actually changed. This is the same event-plus-periodic-reconcile pattern Tailscale uses (15s pollWallTimeInterval backstop) and complements the existing error-path socket rebind (net-tools#105) by covering the no-error case. The interface enumeration is made injectable and the poll interval configurable so the recovery path can be tested deterministically without a route monitor. Adds a regression test that drives the actor with no route-monitor events and asserts the state still converges to a changed enumeration. --- netwatch/src/netmon/actor.rs | 172 ++++++++++++++++++++++++++++++++--- 1 file changed, 158 insertions(+), 14 deletions(-) diff --git a/netwatch/src/netmon/actor.rs b/netwatch/src/netmon/actor.rs index 3ce5bf5..e2a1fec 100644 --- a/netwatch/src/netmon/actor.rs +++ b/netwatch/src/netmon/actor.rs @@ -1,4 +1,10 @@ -use n0_future::time::{self, Duration, Instant}; +use std::sync::Arc; + +use n0_future::{ + FutureExt as _, + boxed::BoxFuture, + time::{self, Duration, Instant}, +}; use n0_watcher::Watchable; pub(super) use os::Error; use os::RouteMonitor; @@ -33,27 +39,48 @@ pub(super) enum NetworkMessage { Change, } -/// How often we execute a check for big jumps in wall time. +/// How often the actor wakes up to check for wall-time jumps and to +/// re-enumerate interfaces (see [`Actor::run`]). #[cfg(not(any(target_os = "ios", target_os = "android")))] -const POLL_WALL_TIME_INTERVAL: Duration = Duration::from_secs(15); +const POLL_INTERVAL: Duration = Duration::from_secs(15); /// Set background polling time to 1h to effectively disable it on mobile, /// to avoid increased battery usage. Sleep detection won't work this way there. #[cfg(any(target_os = "ios", target_os = "android"))] -const POLL_WALL_TIME_INTERVAL: Duration = Duration::from_secs(60 * 60); +const POLL_INTERVAL: Duration = Duration::from_secs(60 * 60); const MON_CHAN_CAPACITY: usize = 16; const ACTOR_CHAN_CAPACITY: usize = 16; +/// Produces the current [`State`] of the host's network interfaces. +/// +/// Boxed so the enumeration can be substituted in tests; in production it is +/// always [`State::new`]. +type StateFn = Arc BoxFuture + Send + Sync + 'static>; + +fn default_state_fn() -> StateFn { + Arc::new(|| State::new().boxed()) +} + pub(super) struct Actor { /// Latest known interface state. interface_state: Watchable, /// Latest observed wall time. wall_time: Instant, /// OS specific monitor. + /// + /// `None` only in tests, where the OS route monitor is intentionally + /// absent so that recovery via the periodic reconcile can be exercised in + /// isolation. Held purely to keep the monitor task alive. #[allow(dead_code)] - route_monitor: RouteMonitor, + route_monitor: Option, mon_receiver: mpsc::Receiver, actor_receiver: mpsc::Receiver, actor_sender: mpsc::Sender, + /// How the actor enumerates interfaces. Always [`State::new`] in + /// production; overridable in tests. + get_state: StateFn, + /// Interval at which the actor re-enumerates interfaces and checks wall + /// time. Defaults to [`POLL_INTERVAL`]. + poll_interval: Duration, } pub(super) enum ActorMessage { @@ -62,7 +89,8 @@ pub(super) enum ActorMessage { impl Actor { pub(super) async fn new() -> Result { - let interface_state = State::new().await; + let get_state = default_state_fn(); + let interface_state = (get_state)().await; let wall_time = Instant::now(); let (mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY); @@ -72,10 +100,12 @@ impl Actor { Ok(Actor { interface_state: Watchable::new(interface_state), wall_time, - route_monitor, + route_monitor: Some(route_monitor), mon_receiver, actor_receiver, actor_sender, + get_state, + poll_interval: POLL_INTERVAL, }) } @@ -94,7 +124,10 @@ impl Actor { let mut pending_time_jump = false; let debounce = time::sleep(DEBOUNCE); tokio::pin!(debounce); - let mut wall_time_interval = time::interval(POLL_WALL_TIME_INTERVAL); + let mut poll_interval = time::interval(self.poll_interval); + // The first tick fires immediately; skip it so startup does not do a + // redundant reconcile right after the initial enumeration. + poll_interval.tick().await; loop { tokio::select! { @@ -103,12 +136,27 @@ impl Actor { pending_change = false; pending_time_jump = false; } - _ = wall_time_interval.tick() => { - trace!("tick: wall_time_interval"); + _ = poll_interval.tick() => { + trace!("tick: poll_interval"); if self.check_wall_time_advance() { pending_time_jump = true; - debounce.as_mut().reset(Instant::now() + DEBOUNCE); } + // Re-enumerate interfaces on every tick, not just on + // wall-time jumps. The OS route monitors are best-effort + // and an event can be missed without any error: a routing + // socket drops messages when its receive buffer overflows, + // and on macOS (XNU `raw_input`) that drop is silent (no + // `so_error`, no `SO_RERROR`), so a change can be missed on + // an otherwise-working socket. We have also observed the + // macOS AF_ROUTE read simply stop delivering events (no + // further reads, no error), after which the actor has + // nothing to react to. A missed or absent event must not + // freeze the interface state forever, so reconcile + // periodically. `handle_potential_change` diffs against the + // current state, so this is a no-op (beyond the enumeration + // itself) whenever nothing actually changed. + pending_change = true; + debounce.as_mut().reset(Instant::now() + DEBOUNCE); } event = self.mon_receiver.recv() => { match event { @@ -143,7 +191,7 @@ impl Actor { async fn handle_potential_change(&mut self, time_jumped: bool) { trace!("potential change"); - let mut new_state = State::new().await; + let mut new_state = (self.get_state)().await; let old_state = &self.interface_state.get(); if time_jumped { @@ -158,11 +206,11 @@ impl Actor { } /// Reports whether wall time jumped more than 150% - /// of `POLL_WALL_TIME_INTERVAL`, indicating we probably just came out of sleep. + /// of [`Self::poll_interval`], indicating we probably just came out of sleep. fn check_wall_time_advance(&mut self) -> bool { let now = Instant::now(); let jumped = if let Some(elapsed) = now.checked_duration_since(self.wall_time) { - elapsed > POLL_WALL_TIME_INTERVAL * 3 / 2 + elapsed > self.poll_interval * 3 / 2 } else { false }; @@ -171,3 +219,99 @@ impl Actor { jumped } } + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + + use n0_watcher::Watcher as _; + + use super::*; + + /// Builds two distinct interface states so a transition is observable. + fn state_pair() -> (State, State) { + let with_iface = State::fake(); + let mut without_iface = with_iface.clone(); + without_iface.interfaces.clear(); + assert_ne!(with_iface, without_iface); + (with_iface, without_iface) + } + + /// The actor must recover the correct interface state purely from its + /// periodic reconcile, with no route-monitor events at all. + /// + /// This reproduces the production failure where the OS route monitor stops + /// delivering events (on macOS the raw AF_ROUTE socket can lose its read + /// readiness after a burst of messages): the interface state would + /// otherwise stay frozen forever. Before the periodic reconcile was added + /// this test hangs until the timeout, because nothing ever re-enumerates. + #[tokio::test] + async fn recovers_state_without_route_events() { + // The enumeration reports the "before" state once (the initial state), + // then the "after" state on every subsequent call, modelling an + // interface change that the route monitor never signalled. + let (before, after) = state_pair(); + let calls = Arc::new(Mutex::new(0usize)); + let states = Arc::new((before.clone(), after.clone())); + let get_state: StateFn = { + let calls = calls.clone(); + let states = states.clone(); + Arc::new(move || { + let n = { + let mut g = calls.lock().unwrap(); + let n = *g; + *g += 1; + n + }; + let states = states.clone(); + async move { + if n == 0 { + states.0.clone() + } else { + states.1.clone() + } + } + .boxed() + }) + }; + + // No route monitor: the only path that can update state is the + // periodic reconcile. Keep `mon_sender` alive so the receiver does not + // report all senders gone (which would shut the actor down). + let initial = (get_state)().await; + assert_eq!(initial, before); + let (mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY); + let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY); + let interface_state = Watchable::new(initial); + let mut watch = interface_state.watch(); + + let actor = Actor { + interface_state, + wall_time: Instant::now(), + route_monitor: None, + mon_receiver, + actor_receiver, + actor_sender, + get_state, + // Short interval so the test does not wait the production 15s, + // but above the 250ms debounce so each tick settles into a reconcile. + poll_interval: Duration::from_millis(400), + }; + let handle = tokio::spawn(actor.run()); + + let updated = tokio::time::timeout(Duration::from_secs(5), async { + loop { + let state = watch.updated().await.expect("watcher closed"); + if state == after { + return state; + } + } + }) + .await + .expect("interface state was not reconciled without route events"); + assert_eq!(updated, after); + + drop(mon_sender); + handle.abort(); + } +} From 2a8a85f27fc91445c2b4e89e6f898901b5ca3192 Mon Sep 17 00:00:00 2001 From: Ben Hagen Date: Tue, 16 Jun 2026 07:46:45 +0200 Subject: [PATCH 2/2] refactor(netwatch): keep test seams out of the netmon Actor struct Move the route monitor and interface enumeration into Actor::run parameters instead of struct fields, and drive the recovery test under paused tokio time so the poll interval need not be configurable. The Actor struct no longer carries any field that exists only for testing. --- netwatch/src/netmon.rs | 6 +- netwatch/src/netmon/actor.rs | 111 ++++++++++++----------------------- 2 files changed, 41 insertions(+), 76 deletions(-) diff --git a/netwatch/src/netmon.rs b/netwatch/src/netmon.rs index 22cafaa..0ebac4c 100644 --- a/netwatch/src/netmon.rs +++ b/netwatch/src/netmon.rs @@ -25,7 +25,7 @@ mod windows; #[cfg(not(wasm_browser))] pub(crate) use self::actor::is_interesting_interface; -use self::actor::{Actor, ActorMessage}; +use self::actor::{Actor, ActorMessage, default_state_fn}; pub use crate::interfaces::State; /// Monitors networking interface and route changes. @@ -61,12 +61,12 @@ impl From for Error { impl Monitor { /// Create a new monitor. pub async fn new() -> Result { - let actor = Actor::new().await?; + let (actor, route_monitor) = Actor::new().await?; let actor_tx = actor.subscribe(); let interface_state = actor.state().clone(); let handle = task::spawn(async move { - actor.run().await; + actor.run(Some(route_monitor), default_state_fn()).await; }); Ok(Monitor { diff --git a/netwatch/src/netmon/actor.rs b/netwatch/src/netmon/actor.rs index e2a1fec..5c15a2c 100644 --- a/netwatch/src/netmon/actor.rs +++ b/netwatch/src/netmon/actor.rs @@ -50,13 +50,10 @@ const POLL_INTERVAL: Duration = Duration::from_secs(60 * 60); const MON_CHAN_CAPACITY: usize = 16; const ACTOR_CHAN_CAPACITY: usize = 16; -/// Produces the current [`State`] of the host's network interfaces. -/// -/// Boxed so the enumeration can be substituted in tests; in production it is -/// always [`State::new`]. +/// Enumerates the host's network interfaces. Boxed so tests can substitute it. type StateFn = Arc BoxFuture + Send + Sync + 'static>; -fn default_state_fn() -> StateFn { +pub(super) fn default_state_fn() -> StateFn { Arc::new(|| State::new().boxed()) } @@ -65,22 +62,9 @@ pub(super) struct Actor { interface_state: Watchable, /// Latest observed wall time. wall_time: Instant, - /// OS specific monitor. - /// - /// `None` only in tests, where the OS route monitor is intentionally - /// absent so that recovery via the periodic reconcile can be exercised in - /// isolation. Held purely to keep the monitor task alive. - #[allow(dead_code)] - route_monitor: Option, mon_receiver: mpsc::Receiver, actor_receiver: mpsc::Receiver, actor_sender: mpsc::Sender, - /// How the actor enumerates interfaces. Always [`State::new`] in - /// production; overridable in tests. - get_state: StateFn, - /// Interval at which the actor re-enumerates interfaces and checks wall - /// time. Defaults to [`POLL_INTERVAL`]. - poll_interval: Duration, } pub(super) enum ActorMessage { @@ -88,25 +72,22 @@ pub(super) enum ActorMessage { } impl Actor { - pub(super) async fn new() -> Result { - let get_state = default_state_fn(); - let interface_state = (get_state)().await; + pub(super) async fn new() -> Result<(Self, RouteMonitor), os::Error> { + let interface_state = State::new().await; let wall_time = Instant::now(); let (mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY); let route_monitor = RouteMonitor::new(mon_sender)?; let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY); - Ok(Actor { + let actor = Actor { interface_state: Watchable::new(interface_state), wall_time, - route_monitor: Some(route_monitor), mon_receiver, actor_receiver, actor_sender, - get_state, - poll_interval: POLL_INTERVAL, - }) + }; + Ok((actor, route_monitor)) } pub(super) fn state(&self) -> &Watchable { @@ -117,22 +98,24 @@ impl Actor { self.actor_sender.clone() } - pub(super) async fn run(mut self) { + pub(super) async fn run(mut self, route_monitor: Option, get_state: StateFn) { + // Held only to keep the OS monitor task alive; `None` in tests. + let _route_monitor = route_monitor; + const DEBOUNCE: Duration = Duration::from_millis(250); let mut pending_change = false; let mut pending_time_jump = false; let debounce = time::sleep(DEBOUNCE); tokio::pin!(debounce); - let mut poll_interval = time::interval(self.poll_interval); - // The first tick fires immediately; skip it so startup does not do a - // redundant reconcile right after the initial enumeration. + let mut poll_interval = time::interval(POLL_INTERVAL); + // Skip the immediate first tick; we just enumerated in `new`. poll_interval.tick().await; loop { tokio::select! { _ = &mut debounce, if pending_change || pending_time_jump => { - self.handle_potential_change(pending_time_jump).await; + self.handle_potential_change(&get_state, pending_time_jump).await; pending_change = false; pending_time_jump = false; } @@ -141,20 +124,12 @@ impl Actor { if self.check_wall_time_advance() { pending_time_jump = true; } - // Re-enumerate interfaces on every tick, not just on - // wall-time jumps. The OS route monitors are best-effort - // and an event can be missed without any error: a routing - // socket drops messages when its receive buffer overflows, - // and on macOS (XNU `raw_input`) that drop is silent (no - // `so_error`, no `SO_RERROR`), so a change can be missed on - // an otherwise-working socket. We have also observed the - // macOS AF_ROUTE read simply stop delivering events (no - // further reads, no error), after which the actor has - // nothing to react to. A missed or absent event must not - // freeze the interface state forever, so reconcile - // periodically. `handle_potential_change` diffs against the - // current state, so this is a no-op (beyond the enumeration - // itself) whenever nothing actually changed. + // Reconcile on every tick, not just on wall-time jumps. OS + // route monitors are best-effort and can drop events + // silently (or stop delivering entirely), which would + // otherwise freeze the interface state until restart. + // `handle_potential_change` diffs, so this is a no-op when + // nothing changed. pending_change = true; debounce.as_mut().reset(Instant::now() + DEBOUNCE); } @@ -188,10 +163,10 @@ impl Actor { } } - async fn handle_potential_change(&mut self, time_jumped: bool) { + async fn handle_potential_change(&mut self, get_state: &StateFn, time_jumped: bool) { trace!("potential change"); - let mut new_state = (self.get_state)().await; + let mut new_state = (get_state)().await; let old_state = &self.interface_state.get(); if time_jumped { @@ -206,11 +181,11 @@ impl Actor { } /// Reports whether wall time jumped more than 150% - /// of [`Self::poll_interval`], indicating we probably just came out of sleep. + /// of [`POLL_INTERVAL`], indicating we probably just came out of sleep. fn check_wall_time_advance(&mut self) -> bool { let now = Instant::now(); let jumped = if let Some(elapsed) = now.checked_duration_since(self.wall_time) { - elapsed > self.poll_interval * 3 / 2 + elapsed > POLL_INTERVAL * 3 / 2 } else { false }; @@ -237,19 +212,13 @@ mod tests { (with_iface, without_iface) } - /// The actor must recover the correct interface state purely from its - /// periodic reconcile, with no route-monitor events at all. - /// - /// This reproduces the production failure where the OS route monitor stops - /// delivering events (on macOS the raw AF_ROUTE socket can lose its read - /// readiness after a burst of messages): the interface state would - /// otherwise stay frozen forever. Before the periodic reconcile was added - /// this test hangs until the timeout, because nothing ever re-enumerates. - #[tokio::test] + /// State must converge via the periodic reconcile alone, with no + /// route-monitor events. Paused time auto-advances through the poll + /// interval, so this does not wait in real time. + #[tokio::test(start_paused = true)] async fn recovers_state_without_route_events() { - // The enumeration reports the "before" state once (the initial state), - // then the "after" state on every subsequent call, modelling an - // interface change that the route monitor never signalled. + // Enumeration returns "before" once, then "after" on every later call: + // an interface change the route monitor never signalled. let (before, after) = state_pair(); let calls = Arc::new(Mutex::new(0usize)); let states = Arc::new((before.clone(), after.clone())); @@ -275,12 +244,11 @@ mod tests { }) }; - // No route monitor: the only path that can update state is the - // periodic reconcile. Keep `mon_sender` alive so the receiver does not - // report all senders gone (which would shut the actor down). + // Keep `_mon_sender` alive so the actor does not see all senders gone + // and shut down. let initial = (get_state)().await; assert_eq!(initial, before); - let (mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY); + let (_mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY); let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY); let interface_state = Watchable::new(initial); let mut watch = interface_state.watch(); @@ -288,18 +256,15 @@ mod tests { let actor = Actor { interface_state, wall_time: Instant::now(), - route_monitor: None, mon_receiver, actor_receiver, actor_sender, - get_state, - // Short interval so the test does not wait the production 15s, - // but above the 250ms debounce so each tick settles into a reconcile. - poll_interval: Duration::from_millis(400), }; - let handle = tokio::spawn(actor.run()); + let handle = tokio::spawn(actor.run(None, get_state)); - let updated = tokio::time::timeout(Duration::from_secs(5), async { + // Must exceed POLL_INTERVAL, or auto-advanced time trips it before the + // first reconcile. Without the fix, state never converges and this fires. + let updated = tokio::time::timeout(Duration::from_secs(60), async { loop { let state = watch.updated().await.expect("watcher closed"); if state == after { @@ -311,7 +276,7 @@ mod tests { .expect("interface state was not reconciled without route events"); assert_eq!(updated, after); - drop(mon_sender); + drop(_mon_sender); handle.abort(); } }