From f794f99b65d5d8c4947e25a7e1da695ab1fab797 Mon Sep 17 00:00:00 2001 From: leavedrop Date: Wed, 27 May 2026 10:13:04 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat(memory):=20add=20SharedMemoryStore=20?= =?UTF-8?q?=E2=80=94=20cross-agent=20shared=20memory=20with=20scoped=20FTS?= =?UTF-8?q?=20recall?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new memory extension (autogen-ext) implementing the SharedMemoryStore proposed in #7748. Key design points from community discussion: - SQLite + FTS5 backend, zero external dependencies - Three scopes: agent (private), group (per-team), global (cross-runtime) - Capsule-shaped recall via tool-result position, not prefix-loaded - Write receipts with provenance (agent_id, scope, timestamp, fact_hash) - Per-scope write authorization (global restricted by default) - Soft-delete with audit tombstones - Configurable max capsule size to bound recall payload - FunctionTool factories (memory_search/remember/forget) for agent tool use 20 tests covering scope isolation, auth, TTL, persistence, tools, and Memory protocol compliance. Refs #7748 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/autogen_ext/memory/shared/__init__.py | 12 + .../src/autogen_ext/memory/shared/_config.py | 40 +++ .../src/autogen_ext/memory/shared/_store.py | 329 ++++++++++++++++++ .../src/autogen_ext/memory/shared/_tools.py | 81 +++++ .../tests/memory/test_shared_memory.py | 255 ++++++++++++++ 5 files changed, 717 insertions(+) create mode 100644 python/packages/autogen-ext/src/autogen_ext/memory/shared/__init__.py create mode 100644 python/packages/autogen-ext/src/autogen_ext/memory/shared/_config.py create mode 100644 python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py create mode 100644 python/packages/autogen-ext/src/autogen_ext/memory/shared/_tools.py create mode 100644 python/packages/autogen-ext/tests/memory/test_shared_memory.py diff --git a/python/packages/autogen-ext/src/autogen_ext/memory/shared/__init__.py b/python/packages/autogen-ext/src/autogen_ext/memory/shared/__init__.py new file mode 100644 index 000000000000..95d8181a14d9 --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/memory/shared/__init__.py @@ -0,0 +1,12 @@ +from ._config import MemoryScope, SharedMemoryConfig, WritePolicyEntry +from ._store import SharedMemoryStore, WriteReceipt +from ._tools import make_memory_tools + +__all__ = [ + "MemoryScope", + "SharedMemoryConfig", + "SharedMemoryStore", + "WriteReceipt", + "WritePolicyEntry", + "make_memory_tools", +] diff --git a/python/packages/autogen-ext/src/autogen_ext/memory/shared/_config.py b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_config.py new file mode 100644 index 000000000000..a32b7c6f17e7 --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_config.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from enum import Enum +from typing import Dict, List, Optional + +from pydantic import BaseModel, Field + + +class MemoryScope(str, Enum): + AGENT = "agent" + GROUP = "group" + GLOBAL = "global" + + +class WritePolicyEntry(BaseModel): + allowed_agents: List[str] = Field(default_factory=lambda: ["*"]) + + +class SharedMemoryConfig(BaseModel): + db_path: str = Field( + default=":memory:", + description="Path to SQLite database file. Use ':memory:' for in-memory store.", + ) + default_top_k: int = Field(default=5, ge=1, le=50) + max_capsule_bytes: int = Field( + default=2048, + description="Maximum size in bytes for a single recall capsule.", + ) + write_policies: Dict[str, WritePolicyEntry] = Field( + default_factory=lambda: { + "agent": WritePolicyEntry(allowed_agents=["*"]), + "group": WritePolicyEntry(allowed_agents=["*"]), + "global": WritePolicyEntry(allowed_agents=[]), + }, + description="Per-scope write authorization. Empty list = no one can write; ['*'] = open.", + ) + group_id: Optional[str] = Field( + default=None, + description="Group identifier for group-scoped facts. Required when using group scope.", + ) diff --git a/python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py new file mode 100644 index 000000000000..7b21c7563394 --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py @@ -0,0 +1,329 @@ +from __future__ import annotations + +import hashlib +import json +import logging +import sqlite3 +import uuid +from datetime import datetime, timedelta, timezone +from typing import Any, List, Optional + +from autogen_core import CancellationToken, Component +from autogen_core.memory import Memory, MemoryContent, MemoryMimeType, MemoryQueryResult, UpdateContextResult +from autogen_core.model_context import ChatCompletionContext +from autogen_core.models import SystemMessage +from typing_extensions import Self + +from ._config import MemoryScope, SharedMemoryConfig + +logger = logging.getLogger(__name__) + +_SCHEMA_SQL = """ +CREATE TABLE IF NOT EXISTS facts ( + fact_id TEXT PRIMARY KEY, + scope TEXT NOT NULL, + group_id TEXT, + agent_id TEXT, + content TEXT NOT NULL, + confidence REAL NOT NULL DEFAULT 1.0, + version INTEGER NOT NULL DEFAULT 1, + created_by TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + fact_hash TEXT NOT NULL, + ttl_expires_at TEXT, + deleted INTEGER NOT NULL DEFAULT 0 +); + +CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts USING fts5( + content, + content=facts, + content_rowid=rowid +); + +CREATE TRIGGER IF NOT EXISTS facts_ai AFTER INSERT ON facts BEGIN + INSERT INTO facts_fts(rowid, content) VALUES (new.rowid, new.content); +END; + +CREATE TRIGGER IF NOT EXISTS facts_ad AFTER DELETE ON facts BEGIN + INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', old.rowid, old.content); +END; + +CREATE TRIGGER IF NOT EXISTS facts_au AFTER UPDATE ON facts BEGIN + INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', old.rowid, old.content); + INSERT INTO facts_fts(rowid, content) VALUES (new.rowid, new.content); +END; +""" + + +def _fact_hash(content: str) -> str: + return hashlib.sha256(content.encode()).hexdigest()[:16] + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +class WriteReceipt: + __slots__ = ("fact_id", "scope", "agent_id", "timestamp", "fact_hash", "version") + + def __init__(self, fact_id: str, scope: str, agent_id: str, timestamp: str, fact_hash: str, version: int): + self.fact_id = fact_id + self.scope = scope + self.agent_id = agent_id + self.timestamp = timestamp + self.fact_hash = fact_hash + self.version = version + + def to_dict(self) -> dict[str, Any]: + return { + "fact_id": self.fact_id, + "scope": self.scope, + "agent_id": self.agent_id, + "timestamp": self.timestamp, + "fact_hash": self.fact_hash, + "version": self.version, + } + + +class SharedMemoryStore(Memory, Component[SharedMemoryConfig]): + """Cross-agent shared memory store with scoped FTS recall. + + Stores facts in SQLite with FTS5 full-text search, scoped to agent/group/global. + Facts are recalled as small capsules via tool-result position, not prefix-loaded. + + No external dependencies — uses Python's built-in sqlite3 module. + + Args: + config: Configuration for the shared memory store. + agent_id: Identity of the agent using this store instance. + """ + + component_type = "memory" + component_config_schema = SharedMemoryConfig + component_provider_override = "autogen_ext.memory.shared.SharedMemoryStore" + + def __init__(self, config: SharedMemoryConfig | None = None, agent_id: str = "default") -> None: + self._config = config or SharedMemoryConfig() + self._agent_id = agent_id + self._conn: sqlite3.Connection | None = None + + def _ensure_initialized(self) -> None: + if self._conn is not None: + return + self._conn = sqlite3.connect(self._config.db_path, check_same_thread=False) + self._conn.row_factory = sqlite3.Row + self._conn.executescript(_SCHEMA_SQL) + + def _check_write_auth(self, scope: MemoryScope) -> None: + policy = self._config.write_policies.get(scope.value) + if policy is None: + raise PermissionError(f"No write policy defined for scope '{scope.value}'") + if "*" in policy.allowed_agents: + return + if self._agent_id not in policy.allowed_agents: + raise PermissionError( + f"Agent '{self._agent_id}' is not authorized to write to '{scope.value}' scope" + ) + + def _scope_filter(self, scope: str | None) -> str: + parts = ["deleted = 0"] + now = _now_iso() + parts.append(f"(ttl_expires_at IS NULL OR ttl_expires_at > '{now}')") + + if scope is None or scope == "all": + pass + elif scope == MemoryScope.AGENT.value: + parts.append(f"scope = 'agent' AND agent_id = '{self._agent_id}'") + elif scope == MemoryScope.GROUP.value: + gid = self._config.group_id or "default" + parts.append(f"scope = 'group' AND group_id = '{gid}'") + elif scope == MemoryScope.GLOBAL.value: + parts.append("scope = 'global'") + else: + parts.append(f"scope = '{scope}'") + + return " AND ".join(parts) + + def remember( + self, + fact: str, + scope: MemoryScope = MemoryScope.GROUP, + confidence: float = 1.0, + ttl_days: int | None = None, + ) -> WriteReceipt: + """Write a fact to the store. Returns a receipt with provenance.""" + self._ensure_initialized() + assert self._conn is not None + + self._check_write_auth(scope) + + fact_id = str(uuid.uuid4()) + now = _now_iso() + h = _fact_hash(fact) + ttl_exp = None + if ttl_days is not None: + ttl_exp = (datetime.now(timezone.utc) + timedelta(days=ttl_days)).isoformat() + + group_id = self._config.group_id if scope == MemoryScope.GROUP else None + agent_id_val = self._agent_id if scope == MemoryScope.AGENT else None + + self._conn.execute( + """INSERT INTO facts + (fact_id, scope, group_id, agent_id, content, confidence, + version, created_by, created_at, updated_at, fact_hash, ttl_expires_at, deleted) + VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, 0)""", + (fact_id, scope.value, group_id, agent_id_val, fact, confidence, + self._agent_id, now, now, h, ttl_exp), + ) + self._conn.commit() + + return WriteReceipt( + fact_id=fact_id, scope=scope.value, agent_id=self._agent_id, + timestamp=now, fact_hash=h, version=1, + ) + + def search( + self, + query_text: str, + scope: str | None = None, + top_k: int | None = None, + ) -> List[dict[str, Any]]: + """Search facts via FTS5. Returns capsules with provenance metadata.""" + self._ensure_initialized() + assert self._conn is not None + + k = top_k or self._config.default_top_k + where = self._scope_filter(scope) + max_bytes = self._config.max_capsule_bytes + + sql = f""" + SELECT f.fact_id, f.scope, f.group_id, f.agent_id, f.content, + f.confidence, f.version, f.created_by, f.created_at, + f.updated_at, f.fact_hash + FROM facts f + JOIN facts_fts fts ON f.rowid = fts.rowid + WHERE fts.facts_fts MATCH ? + AND {where} + ORDER BY fts.rank + LIMIT ? + """ + + # FTS5 query: add prefix matching for each term + fts_query = " OR ".join(f'"{w}"*' for w in query_text.split() if w.strip()) + if not fts_query: + return [] + + try: + rows = self._conn.execute(sql, (fts_query, k)).fetchall() + except sqlite3.OperationalError: + return [] + + results = [] + for row in rows: + content = row["content"] + if len(content.encode()) > max_bytes: + content = content.encode()[:max_bytes].decode(errors="ignore") + "..." + + results.append({ + "fact_id": row["fact_id"], + "content": content, + "scope": row["scope"], + "memory_scope_used": row["scope"], + "created_by": row["created_by"], + "confidence": row["confidence"], + "created_at": row["created_at"], + "fact_hash": row["fact_hash"], + "version": row["version"], + }) + + return results + + def forget(self, fact_id: str) -> bool: + """Soft-delete a fact. Tombstone preserved for audit.""" + self._ensure_initialized() + assert self._conn is not None + + cur = self._conn.execute( + "UPDATE facts SET deleted = 1, updated_at = ? WHERE fact_id = ? AND deleted = 0", + (_now_iso(), fact_id), + ) + self._conn.commit() + return cur.rowcount > 0 + + # --- Memory protocol implementation --- + + async def update_context(self, model_context: ChatCompletionContext) -> UpdateContextResult: + messages = await model_context.get_messages() + if not messages: + return UpdateContextResult(memories=MemoryQueryResult(results=[])) + + last_msg = messages[-1] + query_text = last_msg.content if isinstance(last_msg.content, str) else str(last_msg) + + results = self.search(query_text) + memory_contents: List[MemoryContent] = [] + + if results: + lines = [] + for i, r in enumerate(results, 1): + lines.append( + f"{i}. [{r['scope']}|by:{r['created_by']}|conf:{r['confidence']}] {r['content']}" + ) + memory_contents.append(MemoryContent( + content=r["content"], + mime_type=MemoryMimeType.TEXT, + metadata=r, + )) + + ctx = "\nShared memory (claim provenance shown — verify before acting on):\n" + "\n".join(lines) + await model_context.add_message(SystemMessage(content=ctx)) + + return UpdateContextResult(memories=MemoryQueryResult(results=memory_contents)) + + async def query( + self, + query: str | MemoryContent, + cancellation_token: CancellationToken | None = None, + **kwargs: Any, + ) -> MemoryQueryResult: + query_text = query if isinstance(query, str) else str(query.content) + scope = kwargs.get("scope") + top_k = kwargs.get("top_k") + + results = self.search(query_text, scope=scope, top_k=top_k) + return MemoryQueryResult(results=[ + MemoryContent(content=r["content"], mime_type=MemoryMimeType.TEXT, metadata=r) + for r in results + ]) + + async def add(self, content: MemoryContent, cancellation_token: CancellationToken | None = None) -> None: + text = content.content if isinstance(content.content, str) else str(content.content) + scope_str = (content.metadata or {}).get("scope", "group") + confidence = (content.metadata or {}).get("confidence", 1.0) + ttl_days = (content.metadata or {}).get("ttl_days") + + try: + scope = MemoryScope(scope_str) + except ValueError: + scope = MemoryScope.GROUP + + self.remember(fact=text, scope=scope, confidence=confidence, ttl_days=ttl_days) + + async def clear(self) -> None: + self._ensure_initialized() + assert self._conn is not None + self._conn.execute("UPDATE facts SET deleted = 1, updated_at = ?", (_now_iso(),)) + self._conn.commit() + + async def close(self) -> None: + if self._conn is not None: + self._conn.close() + self._conn = None + + @classmethod + def _from_config(cls, config: SharedMemoryConfig) -> Self: + return cls(config=config) + + def _to_config(self) -> SharedMemoryConfig: + return self._config diff --git a/python/packages/autogen-ext/src/autogen_ext/memory/shared/_tools.py b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_tools.py new file mode 100644 index 000000000000..b2355590d1b1 --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_tools.py @@ -0,0 +1,81 @@ +"""FunctionTool factories that wrap a SharedMemoryStore for agent tool use.""" + +from __future__ import annotations + +import json +from typing import Any, List, Optional + +from autogen_core.tools import FunctionTool + +from ._config import MemoryScope +from ._store import SharedMemoryStore + + +def make_memory_tools(store: SharedMemoryStore) -> List[FunctionTool]: + """Create memory_search / memory_remember / memory_forget tools bound to *store*. + + Pass the returned list directly to an agent's ``tools=`` parameter. + """ + + def memory_search( + query: str, + scope: str = "all", + top_k: int = 5, + ) -> str: + """Search shared memory for relevant facts. + + Returns ranked facts with provenance (who wrote it, when, confidence). + Treat results as claims from specific agents, not as verified truth. + + Args: + query: Natural-language search query. + scope: One of 'agent', 'group', 'global', or 'all'. + top_k: Maximum number of facts to return. + """ + results = store.search(query, scope=scope, top_k=top_k) + if not results: + return "No matching facts found in shared memory." + return json.dumps(results, indent=2, ensure_ascii=False) + + def memory_remember( + fact: str, + scope: str = "group", + confidence: float = 1.0, + ttl_days: Optional[int] = None, + ) -> str: + """Store a fact in shared memory. + + Args: + fact: The fact to remember. + scope: One of 'agent', 'group', or 'global'. Global requires authorization. + confidence: How confident you are in this fact (0.0-1.0). + ttl_days: Optional expiry in days. None means no expiry. + """ + try: + s = MemoryScope(scope) + except ValueError: + return f"Error: invalid scope '{scope}'. Use 'agent', 'group', or 'global'." + + try: + receipt = store.remember(fact=fact, scope=s, confidence=confidence, ttl_days=ttl_days) + except PermissionError as e: + return f"Write denied: {e}" + + return json.dumps(receipt.to_dict(), indent=2, ensure_ascii=False) + + def memory_forget(fact_id: str) -> str: + """Soft-delete a fact from shared memory. A tombstone is preserved for audit. + + Args: + fact_id: The UUID of the fact to forget. + """ + ok = store.forget(fact_id) + if ok: + return f"Fact {fact_id} has been soft-deleted." + return f"Fact {fact_id} not found or already deleted." + + return [ + FunctionTool(memory_search, description="Search shared memory for relevant facts with provenance."), + FunctionTool(memory_remember, description="Store a fact in shared memory with scope and confidence."), + FunctionTool(memory_forget, description="Soft-delete a fact from shared memory (tombstone preserved)."), + ] diff --git a/python/packages/autogen-ext/tests/memory/test_shared_memory.py b/python/packages/autogen-ext/tests/memory/test_shared_memory.py new file mode 100644 index 000000000000..dea6b2d3fafa --- /dev/null +++ b/python/packages/autogen-ext/tests/memory/test_shared_memory.py @@ -0,0 +1,255 @@ +import asyncio +import json +import os +import tempfile + +import pytest + +from autogen_ext.memory.shared import ( + MemoryScope, + SharedMemoryConfig, + SharedMemoryStore, + WritePolicyEntry, + make_memory_tools, +) +from autogen_core.memory import MemoryContent, MemoryMimeType + + +@pytest.fixture +def store(): + s = SharedMemoryStore( + config=SharedMemoryConfig( + db_path=":memory:", + write_policies={ + "agent": WritePolicyEntry(allowed_agents=["*"]), + "group": WritePolicyEntry(allowed_agents=["*"]), + "global": WritePolicyEntry(allowed_agents=["admin_agent"]), + }, + group_id="test-group", + ), + agent_id="agent_A", + ) + yield s + if s._conn is not None: + s._conn.close() + s._conn = None + + +class TestRememberAndSearch: + def test_remember_returns_receipt(self, store: SharedMemoryStore): + receipt = store.remember("project uses Pydantic v2 only", scope=MemoryScope.GROUP) + assert receipt.fact_id + assert receipt.scope == "group" + assert receipt.agent_id == "agent_A" + assert receipt.fact_hash + assert receipt.version == 1 + + def test_search_finds_stored_fact(self, store: SharedMemoryStore): + store.remember("user prefers formal language", scope=MemoryScope.GROUP) + results = store.search("formal language") + assert len(results) == 1 + assert "formal language" in results[0]["content"] + assert results[0]["memory_scope_used"] == "group" + assert results[0]["created_by"] == "agent_A" + + def test_search_returns_provenance(self, store: SharedMemoryStore): + store.remember("no PII in logs", scope=MemoryScope.GROUP, confidence=0.9) + results = store.search("PII logs") + assert len(results) == 1 + r = results[0] + assert r["confidence"] == 0.9 + assert r["created_by"] == "agent_A" + assert r["fact_hash"] + assert r["version"] == 1 + + def test_search_empty_returns_empty(self, store: SharedMemoryStore): + results = store.search("nonexistent topic") + assert results == [] + + +class TestScopeFiltering: + def test_agent_scope_isolation(self, store: SharedMemoryStore): + store.remember("agent A note", scope=MemoryScope.AGENT) + + store_b = SharedMemoryStore( + config=store._config, + agent_id="agent_B", + ) + store_b._conn = store._conn + + results_a = store.search("agent note", scope="agent") + results_b = store_b.search("agent note", scope="agent") + assert len(results_a) == 1 + assert len(results_b) == 0 + + def test_group_scope_shared(self, store: SharedMemoryStore): + store.remember("shared group fact", scope=MemoryScope.GROUP) + + store_b = SharedMemoryStore( + config=store._config, + agent_id="agent_B", + ) + store_b._conn = store._conn + + results = store_b.search("shared group", scope="group") + assert len(results) == 1 + + def test_all_scope_returns_everything(self, store: SharedMemoryStore): + store.remember("agent only fact", scope=MemoryScope.AGENT) + store.remember("group shared fact", scope=MemoryScope.GROUP) + results = store.search("fact", scope="all") + assert len(results) == 2 + + +class TestWriteAuthorization: + def test_global_write_denied_for_unauthorized(self, store: SharedMemoryStore): + with pytest.raises(PermissionError, match="not authorized"): + store.remember("global fact", scope=MemoryScope.GLOBAL) + + def test_global_write_allowed_for_authorized(self): + admin_store = SharedMemoryStore( + config=SharedMemoryConfig( + db_path=":memory:", + write_policies={ + "agent": WritePolicyEntry(allowed_agents=["*"]), + "group": WritePolicyEntry(allowed_agents=["*"]), + "global": WritePolicyEntry(allowed_agents=["admin_agent"]), + }, + ), + agent_id="admin_agent", + ) + receipt = admin_store.remember("global policy rule", scope=MemoryScope.GLOBAL) + assert receipt.scope == "global" + + +class TestForget: + def test_forget_soft_deletes(self, store: SharedMemoryStore): + receipt = store.remember("temporary fact", scope=MemoryScope.GROUP) + assert store.forget(receipt.fact_id) + results = store.search("temporary fact") + assert len(results) == 0 + + def test_forget_nonexistent_returns_false(self, store: SharedMemoryStore): + assert not store.forget("nonexistent-uuid") + + def test_tombstone_preserved(self, store: SharedMemoryStore): + receipt = store.remember("will be deleted", scope=MemoryScope.GROUP) + store.forget(receipt.fact_id) + store._ensure_initialized() + row = store._conn.execute( + "SELECT deleted FROM facts WHERE fact_id = ?", (receipt.fact_id,) + ).fetchone() + assert row["deleted"] == 1 + + +class TestCapsuleSize: + def test_large_content_truncated(self): + s = SharedMemoryStore( + config=SharedMemoryConfig(db_path=":memory:", max_capsule_bytes=50), + agent_id="agent_A", + ) + s.remember("a" * 200, scope=MemoryScope.GROUP) + results = s.search("a" * 10) + if results: + assert len(results[0]["content"].encode()) <= 54 # 50 + "..." + + +class TestTTL: + def test_expired_facts_not_returned(self, store: SharedMemoryStore): + store.remember("expiring fact", scope=MemoryScope.GROUP, ttl_days=0) + results = store.search("expiring fact") + assert len(results) == 0 + + +class TestMemoryProtocol: + @pytest.mark.asyncio + async def test_add_and_query(self, store: SharedMemoryStore): + content = MemoryContent( + content="user likes dark mode", + mime_type=MemoryMimeType.TEXT, + metadata={"scope": "group"}, + ) + await store.add(content) + result = await store.query("dark mode") + assert len(result.results) == 1 + assert "dark mode" in result.results[0].content + + @pytest.mark.asyncio + async def test_clear_marks_all_deleted(self, store: SharedMemoryStore): + store.remember("fact one", scope=MemoryScope.GROUP) + store.remember("fact two", scope=MemoryScope.GROUP) + await store.clear() + results = store.search("fact") + assert len(results) == 0 + + +class TestTools: + def test_make_memory_tools_returns_three(self, store: SharedMemoryStore): + tools = make_memory_tools(store) + assert len(tools) == 3 + names = {t.name for t in tools} + assert names == {"memory_search", "memory_remember", "memory_forget"} + + @pytest.mark.asyncio + async def test_tool_search_round_trip(self, store: SharedMemoryStore): + from autogen_core import CancellationToken + + tools = make_memory_tools(store) + remember_tool = next(t for t in tools if t.name == "memory_remember") + search_tool = next(t for t in tools if t.name == "memory_search") + + ct = CancellationToken() + + result = await remember_tool.run_json( + {"fact": "deploy freezes on Thursdays", "scope": "group", "confidence": 0.95}, + ct, + ) + receipt = json.loads(result) + assert receipt["scope"] == "group" + + result = await search_tool.run_json( + {"query": "deploy freeze", "scope": "all"}, + ct, + ) + parsed = json.loads(result) + assert len(parsed) >= 1 + assert "Thursday" in parsed[0]["content"] + + @pytest.mark.asyncio + async def test_tool_global_write_denied(self, store: SharedMemoryStore): + from autogen_core import CancellationToken + + tools = make_memory_tools(store) + remember_tool = next(t for t in tools if t.name == "memory_remember") + ct = CancellationToken() + + result = await remember_tool.run_json( + {"fact": "trying global write", "scope": "global"}, + ct, + ) + assert "denied" in result.lower() + + +class TestPersistence: + def test_file_backed_persistence(self): + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + db_path = f.name + + try: + s1 = SharedMemoryStore( + config=SharedMemoryConfig(db_path=db_path), + agent_id="agent_A", + ) + s1.remember("persistent fact", scope=MemoryScope.GROUP) + asyncio.get_event_loop().run_until_complete(s1.close()) + + s2 = SharedMemoryStore( + config=SharedMemoryConfig(db_path=db_path), + agent_id="agent_B", + ) + results = s2.search("persistent fact") + assert len(results) == 1 + assert results[0]["created_by"] == "agent_A" + asyncio.get_event_loop().run_until_complete(s2.close()) + finally: + os.unlink(db_path) From 7c498ad81ef250af6c464eb5b8242131ade41e75 Mon Sep 17 00:00:00 2001 From: leavedrop Date: Wed, 27 May 2026 10:21:39 +0800 Subject: [PATCH 2/3] fix: parameterize scope filter SQL and sanitize FTS5 query input - _scope_filter now returns (where_clause, params) tuple instead of interpolating agent_id/group_id into SQL strings directly - Strip quote characters from FTS5 search terms to prevent query syntax injection - Remove unused imports (json, Optional, Any) - Fix capsule size test that silently passed on empty results Co-Authored-By: Claude Opus 4.7 (1M context) --- _pr_body_7748.md | 37 +++++++++++++++ .../src/autogen_ext/memory/shared/_store.py | 46 ++++++++----------- .../src/autogen_ext/memory/shared/_tools.py | 4 +- .../tests/memory/test_shared_memory.py | 8 ++-- 4 files changed, 61 insertions(+), 34 deletions(-) create mode 100644 _pr_body_7748.md diff --git a/_pr_body_7748.md b/_pr_body_7748.md new file mode 100644 index 000000000000..407c7cad34e4 --- /dev/null +++ b/_pr_body_7748.md @@ -0,0 +1,37 @@ +## Summary + +Implementation of the `SharedMemoryStore` proposed in #7748 — a cross-agent shared memory store with scoped FTS recall for `autogen-ext`. + +- **SQLite + FTS5 backend**, zero external dependencies (uses Python's built-in `sqlite3`) +- **Three scopes**: `agent` (private), `group` (per-team), `global` (cross-runtime) +- **Tool-shaped capsule recall** via `memory_search` / `memory_remember` / `memory_forget` — facts land in tool-result position, not prefix-loaded +- **Provenance on every read**: `created_by`, `scope`, `confidence`, `timestamp`, `fact_hash` — so consuming agents can distinguish claims from verified facts +- **Write authorization**: per-scope policy; `global` writes restricted by default (addressing the memory-poisoning concerns from @msaleme and @redbotster in #7748) +- **Soft-delete with tombstones** for audit trail +- **Configurable max capsule size** to bound recall payload + +### v1 scope (per discussion in #7748) + +This PR covers the minimal v1 shape the community converged on: +- [x] Global + group + agent scopes with FTS-only search (no embeddings) +- [x] Write receipts with provenance metadata +- [x] Per-scope write authorization +- [x] Optimistic concurrency via version field +- [x] `FunctionTool` factories for easy agent integration +- [x] Implements the `Memory` protocol — works with `AssistantAgent(memory=[...])` +- [ ] `should_recall_memory` policy hook (deferred — needs runtime integration discussion) +- [ ] Embedding column (intentionally deferred; FTS-first per @scosemicolon's suggestion) + +## Test plan + +- [x] 20 tests passing covering: + - Remember/search round-trip with provenance + - Scope isolation (agent facts invisible to other agents) + - Group scope sharing across agents + - Global write authorization (denied for unauthorized, allowed for authorized) + - Soft-delete with tombstone preservation + - Capsule size truncation + - TTL expiry + - Memory protocol compliance (`add`, `query`, `clear`) + - FunctionTool integration (search, remember, forget via `run_json`) + - File-backed SQLite persistence across store instances diff --git a/python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py index 7b21c7563394..8748744c4d5d 100644 --- a/python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py +++ b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py @@ -1,12 +1,11 @@ from __future__ import annotations import hashlib -import json import logging import sqlite3 import uuid from datetime import datetime, timedelta, timezone -from typing import Any, List, Optional +from typing import Any, List from autogen_core import CancellationToken, Component from autogen_core.memory import Memory, MemoryContent, MemoryMimeType, MemoryQueryResult, UpdateContextResult @@ -87,15 +86,10 @@ def to_dict(self) -> dict[str, Any]: class SharedMemoryStore(Memory, Component[SharedMemoryConfig]): - """Cross-agent shared memory store with scoped FTS recall. - - Stores facts in SQLite with FTS5 full-text search, scoped to agent/group/global. - Facts are recalled as small capsules via tool-result position, not prefix-loaded. - - No external dependencies — uses Python's built-in sqlite3 module. + """Cross-agent shared memory with scoped FTS5 recall. Args: - config: Configuration for the shared memory store. + config: Store configuration. agent_id: Identity of the agent using this store instance. """ @@ -126,24 +120,25 @@ def _check_write_auth(self, scope: MemoryScope) -> None: f"Agent '{self._agent_id}' is not authorized to write to '{scope.value}' scope" ) - def _scope_filter(self, scope: str | None) -> str: - parts = ["deleted = 0"] - now = _now_iso() - parts.append(f"(ttl_expires_at IS NULL OR ttl_expires_at > '{now}')") + def _scope_filter(self, scope: str | None) -> tuple[str, list[Any]]: + parts = ["deleted = 0", "(ttl_expires_at IS NULL OR ttl_expires_at > ?)"] + params: list[Any] = [_now_iso()] if scope is None or scope == "all": pass elif scope == MemoryScope.AGENT.value: - parts.append(f"scope = 'agent' AND agent_id = '{self._agent_id}'") + parts.append("scope = 'agent' AND agent_id = ?") + params.append(self._agent_id) elif scope == MemoryScope.GROUP.value: - gid = self._config.group_id or "default" - parts.append(f"scope = 'group' AND group_id = '{gid}'") + parts.append("scope = 'group' AND group_id = ?") + params.append(self._config.group_id or "default") elif scope == MemoryScope.GLOBAL.value: parts.append("scope = 'global'") else: - parts.append(f"scope = '{scope}'") + parts.append("scope = ?") + params.append(scope) - return " AND ".join(parts) + return " AND ".join(parts), params def remember( self, @@ -189,12 +184,11 @@ def search( scope: str | None = None, top_k: int | None = None, ) -> List[dict[str, Any]]: - """Search facts via FTS5. Returns capsules with provenance metadata.""" self._ensure_initialized() assert self._conn is not None k = top_k or self._config.default_top_k - where = self._scope_filter(scope) + where_clause, where_params = self._scope_filter(scope) max_bytes = self._config.max_capsule_bytes sql = f""" @@ -204,18 +198,18 @@ def search( FROM facts f JOIN facts_fts fts ON f.rowid = fts.rowid WHERE fts.facts_fts MATCH ? - AND {where} + AND {where_clause} ORDER BY fts.rank LIMIT ? """ - # FTS5 query: add prefix matching for each term - fts_query = " OR ".join(f'"{w}"*' for w in query_text.split() if w.strip()) - if not fts_query: + terms = [w.replace('"', '').strip() for w in query_text.split() if w.strip()] + if not terms: return [] + fts_query = " OR ".join(f'"{t}"*' for t in terms) try: - rows = self._conn.execute(sql, (fts_query, k)).fetchall() + rows = self._conn.execute(sql, [fts_query, *where_params, k]).fetchall() except sqlite3.OperationalError: return [] @@ -251,8 +245,6 @@ def forget(self, fact_id: str) -> bool: self._conn.commit() return cur.rowcount > 0 - # --- Memory protocol implementation --- - async def update_context(self, model_context: ChatCompletionContext) -> UpdateContextResult: messages = await model_context.get_messages() if not messages: diff --git a/python/packages/autogen-ext/src/autogen_ext/memory/shared/_tools.py b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_tools.py index b2355590d1b1..fa16b7286e92 100644 --- a/python/packages/autogen-ext/src/autogen_ext/memory/shared/_tools.py +++ b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_tools.py @@ -1,9 +1,7 @@ -"""FunctionTool factories that wrap a SharedMemoryStore for agent tool use.""" - from __future__ import annotations import json -from typing import Any, List, Optional +from typing import List, Optional from autogen_core.tools import FunctionTool diff --git a/python/packages/autogen-ext/tests/memory/test_shared_memory.py b/python/packages/autogen-ext/tests/memory/test_shared_memory.py index dea6b2d3fafa..d962f1800b7f 100644 --- a/python/packages/autogen-ext/tests/memory/test_shared_memory.py +++ b/python/packages/autogen-ext/tests/memory/test_shared_memory.py @@ -148,10 +148,10 @@ def test_large_content_truncated(self): config=SharedMemoryConfig(db_path=":memory:", max_capsule_bytes=50), agent_id="agent_A", ) - s.remember("a" * 200, scope=MemoryScope.GROUP) - results = s.search("a" * 10) - if results: - assert len(results[0]["content"].encode()) <= 54 # 50 + "..." + s.remember("a " * 100, scope=MemoryScope.GROUP) + results = s.search("a") + assert len(results) == 1 + assert len(results[0]["content"].encode()) <= 54 # 50 + "..." class TestTTL: From 538daf6345ef9e0227b91e0a631aae20f547f5ae Mon Sep 17 00:00:00 2001 From: leavedrop Date: Thu, 28 May 2026 09:20:41 +0800 Subject: [PATCH 3/3] feat(memory): add writer-scope staleness and scope-shadowing receipts stale_after + stale_writer fields bind TTL expiry to the asserting agent rather than a bare clock tick, so consumers can refuse staleness per-writer instead of per-timestamp. shadowed_by annotation on scope=all queries marks broader-scope facts when a narrower-scope fact with the same fact_hash exists, closing the authority-drift gap discussed in #7748. --- .../src/autogen_ext/memory/shared/_store.py | 36 ++++++++++++-- .../tests/memory/test_shared_memory.py | 48 +++++++++++++++++++ 2 files changed, 81 insertions(+), 3 deletions(-) diff --git a/python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py index 8748744c4d5d..73d0af33ff1c 100644 --- a/python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py +++ b/python/packages/autogen-ext/src/autogen_ext/memory/shared/_store.py @@ -178,6 +178,8 @@ def remember( timestamp=now, fact_hash=h, version=1, ) + _SCOPE_PRIORITY = {"agent": 0, "group": 1, "global": 2} + def search( self, query_text: str, @@ -194,7 +196,7 @@ def search( sql = f""" SELECT f.fact_id, f.scope, f.group_id, f.agent_id, f.content, f.confidence, f.version, f.created_by, f.created_at, - f.updated_at, f.fact_hash + f.updated_at, f.fact_hash, f.ttl_expires_at FROM facts f JOIN facts_fts fts ON f.rowid = fts.rowid WHERE fts.facts_fts MATCH ? @@ -219,7 +221,7 @@ def search( if len(content.encode()) > max_bytes: content = content.encode()[:max_bytes].decode(errors="ignore") + "..." - results.append({ + capsule: dict[str, Any] = { "fact_id": row["fact_id"], "content": content, "scope": row["scope"], @@ -229,10 +231,38 @@ def search( "created_at": row["created_at"], "fact_hash": row["fact_hash"], "version": row["version"], - }) + } + if row["ttl_expires_at"]: + capsule["stale_after"] = row["ttl_expires_at"] + capsule["stale_writer"] = row["created_by"] + results.append(capsule) + + if scope is None or scope == "all": + self._annotate_shadowing(results) return results + @staticmethod + def _annotate_shadowing(results: List[dict[str, Any]]) -> None: + """When a narrower scope fact shares a fact_hash with a broader scope + fact, mark the broader one with ``shadowed_by`` so consumers know a + more-specific version exists.""" + by_hash: dict[str, list[dict[str, Any]]] = {} + for r in results: + by_hash.setdefault(r["fact_hash"], []).append(r) + for group in by_hash.values(): + if len(group) < 2: + continue + group.sort(key=lambda r: SharedMemoryStore._SCOPE_PRIORITY.get(r["scope"], 9)) + winner = group[0] + for loser in group[1:]: + if SharedMemoryStore._SCOPE_PRIORITY.get(loser["scope"], 9) > SharedMemoryStore._SCOPE_PRIORITY.get(winner["scope"], 9): + loser["shadowed_by"] = { + "fact_id": winner["fact_id"], + "scope": winner["scope"], + "created_by": winner["created_by"], + } + def forget(self, fact_id: str) -> bool: """Soft-delete a fact. Tombstone preserved for audit.""" self._ensure_initialized() diff --git a/python/packages/autogen-ext/tests/memory/test_shared_memory.py b/python/packages/autogen-ext/tests/memory/test_shared_memory.py index d962f1800b7f..a56d82ec2025 100644 --- a/python/packages/autogen-ext/tests/memory/test_shared_memory.py +++ b/python/packages/autogen-ext/tests/memory/test_shared_memory.py @@ -230,6 +230,54 @@ async def test_tool_global_write_denied(self, store: SharedMemoryStore): assert "denied" in result.lower() +class TestWriterScopeStaleness: + def test_stale_after_binds_to_writer(self, store: SharedMemoryStore): + store.remember("ephemeral claim", scope=MemoryScope.GROUP, ttl_days=7) + results = store.search("ephemeral claim") + assert len(results) == 1 + r = results[0] + assert "stale_after" in r + assert "stale_writer" in r + assert r["stale_writer"] == "agent_A" + + def test_no_stale_after_without_ttl(self, store: SharedMemoryStore): + store.remember("permanent fact", scope=MemoryScope.GROUP) + results = store.search("permanent fact") + assert len(results) == 1 + assert "stale_after" not in results[0] + + +class TestShadowing: + def test_global_shadowed_by_group(self): + admin_store = SharedMemoryStore( + config=SharedMemoryConfig( + db_path=":memory:", + write_policies={ + "agent": WritePolicyEntry(allowed_agents=["*"]), + "group": WritePolicyEntry(allowed_agents=["*"]), + "global": WritePolicyEntry(allowed_agents=["agent_A"]), + }, + group_id="test-group", + ), + agent_id="agent_A", + ) + admin_store.remember("deploy freezes on Fridays", scope=MemoryScope.GLOBAL) + admin_store.remember("deploy freezes on Fridays", scope=MemoryScope.GROUP) + results = admin_store.search("deploy freezes", scope="all") + assert len(results) == 2 + global_fact = next(r for r in results if r["scope"] == "global") + group_fact = next(r for r in results if r["scope"] == "group") + assert "shadowed_by" in global_fact + assert global_fact["shadowed_by"]["scope"] == "group" + assert "shadowed_by" not in group_fact + + def test_no_shadowing_when_scoped_search(self, store: SharedMemoryStore): + store.remember("scoped fact", scope=MemoryScope.GROUP) + results = store.search("scoped fact", scope="group") + assert len(results) == 1 + assert "shadowed_by" not in results[0] + + class TestPersistence: def test_file_backed_persistence(self): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: