diff --git a/src/api/agents.rs b/src/api/agents.rs index f4547a73d..580ced964 100644 --- a/src/api/agents.rs +++ b/src/api/agents.rs @@ -774,8 +774,9 @@ pub async fn create_agent_internal( format!("failed to init embeddings: {error}") })?; - if let Err(error) = embedding_table.ensure_fts_index().await { - tracing::warn!(%error, agent_id = %agent_id, "failed to create FTS index"); + // Ensure vector and FTS indexes exist (prevents 30-minute rebuild loop) + if let Err(error) = embedding_table.ensure_indexes_exist().await { + tracing::warn!(%error, agent_id = %agent_id, "failed to ensure indexes exist"); } let memory_search = std::sync::Arc::new(crate::memory::MemorySearch::new( diff --git a/src/main.rs b/src/main.rs index 41f4e73e0..ce20e9316 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2761,9 +2761,9 @@ async fn initialize_agents( format!("failed to init embeddings for agent '{}'", agent_config.id) })?; - // Ensure FTS index exists for full-text search queries - if let Err(error) = embedding_table.ensure_fts_index().await { - tracing::warn!(%error, agent = %agent_config.id, "failed to create FTS index"); + // Ensure vector and FTS indexes exist (prevents 30-minute rebuild loop) + if let Err(error) = embedding_table.ensure_indexes_exist().await { + tracing::warn!(%error, agent = %agent_config.id, "failed to ensure indexes exist"); } let memory_search = Arc::new(spacebot::memory::MemorySearch::new( diff --git a/src/memory/lance.rs b/src/memory/lance.rs index 382bedefc..78faa0717 100644 --- a/src/memory/lance.rs +++ b/src/memory/lance.rs @@ -1,4 +1,11 @@ -//! LanceDB table management and embedding storage with HNSW vector index and FTS. +//! LanceDB table management and embedding storage with IVF-HNSW-SQ vector index and FTS. +//! +//! Index creation uses a single-flight guard pattern to prevent race conditions +//! when multiple concurrent callers attempt to ensure indexes exist simultaneously. +//! This ensures only ONE index build runs at a time per index type. 
+//! +//! Guards are module-level statics to ensure serialization across all +//! EmbeddingTable instances in the process, not just clones of the same instance. use crate::error::{DbError, Result}; use arrow_array::cast::AsArray; @@ -6,12 +13,26 @@ use arrow_array::types::Float32Type; use arrow_array::{Array, RecordBatchIterator}; use futures::TryStreamExt; use std::sync::Arc; +use tokio::sync::OnceCell; + +/// Module-level single-flight guards for index creation. +/// +/// These are statics to ensure that index creation is serialized across +/// ALL EmbeddingTable instances in the process, not just clones of the same instance. +/// This prevents race conditions when multiple threads independently call +/// `open_or_create()` and then attempt to ensure indexes exist. +static VECTOR_INDEX_GUARD: OnceCell<()> = OnceCell::const_new(); +static FTS_INDEX_GUARD: OnceCell<()> = OnceCell::const_new(); /// Schema constants for the embeddings table. const TABLE_NAME: &str = "memory_embeddings"; const EMBEDDING_DIM: i32 = 384; // all-MiniLM-L6-v2 dimension /// LanceDB table for memory embeddings with HNSW index and FTS. +/// +/// Index creation is protected by module-level static single-flight guards +/// to prevent duplicate concurrent builds. The guards ensure that only ONE +/// index creation runs at a time across ALL EmbeddingTable instances. 
pub struct EmbeddingTable { table: lancedb::Table, } @@ -32,7 +53,9 @@ impl EmbeddingTable { pub async fn open_or_create(connection: &lancedb::Connection) -> Result { // Try to open existing table match connection.open_table(TABLE_NAME).execute().await { - Ok(table) => return Ok(Self { table }), + Ok(table) => { + return Ok(Self { table }) + } Err(error) => { tracing::debug!(%error, "failed to open embeddings table, will create"); } @@ -40,7 +63,9 @@ impl EmbeddingTable { // Table doesn't exist or is unreadable — try creating it match Self::create_empty_table(connection).await { - Ok(table) => return Ok(Self { table }), + Ok(table) => { + return Ok(Self { table }) + } Err(error) => { tracing::warn!( %error, @@ -295,48 +320,120 @@ impl EmbeddingTable { Ok(matches) } - /// Create HNSW vector index and FTS index for better performance. - /// Should be called after enough data accumulates. - pub async fn create_indexes(&self) -> Result<()> { - // Create HNSW vector index on embedding column - self.table - .create_index(&["embedding"], lancedb::index::Index::Auto) - .execute() - .await - .map_err(|e| DbError::LanceDb(format!("Failed to create vector index: {}", e)))?; + /// Ensure vector and FTS indexes exist, creating them only if they don't already exist. + /// + /// This prevents the expensive HNSW index training from running on every startup. + /// Uses module-level static single-flight guards to ensure only ONE index creation + /// runs at a time, even when multiple concurrent callers invoke this method simultaneously. + /// + /// # Problem this solves + /// + /// LanceDB's `create_index()` unconditionally triggers a full rebuild when called, + /// regardless of whether an index already exists on disk. The previous approach of + /// catching errors after the fact was too late — the expensive KMeans training had + /// already completed. + /// + /// # Solution + /// + /// Two-layer protection: + /// 1. 
**Single-flight guards** (module-level statics): Ensure only one index creation runs + /// at a time. Concurrent callers wait for the first to complete. + /// 2. **`list_indices()` check**: After acquiring the guard, verify the index still + /// doesn't exist (handles cases where another process created it externally). + /// + /// # Concurrency pattern + /// + /// How the module-level static `OnceCell` guards behave: + /// - Concurrent callers wait while one caller runs the check-and-create step + /// - Once a guard is initialized, later calls return immediately without re-checking; if the step fails, the guard stays unset and the next caller retries + /// - The guard is shared across ALL EmbeddingTable instances in the process, so once any table initializes it, calls on every other table skip the check as well + /// - No deadlocks: each index has its own independent guard + pub async fn ensure_indexes_exist(&self) -> Result<()> { + use lancedb::index::Index; + + // Ensure vector index on embedding column using module-level static guard + VECTOR_INDEX_GUARD + .get_or_try_init(|| async { + // Double-check: verify index doesn't exist before creating + // This handles cases where another process created it externally + let indices = self + .table + .list_indices() + .await + .map_err(|e| DbError::LanceDb(e.to_string()))?; + + let has_vector_index = indices + .iter() + .any(|idx| idx.columns.iter().any(|col| col == "embedding")); + + if has_vector_index { + tracing::debug!("Vector index already exists, skipping creation"); + return Ok::<(), crate::error::Error>(()); + } - self.ensure_fts_index().await?; + tracing::info!("Creating vector index (IVF-HNSW with Scalar Quantization)"); + self.table + .create_index(&["embedding"], Index::IvfHnswSq( + lancedb::index::vector::IvfHnswSqIndexBuilder::default() + )) + .execute() + .await + .map_err(|e| DbError::LanceDb(format!("Failed to create vector index: {}", e)))?; + tracing::info!("Vector index created successfully"); + + Ok::<(), crate::error::Error>(()) + }) + .await?; + + // Ensure FTS index on content column using module-level static guard + FTS_INDEX_GUARD + .get_or_try_init(|| async { + // 
Double-check: verify index doesn't exist before creating + let indices = self + .table + .list_indices() + .await + .map_err(|e| DbError::LanceDb(e.to_string()))?; + + let has_fts_index = indices + .iter() + .any(|idx| idx.columns.iter().any(|col| col == "content")); + + if has_fts_index { + tracing::debug!("FTS index already exists, skipping creation"); + return Ok::<(), crate::error::Error>(()); + } + + tracing::info!("Creating FTS index on content column"); + self.table + .create_index(&["content"], Index::FTS(Default::default())) + .execute() + .await + .map_err(|e| DbError::LanceDb(format!("Failed to create FTS index: {}", e)))?; + tracing::info!("FTS index created successfully"); + + Ok::<(), crate::error::Error>(()) + }) + .await?; Ok(()) } - /// Ensure the FTS index exists on the content column. + /// Optimize indexes for incremental updates after data insertion. /// - /// LanceDB requires an inverted index for `full_text_search()` queries. - /// This is safe to call multiple times — if the index already exists, the - /// error is silently ignored. - pub async fn ensure_fts_index(&self) -> Result<()> { - match self - .table - .create_index(&["content"], lancedb::index::Index::FTS(Default::default())) - .execute() + /// This is much faster than a full rebuild and should be called after + /// significant data changes to maintain query performance. 
+ pub async fn optimize_indexes(&self) -> Result<()> { + use lancedb::table::{OptimizeAction, OptimizeOptions}; + + tracing::debug!("Optimizing indexes (incremental update)"); + self.table + .optimize(OptimizeAction::Index(OptimizeOptions::default())) .await - { - Ok(()) => { - tracing::debug!("FTS index created on content column"); - Ok(()) - } - Err(error) => { - let message = error.to_string(); - // LanceDB returns an error if the index already exists - if message.contains("already") || message.contains("index") { - tracing::trace!("FTS index already exists"); - Ok(()) - } else { - Err(DbError::LanceDb(format!("Failed to create FTS index: {}", message)).into()) - } - } - } + .map_err(|e| DbError::LanceDb(format!("Failed to optimize indexes: {}", e)))?; + tracing::debug!("Index optimization complete"); + + Ok(()) } /// Get the Arrow schema for the embeddings table. diff --git a/src/memory/maintenance.rs b/src/memory/maintenance.rs index 67b935497..f0294f9fe 100644 --- a/src/memory/maintenance.rs +++ b/src/memory/maintenance.rs @@ -90,6 +90,18 @@ pub async fn run_maintenance_with_cancel( .await?; } + // Optimize indexes to incorporate all changes from decay, prune, and merge. + // This ensures the ANN index stays current with the full dataset. + if let Err(error) = embedding_table.optimize_indexes().await { + tracing::warn!( + %error, + "failed to optimize indexes during maintenance — index may be stale" + ); + // Don't fail the entire maintenance — optimization is best-effort + } else { + tracing::info!("Index optimization complete after maintenance"); + } + Ok(report) } diff --git a/src/tools/memory_save.rs b/src/tools/memory_save.rs index 8193d7f50..86e5004dc 100644 --- a/src/tools/memory_save.rs +++ b/src/tools/memory_save.rs @@ -379,15 +379,15 @@ impl Tool for MemorySaveTool { } } - // Ensure the FTS index exists so full_text_search queries work. - // Safe to call repeatedly — no-ops if the index already exists. 
+ // Ensure vector and FTS indexes exist (prevents 30-minute rebuild loop) + // Safe to call repeatedly — skips creation if indexes already exist. if let Err(error) = self .memory_search .embedding_table() - .ensure_fts_index() + .ensure_indexes_exist() .await { - tracing::warn!(%error, "failed to ensure FTS index after memory save"); + tracing::warn!(%error, "failed to ensure indexes after memory save"); } if let Some(event_context) = &self.event_context