From 6bdc1a02112d3fadbfc8d06c8bd64f41c400559e Mon Sep 17 00:00:00 2001 From: stryder Date: Sat, 18 Apr 2026 15:04:32 -0400 Subject: [PATCH] fix(watch): add periodic Lance compaction/prune to cap memory growth The watch loop performed delete+insert per change event without ever calling compact() or prune(). On long-running watchers, the Lance dataset accumulated thousands of version manifests and data fragments, driving RSS to multi-GB. Observed: 7.6 GB RSS + 4.2 GB swap on a 76K-file watcher after 8 days, with 27,750 versions / 7,384 fragments. Track a file-churn counter and the wall-clock time since the last compaction in the watch loop. Trigger compact()+prune() when the counter reaches 100 files OR when 30 minutes have elapsed since the last compaction (or since the watcher started) and at least one file has been re-indexed in that time, mirroring the post-index cleanup in cli/index.rs. Fixes #15 (bo-07l) Co-Authored-By: Claude Opus 4.7 --- src/cli/watch.rs | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/cli/watch.rs b/src/cli/watch.rs index 8b903c9..75a5d39 100644 --- a/src/cli/watch.rs +++ b/src/cli/watch.rs @@ -155,6 +155,15 @@ pub async fn run(args: WatchArgs, output: OutputConfig) -> Result<()> { let mut last_event_time = Instant::now(); let mut check_interval = tokio::time::interval(Duration::from_millis(100)); + // Lance compaction throttling: without this, each delete+insert leaves a + // fragment/version behind and the on-disk dataset (plus the in-memory + // manifest cache) grows without bound. Trigger when either enough files + // have churned OR enough wall-time has passed with any churn at all. 
+ const COMPACT_FILE_THRESHOLD: usize = 100; + const COMPACT_MIN_INTERVAL: Duration = Duration::from_secs(30 * 60); + let mut compact_counter: usize = 0; + let mut last_compact = Instant::now(); + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; @@ -258,6 +267,7 @@ pub async fn run(args: WatchArgs, output: OutputConfig) -> Result<()> { .await { Ok(stats) if stats.files_indexed > 0 => { + compact_counter += stats.files_indexed; if !output.quiet { println!( " {} Re-indexed {} file{} ({} chunks)", @@ -276,6 +286,34 @@ pub async fn run(args: WatchArgs, output: OutputConfig) -> Result<()> { } } } + + // Periodically compact+prune Lance to cap memory/disk growth. + // Triggered when enough files have churned OR enough time has + // elapsed since the last compaction (with at least one change). + let should_compact = compact_counter >= COMPACT_FILE_THRESHOLD + || (compact_counter > 0 + && last_compact.elapsed() >= COMPACT_MIN_INTERVAL); + if should_compact { + if let Err(e) = vector_store.compact().await { + if !output.quiet { + eprintln!(" {} Compact error: {}", "!".yellow(), e); + } + } + if let Err(e) = vector_store.prune().await { + if !output.quiet { + eprintln!(" {} Prune error: {}", "!".yellow(), e); + } + } + if !output.quiet { + println!( + " {} Compacted lance dataset ({} files since last)", + "~".cyan(), + compact_counter + ); + } + compact_counter = 0; + last_compact = Instant::now(); + } } } }