Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion code_agent/a2a/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,32 @@
"""FastAPI A2A server — JSON-RPC 2.0 dispatcher with SSE streaming support.
"""
DEPRECATED — This module is no longer used and will be deleted.
===============================================================

All functionality has been migrated to ADK's ``get_fast_api_app`` in
``main.py``, extended with persistent stores via ``code_agent/a2a/stores.py``.

Why it was kept originally:
• ADK v1.x hardcoded ``InMemoryTaskStore`` with no injection point.
• Custom tasks/submit (fire-and-forget), tasks/resume (HITL), and
API-key auth middleware were not available in get_fast_api_app.

Current state (see main.py + code_agent/a2a/stores.py):
• Persistent tasks/push-notification stores via patch_adk_stores(),
which monkey-patches the a2a.server.tasks module namespace before
get_fast_api_app is called so DatabaseTaskStore is used when
DATABASE_URL is set.
• Session persistence via session_service_uri=DATABASE_URL.
• ADK's built-in A2A handler covers tasks/send + tasks/sendSubscribe.

Upstream tracking:
ADK PR #3839 — adds task_store param to to_a2a() (open, unmerged as of
2026-03). Once get_fast_api_app exposes a native a2a_task_store param,
code_agent/a2a/stores.py can also be removed.

TODO: delete this file once the migration is confirmed stable in EKS.
----------------------------------------------------------------------

FastAPI A2A server — JSON-RPC 2.0 dispatcher with SSE streaming support.

Improvements over v0.2.0
─────────────────────────
Expand Down
68 changes: 68 additions & 0 deletions code_agent/a2a/stores.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""Factory functions for A2A task and push-notification stores.

Returns database-backed stores when DATABASE_URL is set, falling back to
in-memory stores for local development and CI.

The stores are passed directly to ``get_fast_api_app`` via the native
``a2a_task_store`` / ``a2a_push_config_store`` parameters, which were added
by patching the installed google-adk package (see ``scripts/patch_adk.py``).

Upstream tracking
-----------------
- PR #3839 adds ``task_store`` to ``to_a2a()``:
https://github.com/google/adk-python/pull/3839
- ``get_fast_api_app`` extension follows the same pattern and is applied via
``scripts/patch_adk.py`` until ADK ships it natively.

Once google-adk >= the version that includes both params, delete
``scripts/patch_adk.py`` and remove the patch step from the dev setup.
"""
from __future__ import annotations

import logging
import os

logger = logging.getLogger(__name__)


def _make_engine():
"""Create a shared SQLAlchemy AsyncEngine from DATABASE_URL."""
from sqlalchemy.ext.asyncio import create_async_engine

url = os.environ["DATABASE_URL"]
if url.startswith("postgres://"):
url = url.replace("postgres://", "postgresql+asyncpg://", 1)
elif url.startswith("postgresql://") and "+asyncpg" not in url:
url = url.replace("postgresql://", "postgresql+asyncpg://", 1)
return create_async_engine(url, pool_pre_ping=True)


def build_task_store():
"""Return a DatabaseTaskStore when DATABASE_URL is set, else InMemoryTaskStore."""
from a2a.server.tasks import InMemoryTaskStore

db_url = os.getenv("DATABASE_URL")
if not db_url:
logger.info("A2A task store: InMemoryTaskStore (no DATABASE_URL)")
return InMemoryTaskStore()

from a2a.server.tasks import DatabaseTaskStore

engine = _make_engine()
logger.info("A2A task store: DatabaseTaskStore (Aurora / PostgreSQL)")
return DatabaseTaskStore(engine, table_name="a2a_tasks")


def build_push_config_store():
"""Return a DatabasePushNotificationConfigStore when DATABASE_URL is set."""
from a2a.server.tasks import InMemoryPushNotificationConfigStore

db_url = os.getenv("DATABASE_URL")
if not db_url:
return InMemoryPushNotificationConfigStore()

from a2a.server.tasks import DatabasePushNotificationConfigStore

engine = _make_engine()
logger.info("A2A push-config store: DatabasePushNotificationConfigStore")
return DatabasePushNotificationConfigStore(engine, table_name="a2a_push_configs")
18 changes: 15 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,20 @@
/a2a/code_agent/.well-known/agent.json)

Services are selected by DEPLOYMENT_MODE env var:
local (default) — InMemorySessionService, no external deps
eks — DatabaseSessionService backed by Aurora
local (default) — InMemorySessionService + InMemoryTaskStore, no external deps
eks — DatabaseSessionService + DatabaseTaskStore backed by Aurora
(requires DATABASE_URL)

A2A task/push-notification stores
----------------------------------
``get_fast_api_app`` now accepts ``a2a_task_store`` and ``a2a_push_config_store``
parameters (added via ``scripts/patch_adk.py``; follows ADK PR #3839).
``build_task_store()`` / ``build_push_config_store()`` in ``stores.py`` return
the right backend based on DATABASE_URL.

Cleanup: once google-adk ships these params natively, delete
``scripts/patch_adk.py`` and remove the patch step from dev setup.

Start the server:
uv run python main.py
# or
Expand All @@ -25,13 +35,13 @@

import uvicorn
from dotenv import load_dotenv
from pathlib import Path

_PROJECT_ROOT = Path(__file__).parent
load_dotenv(_PROJECT_ROOT / ".env")
load_dotenv(_PROJECT_ROOT / ".env.local", override=True)

from google.adk.cli.fast_api import get_fast_api_app # noqa: E402
from code_agent.a2a.stores import build_push_config_store, build_task_store # noqa: E402

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,6 +71,8 @@ def _session_uri() -> str | None:
app = get_fast_api_app(
agents_dir=_AGENTS_DIR,
session_service_uri=_session_uri(),
a2a_task_store=build_task_store(),
a2a_push_config_store=build_push_config_store(),
web=True, # serve ADK dev UI at /dev-ui/
a2a=True, # enable A2A protocol at /a2a/{agent_name}/
host=_HOST,
Expand Down
150 changes: 150 additions & 0 deletions scripts/patch_adk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#!/usr/bin/env python3
"""Apply upstream ADK patches to the installed google-adk package.

Run this after any ``pip install --upgrade google-adk`` to re-apply patches
that are pending upstream merge:

uv run python scripts/patch_adk.py

Patches applied
---------------
1. agent_to_a2a.py — adds ``task_store: Optional[TaskStore] = None`` to
``to_a2a()``. Upstream: https://github.com/google/adk-python/pull/3839

2. fast_api.py — adds ``a2a_task_store`` and ``a2a_push_config_store``
params to ``get_fast_api_app()``. Follows the same pattern as PR #3839
extended to the higher-level function.

Remove this script and the corresponding code in ``main.py`` / ``stores.py``
once google-adk >= the version that merges these features is released.
"""
from __future__ import annotations

import importlib.util
import sys
from pathlib import Path


def _find_package_file(package_path: str) -> Path:
"""Locate an installed package file by its import-style path."""
spec = importlib.util.find_spec(package_path.split(".")[0])
if spec is None or not spec.submodule_search_locations:
raise RuntimeError(f"Package not found: {package_path}")
root = Path(list(spec.submodule_search_locations)[0]).parent
relative = Path(*package_path.split(".")).with_suffix(".py")
target = root / relative
if not target.exists():
raise FileNotFoundError(f"File not found: {target}")
return target


def _apply(path: Path, old: str, new: str, description: str) -> bool:
"""Replace `old` with `new` in `path`. Returns True if changed."""
source = path.read_text(encoding="utf-8")
if new in source:
print(f" [skip] already patched: {description}")
return False
if old not in source:
print(f" [WARN] patch anchor not found — ADK may have changed: {description}")
return False
path.write_text(source.replace(old, new, 1), encoding="utf-8")
print(f" [ok] {description}")
return True


# ── Patch 1: agent_to_a2a.py — add TaskStore import ─────────────────────────

AGENT_TO_A2A_PKG = "google.adk.a2a.utils.agent_to_a2a"

PATCH_1A_OLD = """\
from a2a.server.tasks import InMemoryPushNotificationConfigStore
from a2a.server.tasks import InMemoryTaskStore
from a2a.server.tasks import PushNotificationConfigStore
from a2a.types import AgentCard"""

PATCH_1A_NEW = """\
from a2a.server.tasks import InMemoryPushNotificationConfigStore
from a2a.server.tasks import InMemoryTaskStore
from a2a.server.tasks import PushNotificationConfigStore
from a2a.server.tasks import TaskStore
from a2a.types import AgentCard"""

PATCH_1B_OLD = """\
def to_a2a(
agent: BaseAgent,
*,
host: str = "localhost",
port: int = 8000,
protocol: str = "http",
agent_card: Optional[Union[AgentCard, str]] = None,
push_config_store: Optional[PushNotificationConfigStore] = None,
runner: Optional[Runner] = None,
) -> Starlette:"""

PATCH_1B_NEW = """\
def to_a2a(
agent: BaseAgent,
*,
host: str = "localhost",
port: int = 8000,
protocol: str = "http",
agent_card: Optional[Union[AgentCard, str]] = None,
task_store: Optional[TaskStore] = None,
push_config_store: Optional[PushNotificationConfigStore] = None,
runner: Optional[Runner] = None,
) -> Starlette:"""

PATCH_1C_OLD = " task_store = InMemoryTaskStore()"
PATCH_1C_NEW = " if task_store is None:\n task_store = InMemoryTaskStore()"

# ── Patch 2: fast_api.py — add a2a_task_store / a2a_push_config_store ────────

FAST_API_PKG = "google.adk.cli.fast_api"

PATCH_2A_OLD = """\
web: bool,
a2a: bool = False,
host: str = "127.0.0.1","""

PATCH_2A_NEW = """\
web: bool,
a2a: bool = False,
a2a_task_store: Optional[Any] = None,
a2a_push_config_store: Optional[Any] = None,
host: str = "127.0.0.1","""

PATCH_2B_OLD = " a2a_task_store = InMemoryTaskStore()"
PATCH_2B_NEW = " a2a_task_store = a2a_task_store or InMemoryTaskStore()"

PATCH_2C_OLD = " push_config_store = InMemoryPushNotificationConfigStore()"
PATCH_2C_NEW = " push_config_store = a2a_push_config_store or InMemoryPushNotificationConfigStore()"


def main() -> None:
changed = 0

print(f"Patching agent_to_a2a.py ({AGENT_TO_A2A_PKG})")
try:
f = _find_package_file(AGENT_TO_A2A_PKG)
changed += _apply(f, PATCH_1A_OLD, PATCH_1A_NEW, "add TaskStore import")
changed += _apply(f, PATCH_1B_OLD, PATCH_1B_NEW, "add task_store param to to_a2a()")
changed += _apply(f, PATCH_1C_OLD, PATCH_1C_NEW, "guard InMemoryTaskStore() with if task_store is None (exact PR #3839)")
except Exception as exc:
print(f" [ERROR] {exc}")
sys.exit(1)

print(f"\nPatching fast_api.py ({FAST_API_PKG})")
try:
f = _find_package_file(FAST_API_PKG)
changed += _apply(f, PATCH_2A_OLD, PATCH_2A_NEW, "add a2a_task_store + a2a_push_config_store params")
changed += _apply(f, PATCH_2B_OLD, PATCH_2B_NEW, "use a2a_task_store or InMemoryTaskStore()")
changed += _apply(f, PATCH_2C_OLD, PATCH_2C_NEW, "use a2a_push_config_store or InMemory...()")
except Exception as exc:
print(f" [ERROR] {exc}")
sys.exit(1)

print(f"\nDone — {changed} patch(es) applied.")


if __name__ == "__main__":
main()