diff --git a/agents-core/vision_agents/core/agents/agent_launcher.py b/agents-core/vision_agents/core/agents/agent_launcher.py index 4f677e759..a4afdfcbd 100644 --- a/agents-core/vision_agents/core/agents/agent_launcher.py +++ b/agents-core/vision_agents/core/agents/agent_launcher.py @@ -198,11 +198,13 @@ async def warmup(self) -> None: # Create a dry-run Agent instance and warmup its components for the first time. agent: "Agent" = await await_or_run(self._create_agent) - logger.info("Warming up agent components...") - await self._warmup_agent(agent) - self._warmed_up = True - - logger.info("Agent warmup completed") + try: + logger.info("Warming up agent components...") + await self._warmup_agent(agent) + self._warmed_up = True + logger.info("Agent warmup completed") + finally: + await agent.close() @property def warmed_up(self) -> bool: diff --git a/agents-core/vision_agents/core/agents/agents.py b/agents-core/vision_agents/core/agents/agents.py index 26f310d7b..fb53e8d4c 100644 --- a/agents-core/vision_agents/core/agents/agents.py +++ b/agents-core/vision_agents/core/agents/agents.py @@ -799,6 +799,7 @@ async def _close(self): self._call_ended_event = None self._joined_at = 0.0 self.clear_call_logging_context() + self.events.stop() self._closed = True self.logger.info("🤖 Agent stopped") diff --git a/agents-core/vision_agents/core/events/__init__.py b/agents-core/vision_agents/core/events/__init__.py index 7e87154e7..1a22577af 100644 --- a/agents-core/vision_agents/core/events/__init__.py +++ b/agents-core/vision_agents/core/events/__init__.py @@ -1,15 +1,3 @@ -from .base import ( - ConnectionState, - AudioFormat, - BaseEvent, - PluginBaseEvent, - PluginInitializedEvent, - PluginClosedEvent, - PluginErrorEvent, - VideoProcessorDetectionEvent, -) -from .manager import EventManager - from getstream.models import ( BlockedUserEvent, CallAcceptedEvent, @@ -65,6 +53,15 @@ UpdatedCallPermissionsEvent, ) +from .base import ( + AudioFormat, + BaseEvent, + ConnectionState, + PluginBaseEvent, + VideoProcessorDetectionEvent, +) +from .manager import EventManager + __all__ = [ "BlockedUserEvent", "CallAcceptedEvent", @@ -125,9 +122,6 @@ "AudioFormat", "BaseEvent", "PluginBaseEvent", - "PluginInitializedEvent", - "PluginClosedEvent", - "PluginErrorEvent", "VideoProcessorDetectionEvent", "EventManager", ] diff --git a/agents-core/vision_agents/core/events/base.py b/agents-core/vision_agents/core/events/base.py index 36c93cf01..7250c6a42 100644 --- a/agents-core/vision_agents/core/events/base.py +++ b/agents-core/vision_agents/core/events/base.py @@ -1,12 +1,12 @@ -import uuid import dataclasses +import uuid from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum -from typing import Any, Dict, List, Optional from types import FunctionType -from dataclasses_json import DataClassJsonMixin +from typing import Any, Optional +from dataclasses_json import DataClassJsonMixin from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import Participant @@ -54,45 +54,6 @@ class PluginBaseEvent(BaseEvent): plugin_version: str | None = None -@dataclass -class PluginInitializedEvent(PluginBaseEvent): - """Event emitted when a plugin is successfully initialized.""" - - type: str = field(default="plugin.initialized", init=False) - plugin_type: Optional[str] = None - provider: Optional[str] = None - configuration: Optional[Dict[str, Any]] = None - capabilities: Optional[List[str]] = None - - -@dataclass -class PluginClosedEvent(PluginBaseEvent): - """Event emitted when a plugin is closed.""" - - type: str = field(default="plugin.closed", init=False) - plugin_type: Optional[str] = None # "STT", "STS", "VAD" - provider: Optional[str] = None - reason: Optional[str] = None - cleanup_successful: bool = True - - -@dataclass -class PluginErrorEvent(PluginBaseEvent): - """Event emitted when a generic plugin error occurs.""" - - type: str = field(default="plugin.error", init=False) - plugin_type: Optional[str] = None # "STT", "TTS", "STS", "VAD" - provider: Optional[str] = None - error: Optional[Exception] = None - error_code: Optional[str] = None - context: Optional[str] = None - is_fatal: bool = False - - @property - def error_message(self) -> str: - return str(self.error) if self.error else "Unknown error" - - @dataclasses.dataclass class ExceptionEvent: exc: Exception @@ -100,45 +61,6 @@ class ExceptionEvent: type: str = "base.exception" -@dataclasses.dataclass -class HealthCheckEvent(DataClassJsonMixin): - connection_id: str - created_at: int - custom: dict - type: str = "health.check" - - -@dataclass -class ConnectionOkEvent(BaseEvent): - """Event emitted when WebSocket connection is established.""" - - type: str = field(default="connection.ok", init=False) - connection_id: Optional[str] = None - server_time: Optional[str] = None - api_key: Optional[str] = None - user_id: Optional[str] = None # type: ignore[assignment] - - -@dataclass -class ConnectionErrorEvent(BaseEvent): - """Event emitted when WebSocket connection encounters an error.""" - - type: str = field(default="connection.error", init=False) - error_code: Optional[str] = None - error_message: Optional[str] = None - reconnect_attempt: Optional[int] = None - - -@dataclass -class ConnectionClosedEvent(BaseEvent): - """Event emitted when WebSocket connection is closed.""" - - type: str = field(default="connection.closed", init=False) - code: Optional[int] = None - reason: Optional[str] = None - was_clean: bool = False - - @dataclass class VideoProcessorDetectionEvent(PluginBaseEvent): """Base event for video processor detection results. diff --git a/agents-core/vision_agents/core/events/manager.py b/agents-core/vision_agents/core/events/manager.py index dee53c5aa..75f8f0c5a 100644 --- a/agents-core/vision_agents/core/events/manager.py +++ b/agents-core/vision_agents/core/events/manager.py @@ -1,19 +1,12 @@ import asyncio -import uuid import collections import logging import types import typing +import uuid from typing import Any, Deque, Dict, Optional, Union, get_args, get_origin -from .base import ( - ConnectionClosedEvent, - ConnectionErrorEvent, - ConnectionOkEvent, - ExceptionEvent, - HealthCheckEvent, -) - +from .base import ExceptionEvent logger = logging.getLogger(__name__) @@ -145,10 +138,6 @@ def __init__(self, ignore_unknown_events: bool = True): self._received_event = asyncio.Event() self.register(ExceptionEvent) - self.register(HealthCheckEvent) - self.register(ConnectionOkEvent) - self.register(ConnectionErrorEvent) - self.register(ConnectionClosedEvent) # Start background processing task self._start_processing_task() @@ -195,8 +184,7 @@ def register(self, event_class, ignore_not_compatible=False): def merge(self, em: "EventManager"): # Stop the processing task in the merged manager - if em._processing_task and not em._processing_task.done(): - em._processing_task.cancel() + em.stop() # Merge all data from the other manager self._events.update(em._events) @@ -559,3 +547,8 @@ async def _process_single_event(self, event): loop = asyncio.get_running_loop() handler_task = loop.create_task(self._run_handler(handler, event)) self._handler_tasks[uuid.uuid4()] = handler_task + + def stop(self): + if self._processing_task and not self._processing_task.done(): + self._processing_task.cancel() + self._processing_task = None diff --git a/agents-core/vision_agents/core/runner/runner.py b/agents-core/vision_agents/core/runner/runner.py index 80794eb6b..687563cc9 100644 --- a/agents-core/vision_agents/core/runner/runner.py +++ b/agents-core/vision_agents/core/runner/runner.py @@ -1,5 +1,6 @@ import asyncio import logging +import os import warnings from typing import Optional from uuid import uuid4 @@ -26,10 +27,6 @@ logger = logging.getLogger(__name__) -# TODO: -# - Figure out how to serialize the agent config into some dict -# - Docs - asyncio_logger = logging.getLogger("asyncio") @@ -127,12 +124,15 @@ async def _run(): # Start the agent launcher. await self._launcher.start() - # Create the agent - agent = await self._launcher.launch() - logger.info("✅ Agent warmed up and ready") + # Join call if join_call function is provided + logger.info(f"📞 Joining call: {call_type}/{call_id}") + session = await self._launcher.start_session( + call_id, call_type, video_track_override_path=video_track_override + ) # Open demo UI by default + agent = session.agent if ( not no_demo and hasattr(agent, "edge") @@ -141,11 +141,6 @@ async def _run(): logger.info("🌐 Opening demo UI...") await agent.edge.open_demo_for_agent(agent, call_type, call_id) - # Join call if join_call function is provided - logger.info(f"📞 Joining call: {call_type}/{call_id}") - session = await self._launcher.start_session( - call_id, call_type, video_track_override_path=video_track_override - ) await session.wait() except asyncio.CancelledError: logger.info("The session is cancelled, shutting down gracefully...") @@ -177,18 +172,17 @@ def serve( port: int = 8000, agents_log_level: str = "INFO", http_log_level: str = "INFO", - ): + debug: bool = False, + ) -> None: """ Start the HTTP server that spawns agents to the calls. Args: - host: - port: - agents_log_level: - http_log_level: - - Returns: - + host: Host address to bind the server to. + port: Port number for the server. + agents_log_level: Logging level for agent-related logs. + http_log_level: Logging level for FastAPI and uvicorn logs. + debug: Enable asyncio debug mode. """ # Configure loggers if they're not already configured configure_sdk_logger( @@ -203,9 +197,22 @@ def serve( warnings.filterwarnings( "ignore", category=RuntimeWarning, module="dataclasses_json.core" ) + + # Enable asyncio debug via environment variable before uvicorn creates its loop + if debug: + os.environ.setdefault("PYTHONASYNCIODEBUG", "1") uvicorn.run(self.fast_api, host=host, port=port, log_config=None) def _create_fastapi_app(self, options: ServeOptions) -> FastAPI: + """ + Create and configure a FastAPI application for serving agents. + + Args: + options: Configuration options for the server. + + Returns: + Configured FastAPI application instance. + """ app = FastAPI(lifespan=lifespan) app.state.launcher = self._launcher app.state.options = self._serve_options @@ -228,9 +235,9 @@ def _create_fastapi_app(self, options: ServeOptions) -> FastAPI: ) return app - def cli(self): + def cli(self) -> None: """ - Run the CLI + Run the command-line interface with `run` and `serve` subcommands. """ @click.group() @@ -326,11 +333,18 @@ def run_cmd( default="INFO", help="Set the logging level for FastAPI and uvicorn", ) + @click.option( + "--debug", + is_flag=True, + default=False, + help="Enable asyncio debug mode", + ) def serve_cmd( host: str, port: int, agents_log_level: str, http_log_level: str, + debug: bool, ) -> None: """ Start the HTTP server that spawns agents to the calls. @@ -340,6 +354,7 @@ def serve_cmd( port=port, agents_log_level=agents_log_level.upper(), http_log_level=http_log_level.upper(), + debug=debug, ) cli_() diff --git a/agents-core/vision_agents/core/tts/tts.py b/agents-core/vision_agents/core/tts/tts.py index 6f29d5bed..c03784e5c 100644 --- a/agents-core/vision_agents/core/tts/tts.py +++ b/agents-core/vision_agents/core/tts/tts.py @@ -7,7 +7,6 @@ import av from vision_agents.core.events import ( AudioFormat, - PluginClosedEvent, ) from vision_agents.core.events.manager import EventManager @@ -282,12 +281,3 @@ async def send( async def close(self): """Close the TTS service and release any resources.""" - self.events.send( - PluginClosedEvent( - session_id=self.session_id, - plugin_name=self.provider_name, - plugin_type="TTS", - provider=self.provider_name, - cleanup_successful=True, - ) - )