Thanks for putting this together — the overall structure is clean, the package mirrors the Redis patterns well, and the documentation/sample code are solid. A few items to address before this is ready to merge; see the inline review comments for details.
Correctness bug:
- Vector embeddings are stored/queried as hex strings instead of raw bytes, which will cause vector search to return incorrect results silently.
Performance:
- `save_messages` does individual `rpush` calls instead of batching — easy fix with valkey-glide's multi-value `rpush`.
API consistency (within the package and vs. Redis):
- Missing `aclose()` on `ValkeyContextProvider` (forcing `__aexit__` anti-pattern in sample code)
- `ValkeyContextProvider` lacks `valkey_url` support that `ValkeyChatMessageStore` has
- No `search_all()` equivalent from the Redis package
- No validation for mutually exclusive connection params
Test coverage gaps:
- Zero tests for the vector/hybrid search path (`embed_fn` provided)
- No tests for `_search()` error handling/exception wrapping
- Fragile assertion logic in `test_stores_partition_fields`
Type safety:
- `embed_fn` typed as `Any` instead of a proper `Callable` type
See inline comments for details and suggested fixes.
```python
        state: Optional session state. Unused for Valkey-backed history.
        **kwargs: Additional arguments (unused).
    """
    if not messages:
```
`save_messages` pushes each message with a separate `await client.rpush(key, [serialized])` call — one network round-trip per message. The Redis implementation uses `pipeline(transaction=True)` to batch these.

valkey-glide's `rpush` accepts a list of values in a single call:

```python
await client.rpush(key, serialized_messages)
```

This would reduce N round-trips to 1 and align with how the Redis package handles this.
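A minimal sketch of what the batched version could look like, using a stand-in JSON serializer and a mocked client (the actual serializer and attribute names in the package may differ):

```python
import asyncio
import json
from unittest.mock import AsyncMock

# Hypothetical sketch: batch all messages into one rpush call,
# relying on valkey-glide's multi-value rpush(key, values) signature.
async def save_messages(client, key, messages):
    if not messages:
        return
    serialized = [json.dumps(m) for m in messages]  # stand-in serializer
    await client.rpush(key, serialized)  # one round-trip for all messages

client = AsyncMock()
asyncio.run(save_messages(client, "chat:s1", [{"role": "user"}, {"role": "assistant"}]))
```

The mock shows the key property: two messages still produce exactly one `rpush` call.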
```python
    raise IntegrationInvalidRequestException(f"Failed to create Valkey search index: {exc}") from exc

self._index_created = True
```
The embedding is stored as `vec_bytes.hex()` (a hex-encoded string), and the search query in `_search()` also passes `vec_bytes.hex()` as the `PARAMS` value. Valkey's `VECTOR` field expects raw binary bytes, not hex strings.

The Redis implementation stores `np.asarray(...).tobytes()` directly. With hex encoding, the stored data won't match the binary format the vector index expects, so vector search will silently return incorrect results.

Both `_add()` here and the `FT.SEARCH` `PARAMS` in `_search()` should pass raw bytes instead:

```python
field_map[self.vector_field_name] = vec_bytes  # not vec_bytes.hex()
```

valkey-glide's `hset` and `custom_command` accept bytes values directly.

This bug is currently not caught because there are no tests exercising the vector search path (all tests use `embed_fn=None`). Adding a test with a mock `embed_fn` that asserts the value passed to `hset` is raw bytes would prevent regressions.
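If the package wants to produce the raw float32 layout without a numpy dependency, the stdlib `struct` module yields the same bytes as `np.asarray(vec, dtype=np.float32).tobytes()`; a sketch:

```python
import struct

# Sketch: pack an embedding as raw little-endian float32 bytes —
# the binary layout the VECTOR field expects, matching
# np.asarray(vec, dtype=np.float32).tobytes().
def to_vec_bytes(embedding):
    return struct.pack(f"<{len(embedding)}f", *embedding)

vec_bytes = to_vec_bytes([0.1, 0.2, 0.3])
```

Each float32 is 4 bytes, so a 3-element embedding packs to 12 bytes, whereas `.hex()` would double that and change the encoding entirely.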
```python
    vector_dims: Dimensionality of embedding vectors. Required if embed_fn is set.
    vector_field_name: The name of the vector field. Required for vector search.
    vector_algorithm: Vector index algorithm ("FLAT" or "HNSW"). Defaults to "HNSW".
    vector_distance_metric: Distance metric ("COSINE", "IP", or "L2"). Defaults to "COSINE".
```
`embed_fn` is an async callable that takes a string and returns a list of floats — but the parameter is typed as `Any | None`, which provides no type safety.

The docstring correctly describes the expected signature as `async def embed(text: str) -> list[float]`, but the type system can't enforce this. Consider using a proper type:

```python
from collections.abc import Awaitable, Callable

EmbedFn = Callable[[str], Awaitable[list[float]]]

embed_fn: EmbedFn | None = None,
```

The Redis implementation uses `BaseVectorizer` (a concrete type from redisvl) for the same purpose. Since this package intentionally avoids redisvl, a `Callable` type or a simple `Protocol` would be the idiomatic Python equivalent while keeping the dependency tree minimal.
```python
async def __aenter__(self) -> Self:
    """Async context manager entry."""
    return self
```
`ValkeyChatMessageStore` has an explicit `aclose()` method, but `ValkeyContextProvider` only cleans up the client inside `__aexit__`. This forces callers to use the awkward pattern seen in the sample:

```python
await context_provider.__aexit__(None, None, None)  # valkey_sample.py line 213
```

Add an `aclose()` method (like the chat store has) and have `__aexit__` delegate to it:

```python
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.close()
        self._client = None

async def __aexit__(self, ...):
    await self.aclose()
```

This is consistent with the chat store and with standard Python async resource patterns, and avoids the `__aexit__` anti-pattern in the sample code.
```python
self._client = None

__all__ = ["ValkeyContextProvider"]
```
The Redis `RedisContextProvider` exposes a `search_all()` method that retrieves all documents in the index using paginated `FilterQuery`. The Valkey implementation has no equivalent.

This is useful for debugging, testing, and administrative tasks (e.g., verifying what's been stored). Consider adding a similar method using `FT.SEARCH` with a `*` query and `LIMIT` pagination:

```python
async def search_all(self, page_size: int = 200) -> list[dict[str, Any]]:
    """Returns all documents in the index."""
    await self._ensure_index()
    client = await self._get_client()
    # Use FT.SEARCH with wildcard query and pagination
    ...

Not a blocker, but a feature gap vs. the Redis package worth noting.
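A sketch of the pagination loop, assuming `custom_command` returns replies shaped like `[total, doc_id, fields, doc_id, fields, ...]` (the RESP2 `FT.SEARCH` layout; the real reply shape should be verified against valkey-search):

```python
import asyncio
from unittest.mock import AsyncMock

# Hypothetical pagination over FT.SEARCH with a wildcard query.
async def search_all(client, index_name, page_size=200):
    docs, offset = [], 0
    while True:
        reply = await client.custom_command(
            ["FT.SEARCH", index_name, "*", "LIMIT", str(offset), str(page_size)]
        )
        # reply[0] is the total count; then alternating doc_id / field list.
        for i in range(1, len(reply), 2):
            docs.append({"id": reply[i], "fields": reply[i + 1]})
        offset += page_size
        if offset >= reply[0]:
            break
    return docs

# Demo against a mocked client returning two pages of results.
client = AsyncMock()
client.custom_command = AsyncMock(
    side_effect=[
        [3, "context:1", ["text", "a"], "context:2", ["text", "b"]],
        [3, "context:3", ["text", "c"]],
    ]
)
docs = asyncio.run(search_all(client, "context_idx", page_size=2))
```

Three documents come back across two `FT.SEARCH` round-trips.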
```python
valkey_url: str | None = None,
host: str = "localhost",
port: int = 6379,
*,
```
The Redis `RedisHistoryProvider` validates that `redis_url` and `credential_provider` are mutually exclusive and raises `ValueError` if both are provided.

Here, if a caller passes both `valkey_url` and `host`/`port`, the URL silently wins (in `_get_client`). This can lead to confusing behavior where the user thinks they're connecting to `host:port` but the URL takes precedence.

Consider adding validation:

```python
if valkey_url is not None and (host != "localhost" or port != 6379):
    raise ValueError("valkey_url and explicit host/port are mutually exclusive")
```

Or at minimum, document the precedence clearly in the docstring.
```python
use_tls: bool = False,
index_name: str = "context_idx",
prefix: str = "context:",
vector_dims: int | None = None,
```
ValkeyChatMessageStore supports valkey_url for connection (with URL parsing), but ValkeyContextProvider only accepts host/port. Within the same package, users would expect a consistent connection interface across both components.
Consider adding valkey_url support here as well, reusing the same _parse_url logic (or extracting it to a shared utility).
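If the parsing gets extracted, a hypothetical shared helper (the store's actual `_parse_url` may differ; scheme names and the returned dict shape here are assumptions) could be as simple as:

```python
from urllib.parse import urlparse

# Hypothetical shared URL parser covering host/port/TLS; the real
# _parse_url in ValkeyChatMessageStore may handle more cases.
def parse_valkey_url(url: str) -> dict:
    parsed = urlparse(url)
    if parsed.scheme not in ("valkey", "valkeys", "redis", "rediss"):
        raise ValueError(f"Unsupported scheme: {parsed.scheme!r}")
    return {
        "host": parsed.hostname or "localhost",
        "port": parsed.port or 6379,
        "use_tls": parsed.scheme in ("valkeys", "rediss"),
    }

cfg = parse_valkey_url("valkeys://cache.example.com:6380")
```

Extracting this keeps the two components' connection behavior identical by construction.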
```python
provider = ValkeyContextProvider(source_id="ctx", user_id="u1", client=mock_glide_client)
assert "Memories" in provider.context_prompt
```
All `ValkeyContextProvider` tests use text-only search (`embed_fn=None`). There are no tests covering the vector/hybrid search path. This leaves several critical code paths untested:

- `_add()` — embedding generation and storage (the `.hex()` bug from the other comment)
- `_search()` — KNN query construction with `PARAMS`
- `_ensure_index()` — vector field inclusion in the `FT.CREATE` schema

The Redis package includes `TestRedisContextProviderHybridQuery` for this exact scenario. Consider adding equivalent tests with a mock `embed_fn`:

```python
async def test_vector_search_path(self, mock_glide_client):
    mock_embed = AsyncMock(return_value=[0.1] * 128)
    provider = ValkeyContextProvider(
        source_id="ctx",
        user_id="u1",
        client=mock_glide_client,
        embed_fn=mock_embed,
        vector_field_name="embedding",
        vector_dims=128,
    )
    # Test _add stores embeddings correctly
    # Test _search constructs KNN query
    # Test _ensure_index includes vector field in schema
```

```python
assert "ctx" in ctx.context_messages
msgs = ctx.context_messages["ctx"]
assert len(msgs) == 1
assert "Memory A" in msgs[0].text
```
The `_search()` method wraps unexpected exceptions in `IntegrationInvalidRequestException`, but there's no test verifying this behavior. A Valkey connection error or malformed response during search should be properly wrapped and re-raised.

Consider adding:

```python
async def test_search_wraps_exceptions(self, mock_glide_client):
    mock_glide_client.custom_command = AsyncMock(
        side_effect=[None, ConnectionError("connection lost")]  # first call = FT.CREATE, second = FT.SEARCH
    )
    provider = ValkeyContextProvider(source_id="ctx", user_id="u1", client=mock_glide_client)
    with pytest.raises(IntegrationInvalidRequestException, match="Valkey search failed"):
        await provider._search(text="test")
```

Also worth testing that `IntegrationInvalidRequestException` raised directly inside `_search` (e.g., empty text) is not double-wrapped.
```python
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(input_messages=[Message(role="user", contents=["hello"])], session_id="s1")
```
`field_dict = call_args[1] if call_args[1] else call_args[0][1]` relies on the falsiness of an empty kwargs dict to fall through to positional args. Since `hset` is called as `client.hset(doc_id, field_map)` (positional), `call_args[1]` is `{}` (falsy), so it falls through — but this works by accident.

Use the explicit positional form for clarity:

```python
call_args = mock_glide_client.hset.call_args
field_dict = call_args[0][1]  # second positional arg
```

Or use `call_args_list[0]` if you want to be even more explicit about which call you're inspecting.
Force-pushed cb72704 to 21c2a3e
Force-pushed 21c2a3e to e2ff01c
### Motivation and Context
Fixes microsoft#5260
The current agent-framework-redis package depends on redisvl, which requires Redis Stack's RediSearch module. This creates an incompatibility for teams running Valkey — whether self-hosted or through managed cloud services (AWS ElastiCache, GCP Memorystore) — since RedisVL has known incompatibilities with Valkey's search module. Additionally, even the RedisChatMessageStore (which only needs basic key-value operations) pulls in RedisVL as a transitive dependency.
This PR adds a dedicated agent-framework-valkey package that uses valkey-glide (the official Valkey Python client) directly, with no RedisVL dependency.
### Description
New package: agent-framework-valkey (python/packages/valkey/)
Two components:
ValkeyChatMessageStore — Persistent chat message storage implementing the HistoryProvider protocol. Uses only basic Valkey key-value/list operations via valkey-glide, so it works with any Valkey (or Redis OSS) server with no search module required.
ValkeyContextProvider — Long-term memory context provider implementing the ContextProvider protocol. Uses Valkey's native FT.CREATE / FT.SEARCH commands via valkey-glide's custom_command API for full-text and optional hybrid vector search. Requires valkey-search >= 1.2 (ships with valkey-bundle >= 9.1.0).
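As an illustration of the `custom_command` approach, an `FT.CREATE` call could be issued like the sketch below; the index name and prefix match the package's documented defaults, but the field names and vector parameters here are assumptions, not the provider's actual schema:

```python
import asyncio
from unittest.mock import AsyncMock

# Hedged sketch: building the FT.CREATE command as a plain argument
# list for valkey-glide's custom_command (no redisvl schema objects).
async def create_index(client):
    await client.custom_command([
        "FT.CREATE", "context_idx", "ON", "HASH",
        "PREFIX", "1", "context:",
        "SCHEMA",
        "text", "TEXT",
        "embedding", "VECTOR", "HNSW", "6",
        "TYPE", "FLOAT32", "DIM", "128", "DISTANCE_METRIC", "COSINE",
    ])

client = AsyncMock()
asyncio.run(create_index(client))
```

Passing raw command arrays keeps the dependency tree minimal at the cost of hand-maintaining the command syntax.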
The package follows the same structure and patterns as agent-framework-redis and agent-framework-mem0. Key design decisions:

- Uses valkey-glide (the official Valkey client, Apache-2.0 licensed) instead of RedisVL
- Vector search uses Valkey's native FT commands directly rather than an abstraction layer, keeping the dependency tree minimal
- `embed_fn` is a simple async callable rather than a framework-specific vectorizer class, making it easy to plug in any embedding provider
- Includes a sample (`valkey_sample.py`) demonstrating both components with Bedrock
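Because `embed_fn` is just an async callable, any embedding provider can be plugged in; a toy conforming function (a stand-in, not a real embedder):

```python
import asyncio

# Hypothetical embed_fn: any async callable returning list[float]
# satisfies the contract; a real one would call an embedding API.
async def embed(text: str) -> list[float]:
    return [float(len(text))] * 4  # stand-in embedding

vec = asyncio.run(embed("hello"))
```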
Changes:

- `python/packages/valkey/` — new package with implementation, tests, README, LICENSE
- `pyproject.toml` — added workspace source and pyright test environment for valkey
- `PACKAGE_STATUS.md` — registered as alpha
- `uv.lock` — updated with valkey-glide resolution
All checks pass: ruff formatting/linting, pyright, mypy, and 39 unit tests at 88% coverage.
### Contribution Checklist

- [x] The code builds clean without any errors or warnings
- [x] The PR follows the Contribution Guidelines
- [x] All unit tests pass, and I have added new tests where possible
- [ ] Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.