base work #1

Open

MatthiasHowellYopp wants to merge 1 commit into main from issue-5260

Conversation

@MatthiasHowellYopp (Owner)

### Motivation and Context
Fixes microsoft#5260

The current agent-framework-redis package depends on redisvl, which requires Redis Stack's RediSearch module. This creates an incompatibility for teams running Valkey — whether self-hosted or through managed cloud services (AWS ElastiCache, GCP Memorystore) — since RedisVL has known incompatibilities with Valkey's search module. Additionally, even the RedisChatMessageStore (which only needs basic key-value operations) pulls in RedisVL as a transitive dependency.

This PR adds a dedicated agent-framework-valkey package that uses valkey-glide (the official Valkey Python client) directly, with no RedisVL dependency.

### Description
New package: agent-framework-valkey (python/packages/valkey/)

Two components:

ValkeyChatMessageStore — Persistent chat message storage implementing the HistoryProvider protocol. Uses only basic Valkey key-value/list operations via valkey-glide, so it works with any Valkey (or Redis OSS) server with no search module required.

ValkeyContextProvider — Long-term memory context provider implementing the ContextProvider protocol. Uses Valkey's native FT.CREATE / FT.SEARCH commands via valkey-glide's custom_command API for full-text and optional hybrid vector search. Requires valkey-search >= 1.2 (ships with valkey-bundle >= 9.1.0).

The package follows the same structure and patterns as agent-framework-redis and agent-framework-mem0. Key design decisions:

Uses valkey-glide (official Valkey client, Apache-2.0 licensed) instead of RedisVL
Vector search uses Valkey's native FT commands directly rather than an abstraction layer, keeping the dependency tree minimal
embed_fn is a simple async callable rather than a framework-specific vectorizer class, making it easy to plug in any embedding provider
Includes a sample (valkey_sample.py) demonstrating both components with Bedrock
Changes:

python/packages/valkey/ — new package with implementation, tests, README, LICENSE
pyproject.toml — added workspace source and pyright test environment for valkey
PACKAGE_STATUS.md — registered as alpha
uv.lock — updated with valkey-glide resolution
All checks pass: ruff formatting/linting, pyright, mypy, and 39 unit tests at 88% coverage.

### Contribution Checklist

- [x] The code builds clean without any errors or warnings
- [x] The PR follows the Contribution Guidelines
- [x] All unit tests pass, and I have added new tests where possible
- [ ] Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.

@daric93 left a comment


Thanks for putting this together — the overall structure is clean, the package mirrors the Redis patterns well, and the documentation/sample code are solid. A few items to address before this is ready to merge; see the inline review comments for details.

Correctness bug:

  • Vector embeddings are stored/queried as hex strings instead of raw bytes, which will cause vector search to return incorrect results silently.

Performance:

  • save_messages does individual rpush calls instead of batching — easy fix with valkey-glide's multi-value rpush.

API consistency (within the package and vs. Redis):

  • Missing aclose() on ValkeyContextProvider (forcing __aexit__ anti-pattern in sample code)
  • ValkeyContextProvider lacks valkey_url support that ValkeyChatMessageStore has
  • No search_all() equivalent from the Redis package
  • No validation for mutually exclusive connection params

Test coverage gaps:

  • Zero tests for the vector/hybrid search path (embed_fn provided)
  • No tests for _search() error handling/exception wrapping
  • Fragile assertion logic in test_stores_partition_fields

Type safety:

  • embed_fn typed as Any instead of a proper Callable type

See inline comments for details and suggested fixes.

state: Optional session state. Unused for Valkey-backed history.
**kwargs: Additional arguments (unused).
"""
if not messages:

save_messages pushes each message with a separate await client.rpush(key, [serialized]) call — one network round-trip per message. The Redis implementation uses pipeline(transaction=True) to batch these.

valkey-glide's rpush accepts a list of values in a single call:

await client.rpush(key, serialized_messages)

This would reduce N round-trips to 1 and align with how the Redis package handles this.
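The round-trip savings are easy to illustrate with a stub client (FakeValkeyClient and save_messages_batched below are hypothetical stand-ins; the multi-value rpush signature is as described in this comment):

```python
import asyncio

class FakeValkeyClient:
    """Stub standing in for a valkey-glide client; counts network calls."""

    def __init__(self) -> None:
        self.calls = 0
        self.store: dict[str, list[str]] = {}

    async def rpush(self, key: str, elements: list[str]) -> int:
        # One call == one simulated round-trip, however many elements
        # are pushed in it.
        self.calls += 1
        self.store.setdefault(key, []).extend(elements)
        return len(self.store[key])

async def save_messages_batched(client, key: str, serialized_messages: list[str]) -> None:
    # Single rpush with the whole list: 1 round-trip instead of N.
    if serialized_messages:
        await client.rpush(key, serialized_messages)

client = FakeValkeyClient()
asyncio.run(save_messages_batched(client, "chat:s1", ["m1", "m2", "m3"]))
```

With the per-message version, `client.calls` would be 3 here; batched, it stays at 1.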

raise IntegrationInvalidRequestException(f"Failed to create Valkey search index: {exc}") from exc

self._index_created = True


The embedding is stored as vec_bytes.hex() (a hex-encoded string), and the search query in _search() also passes vec_bytes.hex() as the PARAMS value. Valkey's VECTOR field expects raw binary bytes, not hex strings.

The Redis implementation stores np.asarray(...).tobytes() directly. With hex encoding, the stored data won't match the binary format the vector index expects, so vector search will silently return incorrect results.

Both _add() here and the FT.SEARCH PARAMS in _search() should pass raw bytes instead:

field_map[self.vector_field_name] = vec_bytes  # not vec_bytes.hex()

valkey-glide's hset and custom_command accept bytes values directly.

This bug is currently not caught because there are no tests exercising the vector search path (all tests use embed_fn=None). Adding a test with a mock embed_fn that asserts the value passed to hset is raw bytes would prevent regressions.
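The size mismatch is easy to demonstrate in plain Python (struct packing stands in here for np.asarray(..., dtype=np.float32).tobytes()):

```python
import struct

embedding = [0.25, -1.0, 3.5]

# Raw little-endian float32 blob: what a FLOAT32 vector field indexes,
# at 4 bytes per dimension.
raw = struct.pack(f"<{len(embedding)}f", *embedding)

# Hex encoding produces ASCII text twice the length; an index reading it
# as binary would interpret those 24 ASCII bytes as six bogus float32 values.
hexed = raw.hex()
```

Decoding with bytes.fromhex would recover the blob, but the vector index never decodes; it compares the stored bytes as-is, which is why the mismatch fails silently instead of erroring.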

vector_dims: Dimensionality of embedding vectors. Required if embed_fn is set.
vector_field_name: The name of the vector field. Required for vector search.
vector_algorithm: Vector index algorithm ("FLAT" or "HNSW"). Defaults to "HNSW".
vector_distance_metric: Distance metric ("COSINE", "IP", or "L2"). Defaults to "COSINE".

embed_fn is an async callable that takes a string and returns a list of floats — but the parameter is typed as Any | None, which provides no type safety.

The docstring correctly describes the expected signature as async def embed(text: str) -> list[float], but the type system can't enforce this. Consider using a proper type:

from collections.abc import Callable, Awaitable

EmbedFn = Callable[[str], Awaitable[list[float]]]

embed_fn: EmbedFn | None = None,

The Redis implementation uses BaseVectorizer (a concrete type from redisvl) for the same purpose. Since this package intentionally avoids redisvl, a Callable type or a simple Protocol would be the idiomatic Python equivalent while keeping the dependency tree minimal.

async def __aenter__(self) -> Self:
"""Async context manager entry."""
return self


ValkeyChatMessageStore has an explicit aclose() method, but ValkeyContextProvider only cleans up the client inside __aexit__. This forces callers to use the awkward pattern seen in the sample:

await context_provider.__aexit__(None, None, None)  # valkey_sample.py line 213

Add an aclose() method (like the chat store has) and have __aexit__ delegate to it:

async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.close()
        self._client = None

async def __aexit__(self, ...):
    await self.aclose()

This is consistent with the chat store, with standard Python async resource patterns, and avoids the __aexit__ anti-pattern in the sample code.
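A self-contained sketch of the delegation pattern (DummyClient and Provider are illustrative, mirroring the snippet above):

```python
import asyncio

class DummyClient:
    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

class Provider:
    """Minimal sketch of the suggested aclose()/__aexit__ delegation."""

    def __init__(self, client, owns_client: bool = True) -> None:
        self._client = client
        self._owns_client = owns_client

    async def aclose(self) -> None:
        # Idempotent: only closes an owned client, and only once.
        if self._owns_client and self._client is not None:
            await self._client.close()
            self._client = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.aclose()

async def demo() -> bool:
    client = DummyClient()
    provider = Provider(client)
    async with provider:
        pass
    await provider.aclose()  # second call is a harmless no-op
    return client.closed

closed = asyncio.run(demo())
```

Callers can then use `async with`, or call `aclose()` directly, without ever touching `__aexit__`.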

self._client = None


__all__ = ["ValkeyContextProvider"]

The Redis RedisContextProvider exposes a search_all() method that retrieves all documents in the index using paginated FilterQuery. The Valkey implementation has no equivalent.

This is useful for debugging, testing, and administrative tasks (e.g., verifying what's been stored). Consider adding a similar method using FT.SEARCH with a * query and LIMIT pagination:

async def search_all(self, page_size: int = 200) -> list[dict[str, Any]]:
    """Returns all documents in the index."""
    await self._ensure_index()
    client = await self._get_client()
    # Use FT.SEARCH with wildcard query and pagination
    ...

Not a blocker, but a feature gap vs. the Redis package worth noting.
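The pagination loop itself can be sketched against a stubbed search call (fake_ft_search stands in for the FT.SEARCH round-trip; the stop condition is the point here):

```python
import asyncio

async def fake_ft_search(index: str, query: str, offset: int, limit: int) -> list[dict]:
    # Stand-in for FT.SEARCH ... LIMIT <offset> <limit>; 5 docs total.
    docs = [{"id": f"doc:{i}"} for i in range(5)]
    return docs[offset : offset + limit]

async def search_all(page_size: int = 2) -> list[dict]:
    """Paginate a wildcard query until a short page signals the end."""
    results: list[dict] = []
    offset = 0
    while True:
        page = await fake_ft_search("context_idx", "*", offset, page_size)
        results.extend(page)
        if len(page) < page_size:
            break
        offset += page_size
    return results

docs = asyncio.run(search_all())
```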

valkey_url: str | None = None,
host: str = "localhost",
port: int = 6379,
*,

The Redis RedisHistoryProvider validates that redis_url and credential_provider are mutually exclusive and raises ValueError if both are provided.

Here, if a caller passes both valkey_url and host/port, the URL silently wins (in _get_client). This can lead to confusing behavior where the user thinks they're connecting to host:port but the URL takes precedence.

Consider adding validation:

if valkey_url is not None and (host != "localhost" or port != 6379):
    raise ValueError("valkey_url and explicit host/port are mutually exclusive")

Or at minimum, document the precedence clearly in the docstring.

use_tls: bool = False,
index_name: str = "context_idx",
prefix: str = "context:",
vector_dims: int | None = None,

ValkeyChatMessageStore supports valkey_url for connection (with URL parsing), but ValkeyContextProvider only accepts host/port. Within the same package, users would expect a consistent connection interface across both components.

Consider adding valkey_url support here as well, reusing the same _parse_url logic (or extracting it to a shared utility).
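One possible shape for the shared helper, using only the standard library (parse_valkey_url and the accepted schemes are assumptions, not the package's actual _parse_url):

```python
from urllib.parse import urlparse

def parse_valkey_url(url: str) -> tuple[str, int, bool]:
    """Hypothetical shared helper: extract host, port, and TLS flag."""
    parsed = urlparse(url)
    if parsed.scheme not in ("valkey", "valkeys", "redis", "rediss"):
        raise ValueError(f"Unsupported scheme: {parsed.scheme!r}")
    use_tls = parsed.scheme in ("valkeys", "rediss")
    return parsed.hostname or "localhost", parsed.port or 6379, use_tls

host, port, tls = parse_valkey_url("valkeys://cache.example.com:7000")
```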

provider = ValkeyContextProvider(source_id="ctx", user_id="u1", client=mock_glide_client)
assert "Memories" in provider.context_prompt



All ValkeyContextProvider tests use text-only search (embed_fn=None). There are no tests covering the vector/hybrid search path. This leaves several critical code paths untested:

  1. _add() — embedding generation and storage (the .hex() bug from the other comment)
  2. _search() — KNN query construction with PARAMS
  3. _ensure_index() — vector field inclusion in FT.CREATE schema

The Redis package includes TestRedisContextProviderHybridQuery for this exact scenario. Consider adding equivalent tests with a mock embed_fn:

async def test_vector_search_path(self, mock_glide_client):
    mock_embed = AsyncMock(return_value=[0.1] * 128)
    provider = ValkeyContextProvider(
        source_id="ctx",
        user_id="u1",
        client=mock_glide_client,
        embed_fn=mock_embed,
        vector_field_name="embedding",
        vector_dims=128,
    )
    # Test _add stores embeddings correctly
    # Test _search constructs KNN query
    # Test _ensure_index includes vector field in schema
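As a concrete illustration of the regression test suggested in the other comment, a mock can assert that raw bytes reach hset (add_memory is a hypothetical stand-in for the provider's _add; the real class is not reproduced here):

```python
import asyncio
import struct
from unittest.mock import AsyncMock

async def add_memory(client, doc_id: str, text: str, embed_fn,
                     vector_field: str = "embedding") -> None:
    # Hypothetical _add-style helper: stores the embedding as raw bytes.
    vector = await embed_fn(text)
    vec_bytes = struct.pack(f"<{len(vector)}f", *vector)
    field_map = {"text": text, vector_field: vec_bytes}  # raw bytes, not .hex()
    await client.hset(doc_id, field_map)

async def run_test() -> dict:
    mock_client = AsyncMock()
    mock_embed = AsyncMock(return_value=[0.1] * 128)
    await add_memory(mock_client, "context:1", "Memory A", mock_embed)
    # Inspect the second positional argument passed to hset.
    return mock_client.hset.call_args[0][1]

field_map = asyncio.run(run_test())
```

The key assertion is on the type: a hex-encoded regression would make `field_map["embedding"]` a `str` of length 1024 instead of 512 raw bytes.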

assert "ctx" in ctx.context_messages
msgs = ctx.context_messages["ctx"]
assert len(msgs) == 1
assert "Memory A" in msgs[0].text

The _search() method wraps unexpected exceptions in IntegrationInvalidRequestException, but there's no test verifying this behavior. A Valkey connection error or malformed response during search should be properly wrapped and re-raised.

Consider adding:

async def test_search_wraps_exceptions(self, mock_glide_client):
    mock_glide_client.custom_command = AsyncMock(
        side_effect=[None, ConnectionError("connection lost")]  # first call = FT.CREATE, second = FT.SEARCH
    )
    provider = ValkeyContextProvider(source_id="ctx", user_id="u1", client=mock_glide_client)

    with pytest.raises(IntegrationInvalidRequestException, match="Valkey search failed"):
        await provider._search(text="test")

Also worth testing that IntegrationInvalidRequestException raised directly inside _search (e.g., empty text) is not double-wrapped.
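The wrap-vs-pass-through behavior under test can be sketched in isolation (search here simulates the backend failure; the exception type and message text are illustrative local stand-ins):

```python
import asyncio

class IntegrationInvalidRequestException(Exception):
    """Local stand-in for the framework exception type."""

async def search(text: str) -> None:
    try:
        if not text:
            # Already the right type: must pass through un-wrapped.
            raise IntegrationInvalidRequestException("Valkey search failed: empty text")
        raise ConnectionError("connection lost")  # simulated backend failure
    except IntegrationInvalidRequestException:
        raise  # pass-through clause prevents double-wrapping
    except Exception as exc:
        raise IntegrationInvalidRequestException(f"Valkey search failed: {exc}") from exc

def cause_name(coro) -> str:
    try:
        asyncio.run(coro)
        return "no exception raised"
    except IntegrationInvalidRequestException as exc:
        return type(exc.__cause__).__name__ if exc.__cause__ is not None else "none"

wrapped = cause_name(search("query"))  # ConnectionError wrapped via `from exc`
direct = cause_name(search(""))        # raised directly, no __cause__
```

Asserting on `__cause__` in tests distinguishes a properly wrapped backend error from a double-wrapped (or unwrapped) one.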

)
session = AgentSession(session_id="test-session")
ctx = SessionContext(input_messages=[Message(role="user", contents=["hello"])], session_id="s1")


field_dict = call_args[1] if call_args[1] else call_args[0][1] relies on the falsy-ness of an empty kwargs dict to fall through to positional args. Since hset is called as client.hset(doc_id, field_map) (positional), call_args[1] is {} (falsy), so it falls through — but this works by accident.

Use the explicit positional form for clarity:

call_args = mock_glide_client.hset.call_args
field_dict = call_args[0][1]  # second positional arg

Or use call_args_list[0] if you want to be even more explicit about which call you're inspecting.
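The difference is easy to see with a bare Mock (the hset call below mirrors the positional form used in _add):

```python
from unittest.mock import Mock

client = Mock()
client.hset("context:1", {"text": "Memory A"})  # positional call, as in _add

call_args = client.hset.call_args
# call_args is (args_tuple, kwargs_dict). For a purely positional call the
# kwargs dict is empty -- which is exactly why the falsy-fallthrough version
# "works" today, and why explicit indexing is safer.
field_dict = call_args[0][1]   # second positional argument
kwargs_dict = call_args[1]     # empty {} here
```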


Development

Successfully merging this pull request may close these issues.

Python: [Feature]: Add Valkey Context Provider and Chat Message Store

2 participants