Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 178 additions & 4 deletions openviking/storage/vectordb/collection/local_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def get_or_create_local_collection(
path: str = "",
vectorizer: Optional[BaseVectorizer] = None,
config: Optional[Dict[str, Any]] = None,
cache_config: Optional[Dict[str, Any]] = None,
):
"""Create or retrieve a local Collection.

Expand All @@ -67,6 +68,10 @@ def get_or_create_local_collection(
- "ttl_cleanup_seconds": Interval (in seconds) for TTL expiration data cleanup
- "index_maintenance_seconds": Interval (in seconds) for index maintenance tasks
If not provided, values will be obtained from environment variables or defaults
cache_config: Cache configuration for query result caching, optional settings include:
- "max_size": Maximum number of cache entries (default: 1000)
- "ttl_seconds": Time-to-live for cache entries in seconds (default: 300)
- "enabled": Whether caching is enabled (default: True)

Returns:
Collection: Collection instance
Expand All @@ -81,6 +86,11 @@ def get_or_create_local_collection(
... config={
... "ttl_cleanup_seconds": 5,
... "index_maintenance_seconds": 60
... },
... cache_config={
... "max_size": 2000,
... "ttl_seconds": 600,
... "enabled": True
... }
... )

Expand All @@ -103,7 +113,7 @@ def get_or_create_local_collection(
)
store_mgr = create_store_manager("local")
collection = VolatileCollection(
meta=meta, store=store_mgr, vectorizer=vectorizer, config=config
meta=meta, store=store_mgr, vectorizer=vectorizer, config=config, cache_config=cache_config
)
return Collection(collection)
else:
Expand All @@ -118,7 +128,7 @@ def get_or_create_local_collection(
storage_path = os.path.join(path, STORAGE_DIR_NAME)
store_mgr = create_store_manager("local", storage_path)
collection = PersistCollection(
path=path, meta=meta, store=store_mgr, vectorizer=vectorizer, config=config
path=path, meta=meta, store=store_mgr, vectorizer=vectorizer, config=config, cache_config=cache_config
)
return Collection(collection)

Expand All @@ -130,6 +140,7 @@ def __init__(
store_mgr: StoreManager,
vectorizer: Optional[BaseVectorizer] = None,
config: Optional[Dict[str, Any]] = None,
cache_config: Optional[Dict[str, Any]] = None,
):
self.indexes = ThreadSafeDictManager[IIndex]()
self.meta: CollectionMeta = meta
Expand Down Expand Up @@ -160,6 +171,9 @@ def __init__(
executors={"default": {"type": "threadpool", "max_workers": 1}}
)
self.scheduler.start()

# Cache configuration for all indexes
self.cache_config = cache_config or {}

def update(self, fields: Optional[Dict[str, Any]] = None, description: Optional[str] = None):
meta_data: Dict[str, Any] = {}
Expand Down Expand Up @@ -326,6 +340,162 @@ def search_by_vector(
]
return search_result

    def batch_search_by_vector(
        self,
        index_name: str,
        dense_vectors: List[List[float]],
        limit: int = 10,
        offset: int = 0,
        filters: Optional[Dict[str, Any]] = None,
        sparse_vectors: Optional[List[Dict[str, float]]] = None,
        output_fields: Optional[List[str]] = None,
        num_threads: Optional[int] = None,
    ) -> List[SearchResult]:
        """Perform batch vector similarity search with multiple query vectors.

        This method searches with multiple query vectors in a single call,
        providing significant performance improvements for batch workloads
        through parallel processing and query result caching.

        Args:
            index_name: Name of the index to search
            dense_vectors: List of dense query vectors
            limit: Maximum number of results to return per query. Defaults to 10.
            offset: Number of results to skip per query. Defaults to 0.
            filters: Query DSL for filtering results by scalar fields.
            sparse_vectors: List of sparse vectors (dictionaries) for hybrid search.
            output_fields: List of fields to include in results.
            num_threads: Number of threads for parallel search. Defaults to 4.

        Returns:
            List of SearchResult objects, one per query vector, in the same order
            as the input dense_vectors.

        Example:
            >>> query_vectors = [[0.1, 0.2, ...], [0.3, 0.4, ...], [0.5, 0.6, ...]]
            >>> results = collection.batch_search_by_vector(
            ...     index_name="my_index",
            ...     dense_vectors=query_vectors,
            ...     limit=10
            ... )
            >>> for i, result in enumerate(results):
            ...     print(f"Query {i}: {len(result.data)} results")
        """
        # Empty batch: nothing to search.
        if not dense_vectors:
            return []

        index = self.indexes.get(index_name)
        if not index:
            # Unknown index: return one empty result per query so the output
            # length always matches the input length.
            return [SearchResult() for _ in dense_vectors]

        # Prepare sparse vectors if provided: split each {term: weight} dict
        # into parallel terms/values lists, padding with empty lists so every
        # query position has a (possibly empty) sparse component.
        sparse_raw_terms_list = None
        sparse_values_list = None
        if sparse_vectors:
            sparse_raw_terms_list = []
            sparse_values_list = []
            for sv in sparse_vectors:
                if sv and isinstance(sv, dict):
                    sparse_raw_terms_list.append(list(sv.keys()))
                    sparse_values_list.append(list(sv.values()))
                else:
                    sparse_raw_terms_list.append([])
                    sparse_values_list.append([])

        # Perform batch search with parallel processing.
        # Over-fetch limit+offset hits per query so the offset can be applied
        # by slicing below without losing results.
        # NOTE(review): num_threads is passed as a 6th positional argument —
        # confirm Index.batch_search accepts it; the wrapper signature visible
        # in index.py takes only 5 parameters after self.
        actual_limit = limit + offset
        batch_results = index.batch_search(
            dense_vectors, actual_limit, filters, sparse_raw_terms_list, sparse_values_list, num_threads
        )

        # Process results for each query
        search_results: List[SearchResult] = []
        if not output_fields:
            # Default to returning every declared field.
            output_fields = list(self.meta.fields_dict.keys())

        for label_list, scores_list in batch_results:
            search_result = SearchResult()

            # Apply offset by slicing the results
            if offset > 0:
                label_list = label_list[offset:]
                scores_list = scores_list[offset:]

            # Limit to requested size
            if len(label_list) > limit:
                label_list = label_list[:limit]
                scores_list = scores_list[:limit]

            pk_list = label_list
            fields_list = []

            if self.meta.primary_key or output_fields:
                if not self.store_mgr:
                    raise RuntimeError("Store manager is not initialized")

                # Fetch candidate data for labels
                if label_list:
                    cands_list = self.store_mgr.fetch_cands_data(label_list)

                    # Drop positions whose candidate data is missing (e.g.
                    # records removed after indexing), keeping the pk/score
                    # lists aligned with the surviving candidates.
                    valid_indices = []
                    for i, cand in enumerate(cands_list):
                        if cand is not None:
                            valid_indices.append(i)

                    if len(valid_indices) < len(cands_list):
                        cands_list = [cands_list[i] for i in valid_indices]
                        pk_list = [pk_list[i] for i in valid_indices]
                        scores_list = [scores_list[i] for i in valid_indices]

                    if cands_list:
                        # Stored fields are persisted as JSON strings.
                        cands_fields = [json.loads(cand.fields) for cand in cands_list]

                        if self.meta.primary_key:
                            # Replace internal labels with user-facing primary keys.
                            pk_list = [
                                cands_field.get(self.meta.primary_key, "")
                                for cands_field in cands_fields
                            ]
                        fields_list = [
                            {field: cands_field.get(field, None) for field in output_fields}
                            for cands_field in cands_fields
                        ]
                        if self.meta.vector_key:
                            # Attach the stored vector under the vector field name.
                            for i, cands in enumerate(cands_list):
                                fields_list[i][self.meta.vector_key] = cands.vector

            # zip_longest pads with None when fields_list is shorter than
            # pk_list (e.g. no candidate data was fetched for this query).
            search_result.data = [
                SearchItemResult(id=pk, fields=fields, score=score)
                for pk, score, fields in zip_longest(pk_list, scores_list, fields_list)
            ]
            search_results.append(search_result)

        return search_results

def get_index_cache_stats(self, index_name: str) -> Optional[Dict[str, Any]]:
"""Get cache statistics for a specific index.

Args:
index_name: Name of the index

Returns:
Dictionary containing cache statistics if the index exists,
None otherwise.
"""
index = self.indexes.get(index_name)
if not index:
return None
return index.get_cache_stats()

def invalidate_index_cache(self, index_name: str) -> None:
"""Invalidate the query cache for a specific index.

Args:
index_name: Name of the index
"""
index = self.indexes.get(index_name)
if index:
index.invalidate_cache()

def search_by_id(
self,
index_name: str,
Expand Down Expand Up @@ -895,8 +1065,9 @@ def __init__(
store: StoreManager,
vectorizer: Optional[BaseVectorizer] = None,
config: Optional[Dict[str, Any]] = None,
cache_config: Optional[Dict[str, Any]] = None,
):
super().__init__(meta, store, vectorizer, config)
super().__init__(meta, store, vectorizer, config, cache_config)
LocalCollection._register_scheduler_job(self)

def _new_index(
Expand All @@ -911,6 +1082,7 @@ def _new_index(
name=index_name,
meta=meta,
cands_list=cands_list,
cache_config=self.cache_config,
)
return index

Expand All @@ -926,12 +1098,13 @@ def __init__(
store: StoreManager,
vectorizer: Optional[BaseVectorizer] = None,
config: Optional[Dict[str, Any]] = None,
cache_config: Optional[Dict[str, Any]] = None,
):
self.collection_dir = path
os.makedirs(self.collection_dir, exist_ok=True)
self.index_dir = os.path.join(self.collection_dir, "index")
os.makedirs(self.index_dir, exist_ok=True)
super().__init__(meta, store, vectorizer, config)
super().__init__(meta, store, vectorizer, config, cache_config)
self._recover()
LocalCollection._register_scheduler_job(self) # TTL expiration data cleanup

Expand Down Expand Up @@ -1031,6 +1204,7 @@ def _new_index(
meta=meta,
cands_list=cands_list,
force_rebuild=force_rebuild,
cache_config=self.cache_config,
)
return index

Expand Down
113 changes: 113 additions & 0 deletions openviking/storage/vectordb/index/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,65 @@ def need_rebuild(self) -> bool:
"""
return True

def batch_search(
self,
query_vectors: List[List[float]],
limit: int = 10,
filters: Optional[Dict[str, Any]] = None,
sparse_raw_terms_list: Optional[List[List[str]]] = None,
sparse_values_list: Optional[List[List[float]]] = None,
) -> List[Tuple[List[int], List[float]]]:
"""Perform batch vector similarity search with multiple query vectors.

This method allows searching with multiple query vectors in a single call,
which can be more efficient than multiple individual searches due to
better cache utilization and reduced overhead.

Args:
query_vectors: List of dense query vectors for similarity matching.
Each vector should have the same dimensionality as indexed vectors.
limit: Maximum number of results to return per query. Defaults to 10.
filters: Query DSL for filtering results by scalar fields.
Applied to all queries in the batch.
sparse_raw_terms_list: List of term token lists for sparse vector search.
Each inner list corresponds to a query vector.
sparse_values_list: List of weight lists for sparse vector search.
Each inner list corresponds to a query vector.

Returns:
List of tuples, one per query vector, each containing:
- List of labels (record identifiers) sorted by similarity
- List of similarity scores corresponding to each label

Note:
Default implementation calls search() for each query vector sequentially.
Subclasses may override this method to provide optimized batch processing.
"""
# Default implementation: sequential search
results = []
for i, query_vector in enumerate(query_vectors):
sparse_terms = sparse_raw_terms_list[i] if sparse_raw_terms_list else None
sparse_values = sparse_values_list[i] if sparse_values_list else None
result = self.search(query_vector, limit, filters, sparse_terms, sparse_values)
results.append(result)
return results

def get_cache_stats(self) -> Optional[Dict[str, Any]]:
"""Get cache statistics for this index.

Returns:
Dictionary containing cache statistics if caching is enabled,
None otherwise.
"""
return None

def invalidate_cache(self) -> None:
"""Invalidate the query cache for this index.

Should be called when the underlying data is modified.
"""
pass


class Index:
"""
Expand Down Expand Up @@ -480,3 +539,57 @@ def aggregate(
if self.__index is None:
raise RuntimeError("Index is not initialized")
return self.__index.aggregate(filters)

def batch_search(
self,
query_vectors: List[List[float]],
limit: int = 10,
filters: Optional[Dict[str, Any]] = None,
sparse_raw_terms_list: Optional[List[List[str]]] = None,
sparse_values_list: Optional[List[List[float]]] = None,
) -> List[Tuple[List[int], List[float]]]:
"""Perform batch vector similarity search with multiple query vectors.

This method allows searching with multiple query vectors in a single call,
which can be more efficient than multiple individual searches due to
better cache utilization and reduced overhead.

Args:
query_vectors: List of dense query vectors for similarity matching.
limit: Maximum number of results to return per query. Defaults to 10.
filters: Query DSL for filtering results by scalar fields.
sparse_raw_terms_list: List of term token lists for sparse vector search.
sparse_values_list: List of weight lists for sparse vector search.

Returns:
List of tuples, one per query vector, each containing:
- List of labels (record identifiers) sorted by similarity
- List of similarity scores corresponding to each label

Raises:
RuntimeError: If the underlying index is not initialized.
"""
if self.__index is None:
raise RuntimeError("Index is not initialized")
return self.__index.batch_search(
query_vectors, limit, filters, sparse_raw_terms_list, sparse_values_list
)

def get_cache_stats(self) -> Optional[Dict[str, Any]]:
"""Get cache statistics for this index.

Returns:
Dictionary containing cache statistics if caching is enabled,
None otherwise.
"""
if self.__index is None:
return None
return self.__index.get_cache_stats()

def invalidate_cache(self) -> None:
"""Invalidate the query cache for this index.

Should be called when the underlying data is modified.
"""
if self.__index is not None:
self.__index.invalidate_cache()
Loading
Loading