diff --git a/code_agent/a2a/server.py b/code_agent/a2a/server.py index d5acde2..563fa6e 100644 --- a/code_agent/a2a/server.py +++ b/code_agent/a2a/server.py @@ -1,4 +1,32 @@ -"""FastAPI A2A server — JSON-RPC 2.0 dispatcher with SSE streaming support. +""" +DEPRECATED — This module is no longer used and will be deleted. +=============================================================== + +All functionality has been migrated to ADK's ``get_fast_api_app`` in +``main.py``, extended with persistent stores via ``code_agent/a2a/stores.py``. + +Why it was kept originally: + • ADK v1.x hardcoded ``InMemoryTaskStore`` with no injection point. + • Custom tasks/submit (fire-and-forget), tasks/resume (HITL), and + API-key auth middleware were not available in get_fast_api_app. + +Current state (see main.py + code_agent/a2a/stores.py): + • Persistent tasks/push-notification stores via patch_adk_stores(), + which monkey-patches the a2a.server.tasks module namespace before + get_fast_api_app is called so DatabaseTaskStore is used when + DATABASE_URL is set. + • Session persistence via session_service_uri=DATABASE_URL. + • ADK's built-in A2A handler covers tasks/send + tasks/sendSubscribe. + +Upstream tracking: + ADK PR #3839 — adds task_store param to to_a2a() (open, unmerged as of + 2026-03). Once get_fast_api_app exposes a native a2a_task_store param, + code_agent/a2a/stores.py can also be removed. + +TODO: delete this file once the migration is confirmed stable in EKS. +---------------------------------------------------------------------- + +FastAPI A2A server — JSON-RPC 2.0 dispatcher with SSE streaming support. Improvements over v0.2.0 ───────────────────────── diff --git a/code_agent/a2a/stores.py b/code_agent/a2a/stores.py new file mode 100644 index 0000000..4134625 --- /dev/null +++ b/code_agent/a2a/stores.py @@ -0,0 +1,68 @@ +"""Factory functions for A2A task and push-notification stores. + +Returns database-backed stores when DATABASE_URL is set, falling back to +in-memory stores for local development and CI. + +The stores are passed directly to ``get_fast_api_app`` via the native +``a2a_task_store`` / ``a2a_push_config_store`` parameters, which were added +by patching the installed google-adk package (see ``scripts/patch_adk.py``). + +Upstream tracking +----------------- +- PR #3839 adds ``task_store`` to ``to_a2a()``: + https://github.com/google/adk-python/pull/3839 +- ``get_fast_api_app`` extension follows the same pattern and is applied via + ``scripts/patch_adk.py`` until ADK ships it natively. + +Once google-adk >= the version that includes both params, delete +``scripts/patch_adk.py`` and remove the patch step from the dev setup. +""" +from __future__ import annotations + +import logging +import os + +logger = logging.getLogger(__name__) + + +def _make_engine(): + """Create a shared SQLAlchemy AsyncEngine from DATABASE_URL.""" + from sqlalchemy.ext.asyncio import create_async_engine + + url = os.environ["DATABASE_URL"] + if url.startswith("postgres://"): + url = url.replace("postgres://", "postgresql+asyncpg://", 1) + elif url.startswith("postgresql://") and "+asyncpg" not in url: + url = url.replace("postgresql://", "postgresql+asyncpg://", 1) + return create_async_engine(url, pool_pre_ping=True) + + +def build_task_store(): + """Return a DatabaseTaskStore when DATABASE_URL is set, else InMemoryTaskStore.""" + from a2a.server.tasks import InMemoryTaskStore + + db_url = os.getenv("DATABASE_URL") + if not db_url: + logger.info("A2A task store: InMemoryTaskStore (no DATABASE_URL)") + return InMemoryTaskStore() + + from a2a.server.tasks import DatabaseTaskStore + + engine = _make_engine() + logger.info("A2A task store: DatabaseTaskStore (Aurora / PostgreSQL)") + return DatabaseTaskStore(engine, table_name="a2a_tasks") + + +def build_push_config_store(): + """Return a DatabasePushNotificationConfigStore when DATABASE_URL is set.""" + from a2a.server.tasks import InMemoryPushNotificationConfigStore + + db_url = os.getenv("DATABASE_URL") + if not db_url: + return InMemoryPushNotificationConfigStore() + + from a2a.server.tasks import DatabasePushNotificationConfigStore + + engine = _make_engine() + logger.info("A2A push-config store: DatabasePushNotificationConfigStore") + return DatabasePushNotificationConfigStore(engine, table_name="a2a_push_configs") diff --git a/main.py b/main.py index e800a28..704c015 100644 --- a/main.py +++ b/main.py @@ -7,10 +7,20 @@ /a2a/code_agent/.well-known/agent.json) Services are selected by DEPLOYMENT_MODE env var: - local (default) — InMemorySessionService, no external deps - eks — DatabaseSessionService backed by Aurora + local (default) — InMemorySessionService + InMemoryTaskStore, no external deps + eks — DatabaseSessionService + DatabaseTaskStore backed by Aurora (requires DATABASE_URL) +A2A task/push-notification stores +---------------------------------- +``get_fast_api_app`` now accepts ``a2a_task_store`` and ``a2a_push_config_store`` +parameters (added via ``scripts/patch_adk.py``; follows ADK PR #3839). +``build_task_store()`` / ``build_push_config_store()`` in ``stores.py`` return +the right backend based on DATABASE_URL. + +Cleanup: once google-adk ships these params natively, delete +``scripts/patch_adk.py`` and remove the patch step from dev setup. + Start the server: uv run python main.py # or @@ -25,13 +35,13 @@ import uvicorn from dotenv import load_dotenv -from pathlib import Path _PROJECT_ROOT = Path(__file__).parent load_dotenv(_PROJECT_ROOT / ".env") load_dotenv(_PROJECT_ROOT / ".env.local", override=True) from google.adk.cli.fast_api import get_fast_api_app # noqa: E402 +from code_agent.a2a.stores import build_push_config_store, build_task_store # noqa: E402 logger = logging.getLogger(__name__) @@ -61,6 +71,8 @@ def _session_uri() -> str | None: app = get_fast_api_app( agents_dir=_AGENTS_DIR, session_service_uri=_session_uri(), + a2a_task_store=build_task_store(), + a2a_push_config_store=build_push_config_store(), web=True, # serve ADK dev UI at /dev-ui/ a2a=True, # enable A2A protocol at /a2a/{agent_name}/ host=_HOST, diff --git a/scripts/patch_adk.py b/scripts/patch_adk.py new file mode 100644 index 0000000..fc0697d --- /dev/null +++ b/scripts/patch_adk.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +"""Apply upstream ADK patches to the installed google-adk package. + +Run this after any ``pip install --upgrade google-adk`` to re-apply patches +that are pending upstream merge: + + uv run python scripts/patch_adk.py + +Patches applied +--------------- +1. agent_to_a2a.py — adds ``task_store: Optional[TaskStore] = None`` to + ``to_a2a()``. Upstream: https://github.com/google/adk-python/pull/3839 + +2. fast_api.py — adds ``a2a_task_store`` and ``a2a_push_config_store`` + params to ``get_fast_api_app()``. Follows the same pattern as PR #3839 + extended to the higher-level function. + +Remove this script and the corresponding code in ``main.py`` / ``stores.py`` +once google-adk >= the version that merges these features is released. +""" +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path + + +def _find_package_file(package_path: str) -> Path: + """Locate an installed package file by its import-style path.""" + spec = importlib.util.find_spec(package_path.split(".")[0]) + if spec is None or not spec.submodule_search_locations: + raise RuntimeError(f"Package not found: {package_path}") + root = Path(list(spec.submodule_search_locations)[0]).parent + relative = Path(*package_path.split(".")).with_suffix(".py") + target = root / relative + if not target.exists(): + raise FileNotFoundError(f"File not found: {target}") + return target + + +def _apply(path: Path, old: str, new: str, description: str) -> bool: + """Replace `old` with `new` in `path`. Returns True if changed.""" + source = path.read_text(encoding="utf-8") + if new in source: + print(f" [skip] already patched: {description}") + return False + if old not in source: + print(f" [WARN] patch anchor not found — ADK may have changed: {description}") + return False + path.write_text(source.replace(old, new, 1), encoding="utf-8") + print(f" [ok] {description}") + return True + + +# ── Patch 1: agent_to_a2a.py — add TaskStore import ───────────────────────── + +AGENT_TO_A2A_PKG = "google.adk.a2a.utils.agent_to_a2a" + +PATCH_1A_OLD = """\ +from a2a.server.tasks import InMemoryPushNotificationConfigStore +from a2a.server.tasks import InMemoryTaskStore +from a2a.server.tasks import PushNotificationConfigStore +from a2a.types import AgentCard""" + +PATCH_1A_NEW = """\ +from a2a.server.tasks import InMemoryPushNotificationConfigStore +from a2a.server.tasks import InMemoryTaskStore +from a2a.server.tasks import PushNotificationConfigStore +from a2a.server.tasks import TaskStore +from a2a.types import AgentCard""" + +PATCH_1B_OLD = """\ +def to_a2a( + agent: BaseAgent, + *, + host: str = "localhost", + port: int = 8000, + protocol: str = "http", + agent_card: Optional[Union[AgentCard, str]] = None, + push_config_store: Optional[PushNotificationConfigStore] = None, + runner: Optional[Runner] = None, +) -> Starlette:""" + +PATCH_1B_NEW = """\ +def to_a2a( + agent: BaseAgent, + *, + host: str = "localhost", + port: int = 8000, + protocol: str = "http", + agent_card: Optional[Union[AgentCard, str]] = None, + task_store: Optional[TaskStore] = None, + push_config_store: Optional[PushNotificationConfigStore] = None, + runner: Optional[Runner] = None, +) -> Starlette:""" + +PATCH_1C_OLD = " task_store = InMemoryTaskStore()" +PATCH_1C_NEW = " if task_store is None:\n task_store = InMemoryTaskStore()" + +# ── Patch 2: fast_api.py — add a2a_task_store / a2a_push_config_store ──────── + +FAST_API_PKG = "google.adk.cli.fast_api" + +PATCH_2A_OLD = """\ + web: bool, + a2a: bool = False, + host: str = "127.0.0.1",""" + +PATCH_2A_NEW = """\ + web: bool, + a2a: bool = False, + a2a_task_store: Optional[Any] = None, + a2a_push_config_store: Optional[Any] = None, + host: str = "127.0.0.1",""" + +PATCH_2B_OLD = " a2a_task_store = InMemoryTaskStore()" +PATCH_2B_NEW = " a2a_task_store = a2a_task_store or InMemoryTaskStore()" + +PATCH_2C_OLD = " push_config_store = InMemoryPushNotificationConfigStore()" +PATCH_2C_NEW = " push_config_store = a2a_push_config_store or InMemoryPushNotificationConfigStore()" + + +def main() -> None: + changed = 0 + + print(f"Patching agent_to_a2a.py ({AGENT_TO_A2A_PKG})") + try: + f = _find_package_file(AGENT_TO_A2A_PKG) + changed += _apply(f, PATCH_1A_OLD, PATCH_1A_NEW, "add TaskStore import") + changed += _apply(f, PATCH_1B_OLD, PATCH_1B_NEW, "add task_store param to to_a2a()") + changed += _apply(f, PATCH_1C_OLD, PATCH_1C_NEW, "guard InMemoryTaskStore() with if task_store is None (exact PR #3839)") + except Exception as exc: + print(f" [ERROR] {exc}") + sys.exit(1) + + print(f"\nPatching fast_api.py ({FAST_API_PKG})") + try: + f = _find_package_file(FAST_API_PKG) + changed += _apply(f, PATCH_2A_OLD, PATCH_2A_NEW, "add a2a_task_store + a2a_push_config_store params") + changed += _apply(f, PATCH_2B_OLD, PATCH_2B_NEW, "use a2a_task_store or InMemoryTaskStore()") + changed += _apply(f, PATCH_2C_OLD, PATCH_2C_NEW, "use a2a_push_config_store or InMemory...()") + except Exception as exc: + print(f" [ERROR] {exc}") + sys.exit(1) + + print(f"\nDone — {changed} patch(es) applied.") + + +if __name__ == "__main__": + main()