diff --git a/netwatch/src/netmon.rs b/netwatch/src/netmon.rs index 22cafaa..0ebac4c 100644 --- a/netwatch/src/netmon.rs +++ b/netwatch/src/netmon.rs @@ -25,7 +25,7 @@ mod windows; #[cfg(not(wasm_browser))] pub(crate) use self::actor::is_interesting_interface; -use self::actor::{Actor, ActorMessage}; +use self::actor::{Actor, ActorMessage, default_state_fn}; pub use crate::interfaces::State; /// Monitors networking interface and route changes. @@ -61,12 +61,12 @@ impl From for Error { impl Monitor { /// Create a new monitor. pub async fn new() -> Result { - let actor = Actor::new().await?; + let (actor, route_monitor) = Actor::new().await?; let actor_tx = actor.subscribe(); let interface_state = actor.state().clone(); let handle = task::spawn(async move { - actor.run().await; + actor.run(Some(route_monitor), default_state_fn()).await; }); Ok(Monitor { diff --git a/netwatch/src/netmon/actor.rs b/netwatch/src/netmon/actor.rs index 3ce5bf5..5c15a2c 100644 --- a/netwatch/src/netmon/actor.rs +++ b/netwatch/src/netmon/actor.rs @@ -1,4 +1,10 @@ -use n0_future::time::{self, Duration, Instant}; +use std::sync::Arc; + +use n0_future::{ + FutureExt as _, + boxed::BoxFuture, + time::{self, Duration, Instant}, +}; use n0_watcher::Watchable; pub(super) use os::Error; use os::RouteMonitor; @@ -33,24 +39,29 @@ pub(super) enum NetworkMessage { Change, } -/// How often we execute a check for big jumps in wall time. +/// How often the actor wakes up to check for wall-time jumps and to +/// re-enumerate interfaces (see [`Actor::run`]). #[cfg(not(any(target_os = "ios", target_os = "android")))] -const POLL_WALL_TIME_INTERVAL: Duration = Duration::from_secs(15); +const POLL_INTERVAL: Duration = Duration::from_secs(15); /// Set background polling time to 1h to effectively disable it on mobile, /// to avoid increased battery usage. Sleep detection won't work this way there. #[cfg(any(target_os = "ios", target_os = "android"))] -const POLL_WALL_TIME_INTERVAL: Duration = Duration::from_secs(60 * 60); +const POLL_INTERVAL: Duration = Duration::from_secs(60 * 60); const MON_CHAN_CAPACITY: usize = 16; const ACTOR_CHAN_CAPACITY: usize = 16; +/// Enumerates the host's network interfaces. Boxed so tests can substitute it. +type StateFn = Arc BoxFuture + Send + Sync + 'static>; + +pub(super) fn default_state_fn() -> StateFn { + Arc::new(|| State::new().boxed()) +} + pub(super) struct Actor { /// Latest known interface state. interface_state: Watchable, /// Latest observed wall time. wall_time: Instant, - /// OS specific monitor. - #[allow(dead_code)] - route_monitor: RouteMonitor, mon_receiver: mpsc::Receiver, actor_receiver: mpsc::Receiver, actor_sender: mpsc::Sender, @@ -61,7 +72,7 @@ pub(super) enum ActorMessage { } impl Actor { - pub(super) async fn new() -> Result { + pub(super) async fn new() -> Result<(Self, RouteMonitor), os::Error> { let interface_state = State::new().await; let wall_time = Instant::now(); @@ -69,14 +80,14 @@ impl Actor { let route_monitor = RouteMonitor::new(mon_sender)?; let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY); - Ok(Actor { + let actor = Actor { interface_state: Watchable::new(interface_state), wall_time, - route_monitor, mon_receiver, actor_receiver, actor_sender, - }) + }; + Ok((actor, route_monitor)) } pub(super) fn state(&self) -> &Watchable { @@ -87,28 +98,40 @@ impl Actor { self.actor_sender.clone() } - pub(super) async fn run(mut self) { + pub(super) async fn run(mut self, route_monitor: Option, get_state: StateFn) { + // Held only to keep the OS monitor task alive; `None` in tests. + let _route_monitor = route_monitor; + const DEBOUNCE: Duration = Duration::from_millis(250); let mut pending_change = false; let mut pending_time_jump = false; let debounce = time::sleep(DEBOUNCE); tokio::pin!(debounce); - let mut wall_time_interval = time::interval(POLL_WALL_TIME_INTERVAL); + let mut poll_interval = time::interval(POLL_INTERVAL); + // Skip the immediate first tick; we just enumerated in `new`. + poll_interval.tick().await; loop { tokio::select! { _ = &mut debounce, if pending_change || pending_time_jump => { - self.handle_potential_change(pending_time_jump).await; + self.handle_potential_change(&get_state, pending_time_jump).await; pending_change = false; pending_time_jump = false; } - _ = wall_time_interval.tick() => { - trace!("tick: wall_time_interval"); + _ = poll_interval.tick() => { + trace!("tick: poll_interval"); if self.check_wall_time_advance() { pending_time_jump = true; - debounce.as_mut().reset(Instant::now() + DEBOUNCE); } + // Reconcile on every tick, not just on wall-time jumps. OS + // route monitors are best-effort and can drop events + // silently (or stop delivering entirely), which would + // otherwise freeze the interface state until restart. + // `handle_potential_change` diffs, so this is a no-op when + // nothing changed. + pending_change = true; + debounce.as_mut().reset(Instant::now() + DEBOUNCE); } event = self.mon_receiver.recv() => { match event { @@ -140,10 +163,10 @@ impl Actor { } } - async fn handle_potential_change(&mut self, time_jumped: bool) { + async fn handle_potential_change(&mut self, get_state: &StateFn, time_jumped: bool) { trace!("potential change"); - let mut new_state = State::new().await; + let mut new_state = (get_state)().await; let old_state = &self.interface_state.get(); if time_jumped { @@ -158,11 +181,11 @@ impl Actor { } /// Reports whether wall time jumped more than 150% - /// of `POLL_WALL_TIME_INTERVAL`, indicating we probably just came out of sleep. + /// of [`POLL_INTERVAL`], indicating we probably just came out of sleep. fn check_wall_time_advance(&mut self) -> bool { let now = Instant::now(); let jumped = if let Some(elapsed) = now.checked_duration_since(self.wall_time) { - elapsed > POLL_WALL_TIME_INTERVAL * 3 / 2 + elapsed > POLL_INTERVAL * 3 / 2 } else { false }; @@ -171,3 +194,89 @@ impl Actor { jumped } } + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + + use n0_watcher::Watcher as _; + + use super::*; + + /// Builds two distinct interface states so a transition is observable. + fn state_pair() -> (State, State) { + let with_iface = State::fake(); + let mut without_iface = with_iface.clone(); + without_iface.interfaces.clear(); + assert_ne!(with_iface, without_iface); + (with_iface, without_iface) + } + + /// State must converge via the periodic reconcile alone, with no + /// route-monitor events. Paused time auto-advances through the poll + /// interval, so this does not wait in real time. + #[tokio::test(start_paused = true)] + async fn recovers_state_without_route_events() { + // Enumeration returns "before" once, then "after" on every later call: + // an interface change the route monitor never signalled. + let (before, after) = state_pair(); + let calls = Arc::new(Mutex::new(0usize)); + let states = Arc::new((before.clone(), after.clone())); + let get_state: StateFn = { + let calls = calls.clone(); + let states = states.clone(); + Arc::new(move || { + let n = { + let mut g = calls.lock().unwrap(); + let n = *g; + *g += 1; + n + }; + let states = states.clone(); + async move { + if n == 0 { + states.0.clone() + } else { + states.1.clone() + } + } + .boxed() + }) + }; + + // Keep `_mon_sender` alive so the actor does not see all senders gone + // and shut down. + let initial = (get_state)().await; + assert_eq!(initial, before); + let (_mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY); + let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY); + let interface_state = Watchable::new(initial); + let mut watch = interface_state.watch(); + + let actor = Actor { + interface_state, + wall_time: Instant::now(), + mon_receiver, + actor_receiver, + actor_sender, + }; + let handle = tokio::spawn(actor.run(None, get_state)); + + // Must exceed POLL_INTERVAL, or auto-advanced time trips it before the + // first reconcile. Without the fix, state never converges and this fires. + let updated = tokio::time::timeout(Duration::from_secs(60), async { + loop { + let state = watch.updated().await.expect("watcher closed"); + if state == after { + return state; + } + } + }) + .await + .expect("interface state was not reconciled without route events"); + assert_eq!(updated, after); + + drop(_mon_sender); + handle.abort(); + } +}