Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions netwatch/src/netmon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod windows;

#[cfg(not(wasm_browser))]
pub(crate) use self::actor::is_interesting_interface;
use self::actor::{Actor, ActorMessage};
use self::actor::{Actor, ActorMessage, default_state_fn};
pub use crate::interfaces::State;

/// Monitors networking interface and route changes.
Expand Down Expand Up @@ -61,12 +61,12 @@ impl From<oneshot::error::RecvError> for Error {
impl Monitor {
/// Create a new monitor.
pub async fn new() -> Result<Self, Error> {
let actor = Actor::new().await?;
let (actor, route_monitor) = Actor::new().await?;
let actor_tx = actor.subscribe();
let interface_state = actor.state().clone();

let handle = task::spawn(async move {
actor.run().await;
actor.run(Some(route_monitor), default_state_fn()).await;
});

Ok(Monitor {
Expand Down
151 changes: 130 additions & 21 deletions netwatch/src/netmon/actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use n0_future::time::{self, Duration, Instant};
use std::sync::Arc;

use n0_future::{
FutureExt as _,
boxed::BoxFuture,
time::{self, Duration, Instant},
};
use n0_watcher::Watchable;
pub(super) use os::Error;
use os::RouteMonitor;
Expand Down Expand Up @@ -33,24 +39,29 @@ pub(super) enum NetworkMessage {
Change,
}

/// How often we execute a check for big jumps in wall time.
/// How often the actor wakes up to check for wall-time jumps and to
/// re-enumerate interfaces (see [`Actor::run`]).
#[cfg(not(any(target_os = "ios", target_os = "android")))]
const POLL_WALL_TIME_INTERVAL: Duration = Duration::from_secs(15);
const POLL_INTERVAL: Duration = Duration::from_secs(15);
/// Set background polling time to 1h to effectively disable it on mobile,
/// to avoid increased battery usage. Sleep detection won't work this way there.
#[cfg(any(target_os = "ios", target_os = "android"))]
const POLL_WALL_TIME_INTERVAL: Duration = Duration::from_secs(60 * 60);
const POLL_INTERVAL: Duration = Duration::from_secs(60 * 60);
const MON_CHAN_CAPACITY: usize = 16;
const ACTOR_CHAN_CAPACITY: usize = 16;

/// Enumerates the host's network interfaces. Boxed so tests can substitute it.
type StateFn = Arc<dyn Fn() -> BoxFuture<State> + Send + Sync + 'static>;

pub(super) fn default_state_fn() -> StateFn {
Arc::new(|| State::new().boxed())
}

pub(super) struct Actor {
/// Latest known interface state.
interface_state: Watchable<State>,
/// Latest observed wall time.
wall_time: Instant,
/// OS specific monitor.
#[allow(dead_code)]
route_monitor: RouteMonitor,
mon_receiver: mpsc::Receiver<NetworkMessage>,
actor_receiver: mpsc::Receiver<ActorMessage>,
actor_sender: mpsc::Sender<ActorMessage>,
Expand All @@ -61,22 +72,22 @@ pub(super) enum ActorMessage {
}

impl Actor {
pub(super) async fn new() -> Result<Self, os::Error> {
pub(super) async fn new() -> Result<(Self, RouteMonitor), os::Error> {
let interface_state = State::new().await;
let wall_time = Instant::now();

let (mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY);
let route_monitor = RouteMonitor::new(mon_sender)?;
let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY);

Ok(Actor {
let actor = Actor {
interface_state: Watchable::new(interface_state),
wall_time,
route_monitor,
mon_receiver,
actor_receiver,
actor_sender,
})
};
Ok((actor, route_monitor))
}

pub(super) fn state(&self) -> &Watchable<State> {
Expand All @@ -87,28 +98,40 @@ impl Actor {
self.actor_sender.clone()
}

pub(super) async fn run(mut self) {
pub(super) async fn run(mut self, route_monitor: Option<RouteMonitor>, get_state: StateFn) {
// Held only to keep the OS monitor task alive; `None` in tests.
let _route_monitor = route_monitor;

const DEBOUNCE: Duration = Duration::from_millis(250);

let mut pending_change = false;
let mut pending_time_jump = false;
let debounce = time::sleep(DEBOUNCE);
tokio::pin!(debounce);
let mut wall_time_interval = time::interval(POLL_WALL_TIME_INTERVAL);
let mut poll_interval = time::interval(POLL_INTERVAL);
// Skip the immediate first tick; we just enumerated in `new`.
poll_interval.tick().await;

loop {
tokio::select! {
_ = &mut debounce, if pending_change || pending_time_jump => {
self.handle_potential_change(pending_time_jump).await;
self.handle_potential_change(&get_state, pending_time_jump).await;
pending_change = false;
pending_time_jump = false;
}
_ = wall_time_interval.tick() => {
trace!("tick: wall_time_interval");
_ = poll_interval.tick() => {
trace!("tick: poll_interval");
if self.check_wall_time_advance() {
pending_time_jump = true;
debounce.as_mut().reset(Instant::now() + DEBOUNCE);
}
// Reconcile on every tick, not just on wall-time jumps. OS
// route monitors are best-effort and can drop events
// silently (or stop delivering entirely), which would
// otherwise freeze the interface state until restart.
// `handle_potential_change` diffs, so this is a no-op when
// nothing changed.
pending_change = true;
debounce.as_mut().reset(Instant::now() + DEBOUNCE);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now at every interval this always does a full enumeration. What does the check_wall_time_advance logic still achieve at this point? Because it basically does a full enumeration every 15s now. Which IIUC the wall-clock jump was trying to optimise to only do if there is a clock jump. And they both trigger by the same timer, so it is not like one triggers faster.

Could you point to the exact code in tailscale that does a full network enumeration every 15s? Because they also have this time-jump detection and I'd like to understand how they let these two interact.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, just resume-from-sleep detection now: stamps last_unsuspend and force-publishes on wake even when interfaces are unchanged (the reconcile treats that as a no-op). Though last_unsuspend is write-only in netwatch atm, so the only live effect is that wake notification.

Sorry to send you down the wrong trail here and wasting your time. 🙈 I didn't fully verify this claim and it proved to be wrong. Tailscale doesn't reconcile every 15s.

}
event = self.mon_receiver.recv() => {
match event {
Expand Down Expand Up @@ -140,10 +163,10 @@ impl Actor {
}
}

async fn handle_potential_change(&mut self, time_jumped: bool) {
async fn handle_potential_change(&mut self, get_state: &StateFn, time_jumped: bool) {
trace!("potential change");

let mut new_state = State::new().await;
let mut new_state = (get_state)().await;
let old_state = &self.interface_state.get();

if time_jumped {
Expand All @@ -158,11 +181,11 @@ impl Actor {
}

/// Reports whether wall time jumped more than 150%
/// of `POLL_WALL_TIME_INTERVAL`, indicating we probably just came out of sleep.
/// of [`POLL_INTERVAL`], indicating we probably just came out of sleep.
fn check_wall_time_advance(&mut self) -> bool {
let now = Instant::now();
let jumped = if let Some(elapsed) = now.checked_duration_since(self.wall_time) {
elapsed > POLL_WALL_TIME_INTERVAL * 3 / 2
elapsed > POLL_INTERVAL * 3 / 2
} else {
false
};
Expand All @@ -171,3 +194,89 @@ impl Actor {
jumped
}
}

#[cfg(test)]
mod tests {
use std::sync::Mutex;

use n0_watcher::Watcher as _;

use super::*;

/// Builds two distinct interface states so a transition is observable.
fn state_pair() -> (State, State) {
let with_iface = State::fake();
let mut without_iface = with_iface.clone();
without_iface.interfaces.clear();
assert_ne!(with_iface, without_iface);
(with_iface, without_iface)
}

/// State must converge via the periodic reconcile alone, with no
/// route-monitor events. Paused time auto-advances through the poll
/// interval, so this does not wait in real time.
#[tokio::test(start_paused = true)]
async fn recovers_state_without_route_events() {
// Enumeration returns "before" once, then "after" on every later call:
// an interface change the route monitor never signalled.
let (before, after) = state_pair();
let calls = Arc::new(Mutex::new(0usize));
let states = Arc::new((before.clone(), after.clone()));
let get_state: StateFn = {
let calls = calls.clone();
let states = states.clone();
Arc::new(move || {
let n = {
let mut g = calls.lock().unwrap();
let n = *g;
*g += 1;
n
};
let states = states.clone();
async move {
if n == 0 {
states.0.clone()
} else {
states.1.clone()
}
}
.boxed()
})
};

// Keep `_mon_sender` alive so the actor does not see all senders gone
// and shut down.
let initial = (get_state)().await;
assert_eq!(initial, before);
let (_mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY);
let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY);
let interface_state = Watchable::new(initial);
let mut watch = interface_state.watch();

let actor = Actor {
interface_state,
wall_time: Instant::now(),
mon_receiver,
actor_receiver,
actor_sender,
};
let handle = tokio::spawn(actor.run(None, get_state));

// Must exceed POLL_INTERVAL, or auto-advanced time trips it before the
// first reconcile. Without the fix, state never converges and this fires.
let updated = tokio::time::timeout(Duration::from_secs(60), async {
loop {
let state = watch.updated().await.expect("watcher closed");
if state == after {
return state;
}
}
})
.await
.expect("interface state was not reconciled without route events");
assert_eq!(updated, after);

drop(_mon_sender);
handle.abort();
}
}