diff --git a/crates/smfs-core/src/cache/fs.rs b/crates/smfs-core/src/cache/fs.rs index 0a3f417..31313b2 100644 --- a/crates/smfs-core/src/cache/fs.rs +++ b/crates/smfs-core/src/cache/fs.rs @@ -9,6 +9,7 @@ use parking_lot::Mutex; use super::db::{Db, DENTRY_CACHE_MAX, ROOT_INO}; use super::file::SqliteFile; +use super::hydration::{HydrationKey, HydrationScheduler}; use super::profile::{ProfileFile, PROFILE_INO, PROFILE_NAME}; const DERIVED_SIBLING_SUFFIXES: &[&str] = &[ @@ -34,6 +35,7 @@ pub struct SupermemoryFs { api: Option>, profile_file: Option>, dentry_cache: Mutex>, + hydration: Arc, } impl SupermemoryFs { @@ -44,6 +46,7 @@ impl SupermemoryFs { api: None, profile_file: None, dentry_cache: Mutex::new(LruCache::new(NonZeroUsize::new(DENTRY_CACHE_MAX).unwrap())), + hydration: HydrationScheduler::new(), } } @@ -55,9 +58,18 @@ impl SupermemoryFs { api: Some(api), profile_file: Some(profile_file), dentry_cache: Mutex::new(LruCache::new(NonZeroUsize::new(DENTRY_CACHE_MAX).unwrap())), + hydration: HydrationScheduler::new(), } } + pub fn hydration(&self) -> &Arc { + &self.hydration + } + + pub(crate) async fn hydrate_path(&self, path: &str) -> VfsResult<()> { + self.pull_documents(path).await + } + pub async fn warm_profile(&self) { if let Some(pf) = &self.profile_file { pf.warm().await; @@ -331,6 +343,19 @@ impl SupermemoryFs { }; if let Some(ino_i64) = existing_ino { let ino = ino_i64 as u64; + + // Attach remote_id even on skip so the queued push promotes + // Create→Update instead of POSTing a duplicate. + if let Some(dirty_since) = self.db.get_dirty_since(ino) { + if let Some(ms) = updated_ms { + if dirty_since > ms { + self.db.set_remote_id(ino, &doc.id); + self.db.push_queue_set_remote_id(filepath, &doc.id); + return Ok(ReconcileOutcome::SkippedDirty); + } + } + } + self.db.set_remote_id(ino, &doc.id); if doc.status == "done" { if !is_binary_type { @@ -1043,38 +1068,13 @@ impl FileSystem for SupermemoryFs { return Ok(result); } - // Not in local cache — try API pull. + // VFS hot path: do not await network I/O. Freshness arrives via + // the delta loop and `smfs sync`. if self.api.is_some() { if let Some(parent_path) = self.resolve_filepath(parent_ino) { let sep = if parent_path.ends_with('/') { "" } else { "/" }; let file_path = format!("{parent_path}{sep}{name}"); - let _ = self.pull_documents(&file_path).await; - - // Retry from cache. - let conn = self.db.conn.lock(); - let child_ino: Option = conn - .query_row( - "SELECT ino FROM fs_dentry WHERE parent_ino = ?1 AND name = ?2", - rusqlite::params![parent_ino as i64, name], - |r| r.get(0), - ) - .ok(); - - if let Some(child_ino) = child_ino { - self.dentry_cache - .lock() - .put((parent_ino, name.to_string()), child_ino as u64); - - let attr = conn - .query_row( - &format!("SELECT {INODE_COLS} FROM fs_inode WHERE ino = ?1"), - [child_ino], - Db::row_to_attr, - ) - .ok(); - - return Ok(attr); - } + self.hydration.enqueue(HydrationKey::Exact(file_path)); } } @@ -1175,31 +1175,19 @@ impl FileSystem for SupermemoryFs { return Ok(Some(names)); } - // Empty directory — try API pull. if let Some(dir_path) = self.resolve_filepath(ino) { let prefix = if dir_path.ends_with('/') { dir_path } else { format!("{dir_path}/") }; - let _ = self.pull_documents(&prefix).await; - - let conn = self.db.conn.lock(); - let mut stmt = conn - .prepare_cached("SELECT name FROM fs_dentry WHERE parent_ino = ?1 ORDER BY name") - .map_err(sql_err)?; - let mut names: Vec = stmt - .query_map([ino as i64], |r| r.get(0)) - .map_err(sql_err)? - .filter_map(|r| r.ok()) - .collect(); - if ino == ROOT_INO && self.api.is_some() && !names.contains(&PROFILE_NAME.to_string()) { - names.push(PROFILE_NAME.to_string()); - } - return Ok(Some(names)); + self.hydration.enqueue(HydrationKey::Prefix(prefix)); } - - Ok(Some(Vec::new())) + let mut names: Vec = Vec::new(); + if ino == ROOT_INO && self.api.is_some() && !names.contains(&PROFILE_NAME.to_string()) { + names.push(PROFILE_NAME.to_string()); + } + Ok(Some(names)) } async fn readdir_plus(&self, ino: u64) -> VfsResult>> { @@ -1240,21 +1228,15 @@ impl FileSystem for SupermemoryFs { return Ok(Some(append_profile(entries))); } - // Empty directory — try API pull. if let Some(dir_path) = self.resolve_filepath(ino) { let prefix = if dir_path.ends_with('/') { dir_path } else { format!("{dir_path}/") }; - let _ = self.pull_documents(&prefix).await; - - let conn = self.db.conn.lock(); - let entries = self.query_dir_entries(&conn, ino)?; - return Ok(Some(append_profile(entries))); + self.hydration.enqueue(HydrationKey::Prefix(prefix)); } - - Ok(Some(Vec::new())) + Ok(Some(append_profile(Vec::new()))) } async fn mkdir( diff --git a/crates/smfs-core/src/cache/hydration.rs b/crates/smfs-core/src/cache/hydration.rs new file mode 100644 index 0000000..dde788b --- /dev/null +++ b/crates/smfs-core/src/cache/hydration.rs @@ -0,0 +1,236 @@ +//! Background read-side hydration queue. +//! +//! Invariant: VFS syscall paths must not await network I/O. Misses enqueue +//! a refresh here and return immediately; the queue is in-memory and safe +//! to lose on restart (delta pull and `smfs sync` are the durable paths). + +use std::collections::{HashMap, HashSet, VecDeque}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use parking_lot::Mutex; +use tokio::sync::{watch, Notify, Semaphore}; +use tokio::task::JoinSet; + +const HYDRATION_CONCURRENCY: usize = 4; +const NEGATIVE_TTL: Duration = Duration::from_secs(2); + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum HydrationKey { + Exact(String), + /// Must end with `/`. + Prefix(String), +} + +impl HydrationKey { + pub fn path(&self) -> &str { + match self { + HydrationKey::Exact(p) | HydrationKey::Prefix(p) => p, + } + } +} + +#[derive(Debug)] +struct Inner { + queue: VecDeque, + pending: HashSet, + inflight: HashSet, + recent: HashMap, +} + +#[derive(Debug)] +pub struct HydrationScheduler { + inner: Mutex, + notify: Notify, + negative_ttl: Duration, +} + +impl HydrationScheduler { + pub fn new() -> Arc { + Arc::new(Self { + inner: Mutex::new(Inner { + queue: VecDeque::new(), + pending: HashSet::new(), + inflight: HashSet::new(), + recent: HashMap::new(), + }), + notify: Notify::new(), + negative_ttl: NEGATIVE_TTL, + }) + } + + pub fn enqueue(&self, key: HydrationKey) { + let now = Instant::now(); + let should_notify = { + let mut inner = self.inner.lock(); + inner + .recent + .retain(|_, ts| now.duration_since(*ts) < self.negative_ttl); + if inner.recent.contains_key(&key) { + return; + } + if inner.pending.contains(&key) || inner.inflight.contains(&key) { + return; + } + inner.pending.insert(key.clone()); + inner.queue.push_back(key); + true + }; + if should_notify { + self.notify.notify_one(); + } + } + + pub(crate) fn claim_next(&self) -> Option { + let mut inner = self.inner.lock(); + let key = inner.queue.pop_front()?; + inner.pending.remove(&key); + inner.inflight.insert(key.clone()); + Some(key) + } + + pub(crate) fn complete(&self, key: HydrationKey) { + let mut inner = self.inner.lock(); + inner.inflight.remove(&key); + inner.recent.insert(key, Instant::now()); + } + + pub fn notify(&self) -> &Notify { + &self.notify + } + + pub fn pending_len(&self) -> usize { + self.inner.lock().queue.len() + } + + pub fn inflight_len(&self) -> usize { + self.inner.lock().inflight.len() + } +} + +pub async fn run_hydration_worker( + fs: Arc, + mut shutdown: watch::Receiver, +) { + if fs.api().is_none() { + return; + } + let sched = fs.hydration().clone(); + let sem = Arc::new(Semaphore::new(HYDRATION_CONCURRENCY)); + let mut set = JoinSet::new(); + + 'outer: loop { + tokio::select! { + _ = shutdown.changed() => { + if *shutdown.borrow() { break 'outer; } + } + _ = sched.notify().notified() => {} + _ = tokio::time::sleep(Duration::from_millis(500)) => {} + } + + // Reap finished spawns; otherwise the JoinSet grows unbounded + // under steady load. + while let Ok(Some(_)) = + tokio::time::timeout(Duration::from_millis(0), set.join_next()).await + {} + + loop { + if *shutdown.borrow() { + break 'outer; + } + let permit = match sem.clone().try_acquire_owned() { + Ok(p) => p, + Err(_) => break, + }; + let Some(key) = sched.claim_next() else { + drop(permit); + break; + }; + let fs_clone = fs.clone(); + set.spawn(async move { + let _permit = permit; + let path = key.path().to_string(); + match fs_clone.hydrate_path(&path).await { + Ok(()) => { + tracing::debug!(key = ?key, "hydration: pull ok"); + } + Err(e) => { + tracing::warn!(key = ?key, error = %e, "hydration: pull failed"); + } + } + fs_clone.hydration().complete(key); + }); + } + } + + // Graceful drain on shutdown. + while set.join_next().await.is_some() {} +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn enqueue_dedupes_pending() { + let s = HydrationScheduler::new(); + s.enqueue(HydrationKey::Prefix("/docs/".into())); + s.enqueue(HydrationKey::Prefix("/docs/".into())); + s.enqueue(HydrationKey::Prefix("/docs/".into())); + assert_eq!(s.pending_len(), 1); + } + + #[test] + fn claim_next_drains_in_fifo_order() { + let s = HydrationScheduler::new(); + s.enqueue(HydrationKey::Exact("/a.md".into())); + s.enqueue(HydrationKey::Exact("/b.md".into())); + s.enqueue(HydrationKey::Exact("/c.md".into())); + assert_eq!(s.claim_next(), Some(HydrationKey::Exact("/a.md".into()))); + assert_eq!(s.claim_next(), Some(HydrationKey::Exact("/b.md".into()))); + assert_eq!(s.claim_next(), Some(HydrationKey::Exact("/c.md".into()))); + assert_eq!(s.claim_next(), None); + } + + #[test] + fn enqueue_skips_inflight() { + let s = HydrationScheduler::new(); + s.enqueue(HydrationKey::Exact("/x".into())); + let _ = s.claim_next(); + assert_eq!(s.inflight_len(), 1); + s.enqueue(HydrationKey::Exact("/x".into())); // already inflight + assert_eq!(s.pending_len(), 0); + } + + #[test] + fn negative_ttl_suppresses_recent_completion() { + let s = HydrationScheduler::new(); + s.enqueue(HydrationKey::Exact("/x".into())); + let key = s.claim_next().unwrap(); + s.complete(key); + // Within TTL: skipped. + s.enqueue(HydrationKey::Exact("/x".into())); + assert_eq!(s.pending_len(), 0); + } + + #[test] + fn negative_ttl_expires_eventually() { + // Use a scheduler with a very short TTL so the test stays fast. + let s = Arc::new(HydrationScheduler { + inner: Mutex::new(Inner { + queue: VecDeque::new(), + pending: HashSet::new(), + inflight: HashSet::new(), + recent: HashMap::new(), + }), + notify: Notify::new(), + negative_ttl: Duration::from_millis(20), + }); + s.enqueue(HydrationKey::Exact("/x".into())); + let key = s.claim_next().unwrap(); + s.complete(key); + std::thread::sleep(Duration::from_millis(40)); + s.enqueue(HydrationKey::Exact("/x".into())); + assert_eq!(s.pending_len(), 1); + } +} diff --git a/crates/smfs-core/src/cache/mod.rs b/crates/smfs-core/src/cache/mod.rs index 1d76125..b129820 100644 --- a/crates/smfs-core/src/cache/mod.rs +++ b/crates/smfs-core/src/cache/mod.rs @@ -10,10 +10,12 @@ pub(crate) mod db; mod file; mod fs; +pub mod hydration; pub mod profile; pub use db::{is_macos_noise_path, Db, DEFAULT_CHUNK_SIZE, DENTRY_CACHE_MAX, ROOT_INO}; pub use fs::{ReconcileOutcome, SupermemoryFs}; +pub use hydration::{HydrationKey, HydrationScheduler}; pub(crate) use fs::parse_iso_to_ms; diff --git a/crates/smfs-core/src/cache/tests.rs b/crates/smfs-core/src/cache/tests.rs index 62b5625..1c84084 100644 --- a/crates/smfs-core/src/cache/tests.rs +++ b/crates/smfs-core/src/cache/tests.rs @@ -669,3 +669,117 @@ async fn test_file_size_cap_enforced() { .expect_err("write past cap must fail"); assert!(matches!(err, VfsError::InvalidPath(_))); } + +#[tokio::test] +async fn test_reconcile_attach_by_path_skips_when_dirty() { + use crate::cache::fs::ReconcileOutcome; + + let fs = fs(); + let (attr, handle) = fs + .create_file(ROOT, "local.md", 0o644, UID, GID) + .await + .unwrap(); + handle.write(0, b"local edits").await.unwrap(); + handle.flush().await.unwrap(); + fs.db() + .push_queue_upsert("/local.md", PushOp::Create, Some(attr.ino), None, None, 1); + fs.db().set_dirty_since(attr.ino, Some(9_999_999_999_999)); + + let doc = doc_with("rid-1", "/local.md", "text", "remote stale", "done"); + let outcome = fs.reconcile_one(&doc).expect("reconcile_one ok"); + + assert!( + matches!(outcome, ReconcileOutcome::SkippedDirty), + "expected SkippedDirty, got {outcome:?}" + ); + let bytes = fs.db().read_all_content(attr.ino); + assert_eq!(&bytes, b"local edits", "local content was clobbered"); + assert_eq!(fs.db().get_remote_id(attr.ino).as_deref(), Some("rid-1")); + let snap = fs + .push_queue_inspect("/local.md") + .expect("push_queue row should exist"); + assert_eq!(snap.remote_id.as_deref(), Some("rid-1")); +} + +// ─── Hot-path locality ────────────────────────────────────────────── +// +// API client points at an unbindable port: any synchronous network I/O +// in lookup/readdir would either time out or fail-fast and the wall-time +// assertion would catch it. + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_lookup_burst_does_not_block_on_api() { + let api = Arc::new(crate::api::ApiClient::new( + "http://127.0.0.1:1", + "test-key", + "test-tag", + )); + let db = Arc::new(super::db::Db::open_in_memory().unwrap()); + let fs = Arc::new(SupermemoryFs::with_api(db, api)); + + fs.mkdir(ROOT, "docs", 0o755, UID, GID).await.unwrap(); + let docs_attr = fs.lookup(ROOT, "docs").await.unwrap().unwrap(); + + let start = std::time::Instant::now(); + let mut joins = Vec::with_capacity(120); + for i in 0..120 { + let fs_c = fs.clone(); + let docs_ino = docs_attr.ino; + joins.push(tokio::spawn(async move { + let name_a = format!("api-reference-{i}"); + let _ = fs_c.lookup(docs_ino, &name_a).await.unwrap(); + let _ = fs_c.lookup(ROOT, &format!("missing-{i}")).await.unwrap(); + })); + } + for j in joins { + j.await.unwrap(); + } + let elapsed = start.elapsed(); + + // Generous bound vs the NFS soft-mount timeout (~3s) so this stays + // stable on slow CI; the signal is "not seconds per call". + assert!( + elapsed < std::time::Duration::from_millis(500), + "lookup burst took {elapsed:?}; hot path may be blocking on API" + ); + let q = fs.hydration().pending_len() + fs.hydration().inflight_len(); + assert!(q >= 1, "expected ≥1 hydration request; got {q}"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_readdir_on_empty_dir_does_not_block_on_api() { + let api = Arc::new(crate::api::ApiClient::new( + "http://127.0.0.1:1", + "test-key", + "test-tag", + )); + let db = Arc::new(super::db::Db::open_in_memory().unwrap()); + let fs = Arc::new(SupermemoryFs::with_api(db, api)); + + let mut dir_inos = Vec::new(); + for i in 0..30 { + let attr = fs + .mkdir(ROOT, &format!("d{i}"), 0o755, UID, GID) + .await + .unwrap(); + dir_inos.push(attr.ino); + } + + let start = std::time::Instant::now(); + let mut joins = Vec::new(); + for ino in dir_inos { + let fs_c = fs.clone(); + joins.push(tokio::spawn(async move { + let names = fs_c.readdir(ino).await.unwrap().unwrap(); + assert!(names.is_empty()); + })); + } + for j in joins { + j.await.unwrap(); + } + let elapsed = start.elapsed(); + assert!( + elapsed < std::time::Duration::from_millis(500), + "readdir burst took {elapsed:?}; empty-dir hot path may be blocking on API" + ); +} diff --git a/crates/smfs-core/src/sync/mod.rs b/crates/smfs-core/src/sync/mod.rs index 3192ee2..a9be907 100644 --- a/crates/smfs-core/src/sync/mod.rs +++ b/crates/smfs-core/src/sync/mod.rs @@ -90,6 +90,14 @@ impl SyncEngine { set.spawn(async move { run_deletion_loop(fs_c, opts.deletion_scan_interval, &mut sd_c).await; }); + + // Loop F — hydration worker. Gated on pull_enabled so + // `--no-sync` mounts make no remote reads. + let fs_f = fs.clone(); + let sd_f = shutdown.clone(); + set.spawn(async move { + crate::cache::hydration::run_hydration_worker(fs_f, sd_f).await; + }); } let fs_d = fs.clone();