Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/smfs-core/src/cache/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,13 @@ impl Db {
}
}

pub(crate) fn remote_count(&self) -> usize {
let conn = self.conn.lock();
conn.query_row("SELECT COUNT(*) FROM fs_remote", [], |r| r.get::<_, i64>(0))
.unwrap_or(0)
.max(0) as usize
}

/// Rows currently inflight — drives the inflight status poller.
#[allow(dead_code)] // kept for future deep diagnostics
pub(crate) fn push_queue_inflight(&self) -> Vec<InflightRow> {
Expand Down
9 changes: 9 additions & 0 deletions crates/smfs-core/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub fn pids_dir() -> PathBuf {
pub fn logs_dir() -> PathBuf {
cache_dir().join("logs")
}
pub fn startup_dir() -> PathBuf {
cache_dir().join("startup")
}

pub fn socket_path(tag: &str) -> PathBuf {
sockets_dir().join(format!("{tag}.sock"))
Expand All @@ -43,12 +46,16 @@ pub fn pid_path(tag: &str) -> PathBuf {
pub fn log_path(tag: &str) -> PathBuf {
logs_dir().join(format!("{tag}.log"))
}
pub fn startup_path(tag: &str) -> PathBuf {
startup_dir().join(format!("{tag}.json"))
}

/// Create `sockets/`, `pids/`, `logs/` subdirectories if missing.
pub fn ensure_dirs() -> std::io::Result<()> {
std::fs::create_dir_all(sockets_dir())?;
std::fs::create_dir_all(pids_dir())?;
std::fs::create_dir_all(logs_dir())?;
std::fs::create_dir_all(startup_dir())?;
Ok(())
}

Expand Down Expand Up @@ -83,10 +90,12 @@ pub fn cleanup_stale(tag: &str) -> bool {
Some(pid) if !pid_alive(pid) => {
let _ = std::fs::remove_file(pid_path(tag));
let _ = std::fs::remove_file(socket_path(tag));
let _ = std::fs::remove_file(startup_path(tag));
cleaned = true;
}
None if socket_path(tag).exists() => {
let _ = std::fs::remove_file(socket_path(tag));
let _ = std::fs::remove_file(startup_path(tag));
cleaned = true;
}
_ => {}
Expand Down
29 changes: 29 additions & 0 deletions crates/smfs-core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ use tokio::task::JoinSet;

use crate::cache::SupermemoryFs;

#[derive(Debug, Clone, Copy)]
pub enum InitialPullProgress {
DeletionScan(scan::DeletionScanProgress),
Pull(pull::PullProgress),
}

/// Knobs for the sync engine. All optional — defaults are production-sane.
#[derive(Debug, Clone, Copy)]
pub struct SyncOptions {
Expand Down Expand Up @@ -60,6 +66,29 @@ impl SyncEngine {
Ok((removed, reconciled))
}

pub async fn initial_pull_with_progress<F>(
fs: &Arc<SupermemoryFs>,
mut on_progress: F,
) -> anyhow::Result<(usize, usize)>
where
F: FnMut(InitialPullProgress) + Send,
{
let removed = if fs.db().remote_count() == 0 {
0
} else {
scan::deletion_scan_with_progress(fs, |progress| {
on_progress(InitialPullProgress::DeletionScan(progress));
})
.await
.unwrap_or(0)
};
let reconciled = pull::full_pull_with_progress(fs, |progress| {
on_progress(InitialPullProgress::Pull(progress));
})
.await?;
Ok((removed, reconciled))
}

/// Spawn background loops for this mount. Push (D) and inflight status
/// poller (E) are always spawned — they are the mount's write path and
/// disabling them would defeat the purpose of running the mount.
Expand Down
33 changes: 33 additions & 0 deletions crates/smfs-core/src/sync/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ const SYNC_META_LAST_SEEN: &str = "last_seen_updated_at";
/// Per-file cap on R2 rehydration fetch (larger files stay 0-byte stubs).
const REHYDRATE_SIZE_CAP: u64 = 20 * 1024 * 1024;

#[derive(Debug, Clone, Copy)]
pub struct PullProgress {
pub page: u32,
pub total_pages: u32,
pub total_items: usize,
pub reconciled: usize,
}

/// Run one pass of the delta pull loop. Returns the number of remote docs
/// that were reconciled this pass (whether or not they produced local
/// changes).
Expand Down Expand Up @@ -88,6 +96,23 @@ async fn list_page(api: &ApiClient, page: u32) -> anyhow::Result<ListDocumentsRe
/// Full pull (no watermark). Used on mount when we have no prior state — we
/// want to catch every remote doc regardless of `updatedAt`.
pub async fn full_pull(fs: &Arc<SupermemoryFs>) -> anyhow::Result<usize> {
full_pull_inner(fs, None).await
}

pub async fn full_pull_with_progress<F>(
fs: &Arc<SupermemoryFs>,
mut on_progress: F,
) -> anyhow::Result<usize>
where
F: FnMut(PullProgress) + Send,
{
full_pull_inner(fs, Some(&mut on_progress)).await
}

async fn full_pull_inner(
fs: &Arc<SupermemoryFs>,
mut on_progress: Option<&mut (dyn FnMut(PullProgress) + Send)>,
) -> anyhow::Result<usize> {
let Some(api) = fs.api() else {
return Ok(0);
};
Expand All @@ -110,6 +135,14 @@ pub async fn full_pull(fs: &Arc<SupermemoryFs>) -> anyhow::Result<usize> {
if doc.updated_at > newest_seen {
newest_seen = doc.updated_at.clone();
}
if let Some(cb) = on_progress.as_mut() {
cb(PullProgress {
page,
total_pages: resp.pagination.total_pages,
total_items: resp.pagination.total_items as usize,
reconciled,
});
}
}
if page >= resp.pagination.total_pages {
break;
Expand Down
33 changes: 33 additions & 0 deletions crates/smfs-core/src/sync/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,35 @@ use crate::cache::SupermemoryFs;

const PAGE_SIZE: u32 = 100;

#[derive(Debug, Clone, Copy)]
pub struct DeletionScanProgress {
pub page: u32,
pub total_pages: u32,
pub total_items: usize,
pub remote_seen: usize,
}

/// Run one deletion-scan pass. Returns `Ok(removed)` where `removed` is the
/// number of local inodes that were unlinked because their remote_id
/// disappeared from the server.
pub async fn deletion_scan(fs: &Arc<SupermemoryFs>) -> anyhow::Result<usize> {
deletion_scan_inner(fs, None).await
}

pub async fn deletion_scan_with_progress<F>(
fs: &Arc<SupermemoryFs>,
mut on_progress: F,
) -> anyhow::Result<usize>
where
F: FnMut(DeletionScanProgress) + Send,
{
deletion_scan_inner(fs, Some(&mut on_progress)).await
}

async fn deletion_scan_inner(
fs: &Arc<SupermemoryFs>,
mut on_progress: Option<&mut (dyn FnMut(DeletionScanProgress) + Send)>,
) -> anyhow::Result<usize> {
let Some(api) = fs.api() else {
return Ok(0);
};
Expand All @@ -38,6 +63,14 @@ pub async fn deletion_scan(fs: &Arc<SupermemoryFs>) -> anyhow::Result<usize> {
for d in &resp.memories {
remote_ids.insert(d.id.clone());
}
if let Some(cb) = on_progress.as_mut() {
cb(DeletionScanProgress {
page,
total_pages: resp.pagination.total_pages,
total_items: resp.pagination.total_items as usize,
remote_seen: remote_ids.len(),
});
}
if page >= resp.pagination.total_pages {
break;
}
Expand Down
1 change: 1 addition & 0 deletions crates/smfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ clap = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
libc = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
directories = { workspace = true }

Expand Down
47 changes: 46 additions & 1 deletion crates/smfs/src/cmd/daemon_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::time::Instant;
use anyhow::{Context, Result};
use tokio::sync::Notify;

use super::startup::StartupReporter;
use smfs_core::cache::{Db, SupermemoryFs};
use smfs_core::daemon;
use smfs_core::mount::{mount_fs, MountBackend, MountOpts};
Expand All @@ -40,6 +41,9 @@ pub struct DaemonConfig {
}

pub async fn run(cfg: DaemonConfig) -> Result<()> {
let mut startup = StartupReporter::new(&cfg.container_tag);
startup.report("creating_mountpoint", "preparing mountpoint")?;

let created_dir = !cfg.mount_path.exists();
if created_dir {
std::fs::create_dir_all(&cfg.mount_path)?;
Expand Down Expand Up @@ -89,6 +93,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {

let opts = MountOpts::new(cfg.mount_path.clone(), cfg.backend).with_ownership(uid, gid);

startup.report("validating_key", "validating API key")?;
let session = if cfg.ephemeral {
smfs_core::api::ApiClient::validate_key(&cfg.api_url, &cfg.api_key)
.await
Expand All @@ -113,6 +118,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
"server did not return org id; cannot open cache. Run `smfs login` and retry."
)
})?;
startup.report("opening_cache", format!("opening cache for org {org_id}"))?;
let db_path = smfs_core::config::cache_db_path(org_id, &cfg.container_tag);
if let Some(parent) = db_path.parent() {
std::fs::create_dir_all(parent)?;
Expand All @@ -132,6 +138,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
Arc::new(Db::open(&db_path)?)
};

startup.report("configuring_api", "configuring API client")?;
let mut api_client =
smfs_core::api::ApiClient::new(&cfg.api_url, &cfg.api_key, &cfg.container_tag);
if let Some(uid) = session.as_ref().and_then(|s| s.user_id.clone()) {
Expand All @@ -156,9 +163,43 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {

let fs = Arc::new(SupermemoryFs::with_api(db, api));

startup.report("warming_profile", "warming profile")?;
fs.warm_profile().await;

let pull_succeeded = match smfs_core::sync::SyncEngine::initial_pull(&fs).await {
startup.report("initial_sync", "starting initial sync")?;
let pull_succeeded = match smfs_core::sync::SyncEngine::initial_pull_with_progress(
&fs,
|progress| match progress {
smfs_core::sync::InitialPullProgress::DeletionScan(progress) => {
if progress.remote_seen == 1 || progress.remote_seen % 100 == 0 {
let _ = startup.report_counts(
"initial_sync",
format!(
"deletion scan saw {} remote docs (page {}/{})",
progress.remote_seen, progress.page, progress.total_pages
),
progress.remote_seen,
progress.total_items,
);
}
}
smfs_core::sync::InitialPullProgress::Pull(progress) => {
if progress.reconciled == 1 || progress.reconciled % 100 == 0 {
let _ = startup.report_counts(
"initial_sync",
format!(
"reconciled {} docs (page {}/{})",
progress.reconciled, progress.page, progress.total_pages
),
progress.reconciled,
progress.total_items,
);
}
}
},
)
.await
{
Ok((removed, reconciled)) => {
eprintln!(
"initial sync: {reconciled} docs reconciled, {removed} stale entries removed"
Expand Down Expand Up @@ -205,6 +246,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
};
let sync_tasks = smfs_core::sync::SyncEngine::start(fs.clone(), sync_opts, shutdown_rx.clone());

startup.report("mounting_fs", "mounting filesystem")?;
let handle = mount_fs(fs.clone(), opts).await?;

// Auto-install grep wrapper on first mount.
Expand All @@ -215,6 +257,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
}

// Bring up the IPC control socket. Clients use it for status/sync/unmount.
startup.report("starting_ipc", "starting IPC socket")?;
daemon::ensure_dirs().context("creating daemon state dirs")?;
let ipc_shutdown_notify = Arc::new(Notify::new());
let state = Arc::new(smfs_core::daemon::ipc::IpcState {
Expand Down Expand Up @@ -244,6 +287,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&pid_path, std::process::id().to_string())?;
startup.report("ready", "filesystem mounted and IPC ready")?;

eprintln!(
"supermemoryfs mounted at {} (backend: {}, tag: {})",
Expand Down Expand Up @@ -340,6 +384,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
}
let _ = std::fs::remove_file(&pid_path);
let _ = std::fs::remove_file(&socket_path);
let _ = std::fs::remove_file(daemon::startup_path(&cfg.container_tag));
if created_dir {
let _ = std::fs::remove_dir(&cfg.mount_path);
}
Expand Down
1 change: 1 addition & 0 deletions crates/smfs/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod logout;
pub mod logs;
pub mod marker;
pub mod mount;
pub mod startup;
pub mod status;
pub mod sync;
pub mod unmount;
Expand Down
Loading
Loading