Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/cli/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ pub async fn run(args: WatchArgs, output: OutputConfig) -> Result<()> {
let mut last_event_time = Instant::now();
let mut check_interval = tokio::time::interval(Duration::from_millis(100));

// Lance compaction throttling: without this, each delete+insert leaves a
// fragment/version behind and the on-disk dataset (plus the in-memory
// manifest cache) grows without bound. Trigger when either enough files
// have churned OR enough wall-time has passed with any churn at all.
const COMPACT_FILE_THRESHOLD: usize = 100;
const COMPACT_MIN_INTERVAL: Duration = Duration::from_secs(30 * 60);
let mut compact_counter: usize = 0;
let mut last_compact = Instant::now();

let mut sigterm =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;

Expand Down Expand Up @@ -258,6 +267,7 @@ pub async fn run(args: WatchArgs, output: OutputConfig) -> Result<()> {
.await
{
Ok(stats) if stats.files_indexed > 0 => {
compact_counter += stats.files_indexed;
if !output.quiet {
println!(
" {} Re-indexed {} file{} ({} chunks)",
Expand All @@ -276,6 +286,34 @@ pub async fn run(args: WatchArgs, output: OutputConfig) -> Result<()> {
}
}
}

// Periodically compact+prune Lance to cap memory/disk growth.
// Triggered when enough files have churned OR enough time has
// elapsed since the last compaction (with at least one change).
let should_compact = compact_counter >= COMPACT_FILE_THRESHOLD
|| (compact_counter > 0
&& last_compact.elapsed() >= COMPACT_MIN_INTERVAL);
if should_compact {
if let Err(e) = vector_store.compact().await {
if !output.quiet {
eprintln!(" {} Compact error: {}", "!".yellow(), e);
}
}
if let Err(e) = vector_store.prune().await {
if !output.quiet {
eprintln!(" {} Prune error: {}", "!".yellow(), e);
}
}
if !output.quiet {
println!(
" {} Compacted lance dataset ({} files since last)",
"~".cyan(),
compact_counter
);
}
compact_counter = 0;
last_compact = Instant::now();
}
}
}
}
Expand Down
Loading