diff --git a/crates/uffs-daemon/src/cache/body_loader.rs b/crates/uffs-daemon/src/cache/body_loader.rs index ab9c2996e..39b31cd41 100644 --- a/crates/uffs-daemon/src/cache/body_loader.rs +++ b/crates/uffs-daemon/src/cache/body_loader.rs @@ -88,6 +88,16 @@ impl BodyLoader for DiskBodyLoader { // the daemon's last MFT refresh. On non-Windows the helper // errors out by design and we fall through to the bare // compact-cache load below. + // + // NB: the startup warm-load guard (`cache::guarded_load`) is + // deliberately NOT used here. Re-promote runs while the + // daemon is live, and the per-shard journal loop keeps + // advancing its persisted cursor while the shard is demoted + // (the apply no-ops with no warm body). Serving the on-disk + // compact cache then deferring to that loop would strand the + // `[demote, now]` delta forever — the loop's cursor is already + // past it. A synchronous refresh is the only correct choice + // on this path. match uffs_core::compact_loader::load_drive_with_usn_refresh(letter) { Ok((body, _timing)) => return Some(Arc::new(body)), Err(err) => { diff --git a/crates/uffs-daemon/src/cache/guarded_load.rs b/crates/uffs-daemon/src/cache/guarded_load.rs new file mode 100644 index 000000000..ffdd466c7 --- /dev/null +++ b/crates/uffs-daemon/src/cache/guarded_load.rs @@ -0,0 +1,297 @@ +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2025-2026 SKY, LLC. + +//! Guarded warm-load: serve the on-disk compact cache fast when the +//! background USN journal loop can converge the (bounded) delta, and +//! fall back to a synchronous full rebuild only when it cannot. +//! +//! ## Why this exists +//! +//! Before #94 the warm load (daemon restart from an existing cache) +//! deserialized the compact cache directly — ~20 ms — and relied on a +//! later refresh to catch up. #94 replaced that with a **synchronous** +//! `load_drive_with_usn_refresh` on every warm load: a full `MftIndex` +//! 
read plus a complete `build_compact_index` (records + `path_len` + +//! trigram + children + extension indexes). That fixed a real +//! staleness bug (5/7 drives served stale data at v0.5.80) **but** moved +//! hundreds of milliseconds of CPU onto the warm-start critical path, +//! regressing the WARM phase. +//! +//! Since #94, the Phase 7 per-shard journal loop +//! ([`crate::cache::journal_loop`]) exists: it polls the live USN +//! journal every 500 ms, seeds its cursor from the persisted +//! `_usn.cursor`, and applies deltas incrementally. That loop +//! makes the synchronous refresh **redundant for freshness** in the +//! common case — the loop converges the cache to the live filesystem +//! within roughly one poll interval after startup. +//! +//! ## The guard +//! +//! [`decide_strategy`] inspects the cheap signals only — the persisted +//! cursor (an 8-byte file read) and `FSCTL_QUERY_USN_JOURNAL` (a single +//! ioctl) — and never touches the multi-hundred-MB `MftIndex`: +//! +//! * [`WarmLoadStrategy::FastFromCompactCache`] — the persisted cursor lies +//! inside the live journal's valid window (`first_usn <= cursor <= +//! next_usn`). The compact cache is at least as fresh as that cursor (the +//! journal loop writes body + cursor in lockstep, and full rebuilds write a +//! body at the live head ≥ the cursor), so serving it immediately is safe: +//! the background loop re-applies `[cursor, live)` — idempotent on any +//! overlap — and converges within ~one poll interval. +//! +//! * [`WarmLoadStrategy::FullRebuild`] — the cursor is absent (`0` sentinel: +//! cold boot / never persisted), predates the journal (`cursor < first_usn`: +//! wrapped or long-downtime), or postdates it (`cursor > next_usn`: the +//! journal was deleted + recreated and is younger than the cursor). In each +//! case the background loop cannot converge the existing cache from the +//! persisted cursor, so we pay the synchronous rebuild — preserving #94's +//! correctness guarantee. 
+//! +//! ### Residual edge case +//! +//! A journal deleted + recreated *while the daemon was down* whose new +//! `[first_usn, next_usn]` window happens to contain the stale cursor +//! would pass the bounds check yet point into an unrelated USN space. +//! This requires an admin `fsutil usn deletejournal` plus a coincidental +//! USN-range overlap and is not reachable under normal operation. +//! Closing it fully would require persisting the journal id alongside +//! the cursor; that is deliberately out of scope here to keep the change +//! surgical, and is documented rather than hidden. + +#[cfg(windows)] +use uffs_core::compact::DriveCompactIndex; + +/// Outcome of [`decide_strategy`]: how to materialise a drive's body on +/// a warm load. +/// +/// Compiled on Windows (the only platform with a USN journal) and under +/// `test` (so the boundary logic is host-testable); a non-Windows +/// release build never reaches the decision, so it is not compiled there. +#[cfg(any(windows, test))] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum WarmLoadStrategy { + /// Deserialize the on-disk compact cache directly (~20 ms) and let + /// the background journal loop converge the bounded delta. + FastFromCompactCache, + /// Synchronously rebuild from a live `MftIndex` read + USN replay + /// (the #94 path) because the background loop cannot converge the + /// existing cache from the persisted cursor. + FullRebuild, +} + +/// Pure warm-load decision from the cheap signals. +/// +/// `cursor` is the persisted `_usn.cursor` value (`0` when no +/// cursor has been persisted yet). `first_usn` / `next_usn` are the +/// live journal bounds from `FSCTL_QUERY_USN_JOURNAL`. +/// +/// Kept platform-agnostic and side-effect-free so the boundary logic is +/// pinned by host unit tests rather than only exercised on Windows. 
+#[cfg(any(windows, test))] +#[must_use] +pub(crate) fn decide_strategy(cursor: u64, first_usn: i64, next_usn: i64) -> WarmLoadStrategy { + // No persisted cursor → cold boot / first touch: the background loop + // would have to replay from the journal head, which cannot + // reconstruct an arbitrary on-disk snapshot. Build synchronously. + if cursor == 0 { + return WarmLoadStrategy::FullRebuild; + } + // Narrow the unsigned persisted cursor to the kernel-facing signed + // USN space exactly as the journal source does + // (`WindowsJournalSource::poll`), so the comparison matches the value + // the background loop will actually feed to `FSCTL_READ_USN_JOURNAL`. + let cursor_usn = i64::try_from(cursor).unwrap_or(i64::MAX); + // Cursor predates the oldest readable record (journal wrapped or the + // daemon was down long enough for the gap to be truncated), or + // postdates the live head (journal recreated younger than the + // cursor). Either way the loop can't converge the existing cache. + if cursor_usn < first_usn || cursor_usn > next_usn { + return WarmLoadStrategy::FullRebuild; + } + WarmLoadStrategy::FastFromCompactCache +} + +/// Windows warm-load entry point used by the startup live-drive loader. +/// The re-promote body loader deliberately bypasses this guard. +/// +/// Returns the same `(DriveCompactIndex, LoadTiming)` shape as +/// [`uffs_core::compact::load_drive`] so call sites are unchanged. +/// +/// # Errors +/// +/// Propagates the underlying loader error when both the fast compact +/// cache path and the synchronous rebuild fail. +#[cfg(windows)] +pub(crate) fn load_live_drive( + letter: uffs_mft::platform::DriveLetter, + no_cache: bool, +) -> anyhow::Result<(DriveCompactIndex, uffs_core::compact::LoadTiming)> { + use std::time::Instant; + + use crate::cache::cursor_store::DiskCursorStore; + use crate::cache::journal_loop::CursorStore as _; + + // `--no-cache` forces a clean rebuild and must not consult the cache. 
+ if no_cache { + return full_rebuild(letter, no_cache, None); + } + + // Cheap signal #1: live journal bounds (single ioctl, no MFT read). + let info = match uffs_mft::usn::query_usn_journal(letter) { + Ok(info) => info, + Err(err) => { + // No journal (e.g. os error 1179) → no incremental refresh + // mechanism, so the background loop can't converge a cached + // body. Rebuild synchronously, matching the pre-guard path. + tracing::debug!( + target: "shard.warm_load", + drive = %letter, + error = %err, + "USN journal unavailable; full rebuild", + ); + return full_rebuild(letter, no_cache, None); + } + }; + + // Cheap signal #2: persisted cursor (8-byte file read). + let cursor_store = DiskCursorStore::new(uffs_mft::cache::cache_dir()); + let cursor = cursor_store.load(letter); + + match decide_strategy(cursor, info.first_usn.raw(), info.next_usn.raw()) { + WarmLoadStrategy::FastFromCompactCache => { + let t0 = Instant::now(); + match uffs_core::compact_cache::load_compact_cache(letter, u64::MAX, 0, true) { + Ok(body) => { + let cache_ms = t0.elapsed().as_millis(); + tracing::info!( + target: "shard.warm_load", + drive = %letter, + cursor, + first_usn = %info.first_usn, + next_usn = %info.next_usn, + cache_ms, + "Warm load: fast compact-cache path (background loop converges delta)", + ); + Ok((body, uffs_core::compact::LoadTiming { + cache: cache_ms, + mft: 0, + compact: 0, + trigram: 0, + })) + } + Err(err) => { + // Cache missing / corrupt / key rotated: the fast + // path is unavailable, so rebuild synchronously. 
+ tracing::warn!( + target: "shard.warm_load", + drive = %letter, + error = %err, + "Warm load: compact cache unusable; falling back to full rebuild", + ); + full_rebuild(letter, no_cache, Some(&info)) + } + } + } + WarmLoadStrategy::FullRebuild => full_rebuild(letter, no_cache, Some(&info)), + } +} + +/// Run the synchronous full rebuild ([`uffs_core::compact::load_drive`]) +/// and, on success, persist a cursor so a subsequent restart can take +/// the fast path. +/// +/// The persisted cursor is the journal's `next_usn` **captured before** +/// the rebuild's MFT read. The freshly built body reflects USN state up +/// to the live head at read time, which is `>=` that pre-read value, so +/// the persisted cursor is a safe lower bound: the background loop +/// re-applies `[cursor, live)` idempotently with no gap. +#[cfg(windows)] +fn full_rebuild( + letter: uffs_mft::platform::DriveLetter, + no_cache: bool, + info: Option<&uffs_mft::usn::UsnJournalInfo>, +) -> anyhow::Result<(DriveCompactIndex, uffs_core::compact::LoadTiming)> { + use crate::cache::cursor_store::DiskCursorStore; + use crate::cache::journal_loop::CursorStore as _; + + let result = + uffs_core::compact::load_drive(&uffs_core::compact::MftSource::Live(letter), no_cache)?; + + // Best-effort cursor seed so the next warm load is fast. Only when + // caching is enabled and we have a live journal reading; never + // regress a newer persisted cursor (the journal loop may have + // advanced past this pre-read lower bound already). 
+ if !no_cache && let Some(journal) = info { + let store = DiskCursorStore::new(uffs_mft::cache::cache_dir()); + let pre_read = u64::try_from(journal.next_usn.raw()).unwrap_or(0); + // A persisted cursor above the live journal head cannot belong to + // this journal (e.g. the journal was recreated younger than the + // cursor, the very case routed here via `cursor > next_usn`). + // Keeping it through `max` would re-persist the stale value and + // force a full rebuild on every restart, so drop it and seed from + // `pre_read`. If the re-query fails, bound by `pre_read`: + // regressing is safe because the background loop re-applies + // `[cursor, live)` idempotently. + let live_head = uffs_mft::usn::query_usn_journal(letter) + .ok() + .and_then(|fresh| u64::try_from(fresh.next_usn.raw()).ok()) + .unwrap_or(pre_read); + let persisted = store.load(letter); + let seed = if persisted > live_head { + pre_read + } else { + persisted.max(pre_read) + }; + if seed != 0 { + store.store(letter, seed); + } + } + + Ok(result) +} + +#[cfg(test)] +mod tests { + use super::{WarmLoadStrategy, decide_strategy}; + + #[test] + fn zero_cursor_forces_full_rebuild() { + // No persisted cursor → cold boot; the loop can't reconstruct an + // arbitrary snapshot from the journal head. + assert_eq!(decide_strategy(0, 0, 1_000), WarmLoadStrategy::FullRebuild); + } + + #[test] + fn cursor_inside_window_takes_fast_path() { + assert_eq!( + decide_strategy(500, 100, 1_000), + WarmLoadStrategy::FastFromCompactCache + ); + } + + #[test] + fn cursor_at_window_edges_is_fast() { + // Inclusive bounds: a cursor exactly at first_usn or next_usn is + // still convergeable by the background loop. + assert_eq!( + decide_strategy(100, 100, 1_000), + WarmLoadStrategy::FastFromCompactCache + ); + assert_eq!( + decide_strategy(1_000, 100, 1_000), + WarmLoadStrategy::FastFromCompactCache + ); + } + + #[test] + fn cursor_before_first_usn_rebuilds() { + // Journal wrapped / long downtime: the gap was truncated. + assert_eq!( + decide_strategy(50, 100, 1_000), + WarmLoadStrategy::FullRebuild + ); + } + + #[test] + fn cursor_after_next_usn_rebuilds() { + // Journal recreated younger than the persisted cursor. + assert_eq!( + decide_strategy(2_000, 100, 1_000), + WarmLoadStrategy::FullRebuild + ); + } + + #[test] + fn cursor_overflowing_i64_is_clamped_and_rebuilds() { + // A cursor past i64::MAX narrows to i64::MAX, which is > any real + // next_usn, so it conservatively rebuilds rather than trusting a + // nonsensical value. 
+ assert_eq!( + decide_strategy(u64::MAX, 100, 1_000), + WarmLoadStrategy::FullRebuild + ); + } +} diff --git a/crates/uffs-daemon/src/cache/journal_loop.rs b/crates/uffs-daemon/src/cache/journal_loop.rs index 51ba54ed5..9062d02b2 100644 --- a/crates/uffs-daemon/src/cache/journal_loop.rs +++ b/crates/uffs-daemon/src/cache/journal_loop.rs @@ -175,7 +175,19 @@ pub(crate) trait PatchSink: Send + Sync + 'static { /// background thread; the loop does not wait for it. If /// the save fails, the implementor logs but does not /// propagate — the next threshold crossing will retry. - fn trigger_save(&self, letter: uffs_mft::platform::DriveLetter, reason: SaveReason); + /// + /// `cursor` is the loop's current read position. The sink + /// persists it **in lockstep** with the on-disk compact-cache + /// body — and only when that body save actually succeeds — so + /// the persisted cursor never outruns the persisted body (a + /// parked shard's save is a no-op, so its cursor must not + /// advance on disk; see `journal_sink`). + fn trigger_save( + &self, + letter: uffs_mft::platform::DriveLetter, + reason: SaveReason, + cursor: u64, + ); /// Notify the sink that the USN journal for `letter` was /// detected to have wrapped (Phase 7 task 7.7). @@ -471,7 +483,12 @@ impl JournalLoop { } cursor = result.next_cursor; - let saved = process_tick( + // The cursor is persisted by the sink in lockstep with the + // compact-cache body save (and only when that save actually + // happens), so the loop no longer persists it here — doing + // so would let a parked shard's on-disk cursor outrun its + // frozen on-disk body. See `PatchSink::trigger_save`. + process_tick( self.sink.as_ref(), letter, cursor, @@ -480,11 +497,6 @@ impl JournalLoop { self.config.save_threshold_events, self.config.save_threshold_age, ); - // Persist the cursor in lockstep with each save so - // on-disk cursor and on-disk body advance together. 
- if saved { - self.cursor_store.store(letter, cursor); - } } } } @@ -556,10 +568,8 @@ async fn poll_blocking( /// /// On a non-empty tick, also: (a) records the event count into /// `save_trigger`, (b) evaluates the save thresholds, and (c) -/// fires [`PatchSink::trigger_save`] when a threshold crosses. -/// -/// **Returns** `true` when a save was triggered this tick (the -/// caller persists the cursor in lockstep), `false` otherwise. +/// fires [`PatchSink::trigger_save`] (passing `cursor` so the sink can +/// persist it in lockstep with the body save) when a threshold crosses. fn process_tick( sink: &dyn PatchSink, letter: uffs_mft::platform::DriveLetter, @@ -568,17 +578,15 @@ fn process_tick( save_trigger: &mut SaveTrigger, save_threshold_events: u64, save_threshold_age: Duration, -) -> bool { +) { if changes.is_empty() { tracing::trace!(drive = %letter, "Journal poll: no changes"); - return false; + return; } let accepted = sink.accept(letter, changes); save_trigger.record(changes.len() as u64); - let mut saved = false; if let Some(reason) = save_trigger.evaluate(save_threshold_events, save_threshold_age) { - sink.trigger_save(letter, reason); - saved = true; + sink.trigger_save(letter, reason, cursor); tracing::info!( drive = %letter, ?reason, @@ -593,7 +601,6 @@ fn process_tick( cursor, "Journal poll: applied tick" ); - saved } /// Handle returned by [`spawn_journal_loop`] for cancellation + diff --git a/crates/uffs-daemon/src/cache/journal_loop/tests.rs b/crates/uffs-daemon/src/cache/journal_loop/tests.rs index 81d14668b..697a36e4e 100644 --- a/crates/uffs-daemon/src/cache/journal_loop/tests.rs +++ b/crates/uffs-daemon/src/cache/journal_loop/tests.rs @@ -168,6 +168,12 @@ struct RecordingSink { /// Phase 7-C surface — lets tests assert the threshold state /// machine fires the right reason at the right time. save_calls: Mutex>, + /// One entry per `trigger_save()` call: the `cursor` argument. 
+ /// The loop hands its read position to the sink here; the sink + /// (production `RegistryPatchSink`) persists it in lockstep with + /// a successful body save. Lets the loop-level tests assert the + /// cursor reaches the sink without coupling to persistence. + save_cursors: Mutex>, /// One entry per `journal_wrapped()` call: `letter`. /// Phase 7-D surface — lets tests assert the wrap-detection /// state machine fires when `journal_id` changes between @@ -184,6 +190,7 @@ impl RecordingSink { Self { calls: Mutex::new(Vec::new()), save_calls: Mutex::new(Vec::new()), + save_cursors: Mutex::new(Vec::new()), wrap_calls: Mutex::new(Vec::new()), accept_outcome: Mutex::new(true), } @@ -197,6 +204,10 @@ impl RecordingSink { lock_or_recover(&self.save_calls).clone() } + fn save_cursors(&self) -> Vec { + lock_or_recover(&self.save_cursors).clone() + } + fn wrap_calls(&self) -> Vec { lock_or_recover(&self.wrap_calls).clone() } @@ -208,8 +219,14 @@ impl PatchSink for RecordingSink { *lock_or_recover(&self.accept_outcome) } - fn trigger_save(&self, letter: uffs_mft::platform::DriveLetter, reason: SaveReason) { + fn trigger_save( + &self, + letter: uffs_mft::platform::DriveLetter, + reason: SaveReason, + cursor: u64, + ) { lock_or_recover(&self.save_calls).push((letter, reason)); + lock_or_recover(&self.save_cursors).push(cursor); } fn journal_wrapped(&self, letter: uffs_mft::platform::DriveLetter) { diff --git a/crates/uffs-daemon/src/cache/journal_loop/tests/integration.rs b/crates/uffs-daemon/src/cache/journal_loop/tests/integration.rs index df0558473..b9e16c8ff 100644 --- a/crates/uffs-daemon/src/cache/journal_loop/tests/integration.rs +++ b/crates/uffs-daemon/src/cache/journal_loop/tests/integration.rs @@ -16,9 +16,10 @@ //! * **Multiple saves fire** \u2014 with `save_threshold_events = BATCH_SIZE + //! (BATCH_SIZE / 2)`, the threshold crosses inside roughly every other batch. //! A 10-batch run produces \u2265 4 saves. -//! 
* **Final cursor persists** \u2014 the cursor store's append-only log -//! contains the post-final-batch cursor, proving cursor and body advance in -//! lockstep all the way through. +//! * **Final cursor handed to sink** \u2014 the cursor handed to the sink +//! advances to the post-final-batch value, proving the loop carries its read +//! position forward to the sink (which persists it in lockstep with the body +//! save; the sink-side lockstep is pinned in `cache::journal_sink`). //! * **Cursor monotonicity** \u2014 across 10+ polls, every cursor passed into //! `JournalSource::poll` is \u2265 the previous one. //! * **No false-positive wrap detection** \u2014 with a single stable @@ -155,18 +156,30 @@ async fn ten_thousand_events_end_to_end() { "every save must be for drive 'C'; got {saves:?}" ); - // ── Final cursor persisted ────────────────────────────────────── - let log = cursor_store.store_log(); + // ── Final cursor handed to sink ───────────────────────────────── + // The loop forwards its read position to the sink on every save; + // persistence then happens sink-side in lockstep with the body + // save (so the loop itself never writes the cursor store on a + // save tick). + let save_cursors = sink.save_cursors(); assert!( - !log.is_empty(), - "cursor store must have at least one persistence write; got empty log" + !save_cursors.is_empty(), + "sink must have received at least one save cursor; got empty list" ); - // The largest cursor in the log should match TOTAL_EVENTS (the - // final batch's next_cursor). - let max_persisted = log.iter().map(|(_, cursor)| *cursor).max().unwrap_or(0); + // The largest cursor handed to the sink should match TOTAL_EVENTS + // (the final batch's next_cursor). 
+ let max_handed = save_cursors.iter().copied().max().unwrap_or(0); assert!( - max_persisted >= TOTAL_EVENTS as u64, - "highest persisted cursor must be ≥ {TOTAL_EVENTS}; got {max_persisted}" + max_handed >= TOTAL_EVENTS as u64, + "highest cursor handed to the sink must be ≥ {TOTAL_EVENTS}; got {max_handed}" + ); + // The loop must not persist the cursor itself on a save tick — + // that moved to the sink so a parked shard's no-op save can't + // advance the on-disk cursor past the on-disk body. + assert!( + cursor_store.store_log().is_empty(), + "loop must not write the cursor store on save ticks; got {:?}", + cursor_store.store_log(), ); // ── Cursor monotonicity ───────────────────────────────────────── diff --git a/crates/uffs-daemon/src/cache/journal_loop/tests/save_log_message.rs b/crates/uffs-daemon/src/cache/journal_loop/tests/save_log_message.rs index 312714d1c..df7073833 100644 --- a/crates/uffs-daemon/src/cache/journal_loop/tests/save_log_message.rs +++ b/crates/uffs-daemon/src/cache/journal_loop/tests/save_log_message.rs @@ -78,7 +78,7 @@ fn compact_cache_save_log_message_pins_string_target_and_level() { // Age threshold set generously so it can't be the path that // fires (we want a deterministic `EventsExceeded` reason). let changes = [one_change(10), one_change(11), one_change(12)]; - let saved = process_tick( + process_tick( &sink as &dyn PatchSink, uffs_mft::platform::DriveLetter::C, 100, // cursor @@ -87,14 +87,11 @@ fn compact_cache_save_log_message_pins_string_target_and_level() { 1, // save_threshold_events — tight Duration::from_hours(1), // save_threshold_age — generous ); - assert!( - saved, - "process_tick must report saved=true when events threshold crosses" - ); - // Sanity: the sink saw the trigger_save callback once with the - // expected (letter, reason) pair. This is the behavioral - // contract; the *log message* below is the soak-harness contract. 
+ // The sink saw the trigger_save callback once with the expected + // (letter, reason) pair — this proves the threshold crossed and + // fired a save. This is the behavioral contract; the *log + // message* below is the soak-harness contract. let save_calls = sink.save_calls(); assert_eq!( save_calls.as_slice(), @@ -104,6 +101,13 @@ fn compact_cache_save_log_message_pins_string_target_and_level() { )], "trigger_save must fire exactly once with EventsExceeded reason; got {save_calls:?}", ); + // The cursor passed to process_tick must be handed through to the + // sink so it can persist it in lockstep with the body save. + assert_eq!( + sink.save_cursors().as_slice(), + &[100], + "process_tick must forward the tick cursor to trigger_save", + ); // Find the INFO event the soak harness greps for. let events = log.events(); diff --git a/crates/uffs-daemon/src/cache/journal_loop/tests/wrap_and_persistence.rs b/crates/uffs-daemon/src/cache/journal_loop/tests/wrap_and_persistence.rs index b53fa6963..4f5bb746f 100644 --- a/crates/uffs-daemon/src/cache/journal_loop/tests/wrap_and_persistence.rs +++ b/crates/uffs-daemon/src/cache/journal_loop/tests/wrap_and_persistence.rs @@ -7,9 +7,13 @@ //! //! * **Cursor seeded at spawn** — `cursor_store.load(letter)` is the first //! poll's cursor argument when the store has a pre-loaded value. -//! * **Cursor persisted on save** — the loop calls `cursor_store.store(letter, -//! cursor)` at the same time as `trigger_save` so on-disk cursor and on-disk -//! body advance together. +//! * **Cursor handed to the sink on save** — the loop passes its read position +//! into `PatchSink::trigger_save(letter, reason, cursor)`. The loop itself +//! no longer persists the cursor on save; the production sink +//! (`RegistryPatchSink`) persists it **in lockstep with a successful +//! compact-cache body save** (a parked shard's save is a no-op, so its cursor +//! must not advance on disk). The sink-side lockstep is pinned in +//! 
`cache::journal_sink` tests. //! * **Wrap detected via `journal_id` change** — a different `journal_id` //! between two successive non-zero polls fires `sink.journal_wrapped(letter)` //! and the wrap-tick's changes are NOT applied via `accept`. @@ -64,7 +68,7 @@ async fn cursor_loaded_from_store_at_spawn() { } #[tokio::test] -async fn cursor_persisted_on_save_trigger() { +async fn cursor_handed_to_sink_on_save_trigger() { let source = Arc::new(FakeJournalSource::new()); let sink = Arc::new(RecordingSink::new()); let cursor_store = Arc::new(FakeCursorStore::new()); @@ -105,11 +109,22 @@ async fn cursor_persisted_on_save_trigger() { converged, "save did not fire within {CONVERGENCE_DEADLINE:?}" ); + // The loop hands its read position (100) to the sink's + // `trigger_save` so the sink can persist it in lockstep with the + // body save. + let save_cursors = sink.save_cursors(); + assert!( + save_cursors.contains(&100), + "cursor 100 must be handed to the sink's trigger_save; got {save_cursors:?}" + ); + // The loop itself must NOT persist the cursor on save — that + // responsibility moved to the sink so a parked shard's no-op + // save can't advance the on-disk cursor past the on-disk body. 
let log = cursor_store.store_log(); assert!( - log.iter() + !log.iter() .any(|&(letter, cursor)| letter == uffs_mft::platform::DriveLetter::C && cursor == 100), - "cursor 100 must be persisted alongside the save trigger; log = {log:?}" + "loop must not persist the save cursor directly; log = {log:?}" ); } diff --git a/crates/uffs-daemon/src/cache/journal_sink.rs b/crates/uffs-daemon/src/cache/journal_sink.rs index 0d23ff12e..4f2337d79 100644 --- a/crates/uffs-daemon/src/cache/journal_sink.rs +++ b/crates/uffs-daemon/src/cache/journal_sink.rs @@ -64,7 +64,7 @@ use tokio::sync::mpsc; use tokio::task::JoinHandle; use uffs_mft::usn::FileChange; -use super::journal_loop::{PatchSink, SaveReason}; +use super::journal_loop::{CursorStore, PatchSink, SaveReason}; use crate::index::IndexManager; /// Cross-task message from a sync sink callback to the async applier. @@ -90,6 +90,12 @@ enum ApplyMsg { /// triggers when the drive saw no churn since the last save /// (the surgical-patch path short-circuits to a no-op). changes: Vec, + /// The journal loop's read position at this save tick. The + /// applier persists it via the cursor store **only when the + /// compact-cache body save actually succeeds**, keeping the + /// on-disk cursor in lockstep with the on-disk body (a parked + /// shard's save is a no-op, so its cursor must not advance). + cursor: u64, }, /// `journal_wrapped` callback — the journal head reset so any /// pending events are stale; the applier discards them in the @@ -188,10 +194,16 @@ impl RegistryPatchSink { /// /// See `docs/dev/baseline/2026-05-19/phase_10_backpressure_audit.md` /// (local) for the full per-site verdict. 
- pub(crate) fn spawn_with_applier(idx: &Arc) -> (Arc, JoinHandle<()>) { + pub(crate) fn spawn_with_applier( + idx: &Arc, + cursor_store: Arc, + ) -> (Arc, JoinHandle<()>) { let (apply_tx, apply_rx) = mpsc::unbounded_channel(); let weak = Arc::downgrade(idx); - let handle = tokio::spawn(applier_task(apply_rx, weak)); + // The cursor store lives on the applier task (it persists the + // cursor in lockstep with a successful body save); the sink + // itself only forwards the cursor inside `ApplyMsg::Save`. + let handle = tokio::spawn(applier_task(apply_rx, weak, cursor_store)); ( Arc::new(Self { apply_tx, @@ -252,7 +264,12 @@ impl PatchSink for RegistryPatchSink { true } - fn trigger_save(&self, letter: uffs_mft::platform::DriveLetter, reason: SaveReason) { + fn trigger_save( + &self, + letter: uffs_mft::platform::DriveLetter, + reason: SaveReason, + cursor: u64, + ) { // Drain the per-letter buffer in one swoop so the applier // sees a snapshot of all events accumulated since the // previous save. An empty drained Vec is forwarded as-is @@ -265,6 +282,7 @@ impl PatchSink for RegistryPatchSink { letter, reason, changes: drained, + cursor, }); } @@ -291,7 +309,11 @@ impl PatchSink for RegistryPatchSink { /// `Weak::upgrade()` returns `None`, the loop returns immediately on the /// next message even if more are pending (correctness: the daemon is /// shutting down, no point applying more refreshes). 
-async fn applier_task(mut rx: mpsc::UnboundedReceiver, idx_weak: Weak) { +async fn applier_task( + mut rx: mpsc::UnboundedReceiver, + idx_weak: Weak, + cursor_store: Arc, +) { while let Some(msg) = rx.recv().await { let Some(idx_strong) = idx_weak.upgrade() else { tracing::debug!( @@ -300,7 +322,7 @@ async fn applier_task(mut rx: mpsc::UnboundedReceiver, idx_weak: Weak< ); return; }; - dispatch_msg(&idx_strong, msg).await; + dispatch_msg(&idx_strong, cursor_store.as_ref(), msg).await; } tracing::debug!( target: "shard.journal", @@ -316,12 +338,13 @@ async fn applier_task(mut rx: mpsc::UnboundedReceiver, idx_weak: Weak< /// also keeps the `Weak::upgrade` lifecycle decision in the parent /// (visible at the recv-loop level) and the per-variant dispatch /// in this helper (focused, easy to extend). -async fn dispatch_msg(idx: &Arc, msg: ApplyMsg) { +async fn dispatch_msg(idx: &Arc, cursor_store: &dyn CursorStore, msg: ApplyMsg) { match msg { ApplyMsg::Save { letter, reason, changes, + cursor, } => { let reason_str = save_reason_str(reason); // Phase 8 surgical-patch path: hand the drained per-letter @@ -329,7 +352,16 @@ async fn dispatch_msg(idx: &Arc, msg: ApplyMsg) { // which clones the Warm body, applies the patch, swaps // the new Arc into the registry, and persists the patched // body via `save_compact_cache_background`. - let _applied = idx.handle_journal_save(letter, reason_str, changes).await; + let applied = idx.handle_journal_save(letter, reason_str, changes).await; + // Persist the cursor only when the patch was applied (`applied`). + // A parked shard returns `false` here, so its on-disk cursor + // stays pinned to the last real body save. See `cache::guarded_load`. + // NOTE(review): the body is written via + // `save_compact_cache_background`; confirm `applied` is not returned + // before that write lands, otherwise the cursor can lead the body. + if applied { + cursor_store.store(letter, cursor); + } } ApplyMsg::Wrap { letter } => { // Wrap stays on the Phase-7 full-reload path. 
The @@ -355,379 +387,4 @@ const fn save_reason_str(reason: SaveReason) -> &'static str { } #[cfg(test)] -mod tests { - use super::*; - - /// Construct a fresh [`IndexManager`] suitable for sink lifecycle - /// tests. No drives loaded — the applier exits cleanly when - /// the strong-Arc drops, regardless of whether refresh messages - /// were drained, so tests don't need a populated registry. - fn fresh_index_manager() -> Arc { - let (event_tx, _event_rx) = crate::events::event_channel(); - Arc::new(IndexManager::new( - None, - event_tx, - Arc::new(crate::config::Config::default()), - )) - } - - /// Construct a [`FileChange`] fixture with a unique FRS for - /// per-event identification. Fields other than `frs` use - /// `FileChange::default()` because the sink doesn't inspect them - /// — only `IndexManager::handle_journal_save` does (covered in - /// `cache::shard::tests` and the patch end-to-end suite). - /// - /// Takes a raw `u64` because FRS values are most naturally - /// written as integer literals in test fixtures; lifts to the - /// typed `Frs` at this single construction boundary so the rest - /// of the test surface keeps the typed contract. - fn make_change(frs: u64) -> FileChange { - FileChange { - frs: frs.into(), - ..FileChange::default() - } - } - - /// Snapshot the per-letter pending buffer's event FRS sequence, - /// dropping the `lock_pending()` guard before returning so the - /// caller's assertions don't hold the mutex (satisfies - /// `clippy::significant_drop_tightening` in tests). - /// - /// Demotes typed `Frs` → raw `u64` at the snapshot boundary so - /// assertion literals stay as integer arrays. 
- fn pending_frs_for_letter( - sink: &RegistryPatchSink, - letter: uffs_mft::platform::DriveLetter, - ) -> Option> { - let guard = sink.lock_pending(); - guard - .get(&letter) - .map(|buf| buf.iter().map(|change| change.frs.raw()).collect()) - } - - /// Pin: `accept` appends to the per-letter pending buffer and - /// does NOT enqueue a message on the applier channel. - #[tokio::test] - async fn accept_buffers_changes_without_enqueueing() { - let (sink, mut rx) = RegistryPatchSink::new_for_test(); - - let accepted = sink.accept(uffs_mft::platform::DriveLetter::C, &[ - make_change(100), - make_change(101), - ]); - assert!(accepted, "accept must return true optimistically"); - - // The pending buffer holds the two events for letter 'C'. - let buf = pending_frs_for_letter(&sink, uffs_mft::platform::DriveLetter::C) - .expect("accept must populate pending buffer for 'C'"); - assert_eq!( - buf, - [100, 101], - "accept must preserve event order in the buffer", - ); - - // Channel is empty: accept did not enqueue. - assert!( - rx.try_recv().is_err(), - "accept must NOT enqueue an ApplyMsg", - ); - } - - /// Pin: a sequence of `accept` calls for the same letter - /// accumulates into the same buffer — no per-call drain or - /// truncation. - #[tokio::test] - async fn multiple_accepts_accumulate_in_pending() { - let (sink, _rx) = RegistryPatchSink::new_for_test(); - - sink.accept(uffs_mft::platform::DriveLetter::C, &[make_change(1)]); - sink.accept(uffs_mft::platform::DriveLetter::C, &[ - make_change(2), - make_change(3), - ]); - sink.accept(uffs_mft::platform::DriveLetter::C, &[make_change(4)]); - - let buf = pending_frs_for_letter(&sink, uffs_mft::platform::DriveLetter::C) - .expect("buffer for 'C' must exist"); - assert_eq!( - buf, - [1, 2, 3, 4], - "consecutive accepts must accumulate in send order", - ); - } - - /// Pin: `trigger_save` drains the pending buffer for `letter` - /// and ships it inside `ApplyMsg::Save { changes }`. 
The buffer - /// for `letter` is cleared after the drain. - #[tokio::test] - async fn trigger_save_drains_pending_into_save_message() { - let (sink, mut rx) = RegistryPatchSink::new_for_test(); - - sink.accept(uffs_mft::platform::DriveLetter::C, &[ - make_change(10), - make_change(11), - ]); - sink.trigger_save( - uffs_mft::platform::DriveLetter::C, - SaveReason::EventsExceeded, - ); - - let ApplyMsg::Save { - letter, - reason, - changes, - } = rx.try_recv().expect("trigger_save must enqueue Save") - else { - panic!("expected ApplyMsg::Save, got Wrap"); - }; - assert_eq!(letter, uffs_mft::platform::DriveLetter::C); - assert!(matches!(reason, SaveReason::EventsExceeded)); - assert_eq!( - changes - .iter() - .map(|change| change.frs.raw()) - .collect::>(), - [10, 11], - "Save must carry the buffered changes in send order", - ); - - // Pending buffer for 'C' is gone after the drain. - assert!( - pending_frs_for_letter(&sink, uffs_mft::platform::DriveLetter::C).is_none(), - "trigger_save must remove the per-letter pending entry", - ); - } - - /// Pin: `trigger_save` on a letter with no prior `accept` still - /// emits `ApplyMsg::Save { changes: [] }`. The applier's - /// empty-batch fast path then short-circuits to a no-op. - #[tokio::test] - async fn trigger_save_with_no_pending_sends_empty_changes() { - let (sink, mut rx) = RegistryPatchSink::new_for_test(); - - sink.trigger_save(uffs_mft::platform::DriveLetter::Z, SaveReason::AgeElapsed); - - let ApplyMsg::Save { - letter, - reason, - changes, - } = rx.try_recv().expect("trigger_save must enqueue Save") - else { - panic!("expected ApplyMsg::Save, got Wrap"); - }; - assert_eq!(letter, uffs_mft::platform::DriveLetter::Z); - assert!(matches!(reason, SaveReason::AgeElapsed)); - assert!( - changes.is_empty(), - "Save must carry an empty Vec when no events were buffered", - ); - } - - /// Pin: `journal_wrapped` clears the pending buffer for the - /// letter and emits `ApplyMsg::Wrap`. 
A subsequent - /// `trigger_save` then sees an empty buffer (no replay of the - /// stale events past the wrap). - #[tokio::test] - async fn journal_wrapped_discards_pending_buffer_and_sends_wrap() { - let (sink, mut rx) = RegistryPatchSink::new_for_test(); - - sink.accept(uffs_mft::platform::DriveLetter::C, &[ - make_change(5), - make_change(6), - ]); - sink.journal_wrapped(uffs_mft::platform::DriveLetter::C); - - let ApplyMsg::Wrap { letter } = rx.try_recv().expect("journal_wrapped must enqueue Wrap") - else { - panic!("expected ApplyMsg::Wrap, got Save"); - }; - assert_eq!(letter, uffs_mft::platform::DriveLetter::C); - - assert!( - pending_frs_for_letter(&sink, uffs_mft::platform::DriveLetter::C).is_none(), - "journal_wrapped must discard the per-letter pending entry", - ); - - // A subsequent trigger_save must see an empty buffer. - sink.trigger_save(uffs_mft::platform::DriveLetter::C, SaveReason::AgeElapsed); - let ApplyMsg::Save { - changes: post_wrap_changes, - .. - } = rx.try_recv().expect("trigger_save must enqueue Save") - else { - panic!("expected ApplyMsg::Save after wrap, got another Wrap"); - }; - assert!( - post_wrap_changes.is_empty(), - "post-wrap trigger_save must drain to empty (stale events discarded)", - ); - } - - /// Pin: per-letter buffers are independent. Accepting events on - /// 'C' must not leak into 'D's buffer or pending state. - #[tokio::test] - async fn pending_buffers_are_per_letter() { - let (sink, mut rx) = RegistryPatchSink::new_for_test(); - - sink.accept(uffs_mft::platform::DriveLetter::C, &[make_change(1)]); - sink.accept(uffs_mft::platform::DriveLetter::D, &[ - make_change(2), - make_change(3), - ]); - - // Drain 'C' first — should NOT include any of 'D's events. - sink.trigger_save( - uffs_mft::platform::DriveLetter::C, - SaveReason::EventsExceeded, - ); - let ApplyMsg::Save { - letter: c_letter, - changes: c_changes, - .. 
- } = rx.try_recv().expect("Save for 'C'") - else { - panic!("expected ApplyMsg::Save for 'C'"); - }; - assert_eq!(c_letter, uffs_mft::platform::DriveLetter::C); - assert_eq!( - c_changes - .iter() - .map(|change| change.frs.raw()) - .collect::>(), - [1], - ); - - // 'D's buffer must still hold its events. - sink.trigger_save( - uffs_mft::platform::DriveLetter::D, - SaveReason::EventsExceeded, - ); - let ApplyMsg::Save { - letter: d_letter, - changes: d_changes, - .. - } = rx.try_recv().expect("Save for 'D'") - else { - panic!("expected ApplyMsg::Save for 'D'"); - }; - assert_eq!(d_letter, uffs_mft::platform::DriveLetter::D); - assert_eq!( - d_changes - .iter() - .map(|change| change.frs.raw()) - .collect::>(), - [2, 3], - "'D's buffer must be preserved across 'C's drain", - ); - } - - /// Drop all `Arc` instances → the sender side - /// of the channel is dropped → the applier's `rx.recv().await` - /// returns `None` → the task exits cleanly. Pins the graceful- - /// shutdown shape used when the daemon's load task tears down - /// without dropping `IndexManager` first. - #[tokio::test] - async fn applier_exits_when_sink_dropped() { - let idx = fresh_index_manager(); - let (sink, applier) = RegistryPatchSink::spawn_with_applier(&idx); - - // Sanity: applier is still running before we drop the sink. - assert!(!applier.is_finished()); - - drop(sink); - - // Should converge within the test deadline; in practice - // it's <1 ms on Mac (single context-switch). - let join_result = tokio::time::timeout(core::time::Duration::from_secs(2), applier).await; - assert!( - join_result.is_ok(), - "applier must exit within 2 s of last sender dropping", - ); - join_result - .expect("timeout deadline must not elapse") - .expect("applier must not panic"); - } - - /// Drop the last `Arc` → the applier's - /// `Weak::upgrade()` returns `None` on the next message → - /// the task exits cleanly. 
Pins the graceful-shutdown shape - /// used when the daemon's main lifecycle drops the index before - /// the load task observes the sink-side drop. - #[tokio::test] - async fn applier_exits_when_index_manager_dropped() { - let idx = fresh_index_manager(); - let (sink, applier) = RegistryPatchSink::spawn_with_applier(&idx); - - drop(idx); - // Send one message so the applier wakes up, observes the - // dropped Weak, and exits. Without this the applier blocks - // on `recv().await` forever (no Weak-watch primitive in tokio). - sink.trigger_save( - uffs_mft::platform::DriveLetter::C, - SaveReason::EventsExceeded, - ); - - let join_result = tokio::time::timeout(core::time::Duration::from_secs(2), applier).await; - assert!( - join_result.is_ok(), - "applier must exit within 2 s of IndexManager Arc dropping + one wake message", - ); - join_result - .expect("timeout deadline must not elapse") - .expect("applier must not panic"); - } - - /// Pin that `save_reason_str` produces stable diagnostic strings. - /// These strings are surfaced in the production tracing target - /// `shard.journal` and consumed by operator runbooks — drift - /// would silently break grep-based monitoring, so the round-trip - /// is asserted directly. - #[test] - fn save_reason_str_maps_each_variant() { - assert_eq!( - save_reason_str(SaveReason::EventsExceeded), - "events-exceeded" - ); - assert_eq!(save_reason_str(SaveReason::AgeElapsed), "age-elapsed"); - } - - /// Pin the multi-message FIFO ordering contract: a sink that - /// buffers many messages before the applier can drain them all - /// must process them in send order. Out-of-order applies don't - /// corrupt (each per-letter `handle_journal_save` is independent), - /// but the FIFO contract is a regression-net for any future - /// change that swaps `mpsc::unbounded_channel` for an unordered - /// surface. 
- #[tokio::test] - async fn applier_drains_in_fifo_order() { - let idx = fresh_index_manager(); - let (sink, applier) = RegistryPatchSink::spawn_with_applier(&idx); - - // Burst-send 5 trigger_save calls without prior `accept`. - // Each one drains an empty pending buffer and ships - // `ApplyMsg::Save { changes: [] }`; the applier's empty-batch - // fast path in `handle_journal_save` short-circuits to a - // debug-log no-op. The test verifies the applier processes - // all 5 in order before the sink's drop closes the channel. - for letter in [ - uffs_mft::platform::DriveLetter::C, - uffs_mft::platform::DriveLetter::D, - uffs_mft::platform::DriveLetter::E, - uffs_mft::platform::DriveLetter::F, - uffs_mft::platform::DriveLetter::G, - ] { - sink.trigger_save(letter, SaveReason::EventsExceeded); - } - - // Drop the sink → applier drains remaining messages → exits. - drop(sink); - let join_result = tokio::time::timeout(core::time::Duration::from_secs(5), applier).await; - assert!( - join_result.is_ok(), - "applier must finish draining within 5 s (5 empty-batch no-ops, no real I/O)", - ); - join_result - .expect("timeout deadline must not elapse") - .expect("applier must not panic"); - } -} +mod tests; diff --git a/crates/uffs-daemon/src/cache/journal_sink/tests.rs b/crates/uffs-daemon/src/cache/journal_sink/tests.rs new file mode 100644 index 000000000..e025861bc --- /dev/null +++ b/crates/uffs-daemon/src/cache/journal_sink/tests.rs @@ -0,0 +1,481 @@ +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2025-2026 SKY, LLC. + +//! Unit tests for [`super::RegistryPatchSink`] — the journal-applier sink. +//! +//! Extracted from the parent `journal_sink.rs` to keep that file under the +//! workspace 800-LOC file-size policy; `use super::*` keeps every private +//! item (`ApplyMsg`, `dispatch_msg`, `save_reason_str`, …) in scope exactly +//! as it was inline. + +use super::*; + +/// Construct a fresh [`IndexManager`] suitable for sink lifecycle +/// tests. 
No drives loaded — the applier exits cleanly when +/// the strong-Arc drops, regardless of whether refresh messages +/// were drained, so tests don't need a populated registry. +fn fresh_index_manager() -> Arc { + let (event_tx, _event_rx) = crate::events::event_channel(); + Arc::new(IndexManager::new( + None, + event_tx, + Arc::new(crate::config::Config::default()), + )) +} + +/// In-memory [`CursorStore`] that records every `store` call. +/// Lets the applier tests assert *whether* a cursor was persisted +/// (the lockstep safety property) without touching the disk. +/// `load` always returns 0 (tests seed nothing). +struct RecordingCursorStore { + log: Mutex>, +} + +impl RecordingCursorStore { + fn new() -> Arc { + Arc::new(Self { + log: Mutex::new(Vec::new()), + }) + } + + fn store_log(&self) -> Vec<(uffs_mft::platform::DriveLetter, u64)> { + self.log + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone() + } +} + +impl CursorStore for RecordingCursorStore { + fn load(&self, _letter: uffs_mft::platform::DriveLetter) -> u64 { + 0 + } + + fn store(&self, letter: uffs_mft::platform::DriveLetter, cursor: u64) { + self.log + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push((letter, cursor)); + } +} + +/// Construct a [`FileChange`] fixture with a unique FRS for +/// per-event identification. Fields other than `frs` use +/// `FileChange::default()` because the sink doesn't inspect them +/// — only `IndexManager::handle_journal_save` does (covered in +/// `cache::shard::tests` and the patch end-to-end suite). +/// +/// Takes a raw `u64` because FRS values are most naturally +/// written as integer literals in test fixtures; lifts to the +/// typed `Frs` at this single construction boundary so the rest +/// of the test surface keeps the typed contract. 
+fn make_change(frs: u64) -> FileChange { + FileChange { + frs: frs.into(), + ..FileChange::default() + } +} + +/// Snapshot the per-letter pending buffer's event FRS sequence, +/// dropping the `lock_pending()` guard before returning so the +/// caller's assertions don't hold the mutex (satisfies +/// `clippy::significant_drop_tightening` in tests). +/// +/// Demotes typed `Frs` → raw `u64` at the snapshot boundary so +/// assertion literals stay as integer arrays. +fn pending_frs_for_letter( + sink: &RegistryPatchSink, + letter: uffs_mft::platform::DriveLetter, +) -> Option> { + let guard = sink.lock_pending(); + guard + .get(&letter) + .map(|buf| buf.iter().map(|change| change.frs.raw()).collect()) +} + +/// Pin: `accept` appends to the per-letter pending buffer and +/// does NOT enqueue a message on the applier channel. +#[tokio::test] +async fn accept_buffers_changes_without_enqueueing() { + let (sink, mut rx) = RegistryPatchSink::new_for_test(); + + let accepted = sink.accept(uffs_mft::platform::DriveLetter::C, &[ + make_change(100), + make_change(101), + ]); + assert!(accepted, "accept must return true optimistically"); + + // The pending buffer holds the two events for letter 'C'. + let buf = pending_frs_for_letter(&sink, uffs_mft::platform::DriveLetter::C) + .expect("accept must populate pending buffer for 'C'"); + assert_eq!( + buf, + [100, 101], + "accept must preserve event order in the buffer", + ); + + // Channel is empty: accept did not enqueue. + assert!( + rx.try_recv().is_err(), + "accept must NOT enqueue an ApplyMsg", + ); +} + +/// Pin: a sequence of `accept` calls for the same letter +/// accumulates into the same buffer — no per-call drain or +/// truncation. 
+#[tokio::test] +async fn multiple_accepts_accumulate_in_pending() { + let (sink, _rx) = RegistryPatchSink::new_for_test(); + + sink.accept(uffs_mft::platform::DriveLetter::C, &[make_change(1)]); + sink.accept(uffs_mft::platform::DriveLetter::C, &[ + make_change(2), + make_change(3), + ]); + sink.accept(uffs_mft::platform::DriveLetter::C, &[make_change(4)]); + + let buf = pending_frs_for_letter(&sink, uffs_mft::platform::DriveLetter::C) + .expect("buffer for 'C' must exist"); + assert_eq!( + buf, + [1, 2, 3, 4], + "consecutive accepts must accumulate in send order", + ); +} + +/// Pin: `trigger_save` drains the pending buffer for `letter` +/// and ships it inside `ApplyMsg::Save { changes }`. The buffer +/// for `letter` is cleared after the drain. +#[tokio::test] +async fn trigger_save_drains_pending_into_save_message() { + let (sink, mut rx) = RegistryPatchSink::new_for_test(); + + sink.accept(uffs_mft::platform::DriveLetter::C, &[ + make_change(10), + make_change(11), + ]); + sink.trigger_save( + uffs_mft::platform::DriveLetter::C, + SaveReason::EventsExceeded, + 4242, + ); + + let ApplyMsg::Save { + letter, + reason, + changes, + cursor, + } = rx.try_recv().expect("trigger_save must enqueue Save") + else { + panic!("expected ApplyMsg::Save, got Wrap"); + }; + assert_eq!(letter, uffs_mft::platform::DriveLetter::C); + assert!(matches!(reason, SaveReason::EventsExceeded)); + assert_eq!(cursor, 4242, "Save must carry the tick cursor"); + assert_eq!( + changes + .iter() + .map(|change| change.frs.raw()) + .collect::>(), + [10, 11], + "Save must carry the buffered changes in send order", + ); + + // Pending buffer for 'C' is gone after the drain. + assert!( + pending_frs_for_letter(&sink, uffs_mft::platform::DriveLetter::C).is_none(), + "trigger_save must remove the per-letter pending entry", + ); +} + +/// Pin: `trigger_save` on a letter with no prior `accept` still +/// emits `ApplyMsg::Save { changes: [] }`. 
The applier's +/// empty-batch fast path then short-circuits to a no-op. +#[tokio::test] +async fn trigger_save_with_no_pending_sends_empty_changes() { + let (sink, mut rx) = RegistryPatchSink::new_for_test(); + + sink.trigger_save( + uffs_mft::platform::DriveLetter::Z, + SaveReason::AgeElapsed, + 77, + ); + + let ApplyMsg::Save { + letter, + reason, + changes, + cursor, + } = rx.try_recv().expect("trigger_save must enqueue Save") + else { + panic!("expected ApplyMsg::Save, got Wrap"); + }; + assert_eq!(letter, uffs_mft::platform::DriveLetter::Z); + assert_eq!( + cursor, 77, + "Save must carry the tick cursor even with no pending events" + ); + assert!(matches!(reason, SaveReason::AgeElapsed)); + assert!( + changes.is_empty(), + "Save must carry an empty Vec when no events were buffered", + ); +} + +/// Pin: `journal_wrapped` clears the pending buffer for the +/// letter and emits `ApplyMsg::Wrap`. A subsequent +/// `trigger_save` then sees an empty buffer (no replay of the +/// stale events past the wrap). +#[tokio::test] +async fn journal_wrapped_discards_pending_buffer_and_sends_wrap() { + let (sink, mut rx) = RegistryPatchSink::new_for_test(); + + sink.accept(uffs_mft::platform::DriveLetter::C, &[ + make_change(5), + make_change(6), + ]); + sink.journal_wrapped(uffs_mft::platform::DriveLetter::C); + + let ApplyMsg::Wrap { letter } = rx.try_recv().expect("journal_wrapped must enqueue Wrap") + else { + panic!("expected ApplyMsg::Wrap, got Save"); + }; + assert_eq!(letter, uffs_mft::platform::DriveLetter::C); + + assert!( + pending_frs_for_letter(&sink, uffs_mft::platform::DriveLetter::C).is_none(), + "journal_wrapped must discard the per-letter pending entry", + ); + + // A subsequent trigger_save must see an empty buffer. + sink.trigger_save( + uffs_mft::platform::DriveLetter::C, + SaveReason::AgeElapsed, + 0, + ); + let ApplyMsg::Save { + changes: post_wrap_changes, + .. 
+ } = rx.try_recv().expect("trigger_save must enqueue Save") + else { + panic!("expected ApplyMsg::Save after wrap, got another Wrap"); + }; + assert!( + post_wrap_changes.is_empty(), + "post-wrap trigger_save must drain to empty (stale events discarded)", + ); +} + +/// Pin: per-letter buffers are independent. Accepting events on +/// 'C' must not leak into 'D's buffer or pending state. +#[tokio::test] +async fn pending_buffers_are_per_letter() { + let (sink, mut rx) = RegistryPatchSink::new_for_test(); + + sink.accept(uffs_mft::platform::DriveLetter::C, &[make_change(1)]); + sink.accept(uffs_mft::platform::DriveLetter::D, &[ + make_change(2), + make_change(3), + ]); + + // Drain 'C' first — should NOT include any of 'D's events. + sink.trigger_save( + uffs_mft::platform::DriveLetter::C, + SaveReason::EventsExceeded, + 10, + ); + let ApplyMsg::Save { + letter: c_letter, + changes: c_changes, + .. + } = rx.try_recv().expect("Save for 'C'") + else { + panic!("expected ApplyMsg::Save for 'C'"); + }; + assert_eq!(c_letter, uffs_mft::platform::DriveLetter::C); + assert_eq!( + c_changes + .iter() + .map(|change| change.frs.raw()) + .collect::>(), + [1], + ); + + // 'D's buffer must still hold its events. + sink.trigger_save( + uffs_mft::platform::DriveLetter::D, + SaveReason::EventsExceeded, + 20, + ); + let ApplyMsg::Save { + letter: d_letter, + changes: d_changes, + .. + } = rx.try_recv().expect("Save for 'D'") + else { + panic!("expected ApplyMsg::Save for 'D'"); + }; + assert_eq!(d_letter, uffs_mft::platform::DriveLetter::D); + assert_eq!( + d_changes + .iter() + .map(|change| change.frs.raw()) + .collect::>(), + [2, 3], + "'D's buffer must be preserved across 'C's drain", + ); +} + +/// Drop all `Arc` instances → the sender side +/// of the channel is dropped → the applier's `rx.recv().await` +/// returns `None` → the task exits cleanly. Pins the graceful- +/// shutdown shape used when the daemon's load task tears down +/// without dropping `IndexManager` first. 
+#[tokio::test] +async fn applier_exits_when_sink_dropped() { + let idx = fresh_index_manager(); + let (sink, applier) = RegistryPatchSink::spawn_with_applier(&idx, RecordingCursorStore::new()); + + // Sanity: applier is still running before we drop the sink. + assert!(!applier.is_finished()); + + drop(sink); + + // Should converge within the test deadline; in practice + // it's <1 ms on Mac (single context-switch). + let join_result = tokio::time::timeout(core::time::Duration::from_secs(2), applier).await; + assert!( + join_result.is_ok(), + "applier must exit within 2 s of last sender dropping", + ); + join_result + .expect("timeout deadline must not elapse") + .expect("applier must not panic"); +} + +/// Drop the last `Arc` → the applier's +/// `Weak::upgrade()` returns `None` on the next message → +/// the task exits cleanly. Pins the graceful-shutdown shape +/// used when the daemon's main lifecycle drops the index before +/// the load task observes the sink-side drop. +#[tokio::test] +async fn applier_exits_when_index_manager_dropped() { + let idx = fresh_index_manager(); + let (sink, applier) = RegistryPatchSink::spawn_with_applier(&idx, RecordingCursorStore::new()); + + drop(idx); + // Send one message so the applier wakes up, observes the + // dropped Weak, and exits. Without this the applier blocks + // on `recv().await` forever (no Weak-watch primitive in tokio). + sink.trigger_save( + uffs_mft::platform::DriveLetter::C, + SaveReason::EventsExceeded, + 1, + ); + + let join_result = tokio::time::timeout(core::time::Duration::from_secs(2), applier).await; + assert!( + join_result.is_ok(), + "applier must exit within 2 s of IndexManager Arc dropping + one wake message", + ); + join_result + .expect("timeout deadline must not elapse") + .expect("applier must not panic"); +} + +/// Pin that `save_reason_str` produces stable diagnostic strings. 
+/// These strings are surfaced in the production tracing target +/// `shard.journal` and consumed by operator runbooks — drift +/// would silently break grep-based monitoring, so the round-trip +/// is asserted directly. +#[test] +fn save_reason_str_maps_each_variant() { + assert_eq!( + save_reason_str(SaveReason::EventsExceeded), + "events-exceeded" + ); + assert_eq!(save_reason_str(SaveReason::AgeElapsed), "age-elapsed"); +} + +/// Pin the multi-message FIFO ordering contract: a sink that +/// buffers many messages before the applier can drain them all +/// must process them in send order. Out-of-order applies don't +/// corrupt (each per-letter `handle_journal_save` is independent), +/// but the FIFO contract is a regression-net for any future +/// change that swaps `mpsc::unbounded_channel` for an unordered +/// surface. +#[tokio::test] +async fn applier_drains_in_fifo_order() { + let idx = fresh_index_manager(); + let (sink, applier) = RegistryPatchSink::spawn_with_applier(&idx, RecordingCursorStore::new()); + + // Burst-send 5 trigger_save calls without prior `accept`. + // Each one drains an empty pending buffer and ships + // `ApplyMsg::Save { changes: [] }`; the applier's empty-batch + // fast path in `handle_journal_save` short-circuits to a + // debug-log no-op. The test verifies the applier processes + // all 5 in order before the sink's drop closes the channel. + for letter in [ + uffs_mft::platform::DriveLetter::C, + uffs_mft::platform::DriveLetter::D, + uffs_mft::platform::DriveLetter::E, + uffs_mft::platform::DriveLetter::F, + uffs_mft::platform::DriveLetter::G, + ] { + sink.trigger_save(letter, SaveReason::EventsExceeded, 1); + } + + // Drop the sink → applier drains remaining messages → exits. 
+ drop(sink); + let join_result = tokio::time::timeout(core::time::Duration::from_secs(5), applier).await; + assert!( + join_result.is_ok(), + "applier must finish draining within 5 s (5 empty-batch no-ops, no real I/O)", + ); + join_result + .expect("timeout deadline must not elapse") + .expect("applier must not panic"); +} + +/// Lockstep safety pin: when `handle_journal_save` does NOT save a +/// body (here: no shard is registered for the letter, so the save +/// returns `false` — the same outcome as a Parked shard), the +/// applier must NOT persist the cursor. This is the invariant the +/// startup warm-load guard (`cache::guarded_load`) relies on: the +/// on-disk cursor never outruns the on-disk compact-cache body, so +/// the guard can fast-serve the body and let the background loop +/// converge the bounded delta without stranding events. +#[tokio::test] +async fn no_warm_shard_save_does_not_persist_cursor() { + let idx = fresh_index_manager(); // no drives registered + let cursor_store = RecordingCursorStore::new(); + let (sink, applier) = RegistryPatchSink::spawn_with_applier( + &idx, + Arc::clone(&cursor_store) as Arc, + ); + + // A save tick for an unregistered letter: handle_journal_save + // finds no shard and returns false → cursor must stay unwritten. + sink.accept(uffs_mft::platform::DriveLetter::C, &[make_change(1)]); + sink.trigger_save( + uffs_mft::platform::DriveLetter::C, + SaveReason::EventsExceeded, + 9999, + ); + + // Drain + join the applier so the (a)synchronous dispatch has + // certainly run before we inspect the store. 
+ drop(sink); + let join_result = tokio::time::timeout(core::time::Duration::from_secs(5), applier).await; + join_result + .expect("applier must exit within 5 s") + .expect("applier must not panic"); + + assert!( + cursor_store.store_log().is_empty(), + "cursor must NOT be persisted when the body save no-ops (no warm shard); \ + got {:?}", + cursor_store.store_log(), + ); +} diff --git a/crates/uffs-daemon/src/cache/mod.rs b/crates/uffs-daemon/src/cache/mod.rs index 9e4e9460f..f42bab93e 100644 --- a/crates/uffs-daemon/src/cache/mod.rs +++ b/crates/uffs-daemon/src/cache/mod.rs @@ -32,6 +32,7 @@ pub(crate) mod background_io; pub(crate) mod body_loader; pub(crate) mod cache_cleaner; pub(crate) mod cursor_store; +pub(crate) mod guarded_load; pub(crate) mod journal_loop; pub(crate) mod journal_sink; pub(crate) mod policy; diff --git a/crates/uffs-daemon/src/index/loading.rs b/crates/uffs-daemon/src/index/loading.rs index 7adf22cde..2e07a1be0 100644 --- a/crates/uffs-daemon/src/index/loading.rs +++ b/crates/uffs-daemon/src/index/loading.rs @@ -282,10 +282,11 @@ impl IndexManager { tracing::info!(drive = %letter, "Loading live drive (parallel)"); eprintln!("[diag] load_live_drives: spawning thread for drive={letter}"); join_set.spawn_blocking(move || { - let result = uffs_core::compact::load_drive( - &uffs_core::compact::MftSource::Live(letter), - no_cache, - ); + // Guarded warm load: serve the on-disk compact cache fast + // when the background USN journal loop can converge the + // bounded delta, falling back to a synchronous rebuild + // only when it cannot (see `cache::guarded_load`). + let result = crate::cache::guarded_load::load_live_drive(letter, no_cache); (letter, result) }); } diff --git a/crates/uffs-daemon/src/index/search.rs b/crates/uffs-daemon/src/index/search.rs index 5402727ed..64b66d945 100644 --- a/crates/uffs-daemon/src/index/search.rs +++ b/crates/uffs-daemon/src/index/search.rs @@ -12,16 +12,13 @@ //! 
Search execution: query dispatch, profile construction, and drive info. use core::sync::atomic::Ordering; -use std::io::Write as _; use std::time::Instant; use uffs_client::protocol::response::{ DriveProfile, SearchPayload, SearchProfile, SearchResponse, SearchRow, }; use uffs_client::protocol::{SearchFilterMode, SearchParams, SearchResponseMode}; -use uffs_core::search::backend::{ - DisplayRow, FilterMode, PhaseTimings, SearchRequest, SortSpec, search_index, -}; +use uffs_core::search::backend::{FilterMode, PhaseTimings, SearchRequest, SortSpec, search_index}; use uffs_core::search::field::FieldId; use uffs_core::search::filters::{SearchFilterParams, SearchFilters}; @@ -360,7 +357,7 @@ impl IndexManager { // reuses `filtered_rows` — `spawn_blocking` would force an // expensive clone or an `Arc>` refactor. let write_result = tokio::task::block_in_place(|| { - Self::write_rows_to_file(&filtered_rows, output_path, &output_config) + write_rows_to_file(&filtered_rows, output_path, &output_config) }); let write_us = t_write.map_or(0, |ts| ts.elapsed().as_micros()); @@ -653,92 +650,6 @@ impl IndexManager { drives: drive_profiles, } } - - // ── Direct file output (OPT-4) ────────────────────────────────── - - /// Write `DisplayRow`s directly to a file, bypassing `SearchRow` and IPC. - /// - /// Uses the same `OutputConfig::write_display_rows` that the CLI uses, - /// so all formatting options (separator, quotes, header, pos/neg, - /// columns, timestamps) produce identical output. - /// - /// Atomic write: writes to a `.uffs.tmp` sibling file, then renames - /// to the target after a `BufWriter::flush`. No `fsync` — - /// `--out=` is reproducible search output, so the tmp+rename - /// dance protects against partial-file exposure during normal - /// writes but power-loss durability is intentionally not provided. 
- /// See the inline comment in the body and §Run 7 C / §Run 8 of - /// `docs/research/perf-phase2-measurement-plan.md` for the - /// measurement that motivated this trade-off. Zero rows → no - /// file is created. - fn write_rows_to_file( - rows: &[DisplayRow], - path: &str, - output_config: &uffs_core::output::OutputConfig, - ) -> Result { - use std::io::BufWriter; - - use rand::Rng as _; - - // Zero results → don't create the file at all. - if rows.is_empty() { - return Ok(0); - } - - let target = std::path::Path::new(path); - - // Randomised temp name in the same directory (same-FS rename stays - // atomic). Born 0600 via `create_new_secure_file`, which refuses to - // follow a symlink pre-planted at a guessed temp path. `file_name` - // is an `Option`, not a `Result` — `unwrap_or_default` is not an - // unwrap-lint violation. - let mut suffix_bytes = [0_u8; 8]; - rand::rng().fill_bytes(&mut suffix_bytes); - let suffix = u64::from_le_bytes(suffix_bytes); - let file_name = target.file_name().unwrap_or_default(); - let tmp_name = format!("{}.{:016x}.uffs.tmp", file_name.to_string_lossy(), suffix); - let tmp_path = target.with_file_name(tmp_name); - - // Write to temp file — target is untouched until rename. - let file = uffs_security::fs::create_new_secure_file(&tmp_path)?; - let mut writer = BufWriter::with_capacity(256 * 1024, file); - - let write_result = output_config - .write_display_rows(rows, &mut writer) - .map_err(std::io::Error::other); - - // On write error, clean up the temp file and propagate. - if let Err(err) = write_result { - drop(writer); - let _cleanup: Result<(), std::io::Error> = std::fs::remove_file(&tmp_path); - return Err(err); - } - - // Flush the BufWriter and close the underlying file. - // - // We deliberately skip `sync_all()` here. 
`--out=` is - // a user-requested export of search results; the data is - // reproducible from the MFT index in ~100 ms, so paying a - // 5-15 ms `fsync` per query for power-loss durability is not - // worth it — a power cut would just leave a 0-byte file and - // the user can simply re-run the query. The atomic - // tmp+rename below still prevents partial-file exposure - // during normal writes. See - // `docs/research/perf-phase2-measurement-plan.md` §Run 7 C / - // §Run 8 for the measurement that motivated dropping the - // sync. - writer.flush()?; - writer - .into_inner() - .map_err(std::io::IntoInnerError::into_error)?; - // The File temporary above is dropped at the semicolon, - // closing the OS handle before the rename below. - - // Atomic rename: target appears only with complete data. - std::fs::rename(&tmp_path, target)?; - - Ok(rows.len()) - } } /// Decide the backend scan limit (the cap applied *before* the daemon's @@ -788,6 +699,14 @@ pub(crate) use output_config::build_output_config; mod predicates; use predicates::build_query_predicates; +// The `--out=` file-export writer lives in a sibling file to keep +// `search.rs` under the 800-line policy ceiling. It was a `Self`-less +// associated fn, so the single call site in `search()` above invokes the +// free function directly. +#[path = "search_file_sink.rs"] +mod file_sink; +use file_sink::write_rows_to_file; + // The inline `tests` module lives in a sibling file to keep `search.rs` // under the 800-line policy ceiling. `#[path]` keeps the test module // path identical (`crate::index::search::tests`), so `super::*` inside diff --git a/crates/uffs-daemon/src/index/search_file_sink.rs b/crates/uffs-daemon/src/index/search_file_sink.rs new file mode 100644 index 000000000..5d7ac73ed --- /dev/null +++ b/crates/uffs-daemon/src/index/search_file_sink.rs @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2025-2026 SKY, LLC. + +//! 
Direct file output for `--out=` search exports (OPT-4). +//! +//! Extracted from the parent `search.rs` to keep that file under the +//! workspace 800-LOC file-size policy. The single caller in +//! `IndexManager::search` invokes `write_rows_to_file` directly (it was a +//! `Self`-less associated fn), so the move is a pure relocation with no +//! behavioural change. + +use uffs_core::search::backend::DisplayRow; + +/// Write `DisplayRow`s directly to a file, bypassing `SearchRow` and IPC. +/// +/// Uses the same `OutputConfig::write_display_rows` that the CLI uses, +/// so all formatting options (separator, quotes, header, pos/neg, +/// columns, timestamps) produce identical output. +/// +/// Atomic write: writes to a `.uffs.tmp` sibling file, then renames +/// to the target after a `BufWriter::flush`. No `fsync` — +/// `--out=` is reproducible search output, so the tmp+rename +/// dance protects against partial-file exposure during normal +/// writes but power-loss durability is intentionally not provided. +/// See the inline comment in the body and §Run 7 C / §Run 8 of +/// `docs/research/perf-phase2-measurement-plan.md` for the +/// measurement that motivated this trade-off. Zero rows → no +/// file is created. +pub(super) fn write_rows_to_file( + rows: &[DisplayRow], + path: &str, + output_config: &uffs_core::output::OutputConfig, +) -> Result { + use std::io::{BufWriter, Write as _}; + + use rand::Rng as _; + + // Zero results → don't create the file at all. + if rows.is_empty() { + return Ok(0); + } + + let target = std::path::Path::new(path); + + // Randomised temp name in the same directory (same-FS rename stays + // atomic). The random suffix + exclusive `create_new` open refuse to + // follow a symlink pre-planted at a guessed temp path. `file_name` + // is an `Option`, not a `Result` — `unwrap_or_default` is not an + // unwrap-lint violation. 
+ let mut suffix_bytes = [0_u8; 8]; + rand::rng().fill_bytes(&mut suffix_bytes); + let suffix = u64::from_le_bytes(suffix_bytes); + let file_name = target.file_name().unwrap_or_default(); + let tmp_name = format!("{}.{:016x}.uffs.tmp", file_name.to_string_lossy(), suffix); + let tmp_path = target.with_file_name(tmp_name); + + // Write to temp file — target is untouched until rename. + // + // `--out=` is a user-chosen export, NOT a secret: it must adopt + // the directory's normal permissions, not the daemon owner's. We use + // `create_new_file_exclusive` (exclusive create, no owner-only ACL) + // rather than `create_new_secure_file` — the latter applies a Windows + // owner-only ACL that historically shelled out to `icacls.exe`, + // adding a per-query process-spawn (~tens of ms) to every result + // write. The exclusive create still closes the symlink/TOCTOU window. + let file = uffs_security::fs::create_new_file_exclusive(&tmp_path)?; + let mut writer = BufWriter::with_capacity(256 * 1024, file); + + let write_result = output_config + .write_display_rows(rows, &mut writer) + .map_err(std::io::Error::other); + + // On write error, clean up the temp file and propagate. + if let Err(err) = write_result { + drop(writer); + let _cleanup: Result<(), std::io::Error> = std::fs::remove_file(&tmp_path); + return Err(err); + } + + // Flush the BufWriter and close the underlying file. + // + // We deliberately skip `sync_all()` here. `--out=` is + // a user-requested export of search results; the data is + // reproducible from the MFT index in ~100 ms, so paying a + // 5-15 ms `fsync` per query for power-loss durability is not + // worth it — a power cut would just leave a 0-byte file and + // the user can simply re-run the query. The atomic + // tmp+rename below still prevents partial-file exposure + // during normal writes. See + // `docs/research/perf-phase2-measurement-plan.md` §Run 7 C / + // §Run 8 for the measurement that motivated dropping the + // sync. 
+ writer.flush()?; + writer + .into_inner() + .map_err(std::io::IntoInnerError::into_error)?; + // The File temporary above is dropped at the semicolon, + // closing the OS handle before the rename below. + + // Atomic rename: target appears only with complete data. + std::fs::rename(&tmp_path, target)?; + + Ok(rows.len()) +} diff --git a/crates/uffs-daemon/src/index/search_tests.rs b/crates/uffs-daemon/src/index/search_tests.rs index 11f0b2d09..63b1fed3b 100644 --- a/crates/uffs-daemon/src/index/search_tests.rs +++ b/crates/uffs-daemon/src/index/search_tests.rs @@ -150,7 +150,7 @@ fn write_rows_to_file_ignores_pre_planted_predictable_tmp() { )]; let cfg = build_output_config(&SearchParams::default()); - let written = IndexManager::write_rows_to_file(&rows, target.to_str().unwrap(), &cfg).unwrap(); + let written = write_rows_to_file(&rows, target.to_str().unwrap(), &cfg).unwrap(); assert_eq!(written, 1); // Target exists with the row's filename in it. diff --git a/crates/uffs-daemon/src/lib.rs b/crates/uffs-daemon/src/lib.rs index 222237b5f..2917dd6d9 100644 --- a/crates/uffs-daemon/src/lib.rs +++ b/crates/uffs-daemon/src/lib.rs @@ -525,20 +525,26 @@ async fn spawn_journal_loops_for_warm_shards( }; use cache::journal_sink::RegistryPatchSink; - let (sink, applier_handle) = RegistryPatchSink::spawn_with_applier(idx); - let sink_dyn: Arc = sink; - // Cursor-store choice: Windows persists per-drive cursors next // to the compact cache; Mac/Linux uses the always-zero // NullCursorStore (no journal → no cursor → fall back to // "start from journal head" semantics, which is the correct // no-op on those platforms). + // + // Shared with the applier: the sink persists each loop's cursor + // in lockstep with a successful compact-cache body save (the + // loops only seed from it and reset it on wrap), so the on-disk + // cursor never outruns the on-disk body. See `journal_sink`. 
let cursor_store: Arc = if cfg!(windows) { Arc::new(DiskCursorStore::new(uffs_mft::cache::cache_dir())) } else { Arc::new(NullCursorStore) }; + let (sink, applier_handle) = + RegistryPatchSink::spawn_with_applier(idx, Arc::clone(&cursor_store)); + let sink_dyn: Arc = sink; + let config = JournalLoopConfig::default(); let letters = idx.loaded_drive_letters().await; tracing::info!( diff --git a/crates/uffs-security/src/fs.rs b/crates/uffs-security/src/fs.rs index 9572a5e49..340b77121 100644 --- a/crates/uffs-security/src/fs.rs +++ b/crates/uffs-security/src/fs.rs @@ -60,6 +60,35 @@ pub fn create_new_secure_file(path: &Path) -> io::Result { } } +/// Create a brand-new file at a **user-chosen output path**, failing if the +/// path already exists (including as a symlink/pre-planted target). +/// +/// Unlike [`create_new_secure_file`], this does **not** apply an owner-only +/// ACL (Windows) or `0o600` mode (Unix). The destination is a path the user +/// explicitly asked us to write (e.g. `--out=`), so the file should +/// adopt the normal permissions of the directory the user chose rather than +/// being locked to the daemon's owner — forcing owner-only here is both +/// surprising (e.g. exporting into a shared folder) and, on Windows, costly: +/// the owner-only ACL path historically shelled out to `icacls.exe`, adding a +/// process-spawn (~tens of ms) to **every** query that writes results. +/// +/// The exclusive `create_new(true)` open still closes the TOCTOU / +/// symlink-swap window, which is the only hardening that matters for a +/// throwaway temp file that is `rename`d into place. Callers that write a +/// *secret* (cache key, daemon state) must use [`create_new_secure_file`] +/// instead. +/// +/// # Errors +/// +/// Returns [`io::ErrorKind::AlreadyExists`] if the path exists, or any other +/// error from the underlying open. 
+pub fn create_new_file_exclusive(path: &Path) -> io::Result { + std::fs::OpenOptions::new() + .write(true) + .create_new(true) + .open(path) +} + /// Write `data` to a **new** secret file born with owner-only permissions. /// /// Refuses to overwrite an existing path; callers that intend to replace an @@ -222,33 +251,170 @@ fn win_set_file_attributes(wide: &[u16], attrs: u32) -> bool { result.is_ok() } -/// Windows: set owner-only ACL via `icacls` command. +/// Windows: grant the current user full control of `path` via the native +/// Win32 security APIs (no subprocess). /// -/// S1.2.6: Grants current user full control with inheritance. -/// NOTE: We no longer strip inherited ACEs (`/inheritance:r`) because when -/// running as Administrator, `%USERNAME%` may differ from the effective SID, -/// causing `icacls /grant:r` to grant to the wrong principal and leaving -/// the directory inaccessible. Instead we keep inherited permissions and -/// add an explicit grant for the current user. This is still secure for the -/// cache use case (user-private %LOCALAPPDATA% directory). +/// S1.2.6: adds an explicit full-control ACE for the **process token's owner +/// SID** while keeping inherited ACEs intact (the DACL is set *unprotected*, +/// so the parent's inheritable ACEs are re-applied). This is still secure for +/// the cache use case (a user-private `%LOCALAPPDATA%` directory). +/// +/// Why native instead of `icacls`: +/// - **Correctness.** The old path resolved the principal from `%USERNAME%`, +/// which diverges from the effective SID under elevation — it could grant to +/// the wrong principal. The token's owner SID is always the right one. +/// - **Cost.** Shelling out to `icacls.exe` is a full process spawn (tens of +/// ms). `create_new_secure_file` runs this on every write, so the spawn was a +/// per-call tax; the native calls are microseconds. +/// +/// Returns `true` on success; callers fall back to the hidden attribute. 
#[cfg(windows)] fn win_set_owner_only_acl(path: &Path) -> bool { - let username = std::env::var("USERNAME").unwrap_or_default(); - if username.is_empty() { + use windows::Win32::Foundation::{CloseHandle, HANDLE}; + use windows::Win32::Security::TOKEN_QUERY; + use windows::Win32::System::Threading::{GetCurrentProcess, OpenProcessToken}; + + // SAFETY: returns a pseudo-handle that is always valid for the current + // process and needs no close. + #[expect(unsafe_code, reason = "Win32 FFI — current-process pseudo-handle")] + let process = unsafe { GetCurrentProcess() }; + + let mut token = HANDLE::default(); + // SAFETY: `process` is valid; `&raw mut token` is a valid out-pointer for + // the returned token handle. + #[expect(unsafe_code, reason = "Win32 FFI — open our own process token")] + let opened = unsafe { OpenProcessToken(process, TOKEN_QUERY, &raw mut token) }; + if opened.is_err() { return false; } - let path_str = path.to_string_lossy(); + let applied = win_apply_owner_ace(path, token); - // Grant current user full control (keep inherited ACEs intact) - let grant_arg = format!("{username}:(OI)(CI)F"); - let grant_result = std::process::Command::new("icacls") - .args([path_str.as_ref(), "/grant", &grant_arg]) - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::null()) - .status(); + // SAFETY: `token` was returned by a successful `OpenProcessToken` and has + // not been closed elsewhere. + #[expect(unsafe_code, reason = "Win32 FFI — close the process token handle")] + let _closed = unsafe { CloseHandle(token) }; + applied +} + +/// Read the process token's owner SID and apply a full-control ACE for it to +/// `path`'s DACL. Split out so the token handle in the caller is always +/// closed on every return path. 
+#[cfg(windows)] +fn win_apply_owner_ace(path: &Path, token: windows::Win32::Foundation::HANDLE) -> bool { + use core::ffi::c_void; + + use windows::Win32::Foundation::{ERROR_SUCCESS, HLOCAL, LocalFree}; + use windows::Win32::Security::Authorization::{ + EXPLICIT_ACCESS_W, SE_FILE_OBJECT, SET_ACCESS, SetEntriesInAclW, SetNamedSecurityInfoW, + TRUSTEE_IS_SID, TRUSTEE_IS_USER, TRUSTEE_W, + }; + use windows::Win32::Security::{ + ACL, DACL_SECURITY_INFORMATION, GetTokenInformation, PSID, + SUB_CONTAINERS_AND_OBJECTS_INHERIT, TOKEN_USER, TokenUser, + }; + use windows::Win32::Storage::FileSystem::FILE_ALL_ACCESS; + use windows::core::PWSTR; + + // Size the TOKEN_USER buffer (first call fails, filling `needed`). + let mut needed = 0_u32; + // SAFETY: a null buffer with length 0 is the documented "query size" call; + // `&raw mut needed` receives the required byte count. + #[expect(unsafe_code, reason = "Win32 FFI — size the token-info buffer")] + let _probe = unsafe { GetTokenInformation(token, TokenUser, None, 0, &raw mut needed) }; + if needed == 0 { + return false; + } + + // Over-aligned backing store: a `TOKEN_USER` embeds a pointer (8-byte + // aligned on x64) which a `Vec` would not guarantee. Round the byte + // count up to whole `u64` words so the cast below is well-aligned. We pass + // `needed` (≤ the allocation) as the length, so no `usize→u32` cast. + let words = (needed as usize).div_ceil(size_of::()); + let mut buffer = vec![0_u64; words]; + // SAFETY: `buffer` is `words * 8 ≥ needed` bytes; the pointer/length pair + // stay within it and `&raw mut needed` receives the bytes written. + #[expect(unsafe_code, reason = "Win32 FFI — read the token user/SID")] + let read = unsafe { + GetTokenInformation( + token, + TokenUser, + Some(buffer.as_mut_ptr().cast::()), + needed, + &raw mut needed, + ) + }; + if read.is_err() { + return false; + } + + // The SID lives *inside* `buffer`, which must outlive every use below. 
+ #[expect( + unsafe_code, + reason = "Win32 FFI — interpret token buffer as TOKEN_USER" + )] + // SAFETY: `GetTokenInformation(TokenUser)` populated `buffer` (a `u64` + // allocation, so 8-aligned) with a `TOKEN_USER` whose `User.Sid` points + // into that same allocation. + let sid: PSID = unsafe { (*buffer.as_ptr().cast::()).User.Sid }; + if sid.is_invalid() { + return false; + } + + // The SID is passed through the `ptstrName` pointer slot, per the + // documented `TRUSTEE_IS_SID` convention — it is never dereferenced as + // UTF-16. + let sid_name = PWSTR(sid.0.cast::()); + + // Grant the owner SID full control; (OI)(CI) so a directory's children + // inherit it (ignored for plain files). + let explicit = EXPLICIT_ACCESS_W { + grfAccessPermissions: FILE_ALL_ACCESS.0, + grfAccessMode: SET_ACCESS, + grfInheritance: SUB_CONTAINERS_AND_OBJECTS_INHERIT, + Trustee: TRUSTEE_W { + TrusteeForm: TRUSTEE_IS_SID, + TrusteeType: TRUSTEE_IS_USER, + ptstrName: sid_name, + ..Default::default() + }, + }; + + let mut new_dacl: *mut ACL = core::ptr::null_mut(); + let entries = [explicit]; + // SAFETY: `entries` outlives the call; `&raw mut new_dacl` receives a + // `LocalAlloc`-owned ACL we free below. + #[expect(unsafe_code, reason = "Win32 FFI — build the new DACL")] + let build = unsafe { SetEntriesInAclW(Some(&entries), None, &raw mut new_dacl) }; + if build != ERROR_SUCCESS { + return false; + } + + let mut wide = path_to_wide(path); + // SAFETY: `wide` is a mutable null-terminated UTF-16 buffer; `new_dacl` is + // a valid ACL from `SetEntriesInAclW`. Owner/group are unchanged (`None`); + // only the DACL is set, unprotected (inherited ACEs preserved). 
+ #[expect(unsafe_code, reason = "Win32 FFI — apply the DACL to the path")] + let set = unsafe { + SetNamedSecurityInfoW( + PWSTR(wide.as_mut_ptr()), + SE_FILE_OBJECT, + DACL_SECURITY_INFORMATION, + None, + None, + Some(new_dacl), + None, + ) + }; + + if !new_dacl.is_null() { + // SAFETY: `new_dacl` was allocated by `SetEntriesInAclW` via + // `LocalAlloc`, so `LocalFree` is the matching deallocator. + #[expect(unsafe_code, reason = "Win32 FFI — free the ACL allocation")] + let _freed = unsafe { LocalFree(Some(HLOCAL(new_dacl.cast::()))) }; + } - grant_result.is_ok_and(|status| status.success()) + set == ERROR_SUCCESS } // ──────────────────────────────────────────────────────────────────────────── diff --git a/crates/uffs-security/src/fs/tests.rs b/crates/uffs-security/src/fs/tests.rs index ec6e54820..41ba5d013 100644 --- a/crates/uffs-security/src/fs/tests.rs +++ b/crates/uffs-security/src/fs/tests.rs @@ -202,6 +202,59 @@ fn paths_identical_false_for_different_files() { ); } +#[test] +fn create_new_file_exclusive_rejects_existing() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("exists.bin"); + std::fs::write(&path, b"pre-existing").unwrap(); + let err = create_new_file_exclusive(&path).expect_err("must refuse existing path"); + assert_eq!(err.kind(), io::ErrorKind::AlreadyExists); +} + +#[cfg(unix)] +#[test] +fn create_new_file_exclusive_rejects_symlink() { + // The TOCTOU/symlink hardening must survive even without the owner-only + // ACL: `create_new` still refuses to follow a pre-planted symlink. 
+ let dir = tempfile::tempdir().unwrap(); + let target = dir.path().join("target.bin"); + std::fs::write(&target, b"sentinel").unwrap(); + let link = dir.path().join("link.bin"); + std::os::unix::fs::symlink(&target, &link).unwrap(); + + let err = create_new_file_exclusive(&link).expect_err("must refuse symlink"); + assert_eq!(err.kind(), io::ErrorKind::AlreadyExists); + assert_eq!(std::fs::read(&target).unwrap(), b"sentinel"); +} + +#[test] +fn create_new_file_exclusive_writes_content() { + use std::io::Write as _; + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("out.csv"); + let mut file = create_new_file_exclusive(&path).unwrap(); + file.write_all(b"a,b,c\n").unwrap(); + drop(file); + assert_eq!(std::fs::read(&path).unwrap(), b"a,b,c\n"); +} + +/// Regression guard for the v0.5.111 performance regression: the secure-fs +/// module must never shell out to a subprocess (the old owner-only ACL path +/// invoked `icacls.exe`) to set permissions. That put a ~tens-of-ms process +/// spawn on every result write, because `create_new_secure_file` runs per +/// `--out` query. Permissions are now applied via native Win32 ACL APIs; this +/// test fails loudly if a `process::Command` spawn creeps back into `fs.rs`. +/// We match the spawn API (not the word "icacls", which appears in the +/// explanatory comments) so the guard tracks behaviour, not prose. 
+#[test] +fn fs_module_spawns_no_subprocess() { + let source = include_str!("../fs.rs"); + assert!( + !source.contains("Command::new") && !source.contains("process::Command"), + "fs.rs must not spawn a subprocess (perf regression guard, WI-8.1 / v0.5.111)" + ); +} + #[cfg(unix)] #[test] fn secure_remove_follows_symlink_to_target_then_unlinks_link() { diff --git a/scripts/windows/cross-tool-benchmark.rs b/scripts/windows/cross-tool-benchmark.rs index 83a07514a..8bb2a5f35 100644 --- a/scripts/windows/cross-tool-benchmark.rs +++ b/scripts/windows/cross-tool-benchmark.rs @@ -108,6 +108,10 @@ use std::time::{Duration, Instant}; const TIMEOUT: Duration = Duration::from_secs(120); const DEFAULT_ROUNDS: usize = 10; const DEFAULT_DRIVES: &[&str] = &["C", "D"]; +/// Warm rounds run against the daemon (and implicitly the OS FS cache) right +/// before each HOT head-to-head, so UFFS competes from a fully-primed index — +/// fair against ES (always pre-indexed) and the C++ tool (re-reads every MFT). +const PRIME_ROUNDS: usize = 3; /// Bench output file in current working directory. /// C++ UFFS cannot write to absolute paths — relative paths work fine. @@ -162,6 +166,21 @@ const PATTERNS: &[(&str, &str, &str, &str, &str, &str)] = &[ ("substring", "config", "config", "config", "", "config"), ]; +/// Patterns worth running in the COLD and WARM phases. +/// +/// COLD/WARM wall time is dominated by the MFT read + index (de)serialisation +/// — a fixed per-drive cost that is **independent of the query pattern**. The +/// only thing the pattern changes in those phases is the output-write cost, +/// which scales with result-set size. Running all 7 patterns there just +/// re-measures the same index-load floor 7 times, so we restrict COLD/WARM to: +/// +/// - `exact` — tiny result set ⇒ measures the pure index-load floor. +/// - `full_scan` — 3M+ rows ⇒ measures the max output-write cost. 
+/// +/// All 7 patterns still run in HOT, where pure query execution dominates and +/// the engines (trie / prefix / regex / full-scan) genuinely diverge. +const COLD_WARM_PATTERNS: &[&str] = &["exact", "full_scan"]; + // ── Types ──────────────────────────────────────────────────────────────────── #[derive(Clone, Copy, PartialEq, Eq)] enum Tool { Uffs, UffsCpp, Everything } impl Tool { fn label(self) -> &'static str { match self { Self::Uffs=>"UFFS", Self::UffsCpp=>"UFFS-C++", Self::Everything=>"Everything" } } } @@ -294,6 +313,20 @@ fn find_uffs_cpp() -> Option { find_in(&[PathBuf::from(&h).join("bin").join("uffs.com")]) }) } +/// Locate `Everything.exe` (the GUI/service binary) so the bench can launch a +/// private, drive-scoped instance. Distinct from `es.exe` (the CLI client). +fn find_everything() -> Option { + where_exe("Everything.exe").or_else(|| { + let (h, pf, pf86) = (env::var("USERPROFILE").unwrap_or_default(), + env::var("ProgramFiles").unwrap_or_default(), + env::var("ProgramFiles(x86)").unwrap_or_default()); + find_in(&[ + PathBuf::from(&pf).join("Everything").join("Everything.exe"), + PathBuf::from(&pf86).join("Everything").join("Everything.exe"), + PathBuf::from(&h).join("bin").join("Everything.exe"), + ]) + }) +} // ── UFFS lifecycle ─────────────────────────────────────────────────────────── @@ -307,9 +340,21 @@ fn uffs_stop(bin: &Path) { /// Hot→Warm or Warm→Parked mid-run. These env vars are scoped to the /// daemon child process only — the bench script's own env is unchanged, /// and teardown's next `uffs daemon start` gets production defaults. -fn uffs_start(bin: &Path) { +/// +/// `drives` scopes which drives the daemon loads on startup. This is +/// **essential** for fair timing: without an explicit `--drive` flag the +/// daemon discovers and loads *every* NTFS volume on the host, so the +/// WARM/HOT load cost would reflect all drives rather than just the one(s) +/// under test. 
Each letter is forwarded as a separate `--drive <letter>` +/// (`parse_daemon_start` in uffs-cli accumulates them). +fn uffs_start(bin: &Path, drives: &[String]) { + let mut args: Vec = vec!["daemon".into(), "start".into()]; + for d in drives { + args.push("--drive".into()); + args.push(d.clone()); + } let _ = Command::new(bin) - .args(["daemon", "start"]) + .args(&args) .env("UFFS_HOT_TO_WARM_IDLE_SECS", "3600") .env("UFFS_WARM_TO_PARKED_IDLE_SECS", "7200") .stdout(Stdio::null()).stderr(Stdio::null()) @@ -326,6 +371,28 @@ fn uffs_purge_cache() { let _ = std::fs::remove_dir_all(dir); } } +/// Prime the daemon for peak HOT performance over `drive_spec` (a single `"C"` +/// or a CSV `"C,D,G"`): run `rounds` warm full-scan searches with `--no-output` +/// (rows discarded — the daemon still executes the search, warming the hot tier +/// and OS FS cache). This makes the UFFS HOT comparison fair against ES (fully +/// pre-indexed) and the C++ tool (re-reads every MFT each invocation). +fn prime_daemon(bin: &Path, drive_spec: &str, rounds: usize) { + eprint!(" priming daemon ({rounds} rounds, drives={drive_spec})..."); + flush(); + for _ in 0..rounds { + let mut args: Vec = vec!["*".into()]; + if drive_spec.contains(',') { + args.push(format!("--drives={drive_spec}")); + } else { + args.push("--drive".into()); + args.push(drive_spec.into()); + } + args.push("--no-output".into()); + let _ = Command::new(bin).args(&args) + .stdout(Stdio::null()).stderr(Stdio::null()).status(); + } + eprintln!(" ready."); +} /// Check if a line is a header, footer, or empty (not a data row). @@ -392,11 +459,10 @@ fn count_and_validate(path: &str, needle: &str) -> (u64, u64) { fn cleanup_bench_file() { let p = bench_out_path(); let _ = std::fs::remove_file(&p); } fn cleanup_file(p: &str) { let _ = std::fs::remove_file(p); } -/// Extract the path column from a tool's output file or byte buffer into a -/// normalised (lowercase, trimmed) set of strings.
Headers, footers, and -/// empty lines are stripped by `is_header_or_footer` so the set only -/// contains actual filesystem paths. Used for path-set superset checks. -fn extract_paths_from_file(path: &str) -> std::collections::HashSet { +/// Read a tool's output file, strip headers/footers, normalise each path +/// (lowercase, trim quotes, first CSV field), and return a **sorted** vec. +/// Handles both UTF-8 and UTF-16 LE (BOM) output. +fn normalise_paths(path: &str) -> Vec { let content = match std::fs::read_to_string(path) { Ok(s) => s, Err(_) => match std::fs::read(path) { @@ -406,36 +472,96 @@ fn extract_paths_from_file(path: &str) -> std::collections::HashSet { String::from_utf16_lossy(&u16s) } Ok(bytes) => String::from_utf8_lossy(&bytes).into_owned(), - Err(_) => return std::collections::HashSet::new(), + Err(_) => return Vec::new(), }, }; - content.lines() + let mut v: Vec = content.lines() .filter(|l| !is_header_or_footer(l)) .map(|l| { - // The Path column may be quoted CSV: strip surrounding quotes and - // take only the first comma-delimited field (path is always first). let trimmed = l.trim().trim_matches('"'); trimmed.split(',').next().unwrap_or(trimmed).trim().to_lowercase() }) .filter(|s| !s.is_empty()) - .collect() + .collect(); + v.sort_unstable(); + v.dedup(); + v } +/// Result of comparing two sorted normalised path lists. +struct DiffResult { + only_in_a: Vec, // paths present in A but not B + only_in_b: Vec, // paths present in B but not A +} + +impl DiffResult { + fn is_identical(&self) -> bool { self.only_in_a.is_empty() && self.only_in_b.is_empty() } +} + +/// Compare two **sorted** path vecs using a merge-walk (O(n)). Collects all +/// differences; the caller prints at most `max_examples` from each side. 
+fn diff_paths(a: &[String], b: &[String]) -> DiffResult { + let mut only_in_a = Vec::new(); + let mut only_in_b = Vec::new(); + let (mut i, mut j) = (0usize, 0usize); + while i < a.len() && j < b.len() { + match a[i].cmp(&b[j]) { + std::cmp::Ordering::Equal => { i += 1; j += 1; } + std::cmp::Ordering::Less => { only_in_a.push(a[i].clone()); i += 1; } + std::cmp::Ordering::Greater => { only_in_b.push(b[j].clone()); j += 1; } + } + } + only_in_a.extend_from_slice(&a[i..]); + only_in_b.extend_from_slice(&b[j..]); + DiffResult { only_in_a, only_in_b } +} + +/// Resolve a (possibly relative) bench output path to an absolute path string +/// for display, so the diff header names the exact file each side came from. +fn abs_display(path: &str) -> String { + env::current_dir() + .map(|d| d.join(path)) + .unwrap_or_else(|_| PathBuf::from(path)) + .display() + .to_string() +} -/// Check that `subset` is a subset of `superset`; return a short summary -/// string. Reports the first few missing paths on a violation. -fn check_subset( - subset_label: &str, - subset: &std::collections::HashSet, - superset_label: &str, - superset: &std::collections::HashSet, -) -> Option { - if subset.is_empty() || superset.is_empty() { return None; } - let missing: Vec<&String> = subset.iter().filter(|p| !superset.contains(*p)).collect(); - if missing.is_empty() { return None; } - let preview: Vec<&str> = missing.iter().take(3).map(|s| s.as_str()).collect(); - Some(format!("{subset_label}⊄{superset_label}: {} paths missing (e.g. {})", - missing.len(), preview.join(", "))) +/// Print a human-readable diff summary between two tool outputs. +/// +/// The header names the **full source file** each side was read from and its +/// row count; both lists were sorted + normalised by `normalise_paths` before +/// the merge-walk, so the examples appear in sorted order. Shows up to +/// `max_examples` lines from each side so operators can spot patterns. 
+fn print_diff( + a_label: &str, a_path: &str, a_n: usize, + b_label: &str, b_path: &str, b_n: usize, + diff: &DiffResult, max_examples: usize, +) { + eprintln!(" diff {a_label} vs {b_label} (sorted + normalised):"); + eprintln!(" {a_label:<4} ({a_n:>8} rows): {}", abs_display(a_path)); + eprintln!(" {b_label:<4} ({b_n:>8} rows): {}", abs_display(b_path)); + if diff.is_identical() { + eprintln!(" result: identical ✓"); + return; + } + let show_a = diff.only_in_a.len().min(max_examples); + let show_b = diff.only_in_b.len().min(max_examples); + eprintln!(" result: ⚠ {} only in {a_label}, {} only in {b_label}", + diff.only_in_a.len(), diff.only_in_b.len()); + if show_a > 0 { + eprintln!(" only in {a_label} (first {show_a}):"); + for p in &diff.only_in_a[..show_a] { eprintln!(" - {p}"); } + if diff.only_in_a.len() > max_examples { + eprintln!(" ... and {} more", diff.only_in_a.len() - max_examples); + } + } + if show_b > 0 { + eprintln!(" only in {b_label} (first {show_b}):"); + for p in &diff.only_in_b[..show_b] { eprintln!(" + {p}"); } + if diff.only_in_b.len() > max_examples { + eprintln!(" ... and {} more", diff.only_in_b.len() - max_examples); + } + } } /// Count data lines in a captured stdout byte buffer, filtering the same @@ -541,6 +667,10 @@ fn validate_output(sink: OutputSink, path: &str, stdout: &[u8], needle: &str) -> fn run_uffs(bin: &Path, drive: &str, pattern: &str, validate: &str, sink: OutputSink) -> Timing { run_uffs_to(bin, drive, pattern, validate, sink, &bench_out_path()) } +/// `drive` is either a single letter (`"C"`) for a per-drive step or a CSV +/// drive-spec (`"C,D,G"`) for the all-drives aggregate step. The former emits +/// `--drive C`; the latter emits `--drives=C,D,G` (the uffs CLI parses the CSV +/// into multiple drive targets — see `commands/search/dispatch.rs`). 
fn run_uffs_to(bin: &Path, drive: &str, pattern: &str, validate: &str, sink: OutputSink, bpath: &str) -> Timing { cleanup_file(bpath); let bpath = bpath.to_owned(); @@ -549,11 +679,15 @@ fn run_uffs_to(bin: &Path, drive: &str, pattern: &str, validate: &str, sink: Out // Everything (which does not index NTFS system files or Alternate Data // Streams by default). Without these flags, UFFS returns 30-70% more // rows for broad patterns and the timing comparison is meaningless. - let mut args: Vec = vec![ - pattern.into(), "--drive".into(), drive.into(), - "--columns".into(), "Path".into(), - "--hide-system".into(), "--hide-ads".into(), - ]; + let mut args: Vec = vec![pattern.into()]; + if drive.contains(',') { + args.push(format!("--drives={drive}")); + } else { + args.push("--drive".into()); + args.push(drive.into()); + } + args.push("--columns".into()); args.push("Path".into()); + args.push("--hide-system".into()); args.push("--hide-ads".into()); if matches!(sink, OutputSink::File) { args.push(format!("--out={bpath}")); } @@ -627,13 +761,19 @@ fn run_es_to(bin: &Path, drive: &str, pattern: &str, validate: &str, sink: Outpu // When a named instance is in use (private bench instance launched with // -instance ), prepend -instance so es.exe connects to the // correct IPC window instead of the default one. - let drive_path = format!("{drive}:\\"); let mut args: Vec = Vec::new(); if let Some(inst) = es_instance { args.push("-instance".into()); args.push(inst.into()); } - args.push(drive_path); + // Scope: a single drive uses the ":\" path filter; the aggregate + // drive-spec ("C,D,G") omits the filter so es searches the whole + // instance — which the relaunched sandbox has scoped to exactly those + // drives, so no per-path filter is needed (and one would only match a + // single drive anyway). 
+ if !drive.contains(',') { + args.push(format!("{drive}:\\")); + } if pattern != "*" { args.push(pattern.into()); } if matches!(sink, OutputSink::File) { args.push("-export-csv".into()); @@ -657,6 +797,242 @@ fn run_es_to(bin: &Path, drive: &str, pattern: &str, validate: &str, sink: Outpu Timing { wall_ms: out.wall_ms, rows, bad_rows, ok: true, ..Default::default() } } +// ── Everything: isolated bench instance ───────────────────────────────────── +// Ported from `crates/uffs-bench/src/run/es_instance.rs`. When ES is part of +// the run the bench KILLS every running `Everything.exe` and launches a private +// sandbox instance: +// +// Everything.exe -config -instance uffs-bench -startup +// +// `` is generated from the permanent `Everything.ini` but with +// `ntfs_volume_includes`/`ntfs_volume_monitors` set to 1 ONLY for the requested +// drives (0 for the rest) and all `auto_include_*`/`auto_remove_*` keys forced +// to 0 so ES does not auto-discover other volumes. The permanent ini is never +// modified. `es.exe` queries target the sandbox via `-instance uffs-bench`. + +/// Named instance used for the bench-local Everything process. +const ES_INSTANCE_NAME: &str = "uffs-bench"; +/// Poll budget waiting for the bench instance to finish indexing (60×5s = 5m). +const ES_LOAD_POLL_ATTEMPTS: u32 = 60; +const ES_LOAD_POLL_INTERVAL: Duration = Duration::from_secs(5); +/// Grace after asking existing instances to exit before spawning ours. +const ES_KILL_GRACE: Duration = Duration::from_secs(3); +/// Grace after spawning before the first IPC readiness poll. +const ES_STARTUP_GRACE: Duration = Duration::from_secs(5); + +/// Permanent Everything.ini path (`%APPDATA%\Everything\Everything.ini`). +fn everything_ini_path() -> PathBuf { + PathBuf::from(env::var("APPDATA").unwrap_or_default()) + .join("Everything").join("Everything.ini") +} + +/// Temp path for the bench ini (prefers `%TEMP%`). 
+fn bench_ini_path() -> PathBuf { + env::temp_dir().join("uffs-bench-everything.ini") +} + +/// Temp path for the bench instance's Everything database. Pinned (rather +/// than the per-instance default) so it can be deleted before every launch, +/// forcing a fresh index scoped to the current `ntfs_volume_includes` mask. +/// Without this the `uffs-bench` instance reuses the db from the PREVIOUS +/// per-drive launch (e.g. the C run), loads those drives, ignores the new +/// includes mask, and rewrites our temp ini to match — so an E run ends up +/// indexing C. +fn bench_db_path() -> PathBuf { + env::temp_dir().join("uffs-bench-everything.db") +} + +/// Parse a `key=val1,val2,...` Everything.ini array value into tokens. +/// Handles the quoted-string format ES uses (`"C:","D:"`); quoted tokens are +/// kept whole. +fn parse_ini_array(value: &str) -> Vec { + let mut tokens = Vec::new(); + let mut rest = value.trim(); + while !rest.is_empty() { + if rest.starts_with('"') { + let close = rest.char_indices().skip(1).find(|(_, ch)| *ch == '"'); + let end = close.map_or(rest.len(), |(idx, _)| idx + 1); + let (tok, tail) = rest.split_at(end); + tokens.push(tok.to_owned()); + rest = tail.trim_start_matches(','); + } else { + let end = rest.find(',').unwrap_or(rest.len()); + let (tok, tail) = rest.split_at(end); + tokens.push(tok.to_owned()); + rest = tail.trim_start_matches(','); + } + } + tokens +} + +/// Rebuild ini text replacing `ntfs_volume_includes`, `ntfs_volume_monitors`, +/// and `ntfs_volume_load_recent_changes` with the provided bitmask, pinning +/// `db_location` to `db_location`, and forcing the `auto_include_*`/ +/// `auto_remove_*` keys to `0`. Every other line is copied verbatim. 
+fn rebuild_ini(text: &str, includes: &str, monitors: &str, db_location: &str) -> String { + let mut out = String::with_capacity(text.len()); + for line in text.lines() { + let key = line.split_once('=').map_or("", |(k, _)| k.trim()); + match key { + "ntfs_volume_includes" => { + out.push_str("ntfs_volume_includes="); + out.push_str(includes); + out.push('\n'); + } + "ntfs_volume_monitors" => { + out.push_str("ntfs_volume_monitors="); + out.push_str(monitors); + out.push('\n'); + } + "ntfs_volume_load_recent_changes" => { + out.push_str("ntfs_volume_load_recent_changes="); + out.push_str(includes); + out.push('\n'); + } + // Pin the db to our known temp path so es_launch can delete it + // before each launch, guaranteeing a fresh index that honours the + // includes mask above (instead of reusing a prior run's db). + "db_location" => { + out.push_str("db_location="); + out.push_str(db_location); + out.push('\n'); + } + // Force to 0 — without this ES ignores ntfs_volume_paths and + // auto-discovers every fixed NTFS drive on the machine. + "auto_include_fixed_volumes" + | "auto_include_removable_volumes" + | "auto_include_fixed_refs_volumes" + | "auto_include_removable_refs_volumes" + | "auto_remove_offline_ntfs_volumes" + | "auto_remove_moved_ntfs_volumes" + | "auto_remove_offline_refs_volumes" + | "auto_remove_moved_refs_volumes" => { + out.push_str(key); + out.push_str("=0\n"); + } + _ => { out.push_str(line); out.push('\n'); } + } + } + out +} + +/// Write the bench `Everything.ini` into `ini_out`, including only `drives`. +fn write_bench_ini(ini_out: &Path, drives: &[String]) -> std::io::Result<()> { + let permanent = everything_ini_path(); + let text = std::fs::read_to_string(&permanent).unwrap_or_default(); + let bench_set: Vec = drives.iter() + .filter_map(|d| d.chars().next()) + .map(|c| c.to_ascii_uppercase()) + .collect(); + // Map positional ntfs_volume_paths → include bit (1 for bench drives). 
+ let mut paths: Vec = Vec::new(); + for line in text.lines() { + if let Some((k, v)) = line.split_once('=') { + if k.trim() == "ntfs_volume_paths" { paths = parse_ini_array(v); break; } + } + } + let includes: String = paths.iter().map(|tok| { + let letter = tok.trim_matches('"').chars().next().unwrap_or(' ').to_ascii_uppercase(); + if bench_set.contains(&letter) { "1" } else { "0" } + }).collect::>().join(","); + let monitors = includes.clone(); + // Diagnostic: surface the exact bit→volume mapping so a wrong bitpattern + // (or an empty/garbled ntfs_volume_paths that would misalign the mask) is + // visible in the run log. e.g. "C:=0 D:=0 E:=1 F:=0 ...". + let map: String = paths.iter().zip(includes.split(',')) + .map(|(tok, bit)| format!("{}={bit}", tok.trim_matches('"'))) + .collect::>().join(" "); + eprintln!(" [es-instance] ini volumes ({} entries): {map}", paths.len()); + let db = bench_db_path(); + let db_str = db.to_string_lossy(); + let out = rebuild_ini(&text, &includes, &monitors, &db_str); + std::fs::write(ini_out, out.as_bytes()) +} + +/// Ask any running Everything instances (default + stale bench) to exit. +fn es_kill_existing(everything: &Path) { + let _ = Command::new(everything).args(["-exit"]) + .stdout(Stdio::null()).stderr(Stdio::null()).status(); + let _ = Command::new(everything).args(["-instance", ES_INSTANCE_NAME, "-exit"]) + .stdout(Stdio::null()).stderr(Stdio::null()).status(); +} + +/// Launch the sandboxed Everything instance indexing only `drives`. Returns +/// the temp ini path so the caller can remove it on [`es_stop`]. +fn es_launch(everything: &Path, drives: &[String]) -> Option { + if drives.is_empty() { return None; } + let ini = bench_ini_path(); + // ORDER MATTERS. Kill any running instance FIRST and wait for it to fully + // exit. A previous uffs-bench instance was launched with `-config `; on shutdown Everything writes its current (wrong-drive) + // volume state back to that ini. 
If we wrote the ini before killing, the + // dying instance would clobber our freshly-written includes mask — which + // is exactly why the relaunched instance kept indexing the old drive. + es_kill_existing(everything); + std::thread::sleep(ES_KILL_GRACE); + // Delete the prior bench db so the relaunched instance rebuilds a fresh + // index from the includes mask, rather than loading the previous per-drive + // run's db. + let db = bench_db_path(); + if db.exists() { + if let Err(e) = std::fs::remove_file(&db) { + eprintln!(" [es-instance] WARNING: could not remove stale db {} — {e}", db.display()); + } else { + eprintln!(" [es-instance] removed stale db {}", db.display()); + } + } + // Now that no instance is alive to overwrite it, write the fresh ini. + if let Err(e) = write_bench_ini(&ini, drives) { + eprintln!(" [es-instance] WARNING: could not write temp ini — {e}"); + return None; + } + eprintln!(" [es-instance] launching Everything (drives: {}) …", drives.join(",")); + let ini_str = ini.to_string_lossy().to_string(); + let args = ["-config", ini_str.as_str(), "-instance", ES_INSTANCE_NAME, "-startup"]; + eprintln!(" [es-instance] spawn: {} {}", everything.display(), args.join(" ")); + match Command::new(everything).args(args) + .stdout(Stdio::null()).stderr(Stdio::null()).spawn() { + Ok(_) => Some(ini), + Err(e) => { eprintln!(" [es-instance] WARNING: could not launch Everything — {e}"); None } + } +} + +/// Poll `es.exe -instance uffs-bench` until every drive reports a non-zero +/// result count, or the poll budget is exhausted. Returns `true` when loaded. 
+fn es_wait_until_loaded(es: &Path, drives: &[String]) -> bool { + std::thread::sleep(ES_STARTUP_GRACE); + for attempt in 1..=ES_LOAD_POLL_ATTEMPTS { + let counts: Vec<(String, u64)> = drives.iter().map(|d| { + let search = format!("{d}:"); + let n = Command::new(es) + .args(["-instance", ES_INSTANCE_NAME, search.as_str(), "-get-result-count"]) + .stdout(Stdio::piped()).stderr(Stdio::null()).output().ok() + .and_then(|o| String::from_utf8_lossy(&o.stdout).trim().parse::().ok()) + .unwrap_or(0); + (d.clone(), n) + }).collect(); + if counts.iter().all(|(_, n)| *n > 0) { + eprintln!(" [es-instance] Everything index loaded — proceeding"); + return true; + } + let cs = counts.iter().map(|(d, n)| format!("{d}:{n}")).collect::>().join(" "); + eprintln!(" [es-instance] waiting for Everything to finish indexing … (attempt {attempt}/{ES_LOAD_POLL_ATTEMPTS}) [{cs}]"); + std::thread::sleep(ES_LOAD_POLL_INTERVAL); + } + eprintln!(" [es-instance] WARNING: Everything did not finish indexing within 5 minutes — ES cells measured with a partial index"); + false +} + +/// Send `Everything.exe -instance uffs-bench -exit` and remove the temp ini +/// and pinned bench db so nothing stale is left for the next run. +fn es_stop(everything: &Path, ini: Option<&Path>) { + let _ = Command::new(everything).args(["-instance", ES_INSTANCE_NAME, "-exit"]) + .stdout(Stdio::null()).stderr(Stdio::null()).status(); + std::thread::sleep(ES_KILL_GRACE); + if let Some(p) = ini { let _ = std::fs::remove_file(p); } + let _ = std::fs::remove_file(bench_db_path()); +} + // ── Run: UFFS C++ (uffs.com) ───────────────────────────────────────────────── /// C++ UFFS reads MFT every invocation (no daemon). No --limit flag. /// Extension filter uses --ext= instead of glob *.ext. @@ -664,10 +1040,12 @@ fn run_es_to(bin: &Path, drive: &str, pattern: &str, validate: &str, sink: Outpu /// /// # Sink notes /// -/// - `File`: emit `--out=` and use `Stdio::inherit()` on stdout/stderr. 
-/// The C++ binary internally `freopen()`s stdout onto the `--out=` file; -/// pre-redirecting stdout to a Rust pipe or NUL makes freopen fail silently -/// and the output file comes out empty. Inherit is the only safe choice here. +/// - `File`: emit `--out=`, inherit stdout (the C++ binary internally +/// `freopen()`s stdout onto the `--out=` file; pre-redirecting stdout to a +/// Rust pipe or NUL makes freopen fail silently and the output file comes out +/// empty), and send stderr to NUL. stderr carries only the decorative +/// `Drives? … / Finished in N s` footer, which otherwise spams the console +/// and makes the per-round diff output unreadable. /// - `Stdout` / `Null`: drop `--out=` entirely. With no `--out=` the freopen /// path never fires, so piped-capture (Stdout) and `cmd /C "... > NUL"` /// (Null) behave normally. @@ -692,12 +1070,13 @@ fn run_uffs_cpp_to(bin: &Path, drive: &str, pattern: &str, cpp_ext: &str, valida eprintln!(" CMD: & '{}' {} [sink={}]", bin.display(), args.join(" "), sink.label()); let out: ToolOutput = match sink { OutputSink::File => { - // freopen on --out= requires inherited stdout. Inherit both streams - // and use .status(); we get no captured bytes back but the file is - // what we validate here anyway. + // freopen on --out= requires inherited stdout. stderr (the + // decorative footer) is discarded so it does not clutter the + // bench's own progress/diff output. We get no captured bytes + // back but the file is what we validate here anyway. 
let t = Instant::now(); let r = Command::new(bin).args(&args) - .stdout(Stdio::inherit()).stderr(Stdio::inherit()) + .stdout(Stdio::inherit()).stderr(Stdio::null()) .status(); let wall_ms = t.elapsed().as_millis() as u64; match r { @@ -839,9 +1218,206 @@ fn print_help() { eprintln!(" --help This message"); } +// ── HOT head-to-head comparison ────────────────────────────────────────────── +/// Run the HOT cross-tool comparison for `drive` — either a single letter +/// (`"C"`) or a CSV drive-spec (`"C,D,G"`) for the all-drives aggregate. +/// +/// The caller is responsible for having ALREADY scoped every tool to exactly +/// these drives (daemon restarted + primed, ES sandbox relaunched with the same +/// drive set) so the timings are apples-to-apples. For each sink × pattern this +/// runs `cfg.rounds` rounds in a freshly-shuffled tool order, prints a per-round +/// row-count line, diffs the normalised path lists (File sink), and appends one +/// summary `Row` per tool to `all_rows`. +fn run_hot_compare(cfg: &Cfg, drive: &str, all_rows: &mut Vec) { + for sink in cfg.sinks.iter().copied() { + if cfg.sinks.len() > 1 { + println!(" ── sink={} ──────────────────────────────────────────────", sink.label()); + } + + for &(label, pat, es_pat, cpp_pat, cpp_ext, validate) in PATTERNS { + if cfg.skip_pattern(label) { continue; } + + let es_skip = label == "full_scan"; // es.exe 2GB IPC ceiling on large drives + let cpp_skip = cpp_pat.is_empty(); // pattern not supported by C++ tool + + let run_uffs_tool = cfg.tools.contains(&Tool::Uffs); + let run_cpp_tool = cfg.tools.contains(&Tool::UffsCpp) && cfg.uffs_cpp.is_some() && !cpp_skip; + let run_es_tool = cfg.tools.contains(&Tool::Everything) && cfg.es.is_some() && !es_skip; + + if !run_uffs_tool && !run_cpp_tool && !run_es_tool { continue; } + + eprintln!(); + if es_skip { eprintln!(" HOT {label:<12} ES SKIP (es.exe 2GB IPC limit)"); } + if cpp_skip { eprintln!(" HOT {label:<12} C++ SKIP (pattern not supported)"); } + + 
eprintln!(" HOT [{s}] {label:<12} {} rounds (tools shuffled each round)", + cfg.rounds, s = sink.label()); + + let mut uffs_runs: Vec = Vec::new(); + let mut cpp_runs: Vec = Vec::new(); + let mut es_runs: Vec = Vec::new(); + let mut es_aborted = false; + + // Separate output file per tool per round — all relative paths + // (C++ cannot write to absolute paths). Files are cleaned up + // immediately after the per-round diff so disk usage stays low. + let f_uffs = format!("bench_uffs_{label}.csv"); + let f_cpp = format!("bench_cpp_{label}.csv"); + let f_es = format!("bench_es_{label}.csv"); + + for round in 0..cfg.rounds { + // Fresh LCG seed every round so tool order varies. + let seed = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.subsec_nanos() as u64 + round as u64 * 1_000_000_007) + .unwrap_or(round as u64 + 1); + let order = lcg_shuffle3(seed); + + let order_labels: Vec<&str> = order.iter().map(|&s| match s { + 0 => "uffs", 1 => "cpp", 2 => "es", _ => "?" 
+ }).collect(); + eprintln!(" [round {:>2}/{}] order=[{}]", + round + 1, cfg.rounds, order_labels.join(",")); + flush(); + + let mut round_rows: [Option; 3] = [None; 3]; // [uffs, cpp, es] + + // ── Run tools in shuffled order ─────────────────────── + for &slot in &order { + match slot { + 0 if run_uffs_tool => { + let t = check_dnf(run_uffs_to( + &cfg.uffs, drive, pat, validate, sink, &f_uffs)); + round_rows[0] = t.ok.then_some(t.rows); + uffs_runs.push(t); + } + 1 if run_cpp_tool => { + let cpp = cfg.uffs_cpp.as_ref().unwrap(); + let t = check_dnf(run_uffs_cpp_to( + cpp, drive, cpp_pat, cpp_ext, validate, sink, &f_cpp)); + round_rows[1] = t.ok.then_some(t.rows); + cpp_runs.push(t); + } + 2 if run_es_tool && !es_aborted => { + let es = cfg.es.as_ref().unwrap(); + let t = check_dnf(run_es_to( + es, drive, es_pat, validate, sink, + cfg.es_instance.as_deref(), &f_es)); + if round == 0 && is_fast_deterministic_fail(&t) { + eprintln!(" es.exe fast-fail ({}); skipping remaining es rounds", t.err); + es_aborted = true; + } + round_rows[2] = t.ok.then_some(t.rows); + es_runs.push(t); + } + _ => {} + } + } + + // ── Line-count summary for this round ───────────────── + eprintln!(" rows: uffs={} cpp={} es={}", + round_rows[0].map_or("-".into(), |n| n.to_string()), + round_rows[1].map_or("-".into(), |n| n.to_string()), + round_rows[2].map_or("-".into(), |n| n.to_string()), + ); + + // ── Content diff (File sink only) ───────────────────── + // Expected subset invariant: ES ⊆ CPP ⊆ UFFS. + // Only print a diff when that invariant is violated — + // i.e. when the smaller tool has rows the larger doesn't. + // Identical sets and clean supersets are silent. 
+ if matches!(sink, OutputSink::File) { + let uffs_paths = if run_uffs_tool { normalise_paths(&f_uffs) } else { Vec::new() }; + let cpp_paths = if run_cpp_tool { normalise_paths(&f_cpp) } else { Vec::new() }; + let es_paths = if run_es_tool && !es_aborted { normalise_paths(&f_es) } else { Vec::new() }; + + // cpp ⊆ uffs: violation = something in cpp not in uffs + if run_uffs_tool && run_cpp_tool && !uffs_paths.is_empty() && !cpp_paths.is_empty() { + let d = diff_paths(&uffs_paths, &cpp_paths); + if !d.only_in_b.is_empty() { + print_diff("uffs", &f_uffs, uffs_paths.len(), + "cpp", &f_cpp, cpp_paths.len(), &d, 10); + } + } + // es ⊆ uffs: violation = something in es not in uffs + if run_uffs_tool && run_es_tool && !es_aborted + && !uffs_paths.is_empty() && !es_paths.is_empty() { + let d = diff_paths(&uffs_paths, &es_paths); + if !d.only_in_b.is_empty() { + print_diff("uffs", &f_uffs, uffs_paths.len(), + "es", &f_es, es_paths.len(), &d, 10); + } + } + // es ⊆ cpp: violation = something in es not in cpp + if run_cpp_tool && run_es_tool && !es_aborted + && !cpp_paths.is_empty() && !es_paths.is_empty() { + let d = diff_paths(&cpp_paths, &es_paths); + if !d.only_in_b.is_empty() { + print_diff("cpp", &f_cpp, cpp_paths.len(), + "es", &f_es, es_paths.len(), &d, 10); + } + } + } + + // Clean up per-tool output files before the next round. 
+ cleanup_file(&f_uffs); + cleanup_file(&f_cpp); + cleanup_file(&f_es); + } + + // ── Per-tool timing summary after all rounds ────────────── + if run_uffs_tool && !uffs_runs.is_empty() { + let s = sw(&uffs_runs); + let mut dm: Vec = uffs_runs.iter() + .filter(|r| r.ok && r.daemon_ms > 0).map(|r| r.daemon_ms).collect(); + dm.sort(); + let daemon_str = if dm.is_empty() { String::new() } + else { format!(" daemon_p50={}", fms(p50(&dm))) }; + let any_bad = uffs_runs.iter().any(|r| r.bad_rows > 0); + let verdict = if uffs_runs.iter().any(|r| r.dnf) { "DNF" } + else if any_bad { "WRONG" } else { "PASS" }; + let first_ok = uffs_runs.iter().find(|r| r.ok); + eprintln!(" UFFS p50={:>6} p95={:>6}{} rows={} {}", + fms(p50(&s)), fms(p95(&s)), daemon_str, + first_ok.map_or(0, |r| r.rows), verdict); + all_rows.push(Row { tool: Tool::Uffs, phase: Phase::Hot, sink, + drive: drive.to_string(), pat: label.into(), runs: uffs_runs }); + } + if run_cpp_tool && !cpp_runs.is_empty() { + let s = sw(&cpp_runs); + let any_bad = cpp_runs.iter().any(|r| r.bad_rows > 0); + let verdict = if cpp_runs.iter().any(|r| r.dnf) { "DNF" } + else if any_bad { "WRONG" } + else if cpp_runs.iter().all(|r| r.ok) { "PASS" } else { "ERROR" }; + let first_ok = cpp_runs.iter().find(|r| r.ok); + eprintln!(" UFFS-C++ p50={:>6} p95={:>6} rows={} {}", + fms(p50(&s)), fms(p95(&s)), first_ok.map_or(0, |r| r.rows), verdict); + all_rows.push(Row { tool: Tool::UffsCpp, phase: Phase::Hot, sink, + drive: drive.to_string(), pat: label.into(), runs: cpp_runs }); + } + if run_es_tool && !es_runs.is_empty() { + let s = sw(&es_runs); + let any_bad = es_runs.iter().any(|r| r.bad_rows > 0); + let abort_str = if es_aborted { + format!(" (fast-fail after {} round(s))", es_runs.len()) + } else { String::new() }; + let verdict = if es_runs.iter().any(|r| r.dnf) { "DNF" } + else if any_bad { "WRONG" } + else if es_runs.iter().all(|r| r.ok) { "PASS" } else { "ERROR" }; + let first_ok = es_runs.iter().find(|r| r.ok); + eprintln!(" ES 
p50={:>6} p95={:>6} rows={} {}{}", + fms(p50(&s)), fms(p95(&s)), + first_ok.map_or(0, |r| r.rows), verdict, abort_str); + all_rows.push(Row { tool: Tool::Everything, phase: Phase::Hot, sink, + drive: drive.to_string(), pat: label.into(), runs: es_runs }); + } + } + } +} + // ── Main ───────────────────────────────────────────────────────────────────── fn main() { - let cfg = parse_args(); + let mut cfg = parse_args(); println!("╔══════════════════════════════════════════════════════════════════════════════╗"); println!("║ Cross-Tool Benchmark v1.0 ║"); @@ -873,19 +1449,42 @@ fn main() { let mut all_rows: Vec = Vec::new(); - // ── Daemon warmup (once for all drives) ───────────────────────────── - if cfg.tools.contains(&Tool::Uffs) && cfg.skip_cold { - // When skipping COLD/WARM, kill+restart with bench-safe TTLs then - // issue one probe so the daemon is fully loaded before HOT starts. - eprint!(" Warming up UFFS daemon (all drives)..."); flush(); - uffs_stop(&cfg.uffs); - uffs_start(&cfg.uffs); - let _ = Command::new(&cfg.uffs) - .args(["__uffs_warmup_probe__", "--limit", "1"]) - .stdout(Stdio::null()).stderr(Stdio::null()) - .status(); - eprintln!(" ready."); + // ── Everything: discover Everything.exe for the private sandbox ─────── + // The sandbox instance (`-instance uffs-bench`) is (re)launched per step + // scoped to EXACTLY the drives under test — see `scope_everything` — so + // ES's working set always matches the daemon's. Here we only locate + // Everything.exe and reserve the instance name. Skipped when the operator + // already passed `--es-instance` (they manage their own) or it is missing. 
+ let mut es_everything: Option = None; + let mut es_bench_ini: Option = None; + let manage_es = cfg.tools.contains(&Tool::Everything) && cfg.es.is_some() && cfg.es_instance.is_none(); + if manage_es { + match find_everything() { + Some(ev) => { cfg.es_instance = Some(ES_INSTANCE_NAME.to_string()); es_everything = Some(ev); } + None => eprintln!( + " [es-instance] WARNING: Everything.exe not found — es.exe will \ + query whatever instance is running (if any)"), + } } + // Re-scope ES + daemon to exactly `drives`, primed for peak HOT perf. + // Returns the temp-ini path of the (re)launched ES sandbox, if any. + let scope_tools = |cfg: &Cfg, drives: &[String]| -> Option { + let ini = es_everything.as_ref().and_then(|ev| { + eprintln!(); + let p = es_launch(ev, drives); + if p.is_some() { + if let Some(ref es) = cfg.es { es_wait_until_loaded(es, drives); } + } + p + }); + if cfg.tools.contains(&Tool::Uffs) { + uffs_stop(&cfg.uffs); + uffs_start(&cfg.uffs, drives); + eprintln!(); + prime_daemon(&cfg.uffs, &drives.join(","), PRIME_ROUNDS); + } + ini + }; for drive in &cfg.drives { println!("━━━ Drive {}: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━", drive); @@ -899,6 +1498,7 @@ fn main() { eprintln!(" done."); for &(label, pat, _, _, _, validate) in PATTERNS { + if !COLD_WARM_PATTERNS.contains(&label) { continue; } if cfg.skip_pattern(label) { continue; } eprint!(" {label:<12} "); flush(); // COLD: only 1 round (destructive — restarts daemon each time) @@ -915,15 +1515,17 @@ fn main() { // ── UFFS WARM (file sink only; same reasoning as COLD) ──────────── if cfg.tools.contains(&Tool::Uffs) && !cfg.skip_cold { + eprintln!(); eprint!(" UFFS WARM: stopping daemon (cache stays)..."); flush(); uffs_stop(&cfg.uffs); eprintln!(" done."); for &(label, pat, _, _, _, validate) in PATTERNS { + if !COLD_WARM_PATTERNS.contains(&label) { continue; } if cfg.skip_pattern(label) { continue; } eprint!(" {label:<12} "); flush(); uffs_stop(&cfg.uffs); - uffs_start(&cfg.uffs); 
+ uffs_start(&cfg.uffs, std::slice::from_ref(drive)); let t = check_dnf(run_uffs(&cfg.uffs, drive, pat, validate, OutputSink::File)); let verdict = if t.dnf { "DNF" } else if t.bad_rows > 0 { "WRONG" } else if t.ok { "PASS" } else { "ERROR" }; let bad_str = if t.bad_rows > 0 { format!(" bad={}", t.bad_rows) } else { String::new() }; @@ -933,216 +1535,38 @@ fn main() { } } - // ── UFFS HOT daemon warmup (once per drive, before the sink loop) ─ - if cfg.tools.contains(&Tool::Uffs) { - // Tiny query just to trigger daemon startup + index load for this - // drive. Use a pattern that matches nothing, and limit 1 to - // minimise I/O. Done once per drive — the daemon stays warm - // across every sink iteration below. - eprint!(" UFFS HOT: warming up daemon..."); flush(); - uffs_start(&cfg.uffs); - let _ = Command::new(&cfg.uffs) - .args(["__uffs_warmup_probe__", "--drive", drive, "--limit", "1"]) - .stdout(Stdio::null()).stderr(Stdio::null()) - .status(); - eprintln!(" ready."); - } - - // ── Per-sink HOT rotation (interleaved rounds, randomised tool order) ─ - // - // For each (sink, pattern) the tools run in a freshly shuffled order - // every round so no tool consistently benefits from OS file-system - // caching warmed up by a prior tool. After every round the superset - // constraint is checked immediately: - // - // uffs.exe rows ≥ uffs.com rows ≥ es.exe rows - // - // This reflects the fact that UFFS (Rust) covers all files including - // system files / ADS when --hide-system / --hide-ads are NOT passed - // (here they ARE, so the counts should be close), and that Everything - // does not index NTFS metadata files that uffs.com does include. - // A violation is printed as a warning per round — it does not abort — - // so timing data is preserved even when parity is off. 
- for sink in cfg.sinks.iter().copied() { - if cfg.sinks.len() > 1 { - println!(" ── sink={} ──────────────────────────────────────────────", sink.label()); - } - - for &(label, pat, es_pat, cpp_pat, cpp_ext, validate) in PATTERNS { - if cfg.skip_pattern(label) { continue; } - - // Skip full_scan for Everything (2GB IPC ceiling). - let es_skip = label == "full_scan"; - // Skip patterns C++ does not support. - let cpp_skip = cpp_pat.is_empty(); - - let run_uffs_tool = cfg.tools.contains(&Tool::Uffs); - let run_cpp_tool = cfg.tools.contains(&Tool::UffsCpp) && cfg.uffs_cpp.is_some() && !cpp_skip; - let run_es_tool = cfg.tools.contains(&Tool::Everything) && cfg.es.is_some() && !es_skip; - - if !run_uffs_tool && !run_cpp_tool && !run_es_tool { continue; } - - eprintln!(" HOT [{sink}] {label:<12} {} rounds (tools shuffled each round)", - cfg.rounds, sink = sink.label()); - - let mut uffs_runs: Vec = Vec::new(); - let mut cpp_runs: Vec = Vec::new(); - let mut es_runs: Vec = Vec::new(); - let mut es_aborted = false; - - // Per-tool output files for this pattern — kept alive for the - // duration of a round so path sets can be compared, then deleted. - // C++ cannot write to absolute paths, so all three use relative names. - let f_uffs = format!("bench_uffs_{label}.csv"); - let f_cpp = format!("bench_cpp_{label}.csv"); - let f_es = format!("bench_es_{label}.csv"); - - for round in 0..cfg.rounds { - // Fresh random tool order every round — seeded from wall clock - // nanoseconds so consecutive rounds get different seeds. 
- let seed = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.subsec_nanos() as u64 + round as u64 * 1_000_000_007) - .unwrap_or(round as u64 + 1); - let order = lcg_shuffle3(seed); - - let mut round_rows: [Option; 3] = [None; 3]; // [uffs, cpp, es] - - for &slot in &order { - match slot { - 0 if run_uffs_tool => { - let t = check_dnf(run_uffs_to(&cfg.uffs, drive, pat, validate, sink, &f_uffs)); - round_rows[0] = t.ok.then_some(t.rows); - uffs_runs.push(t); - } - 1 if run_cpp_tool => { - let cpp = cfg.uffs_cpp.as_ref().unwrap(); - let t = check_dnf(run_uffs_cpp_to(cpp, drive, cpp_pat, cpp_ext, validate, sink, &f_cpp)); - round_rows[1] = t.ok.then_some(t.rows); - cpp_runs.push(t); - } - 2 if run_es_tool && !es_aborted => { - let es = cfg.es.as_ref().unwrap(); - let t = check_dnf(run_es_to(es, drive, es_pat, validate, sink, cfg.es_instance.as_deref(), &f_es)); - if round == 0 && is_fast_deterministic_fail(&t) { - eprintln!(" [round {round}] es.exe fast-fail (exit={}); skipping remaining es rounds", t.err); - es_aborted = true; - } - round_rows[2] = t.ok.then_some(t.rows); - es_runs.push(t); - } - _ => {} - } - } - - // ── Path-set superset check after every round (File sink only) ── - // Superset contract: - // es.exe paths ⊆ uffs.com paths ⊆ uffs.exe paths - // Stdout/Null sinks don't retain output files so we skip - // the set check there; row counts are still shown. - let round_order_labels: Vec<&str> = order.iter().map(|&s| match s { - 0 => "uffs", 1 => "cpp", 2 => "es", _ => "?" 
- }).collect(); - - let mut violations: Vec = Vec::new(); - if matches!(sink, OutputSink::File) { - let uffs_paths = extract_paths_from_file(&f_uffs); - let cpp_paths = extract_paths_from_file(&f_cpp); - let es_paths = extract_paths_from_file(&f_es); - - if run_es_tool && !es_aborted && run_uffs_tool - && !uffs_paths.is_empty() && !es_paths.is_empty() { - if let Some(v) = check_subset("es", &es_paths, "uffs", &uffs_paths) { - violations.push(v); - } - } - if run_es_tool && !es_aborted && run_cpp_tool - && !cpp_paths.is_empty() && !es_paths.is_empty() { - if let Some(v) = check_subset("es", &es_paths, "cpp", &cpp_paths) { - violations.push(v); - } - } - if run_cpp_tool && run_uffs_tool - && !uffs_paths.is_empty() && !cpp_paths.is_empty() { - if let Some(v) = check_subset("cpp", &cpp_paths, "uffs", &uffs_paths) { - violations.push(v); - } - } - } + // ── HOT prep: reset + warm-prime the daemon AND reload the ES + // sandbox, both scoped to EXACTLY this drive, so the head-to-head + // runs against a fully-warmed, same-working-set index — fair vs ES + // (always pre-indexed) and the C++ tool (re-reads the MFT each call). + es_bench_ini = scope_tools(&cfg, std::slice::from_ref(drive)); - // Clean up per-tool files for this round. 
- cleanup_file(&f_uffs); - cleanup_file(&f_cpp); - cleanup_file(&f_es); - - if violations.is_empty() { - eprintln!(" [round {:>2}] order=[{}] rows: uffs={} cpp={} es={} ✓ paths ok", - round + 1, - round_order_labels.join(","), - round_rows[0].map_or("-".into(), |n| n.to_string()), - round_rows[1].map_or("-".into(), |n| n.to_string()), - round_rows[2].map_or("-".into(), |n| n.to_string()), - ); - } else { - eprintln!(" [round {:>2}] order=[{}] rows: uffs={} cpp={} es={} ⚠ PATH SUPERSET VIOLATION: {}", - round + 1, - round_order_labels.join(","), - round_rows[0].map_or("-".into(), |n| n.to_string()), - round_rows[1].map_or("-".into(), |n| n.to_string()), - round_rows[2].map_or("-".into(), |n| n.to_string()), - violations.join(" | "), - ); - } - } + // ── HOT: head-to-head comparison on this single drive ──────────── + run_hot_compare(&cfg, drive, &mut all_rows); - // ── Per-tool summary lines ──────────────────────────────── - if run_uffs_tool && !uffs_runs.is_empty() { - let s = sw(&uffs_runs); - let mut dm: Vec = uffs_runs.iter().filter(|r| r.ok && r.daemon_ms > 0).map(|r| r.daemon_ms).collect(); - dm.sort(); - let daemon_str = if dm.is_empty() { String::new() } else { format!(" daemon_p50={}", fms(p50(&dm))) }; - let any_bad = uffs_runs.iter().any(|r| r.bad_rows > 0); - let verdict = if uffs_runs.iter().any(|r| r.dnf) { "DNF" } else if any_bad { "WRONG" } else { "PASS" }; - let first_ok = uffs_runs.iter().find(|r| r.ok); - eprintln!(" UFFS p50={:>6} p95={:>6}{} rows={} {}", - fms(p50(&s)), fms(p95(&s)), daemon_str, - first_ok.map_or(0, |r| r.rows), verdict); - all_rows.push(Row { tool: Tool::Uffs, phase: Phase::Hot, sink, - drive: drive.clone(), pat: label.into(), runs: uffs_runs }); - } - if run_cpp_tool && !cpp_runs.is_empty() { - let s = sw(&cpp_runs); - let any_bad = cpp_runs.iter().any(|r| r.bad_rows > 0); - let verdict = if cpp_runs.iter().any(|r| r.dnf) { "DNF" } else if any_bad { "WRONG" } else if cpp_runs.iter().all(|r| r.ok) { "PASS" } else { "ERROR" }; 
- let first_ok = cpp_runs.iter().find(|r| r.ok); - eprintln!(" UFFS-C++ p50={:>6} p95={:>6} rows={} {}", - fms(p50(&s)), fms(p95(&s)), first_ok.map_or(0, |r| r.rows), verdict); - all_rows.push(Row { tool: Tool::UffsCpp, phase: Phase::Hot, sink, - drive: drive.clone(), pat: label.into(), runs: cpp_runs }); - } - if run_es_tool && !es_runs.is_empty() { - let s = sw(&es_runs); - let any_bad = es_runs.iter().any(|r| r.bad_rows > 0); - let abort_str = if es_aborted { format!(" (fast-fail, {} rounds)", es_runs.len()) } else { String::new() }; - let verdict = if es_runs.iter().any(|r| r.dnf) { "DNF" } else if any_bad { "WRONG" } else if es_runs.iter().all(|r| r.ok) { "PASS" } else { "ERROR" }; - let first_ok = es_runs.iter().find(|r| r.ok); - eprintln!(" ES p50={:>6} p95={:>6} rows={} {}{}", - fms(p50(&s)), fms(p95(&s)), first_ok.map_or(0, |r| r.rows), verdict, abort_str); - all_rows.push(Row { tool: Tool::Everything, phase: Phase::Hot, sink, - drive: drive.clone(), pat: label.into(), runs: es_runs }); - } - if run_es_tool && es_skip { - eprintln!(" {label:<12} ES SKIP (es.exe 2GB IPC limit)"); - } - if run_cpp_tool && cpp_skip { - eprintln!(" {label:<12} C++ SKIP (pattern not supported)"); - } - } - } + println!(); + } + // ── ALL-drives aggregate step ───────────────────────────────────────── + // When more than one drive is under test, run a final head-to-head with + // every tool scoped to ALL requested drives at once: ES sandbox reloaded + // with the full set, daemon restarted + primed across the full set, and + // queries spanning every drive (uffs `--drives=C,D,G`, es with no path + // filter, uffs.com `--drives=C,D,G`). Mirrors the per-drive fairness at + // aggregate scale. 
+ if cfg.drives.len() > 1 { + let all = cfg.drives.join(","); + println!("━━━ ALL drives {all} ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━", ); + es_bench_ini = scope_tools(&cfg, &cfg.drives); + run_hot_compare(&cfg, &all, &mut all_rows); println!(); } + // ── Everything: tear down the private bench instance ────────────────── + if let Some(ev) = &es_everything { + es_stop(ev, es_bench_ini.as_deref()); + } + // ── Summary table ──────────────────────────────────────────────────────── print_summary(&cfg, &all_rows); @@ -1235,7 +1659,11 @@ fn print_summary(cfg: &Cfg, rows: &[Row]) { println!("| Drive | Pattern | UFFS HOT p50 | UFFS-C++ p50 | Everything p50 |"); println!("|-------|--------------|--------------|--------------|----------------|"); - for drive in &cfg.drives { + // Per-drive rows, then the all-drives aggregate spec (e.g. "C,D,G") + // when more than one drive was tested. + let mut drive_specs: Vec = cfg.drives.clone(); + if cfg.drives.len() > 1 { drive_specs.push(cfg.drives.join(",")); } + for drive in &drive_specs { for &(label, _, _, _, _, _) in PATTERNS { if cfg.skip_pattern(label) { continue; } let uffs_p50 = find_p50(rows, Tool::Uffs, Phase::Hot, sink, drive, label);