From 79b2efa8b94d3c465488f0551b0eb12010bf9a67 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Fri, 23 Jan 2026 13:58:22 +0100 Subject: [PATCH 1/8] Stop EventManager when closing the agent to prevent processing task getting destroyed --- agents-core/vision_agents/core/agents/agents.py | 1 + agents-core/vision_agents/core/events/manager.py | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/agents-core/vision_agents/core/agents/agents.py b/agents-core/vision_agents/core/agents/agents.py index 26f310d7b..fb53e8d4c 100644 --- a/agents-core/vision_agents/core/agents/agents.py +++ b/agents-core/vision_agents/core/agents/agents.py @@ -799,6 +799,7 @@ async def _close(self): self._call_ended_event = None self._joined_at = 0.0 self.clear_call_logging_context() + self.events.stop() self._closed = True self.logger.info("🤖 Agent stopped") diff --git a/agents-core/vision_agents/core/events/manager.py b/agents-core/vision_agents/core/events/manager.py index dee53c5aa..3f4548094 100644 --- a/agents-core/vision_agents/core/events/manager.py +++ b/agents-core/vision_agents/core/events/manager.py @@ -1,9 +1,9 @@ import asyncio -import uuid import collections import logging import types import typing +import uuid from typing import Any, Deque, Dict, Optional, Union, get_args, get_origin from .base import ( @@ -14,7 +14,6 @@ HealthCheckEvent, ) - logger = logging.getLogger(__name__) @@ -195,8 +194,7 @@ def register(self, event_class, ignore_not_compatible=False): def merge(self, em: "EventManager"): # Stop the processing task in the merged manager - if em._processing_task and not em._processing_task.done(): - em._processing_task.cancel() + em.stop() # Merge all data from the other manager self._events.update(em._events) @@ -559,3 +557,7 @@ async def _process_single_event(self, event): loop = asyncio.get_running_loop() handler_task = loop.create_task(self._run_handler(handler, event)) self._handler_tasks[uuid.uuid4()] = handler_task + + def stop(self): + if self._processing_task and not self._processing_task.done(): + self._processing_task.cancel() From d84451476d81789539e8ae77508192e2fe8ae11c Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Fri, 23 Jan 2026 13:58:35 +0100 Subject: [PATCH 2/8] Close agent after a warmup --- .../vision_agents/core/agents/agent_launcher.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/agents-core/vision_agents/core/agents/agent_launcher.py b/agents-core/vision_agents/core/agents/agent_launcher.py index 4f677e759..a4afdfcbd 100644 --- a/agents-core/vision_agents/core/agents/agent_launcher.py +++ b/agents-core/vision_agents/core/agents/agent_launcher.py @@ -198,11 +198,13 @@ async def warmup(self) -> None: # Create a dry-run Agent instance and warmup its components for the first time. agent: "Agent" = await await_or_run(self._create_agent) - logger.info("Warming up agent components...") - await self._warmup_agent(agent) - self._warmed_up = True - - logger.info("Agent warmup completed") + try: + logger.info("Warming up agent components...") + await self._warmup_agent(agent) + self._warmed_up = True + logger.info("Agent warmup completed") + finally: + await agent.close() @property def warmed_up(self) -> bool: From 2bac8353df421b7f3ca7450c340714ebd2c1c0f3 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Fri, 23 Jan 2026 13:58:57 +0100 Subject: [PATCH 3/8] Runner: start agent session before opening the demo UI --- agents-core/vision_agents/core/runner/runner.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/agents-core/vision_agents/core/runner/runner.py b/agents-core/vision_agents/core/runner/runner.py index 80794eb6b..66027add8 100644 --- a/agents-core/vision_agents/core/runner/runner.py +++ b/agents-core/vision_agents/core/runner/runner.py @@ -127,12 +127,15 @@ async def _run(): # Start the agent launcher. await self._launcher.start() - # Create the agent - agent = await self._launcher.launch() - logger.info("✅ Agent warmed up and ready") + # Join call if join_call function is provided + logger.info(f"📞 Joining call: {call_type}/{call_id}") + session = await self._launcher.start_session( + call_id, call_type, video_track_override_path=video_track_override + ) # Open demo UI by default + agent = session.agent if ( not no_demo and hasattr(agent, "edge") @@ -141,11 +144,6 @@ async def _run(): logger.info("🌐 Opening demo UI...") await agent.edge.open_demo_for_agent(agent, call_type, call_id) - # Join call if join_call function is provided - logger.info(f"📞 Joining call: {call_type}/{call_id}") - session = await self._launcher.start_session( - call_id, call_type, video_track_override_path=video_track_override - ) await session.wait() except asyncio.CancelledError: logger.info("The session is cancelled, shutting down gracefully...") From 6176cf9a0f67392ca69b2c96daf6f0732005d78b Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Fri, 23 Jan 2026 17:05:50 +0100 Subject: [PATCH 4/8] Remove unused plugin events --- .../vision_agents/core/events/__init__.py | 21 ++--- agents-core/vision_agents/core/events/base.py | 84 +------------------ .../vision_agents/core/events/manager.py | 12 +-- agents-core/vision_agents/core/tts/tts.py | 10 --- 4 files changed, 13 insertions(+), 114 deletions(-) diff --git a/agents-core/vision_agents/core/events/__init__.py b/agents-core/vision_agents/core/events/__init__.py index 7e87154e7..d41954ce7 100644 --- a/agents-core/vision_agents/core/events/__init__.py +++ b/agents-core/vision_agents/core/events/__init__.py @@ -1,15 +1,3 @@ -from .base import ( - ConnectionState, - AudioFormat, - BaseEvent, - PluginBaseEvent, - PluginInitializedEvent, - PluginClosedEvent, - PluginErrorEvent, - VideoProcessorDetectionEvent, -) -from .manager import EventManager - from getstream.models import ( BlockedUserEvent, CallAcceptedEvent, @@ -65,6 +53,15 @@ UpdatedCallPermissionsEvent, ) +from .base import ( + AudioFormat, + BaseEvent, + ConnectionState, + PluginBaseEvent, + VideoProcessorDetectionEvent, +) +from .manager import EventManager + __all__ = [ "BlockedUserEvent", "CallAcceptedEvent", diff --git a/agents-core/vision_agents/core/events/base.py b/agents-core/vision_agents/core/events/base.py index 36c93cf01..7250c6a42 100644 --- a/agents-core/vision_agents/core/events/base.py +++ b/agents-core/vision_agents/core/events/base.py @@ -1,12 +1,12 @@ -import uuid import dataclasses +import uuid from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum -from typing import Any, Dict, List, Optional from types import FunctionType -from dataclasses_json import DataClassJsonMixin +from typing import Any, Optional +from dataclasses_json import DataClassJsonMixin from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import Participant @@ -54,45 +54,6 @@ class PluginBaseEvent(BaseEvent): plugin_version: str | None = None -@dataclass -class PluginInitializedEvent(PluginBaseEvent): - """Event emitted when a plugin is successfully initialized.""" - - type: str = field(default="plugin.initialized", init=False) - plugin_type: Optional[str] = None - provider: Optional[str] = None - configuration: Optional[Dict[str, Any]] = None - capabilities: Optional[List[str]] = None - - -@dataclass -class PluginClosedEvent(PluginBaseEvent): - """Event emitted when a plugin is closed.""" - - type: str = field(default="plugin.closed", init=False) - plugin_type: Optional[str] = None # "STT", "STS", "VAD" - provider: Optional[str] = None - reason: Optional[str] = None - cleanup_successful: bool = True - - -@dataclass -class PluginErrorEvent(PluginBaseEvent): - """Event emitted when a generic plugin error occurs.""" - - type: str = field(default="plugin.error", init=False) - plugin_type: Optional[str] = None # "STT", "TTS", "STS", "VAD" - provider: Optional[str] = None - error: Optional[Exception] = None - error_code: Optional[str] = None - context: Optional[str] = None - is_fatal: bool = False - - @property - def error_message(self) -> str: - return str(self.error) if self.error else "Unknown error" - - @dataclasses.dataclass class ExceptionEvent: exc: Exception @@ -100,45 +61,6 @@ class ExceptionEvent: type: str = "base.exception" -@dataclasses.dataclass -class HealthCheckEvent(DataClassJsonMixin): - connection_id: str - created_at: int - custom: dict - type: str = "health.check" - - -@dataclass -class ConnectionOkEvent(BaseEvent): - """Event emitted when WebSocket connection is established.""" - - type: str = field(default="connection.ok", init=False) - connection_id: Optional[str] = None - server_time: Optional[str] = None - api_key: Optional[str] = None - user_id: Optional[str] = None # type: ignore[assignment] - - -@dataclass -class ConnectionErrorEvent(BaseEvent): - """Event emitted when WebSocket connection encounters an error.""" - - type: str = field(default="connection.error", init=False) - error_code: Optional[str] = None - error_message: Optional[str] = None - reconnect_attempt: Optional[int] = None - - -@dataclass -class ConnectionClosedEvent(BaseEvent): - """Event emitted when WebSocket connection is closed.""" - - type: str = field(default="connection.closed", init=False) - code: Optional[int] = None - reason: Optional[str] = None - was_clean: bool = False - - @dataclass class VideoProcessorDetectionEvent(PluginBaseEvent): """Base event for video processor detection results. diff --git a/agents-core/vision_agents/core/events/manager.py b/agents-core/vision_agents/core/events/manager.py index 3f4548094..1200123ed 100644 --- a/agents-core/vision_agents/core/events/manager.py +++ b/agents-core/vision_agents/core/events/manager.py @@ -6,13 +6,7 @@ import uuid from typing import Any, Deque, Dict, Optional, Union, get_args, get_origin -from .base import ( - ConnectionClosedEvent, - ConnectionErrorEvent, - ConnectionOkEvent, - ExceptionEvent, - HealthCheckEvent, -) +from .base import ExceptionEvent logger = logging.getLogger(__name__) @@ -144,10 +138,6 @@ def __init__(self, ignore_unknown_events: bool = True): self._received_event = asyncio.Event() self.register(ExceptionEvent) - self.register(HealthCheckEvent) - self.register(ConnectionOkEvent) - self.register(ConnectionErrorEvent) - self.register(ConnectionClosedEvent) # Start background processing task self._start_processing_task() diff --git a/agents-core/vision_agents/core/tts/tts.py b/agents-core/vision_agents/core/tts/tts.py index 6f29d5bed..c03784e5c 100644 --- a/agents-core/vision_agents/core/tts/tts.py +++ b/agents-core/vision_agents/core/tts/tts.py @@ -7,7 +7,6 @@ import av from vision_agents.core.events import ( AudioFormat, - PluginClosedEvent, ) from vision_agents.core.events.manager import EventManager @@ -282,12 +281,3 @@ async def send( async def close(self): """Close the TTS service and release any resources.""" - self.events.send( - PluginClosedEvent( - session_id=self.session_id, - plugin_name=self.provider_name, - plugin_type="TTS", - provider=self.provider_name, - cleanup_successful=True, - ) - ) From 52b7365a432d17930c8da54113933fc3016b57f1 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Fri, 23 Jan 2026 17:09:18 +0100 Subject: [PATCH 5/8] Add `debug` param to `serve` command --- .../vision_agents/core/runner/runner.py | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/agents-core/vision_agents/core/runner/runner.py b/agents-core/vision_agents/core/runner/runner.py index 66027add8..754b2d0e7 100644 --- a/agents-core/vision_agents/core/runner/runner.py +++ b/agents-core/vision_agents/core/runner/runner.py @@ -26,10 +26,6 @@ logger = logging.getLogger(__name__) -# TODO: -# - Figure out how to serialize the agent config into some dict -# - Docs - asyncio_logger = logging.getLogger("asyncio") @@ -175,18 +171,17 @@ def serve( port: int = 8000, agents_log_level: str = "INFO", http_log_level: str = "INFO", - ): + debug: bool = False, + ) -> None: """ Start the HTTP server that spawns agents to the calls. Args: - host: - port: - agents_log_level: - http_log_level: - - Returns: - + host: Host address to bind the server to. + port: Port number for the server. + agents_log_level: Logging level for agent-related logs. + http_log_level: Logging level for FastAPI and uvicorn logs. + debug: Enable asyncio debug mode. """ # Configure loggers if they're not already configured configure_sdk_logger( @@ -201,9 +196,22 @@ def serve( warnings.filterwarnings( "ignore", category=RuntimeWarning, module="dataclasses_json.core" ) + + # Get the policy's loop and enable debug + asyncio.get_event_loop().set_debug(debug) + uvicorn.run(self.fast_api, host=host, port=port, log_config=None) def _create_fastapi_app(self, options: ServeOptions) -> FastAPI: + """ + Create and configure a FastAPI application for serving agents. + + Args: + options: Configuration options for the server. + + Returns: + Configured FastAPI application instance. + """ app = FastAPI(lifespan=lifespan) app.state.launcher = self._launcher app.state.options = self._serve_options @@ -226,9 +234,9 @@ def _create_fastapi_app(self, options: ServeOptions) -> FastAPI: ) return app - def cli(self): + def cli(self) -> None: """ - Run the CLI + Run the command-line interface with `run` and `serve` subcommands. """ @click.group() @@ -324,11 +332,18 @@ def run_cmd( default="INFO", help="Set the logging level for FastAPI and uvicorn", ) + @click.option( + "--debug", + is_flag=True, + default=False, + help="Enable asyncio debug mode", + ) def serve_cmd( host: str, port: int, agents_log_level: str, http_log_level: str, + debug: bool, ) -> None: """ Start the HTTP server that spawns agents to the calls. @@ -338,6 +353,7 @@ def serve_cmd( port=port, agents_log_level=agents_log_level.upper(), http_log_level=http_log_level.upper(), + debug=debug, ) cli_() From a090dc7143e9c17080dc1ada211a57fc8e8a8ab7 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Fri, 23 Jan 2026 17:28:58 +0100 Subject: [PATCH 6/8] Fix __all__ mentioning deleted classes --- agents-core/vision_agents/core/events/__init__.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/agents-core/vision_agents/core/events/__init__.py b/agents-core/vision_agents/core/events/__init__.py index d41954ce7..1a22577af 100644 --- a/agents-core/vision_agents/core/events/__init__.py +++ b/agents-core/vision_agents/core/events/__init__.py @@ -122,9 +122,6 @@ "AudioFormat", "BaseEvent", "PluginBaseEvent", - "PluginInitializedEvent", - "PluginClosedEvent", - "PluginErrorEvent", "VideoProcessorDetectionEvent", "EventManager", ] From 4a9df9dcf5cf1b8fed6cec28df0f0056ce64a8b3 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Fri, 23 Jan 2026 17:37:18 +0100 Subject: [PATCH 7/8] Reset `EventManager._processing_task` --- agents-core/vision_agents/core/events/manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/agents-core/vision_agents/core/events/manager.py b/agents-core/vision_agents/core/events/manager.py index 1200123ed..75f8f0c5a 100644 --- a/agents-core/vision_agents/core/events/manager.py +++ b/agents-core/vision_agents/core/events/manager.py @@ -551,3 +551,4 @@ async def _process_single_event(self, event): def stop(self): if self._processing_task and not self._processing_task.done(): self._processing_task.cancel() + self._processing_task = None From 697d9e30899d489ffce39fa414ade67d17d3b8c2 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Fri, 23 Jan 2026 17:44:49 +0100 Subject: [PATCH 8/8] Change how debug is enabled --- agents-core/vision_agents/core/runner/runner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/agents-core/vision_agents/core/runner/runner.py b/agents-core/vision_agents/core/runner/runner.py index 754b2d0e7..687563cc9 100644 --- a/agents-core/vision_agents/core/runner/runner.py +++ b/agents-core/vision_agents/core/runner/runner.py @@ -1,5 +1,6 @@ import asyncio import logging +import os import warnings from typing import Optional from uuid import uuid4 @@ -197,9 +198,9 @@ def serve( "ignore", category=RuntimeWarning, module="dataclasses_json.core" ) - # Get the policy's loop and enable debug - asyncio.get_event_loop().set_debug(debug) - + # Enable asyncio debug via environment variable before uvicorn creates its loop + if debug: + os.environ.setdefault("PYTHONASYNCIODEBUG", "1") uvicorn.run(self.fast_api, host=host, port=port, log_config=None) def _create_fastapi_app(self, options: ServeOptions) -> FastAPI: