diff --git a/Cargo.lock b/Cargo.lock index 4bc95046..d855f3da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4699,6 +4699,7 @@ dependencies = [ name = "openlogi-agent-core" version = "0.6.12" dependencies = [ + "async-hid", "bincode", "interprocess", "openlogi-core", diff --git a/crates/openlogi-agent-core/Cargo.toml b/crates/openlogi-agent-core/Cargo.toml index 0cbf3492..41e8560f 100644 --- a/crates/openlogi-agent-core/Cargo.toml +++ b/crates/openlogi-agent-core/Cargo.toml @@ -25,3 +25,5 @@ workspace = true [dev-dependencies] bincode = "1.3" +# Only to construct an `InventoryError::Hid` in the watcher's classify tests. +async-hid = { workspace = true } diff --git a/crates/openlogi-agent-core/src/watchers/inventory.rs b/crates/openlogi-agent-core/src/watchers/inventory.rs index 45640dce..b5abae03 100644 --- a/crates/openlogi-agent-core/src/watchers/inventory.rs +++ b/crates/openlogi-agent-core/src/watchers/inventory.rs @@ -48,6 +48,54 @@ pub enum InventoryEvent { SystemWake, } +/// The watcher's cross-tick memory, factored out of the poll loop so the +/// tick → event decision is unit-testable without spawning the thread or +/// touching real HID. +#[derive(Default)] +struct WatchState { + /// Set once any enumeration has completed. After that, a failed tick keeps + /// the last good snapshot forever instead of ever reporting `Unavailable`. + succeeded: bool, + /// Consecutive failures, counted only before the first success. + initial_failures: u8, +} + +impl WatchState { + /// Decide what (if anything) a watch tick emits. + /// + /// - `Ok(snapshot)` — a completed enumeration (an empty one included: that's + /// a genuine disconnect) — is forwarded so the agent's device set tracks + /// reality. A transient per-node probe miss never reaches here as an empty + /// `Ok`: `openlogi_hid`'s `NodeLedger` replays the node's last inventory + /// (#218/#222). + /// - `Err(..)` means enumeration itself failed (OS-level HID enumerate + /// error): emit nothing, so the agent keeps its last good device set and + /// live bindings instead of wiping them for ~one period. Before the *first* + /// success there is no good set to keep, so persistent initial failure is + /// reported once as [`InventoryEvent::Unavailable`]; the loop keeps + /// retrying and a later success recovers. + fn classify( + &mut self, + result: Result, openlogi_hid::InventoryError>, + ) -> Option { + match result { + Ok(inv) => { + self.succeeded = true; + Some(InventoryEvent::Snapshot(inv)) + } + Err(e) => { + warn!(error = ?e, "enumerate failed during watch tick — keeping last snapshot"); + if self.succeeded { + return None; + } + self.initial_failures = self.initial_failures.saturating_add(1); + (self.initial_failures == INITIAL_FAILURE_LIMIT) + .then_some(InventoryEvent::Unavailable) + } + } + } +} + /// Spawn the watcher and return a receiver of inventory events. The /// channel is unbounded so a slow consumer cannot back-pressure the HID /// poll loop into stalling on a real device disconnect. @@ -76,8 +124,7 @@ pub fn spawn(period: Duration) -> mpsc::UnboundedReceiver { // across ticks — a known device's immutable data (model, features) // is reused instead of being re-handshaked every poll. let mut enumerator = openlogi_hid::Enumerator::default(); - let mut succeeded = false; - let mut initial_failures: u8 = 0; + let mut state = WatchState::default(); let mut last_tick = SystemTime::now(); loop { // A tick arriving far past its period means the system slept; @@ -93,32 +140,12 @@ pub fn spawn(period: Duration) -> mpsc::UnboundedReceiver { } } last_tick = now; - match rt.block_on(enumerator.enumerate()) { - Ok(inv) => { - succeeded = true; - if worker_tx.send(InventoryEvent::Snapshot(inv)).is_err() { - debug!("inventory watcher receiver dropped — exiting"); - return; - } - } - // A failed enumerate means "couldn't check", NOT "no devices": - // skip the tick so the agent keeps its last good device set - // and live bindings instead of wiping them for ~one period. A - // genuine disconnect comes back as an `Ok` empty snapshot, - // which we DO forward. Before the *first* success there is no - // good set to keep, so persistent failure is reported once — - // the loop keeps retrying, and a later success recovers. - Err(e) => { - warn!(error = ?e, "enumerate failed during watch tick — keeping last snapshot"); - if !succeeded { - initial_failures = initial_failures.saturating_add(1); - if initial_failures == INITIAL_FAILURE_LIMIT - && worker_tx.send(InventoryEvent::Unavailable).is_err() - { - return; - } - } - } + let result = rt.block_on(enumerator.enumerate()); + if let Some(event) = state.classify(result) + && worker_tx.send(event).is_err() + { + debug!("inventory watcher receiver dropped — exiting"); + return; } thread::sleep(period); } @@ -133,3 +160,63 @@ pub fn spawn(period: Duration) -> mpsc::UnboundedReceiver { } rx } + +#[cfg(test)] +mod tests { + use openlogi_hid::InventoryError; + + use super::{INITIAL_FAILURE_LIMIT, InventoryEvent, WatchState}; + + /// A transport-level enumerate failure — what the watcher's `Err` arm now + /// sees (a partial per-node read is replayed by the hid ledger as `Ok`). + fn enumerate_failed() -> InventoryError { + InventoryError::Hid(async_hid::HidError::Disconnected) + } + + #[test] + fn completed_enumeration_is_forwarded_even_when_empty() { + let mut state = WatchState::default(); + // A genuine "checked, nothing there" still propagates as a disconnect — + // the resilience must not swallow a real empty. + assert!(matches!( + state.classify(Ok(vec![])), + Some(InventoryEvent::Snapshot(snap)) if snap.is_empty() + )); + assert!(state.succeeded); + } + + #[test] + fn failure_after_a_success_keeps_the_last_snapshot() { + let mut state = WatchState::default(); + // A good tick first, so there is a last-known-good set to preserve. + assert!(matches!( + state.classify(Ok(vec![])), + Some(InventoryEvent::Snapshot(_)) + )); + // Then transient enumerate failures emit nothing — the agent keeps the + // last snapshot instead of flapping to "No devices" (#218). + assert!(state.classify(Err(enumerate_failed())).is_none()); + assert!(state.classify(Err(enumerate_failed())).is_none()); + } + + #[test] + fn persistent_initial_failure_reports_unavailable_once_then_recovers() { + let mut state = WatchState::default(); + // No snapshot has ever landed, so repeated failure must eventually stop + // looking like "still scanning". + for _ in 0..INITIAL_FAILURE_LIMIT - 1 { + assert!(state.classify(Err(enumerate_failed())).is_none()); + } + assert!(matches!( + state.classify(Err(enumerate_failed())), + Some(InventoryEvent::Unavailable) + )); + // Reported once, not on every later failure. + assert!(state.classify(Err(enumerate_failed())).is_none()); + // …and a later success recovers with a live snapshot. + assert!(matches!( + state.classify(Ok(vec![])), + Some(InventoryEvent::Snapshot(_)) + )); + } +} diff --git a/crates/openlogi-hid/src/inventory.rs b/crates/openlogi-hid/src/inventory.rs index e8fe9368..3e71349b 100644 --- a/crates/openlogi-hid/src/inventory.rs +++ b/crates/openlogi-hid/src/inventory.rs @@ -255,9 +255,39 @@ struct CachedChannel { /// We merge the two so an MX Master that's been asleep still shows up with /// its codename and kind even before you click it. pub async fn enumerate() -> Result, InventoryError> { - Enumerator::default().enumerate().await + // The polling [`Enumerator`] keeps a per-node ledger across ticks, so a + // transient probe miss replays the node's last good inventory. A one-shot + // caller (CLI `list` / `diag`) builds a fresh `Enumerator` whose ledger is + // empty, so a miss has nothing to replay and would surface as an empty or + // partial list — the two isolated runs in #218 read 3 devices and 0. Retry a + // few times instead, reusing the same enumerator so its ledger accumulates a + // snapshot a later attempt can replay and the opened channel stays warm. + // #226's 5 s request timeout inside `HidppChannel::send` makes a dead probe + // fail fast, so a short bounded retry is cheap. + let mut enumerator = Enumerator::default(); + let mut attempt = 1u8; + loop { + let (inventories, all_healthy) = enumerator.enumerate_reporting_health().await?; + if all_healthy || attempt >= ONESHOT_ATTEMPTS { + return Ok(inventories); + } + debug!( + attempt, + "one-shot enumerate saw an unhealthy node — retrying" + ); + tokio::time::sleep(ONESHOT_RETRY_DELAY).await; + attempt += 1; + } } +/// Attempts a one-shot [`enumerate`] makes before returning whatever it last +/// read, when a node keeps coming back unhealthy. +const ONESHOT_ATTEMPTS: u8 = 4; + +/// Delay between one-shot [`enumerate`] retries. A first probe usually wakes an +/// asleep device, so a short pause lets the next attempt read it cleanly. +const ONESHOT_RETRY_DELAY: Duration = Duration::from_millis(300); + impl Enumerator { /// One enumeration pass, reusing the cache from prior passes. Probes every /// HID candidate concurrently (so one asleep node that burns the whole @@ -270,6 +300,19 @@ impl Enumerator { /// channel is reopened, so a transient HID++ glitch can't masquerade as /// "no devices" (#218) — see [`crate::node_ledger`]. pub async fn enumerate(&mut self) -> Result, InventoryError> { + self.enumerate_reporting_health().await.map(|(inv, _)| inv) + } + + /// [`Self::enumerate`] plus whether every probed node answered cleanly this + /// pass — `false` if any probe timed out, failed to open, or read short of a + /// receiver's pairing count. The polling watcher ignores the flag (the ledger + /// already replays a node through a transient miss), but the one-shot + /// [`enumerate`] free fn uses it to retry: a fresh `Enumerator` has no ledger + /// history to replay, so a transient miss would otherwise surface as an + /// empty/partial list (#218). + async fn enumerate_reporting_health( + &mut self, + ) -> Result<(Vec, bool), InventoryError> { self.tick = self.tick.wrapping_add(1); let tick = self.tick; let candidates = enumerate_hidpp_devices().await?; @@ -336,6 +379,9 @@ impl Enumerator { let mut inventories = Vec::new(); let mut outcomes = Vec::new(); + // Whether every node answered cleanly this pass. Drives the one-shot + // `enumerate` retry; the ledger's own per-node replay is unaffected. + let mut all_healthy = true; for (node, result) in results { let probe = if let Ok(probe) = result { probe @@ -347,6 +393,7 @@ impl Enumerator { warn!(budget = ?PROBE_BUDGET, "device probe timed out — treating as a failed probe"); NodeProbe::failed() }; + all_healthy &= probe.healthy; outcomes.extend(probe.outcomes); let settled = self.ledger.settle(&node, probe.healthy, probe.inventory); if settled.evict_channel && self.channels.remove(&node).is_some() { @@ -357,6 +404,7 @@ impl Enumerator { // Nodes that wouldn't open this tick still replay their last snapshot // (they have no cached channel to evict). for node in open_failures { + all_healthy = false; let settled = self.ledger.settle(&node, false, None); inventories.extend(settled.inventory); } @@ -376,7 +424,7 @@ impl Enumerator { } } self.evict_unseen(&seen_keys); - Ok(inventories) + Ok((inventories, all_healthy)) } /// Drop cache entries for devices not seen this tick, after a short grace so