From 6f138a47f9902f04df2ad95ab0f4fbf5724e7c85 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Thu, 24 Jul 2025 14:48:51 -0700 Subject: [PATCH 1/3] Turbopack: Make the "non-recursive" logic for the fs watcher a runtime flag instead of a build-time one --- turbopack/crates/turbo-tasks-fs/src/lib.rs | 19 +- .../crates/turbo-tasks-fs/src/watcher.rs | 200 +++++++++++------- 2 files changed, 138 insertions(+), 81 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 7d58c64af146..de1aaf4898ba 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -255,9 +255,10 @@ impl DiskFileSystemInner { let invalidator = turbo_tasks::get_invalidator(); self.invalidator_map .insert(path_to_key(path), invalidator, None); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - if let Some(dir) = path.parent() { - self.watcher.ensure_watching(dir, self.root_path())?; + if let Some(non_recursive) = &self.watcher.non_recursive_state + && let Some(dir) = path.parent() + { + non_recursive.ensure_watching(&self.watcher, dir, self.root_path())?; } Ok(()) } @@ -284,9 +285,10 @@ impl DiskFileSystemInner { .collect::>(); invalidators.insert(invalidator, Some(write_content)); drop(invalidator_map); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - if let Some(dir) = path.parent() { - self.watcher.ensure_watching(dir, self.root_path())?; + if let Some(non_recursive) = &self.watcher.non_recursive_state + && let Some(dir) = path.parent() + { + non_recursive.ensure_watching(&self.watcher, dir, self.root_path())?; } Ok(old_invalidators) } @@ -297,8 +299,9 @@ impl DiskFileSystemInner { let invalidator = turbo_tasks::get_invalidator(); self.dir_invalidator_map .insert(path_to_key(path), invalidator, None); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - self.watcher.ensure_watching(path, self.root_path())?; + if let Some(non_recursive) = &self.watcher.non_recursive_state { + non_recursive.ensure_watching(&self.watcher, path, self.root_path())?; + } Ok(()) } diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index 0fe0fb5c81cc..379c406c11bf 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -1,16 +1,17 @@ use std::{ any::Any, - fmt, + env, fmt, mem::take, path::{Path, PathBuf}, sync::{ - Arc, Mutex, + Arc, LazyLock, Mutex, MutexGuard, mpsc::{Receiver, TryRecvError, channel}, }, time::Duration, }; -use anyhow::Result; +use anyhow::{Context, Result}; +use dashmap::DashSet; use notify::{ Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher, event::{MetadataKind, ModifyKind, RenameMode}, @@ -32,6 +33,35 @@ use crate::{ path_to_key, }; +static WATCH_RECURSIVE_MODE: LazyLock = LazyLock::new(|| { + match env::var("TURBO_TASKS_FORCE_WATCH_MODE").as_deref() { + Ok("recursive") => { + return RecursiveMode::Recursive; + } + Ok("nonrecursive") => { + return RecursiveMode::NonRecursive; + } + Ok(_) => { + eprintln!( + "unsupported `TURBO_TASKS_FORCE_WATCH_MODE`, must be `recursive` or `nonrecursive`" + ); + } + _ => {} + } + if cfg!(any(target_os = "macos", target_os = "windows")) { + // these platforms have efficient recursive watchers, it's best to track the entire + // directory and filter events to the files we care about + RecursiveMode::Recursive + } else { + // inotify on linux is non-recursive, so notify-rs's implementation is inefficient, it's + // better for us to just track it ourselves and only watch the files we know we care about + // + // See: https://github.com/vercel/turborepo/pull/4100 + RecursiveMode::NonRecursive + } +}); + +/// A thin wrapper around [`RecommendedWatcher`] and [`PollWatcher`]. enum DiskWatcherInternal { Recommended(RecommendedWatcher), Polling(PollWatcher), @@ -46,76 +76,91 @@ impl DiskWatcherInternal { } } -#[derive(Default, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub(crate) struct DiskWatcher { - #[serde(skip)] - watcher: Mutex>, - /// Array of paths that should not notify invalidations. /// `notify` currently doesn't support unwatching subpaths from the root, /// so underlying we still watches filesystem event but only skips to /// invalidate. ignored_subpaths: Vec, - /// Keeps track of which directories are currently watched. This is only - /// used on OSs that doesn't support recursive watching. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] #[serde(skip)] - watching: dashmap::DashSet, + internal: Mutex>, + + #[serde(skip)] + pub(crate) non_recursive_state: Option, } -impl DiskWatcher { - pub(crate) fn new(ignored_subpaths: Vec) -> Self { +impl Default for DiskWatcher { + fn default() -> Self { Self { - ignored_subpaths, - ..Default::default() + ignored_subpaths: Vec::new(), + internal: Mutex::new(None), + non_recursive_state: match *WATCH_RECURSIVE_MODE { + RecursiveMode::Recursive => None, + RecursiveMode::NonRecursive => Some(NonRecursiveDiskWatcherState { + watching: DashSet::new(), + }), + }, } } +} +/// Keeps track of which directories are currently watched. This is only used on OSs that don't +/// efficiently support recursive watching. +pub(crate) struct NonRecursiveDiskWatcherState { + watching: DashSet, +} + +impl NonRecursiveDiskWatcherState { /// Called after a rescan in case a previously watched-but-deleted directory was recreated. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - pub(crate) fn restore_all_watching(&self, root_path: &Path) { - let mut watcher = self.watcher.lock().unwrap(); + pub(crate) fn restore_all_watching(&self, watcher: &DiskWatcher, root_path: &Path) { + let mut internal = watcher.internal.lock().unwrap(); for dir_path in self.watching.iter() { // TODO: Report diagnostics if this error happens - let _ = self.start_watching_dir(&mut watcher, &dir_path, root_path); + let _ = self.start_watching_dir(&mut internal, &dir_path, root_path); } } /// Called when a new directory is found in a parent directory we're watching. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - pub(crate) fn restore_if_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> { + pub(crate) fn restore_if_watching( + &self, + watcher: &DiskWatcher, + dir_path: &Path, + root_path: &Path, + ) -> Result<()> { if self.watching.contains(dir_path) { - let mut watcher = self.watcher.lock().unwrap(); + let mut internal = watcher.internal.lock().unwrap(); // TODO: Also restore any watchers for children of this directory - self.start_watching_dir(&mut watcher, dir_path, root_path)?; + self.start_watching_dir(&mut internal, dir_path, root_path)?; } Ok(()) } - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - pub(crate) fn ensure_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> { + pub(crate) fn ensure_watching( + &self, + watcher: &DiskWatcher, + dir_path: &Path, + root_path: &Path, + ) -> Result<()> { if self.watching.contains(dir_path) { return Ok(()); } - let mut watcher = self.watcher.lock().unwrap(); + let mut internal = watcher.internal.lock().unwrap(); if self.watching.insert(dir_path.to_path_buf()) { - self.start_watching_dir(&mut watcher, dir_path, root_path)?; + self.start_watching_dir(&mut internal, dir_path, root_path)?; } Ok(()) } /// Private helper, assumes that the path has already been added to `self.watching`. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] fn start_watching_dir( &self, - watcher: &mut std::sync::MutexGuard>, + watcher_internal_guard: &mut MutexGuard>, dir_path: &Path, root_path: &Path, ) -> Result<()> { - use anyhow::Context; // inner import due to conditional compilation - - if let Some(watcher) = watcher.as_mut() { + if let Some(watcher_internal_guard) = watcher_internal_guard.as_mut() { let mut path = dir_path; let err_with_context = |err| { return Err(err).context(format!( @@ -124,7 +169,7 @@ impl DiskWatcher { path.display() )); }; - while let Err(err) = watcher.watch(path, RecursiveMode::NonRecursive) { + while let Err(err) = watcher_internal_guard.watch(path, RecursiveMode::NonRecursive) { match err { notify::Error { kind: notify::ErrorKind::PathNotFound, @@ -153,6 +198,15 @@ impl DiskWatcher { } Ok(()) } +} + +impl DiskWatcher { + pub(crate) fn new(ignored_subpaths: Vec) -> Self { + Self { + ignored_subpaths, + ..Default::default() + } + } /// Create a watcher and start watching by creating `debounced` watcher /// via `full debouncer` @@ -171,12 +225,12 @@ impl DiskWatcher { /// - Doesn't emit Modify events after a Create event pub(crate) fn start_watching( &self, - inner: Arc, + fs_inner: Arc, report_invalidation_reason: bool, poll_interval: Option, ) -> Result<()> { - let mut watcher_guard = self.watcher.lock().unwrap(); - if watcher_guard.is_some() { + let mut internal_guard = self.internal.lock().unwrap(); + if internal_guard.is_some() { return Ok(()); } @@ -188,7 +242,7 @@ impl DiskWatcher { // we should track and invalidate each part of a symlink chain ourselves in turbo-tasks-fs config.with_follow_symlinks(false); - let mut watcher = if let Some(poll_interval) = poll_interval { + let mut internal = if let Some(poll_interval) = poll_interval { let config = config.with_poll_interval(poll_interval); DiskWatcherInternal::Polling(PollWatcher::new(tx, config)?) @@ -196,25 +250,21 @@ impl DiskWatcher { DiskWatcherInternal::Recommended(RecommendedWatcher::new(tx, Config::default())?) }; - // Macos and Windows provide efficient recursive directory watchers. On other platforms, we - // only track the directories we need: https://github.com/vercel/turborepo/pull/4100 - #[cfg(any(target_os = "macos", target_os = "windows"))] - { - watcher.watch(inner.root_path(), RecursiveMode::Recursive)?; - } - - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - for dir_path in self.watching.iter() { - watcher.watch(&dir_path, RecursiveMode::NonRecursive)?; + if let Some(non_recursive) = &self.non_recursive_state { + for dir_path in non_recursive.watching.iter() { + internal.watch(&dir_path, RecursiveMode::NonRecursive)?; + } + } else { + internal.watch(fs_inner.root_path(), RecursiveMode::Recursive)?; } // We need to invalidate all reads that happened before watching // Best is to start_watching before starting to read { - let _span = tracing::info_span!("invalidate filesystem").entered(); - let span = tracing::Span::current(); - let invalidator_map = take(&mut *inner.invalidator_map.lock().unwrap()); - let dir_invalidator_map = take(&mut *inner.dir_invalidator_map.lock().unwrap()); + let span = tracing::info_span!("invalidate filesystem"); + let _span = span.clone().entered(); + let invalidator_map = take(&mut *fs_inner.invalidator_map.lock().unwrap()); + let dir_invalidator_map = take(&mut *fs_inner.dir_invalidator_map.lock().unwrap()); let iter = invalidator_map .into_par_iter() .chain(dir_invalidator_map.into_par_iter()); @@ -223,7 +273,7 @@ impl DiskWatcher { iter.flat_map(|(path, invalidators)| { let _span = span.clone().entered(); let reason = WatchStart { - name: inner.name.clone(), + name: fs_inner.name.clone(), path: path.into(), }; invalidators @@ -248,21 +298,21 @@ impl DiskWatcher { } } - watcher_guard.replace(watcher); - drop(watcher_guard); + internal_guard.replace(internal); + drop(internal_guard); spawn_thread(move || { - inner + fs_inner .clone() .watcher - .watch_thread(rx, inner, report_invalidation_reason) + .watch_thread(rx, fs_inner, report_invalidation_reason) }); Ok(()) } pub(crate) fn stop_watching(&self) { - if let Some(watcher) = self.watcher.lock().unwrap().take() { + if let Some(watcher) = self.internal.lock().unwrap().take() { drop(watcher); // thread will detect the stop because the channel is disconnected } @@ -283,8 +333,11 @@ impl DiskWatcher { let mut batched_invalidate_path_and_children = FxHashSet::default(); let mut batched_invalidate_path_and_children_dir = FxHashSet::default(); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - let mut batched_new_paths = FxHashSet::default(); + let mut batched_new_paths = if self.non_recursive_state.is_some() { + Some(FxHashSet::default()) + } else { + None + }; 'outer: loop { let mut event_result = rx.recv().or(Err(TryRecvError::Disconnected)); @@ -304,13 +357,14 @@ impl DiskWatcher { if event.need_rescan() { let _lock = inner.invalidation_lock.blocking_write(); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - { + if let Some(non_recursive) = &self.non_recursive_state { // we can't narrow this down to a smaller set of paths: Rescan // events (at least when tested on Linux) come with no `paths`, and // we use only one global `notify::Watcher` instance. - self.restore_all_watching(inner.root_path()); - batched_new_paths.clear(); + non_recursive.restore_all_watching(self, inner.root_path()); + if let Some(batched_new_paths) = &mut batched_new_paths { + batched_new_paths.clear(); + } } if report_invalidation_reason { @@ -380,8 +434,9 @@ impl DiskWatcher { } }); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - batched_new_paths.extend(paths.clone()); + if let Some(batched_new_paths) = &mut batched_new_paths { + batched_new_paths.extend(paths.clone()); + } } EventKind::Remove(_) => { batched_invalidate_path_and_children.extend(paths.clone()); @@ -406,8 +461,9 @@ impl DiskWatcher { if let Some(parent) = destination.parent() { batched_invalidate_path_dir.insert(PathBuf::from(parent)); } - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - batched_new_paths.insert(destination.clone()); + if let Some(batched_new_paths) = &mut batched_new_paths { + batched_new_paths.insert(destination.clone()); + } } else { // If we hit here, we expect this as a bug either in // notify or system weirdness. @@ -478,13 +534,11 @@ impl DiskWatcher { } // We need to start watching first before invalidating the changed paths... - // This is only needed on platforms we don't do recursive watching on: - // https://github.com/vercel/turborepo/pull/4100 - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - { - for path in batched_new_paths.drain() { + // This is only needed on platforms we don't do recursive watching on. + if let Some(non_recursive) = &self.non_recursive_state { + for path in batched_new_paths.as_mut().unwrap().drain() { // TODO: Report diagnostics if this error happens - let _ = self.restore_if_watching(&path, inner.root_path()); + let _ = non_recursive.restore_if_watching(self, &path, inner.root_path()); } } From 84ccc189b6a74ece14f0528267f901817e258ea2 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Tue, 29 Jul 2025 14:09:17 -0700 Subject: [PATCH 2/3] Address comments --- .../crates/turbo-tasks-fs/src/watcher.rs | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index 379c406c11bf..093b6c5b63b0 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -87,7 +87,7 @@ pub(crate) struct DiskWatcher { #[serde(skip)] internal: Mutex>, - #[serde(skip)] + #[serde(skip, default = "NonRecursiveDiskWatcherState::try_new")] pub(crate) non_recursive_state: Option, } @@ -96,23 +96,28 @@ impl Default for DiskWatcher { Self { ignored_subpaths: Vec::new(), internal: Mutex::new(None), - non_recursive_state: match *WATCH_RECURSIVE_MODE { - RecursiveMode::Recursive => None, - RecursiveMode::NonRecursive => Some(NonRecursiveDiskWatcherState { - watching: DashSet::new(), - }), - }, + non_recursive_state: NonRecursiveDiskWatcherState::try_new(), } } } -/// Keeps track of which directories are currently watched. This is only used on OSs that don't -/// efficiently support recursive watching. +/// Extra state used by [`DiskWatcher`] when [`WATCH_RECURSIVE_MODE`] is +/// [`RecursiveMode::NonRecursive`] (default on Linux). pub(crate) struct NonRecursiveDiskWatcherState { + /// Keeps track of which directories are currently (or were previously) watched. watching: DashSet, } impl NonRecursiveDiskWatcherState { + fn try_new() -> Option { + match *WATCH_RECURSIVE_MODE { + RecursiveMode::Recursive => None, + RecursiveMode::NonRecursive => Some(NonRecursiveDiskWatcherState { + watching: DashSet::new(), + }), + } + } + /// Called after a rescan in case a previously watched-but-deleted directory was recreated. pub(crate) fn restore_all_watching(&self, watcher: &DiskWatcher, root_path: &Path) { let mut internal = watcher.internal.lock().unwrap(); From 0dc241a8d665f042538ed26bf4ca7f582981bf97 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Fri, 25 Jul 2025 13:52:56 -0700 Subject: [PATCH 3/3] Turbopack: Add a flag to the fs watch fuzzer to print missing but expected invalidations --- turbopack/crates/turbo-tasks-fuzz/src/main.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/turbopack/crates/turbo-tasks-fuzz/src/main.rs b/turbopack/crates/turbo-tasks-fuzz/src/main.rs index 9c0e768fdc28..21fa2f1418a5 100644 --- a/turbopack/crates/turbo-tasks-fuzz/src/main.rs +++ b/turbopack/crates/turbo-tasks-fuzz/src/main.rs @@ -50,6 +50,8 @@ struct FsWatcher { file_modifications: u32, #[arg(long, default_value_t = 2)] directory_modifications: u32, + #[arg(long)] + print_missing_invalidations: bool, } #[tokio::main] @@ -135,6 +137,19 @@ async fn fuzz_fs_watcher(args: FsWatcher) -> anyhow::Result<()> { modified_file_paths.len(), invalidations.len() ); + if args.print_missing_invalidations { + let absolute_path_invalidations = invalidations + .iter() + .map(|relative_path| fs_root.join(relative_path)) + .collect::>(); + let mut missing = modified_file_paths + .difference(&absolute_path_invalidations) + .collect::>(); + missing.sort_unstable(); + for path in &missing { + println!(" missing {path:?}"); + } + } invalidations.clear(); } }