
adds Valkey Storage Implementation #3

Open
MatthiasHowellYopp wants to merge 1 commit into main from feat/valkey-storage

Conversation

@MatthiasHowellYopp (Owner) commented Apr 21, 2026

Adds Valkey as a storage backend for CrewAI's unified memory system, using the valkey-glide client. Valkey is a high-performance, Redis-compatible key-value store that provides a distributed, production-ready alternative to the existing LanceDB and Qdrant storage options.

What's included
ValkeyStorage (valkey_storage.py) — full StorageBackend implementation:

CRUD operations (save, get, update, delete) with both sync and async APIs
Server-side vector search via Valkey Search module (FT.SEARCH with KNN)
Scope-based record organization with hierarchical scope queries
Category indexing and filtering
Metadata filtering with AND logic
Pagination support (limit/offset) for list_records
Scope introspection (get_scope_info, list_scopes, list_categories, count)
Bulk delete with scope, category, metadata, and age-based filters
Connection retry with exponential backoff for transient errors
Lazy client initialization and async context manager support
reset for clearing all or scoped records
ValkeyCache (valkey_cache.py) — lightweight cache interface:

get/set/delete/exists with optional TTL
JSON serialization for complex values
Connection timeout handling
Used by A2A agent card caching and file upload caching as an alternative to Redis
Integration points:

unified_memory.py — wired as a storage backend option
encoding_flow.py — encoding flow support for Valkey storage
memory_tools.py — memory tool descriptions updated with clearer parameter docs
upload_cache.py — added ValkeyCache as a cache backend option (alongside memory/Redis)
agent_card.py / task.py — added VALKEY_URL environment variable support for aiocache configuration, with priority over REDIS_URL
Optional dependency:

Added valkey = ["valkey-glide>=1.3.0"] to [project.optional-dependencies] in
pyproject.toml
Install with pip install crewai[valkey]
Tests
~5,300 lines of tests across 5 test files covering:

Core CRUD and index operations (test_valkey_storage.py)
Vector search with filters, scoring, pagination (test_valkey_storage_search.py)
Scope operations, listing, categories, count, reset (test_valkey_storage_scope.py)
Error handling, retry, serialization/deserialization (test_valkey_storage_errors.py)
Cache operations with TTL (test_valkey_cache.py)
All tests use mocked Valkey clients — no running Valkey instance required.

fixes crewAIInc#5578


@jbrinkman left a comment


Code Review: Valkey Storage Implementation

Thanks for this substantial contribution — adding Valkey as a distributed storage backend is a valuable addition to CrewAI's memory system. The overall architecture is sound: ValkeyStorage for the full StorageBackend protocol, ValkeyCache for lightweight caching, proper optional dependency handling in pyproject.toml, and comprehensive test coverage (~5300 lines of tests with mocked clients).

What works well

  • Clean separation between ValkeyStorage (full storage) and ValkeyCache (simple cache)
  • Proper use of Valkey Search module for server-side vector similarity (FT.SEARCH with KNN)
  • Good error handling with descriptive error messages and graceful degradation
  • Comprehensive test suite covering CRUD, search, scopes, errors, and edge cases
  • Optional dependency pattern (pip install crewai[valkey]) follows existing conventions
  • Lazy client initialization and connection timeout handling

Issues requiring changes

Bugs:

  1. _run_async destroys cached client on every sync call — clears self._client = None then creates a new thread pool + event loop + TCP connection per call. This defeats lazy initialization and will be extremely slow under load.
  2. _retry_operation can't actually retry — accepts a Coroutine object which can only be awaited once. Subsequent retry iterations will raise RuntimeError. (Currently dead code, but the interface is broken.)

Performance:
3. _alist_records has N+1 query pattern — fetches ALL record IDs across scopes, issues individual HGETALL per record, sorts in Python, then applies pagination. For large scopes this loads everything into memory. Same pattern in _aget_scope_info and _alist_categories.

Design:
4. asave/adelete/_aupdate claim atomicity but aren't atomic — individual commands without MULTI/EXEC or pipeline. Docstrings should reflect actual behavior.
5. Massive code duplication in upload_cache.py — every method has if self._use_valkey: ... else: ... with repeated null checks. A strategy/adapter pattern would be cleaner.
6. Duplicate URL parsing — VALKEY_URL/REDIS_URL parsing is copy-pasted across agent_card.py, task.py, and upload_cache.py. Extract a shared helper.
7. Module-level side effects: agent_card.py and task.py both call caches.set_config() at import time. Import order determines which config wins. task.py also does a conditional from crewai.memory.storage.valkey_cache import ValkeyCache at module level when VALKEY_URL is set, which will fail if valkey-glide isn't installed.

Missing integration:
8. No way to actually select ValkeyStorage: the unified_memory.py changes only modify drain_writes(). There's no configuration path for users to choose Valkey as their storage backend.

Other:
9. uv.lock includes unrelated lockfile regeneration changes — consider splitting.
10. embed_texts creates a new ThreadPoolExecutor(max_workers=1) on every call in async context — should reuse a pool.

Recommendations

  • Fix bugs #1 and #2 before merging
  • Address the missing integration (crewAIInc#8) — without it, ValkeyStorage is unreachable
  • The performance and design issues (#3-7) are acceptable for a first version but should be tracked as follow-up work
  • Consider adding an integration test that runs against a real Valkey instance (even if skipped in CI by default)
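For that last recommendation, a common pattern is to gate the live test behind an environment variable so it is skipped in CI by default. A minimal sketch (VALKEY_TEST_URL is an invented variable name, not part of this PR):

```python
import os

import pytest

# VALKEY_TEST_URL is an invented variable name for this sketch.
requires_live_valkey = pytest.mark.skipif(
    not os.environ.get("VALKEY_TEST_URL"),
    reason="set VALKEY_TEST_URL to run integration tests against a real Valkey",
)


@requires_live_valkey
def test_round_trip_against_real_valkey() -> None:
    # Would connect to the URL in VALKEY_TEST_URL and exercise save/search/get.
    ...
```

Developers run it locally with `VALKEY_TEST_URL=valkey://localhost:6379 pytest`; CI never sets the variable, so the test reports as skipped rather than failed.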

Comment on lines +178 to +196
        so this lock is primarily for API compatibility with other storage backends.
        """
        return self._write_lock

    def _run_async(self, coro: Coroutine[Any, Any, Any]) -> Any:
        """Bridge async operations to sync context.

        If in async context, runs coroutine in a new thread with its own event loop.
        Otherwise creates event loop in current thread.

        Args:
            coro: Coroutine to execute.

        Returns:
            Result of the coroutine execution.
        """
        try:
            asyncio.get_running_loop()
            # We're in async context - run in thread with new event loop

Bug/Performance: _run_async clears self._client = None on every sync call, then creates a brand-new ThreadPoolExecutor, event loop, and Valkey connection. This means every sync operation (save, get_record, search, etc.) opens and closes a TCP connection to Valkey, which will be extremely slow under load and defeats the purpose of lazy client initialization.

Consider caching the thread-pool and reusing the event loop, or using asyncio.run_coroutine_threadsafe with a long-lived background loop. At minimum, avoid clearing self._client unless the event loop has actually changed.
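A minimal sketch of the long-lived-loop approach, assuming nothing about the PR's internals beyond the sync/async bridging problem (the class and method names are illustrative):

```python
# Sketch: one background event loop lives for the lifetime of the storage
# object, and sync callers submit coroutines to it. Because every coroutine
# runs on the same loop, a client cached inside them stays bound to one loop
# and never has to be discarded between calls.
import asyncio
import threading
from typing import Any, Coroutine


class AsyncBridge:
    """Run coroutines from sync code on a single long-lived event loop."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Coroutine[Any, Any, Any]) -> Any:
        # Thread-safe hand-off to the background loop; blocks until done.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result()

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
```

A storage instance would hold one `AsyncBridge` and route every sync method through `bridge.run(...)`, keeping the TCP connection open across calls.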

Comment on lines +198 to +225

            def _run_in_new_loop() -> Any:
                # Clear cached client to avoid event loop conflicts
                self._client = None
                return asyncio.run(coro)

            with concurrent.futures.ThreadPoolExecutor() as pool:
                future = pool.submit(_run_in_new_loop)
                return future.result()
        except RuntimeError as e:
            if "no running event loop" in str(e).lower():
                # Clear cached client to avoid event loop conflicts
                self._client = None
                return asyncio.run(coro)
            raise

    async def _retry_operation(
        self,
        operation: Coroutine[Any, Any, Any],
        max_retries: int = 5,
    ) -> Any:
        """Retry operation with exponential backoff on connection errors.

        Retries operations that fail due to connection errors using exponential
        backoff starting at 0.2 seconds. Logs connection errors at debug level.

        Args:
            operation: Coroutine to execute.

Bug: _retry_operation accepts a Coroutine object, but a coroutine can only be awaited once. On the first attempt the coroutine is consumed; subsequent retry iterations will raise RuntimeError: cannot reuse already awaited coroutine. The retry loop will never actually retry.

This needs to accept a callable (factory) that produces a new coroutine on each attempt:

async def _retry_operation(self, operation_fn: Callable[[], Coroutine], max_retries: int = 5) -> Any:
    for attempt in range(max_retries + 1):
        try:
            return await operation_fn()
        except (ClosingError, ConnectionError) as e:
            ...

Note: _retry_operation doesn't appear to be called anywhere in the PR, so this is currently dead code. If it's intended for future use, the interface needs fixing first.
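A runnable sketch of the factory-based interface, using the builtin ConnectionError as a stand-in for glide's connection error types (the real code would also catch glide's ClosingError); the 0.2 s exponential backoff mirrors the docstring:

```python
# Sketch: the caller passes a zero-argument callable that builds a FRESH
# coroutine per attempt, so each retry awaits a new object instead of
# re-awaiting a consumed one.
import asyncio
from typing import Any, Callable, Coroutine


async def retry_operation(
    operation_fn: Callable[[], Coroutine[Any, Any, Any]],
    max_retries: int = 5,
    base_delay: float = 0.2,
) -> Any:
    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            # A new coroutine object is created on every attempt.
            return await operation_fn()
        except ConnectionError as e:  # stand-in for glide's connection errors
            last_error = e
            if attempt < max_retries:
                await asyncio.sleep(base_delay * (2**attempt))
    raise last_error  # type: ignore[misc]
```

Call sites then wrap the operation in a lambda, e.g. `await retry_operation(lambda: client.custom_command(cmd))`, instead of passing an already-created coroutine.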

Comment on lines +49 to +99
# Configure aiocache to use Valkey if VALKEY_URL is set, otherwise use Redis or memory
_valkey_url = os.environ.get("VALKEY_URL")
_redis_url = os.environ.get("REDIS_URL")

if _valkey_url:
    # Parse Valkey URL for aiocache Redis backend (Valkey is Redis-compatible)
    parsed = urlparse(_valkey_url)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.RedisCache",
                "endpoint": parsed.hostname or "localhost",
                "port": parsed.port or 6379,
                "db": (
                    int(parsed.path.lstrip("/"))
                    if parsed.path and parsed.path != "/"
                    else 0
                ),
                "password": parsed.password,
            }
        }
    )
elif _redis_url:
    # Use existing Redis configuration
    parsed = urlparse(_redis_url)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.RedisCache",
                "endpoint": parsed.hostname or "localhost",
                "port": parsed.port or 6379,
                "db": (
                    int(parsed.path.lstrip("/"))
                    if parsed.path and parsed.path != "/"
                    else 0
                ),
                "password": parsed.password,
            }
        }
    )
else:
    # Use memory cache (default)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.SimpleMemoryCache",
            }
        }
    )



Duplication: The VALKEY_URL and REDIS_URL parsing blocks are nearly identical (the same urlparse + caches.set_config pattern repeated 3 times across agent_card.py, task.py, and upload_cache.py). Consider extracting a shared helper like _parse_valkey_or_redis_url() to reduce duplication and ensure consistent behavior.

Also, this module-level code runs at import time and has a side effect (calling caches.set_config). If both agent_card.py and task.py are imported, they'll each call caches.set_config with potentially different configurations, and the last one wins. This could lead to subtle ordering bugs.
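A sketch of the suggested helper (the function name is illustrative); it reproduces the exact mapping the three copies currently build by hand:

```python
# Sketch: one shared place that turns a valkey:// or redis:// URL into the
# aiocache RedisCache config dict. Valkey is Redis-compatible, so both
# schemes map to the same backend config.
from urllib.parse import urlparse


def parse_cache_url(url: str) -> dict:
    """Map a valkey:// or redis:// URL to an aiocache RedisCache config."""
    parsed = urlparse(url)
    return {
        "cache": "aiocache.RedisCache",
        "endpoint": parsed.hostname or "localhost",
        "port": parsed.port or 6379,
        "db": int(parsed.path.lstrip("/")) if parsed.path and parsed.path != "/" else 0,
        "password": parsed.password,
    }
```

Each module then collapses to `caches.set_config({"default": parse_cache_url(url)})` when a URL is present.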

Comment on lines +449 to +476
        return None

    async def _ensure_vector_index(self) -> None:
        """Create Valkey Search vector index if it doesn't exist.

        Creates an index named 'memory_index' on record:* hashes with:
        - Vector field for embeddings (HNSW or FLAT algorithm)
        - TAG fields for scope and categories
        - NUMERIC fields for created_at and importance

        Raises:
            RuntimeError: If Valkey Search module is not available.
        """
        if self._index_created:
            return

        client = await self._get_client()

        try:
            # Check if index exists using FT.INFO
            cmd: list[str | bytes] = ["FT.INFO", "memory_index"]
            await client.custom_command(cmd)
            _logger.debug("Vector index 'memory_index' already exists")
            self._index_created = True
            return
        except Exception as e:
            # Index doesn't exist, create it
            _logger.debug(f"Index does not exist, will create: {e}")

Correctness: The docstring says "Save multiple records atomically" but the implementation issues separate HSET, ZADD, and SADD commands per record without using a Valkey transaction (MULTI/EXEC) or pipeline. If the process crashes mid-batch, some records will be saved with incomplete indexes.

For a first version this is probably acceptable, but the docstring should be updated to reflect the actual behavior, or a pipeline should be used for true atomicity. Same applies to adelete and _aupdate.
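One way to get the grouping the docstring promises is to batch each record's writes into a MULTI/EXEC transaction. A sketch that builds such a command sequence as plain data (the record:* key follows the hash layout shown in the diff; the scope:* and category:* key names and the function itself are illustrative, not the PR's code):

```python
# Sketch: group one record's HSET/ZADD/SADD into a MULTI/EXEC block so the
# hash write and its index updates commit together. Values are assumed to
# already be strings, as raw Valkey commands require.
def save_record_commands(record_id, fields, scope, categories, created_at):
    cmds = [["MULTI"]]
    hset = ["HSET", f"record:{record_id}"]
    for key, value in fields.items():
        hset += [key, value]
    cmds.append(hset)
    # Index the record in its scope, ordered by creation time.
    cmds.append(["ZADD", f"scope:{scope}", str(created_at), record_id])
    # Category membership sets for filtering.
    for cat in categories:
        cmds.append(["SADD", f"category:{cat}", record_id])
    cmds.append(["EXEC"])
    return cmds
```

The client would send this sequence on one connection (or use the glide client's transaction support, if available) so a crash mid-batch leaves no half-indexed record.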

Comment on lines 364 to +379
        if not valid:
            return [[] for _ in texts]

        # Check if we're in an async context
        result: Any
        try:
            asyncio.get_running_loop()
            # We're in an async context, but this is a sync function
            # Run embedder in thread pool to avoid blocking the event loop
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                sync_future = pool.submit(embedder, [t for _, t in valid])
                result = sync_future.result(timeout=30)
        except RuntimeError:
            # Not in async context, run directly
            result = embedder([t for _, t in valid])


Concern: This creates a new ThreadPoolExecutor(max_workers=1) on every call to embed_texts when in an async context. Thread pool creation has overhead and this function may be called frequently during batch encoding. Consider using a module-level or class-level thread pool.

Also, sync_future.result(timeout=30) will raise concurrent.futures.TimeoutError (not TimeoutError), so the error handling may not behave as expected. The 30s timeout is hardcoded — consider making it configurable or at least documenting it.
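A sketch of the reuse pattern, with the timeout lifted into a parameter (names are illustrative; note that on Python 3.11+ concurrent.futures.TimeoutError is an alias of the builtin TimeoutError, so the distinction above matters mainly on older versions):

```python
# Sketch: one module-level executor shared by all embed calls, instead of a
# new ThreadPoolExecutor per invocation.
import concurrent.futures
from typing import Callable, Sequence

_EMBED_POOL = concurrent.futures.ThreadPoolExecutor(
    max_workers=1, thread_name_prefix="embed"
)


def run_embedder_off_loop(
    embedder: Callable[[Sequence[str]], list],
    texts: Sequence[str],
    timeout: float = 30.0,
) -> list:
    """Run a sync embedder on the shared pool; raises TimeoutError on expiry."""
    future = _EMBED_POOL.submit(embedder, list(texts))
    return future.result(timeout=timeout)
```

The pool outlives individual calls, so batch encoding pays the thread start-up cost once rather than per batch.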

Comment thread on lib/crewai/src/crewai/a2a/utils/task.py (outdated)
Comment on lines +102 to +131
# Initialize cache based on configuration
# Priority: VALKEY_URL > REDIS_URL > SimpleMemoryCache
_valkey_url = os.environ.get("VALKEY_URL")
_redis_url = os.environ.get("REDIS_URL")

if _valkey_url:
    # Use ValkeyCache for caching
    from crewai.memory.storage.valkey_cache import ValkeyCache

    parsed = urlparse(_valkey_url)
    _task_cache = ValkeyCache(
        host=parsed.hostname or "localhost",
        port=parsed.port or 6379,
        db=int(parsed.path.lstrip("/")) if parsed.path and parsed.path != "/" else 0,
        password=parsed.password,
        default_ttl=3600,  # 1 hour default TTL
    )
    _use_valkey_cache = True
else:
    # Fallback to existing aiocache configuration
    caches.set_config(
        {
            "default": _parse_redis_url(_redis_url)
            if _redis_url
            else {
                "cache": "aiocache.SimpleMemoryCache",
            }
        }
    )
    _use_valkey_cache = False

Design concern: When VALKEY_URL is set, this creates a ValkeyCache instance at module import time (_task_cache). This means:

  1. Importing this module triggers a from crewai.memory.storage.valkey_cache import ValkeyCache which requires valkey-glide to be installed — but it's an optional dependency. Users who have VALKEY_URL set in their environment but haven't installed crewai[valkey] will get an ImportError at import time.
  2. The _task_cache is a module-level global with no cleanup path. The ValkeyCache.close() method is never called.

Consider lazy initialization or guarding the import with a try/except.


Missing integration: ValkeyStorage is mentioned in the PR description as being "wired as a storage backend option" in unified_memory.py, but looking at the diff for unified_memory.py, there's no code that actually instantiates ValkeyStorage based on configuration. The unified_memory.py changes only modify drain_writes() behavior.

How does a user actually select Valkey as their storage backend? Is there a configuration option or environment variable? This seems like a missing piece of the integration.

Comment thread on uv.lock

Nit: The uv.lock diff includes unrelated changes — exclude-newer timestamp changed, unstructured-inference version resolution simplified, onnxruntime/coloredlogs/humanfriendly marker changes, etc. These appear to be lockfile regeneration artifacts. Consider splitting these into a separate commit to keep the Valkey changes isolated and reviewable.

@MatthiasHowellYopp force-pushed the feat/valkey-storage branch 6 times, most recently from e0c07e1 to 979833a on April 24, 2026 at 13:35


Development

Successfully merging this pull request may close these issues.

[FEATURE] Add Valkey as a storage backend for the unified memory system

2 participants