diff --git a/examples/python/a2a_server.py b/examples/python/a2a_server.py
index 1f113ae6..abf672e1 100644
--- a/examples/python/a2a_server.py
+++ b/examples/python/a2a_server.py
@@ -1,6 +1,7 @@
 # /// script
 # dependencies = [
 #   "a2a-sdk[http-server]",
+#   "httpx",
 #   "openai",
 #   "uvicorn",
 # ]
@@ -19,6 +20,7 @@ from typing import Dict, List, Any, Optional
 
 from fastapi import FastAPI, Request, HTTPException, APIRouter
 from fastapi.responses import JSONResponse
+import httpx
 from openai import OpenAI
 from a2a.server.agent_execution.agent_executor import AgentExecutor
 from a2a.server.agent_execution.context import RequestContext
@@ -32,6 +34,18 @@
 # Initialize OpenAI client
 openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
 
+# Realtime API defaults (optional override via env or request body)
+OPENAI_REALTIME_SESSIONS_URL = os.getenv(
+    "OPENAI_REALTIME_SESSIONS_URL",
+    "https://api.openai.com/v1/realtime/sessions",
+)
+DEFAULT_REALTIME_MODEL = os.getenv(
+    "OPENAI_REALTIME_MODEL",
+    "gpt-4o-realtime-preview-2024-12-17",
+)
+DEFAULT_REALTIME_VOICE = os.getenv("OPENAI_REALTIME_VOICE")
+DEFAULT_REALTIME_MODALITIES = os.getenv("OPENAI_REALTIME_MODALITIES")
+
 # Agent IDs
 PERSONAL_ASSISTANT_ID = "00000000-0000-0000-0000-000000000000"
 WEATHER_ASSISTANT_ID = "FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF"
@@ -142,6 +156,98 @@ def build_system_message(agent_id: str, tools: List[Dict[str, Any]]) -> str:
     return "\n".join(system_parts)
 
 
+def parse_modalities(value: Optional[str]) -> Optional[List[str]]:
+    """Parse comma-separated modalities from env var.
+
+    Returns None when the value is unset or has no non-blank entries, so an
+    empty modalities list is never sent upstream.
+    """
+    if not value:
+        return None
+    modalities = [item.strip() for item in value.split(",") if item.strip()]
+    return modalities or None
+
+
+def build_realtime_instructions(agent_id: str, extra_instructions: Optional[str]) -> str:
+    """Build realtime instructions using the same agent system message.
+
+    Caller-supplied extra instructions, when present, are appended after the
+    agent's system message under an "Additional instructions:" heading.
+    """
+    base_instructions = build_system_message(agent_id, AGENT_TOOLS.get(agent_id, []))
+    if extra_instructions:
+        return f"{base_instructions}\n\nAdditional instructions:\n{extra_instructions}"
+    return base_instructions
+
+
+def _to_realtime_tool(tool: Dict[str, Any]) -> Dict[str, Any]:
+    """Return *tool* in the flat shape used by Realtime session configs.
+
+    Chat Completions nests a function tool's definition under "function",
+    while Realtime sessions expect name/description/parameters next to
+    "type". Tools without a nested "function" dict are returned unchanged.
+    """
+    function = tool.get("function")
+    if not isinstance(function, dict):
+        return tool
+    flat: Dict[str, Any] = {"type": tool.get("type", "function")}
+    for key in ("name", "description", "parameters"):
+        if key in function:
+            flat[key] = function[key]
+    return flat
+
+
+def build_realtime_session_payload(
+    agent_id: str,
+    body: Optional[Dict[str, Any]],
+) -> Dict[str, Any]:
+    """Build OpenAI Realtime session payload with safe defaults.
+
+    Values in *body* take precedence over the environment defaults. Optional
+    fields are only included when a value is available, so the upstream API
+    applies its own defaults for anything left out.
+    """
+    body = body or {}
+    tools = AGENT_TOOLS.get(agent_id, [])
+
+    payload: Dict[str, Any] = {
+        "model": body.get("model") or DEFAULT_REALTIME_MODEL,
+        "instructions": build_realtime_instructions(agent_id, body.get("instructions")),
+    }
+
+    modalities = body.get("modalities")
+    if modalities is None:
+        modalities = parse_modalities(DEFAULT_REALTIME_MODALITIES)
+    if modalities is not None:
+        payload["modalities"] = modalities
+
+    voice = body.get("voice")
+    if voice is None:
+        voice = DEFAULT_REALTIME_VOICE
+    if voice is not None:
+        payload["voice"] = voice
+
+    include_tools = body.get("include_tools", True)
+    if include_tools and tools:
+        # AGENT_TOOLS may hold Chat Completions style definitions; flatten
+        # them so the Realtime API accepts them.
+        payload["tools"] = [_to_realtime_tool(tool) for tool in tools]
+        payload["tool_choice"] = body.get("tool_choice", "auto")
+
+    # Pass through optional session settings only when the caller set them.
+    for key in (
+        "input_audio_format",
+        "output_audio_format",
+        "turn_detection",
+        "temperature",
+        "max_output_tokens",
+    ):
+        if key in body:
+            payload[key] = body[key]
+
+    return payload
+
+
 # Simple in-memory task storage (messages per task, keyed by agent_id:task_id)
 task_messages: Dict[str, List[Dict[str, Any]]] = {}
 
@@ -491,6 +597,69 @@ async def get_agent_card(agent_id: str):
     return JSONResponse(card.model_dump(mode="json"))
 
 
+# Realtime session endpoint (WebRTC clients can use returned client_secret)
+@app.post("/agents/{agent_id}/realtime/session")
+async def create_realtime_session(request: Request, agent_id: str):
+    """Create an OpenAI Realtime session scoped to the selected agent.
+
+    The optional JSON object body overrides session defaults. Upstream HTTP
+    errors are relayed with their status code; transport failures and
+    non-JSON upstream responses are reported as 502.
+    """
+    if agent_id not in agent_a2a_apps:
+        raise HTTPException(status_code=404, detail=f"Agent {agent_id} not found")
+
+    api_key = os.getenv("OPENAI_API_KEY")
+    if not api_key:
+        raise HTTPException(status_code=500, detail="OPENAI_API_KEY is not set")
+
+    try:
+        body = await request.json()
+    except ValueError:
+        # Missing or malformed JSON: fall back to the session defaults.
+        body = {}
+    if not body:
+        body = {}
+    elif not isinstance(body, dict):
+        raise HTTPException(
+            status_code=400,
+            detail="Request body must be a JSON object",
+        )
+
+    payload = build_realtime_session_payload(agent_id, body)
+
+    try:
+        async with httpx.AsyncClient(timeout=30.0) as client:
+            response = await client.post(
+                OPENAI_REALTIME_SESSIONS_URL,
+                json=payload,
+                headers={
+                    "Authorization": f"Bearer {api_key}",
+                    "Content-Type": "application/json",
+                },
+            )
+        response.raise_for_status()
+    except httpx.HTTPStatusError as exc:
+        raise HTTPException(
+            status_code=exc.response.status_code,
+            detail=exc.response.text,
+        ) from exc
+    except httpx.HTTPError as exc:
+        raise HTTPException(
+            status_code=502,
+            detail=f"Realtime session request failed: {exc}",
+        ) from exc
+
+    try:
+        session = response.json()
+    except ValueError as exc:
+        raise HTTPException(
+            status_code=502,
+            detail="Realtime session response was not valid JSON",
+        ) from exc
+
+    return JSONResponse(session)
+
 # Create agent-specific A2A apps and mount their routes
 agent_a2a_apps: Dict[str, A2ARESTFastAPIApplication] = {
     PERSONAL_ASSISTANT_ID: personal_a2a_app,