From 6f138a47f9902f04df2ad95ab0f4fbf5724e7c85 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Thu, 24 Jul 2025 14:48:51 -0700 Subject: [PATCH 1/8] Turbopack: Make the "non-recursive" logic for the fs watcher a runtime flag instead of a build-time one --- turbopack/crates/turbo-tasks-fs/src/lib.rs | 19 +- .../crates/turbo-tasks-fs/src/watcher.rs | 200 +++++++++++------- 2 files changed, 138 insertions(+), 81 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 7d58c64af146..de1aaf4898ba 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -255,9 +255,10 @@ impl DiskFileSystemInner { let invalidator = turbo_tasks::get_invalidator(); self.invalidator_map .insert(path_to_key(path), invalidator, None); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - if let Some(dir) = path.parent() { - self.watcher.ensure_watching(dir, self.root_path())?; + if let Some(non_recursive) = &self.watcher.non_recursive_state + && let Some(dir) = path.parent() + { + non_recursive.ensure_watching(&self.watcher, dir, self.root_path())?; } Ok(()) } @@ -284,9 +285,10 @@ impl DiskFileSystemInner { .collect::>(); invalidators.insert(invalidator, Some(write_content)); drop(invalidator_map); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - if let Some(dir) = path.parent() { - self.watcher.ensure_watching(dir, self.root_path())?; + if let Some(non_recursive) = &self.watcher.non_recursive_state + && let Some(dir) = path.parent() + { + non_recursive.ensure_watching(&self.watcher, dir, self.root_path())?; } Ok(old_invalidators) } @@ -297,8 +299,9 @@ impl DiskFileSystemInner { let invalidator = turbo_tasks::get_invalidator(); self.dir_invalidator_map .insert(path_to_key(path), invalidator, None); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - self.watcher.ensure_watching(path, self.root_path())?; + if let Some(non_recursive) = &self.watcher.non_recursive_state { + non_recursive.ensure_watching(&self.watcher, path, self.root_path())?; + } Ok(()) } diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index 0fe0fb5c81cc..379c406c11bf 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -1,16 +1,17 @@ use std::{ any::Any, - fmt, + env, fmt, mem::take, path::{Path, PathBuf}, sync::{ - Arc, Mutex, + Arc, LazyLock, Mutex, MutexGuard, mpsc::{Receiver, TryRecvError, channel}, }, time::Duration, }; -use anyhow::Result; +use anyhow::{Context, Result}; +use dashmap::DashSet; use notify::{ Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher, event::{MetadataKind, ModifyKind, RenameMode}, @@ -32,6 +33,35 @@ use crate::{ path_to_key, }; +static WATCH_RECURSIVE_MODE: LazyLock = LazyLock::new(|| { + match env::var("TURBO_TASKS_FORCE_WATCH_MODE").as_deref() { + Ok("recursive") => { + return RecursiveMode::Recursive; + } + Ok("nonrecursive") => { + return RecursiveMode::NonRecursive; + } + Ok(_) => { + eprintln!( + "unsupported `TURBO_TASKS_FORCE_WATCH_MODE`, must be `recursive` or `nonrecursive`" + ); + } + _ => {} + } + if cfg!(any(target_os = "macos", target_os = "windows")) { + // these platforms have efficient recursive watchers, it's best to track the entire + // directory and filter events to the files we care about + RecursiveMode::Recursive + } else { + // inotify on linux is non-recursive, so notify-rs's implementation is inefficient, it's + // better for us to just track it ourselves and only watch the files we know we care about + // + // See: https://github.com/vercel/turborepo/pull/4100 + RecursiveMode::NonRecursive + } +}); + +/// A thin wrapper around [`RecommendedWatcher`] and [`PollWatcher`]. enum DiskWatcherInternal { Recommended(RecommendedWatcher), Polling(PollWatcher), @@ -46,76 +76,91 @@ impl DiskWatcherInternal { } } -#[derive(Default, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub(crate) struct DiskWatcher { - #[serde(skip)] - watcher: Mutex>, - /// Array of paths that should not notify invalidations. /// `notify` currently doesn't support unwatching subpaths from the root, /// so underlying we still watches filesystem event but only skips to /// invalidate. ignored_subpaths: Vec, - /// Keeps track of which directories are currently watched. This is only - /// used on OSs that doesn't support recursive watching. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] #[serde(skip)] - watching: dashmap::DashSet, + internal: Mutex>, + + #[serde(skip)] + pub(crate) non_recursive_state: Option, } -impl DiskWatcher { - pub(crate) fn new(ignored_subpaths: Vec) -> Self { +impl Default for DiskWatcher { + fn default() -> Self { Self { - ignored_subpaths, - ..Default::default() + ignored_subpaths: Vec::new(), + internal: Mutex::new(None), + non_recursive_state: match *WATCH_RECURSIVE_MODE { + RecursiveMode::Recursive => None, + RecursiveMode::NonRecursive => Some(NonRecursiveDiskWatcherState { + watching: DashSet::new(), + }), + }, } } +} +/// Keeps track of which directories are currently watched. This is only used on OSs that don't +/// efficiently support recursive watching. +pub(crate) struct NonRecursiveDiskWatcherState { + watching: DashSet, +} + +impl NonRecursiveDiskWatcherState { /// Called after a rescan in case a previously watched-but-deleted directory was recreated. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - pub(crate) fn restore_all_watching(&self, root_path: &Path) { - let mut watcher = self.watcher.lock().unwrap(); + pub(crate) fn restore_all_watching(&self, watcher: &DiskWatcher, root_path: &Path) { + let mut internal = watcher.internal.lock().unwrap(); for dir_path in self.watching.iter() { // TODO: Report diagnostics if this error happens - let _ = self.start_watching_dir(&mut watcher, &dir_path, root_path); + let _ = self.start_watching_dir(&mut internal, &dir_path, root_path); } } /// Called when a new directory is found in a parent directory we're watching. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - pub(crate) fn restore_if_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> { + pub(crate) fn restore_if_watching( + &self, + watcher: &DiskWatcher, + dir_path: &Path, + root_path: &Path, + ) -> Result<()> { if self.watching.contains(dir_path) { - let mut watcher = self.watcher.lock().unwrap(); + let mut internal = watcher.internal.lock().unwrap(); // TODO: Also restore any watchers for children of this directory - self.start_watching_dir(&mut watcher, dir_path, root_path)?; + self.start_watching_dir(&mut internal, dir_path, root_path)?; } Ok(()) } - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - pub(crate) fn ensure_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> { + pub(crate) fn ensure_watching( + &self, + watcher: &DiskWatcher, + dir_path: &Path, + root_path: &Path, + ) -> Result<()> { if self.watching.contains(dir_path) { return Ok(()); } - let mut watcher = self.watcher.lock().unwrap(); + let mut internal = watcher.internal.lock().unwrap(); if self.watching.insert(dir_path.to_path_buf()) { - self.start_watching_dir(&mut watcher, dir_path, root_path)?; + self.start_watching_dir(&mut internal, dir_path, root_path)?; } Ok(()) } /// Private helper, assumes that the path has already been added to `self.watching`. - #[cfg(not(any(target_os = "macos", target_os = "windows")))] fn start_watching_dir( &self, - watcher: &mut std::sync::MutexGuard>, + watcher_internal_guard: &mut MutexGuard>, dir_path: &Path, root_path: &Path, ) -> Result<()> { - use anyhow::Context; // inner import due to conditional compilation - - if let Some(watcher) = watcher.as_mut() { + if let Some(watcher_internal_guard) = watcher_internal_guard.as_mut() { let mut path = dir_path; let err_with_context = |err| { return Err(err).context(format!( @@ -124,7 +169,7 @@ impl DiskWatcher { path.display() )); }; - while let Err(err) = watcher.watch(path, RecursiveMode::NonRecursive) { + while let Err(err) = watcher_internal_guard.watch(path, RecursiveMode::NonRecursive) { match err { notify::Error { kind: notify::ErrorKind::PathNotFound, @@ -153,6 +198,15 @@ impl DiskWatcher { } Ok(()) } +} + +impl DiskWatcher { + pub(crate) fn new(ignored_subpaths: Vec) -> Self { + Self { + ignored_subpaths, + ..Default::default() + } + } /// Create a watcher and start watching by creating `debounced` watcher /// via `full debouncer` @@ -171,12 +225,12 @@ impl DiskWatcher { /// - Doesn't emit Modify events after a Create event pub(crate) fn start_watching( &self, - inner: Arc, + fs_inner: Arc, report_invalidation_reason: bool, poll_interval: Option, ) -> Result<()> { - let mut watcher_guard = self.watcher.lock().unwrap(); - if watcher_guard.is_some() { + let mut internal_guard = self.internal.lock().unwrap(); + if internal_guard.is_some() { return Ok(()); } @@ -188,7 +242,7 @@ impl DiskWatcher { // we should track and invalidate each part of a symlink chain ourselves in turbo-tasks-fs config.with_follow_symlinks(false); - let mut watcher = if let Some(poll_interval) = poll_interval { + let mut internal = if let Some(poll_interval) = poll_interval { let config = config.with_poll_interval(poll_interval); DiskWatcherInternal::Polling(PollWatcher::new(tx, config)?) @@ -196,25 +250,21 @@ impl DiskWatcher { DiskWatcherInternal::Recommended(RecommendedWatcher::new(tx, Config::default())?) }; - // Macos and Windows provide efficient recursive directory watchers. On other platforms, we - // only track the directories we need: https://github.com/vercel/turborepo/pull/4100 - #[cfg(any(target_os = "macos", target_os = "windows"))] - { - watcher.watch(inner.root_path(), RecursiveMode::Recursive)?; - } - - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - for dir_path in self.watching.iter() { - watcher.watch(&dir_path, RecursiveMode::NonRecursive)?; + if let Some(non_recursive) = &self.non_recursive_state { + for dir_path in non_recursive.watching.iter() { + internal.watch(&dir_path, RecursiveMode::NonRecursive)?; + } + } else { + internal.watch(fs_inner.root_path(), RecursiveMode::Recursive)?; } // We need to invalidate all reads that happened before watching // Best is to start_watching before starting to read { - let _span = tracing::info_span!("invalidate filesystem").entered(); - let span = tracing::Span::current(); - let invalidator_map = take(&mut *inner.invalidator_map.lock().unwrap()); - let dir_invalidator_map = take(&mut *inner.dir_invalidator_map.lock().unwrap()); + let span = tracing::info_span!("invalidate filesystem"); + let _span = span.clone().entered(); + let invalidator_map = take(&mut *fs_inner.invalidator_map.lock().unwrap()); + let dir_invalidator_map = take(&mut *fs_inner.dir_invalidator_map.lock().unwrap()); let iter = invalidator_map .into_par_iter() .chain(dir_invalidator_map.into_par_iter()); @@ -223,7 +273,7 @@ impl DiskWatcher { iter.flat_map(|(path, invalidators)| { let _span = span.clone().entered(); let reason = WatchStart { - name: inner.name.clone(), + name: fs_inner.name.clone(), path: path.into(), }; invalidators @@ -248,21 +298,21 @@ impl DiskWatcher { } } - watcher_guard.replace(watcher); - drop(watcher_guard); + internal_guard.replace(internal); + drop(internal_guard); spawn_thread(move || { - inner + fs_inner .clone() .watcher - .watch_thread(rx, inner, report_invalidation_reason) + .watch_thread(rx, fs_inner, report_invalidation_reason) }); Ok(()) } pub(crate) fn stop_watching(&self) { - if let Some(watcher) = self.watcher.lock().unwrap().take() { + if let Some(watcher) = self.internal.lock().unwrap().take() { drop(watcher); // thread will detect the stop because the channel is disconnected } @@ -283,8 +333,11 @@ impl DiskWatcher { let mut batched_invalidate_path_and_children = FxHashSet::default(); let mut batched_invalidate_path_and_children_dir = FxHashSet::default(); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - let mut batched_new_paths = FxHashSet::default(); + let mut batched_new_paths = if self.non_recursive_state.is_some() { + Some(FxHashSet::default()) + } else { + None + }; 'outer: loop { let mut event_result = rx.recv().or(Err(TryRecvError::Disconnected)); @@ -304,13 +357,14 @@ impl DiskWatcher { if event.need_rescan() { let _lock = inner.invalidation_lock.blocking_write(); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - { + if let Some(non_recursive) = &self.non_recursive_state { // we can't narrow this down to a smaller set of paths: Rescan // events (at least when tested on Linux) come with no `paths`, and // we use only one global `notify::Watcher` instance. - self.restore_all_watching(inner.root_path()); - batched_new_paths.clear(); + non_recursive.restore_all_watching(self, inner.root_path()); + if let Some(batched_new_paths) = &mut batched_new_paths { + batched_new_paths.clear(); + } } if report_invalidation_reason { @@ -380,8 +434,9 @@ impl DiskWatcher { } }); - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - batched_new_paths.extend(paths.clone()); + if let Some(batched_new_paths) = &mut batched_new_paths { + batched_new_paths.extend(paths.clone()); + } } EventKind::Remove(_) => { batched_invalidate_path_and_children.extend(paths.clone()); @@ -406,8 +461,9 @@ impl DiskWatcher { if let Some(parent) = destination.parent() { batched_invalidate_path_dir.insert(PathBuf::from(parent)); } - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - batched_new_paths.insert(destination.clone()); + if let Some(batched_new_paths) = &mut batched_new_paths { + batched_new_paths.insert(destination.clone()); + } } else { // If we hit here, we expect this as a bug either in // notify or system weirdness. @@ -478,13 +534,11 @@ impl DiskWatcher { } // We need to start watching first before invalidating the changed paths... - // This is only needed on platforms we don't do recursive watching on: - // https://github.com/vercel/turborepo/pull/4100 - #[cfg(not(any(target_os = "macos", target_os = "windows")))] - { - for path in batched_new_paths.drain() { + // This is only needed on platforms we don't do recursive watching on. + if let Some(non_recursive) = &self.non_recursive_state { + for path in batched_new_paths.as_mut().unwrap().drain() { // TODO: Report diagnostics if this error happens - let _ = self.restore_if_watching(&path, inner.root_path()); + let _ = non_recursive.restore_if_watching(self, &path, inner.root_path()); } } From 84ccc189b6a74ece14f0528267f901817e258ea2 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Tue, 29 Jul 2025 14:09:17 -0700 Subject: [PATCH 2/8] Address comments --- .../crates/turbo-tasks-fs/src/watcher.rs | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index 379c406c11bf..093b6c5b63b0 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -87,7 +87,7 @@ pub(crate) struct DiskWatcher { #[serde(skip)] internal: Mutex>, - #[serde(skip)] + #[serde(skip, default = "NonRecursiveDiskWatcherState::try_new")] pub(crate) non_recursive_state: Option, } @@ -96,23 +96,28 @@ impl Default for DiskWatcher { Self { ignored_subpaths: Vec::new(), internal: Mutex::new(None), - non_recursive_state: match *WATCH_RECURSIVE_MODE { - RecursiveMode::Recursive => None, - RecursiveMode::NonRecursive => Some(NonRecursiveDiskWatcherState { - watching: DashSet::new(), - }), - }, + non_recursive_state: NonRecursiveDiskWatcherState::try_new(), } } } -/// Keeps track of which directories are currently watched. This is only used on OSs that don't -/// efficiently support recursive watching. +/// Extra state used by [`DiskWatcher`] when [`WATCH_RECURSIVE_MODE`] is +/// [`RecursiveMode::NonRecursive`] (default on Linux). pub(crate) struct NonRecursiveDiskWatcherState { + /// Keeps track of which directories are currently (or were previously) watched. watching: DashSet, } impl NonRecursiveDiskWatcherState { + fn try_new() -> Option { + match *WATCH_RECURSIVE_MODE { + RecursiveMode::Recursive => None, + RecursiveMode::NonRecursive => Some(NonRecursiveDiskWatcherState { + watching: DashSet::new(), + }), + } + } + /// Called after a rescan in case a previously watched-but-deleted directory was recreated. pub(crate) fn restore_all_watching(&self, watcher: &DiskWatcher, root_path: &Path) { let mut internal = watcher.internal.lock().unwrap(); From 0dc241a8d665f042538ed26bf4ca7f582981bf97 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Fri, 25 Jul 2025 13:52:56 -0700 Subject: [PATCH 3/8] Turbopack: Add a flag to the fs watch fuzzer to print missing but expected invalidations --- turbopack/crates/turbo-tasks-fuzz/src/main.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/turbopack/crates/turbo-tasks-fuzz/src/main.rs b/turbopack/crates/turbo-tasks-fuzz/src/main.rs index 9c0e768fdc28..21fa2f1418a5 100644 --- a/turbopack/crates/turbo-tasks-fuzz/src/main.rs +++ b/turbopack/crates/turbo-tasks-fuzz/src/main.rs @@ -50,6 +50,8 @@ struct FsWatcher { file_modifications: u32, #[arg(long, default_value_t = 2)] directory_modifications: u32, + #[arg(long)] + print_missing_invalidations: bool, } #[tokio::main] @@ -135,6 +137,19 @@ async fn fuzz_fs_watcher(args: FsWatcher) -> anyhow::Result<()> { modified_file_paths.len(), invalidations.len() ); + if args.print_missing_invalidations { + let absolute_path_invalidations = invalidations + .iter() + .map(|relative_path| fs_root.join(relative_path)) + .collect::>(); + let mut missing = modified_file_paths + .difference(&absolute_path_invalidations) + .collect::>(); + missing.sort_unstable(); + for path in &missing { + println!(" missing {path:?}"); + } + } invalidations.clear(); } } From 5fd077d40efa3aadca2e44d19ce4343629d607fe Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Fri, 25 Jul 2025 16:13:12 -0700 Subject: [PATCH 4/8] Turbopack: Use `Path`/`PathBuf` for all of the invalidation logic --- crates/next-api/src/project.rs | 10 +++--- .../turbo-tasks-fs/src/invalidator_map.rs | 17 +++++++--- turbopack/crates/turbo-tasks-fs/src/lib.rs | 34 +++++++++---------- .../crates/turbo-tasks-fs/src/watcher.rs | 17 +++++----- 4 files changed, 42 insertions(+), 36 deletions(-) diff --git a/crates/next-api/src/project.rs b/crates/next-api/src/project.rs index 87204aca4256..ec4964a5a359 100644 --- a/crates/next-api/src/project.rs +++ b/crates/next-api/src/project.rs @@ -323,14 +323,15 @@ impl ProjectContainer { .await?; } else { project_fs.invalidate_with_reason(|path| invalidation::Initialize { - path: RcStr::from(path), + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), }); } let output_fs = output_fs_operation(project) .read_strongly_consistent() .await?; output_fs.invalidate_with_reason(|path| invalidation::Initialize { - path: RcStr::from(path), + path: RcStr::from(path.to_string_lossy()), }); Ok(()) } @@ -421,13 +422,14 @@ impl ProjectContainer { .await?; } else { project_fs.invalidate_with_reason(|path| invalidation::Initialize { - path: RcStr::from(path), + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), }); } } if !ReadRef::ptr_eq(&prev_output_fs, &output_fs) { prev_output_fs.invalidate_with_reason(|path| invalidation::Initialize { - path: RcStr::from(path), + path: RcStr::from(path.to_string_lossy()), }); } diff --git a/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs b/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs index b384027f04eb..a99b7f853251 100644 --- a/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs +++ b/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs @@ -1,4 +1,7 @@ -use std::sync::{LockResult, Mutex, MutexGuard}; +use std::{ + path::PathBuf, + sync::{LockResult, Mutex, MutexGuard}, +}; use concurrent_queue::ConcurrentQueue; use rustc_hash::FxHashMap; @@ -13,10 +16,10 @@ pub enum WriteContent { Link(ReadRef), } -type InnerMap = FxHashMap>>; +type InnerMap = FxHashMap>>; pub struct InvalidatorMap { - queue: ConcurrentQueue<(String, Invalidator, Option)>, + queue: ConcurrentQueue<(PathBuf, Invalidator, Option)>, map: Mutex, } @@ -44,7 +47,7 @@ impl InvalidatorMap { pub fn insert( &self, - key: String, + key: PathBuf, invalidator: Invalidator, write_content: Option, ) { @@ -66,7 +69,11 @@ impl Serialize for InvalidatorMap { where S: serde::Serializer, { - serializer.serialize_newtype_struct("InvalidatorMap", &*self.lock().unwrap()) + // TODO: This stores `PathBuf`s, which are machine-specific, but turbo_tasks doesn't + // actually care about these, so we should skip serializing them entirely. Really we just + // need a way to store list of invalidator task ids across restarts. + let inner: &LockedInvalidatorMap = &self.lock().unwrap(); + serializer.serialize_newtype_struct("InvalidatorMap", inner) } } diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index de1aaf4898ba..2893e0b60924 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -254,7 +254,7 @@ impl DiskFileSystemInner { fn register_read_invalidator(&self, path: &Path) -> Result<()> { let invalidator = turbo_tasks::get_invalidator(); self.invalidator_map - .insert(path_to_key(path), invalidator, None); + .insert(path.to_owned(), invalidator, None); if let Some(non_recursive) = &self.watcher.non_recursive_state && let Some(dir) = path.parent() { @@ -273,7 +273,7 @@ impl DiskFileSystemInner { write_content: WriteContent, ) -> Result)>> { let mut invalidator_map = self.invalidator_map.lock().unwrap(); - let invalidators = invalidator_map.entry(path_to_key(path)).or_default(); + let invalidators = invalidator_map.entry(path.to_owned()).or_default(); let old_invalidators = invalidators .extract_if(|i, old_write_content| { i == &invalidator @@ -298,7 +298,7 @@ impl DiskFileSystemInner { fn register_dir_invalidator(&self, path: &Path) -> Result<()> { let invalidator = turbo_tasks::get_invalidator(); self.dir_invalidator_map - .insert(path_to_key(path), invalidator, None); + .insert(path.to_owned(), invalidator, None); if let Some(non_recursive) = &self.watcher.non_recursive_state { non_recursive.ensure_watching(&self.watcher, path, self.root_path())?; } @@ -333,7 +333,7 @@ impl DiskFileSystemInner { /// Calls the given fn invalidate_with_reason( &self, - reason: impl Fn(String) -> R + Sync, + reason: impl Fn(&Path) -> R + Sync, ) { let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered(); let span = tracing::Span::current(); @@ -345,7 +345,7 @@ impl DiskFileSystemInner { .chain(dir_invalidator_map.into_par_iter()) .flat_map(|(path, invalidators)| { let _span = span.clone().entered(); - let reason_for_path = reason(path); + let reason_for_path = reason(&path); invalidators .into_par_iter() .map(move |i| (reason_for_path.clone(), i)) @@ -449,7 +449,7 @@ impl DiskFileSystem { pub fn invalidate_with_reason( &self, - reason: impl Fn(String) -> R + Sync, + reason: impl Fn(&Path) -> R + Sync, ) { self.inner.invalidate_with_reason(reason); } @@ -504,10 +504,6 @@ fn format_absolute_fs_path(path: &Path, name: &str, root_path: &Path) -> Option< } } -pub fn path_to_key(path: impl AsRef) -> String { - path.as_ref().to_string_lossy().to_string() -} - #[turbo_tasks::value_impl] impl DiskFileSystem { /// Create a new instance of `DiskFileSystem`. @@ -756,11 +752,12 @@ impl FileSystem for DiskFileSystem { .await?; if compare == FileComparison::Equal { if !old_invalidators.is_empty() { - let key = path_to_key(&full_path); for (invalidator, write_content) in old_invalidators { - inner - .invalidator_map - .insert(key.clone(), invalidator, write_content); + inner.invalidator_map.insert( + full_path.clone().into_owned(), + invalidator, + write_content, + ); } } return Ok(()); @@ -899,11 +896,12 @@ impl FileSystem for DiskFileSystem { }; if is_equal { if !old_invalidators.is_empty() { - let key = path_to_key(&full_path); for (invalidator, write_content) in old_invalidators { - inner - .invalidator_map - .insert(key.clone(), invalidator, write_content); + inner.invalidator_map.insert( + full_path.clone().into_owned(), + invalidator, + write_content, + ); } } return Ok(()); diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index 093b6c5b63b0..646703b3a05a 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -30,7 +30,6 @@ use crate::{ DiskFileSystemInner, format_absolute_fs_path, invalidation::{WatchChange, WatchStart}, invalidator_map::WriteContent, - path_to_key, }; static WATCH_RECURSIVE_MODE: LazyLock = LazyLock::new(|| { @@ -279,7 +278,8 @@ impl DiskWatcher { let _span = span.clone().entered(); let reason = WatchStart { name: fs_inner.name.clone(), - path: path.into(), + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), }; invalidators .into_par_iter() @@ -374,7 +374,8 @@ impl DiskWatcher { if report_invalidation_reason { inner.invalidate_with_reason(|path| InvalidateRescan { - path: RcStr::from(path), + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), }); } else { inner.invalidate(); @@ -601,12 +602,11 @@ fn invalidate( fn invalidate_path( inner: &DiskFileSystemInner, report_invalidation_reason: bool, - invalidator_map: &mut FxHashMap>>, + invalidator_map: &mut FxHashMap>>, paths: impl Iterator, ) { for path in paths { - let key = path_to_key(&path); - if let Some(invalidators) = invalidator_map.remove(&key) { + if let Some(invalidators) = invalidator_map.remove(&path) { invalidators .into_iter() .for_each(|(i, _)| invalidate(inner, report_invalidation_reason, &path, i)); @@ -617,12 +617,11 @@ fn invalidate_path( fn invalidate_path_and_children_execute( inner: &DiskFileSystemInner, report_invalidation_reason: bool, - invalidator_map: &mut FxHashMap>>, + invalidator_map: &mut FxHashMap>>, paths: impl Iterator, ) { for path in paths { - let path_key = path_to_key(&path); - for (_, invalidators) in invalidator_map.extract_if(|key, _| key.starts_with(&path_key)) { + for (_, invalidators) in invalidator_map.extract_if(|key, _| key.starts_with(&path)) { invalidators .into_iter() .for_each(|(i, _)| invalidate(inner, report_invalidation_reason, &path, i)); From e3939b3168bd01e706614d52bea639af316d9f3f Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Tue, 29 Jul 2025 15:16:25 -0700 Subject: [PATCH 5/8] update comment --- .../crates/turbo-tasks-fs/src/invalidator_map.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs b/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs index a99b7f853251..c5c3bf8630c7 100644 --- a/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs +++ b/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs @@ -69,10 +69,14 @@ impl Serialize for InvalidatorMap { where S: serde::Serializer, { - // TODO: This stores `PathBuf`s, which are machine-specific, but turbo_tasks doesn't - // actually care about these, so we should skip serializing them entirely. Really we just - // need a way to store list of invalidator task ids across restarts. - let inner: &LockedInvalidatorMap = &self.lock().unwrap(); + // TODO: This stores absolute `PathBuf`s, which are machine-specific. This should + // normalize/denormalize paths relative to the disk filesystem root. + // + // Potential optimization: We invalidate all fs reads immediately upon resuming from a + // persisted cache, but we don't invalidate the fs writes. Those read invalidations trigger + // re-inserts into the `InvalidatorMap`. If we knew that certain invalidators were only + // needed for reads, we could potentially avoid serializing those paths entirely. + let inner: &InnerMap = &self.lock().unwrap(); serializer.serialize_newtype_struct("InvalidatorMap", inner) } } From 109bb93744346b400ca5589104b33d30bcfc72e0 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Mon, 28 Jul 2025 15:58:47 -0700 Subject: [PATCH 6/8] Turbopack: Use a BTreeMap in InvalidatorMap to avoid many `O(n)` traversals --- .../turbo-tasks-fs/src/invalidator_map.rs | 11 +-- turbopack/crates/turbo-tasks-fs/src/lib.rs | 17 ++-- .../crates/turbo-tasks-fs/src/path_map.rs | 80 +++++++++++++++++++ .../crates/turbo-tasks-fs/src/watcher.rs | 11 +-- 4 files changed, 101 insertions(+), 18 deletions(-) create mode 100644 turbopack/crates/turbo-tasks-fs/src/path_map.rs diff --git a/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs b/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs index c5c3bf8630c7..7381550034d2 100644 --- a/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs +++ b/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs @@ -1,4 +1,5 @@ use std::{ + collections::BTreeMap, path::PathBuf, sync::{LockResult, Mutex, MutexGuard}, }; @@ -16,18 +17,18 @@ pub enum WriteContent { Link(ReadRef), } -type InnerMap = FxHashMap>>; +pub type LockedInvalidatorMap = BTreeMap>>; pub struct InvalidatorMap { queue: ConcurrentQueue<(PathBuf, Invalidator, Option)>, - map: Mutex, + map: Mutex, } impl Default for InvalidatorMap { fn default() -> Self { Self { queue: ConcurrentQueue::unbounded(), - map: Default::default(), + map: Mutex::::default(), } } } @@ -37,7 +38,7 @@ impl InvalidatorMap { Self::default() } - pub fn lock(&self) -> LockResult> { + pub fn lock(&self) -> LockResult> { let mut guard = self.map.lock()?; while let Ok((key, value, write_content)) = self.queue.pop() { guard.entry(key).or_default().insert(value, write_content); @@ -76,7 +77,7 @@ impl Serialize for InvalidatorMap { // persisted cache, but we don't invalidate the fs writes. Those read invalidations trigger // re-inserts into the `InvalidatorMap`. If we knew that certain invalidators were only // needed for reads, we could potentially avoid serializing those paths entirely. - let inner: &InnerMap = &self.lock().unwrap(); + let inner: &LockedInvalidatorMap = &self.lock().unwrap(); serializer.serialize_newtype_struct("InvalidatorMap", inner) } } diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 2893e0b60924..6955035d4d7c 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -1,4 +1,5 @@ #![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this +#![feature(btree_cursors)] // needed for the `InvalidatorMap` and watcher, reduces time complexity #![feature(trivial_bounds)] #![feature(min_specialization)] #![feature(iter_advance_by)] @@ -16,6 +17,7 @@ pub mod invalidation; mod invalidator_map; pub mod json; mod mutex_map; +mod path_map; mod read_glob; mod retry; pub mod rope; @@ -23,6 +25,7 @@ pub mod source_context; pub mod util; pub(crate) mod virtual_fs; mod watcher; + use std::{ borrow::Cow, cmp::{Ordering, min}, @@ -40,14 +43,10 @@ use anyhow::{Context, Result, anyhow, bail}; use auto_hash_map::{AutoMap, AutoSet}; use bitflags::bitflags; use dunce::simplified; -use glob::Glob; use indexmap::IndexSet; -use invalidator_map::InvalidatorMap; use jsonc_parser::{ParseOptions, parse_to_serde_value}; use mime::Mime; use rayon::iter::{IntoParallelIterator, ParallelIterator}; -pub use read_glob::ReadGlobResult; -use read_glob::{read_glob, track_glob}; use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -60,17 +59,19 @@ use turbo_tasks::{ mark_session_dependent, mark_stateful, trace::TraceRawVcs, }; use turbo_tasks_hash::{DeterministicHash, DeterministicHasher, hash_xxh3_hash64}; -use util::{extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys}; -pub use virtual_fs::VirtualFileSystem; -use watcher::DiskWatcher; use self::{invalidation::Write, json::UnparsableJson, mutex_map::MutexMap}; use crate::{ attach::AttachedFileSystem, - invalidator_map::WriteContent, + glob::Glob, + invalidator_map::{InvalidatorMap, WriteContent}, + read_glob::{read_glob, track_glob}, retry::retry_blocking, rope::{Rope, RopeReader}, + util::{extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys}, + watcher::DiskWatcher, }; +pub use crate::{read_glob::ReadGlobResult, virtual_fs::VirtualFileSystem}; /// A (somewhat arbitrary) filename limit that we should try to keep output file names below. /// diff --git a/turbopack/crates/turbo-tasks-fs/src/path_map.rs b/turbopack/crates/turbo-tasks-fs/src/path_map.rs new file mode 100644 index 000000000000..82ad3925078a --- /dev/null +++ b/turbopack/crates/turbo-tasks-fs/src/path_map.rs @@ -0,0 +1,80 @@ +use std::{ + collections::{BTreeMap, btree_map::CursorMut}, + ops::Bound, + path::{Path, PathBuf}, +}; + +/// A thin wrapper around [`BTreeMap`] that provides efficient extraction of child +/// paths. +/// +/// In the future, this may use a more efficient representation, like a radix tree or trie. +pub trait OrderedPathMapExt { + fn extract_path_with_children<'a>(&'a mut self, path: &'a Path) -> ExtractWithChildren<'a, V>; +} + +impl OrderedPathMapExt for BTreeMap { + fn extract_path_with_children<'a>(&'a mut self, path: &'a Path) -> ExtractWithChildren<'a, V> { + ExtractWithChildren { + cursor: self.lower_bound_mut(Bound::Included(path)), + parent_path: path, + } + } +} + +pub struct ExtractWithChildren<'a, V> { + cursor: CursorMut<'a, PathBuf, V>, + parent_path: &'a Path, +} + +impl Iterator for ExtractWithChildren<'_, V> { + type Item = (PathBuf, V); + + fn next(&mut self) -> Option { + // this simple implementation works because `Path` implements `Ord` (and `starts_with`) + // using path component comparision, rather than raw byte comparisions. The parent path is + // always guaranteed to be placed immediately before its children (pre-order traversal). + if self + .cursor + .peek_next() + .is_none_or(|(k, _v)| !k.starts_with(self.parent_path)) + { + return None; + } + self.cursor.remove_next() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_with_children() { + let mut map = BTreeMap::default(); + map.insert(PathBuf::from("a"), 1); + map.insert(PathBuf::from("a/b"), 2); + map.insert(PathBuf::from("a/b/c"), 3); + map.insert(PathBuf::from("a/b/d"), 4); + map.insert(PathBuf::from("a/c"), 5); + map.insert(PathBuf::from("x/y/z"), 6); + map.insert(PathBuf::from("z/a/b"), 7); + + let parent_path = PathBuf::from("a/b"); + let extracted: Vec<_> = map.extract_path_with_children(&parent_path).collect(); + + let expected_extracted = vec![ + (PathBuf::from("a/b"), 2), + (PathBuf::from("a/b/c"), 3), + (PathBuf::from("a/b/d"), 4), + ]; + assert_eq!(extracted, expected_extracted); + + let mut expected_remaining = BTreeMap::new(); + expected_remaining.insert(PathBuf::from("a"), 1); + expected_remaining.insert(PathBuf::from("a/c"), 5); + expected_remaining.insert(PathBuf::from("x/y/z"), 6); + expected_remaining.insert(PathBuf::from("z/a/b"), 7); + + assert_eq!(map, expected_remaining); + } +} diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index 646703b3a05a..0717076b93ff 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -17,7 +17,7 @@ use notify::{ event::{MetadataKind, ModifyKind, RenameMode}, }; use rayon::iter::{IntoParallelIterator, ParallelIterator}; -use rustc_hash::{FxHashMap, FxHashSet}; +use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use tracing::instrument; use turbo_rcstr::RcStr; @@ -29,7 +29,8 @@ use turbo_tasks::{ use crate::{ DiskFileSystemInner, format_absolute_fs_path, invalidation::{WatchChange, WatchStart}, - invalidator_map::WriteContent, + invalidator_map::LockedInvalidatorMap, + path_map::OrderedPathMapExt, }; static WATCH_RECURSIVE_MODE: LazyLock = LazyLock::new(|| { @@ -602,7 +603,7 @@ fn invalidate( fn invalidate_path( inner: &DiskFileSystemInner, report_invalidation_reason: bool, - invalidator_map: &mut FxHashMap>>, + invalidator_map: &mut LockedInvalidatorMap, paths: impl Iterator, ) { for path in paths { @@ -617,11 +618,11 @@ fn invalidate_path( fn invalidate_path_and_children_execute( inner: &DiskFileSystemInner, report_invalidation_reason: bool, - invalidator_map: &mut FxHashMap>>, + invalidator_map: &mut LockedInvalidatorMap, paths: impl Iterator, ) { for path in paths { - for (_, invalidators) in invalidator_map.extract_if(|key, _| key.starts_with(&path)) { + for (_, invalidators) in invalidator_map.extract_path_with_children(&path) { invalidators .into_iter() .for_each(|(i, _)| invalidate(inner, report_invalidation_reason, &path, i)); From 25cad85c8a315e0e224bb8471b3b0a7877d8f2f8 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Thu, 24 Jul 2025 15:42:23 -0700 Subject: [PATCH 7/8] Turbopack: Watch the root and every parent directory in non-recursive mode --- .../crates/turbo-tasks-fs/src/watcher.rs | 71 ++++++++++--------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index 0717076b93ff..9285a6ea4bdc 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -10,7 +10,7 @@ use std::{ time::Duration, }; -use anyhow::{Context, Result}; +use anyhow::{Context, Result, anyhow}; use dashmap::DashSet; use notify::{ Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher, @@ -165,42 +165,46 @@ impl NonRecursiveDiskWatcherState { dir_path: &Path, root_path: &Path, ) -> Result<()> { - if let Some(watcher_internal_guard) = watcher_internal_guard.as_mut() { - let mut path = dir_path; - let err_with_context = |err| { - return Err(err).context(format!( - "Unable to watch {} (tried up to {})", - dir_path.display(), - path.display() - )); - }; - while let Err(err) = watcher_internal_guard.watch(path, RecursiveMode::NonRecursive) { - match err { - notify::Error { - kind: notify::ErrorKind::PathNotFound, - .. - } => { - // The path was probably deleted before we could process the event. That's - // okay, just make sure we're watching the parent directory, so we can know - // if it gets recreated. - let Some(parent_path) = path.parent() else { - // this should never happen as we break before we reach the root path - return err_with_context(err); - }; - if parent_path == root_path { - // assume there's already a root watcher - break; - } - if !self.watching.insert(parent_path.to_owned()) { - // we're already watching the parent path! - break; - } - path = parent_path; + let Some(watcher_internal_guard) = watcher_internal_guard.as_mut() else { + return Ok(()); + }; + + let mut path = dir_path; + let err_with_context = |err: anyhow::Error| { + return Err(err).context(format!( + "Unable to watch {} (tried up to {})", + dir_path.display(), + path.display() + )); + }; + + // watch every parent: https://docs.rs/notify/latest/notify/#parent-folder-deletion + while path != root_path { + match watcher_internal_guard.watch(path, RecursiveMode::NonRecursive) { + res @ Ok(()) + | res @ Err(notify::Error { + // The path was probably deleted before we could process the event. That's + // okay, just make sure we're watching the parent directory, so we can know + // if it gets recreated. + kind: notify::ErrorKind::PathNotFound, + .. + }) => { + let Some(parent_path) = path.parent() else { + // this should never happen as we break before we reach the root path + return err_with_context(res.err().map_or_else( + || anyhow!("failed to compute parent path"), + |err| err.into(), + )); + }; + if !self.watching.insert(parent_path.to_path_buf()) { + break; } - _ => return err_with_context(err), + path = parent_path; } + Err(err) => return err_with_context(err.into()), } } + Ok(()) } } @@ -256,6 +260,7 @@ impl DiskWatcher { }; if let Some(non_recursive) = &self.non_recursive_state { + internal.watch(fs_inner.root_path(), RecursiveMode::NonRecursive)?; for dir_path in non_recursive.watching.iter() { internal.watch(&dir_path, RecursiveMode::NonRecursive)?; } From eda8d6db7965986672a34bbc18165cacb0bdb39e Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Tue, 29 Jul 2025 14:44:39 -0700 Subject: [PATCH 8/8] Address comments --- .../crates/turbo-tasks-fs/src/watcher.rs | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index 9285a6ea4bdc..860ce446d77c 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -105,6 +105,11 @@ impl Default for DiskWatcher { /// [`RecursiveMode::NonRecursive`] (default on Linux). pub(crate) struct NonRecursiveDiskWatcherState { /// Keeps track of which directories are currently (or were previously) watched. + /// + /// Invariants: + /// - Never contains `root_path`. A watcher for `root_path` is implicitly set up during + /// [`DiskWatcher::start_watching`]. + /// - Contains all parent directories up to `root_path` for every entry. watching: DashSet, } @@ -127,28 +132,31 @@ impl NonRecursiveDiskWatcherState { } } - /// Called when a new directory is found in a parent directory we're watching. + /// Called when a new directory is found in a parent directory we're watching. Restores the + /// watcher if we were previously watching it. pub(crate) fn restore_if_watching( &self, watcher: &DiskWatcher, dir_path: &Path, root_path: &Path, ) -> Result<()> { - if self.watching.contains(dir_path) { - let mut internal = watcher.internal.lock().unwrap(); - // TODO: Also restore any watchers for children of this directory - self.start_watching_dir(&mut internal, dir_path, root_path)?; + if dir_path == root_path || !self.watching.contains(dir_path) { + return Ok(()); } - Ok(()) + let mut internal = watcher.internal.lock().unwrap(); + // TODO: Also restore any watchers for children of this directory + self.start_watching_dir(&mut internal, dir_path, root_path) } + /// Called when a file in `dir_path` or `dir_path` itself is read or written. Adds a new watcher + /// if we're not already watching the directory. pub(crate) fn ensure_watching( &self, watcher: &DiskWatcher, dir_path: &Path, root_path: &Path, ) -> Result<()> { - if self.watching.contains(dir_path) { + if dir_path == root_path || self.watching.contains(dir_path) { return Ok(()); } let mut internal = watcher.internal.lock().unwrap(); @@ -165,6 +173,7 @@ impl NonRecursiveDiskWatcherState { dir_path: &Path, root_path: &Path, ) -> Result<()> { + debug_assert_ne!(dir_path, root_path); let Some(watcher_internal_guard) = watcher_internal_guard.as_mut() else { return Ok(()); }; @@ -179,7 +188,7 @@ impl NonRecursiveDiskWatcherState { }; // watch every parent: https://docs.rs/notify/latest/notify/#parent-folder-deletion - while path != root_path { + loop { match watcher_internal_guard.watch(path, RecursiveMode::NonRecursive) { res @ Ok(()) | res @ Err(notify::Error { @@ -196,7 +205,8 @@ impl NonRecursiveDiskWatcherState { |err| err.into(), )); }; - if !self.watching.insert(parent_path.to_path_buf()) { + if parent_path == root_path || !self.watching.insert(parent_path.to_path_buf()) + { break; } path = parent_path;