Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 82 additions & 46 deletions turbopack/crates/turbo-tasks-fs/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use std::{
mem::take,
path::{Path, PathBuf},
sync::{
Arc, LazyLock, Mutex, MutexGuard,
Arc, LazyLock, Mutex,
mpsc::{Receiver, TryRecvError, channel},
},
time::Duration,
};

use anyhow::{Context, Result, anyhow};
use anyhow::{Context, Result};
use dashmap::DashSet;
use notify::{
Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
Expand Down Expand Up @@ -78,6 +78,8 @@ impl DiskWatcherInternal {

#[derive(Serialize, Deserialize)]
pub(crate) struct DiskWatcher {
/// This value is [`None`] when the watcher has been stopped (see
/// [`DiskWatcher::stop_watching`]).
#[serde(skip)]
internal: Mutex<Option<DiskWatcherInternal>>,

Expand Down Expand Up @@ -118,10 +120,17 @@ impl NonRecursiveDiskWatcherState {

/// Called after a rescan in case a previously watched-but-deleted directory was recreated.
pub(crate) fn restore_all_watching(&self, watcher: &DiskWatcher, root_path: &Path) {
let mut internal = watcher.internal.lock().unwrap();
let mut internal_guard = watcher.internal.lock().unwrap();
let Some(internal) = &mut *internal_guard else {
return;
};
for dir_path in self.watching.iter() {
// TODO: Report diagnostics if this error happens
let _ = self.start_watching_dir(&mut internal, &dir_path, root_path);
//
// Don't watch the parents, because those are already included in `self.watching` (so
// it'd be redundant), but also because this could deadlock, since we'd try to modify
// `self.watching` while iterating over it (write lock overlapping with a read lock).
let _ = self.start_watching_dir(internal, &dir_path, root_path);
}
}

Expand All @@ -136,9 +145,24 @@ impl NonRecursiveDiskWatcherState {
if dir_path == root_path || !self.watching.contains(dir_path) {
return Ok(());
}
let mut internal = watcher.internal.lock().unwrap();
// TODO: Also restore any watchers for children of this directory
self.start_watching_dir(&mut internal, dir_path, root_path)
let mut internal_guard = watcher.internal.lock().unwrap();
let Some(internal) = &mut *internal_guard else {
return Ok(());
};

// watch the new directory
self.start_watching_dir(internal, dir_path, root_path)?;

// Also try to restore any watchers for children of this directory
for child_path in self
.watching
.iter()
.filter(|p| p.key().starts_with(dir_path) && **p != dir_path)
{
// Don't watch the parents -- see the comment on `restore_all_watching`
self.start_watching_dir(internal, child_path.key(), root_path)?;
}
Ok(())
}

/// Called when a file in `dir_path` or `dir_path` itself is read or written. Adds a new watcher
Expand All @@ -152,60 +176,72 @@ impl NonRecursiveDiskWatcherState {
if dir_path == root_path || self.watching.contains(dir_path) {
return Ok(());
}
let mut internal = watcher.internal.lock().unwrap();
let mut internal_guard = watcher.internal.lock().unwrap();
let Some(internal) = &mut *internal_guard else {
return Ok(());
};
if self.watching.insert(dir_path.to_path_buf()) {
self.start_watching_dir(&mut internal, dir_path, root_path)?;
self.start_watching_dir_and_parents(internal, dir_path, root_path)?;
}
Ok(())
}

/// Private helper, assumes that the path has already been added to `self.watching`.
/// Private helper, assumes that `dir_path` has already been added to `self.watching`.
///
/// This does not watch any of the parent directories. For that, use
/// [`start_watching_dir_and_parents`]. Use this method when iterating over previously-watched
/// values in `self.watching`.
fn start_watching_dir(
&self,
watcher_internal_guard: &mut MutexGuard<Option<DiskWatcherInternal>>,
watcher_internal: &mut DiskWatcherInternal,
dir_path: &Path,
root_path: &Path,
) -> Result<()> {
debug_assert_ne!(dir_path, root_path);
let Some(watcher_internal_guard) = watcher_internal_guard.as_mut() else {
return Ok(());
};

let mut path = dir_path;
let err_with_context = |err: anyhow::Error| {
return Err(err).context(format!(
"Unable to watch {} (tried up to {})",
dir_path.display(),
path.display()
));
};
match watcher_internal.watch(dir_path, RecursiveMode::NonRecursive) {
Ok(())
| Err(notify::Error {
// The path was probably deleted before we could process the event, but the parent
// should still be watched. The codepaths that care about this either call
// `start_watching_dir_and_parents` or handle the parents themselves.
kind: notify::ErrorKind::PathNotFound,
..
}) => Ok(()),
Err(err) => {
return Err(err).context(format!("Unable to watch {}", dir_path.display(),));
}
}
}

// watch every parent: https://docs.rs/notify/latest/notify/#parent-folder-deletion
/// Private helper, assumes that `dir_path` has already been added to `self.watching`.
///
/// Watches the given `dir_path` and every parent up to `root_path`. Parents must be recursively
/// watched in case any of them change:
/// https://docs.rs/notify/latest/notify/#parent-folder-deletion
fn start_watching_dir_and_parents(
&self,
watcher_internal: &mut DiskWatcherInternal,
dir_path: &Path,
root_path: &Path,
) -> Result<()> {
let mut cur_path = dir_path;
loop {
match watcher_internal_guard.watch(path, RecursiveMode::NonRecursive) {
res @ Ok(())
| res @ Err(notify::Error {
// The path was probably deleted before we could process the event. That's
// okay, just make sure we're watching the parent directory, so we can know
// if it gets recreated.
kind: notify::ErrorKind::PathNotFound,
..
}) => {
let Some(parent_path) = path.parent() else {
// this should never happen as we break before we reach the root path
return err_with_context(res.err().map_or_else(
|| anyhow!("failed to compute parent path"),
|err| err.into(),
));
};
if parent_path == root_path || !self.watching.insert(parent_path.to_path_buf())
{
break;
}
path = parent_path;
}
Err(err) => return err_with_context(err.into()),
self.start_watching_dir(watcher_internal, cur_path, root_path)?;

let Some(parent_path) = cur_path.parent() else {
// this should never happen as we break before we reach the root path
anyhow::bail!(
"failed to compute parent path of {cur_path:?} while watching {dir_path:?} in \
root {root_path:?}"
);
};

if parent_path == root_path || !self.watching.insert(parent_path.to_path_buf()) {
break;
}

cur_path = parent_path;
}

Ok(())
Expand Down