-
Notifications
You must be signed in to change notification settings - Fork 31
make vfs hot path local-only #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ use parking_lot::Mutex; | |
|
|
||
| use super::db::{Db, DENTRY_CACHE_MAX, ROOT_INO}; | ||
| use super::file::SqliteFile; | ||
| use super::hydration::{HydrationKey, HydrationScheduler}; | ||
| use super::profile::{ProfileFile, PROFILE_INO, PROFILE_NAME}; | ||
|
|
||
| const DERIVED_SIBLING_SUFFIXES: &[&str] = &[ | ||
|
|
@@ -34,6 +35,7 @@ pub struct SupermemoryFs { | |
| api: Option<Arc<crate::api::ApiClient>>, | ||
| profile_file: Option<Arc<ProfileFile>>, | ||
| dentry_cache: Mutex<LruCache<(u64, String), u64>>, | ||
| hydration: Arc<HydrationScheduler>, | ||
| } | ||
|
|
||
| impl SupermemoryFs { | ||
|
|
@@ -44,6 +46,7 @@ impl SupermemoryFs { | |
| api: None, | ||
| profile_file: None, | ||
| dentry_cache: Mutex::new(LruCache::new(NonZeroUsize::new(DENTRY_CACHE_MAX).unwrap())), | ||
| hydration: HydrationScheduler::new(), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -55,9 +58,18 @@ impl SupermemoryFs { | |
| api: Some(api), | ||
| profile_file: Some(profile_file), | ||
| dentry_cache: Mutex::new(LruCache::new(NonZeroUsize::new(DENTRY_CACHE_MAX).unwrap())), | ||
| hydration: HydrationScheduler::new(), | ||
| } | ||
| } | ||
|
|
||
| pub fn hydration(&self) -> &Arc<HydrationScheduler> { | ||
| &self.hydration | ||
| } | ||
|
|
||
| pub(crate) async fn hydrate_path(&self, path: &str) -> VfsResult<()> { | ||
| self.pull_documents(path).await | ||
| } | ||
|
|
||
| pub async fn warm_profile(&self) { | ||
| if let Some(pf) = &self.profile_file { | ||
| pf.warm().await; | ||
|
|
@@ -331,6 +343,19 @@ impl SupermemoryFs { | |
| }; | ||
| if let Some(ino_i64) = existing_ino { | ||
| let ino = ino_i64 as u64; | ||
|
|
||
| // Attach remote_id even on skip so the queued push promotes | ||
| // Create→Update instead of POSTing a duplicate. | ||
| if let Some(dirty_since) = self.db.get_dirty_since(ino) { | ||
| if let Some(ms) = updated_ms { | ||
| if dirty_since > ms { | ||
| self.db.set_remote_id(ino, &doc.id); | ||
| self.db.push_queue_set_remote_id(filepath, &doc.id); | ||
| return Ok(ReconcileOutcome::SkippedDirty); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
| } | ||
| } | ||
| } | ||
|
|
||
| self.db.set_remote_id(ino, &doc.id); | ||
| if doc.status == "done" { | ||
| if !is_binary_type { | ||
|
|
@@ -1043,38 +1068,13 @@ impl FileSystem for SupermemoryFs { | |
| return Ok(result); | ||
| } | ||
|
|
||
| // Not in local cache — try API pull. | ||
| // VFS hot path: do not await network I/O. Freshness arrives via | ||
| // the delta loop and `smfs sync`. | ||
| if self.api.is_some() { | ||
| if let Some(parent_path) = self.resolve_filepath(parent_ino) { | ||
| let sep = if parent_path.ends_with('/') { "" } else { "/" }; | ||
| let file_path = format!("{parent_path}{sep}{name}"); | ||
| let _ = self.pull_documents(&file_path).await; | ||
|
|
||
| // Retry from cache. | ||
| let conn = self.db.conn.lock(); | ||
| let child_ino: Option<i64> = conn | ||
| .query_row( | ||
| "SELECT ino FROM fs_dentry WHERE parent_ino = ?1 AND name = ?2", | ||
| rusqlite::params![parent_ino as i64, name], | ||
| |r| r.get(0), | ||
| ) | ||
| .ok(); | ||
|
|
||
| if let Some(child_ino) = child_ino { | ||
| self.dentry_cache | ||
| .lock() | ||
| .put((parent_ino, name.to_string()), child_ino as u64); | ||
|
|
||
| let attr = conn | ||
| .query_row( | ||
| &format!("SELECT {INODE_COLS} FROM fs_inode WHERE ino = ?1"), | ||
| [child_ino], | ||
| Db::row_to_attr, | ||
| ) | ||
| .ok(); | ||
|
|
||
| return Ok(attr); | ||
| } | ||
| self.hydration.enqueue(HydrationKey::Exact(file_path)); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1175,31 +1175,19 @@ impl FileSystem for SupermemoryFs { | |
| return Ok(Some(names)); | ||
| } | ||
|
|
||
| // Empty directory — try API pull. | ||
| if let Some(dir_path) = self.resolve_filepath(ino) { | ||
| let prefix = if dir_path.ends_with('/') { | ||
| dir_path | ||
| } else { | ||
| format!("{dir_path}/") | ||
| }; | ||
| let _ = self.pull_documents(&prefix).await; | ||
|
|
||
| let conn = self.db.conn.lock(); | ||
| let mut stmt = conn | ||
| .prepare_cached("SELECT name FROM fs_dentry WHERE parent_ino = ?1 ORDER BY name") | ||
| .map_err(sql_err)?; | ||
| let mut names: Vec<String> = stmt | ||
| .query_map([ino as i64], |r| r.get(0)) | ||
| .map_err(sql_err)? | ||
| .filter_map(|r| r.ok()) | ||
| .collect(); | ||
| if ino == ROOT_INO && self.api.is_some() && !names.contains(&PROFILE_NAME.to_string()) { | ||
| names.push(PROFILE_NAME.to_string()); | ||
| } | ||
| return Ok(Some(names)); | ||
| self.hydration.enqueue(HydrationKey::Prefix(prefix)); | ||
| } | ||
|
|
||
| Ok(Some(Vec::new())) | ||
| let mut names: Vec<String> = Vec::new(); | ||
| if ino == ROOT_INO && self.api.is_some() && !names.contains(&PROFILE_NAME.to_string()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: |
||
| names.push(PROFILE_NAME.to_string()); | ||
| } | ||
| Ok(Some(names)) | ||
| } | ||
|
|
||
| async fn readdir_plus(&self, ino: u64) -> VfsResult<Option<Vec<DirEntry>>> { | ||
|
|
@@ -1240,21 +1228,15 @@ impl FileSystem for SupermemoryFs { | |
| return Ok(Some(append_profile(entries))); | ||
| } | ||
|
|
||
| // Empty directory — try API pull. | ||
| if let Some(dir_path) = self.resolve_filepath(ino) { | ||
| let prefix = if dir_path.ends_with('/') { | ||
| dir_path | ||
| } else { | ||
| format!("{dir_path}/") | ||
| }; | ||
| let _ = self.pull_documents(&prefix).await; | ||
|
|
||
| let conn = self.db.conn.lock(); | ||
| let entries = self.query_dir_entries(&conn, ino)?; | ||
| return Ok(Some(append_profile(entries))); | ||
| self.hydration.enqueue(HydrationKey::Prefix(prefix)); | ||
| } | ||
|
|
||
| Ok(Some(Vec::new())) | ||
| Ok(Some(append_profile(Vec::new()))) | ||
| } | ||
|
|
||
| async fn mkdir( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Subtle ordering concern: if
updated_msisNone(parse failure ondoc.updated_at) anddirty_sinceisSome, theif let Some(ms) = updated_mscheck fails, we fall through to line 359 and unconditionally set remote_id + rewrite content -- potentially clobbering a local dirty write. Consider: ifdirty_since.is_some() && updated_ms.is_none(), returnSkippedDirtyas a safety net.