Conversation
jbrinkman
left a comment
Code Review: Valkey Storage Implementation
Thanks for this substantial contribution — adding Valkey as a distributed storage backend is a valuable addition to CrewAI's memory system. The overall architecture is sound: ValkeyStorage for the full StorageBackend protocol, ValkeyCache for lightweight caching, proper optional dependency handling in pyproject.toml, and comprehensive test coverage (~5300 lines of tests with mocked clients).
What works well
- Clean separation between ValkeyStorage (full storage) and ValkeyCache (simple cache)
- Proper use of the Valkey Search module for server-side vector similarity (FT.SEARCH with KNN)
- Good error handling with descriptive error messages and graceful degradation
- Comprehensive test suite covering CRUD, search, scopes, errors, and edge cases
- Optional dependency pattern (pip install crewai[valkey]) follows existing conventions
- Lazy client initialization and connection timeout handling
Issues requiring changes
Bugs:
1. _run_async destroys the cached client on every sync call: it clears self._client = None, then creates a new thread pool, event loop, and TCP connection per call. This defeats lazy initialization and will be extremely slow under load.
2. _retry_operation can't actually retry: it accepts a Coroutine object, which can only be awaited once, so subsequent retry iterations will raise RuntimeError. (Currently dead code, but the interface is broken.)
Performance:
3. _alist_records has N+1 query pattern — fetches ALL record IDs across scopes, issues individual HGETALL per record, sorts in Python, then applies pagination. For large scopes this loads everything into memory. Same pattern in _aget_scope_info and _alist_categories.
Design:
4. asave/adelete/_aupdate claim atomicity but aren't atomic — individual commands without MULTI/EXEC or pipeline. Docstrings should reflect actual behavior.
5. Massive code duplication in upload_cache.py — every method has if self._use_valkey: ... else: ... with repeated null checks. A strategy/adapter pattern would be cleaner.
6. Duplicate URL parsing — VALKEY_URL/REDIS_URL parsing is copy-pasted across agent_card.py, task.py, and upload_cache.py. Extract a shared helper.
7. Module-level side effects — agent_card.py and task.py both call caches.set_config() at import time. Import order determines which config wins. task.py also does a conditional from crewai.memory.storage.valkey_cache import ValkeyCache at module level when VALKEY_URL is set, which will fail if valkey-glide isn't installed.
Missing integration:
8. No way to actually select ValkeyStorage — unified_memory.py changes only modify drain_writes(). There's no configuration path for users to choose Valkey as their storage backend.
Other:
9. uv.lock includes unrelated lockfile regeneration changes — consider splitting.
10. embed_texts creates a new ThreadPoolExecutor(max_workers=1) on every call in async context — should reuse a pool.
Recommendations
- Fix bugs #1 and #2 before merging
- Address the missing integration (crewAIInc#8); without it, ValkeyStorage is unreachable
- The performance and design issues (#3-7) are acceptable for a first version but should be tracked as follow-up work
- Consider adding an integration test that runs against a real Valkey instance (even if skipped in CI by default)
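A sketch of such an opt-in smoke test (in a real suite this would be a pytest.mark.skipif test; the environment variable name and the ValkeyStorage constructor and method signatures here are assumptions, not from the PR):

```python
import os

# Hypothetical opt-in variable: the test is a no-op unless it points at a
# live Valkey instance, so CI skips it by default.
VALKEY_TEST_URL = os.environ.get("CREWAI_VALKEY_TEST_URL")


def smoke_test_roundtrip() -> bool:
    """Return True on a successful save/get roundtrip, False when skipped."""
    if VALKEY_TEST_URL is None:
        return False  # skipped: no live instance configured
    # Module path matches the PR; constructor kwargs are assumed for illustration.
    from crewai.memory.storage.valkey_storage import ValkeyStorage

    storage = ValkeyStorage(url=VALKEY_TEST_URL)
    record_id = storage.save("hello valkey", scope="smoke")
    return storage.get_record(record_id) is not None
```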
```python
        so this lock is primarily for API compatibility with other storage backends.
        """
        return self._write_lock

    def _run_async(self, coro: Coroutine[Any, Any, Any]) -> Any:
        """Bridge async operations to sync context.

        If in async context, runs coroutine in a new thread with its own event loop.
        Otherwise creates event loop in current thread.

        Args:
            coro: Coroutine to execute.

        Returns:
            Result of the coroutine execution.
        """
        try:
            asyncio.get_running_loop()
            # We're in async context - run in thread with new event loop
```
Bug/Performance: _run_async clears self._client = None on every sync call, then creates a brand-new ThreadPoolExecutor, event loop, and Valkey connection. This means every sync operation (save, get_record, search, etc.) opens and closes a TCP connection to Valkey, which will be extremely slow under load and defeats the purpose of lazy client initialization.
Consider caching the thread-pool and reusing the event loop, or using asyncio.run_coroutine_threadsafe with a long-lived background loop. At minimum, avoid clearing self._client unless the event loop has actually changed.
```python
            def _run_in_new_loop() -> Any:
                # Clear cached client to avoid event loop conflicts
                self._client = None
                return asyncio.run(coro)

            with concurrent.futures.ThreadPoolExecutor() as pool:
                future = pool.submit(_run_in_new_loop)
                return future.result()
        except RuntimeError as e:
            if "no running event loop" in str(e).lower():
                # Clear cached client to avoid event loop conflicts
                self._client = None
                return asyncio.run(coro)
            raise
```
```python
    async def _retry_operation(
        self,
        operation: Coroutine[Any, Any, Any],
        max_retries: int = 5,
    ) -> Any:
        """Retry operation with exponential backoff on connection errors.

        Retries operations that fail due to connection errors using exponential
        backoff starting at 0.2 seconds. Logs connection errors at debug level.

        Args:
            operation: Coroutine to execute.
```
Bug: _retry_operation accepts a Coroutine object, but a coroutine can only be awaited once. On the first attempt the coroutine is consumed; subsequent retry iterations will raise RuntimeError: cannot reuse already awaited coroutine. The retry loop will never actually retry.
This needs to accept a callable (factory) that produces a new coroutine on each attempt:

```python
async def _retry_operation(
    self,
    operation_fn: Callable[[], Coroutine],
    max_retries: int = 5,
) -> Any:
    for attempt in range(max_retries + 1):
        try:
            return await operation_fn()
        except (ClosingError, ConnectionError) as e:
            ...
```

Note: _retry_operation doesn't appear to be called anywhere in the PR, so this is currently dead code. If it's intended for future use, the interface needs fixing first.
```python
# Configure aiocache to use Valkey if VALKEY_URL is set, otherwise use Redis or memory
_valkey_url = os.environ.get("VALKEY_URL")
_redis_url = os.environ.get("REDIS_URL")

if _valkey_url:
    # Parse Valkey URL for aiocache Redis backend (Valkey is Redis-compatible)
    parsed = urlparse(_valkey_url)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.RedisCache",
                "endpoint": parsed.hostname or "localhost",
                "port": parsed.port or 6379,
                "db": (
                    int(parsed.path.lstrip("/"))
                    if parsed.path and parsed.path != "/"
                    else 0
                ),
                "password": parsed.password,
            }
        }
    )
elif _redis_url:
    # Use existing Redis configuration
    parsed = urlparse(_redis_url)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.RedisCache",
                "endpoint": parsed.hostname or "localhost",
                "port": parsed.port or 6379,
                "db": (
                    int(parsed.path.lstrip("/"))
                    if parsed.path and parsed.path != "/"
                    else 0
                ),
                "password": parsed.password,
            }
        }
    )
else:
    # Use memory cache (default)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.SimpleMemoryCache",
            }
        }
    )
```
Duplication: The VALKEY_URL and REDIS_URL parsing blocks are nearly identical (same urlparse → caches.set_config pattern repeated 3 times across agent_card.py, task.py, and upload_cache.py). Consider extracting a shared helper like _parse_valkey_or_redis_url() to reduce duplication and ensure consistent behavior.
Also, this module-level code runs at import time and has a side effect (calling caches.set_config). If both agent_card.py and task.py are imported, they'll each call caches.set_config with potentially different configurations, and the last one wins. This could lead to subtle ordering bugs.
```python
        return None

    async def _ensure_vector_index(self) -> None:
        """Create Valkey Search vector index if it doesn't exist.

        Creates an index named 'memory_index' on record:* hashes with:
        - Vector field for embeddings (HNSW or FLAT algorithm)
        - TAG fields for scope and categories
        - NUMERIC fields for created_at and importance

        Raises:
            RuntimeError: If Valkey Search module is not available.
        """
        if self._index_created:
            return

        client = await self._get_client()

        try:
            # Check if index exists using FT.INFO
            cmd: list[str | bytes] = ["FT.INFO", "memory_index"]
            await client.custom_command(cmd)
            _logger.debug("Vector index 'memory_index' already exists")
            self._index_created = True
            return
        except Exception as e:
            # Index doesn't exist, create it
            _logger.debug(f"Index does not exist, will create: {e}")
```
Correctness: The docstring says "Save multiple records atomically" but the implementation issues separate HSET, ZADD, and SADD commands per record without using a Valkey transaction (MULTI/EXEC) or pipeline. If the process crashes mid-batch, some records will be saved with incomplete indexes.
For a first version this is probably acceptable, but the docstring should be updated to reflect the actual behavior, or a pipeline should be used for true atomicity. Same applies to adelete and _aupdate.
```python
    if not valid:
        return [[] for _ in texts]

    result = embedder([t for _, t in valid])
    # Check if we're in an async context
    result: Any
    try:
        asyncio.get_running_loop()
        # We're in an async context, but this is a sync function
        # Run embedder in thread pool to avoid blocking the event loop
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            sync_future = pool.submit(embedder, [t for _, t in valid])
            result = sync_future.result(timeout=30)
    except RuntimeError:
        # Not in async context, run directly
        result = embedder([t for _, t in valid])
```
Concern: This creates a new ThreadPoolExecutor(max_workers=1) on every call to embed_texts when in an async context. Thread pool creation has overhead and this function may be called frequently during batch encoding. Consider using a module-level or class-level thread pool.
Also, sync_future.result(timeout=30) will raise concurrent.futures.TimeoutError rather than the builtin TimeoutError (the two are only aliases on Python 3.11+), so the error handling may not behave as expected on older interpreters. The 30s timeout is hardcoded; consider making it configurable or at least documenting it.
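A sketch of the shared-pool fix (function and pool names are hypothetical; pool size and default timeout are illustrative):

```python
import concurrent.futures
from typing import Any, Callable, Sequence

# One pool for the module's lifetime, reused across embed calls, instead of
# constructing and tearing down a ThreadPoolExecutor on every invocation.
_EMBED_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=1)


def run_embedder_blocking(
    embedder: Callable[[Sequence[str]], Any],
    texts: Sequence[str],
    timeout: float = 30.0,
) -> Any:
    """Run a sync embedder off the event loop without rebuilding a pool per call."""
    future = _EMBED_POOL.submit(embedder, list(texts))
    # Note: result() raises concurrent.futures.TimeoutError, which is only an
    # alias of the builtin TimeoutError on Python 3.11+.
    return future.result(timeout=timeout)
```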
```python
# Initialize cache based on configuration
# Priority: VALKEY_URL > REDIS_URL > SimpleMemoryCache
_valkey_url = os.environ.get("VALKEY_URL")
_redis_url = os.environ.get("REDIS_URL")

if _valkey_url:
    # Use ValkeyCache for caching
    from crewai.memory.storage.valkey_cache import ValkeyCache

    parsed = urlparse(_valkey_url)
    _task_cache = ValkeyCache(
        host=parsed.hostname or "localhost",
        port=parsed.port or 6379,
        db=int(parsed.path.lstrip("/")) if parsed.path and parsed.path != "/" else 0,
        password=parsed.password,
        default_ttl=3600,  # 1 hour default TTL
    )
    _use_valkey_cache = True
else:
    # Fallback to existing aiocache configuration
    caches.set_config(
        {
            "default": _parse_redis_url(_redis_url)
            if _redis_url
            else {
                "cache": "aiocache.SimpleMemoryCache",
            }
        }
    )
    _use_valkey_cache = False
```
Design concern: When VALKEY_URL is set, this creates a ValkeyCache instance at module import time (_task_cache). This means:
- Importing this module triggers a from crewai.memory.storage.valkey_cache import ValkeyCache, which requires valkey-glide to be installed even though it's an optional dependency. Users who have VALKEY_URL set in their environment but haven't installed crewai[valkey] will get an ImportError at import time.
- The _task_cache is a module-level global with no cleanup path. The ValkeyCache.close() method is never called.

Consider lazy initialization or guarding the import with a try/except.
Missing integration: ValkeyStorage is mentioned in the PR description as being "wired as a storage backend option" in unified_memory.py, but looking at the diff for unified_memory.py, there's no code that actually instantiates ValkeyStorage based on configuration. The unified_memory.py changes only modify drain_writes() behavior.
How does a user actually select Valkey as their storage backend? Is there a configuration option or environment variable? This seems like a missing piece of the integration.
Nit: The uv.lock diff includes unrelated changes — exclude-newer timestamp changed, unstructured-inference version resolution simplified, onnxruntime/coloredlogs/humanfriendly marker changes, etc. These appear to be lockfile regeneration artifacts. Consider splitting these into a separate commit to keep the Valkey changes isolated and reviewable.
Force-pushed e0c07e1 to 979833a, then 979833a to 08c5544.
Adds Valkey as a storage backend for CrewAI's unified memory system, using the valkey-glide client. Valkey is a high-performance, Redis-compatible key-value store that provides a distributed, production-ready alternative to the existing LanceDB and Qdrant storage options.
What's included
ValkeyStorage (valkey_storage.py) — full StorageBackend implementation:
- CRUD operations (save, get, update, delete) with both sync and async APIs
- Server-side vector search via Valkey Search module (FT.SEARCH with KNN)
- Scope-based record organization with hierarchical scope queries
- Category indexing and filtering
- Metadata filtering with AND logic
- Pagination support (limit/offset) for list_records
- Scope introspection (get_scope_info, list_scopes, list_categories, count)
- Bulk delete with scope, category, metadata, and age-based filters
- Connection retry with exponential backoff for transient errors
- Lazy client initialization and async context manager support
- reset for clearing all or scoped records
ValkeyCache (valkey_cache.py) — lightweight cache interface:
- get/set/delete/exists with optional TTL
- JSON serialization for complex values
- Connection timeout handling
- Used by A2A agent card caching and file upload caching as an alternative to Redis
Integration points:
- unified_memory.py — wired as a storage backend option
- encoding_flow.py — encoding flow support for Valkey storage
- memory_tools.py — memory tool descriptions updated with clearer parameter docs
- upload_cache.py — added ValkeyCache as a cache backend option (alongside memory/Redis)
- agent_card.py / task.py — added VALKEY_URL environment variable support for aiocache configuration, with priority over REDIS_URL
Optional dependency:
- Added valkey = ["valkey-glide>=1.3.0"] to [project.optional-dependencies] in pyproject.toml
- Install with pip install crewai[valkey]
Tests
~5,300 lines of tests across 5 test files covering:
- Core CRUD and index operations (test_valkey_storage.py)
- Vector search with filters, scoring, pagination (test_valkey_storage_search.py)
- Scope operations, listing, categories, count, reset (test_valkey_storage_scope.py)
- Error handling, retry, serialization/deserialization (test_valkey_storage_errors.py)
- Cache operations with TTL (test_valkey_cache.py)
All tests use mocked Valkey clients — no running Valkey instance required.
fixes crewAIInc#5578