Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/api/agents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,8 +774,9 @@ pub async fn create_agent_internal(
format!("failed to init embeddings: {error}")
})?;

if let Err(error) = embedding_table.ensure_fts_index().await {
tracing::warn!(%error, agent_id = %agent_id, "failed to create FTS index");
// Ensure vector and FTS indexes exist (prevents 30-minute rebuild loop)
if let Err(error) = embedding_table.ensure_indexes_exist().await {
tracing::warn!(%error, agent_id = %agent_id, "failed to ensure indexes exist");
}

let memory_search = std::sync::Arc::new(crate::memory::MemorySearch::new(
Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2761,9 +2761,9 @@ async fn initialize_agents(
format!("failed to init embeddings for agent '{}'", agent_config.id)
})?;

// Ensure FTS index exists for full-text search queries
if let Err(error) = embedding_table.ensure_fts_index().await {
tracing::warn!(%error, agent = %agent_config.id, "failed to create FTS index");
// Ensure vector and FTS indexes exist (prevents 30-minute rebuild loop)
if let Err(error) = embedding_table.ensure_indexes_exist().await {
tracing::warn!(%error, agent = %agent_config.id, "failed to ensure indexes exist");
}

let memory_search = Arc::new(spacebot::memory::MemorySearch::new(
Expand Down
173 changes: 135 additions & 38 deletions src/memory/lance.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
//! LanceDB table management and embedding storage with HNSW vector index and FTS.
//! LanceDB table management and embedding storage with IVF-HNSW-SQ vector index and FTS.
//!
//! Index creation uses a single-flight guard pattern to prevent race conditions
//! when multiple concurrent callers attempt to ensure indexes exist simultaneously.
//! This ensures only ONE index build runs at a time per index type.
//!
//! Guards are module-level statics to ensure serialization across all
//! EmbeddingTable instances in the process, not just clones of the same instance.

use crate::error::{DbError, Result};
use arrow_array::cast::AsArray;
use arrow_array::types::Float32Type;
use arrow_array::{Array, RecordBatchIterator};
use futures::TryStreamExt;
use std::sync::Arc;
use tokio::sync::OnceCell;

/// Module-level single-flight guards for index creation.
///
/// These are statics to ensure that index creation is serialized across
/// ALL EmbeddingTable instances in the process, not just clones of the same instance.
/// This prevents race conditions when multiple threads independently call
/// `open_or_create()` and then attempt to ensure indexes exist.
static VECTOR_INDEX_GUARD: OnceCell<()> = OnceCell::const_new();
static FTS_INDEX_GUARD: OnceCell<()> = OnceCell::const_new();

/// Schema constants for the embeddings table.
const TABLE_NAME: &str = "memory_embeddings";
const EMBEDDING_DIM: i32 = 384; // all-MiniLM-L6-v2 dimension

/// LanceDB table for memory embeddings with HNSW index and FTS.
///
/// Index creation is protected by module-level static single-flight guards
/// to prevent duplicate concurrent builds. The guards ensure that only ONE
/// index creation runs at a time across ALL EmbeddingTable instances.
pub struct EmbeddingTable {
table: lancedb::Table,
}
Expand All @@ -32,15 +53,19 @@ impl EmbeddingTable {
pub async fn open_or_create(connection: &lancedb::Connection) -> Result<Self> {
// Try to open existing table
match connection.open_table(TABLE_NAME).execute().await {
Ok(table) => return Ok(Self { table }),
Ok(table) => {
return Ok(Self { table })
}
Err(error) => {
tracing::debug!(%error, "failed to open embeddings table, will create");
}
}

// Table doesn't exist or is unreadable — try creating it
match Self::create_empty_table(connection).await {
Ok(table) => return Ok(Self { table }),
Ok(table) => {
return Ok(Self { table })
}
Err(error) => {
tracing::warn!(
%error,
Expand Down Expand Up @@ -295,48 +320,120 @@ impl EmbeddingTable {
Ok(matches)
}

/// Create HNSW vector index and FTS index for better performance.
/// Should be called after enough data accumulates.
pub async fn create_indexes(&self) -> Result<()> {
// Create HNSW vector index on embedding column
self.table
.create_index(&["embedding"], lancedb::index::Index::Auto)
.execute()
.await
.map_err(|e| DbError::LanceDb(format!("Failed to create vector index: {}", e)))?;
/// Ensure vector and FTS indexes exist, creating them only if they don't already exist.
///
/// This prevents the expensive HNSW index training from running on every startup.
/// Uses module-level static single-flight guards to ensure only ONE index creation
/// runs at a time, even when multiple concurrent callers invoke this method simultaneously.
///
/// # Problem this solves
///
/// LanceDB's `create_index()` unconditionally triggers a full rebuild when called,
/// regardless of whether an index already exists on disk. The previous approach of
/// catching errors after the fact was too late — the expensive KMeans training had
/// already completed.
///
/// # Solution
///
/// Two-layer protection:
/// 1. **Single-flight guards** (module-level statics): Ensure only one index creation runs
/// at a time. Concurrent callers wait for the first to complete.
/// 2. **`list_indices()` check**: After acquiring the guard, verify the index still
/// doesn't exist (handles cases where another process created it externally).
///
/// # Concurrency pattern
///
/// The module-level static `OnceCell` guard ensures that:
/// - Only the first caller actually performs the index creation
/// - Subsequent callers await the initialization and get the same result
/// - The guard is shared across ALL EmbeddingTable instances in the process
/// - No deadlocks: each index has its own independent guard
pub async fn ensure_indexes_exist(&self) -> Result<()> {
use lancedb::index::Index;

// Ensure vector index on embedding column using module-level static guard
VECTOR_INDEX_GUARD
.get_or_try_init(|| async {
// Double-check: verify index doesn't exist before creating
// This handles cases where another process created it externally
let indices = self
.table
.list_indices()
.await
.map_err(|e| DbError::LanceDb(e.to_string()))?;

let has_vector_index = indices
.iter()
.any(|idx| idx.columns.iter().any(|col| col == "embedding"));

if has_vector_index {
tracing::debug!("Vector index already exists, skipping creation");
return Ok::<(), crate::error::Error>(());
}

self.ensure_fts_index().await?;
tracing::info!("Creating vector index (IVF-HNSW with Scalar Quantization)");
self.table
.create_index(&["embedding"], Index::IvfHnswSq(
lancedb::index::vector::IvfHnswSqIndexBuilder::default()
))
.execute()
.await
.map_err(|e| DbError::LanceDb(format!("Failed to create vector index: {}", e)))?;
tracing::info!("Vector index created successfully");

Ok::<(), crate::error::Error>(())
})
.await?;

// Ensure FTS index on content column using module-level static guard
FTS_INDEX_GUARD
.get_or_try_init(|| async {
// Double-check: verify index doesn't exist before creating
let indices = self
.table
.list_indices()
.await
.map_err(|e| DbError::LanceDb(e.to_string()))?;

let has_fts_index = indices
.iter()
.any(|idx| idx.columns.iter().any(|col| col == "content"));

if has_fts_index {
tracing::debug!("FTS index already exists, skipping creation");
return Ok::<(), crate::error::Error>(());
}

tracing::info!("Creating FTS index on content column");
self.table
.create_index(&["content"], Index::FTS(Default::default()))
.execute()
.await
.map_err(|e| DbError::LanceDb(format!("Failed to create FTS index: {}", e)))?;
tracing::info!("FTS index created successfully");

Ok::<(), crate::error::Error>(())
})
.await?;

Ok(())
}

/// Ensure the FTS index exists on the content column.
/// Optimize indexes for incremental updates after data insertion.
///
/// LanceDB requires an inverted index for `full_text_search()` queries.
/// This is safe to call multiple times — if the index already exists, the
/// error is silently ignored.
pub async fn ensure_fts_index(&self) -> Result<()> {
match self
.table
.create_index(&["content"], lancedb::index::Index::FTS(Default::default()))
.execute()
/// This is much faster than a full rebuild and should be called after
/// significant data changes to maintain query performance.
pub async fn optimize_indexes(&self) -> Result<()> {
use lancedb::table::{OptimizeAction, OptimizeOptions};

tracing::debug!("Optimizing indexes (incremental update)");
self.table
.optimize(OptimizeAction::Index(OptimizeOptions::default()))
.await
{
Ok(()) => {
tracing::debug!("FTS index created on content column");
Ok(())
}
Err(error) => {
let message = error.to_string();
// LanceDB returns an error if the index already exists
if message.contains("already") || message.contains("index") {
tracing::trace!("FTS index already exists");
Ok(())
} else {
Err(DbError::LanceDb(format!("Failed to create FTS index: {}", message)).into())
}
}
}
.map_err(|e| DbError::LanceDb(format!("Failed to optimize indexes: {}", e)))?;
tracing::debug!("Index optimization complete");

Ok(())
}

/// Get the Arrow schema for the embeddings table.
Expand Down
12 changes: 12 additions & 0 deletions src/memory/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ pub async fn run_maintenance_with_cancel(
.await?;
}

// Optimize indexes to incorporate all changes from decay, prune, and merge.
// This ensures the ANN index stays current with the full dataset.
if let Err(error) = embedding_table.optimize_indexes().await {
tracing::warn!(
%error,
"failed to optimize indexes during maintenance — index may be stale"
);
// Don't fail the entire maintenance — optimization is best-effort
} else {
tracing::info!("Index optimization complete after maintenance");
}
Comment on lines +95 to +103
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Honor cancellation during index optimization.

At Line 95, optimize_indexes() is awaited directly instead of using maintenance_cancelable_op(...). If cancellation arrives after merge completes, this step can still run to completion and weaken the “exit quickly” contract for run_maintenance_with_cancel.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/memory/maintenance.rs` around lines 95 - 103, The direct await of
embedding_table.optimize_indexes() can ignore cancellation; wrap the
optimize_indexes() call in maintenance_cancelable_op so it checks for
cancellation and exits quickly. Replace the direct await of
embedding_table.optimize_indexes().await with a call like
maintenance_cancelable_op(|| embedding_table.optimize_indexes().await)
(preserving the existing Err handling and tracing::warn/info logic) so
run_maintenance_with_cancel honors cancellation during index optimization.


Ok(report)
}

Expand Down
8 changes: 4 additions & 4 deletions src/tools/memory_save.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,15 +379,15 @@ impl Tool for MemorySaveTool {
}
}

// Ensure the FTS index exists so full_text_search queries work.
// Safe to call repeatedly — no-ops if the index already exists.
// Ensure vector and FTS indexes exist (prevents 30-minute rebuild loop)
// Safe to call repeatedly — skips creation if indexes already exist.
if let Err(error) = self
.memory_search
.embedding_table()
.ensure_fts_index()
.ensure_indexes_exist()
.await
{
tracing::warn!(%error, "failed to ensure FTS index after memory save");
tracing::warn!(%error, "failed to ensure indexes after memory save");
Comment on lines +382 to +390
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don’t lose post-insert maintenance on the save path.

After switching this branch to ensure_indexes_exist(), it becomes a pure existence check once the indexes are present, so later writes never reach the new optimize_indexes() flow. Please chain the incremental optimize step from here as well, ideally off the request path.

As per coding guidelines "Use tokio::spawn for fire-and-forget database writes (conversation history saves, memory writes, worker log persistence) so the user gets their response immediately."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/tools/memory_save.rs` around lines 382 - 390, After
ensure_indexes_exist() returns Ok in the memory save path, spawn a background
task to run the incremental index maintenance so you don't lose post-insert
optimize work; specifically, after calling
self.memory_search.embedding_table().ensure_indexes_exist().await, call
embedding_table().optimize_indexes() (or the incremental optimizer helper)
inside a tokio::spawn so it runs fire-and-forget off the request path, and log
errors from the spawned task (use tracing::warn or similar) to surface failures
without blocking the save response.

}

if let Some(event_context) = &self.event_context
Expand Down