diff --git a/crates/next-api/src/project.rs b/crates/next-api/src/project.rs index 87204aca4256..ec4964a5a359 100644 --- a/crates/next-api/src/project.rs +++ b/crates/next-api/src/project.rs @@ -323,14 +323,15 @@ impl ProjectContainer { .await?; } else { project_fs.invalidate_with_reason(|path| invalidation::Initialize { - path: RcStr::from(path), + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), }); } let output_fs = output_fs_operation(project) .read_strongly_consistent() .await?; output_fs.invalidate_with_reason(|path| invalidation::Initialize { - path: RcStr::from(path), + path: RcStr::from(path.to_string_lossy()), }); Ok(()) } @@ -421,13 +422,14 @@ impl ProjectContainer { .await?; } else { project_fs.invalidate_with_reason(|path| invalidation::Initialize { - path: RcStr::from(path), + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), }); } } if !ReadRef::ptr_eq(&prev_output_fs, &output_fs) { prev_output_fs.invalidate_with_reason(|path| invalidation::Initialize { - path: RcStr::from(path), + path: RcStr::from(path.to_string_lossy()), }); } diff --git a/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs b/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs index b384027f04eb..7381550034d2 100644 --- a/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs +++ b/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs @@ -1,4 +1,8 @@ -use std::sync::{LockResult, Mutex, MutexGuard}; +use std::{ + collections::BTreeMap, + path::PathBuf, + sync::{LockResult, Mutex, MutexGuard}, +}; use concurrent_queue::ConcurrentQueue; use rustc_hash::FxHashMap; @@ -13,18 +17,18 @@ pub enum WriteContent { Link(ReadRef), } -type InnerMap = FxHashMap>>; +pub type LockedInvalidatorMap = BTreeMap>>; pub struct InvalidatorMap { - queue: ConcurrentQueue<(String, Invalidator, Option)>, - map: Mutex, + queue: ConcurrentQueue<(PathBuf, Invalidator, Option)>, + map: Mutex, } impl Default for 
InvalidatorMap { fn default() -> Self { Self { queue: ConcurrentQueue::unbounded(), - map: Default::default(), + map: Mutex::::default(), } } } @@ -34,7 +38,7 @@ impl InvalidatorMap { Self::default() } - pub fn lock(&self) -> LockResult> { + pub fn lock(&self) -> LockResult> { let mut guard = self.map.lock()?; while let Ok((key, value, write_content)) = self.queue.pop() { guard.entry(key).or_default().insert(value, write_content); @@ -44,7 +48,7 @@ impl InvalidatorMap { pub fn insert( &self, - key: String, + key: PathBuf, invalidator: Invalidator, write_content: Option, ) { @@ -66,7 +70,15 @@ impl Serialize for InvalidatorMap { where S: serde::Serializer, { - serializer.serialize_newtype_struct("InvalidatorMap", &*self.lock().unwrap()) + // TODO: This stores absolute `PathBuf`s, which are machine-specific. This should + // normalize/denormalize paths relative to the disk filesystem root. + // + // Potential optimization: We invalidate all fs reads immediately upon resuming from a + // persisted cache, but we don't invalidate the fs writes. Those read invalidations trigger + // re-inserts into the `InvalidatorMap`. If we knew that certain invalidators were only + // needed for reads, we could potentially avoid serializing those paths entirely. 
+ let inner: &LockedInvalidatorMap = &self.lock().unwrap(); + serializer.serialize_newtype_struct("InvalidatorMap", inner) } } diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 7d58c64af146..6955035d4d7c 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -1,4 +1,5 @@ #![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this +#![feature(btree_cursors)] // needed for the `InvalidatorMap` and watcher, reduces time complexity #![feature(trivial_bounds)] #![feature(min_specialization)] #![feature(iter_advance_by)] @@ -16,6 +17,7 @@ pub mod invalidation; mod invalidator_map; pub mod json; mod mutex_map; +mod path_map; mod read_glob; mod retry; pub mod rope; @@ -23,6 +25,7 @@ pub mod source_context; pub mod util; pub(crate) mod virtual_fs; mod watcher; + use std::{ borrow::Cow, cmp::{Ordering, min}, @@ -40,14 +43,10 @@ use anyhow::{Context, Result, anyhow, bail}; use auto_hash_map::{AutoMap, AutoSet}; use bitflags::bitflags; use dunce::simplified; -use glob::Glob; use indexmap::IndexSet; -use invalidator_map::InvalidatorMap; use jsonc_parser::{ParseOptions, parse_to_serde_value}; use mime::Mime; use rayon::iter::{IntoParallelIterator, ParallelIterator}; -pub use read_glob::ReadGlobResult; -use read_glob::{read_glob, track_glob}; use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -60,17 +59,19 @@ use turbo_tasks::{ mark_session_dependent, mark_stateful, trace::TraceRawVcs, }; use turbo_tasks_hash::{DeterministicHash, DeterministicHasher, hash_xxh3_hash64}; -use util::{extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys}; -pub use virtual_fs::VirtualFileSystem; -use watcher::DiskWatcher; use self::{invalidation::Write, json::UnparsableJson, mutex_map::MutexMap}; use crate::{ attach::AttachedFileSystem, - invalidator_map::WriteContent, + glob::Glob, + 
invalidator_map::{InvalidatorMap, WriteContent}, + read_glob::{read_glob, track_glob}, retry::retry_blocking, rope::{Rope, RopeReader}, + util::{extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys}, + watcher::DiskWatcher, }; +pub use crate::{read_glob::ReadGlobResult, virtual_fs::VirtualFileSystem}; /// A (somewhat arbitrary) filename limit that we should try to keep output file names below. /// @@ -254,10 +255,11 @@ impl DiskFileSystemInner { fn register_read_invalidator(&self, path: &Path) -> Result<()> { let invalidator = turbo_tasks::get_invalidator(); self.invalidator_map - .insert(path_to_key(path), invalidator, None); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - if let Some(dir) = path.parent() { - self.watcher.ensure_watching(dir, self.root_path())?; + .insert(path.to_owned(), invalidator, None); + if let Some(non_recursive) = &self.watcher.non_recursive_state + && let Some(dir) = path.parent() + { + non_recursive.ensure_watching(&self.watcher, dir, self.root_path())?; } Ok(()) } @@ -272,7 +274,7 @@ impl DiskFileSystemInner { write_content: WriteContent, ) -> Result)>> { let mut invalidator_map = self.invalidator_map.lock().unwrap(); - let invalidators = invalidator_map.entry(path_to_key(path)).or_default(); + let invalidators = invalidator_map.entry(path.to_owned()).or_default(); let old_invalidators = invalidators .extract_if(|i, old_write_content| { i == &invalidator @@ -284,9 +286,10 @@ impl DiskFileSystemInner { .collect::>(); invalidators.insert(invalidator, Some(write_content)); drop(invalidator_map); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - if let Some(dir) = path.parent() { - self.watcher.ensure_watching(dir, self.root_path())?; + if let Some(non_recursive) = &self.watcher.non_recursive_state + && let Some(dir) = path.parent() + { + non_recursive.ensure_watching(&self.watcher, dir, self.root_path())?; } Ok(old_invalidators) } @@ -296,9 +299,10 @@ impl DiskFileSystemInner { fn 
register_dir_invalidator(&self, path: &Path) -> Result<()> { let invalidator = turbo_tasks::get_invalidator(); self.dir_invalidator_map - .insert(path_to_key(path), invalidator, None); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - self.watcher.ensure_watching(path, self.root_path())?; + .insert(path.to_owned(), invalidator, None); + if let Some(non_recursive) = &self.watcher.non_recursive_state { + non_recursive.ensure_watching(&self.watcher, path, self.root_path())?; + } Ok(()) } @@ -330,7 +334,7 @@ impl DiskFileSystemInner { /// Calls the given fn invalidate_with_reason( &self, - reason: impl Fn(String) -> R + Sync, + reason: impl Fn(&Path) -> R + Sync, ) { let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered(); let span = tracing::Span::current(); @@ -342,7 +346,7 @@ impl DiskFileSystemInner { .chain(dir_invalidator_map.into_par_iter()) .flat_map(|(path, invalidators)| { let _span = span.clone().entered(); - let reason_for_path = reason(path); + let reason_for_path = reason(&path); invalidators .into_par_iter() .map(move |i| (reason_for_path.clone(), i)) @@ -446,7 +450,7 @@ impl DiskFileSystem { pub fn invalidate_with_reason( &self, - reason: impl Fn(String) -> R + Sync, + reason: impl Fn(&Path) -> R + Sync, ) { self.inner.invalidate_with_reason(reason); } @@ -501,10 +505,6 @@ fn format_absolute_fs_path(path: &Path, name: &str, root_path: &Path) -> Option< } } -pub fn path_to_key(path: impl AsRef) -> String { - path.as_ref().to_string_lossy().to_string() -} - #[turbo_tasks::value_impl] impl DiskFileSystem { /// Create a new instance of `DiskFileSystem`. 
@@ -753,11 +753,12 @@ impl FileSystem for DiskFileSystem { .await?; if compare == FileComparison::Equal { if !old_invalidators.is_empty() { - let key = path_to_key(&full_path); for (invalidator, write_content) in old_invalidators { - inner - .invalidator_map - .insert(key.clone(), invalidator, write_content); + inner.invalidator_map.insert( + full_path.clone().into_owned(), + invalidator, + write_content, + ); } } return Ok(()); @@ -896,11 +897,12 @@ impl FileSystem for DiskFileSystem { }; if is_equal { if !old_invalidators.is_empty() { - let key = path_to_key(&full_path); for (invalidator, write_content) in old_invalidators { - inner - .invalidator_map - .insert(key.clone(), invalidator, write_content); + inner.invalidator_map.insert( + full_path.clone().into_owned(), + invalidator, + write_content, + ); } } return Ok(()); diff --git a/turbopack/crates/turbo-tasks-fs/src/path_map.rs b/turbopack/crates/turbo-tasks-fs/src/path_map.rs new file mode 100644 index 000000000000..82ad3925078a --- /dev/null +++ b/turbopack/crates/turbo-tasks-fs/src/path_map.rs @@ -0,0 +1,80 @@ +use std::{ + collections::{BTreeMap, btree_map::CursorMut}, + ops::Bound, + path::{Path, PathBuf}, +}; + +/// A thin wrapper around [`BTreeMap`] that provides efficient extraction of child +/// paths. +/// +/// In the future, this may use a more efficient representation, like a radix tree or trie. 
+pub trait OrderedPathMapExt { + fn extract_path_with_children<'a>(&'a mut self, path: &'a Path) -> ExtractWithChildren<'a, V>; +} + +impl OrderedPathMapExt for BTreeMap { + fn extract_path_with_children<'a>(&'a mut self, path: &'a Path) -> ExtractWithChildren<'a, V> { + ExtractWithChildren { + cursor: self.lower_bound_mut(Bound::Included(path)), + parent_path: path, + } + } +} + +pub struct ExtractWithChildren<'a, V> { + cursor: CursorMut<'a, PathBuf, V>, + parent_path: &'a Path, +} + +impl Iterator for ExtractWithChildren<'_, V> { + type Item = (PathBuf, V); + + fn next(&mut self) -> Option { + // this simple implementation works because `Path` implements `Ord` (and `starts_with`) + // using path component comparison, rather than raw byte comparisons. The parent path is + // always guaranteed to be placed immediately before its children (pre-order traversal). + if self + .cursor + .peek_next() + .is_none_or(|(k, _v)| !k.starts_with(self.parent_path)) + { + return None; + } + self.cursor.remove_next() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_with_children() { + let mut map = BTreeMap::default(); + map.insert(PathBuf::from("a"), 1); + map.insert(PathBuf::from("a/b"), 2); + map.insert(PathBuf::from("a/b/c"), 3); + map.insert(PathBuf::from("a/b/d"), 4); + map.insert(PathBuf::from("a/c"), 5); + map.insert(PathBuf::from("x/y/z"), 6); + map.insert(PathBuf::from("z/a/b"), 7); + + let parent_path = PathBuf::from("a/b"); + let extracted: Vec<_> = map.extract_path_with_children(&parent_path).collect(); + + let expected_extracted = vec![ + (PathBuf::from("a/b"), 2), + (PathBuf::from("a/b/c"), 3), + (PathBuf::from("a/b/d"), 4), + ]; + assert_eq!(extracted, expected_extracted); + + let mut expected_remaining = BTreeMap::new(); + expected_remaining.insert(PathBuf::from("a"), 1); + expected_remaining.insert(PathBuf::from("a/c"), 5); + expected_remaining.insert(PathBuf::from("x/y/z"), 6); + 
expected_remaining.insert(PathBuf::from("z/a/b"), 7); + + assert_eq!(map, expected_remaining); + } +} diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index 0fe0fb5c81cc..0717076b93ff 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -1,22 +1,23 @@ use std::{ any::Any, - fmt, + env, fmt, mem::take, path::{Path, PathBuf}, sync::{ - Arc, Mutex, + Arc, LazyLock, Mutex, MutexGuard, mpsc::{Receiver, TryRecvError, channel}, }, time::Duration, }; -use anyhow::Result; +use anyhow::{Context, Result}; +use dashmap::DashSet; use notify::{ Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher, event::{MetadataKind, ModifyKind, RenameMode}, }; use rayon::iter::{IntoParallelIterator, ParallelIterator}; -use rustc_hash::{FxHashMap, FxHashSet}; +use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use tracing::instrument; use turbo_rcstr::RcStr; @@ -28,10 +29,39 @@ use turbo_tasks::{ use crate::{ DiskFileSystemInner, format_absolute_fs_path, invalidation::{WatchChange, WatchStart}, - invalidator_map::WriteContent, - path_to_key, + invalidator_map::LockedInvalidatorMap, + path_map::OrderedPathMapExt, }; +static WATCH_RECURSIVE_MODE: LazyLock = LazyLock::new(|| { + match env::var("TURBO_TASKS_FORCE_WATCH_MODE").as_deref() { + Ok("recursive") => { + return RecursiveMode::Recursive; + } + Ok("nonrecursive") => { + return RecursiveMode::NonRecursive; + } + Ok(_) => { + eprintln!( + "unsupported `TURBO_TASKS_FORCE_WATCH_MODE`, must be `recursive` or `nonrecursive`" + ); + } + _ => {} + } + if cfg!(any(target_os = "macos", target_os = "windows")) { + // these platforms have efficient recursive watchers, it's best to track the entire + // directory and filter events to the files we care about + RecursiveMode::Recursive + } else { + // inotify on linux is non-recursive, so notify-rs's implementation is inefficient, it's + // better 
for us to just track it ourselves and only watch the files we know we care about + // + // See: https://github.com/vercel/turborepo/pull/4100 + RecursiveMode::NonRecursive + } +}); + +/// A thin wrapper around [`RecommendedWatcher`] and [`PollWatcher`]. enum DiskWatcherInternal { Recommended(RecommendedWatcher), Polling(PollWatcher), @@ -46,76 +76,96 @@ impl DiskWatcherInternal { } } -#[derive(Default, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub(crate) struct DiskWatcher { - #[serde(skip)] - watcher: Mutex>, - /// Array of paths that should not notify invalidations. /// `notify` currently doesn't support unwatching subpaths from the root, /// so underlying we still watches filesystem event but only skips to /// invalidate. ignored_subpaths: Vec, - /// Keeps track of which directories are currently watched. This is only - /// used on OSs that doesn't support recursive watching. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] #[serde(skip)] - watching: dashmap::DashSet, + internal: Mutex>, + + #[serde(skip, default = "NonRecursiveDiskWatcherState::try_new")] + pub(crate) non_recursive_state: Option, } -impl DiskWatcher { - pub(crate) fn new(ignored_subpaths: Vec) -> Self { +impl Default for DiskWatcher { + fn default() -> Self { Self { - ignored_subpaths, - ..Default::default() + ignored_subpaths: Vec::new(), + internal: Mutex::new(None), + non_recursive_state: NonRecursiveDiskWatcherState::try_new(), + } + } +} + +/// Extra state used by [`DiskWatcher`] when [`WATCH_RECURSIVE_MODE`] is +/// [`RecursiveMode::NonRecursive`] (default on Linux). +pub(crate) struct NonRecursiveDiskWatcherState { + /// Keeps track of which directories are currently (or were previously) watched. 
+ watching: DashSet, +} + +impl NonRecursiveDiskWatcherState { + fn try_new() -> Option { + match *WATCH_RECURSIVE_MODE { + RecursiveMode::Recursive => None, + RecursiveMode::NonRecursive => Some(NonRecursiveDiskWatcherState { + watching: DashSet::new(), + }), } } /// Called after a rescan in case a previously watched-but-deleted directory was recreated. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - pub(crate) fn restore_all_watching(&self, root_path: &Path) { - let mut watcher = self.watcher.lock().unwrap(); + pub(crate) fn restore_all_watching(&self, watcher: &DiskWatcher, root_path: &Path) { + let mut internal = watcher.internal.lock().unwrap(); for dir_path in self.watching.iter() { // TODO: Report diagnostics if this error happens - let _ = self.start_watching_dir(&mut watcher, &dir_path, root_path); + let _ = self.start_watching_dir(&mut internal, &dir_path, root_path); } } /// Called when a new directory is found in a parent directory we're watching. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - pub(crate) fn restore_if_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> { + pub(crate) fn restore_if_watching( + &self, + watcher: &DiskWatcher, + dir_path: &Path, + root_path: &Path, + ) -> Result<()> { if self.watching.contains(dir_path) { - let mut watcher = self.watcher.lock().unwrap(); + let mut internal = watcher.internal.lock().unwrap(); // TODO: Also restore any watchers for children of this directory - self.start_watching_dir(&mut watcher, dir_path, root_path)?; + self.start_watching_dir(&mut internal, dir_path, root_path)?; } Ok(()) } - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - pub(crate) fn ensure_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> { + pub(crate) fn ensure_watching( + &self, + watcher: &DiskWatcher, + dir_path: &Path, + root_path: &Path, + ) -> Result<()> { if self.watching.contains(dir_path) { return Ok(()); } - let mut watcher = 
self.watcher.lock().unwrap(); + let mut internal = watcher.internal.lock().unwrap(); if self.watching.insert(dir_path.to_path_buf()) { - self.start_watching_dir(&mut watcher, dir_path, root_path)?; + self.start_watching_dir(&mut internal, dir_path, root_path)?; } Ok(()) } /// Private helper, assumes that the path has already been added to `self.watching`. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] fn start_watching_dir( &self, - watcher: &mut std::sync::MutexGuard>, + watcher_internal_guard: &mut MutexGuard>, dir_path: &Path, root_path: &Path, ) -> Result<()> { - use anyhow::Context; // inner import due to conditional compilation - - if let Some(watcher) = watcher.as_mut() { + if let Some(watcher_internal_guard) = watcher_internal_guard.as_mut() { let mut path = dir_path; let err_with_context = |err| { return Err(err).context(format!( @@ -124,7 +174,7 @@ impl DiskWatcher { path.display() )); }; - while let Err(err) = watcher.watch(path, RecursiveMode::NonRecursive) { + while let Err(err) = watcher_internal_guard.watch(path, RecursiveMode::NonRecursive) { match err { notify::Error { kind: notify::ErrorKind::PathNotFound, @@ -153,6 +203,15 @@ impl DiskWatcher { } Ok(()) } +} + +impl DiskWatcher { + pub(crate) fn new(ignored_subpaths: Vec) -> Self { + Self { + ignored_subpaths, + ..Default::default() + } + } /// Create a watcher and start watching by creating `debounced` watcher /// via `full debouncer` @@ -171,12 +230,12 @@ impl DiskWatcher { /// - Doesn't emit Modify events after a Create event pub(crate) fn start_watching( &self, - inner: Arc, + fs_inner: Arc, report_invalidation_reason: bool, poll_interval: Option, ) -> Result<()> { - let mut watcher_guard = self.watcher.lock().unwrap(); - if watcher_guard.is_some() { + let mut internal_guard = self.internal.lock().unwrap(); + if internal_guard.is_some() { return Ok(()); } @@ -188,7 +247,7 @@ impl DiskWatcher { // we should track and invalidate each part of a symlink chain ourselves in 
turbo-tasks-fs config.with_follow_symlinks(false); - let mut watcher = if let Some(poll_interval) = poll_interval { + let mut internal = if let Some(poll_interval) = poll_interval { let config = config.with_poll_interval(poll_interval); DiskWatcherInternal::Polling(PollWatcher::new(tx, config)?) @@ -196,25 +255,21 @@ impl DiskWatcher { DiskWatcherInternal::Recommended(RecommendedWatcher::new(tx, Config::default())?) }; - // Macos and Windows provide efficient recursive directory watchers. On other platforms, we - // only track the directories we need: https://github.com/vercel/turborepo/pull/4100 - #[cfg(any(target_os = "macos", target_os = "windows"))] - { - watcher.watch(inner.root_path(), RecursiveMode::Recursive)?; - } - - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - for dir_path in self.watching.iter() { - watcher.watch(&dir_path, RecursiveMode::NonRecursive)?; + if let Some(non_recursive) = &self.non_recursive_state { + for dir_path in non_recursive.watching.iter() { + internal.watch(&dir_path, RecursiveMode::NonRecursive)?; + } + } else { + internal.watch(fs_inner.root_path(), RecursiveMode::Recursive)?; } // We need to invalidate all reads that happened before watching // Best is to start_watching before starting to read { - let _span = tracing::info_span!("invalidate filesystem").entered(); - let span = tracing::Span::current(); - let invalidator_map = take(&mut *inner.invalidator_map.lock().unwrap()); - let dir_invalidator_map = take(&mut *inner.dir_invalidator_map.lock().unwrap()); + let span = tracing::info_span!("invalidate filesystem"); + let _span = span.clone().entered(); + let invalidator_map = take(&mut *fs_inner.invalidator_map.lock().unwrap()); + let dir_invalidator_map = take(&mut *fs_inner.dir_invalidator_map.lock().unwrap()); let iter = invalidator_map .into_par_iter() .chain(dir_invalidator_map.into_par_iter()); @@ -223,8 +278,9 @@ impl DiskWatcher { iter.flat_map(|(path, invalidators)| { let _span = span.clone().entered(); 
let reason = WatchStart { - name: inner.name.clone(), - path: path.into(), + name: fs_inner.name.clone(), + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), }; invalidators .into_par_iter() @@ -248,21 +304,21 @@ impl DiskWatcher { } } - watcher_guard.replace(watcher); - drop(watcher_guard); + internal_guard.replace(internal); + drop(internal_guard); spawn_thread(move || { - inner + fs_inner .clone() .watcher - .watch_thread(rx, inner, report_invalidation_reason) + .watch_thread(rx, fs_inner, report_invalidation_reason) }); Ok(()) } pub(crate) fn stop_watching(&self) { - if let Some(watcher) = self.watcher.lock().unwrap().take() { + if let Some(watcher) = self.internal.lock().unwrap().take() { drop(watcher); // thread will detect the stop because the channel is disconnected } @@ -283,8 +339,11 @@ impl DiskWatcher { let mut batched_invalidate_path_and_children = FxHashSet::default(); let mut batched_invalidate_path_and_children_dir = FxHashSet::default(); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - let mut batched_new_paths = FxHashSet::default(); + let mut batched_new_paths = if self.non_recursive_state.is_some() { + Some(FxHashSet::default()) + } else { + None + }; 'outer: loop { let mut event_result = rx.recv().or(Err(TryRecvError::Disconnected)); @@ -304,18 +363,20 @@ impl DiskWatcher { if event.need_rescan() { let _lock = inner.invalidation_lock.blocking_write(); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - { + if let Some(non_recursive) = &self.non_recursive_state { // we can't narrow this down to a smaller set of paths: Rescan // events (at least when tested on Linux) come with no `paths`, and // we use only one global `notify::Watcher` instance. 
- self.restore_all_watching(inner.root_path()); - batched_new_paths.clear(); + non_recursive.restore_all_watching(self, inner.root_path()); + if let Some(batched_new_paths) = &mut batched_new_paths { + batched_new_paths.clear(); + } } if report_invalidation_reason { inner.invalidate_with_reason(|path| InvalidateRescan { - path: RcStr::from(path), + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), }); } else { inner.invalidate(); @@ -380,8 +441,9 @@ impl DiskWatcher { } }); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - batched_new_paths.extend(paths.clone()); + if let Some(batched_new_paths) = &mut batched_new_paths { + batched_new_paths.extend(paths.clone()); + } } EventKind::Remove(_) => { batched_invalidate_path_and_children.extend(paths.clone()); @@ -406,8 +468,9 @@ impl DiskWatcher { if let Some(parent) = destination.parent() { batched_invalidate_path_dir.insert(PathBuf::from(parent)); } - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - batched_new_paths.insert(destination.clone()); + if let Some(batched_new_paths) = &mut batched_new_paths { + batched_new_paths.insert(destination.clone()); + } } else { // If we hit here, we expect this as a bug either in // notify or system weirdness. @@ -478,13 +541,11 @@ impl DiskWatcher { } // We need to start watching first before invalidating the changed paths... - // This is only needed on platforms we don't do recursive watching on: - // https://github.com/vercel/turborepo/pull/4100 - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - { - for path in batched_new_paths.drain() { + // This is only needed on platforms we don't do recursive watching on. 
+ if let Some(non_recursive) = &self.non_recursive_state { + for path in batched_new_paths.as_mut().unwrap().drain() { // TODO: Report diagnostics if this error happens - let _ = self.restore_if_watching(&path, inner.root_path()); + let _ = non_recursive.restore_if_watching(self, &path, inner.root_path()); } } @@ -542,12 +603,11 @@ fn invalidate( fn invalidate_path( inner: &DiskFileSystemInner, report_invalidation_reason: bool, - invalidator_map: &mut FxHashMap>>, + invalidator_map: &mut LockedInvalidatorMap, paths: impl Iterator, ) { for path in paths { - let key = path_to_key(&path); - if let Some(invalidators) = invalidator_map.remove(&key) { + if let Some(invalidators) = invalidator_map.remove(&path) { invalidators .into_iter() .for_each(|(i, _)| invalidate(inner, report_invalidation_reason, &path, i)); @@ -558,12 +618,11 @@ fn invalidate_path( fn invalidate_path_and_children_execute( inner: &DiskFileSystemInner, report_invalidation_reason: bool, - invalidator_map: &mut FxHashMap>>, + invalidator_map: &mut LockedInvalidatorMap, paths: impl Iterator, ) { for path in paths { - let path_key = path_to_key(&path); - for (_, invalidators) in invalidator_map.extract_if(|key, _| key.starts_with(&path_key)) { + for (_, invalidators) in invalidator_map.extract_path_with_children(&path) { invalidators .into_iter() .for_each(|(i, _)| invalidate(inner, report_invalidation_reason, &path, i)); diff --git a/turbopack/crates/turbo-tasks-fuzz/src/main.rs b/turbopack/crates/turbo-tasks-fuzz/src/main.rs index 9c0e768fdc28..21fa2f1418a5 100644 --- a/turbopack/crates/turbo-tasks-fuzz/src/main.rs +++ b/turbopack/crates/turbo-tasks-fuzz/src/main.rs @@ -50,6 +50,8 @@ struct FsWatcher { file_modifications: u32, #[arg(long, default_value_t = 2)] directory_modifications: u32, + #[arg(long)] + print_missing_invalidations: bool, } #[tokio::main] @@ -135,6 +137,19 @@ async fn fuzz_fs_watcher(args: FsWatcher) -> anyhow::Result<()> { modified_file_paths.len(), invalidations.len() ); + if 
args.print_missing_invalidations { + let absolute_path_invalidations = invalidations + .iter() + .map(|relative_path| fs_root.join(relative_path)) + .collect::>(); + let mut missing = modified_file_paths + .difference(&absolute_path_invalidations) + .collect::>(); + missing.sort_unstable(); + for path in &missing { + println!(" missing {path:?}"); + } + } invalidations.clear(); } }