From ea64b932e9908d0e7088abbd10099f9d302ce42e Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Wed, 21 Jan 2026 14:10:05 +0100 Subject: [PATCH 1/9] AgentLauncher: implement max_sessions_per_call and max_concurrent_sessions limits --- .../core/agents/agent_launcher.py | 69 +++- .../vision_agents/core/agents/exceptions.py | 8 + tests/test_agents/test_agent_launcher.py | 297 +++++++++++++++++- 3 files changed, 348 insertions(+), 26 deletions(-) create mode 100644 agents-core/vision_agents/core/agents/exceptions.py diff --git a/agents-core/vision_agents/core/agents/agent_launcher.py b/agents-core/vision_agents/core/agents/agent_launcher.py index a0d755688..68d0140d0 100644 --- a/agents-core/vision_agents/core/agents/agent_launcher.py +++ b/agents-core/vision_agents/core/agents/agent_launcher.py @@ -1,8 +1,8 @@ import asyncio import logging -import weakref from dataclasses import dataclass from datetime import datetime, timezone +from functools import partial from typing import ( TYPE_CHECKING, Any, @@ -14,6 +14,8 @@ from vision_agents.core.utils.utils import await_or_run, cancel_and_wait from vision_agents.core.warmup import Warmable, WarmupCache +from .exceptions import MaxConcurrentSessionsExceeded, MaxSessionsPerCallExceeded + if TYPE_CHECKING: from .agents import Agent @@ -57,7 +59,10 @@ def __init__( create_agent: Callable[..., "Agent" | Coroutine[Any, Any, "Agent"]], join_call: Callable[["Agent", str, str], Coroutine], agent_idle_timeout: float = 60.0, - agent_idle_cleanup_interval: float = 5.0, + max_concurrent_sessions: Optional[int] = 50, + max_sessions_per_call: Optional[int] = None, + max_session_duration_seconds: Optional[float] = None, + cleanup_interval: float = 5.0, ): """ Initialize the agent launcher. @@ -74,20 +79,32 @@ def __init__( self._warmup_lock = asyncio.Lock() self._warmup_cache = WarmupCache() + if max_concurrent_sessions is not None and max_concurrent_sessions <= 0: + raise ValueError("max_concurrent_sessions must be > 0 or None") + self._max_concurrent_sessions = max_concurrent_sessions + if max_sessions_per_call is not None and max_sessions_per_call <= 0: + raise ValueError("max_sessions_per_call must be > 0 or None") + self._max_sessions_per_call = max_sessions_per_call + if ( + max_session_duration_seconds is not None + and max_session_duration_seconds <= 0 + ): + raise ValueError("max_session_duration_seconds must be > 0 or None") + self._max_session_duration_seconds = max_session_duration_seconds + if agent_idle_timeout < 0: raise ValueError("agent_idle_timeout must be >= 0") self._agent_idle_timeout = agent_idle_timeout - if agent_idle_cleanup_interval <= 0: - raise ValueError("agent_idle_cleanup_interval must be > 0") - self._agent_idle_cleanup_interval = agent_idle_cleanup_interval - - self._active_agents: weakref.WeakSet[Agent] = weakref.WeakSet() + if cleanup_interval <= 0: + raise ValueError("cleanup_interval must be > 0") + self._cleanup_interval: float = cleanup_interval self._running = False self._cleanup_task: Optional[asyncio.Task] = None self._warmed_up: bool = False self._sessions: dict[str, AgentSession] = {} + self._calls: dict[str, set[str]] = {} async def start(self): if self._running: @@ -158,7 +175,6 @@ async def launch(self, **kwargs) -> "Agent": """ agent: "Agent" = await await_or_run(self._create_agent, **kwargs) await self._warmup_agent(agent) - self._active_agents.add(agent) return agent async def start_session( @@ -168,6 +184,20 @@ async def start_session( created_by: Optional[Any] = None, video_track_override_path: Optional[str] = None, ) -> AgentSession: + sessions_total = len(self._sessions) + if len(self._sessions) == self._max_concurrent_sessions: + raise MaxConcurrentSessionsExceeded( + f"Maximum concurrent sessions exceeded:" + f" {sessions_total}/{self._max_concurrent_sessions} sessions active" + ) + + call_sessions_total = len(self._calls.get(call_id, set())) + if call_sessions_total == self._max_sessions_per_call: + raise MaxSessionsPerCallExceeded( + f"Maximum sessions exceeded for call " + f"'{call_id}': {call_sessions_total}/{self._max_sessions_per_call}" + ) + agent: "Agent" = await self.launch() if video_track_override_path: agent.set_video_track_override_path(video_track_override_path) @@ -177,10 +207,16 @@ async def start_session( ) # Remove the session when the task is done - def _done_cb(_, agent_id_=agent.id): - self._sessions.pop(agent_id_, None) - - task.add_done_callback(_done_cb) + # or when the AgentSession is garbage-collected + # in case the done callback wasn't fired + def _finalizer(session_id_: str, call_id_: str, *_): + session_ = self._sessions.pop(session_id_, None) + if session_ is not None: + call_sessions = self._calls.get(call_id_, set()) + if call_sessions: + call_sessions.discard(session_id_) + + task.add_done_callback(partial(_finalizer, agent.id, call_id)) session = AgentSession( agent=agent, task=task, @@ -189,6 +225,7 @@ def _done_cb(_, agent_id_=agent.id): created_by=created_by, ) self._sessions[agent.id] = session + self._calls.setdefault(call_id, set()).add(agent.id) logger.info(f"Start agent session with id {session.id}") return session @@ -209,6 +246,9 @@ async def close_session(self, session_id: str, wait: bool = False) -> bool: if session is None: # The session is either closed or doesn't exist, exit early return False + call_sessions = self._calls.get(session.call_id) + if call_sessions: + call_sessions.discard(session.id) logger.info(f"Closing agent session with id {session.id}") if wait: @@ -266,7 +306,8 @@ async def _cleanup_idle_agents(self) -> None: while self._running: # Collect idle agents first to close them all at once idle_agents = [] - for agent in self._active_agents: + for session in self._sessions.values(): + agent = session.agent agent_idle_for = agent.idle_for() if agent_idle_for >= self._agent_idle_timeout: logger.info( @@ -287,7 +328,7 @@ async def _cleanup_idle_agents(self) -> None: exc_info=r, ) - await asyncio.sleep(self._agent_idle_cleanup_interval) + await asyncio.sleep(self._cleanup_interval) async def __aenter__(self): await self.start() diff --git a/agents-core/vision_agents/core/agents/exceptions.py b/agents-core/vision_agents/core/agents/exceptions.py new file mode 100644 index 000000000..eb727dce3 --- /dev/null +++ b/agents-core/vision_agents/core/agents/exceptions.py @@ -0,0 +1,8 @@ +class SessionLimitExceeded(Exception): + pass + + +class MaxConcurrentSessionsExceeded(SessionLimitExceeded): ... + + +class MaxSessionsPerCallExceeded(SessionLimitExceeded): ... diff --git a/tests/test_agents/test_agent_launcher.py b/tests/test_agents/test_agent_launcher.py index 11b901e06..7a0fd7185 100644 --- a/tests/test_agents/test_agent_launcher.py +++ b/tests/test_agents/test_agent_launcher.py @@ -4,10 +4,15 @@ import pytest from vision_agents.core import Agent, AgentLauncher, User +from vision_agents.core.agents.exceptions import ( + MaxConcurrentSessionsExceeded, + MaxSessionsPerCallExceeded, +) from vision_agents.core.events import EventManager from vision_agents.core.llm import LLM from vision_agents.core.llm.llm import LLMResponseEvent from vision_agents.core.tts import TTS +from vision_agents.core.utils.utils import cancel_and_wait from vision_agents.core.warmup import Warmable @@ -29,7 +34,7 @@ async def simple_response(self, *_, **__) -> LLMResponseEvent[Any]: async def on_warmup(self) -> bool: return True - async def on_warmed_up(self, *_) -> None: + def on_warmed_up(self, *_) -> None: self.warmed_up = True @@ -40,9 +45,8 @@ async def stream_edge_mock() -> MagicMock: return mock -async def join_call_noop( - agent: Agent, call_type: str, call_id: str, **kwargs -) -> None: ... +async def join_call_noop(agent: Agent, call_type: str, call_id: str, **kwargs) -> None: + await asyncio.sleep(10) class TestAgentLauncher: @@ -78,7 +82,7 @@ async def create_agent(**kwargs) -> Agent: agent = await launcher.launch() assert agent - async def test_idle_agents_stopped(self, stream_edge_mock): + async def test_idle_sessions_stopped(self, stream_edge_mock): llm = DummyLLM() tts = DummyTTS() @@ -94,22 +98,22 @@ async def create_agent(**kwargs) -> Agent: create_agent=create_agent, join_call=join_call_noop, agent_idle_timeout=1.0, - agent_idle_cleanup_interval=0.5, + cleanup_interval=0.5, ) with patch.object(Agent, "idle_for", return_value=10): # Start the launcher internals async with launcher: # Launch a couple of idle agents - agent1 = await launcher.launch() - agent2 = await launcher.launch() + session1 = await launcher.start_session(call_id="1") + session2 = await launcher.start_session(call_id="2") # Sleep 2s to let the launcher clean up the agents await asyncio.sleep(2) # The agents must be closed - assert agent1.closed - assert agent2.closed + assert session1.agent.closed + assert session2.agent.closed - async def test_idle_agents_alive_with_idle_timeout_zero(self, stream_edge_mock): + async def test_idle_sessions_alive_with_idle_timeout_zero(self, stream_edge_mock): llm = DummyLLM() tts = DummyTTS() @@ -155,7 +159,7 @@ async def create_agent(**kwargs) -> Agent: create_agent=create_agent, join_call=join_call_noop, agent_idle_timeout=1.0, - agent_idle_cleanup_interval=0.5, + cleanup_interval=0.5, ) with patch.object(Agent, "idle_for", return_value=0): # Start the launcher internals @@ -298,3 +302,272 @@ async def join_call( assert session1.finished assert session2.finished assert session3.finished + + async def test_session_cleaned_up_after_finish(self, stream_edge_mock): + llm = DummyLLM() + tts = DummyTTS() + + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=llm, + tts=tts, + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + async def join_call( + agent: Agent, call_type: str, call_id: str, **kwargs + ) -> None: + await asyncio.sleep(1) + + launcher = AgentLauncher(create_agent=create_agent, join_call=join_call) + session = await launcher.start_session(call_id="test", call_type="default") + assert session + + await session.wait() + assert session.finished + # The session becomes unavailable after it's done + assert launcher.get_session(session_id=session.id) is None + + async def test_session_cleaned_up_after_cancel(self, stream_edge_mock): + llm = DummyLLM() + tts = DummyTTS() + + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=llm, + tts=tts, + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + async def join_call( + agent: Agent, call_type: str, call_id: str, **kwargs + ) -> None: + await asyncio.sleep(1) + + launcher = AgentLauncher(create_agent=create_agent, join_call=join_call) + session = await launcher.start_session(call_id="test", call_type="default") + assert session + + await cancel_and_wait(session.task) + assert session.finished + # The session becomes unavailable if it was cancelled + assert launcher.get_session(session_id=session.id) is None + await launcher.stop() + + async def test_max_concurrent_agents_invalid(self, stream_edge_mock): + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=DummyLLM(), + tts=DummyTTS(), + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + with pytest.raises(ValueError, match="max_concurrent_agents must be > 0"): + AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_concurrent_sessions=0, + ) + + with pytest.raises(ValueError, match="max_concurrent_agents must be > 0"): + AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_concurrent_sessions=-1, + ) + + async def test_max_sessions_per_call_invalid(self, stream_edge_mock): + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=DummyLLM(), + tts=DummyTTS(), + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + with pytest.raises(ValueError, match="max_sessions_per_call must be > 0"): + AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_sessions_per_call=0, + ) + with pytest.raises(ValueError, match="max_sessions_per_call must be > 0"): + AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_sessions_per_call=-1, + ) + + async def test_max_concurrent_agents_exceeded(self, stream_edge_mock): + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=DummyLLM(), + tts=DummyTTS(), + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + launcher = AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_concurrent_sessions=2, + ) + session1 = await launcher.start_session(call_id="call1") + session2 = await launcher.start_session(call_id="call2") + + with pytest.raises(MaxConcurrentSessionsExceeded): + await launcher.start_session(call_id="call3") + + # Close one session and try to create a new one again + await launcher.close_session(session_id=session1.id) + session3 = await launcher.start_session(call_id="call3") + assert session3 is not None + await launcher.stop() + + async def test_max_concurrent_agents_can_create_after_session_ends( + self, stream_edge_mock + ): + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=DummyLLM(), + tts=DummyTTS(), + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + async def join_call(*args, **kwargs): + await asyncio.sleep(1) + + launcher = AgentLauncher( + create_agent=create_agent, + join_call=join_call, + max_concurrent_sessions=2, + ) + session1 = await launcher.start_session(call_id="call1") + session2 = await launcher.start_session(call_id="call2") + with pytest.raises(MaxConcurrentSessionsExceeded): + await launcher.start_session(call_id="call3") + + await session1.wait() + + # Can create a new session when the previous one ends + session3 = await launcher.start_session(call_id="call3") + assert session3 is not None + await launcher.stop() + + async def test_max_sessions_per_call_exceeded(self, stream_edge_mock): + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=DummyLLM(), + tts=DummyTTS(), + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + launcher = AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_sessions_per_call=2, + ) + session1 = await launcher.start_session(call_id="same_call") + session2 = await launcher.start_session(call_id="same_call") + + with pytest.raises(MaxSessionsPerCallExceeded): + await launcher.start_session(call_id="same_call") + + # Different call should still work + session3 = await launcher.start_session(call_id="call2") + assert session3 is not None + + # Close one session + await launcher.close_session(session_id=session1.id, wait=True) + + # Now we should be able to start a new session for the same call + session4 = await launcher.start_session(call_id="same_call") + assert session4 is not None + + await launcher.stop() + + async def test_max_sessions_per_call_can_create_after_session_ends( + self, stream_edge_mock + ): + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=DummyLLM(), + tts=DummyTTS(), + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + async def join_call(*args, **kwargs): + await asyncio.sleep(1) + + launcher = AgentLauncher( + create_agent=create_agent, + join_call=join_call, + max_sessions_per_call=2, + ) + session1 = await launcher.start_session(call_id="same_call") + session2 = await launcher.start_session(call_id="same_call") + + with pytest.raises(MaxSessionsPerCallExceeded): + await launcher.start_session(call_id="same_call") + + await session1.wait() + # Different call should still work + session3 = await launcher.start_session(call_id="same_call") + assert session3 is not None + + await launcher.stop() + + async def test_max_concurrent_agents_none_allows_unlimited(self, stream_edge_mock): + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=DummyLLM(), + tts=DummyTTS(), + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + launcher = AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_concurrent_sessions=None, + ) + # Start many sessions - should not raise + sessions = [] + for i in range(10): + session = await launcher.start_session(call_id=f"call{i}") + sessions.append(session) + + assert len(sessions) == 10 + + await launcher.stop() + + async def test_max_sessions_per_call_none_allows_unlimited(self, stream_edge_mock): + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=DummyLLM(), + tts=DummyTTS(), + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + launcher = AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_concurrent_sessions=None, + max_sessions_per_call=None, + ) + # Start many sessions for the same call - should not raise + sessions = [] + for i in range(10): + session = await launcher.start_session(call_id="same_call") + sessions.append(session) + + assert len(sessions) == 10 + + await launcher.stop() From ce889d364ae415b78646755f9edf65c3d87d51c2 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Wed, 21 Jan 2026 14:25:45 +0100 Subject: [PATCH 2/9] AgentLauncher: update tests --- tests/test_agents/test_agent_launcher.py | 140 +++++++++++------------ 1 file changed, 68 insertions(+), 72 deletions(-) diff --git a/tests/test_agents/test_agent_launcher.py b/tests/test_agents/test_agent_launcher.py index 7a0fd7185..767bd3a57 100644 --- a/tests/test_agents/test_agent_launcher.py +++ b/tests/test_agents/test_agent_launcher.py @@ -294,11 +294,11 @@ async def join_call( await asyncio.sleep(10) launcher = AgentLauncher(create_agent=create_agent, join_call=join_call) - session1 = await launcher.start_session(call_id="test", call_type="default") - session2 = await launcher.start_session(call_id="test", call_type="default") - session3 = await launcher.start_session(call_id="test", call_type="default") + async with launcher: + session1 = await launcher.start_session(call_id="test", call_type="default") + session2 = await launcher.start_session(call_id="test", call_type="default") + session3 = await launcher.start_session(call_id="test", call_type="default") - await launcher.stop() assert session1.finished assert session2.finished assert session3.finished @@ -347,14 +347,14 @@ async def join_call( await asyncio.sleep(1) launcher = AgentLauncher(create_agent=create_agent, join_call=join_call) - session = await launcher.start_session(call_id="test", call_type="default") - assert session + async with launcher: + session = await launcher.start_session(call_id="test", call_type="default") + assert session - await cancel_and_wait(session.task) - assert session.finished - # The session becomes unavailable if it was cancelled - assert launcher.get_session(session_id=session.id) is None - await launcher.stop() + await cancel_and_wait(session.task) + assert session.finished + # The session becomes unavailable if it was cancelled + assert launcher.get_session(session_id=session.id) is None async def test_max_concurrent_agents_invalid(self, stream_edge_mock): async def create_agent(**kwargs) -> Agent: @@ -365,14 +365,14 @@ async def create_agent(**kwargs) -> Agent: agent_user=User(name="test"), ) - with pytest.raises(ValueError, match="max_concurrent_agents must be > 0"): + with pytest.raises(ValueError, match="max_concurrent_sessions must be > 0"): AgentLauncher( create_agent=create_agent, join_call=join_call_noop, max_concurrent_sessions=0, ) - with pytest.raises(ValueError, match="max_concurrent_agents must be > 0"): + with pytest.raises(ValueError, match="max_concurrent_sessions must be > 0"): AgentLauncher( create_agent=create_agent, join_call=join_call_noop, @@ -415,17 +415,17 @@ async def create_agent(**kwargs) -> Agent: join_call=join_call_noop, max_concurrent_sessions=2, ) - session1 = await launcher.start_session(call_id="call1") - session2 = await launcher.start_session(call_id="call2") + async with launcher: + session1 = await launcher.start_session(call_id="call1") + session2 = await launcher.start_session(call_id="call2") - with pytest.raises(MaxConcurrentSessionsExceeded): - await launcher.start_session(call_id="call3") + with pytest.raises(MaxConcurrentSessionsExceeded): + await launcher.start_session(call_id="call3") - # Close one session and try to create a new one again - await launcher.close_session(session_id=session1.id) - session3 = await launcher.start_session(call_id="call3") - assert session3 is not None - await launcher.stop() + # Close one session and try to create a new one again + await launcher.close_session(session_id=session1.id) + session3 = await launcher.start_session(call_id="call3") + assert session3 is not None async def test_max_concurrent_agents_can_create_after_session_ends( self, stream_edge_mock @@ -446,17 +446,17 @@ async def join_call(*args, **kwargs): join_call=join_call, max_concurrent_sessions=2, ) - session1 = await launcher.start_session(call_id="call1") - session2 = await launcher.start_session(call_id="call2") - with pytest.raises(MaxConcurrentSessionsExceeded): - await launcher.start_session(call_id="call3") + async with launcher: + session1 = await launcher.start_session(call_id="call1") + session2 = await launcher.start_session(call_id="call2") + with pytest.raises(MaxConcurrentSessionsExceeded): + await launcher.start_session(call_id="call3") - await session1.wait() + await session1.wait() - # Can create a new session when the previous one ends - session3 = await launcher.start_session(call_id="call3") - assert session3 is not None - await launcher.stop() + # Can create a new session when the previous one ends + session3 = await launcher.start_session(call_id="call3") + assert session3 is not None async def test_max_sessions_per_call_exceeded(self, stream_edge_mock): async def create_agent(**kwargs) -> Agent: @@ -472,24 +472,23 @@ async def create_agent(**kwargs) -> Agent: join_call=join_call_noop, max_sessions_per_call=2, ) - session1 = await launcher.start_session(call_id="same_call") - session2 = await launcher.start_session(call_id="same_call") - - with pytest.raises(MaxSessionsPerCallExceeded): - await launcher.start_session(call_id="same_call") + async with launcher: + session1 = await launcher.start_session(call_id="same_call") + session2 = await launcher.start_session(call_id="same_call") - # Different call should still work - session3 = await launcher.start_session(call_id="call2") - assert session3 is not None + with pytest.raises(MaxSessionsPerCallExceeded): + await launcher.start_session(call_id="same_call") - # Close one session - await launcher.close_session(session_id=session1.id, wait=True) + # Different call should still work + session3 = await launcher.start_session(call_id="call2") + assert session3 is not None - # Now we should be able to start a new session for the same call - session4 = await launcher.start_session(call_id="same_call") - assert session4 is not None + # Close one session + await launcher.close_session(session_id=session1.id, wait=True) - await launcher.stop() + # Now we should be able to start a new session for the same call + session4 = await launcher.start_session(call_id="same_call") + assert session4 is not None async def test_max_sessions_per_call_can_create_after_session_ends( self, stream_edge_mock @@ -510,18 +509,17 @@ async def join_call(*args, **kwargs): join_call=join_call, max_sessions_per_call=2, ) - session1 = await launcher.start_session(call_id="same_call") - session2 = await launcher.start_session(call_id="same_call") + async with launcher: + session1 = await launcher.start_session(call_id="same_call") + session2 = await launcher.start_session(call_id="same_call") - with pytest.raises(MaxSessionsPerCallExceeded): - await launcher.start_session(call_id="same_call") + with pytest.raises(MaxSessionsPerCallExceeded): + await launcher.start_session(call_id="same_call") - await session1.wait() - # Different call should still work - session3 = await launcher.start_session(call_id="same_call") - assert session3 is not None - - await launcher.stop() + await session1.wait() + # Can create a new session when the previous one ends + session3 = await launcher.start_session(call_id="same_call") + assert session3 is not None async def test_max_concurrent_agents_none_allows_unlimited(self, stream_edge_mock): async def create_agent(**kwargs) -> Agent: @@ -537,15 +535,14 @@ async def create_agent(**kwargs) -> Agent: join_call=join_call_noop, max_concurrent_sessions=None, ) - # Start many sessions - should not raise - sessions = [] - for i in range(10): - session = await launcher.start_session(call_id=f"call{i}") - sessions.append(session) - - assert len(sessions) == 10 + async with launcher: + # Start many sessions - should not raise + sessions = [] + for i in range(10): + session = await launcher.start_session(call_id=f"call{i}") + sessions.append(session) - await launcher.stop() + assert len(sessions) == 10 async def test_max_sessions_per_call_none_allows_unlimited(self, stream_edge_mock): async def create_agent(**kwargs) -> Agent: @@ -562,12 +559,11 @@ async def create_agent(**kwargs) -> Agent: max_concurrent_sessions=None, max_sessions_per_call=None, ) - # Start many sessions for the same call - should not raise - sessions = [] - for i in range(10): - session = await launcher.start_session(call_id="same_call") - sessions.append(session) - - assert len(sessions) == 10 - - await launcher.stop() + async with launcher: + # Start many sessions for the same call - should not raise + sessions = [] + for i in range(10): + session = await launcher.start_session(call_id="same_call") + sessions.append(session) + + assert len(sessions) == 10 From 4e94b3e5f286027fd6b23061658b2c797f81dd46 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Wed, 21 Jan 2026 14:33:02 +0100 Subject: [PATCH 3/9] Fix ruff --- tests/test_agents/test_agent_launcher.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_agents/test_agent_launcher.py b/tests/test_agents/test_agent_launcher.py index 767bd3a57..6711fd63b 100644 --- a/tests/test_agents/test_agent_launcher.py +++ b/tests/test_agents/test_agent_launcher.py @@ -417,7 +417,7 @@ async def create_agent(**kwargs) -> Agent: ) async with launcher: session1 = await launcher.start_session(call_id="call1") - session2 = await launcher.start_session(call_id="call2") + await launcher.start_session(call_id="call2") with pytest.raises(MaxConcurrentSessionsExceeded): await launcher.start_session(call_id="call3") @@ -448,7 +448,7 @@ async def join_call(*args, **kwargs): ) async with launcher: session1 = await launcher.start_session(call_id="call1") - session2 = await launcher.start_session(call_id="call2") + await launcher.start_session(call_id="call2") with pytest.raises(MaxConcurrentSessionsExceeded): await launcher.start_session(call_id="call3") @@ -474,7 +474,7 @@ async def create_agent(**kwargs) -> Agent: ) async with launcher: session1 = await launcher.start_session(call_id="same_call") - session2 = await launcher.start_session(call_id="same_call") + await launcher.start_session(call_id="same_call") with pytest.raises(MaxSessionsPerCallExceeded): await launcher.start_session(call_id="same_call") @@ -511,7 +511,7 @@ async def join_call(*args, **kwargs): ) async with launcher: session1 = await launcher.start_session(call_id="same_call") - session2 = await launcher.start_session(call_id="same_call") + await launcher.start_session(call_id="same_call") with pytest.raises(MaxSessionsPerCallExceeded): await launcher.start_session(call_id="same_call") From fdb5bcfc39452424940786179ed7ce22e20dec21 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Wed, 21 Jan 2026 18:01:29 +0100 Subject: [PATCH 4/9] Fix cancellation handling in `run` mode --- agents-core/vision_agents/core/runner/runner.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agents-core/vision_agents/core/runner/runner.py b/agents-core/vision_agents/core/runner/runner.py index 01bd3e65b..80794eb6b 100644 --- a/agents-core/vision_agents/core/runner/runner.py +++ b/agents-core/vision_agents/core/runner/runner.py @@ -147,6 +147,8 @@ async def _run(): call_id, call_type, video_track_override_path=video_track_override ) await session.wait() + except asyncio.CancelledError: + logger.info("The session is cancelled, shutting down gracefully...") except KeyboardInterrupt: logger.info("🛑 Received interrupt signal, shutting down gracefully...") except Exception as e: From 0de6b68d106c6df334455154a08fce7456fdad2d Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Wed, 21 Jan 2026 18:01:50 +0100 Subject: [PATCH 5/9] Add docstrings --- .../core/agents/agent_launcher.py | 147 +++++++++++++++--- .../vision_agents/core/agents/agents.py | 12 ++ tests/test_agents/test_agent_launcher.py | 90 ++++++++++- 3 files changed, 224 insertions(+), 25 deletions(-) diff --git a/agents-core/vision_agents/core/agents/agent_launcher.py b/agents-core/vision_agents/core/agents/agent_launcher.py index 68d0140d0..cb3ed1321 100644 --- a/agents-core/vision_agents/core/agents/agent_launcher.py +++ b/agents-core/vision_agents/core/agents/agent_launcher.py @@ -24,6 +24,14 @@ @dataclass class AgentSession: + """ + Represents an active agent session within a call. + + An AgentSession wraps an Agent instance along with metadata about the session, + including when it started, which call it belongs to, and the async task running + the agent's call handler. + """ + agent: "Agent" call_id: str started_at: datetime @@ -32,10 +40,12 @@ class AgentSession: @property def finished(self) -> bool: + """Return True if the session task has completed.""" return self.task.done() @property def id(self) -> str: + """Return the session ID (same as the agent ID).""" return self.agent.id async def wait(self): @@ -44,6 +54,25 @@ async def wait(self): """ return await self.task + def on_call_for(self) -> float: + """ + Return the number of seconds for how long the agent has been on the call. + Returns 0.0 if the agent has not joined a call yet. + + Returns: + Duration in seconds since the agent joined the call, or 0.0 if not on a call. + """ + return self.agent.on_call_for() + + def idle_for(self) -> float: + """ + Return the idle time for this session if there are no other participants except the agent. + + Returns: + Idle time in seconds, or 0.0 if the session is active. + """ + return self.agent.idle_for() + # TODO: Rename to `AgentManager`. class AgentLauncher: @@ -59,7 +88,7 @@ def __init__( create_agent: Callable[..., "Agent" | Coroutine[Any, Any, "Agent"]], join_call: Callable[["Agent", str, str], Coroutine], agent_idle_timeout: float = 60.0, - max_concurrent_sessions: Optional[int] = 50, + max_concurrent_sessions: Optional[int] = None, max_sessions_per_call: Optional[int] = None, max_session_duration_seconds: Optional[float] = None, cleanup_interval: float = 5.0, @@ -68,11 +97,19 @@ def __init__( Initialize the agent launcher. Args: - create_agent: A function that creates and returns an Agent instance - join_call: Optional function that handles joining a call with the agent - agent_idle_timeout: Optional timeout in seconds for agent to stay alone on the call. Default - `60.0`. - `0` means idle agents won't leave the call until it's ended. - + create_agent: A function that creates and returns an Agent instance. + join_call: A coroutine function that handles joining a call with the agent. + agent_idle_timeout: Timeout in seconds for an agent to stay alone on a call + before being automatically closed. Default is 60.0 seconds. + Set to 0 to disable idle timeout (agents won't leave until the call ends). + max_concurrent_sessions: Maximum number of concurrent sessions allowed across + all calls. Default is None (unlimited). + max_sessions_per_call: Maximum number of sessions allowed per call_id. + Default is None (unlimited). + max_session_duration_seconds: Maximum duration in seconds for a session + before it is automatically closed. Default is None (unlimited). + cleanup_interval: Interval in seconds between cleanup checks for idle + or expired sessions. Default is 5.0 seconds. """ self._create_agent = create_agent self._join_call = join_call @@ -106,16 +143,31 @@ def __init__( self._sessions: dict[str, AgentSession] = {} self._calls: dict[str, set[str]] = {} - async def start(self): + async def start(self) -> None: + """ + Start the agent launcher. + + This method warms up the agent components and starts the background + cleanup task for managing idle and expired sessions. + + Raises: + RuntimeError: If the launcher is already running. + """ if self._running: raise RuntimeError("AgentLauncher is already running") logger.debug("Starting AgentLauncher") self._running = True await self.warmup() - self._cleanup_task = asyncio.create_task(self._cleanup_idle_agents()) + self._cleanup_task = asyncio.create_task(self._cleanup_idle_sessions()) logger.debug("AgentLauncher started") - async def stop(self): + async def stop(self) -> None: + """ + Stop the agent launcher and close all active sessions. + + This method cancels the cleanup task, then cancels and waits for + all active session tasks to complete. + """ logger.debug("Stopping AgentLauncher") self._running = False if self._cleanup_task: @@ -153,14 +205,17 @@ async def warmup(self) -> None: @property def warmed_up(self) -> bool: + """Return True if the agent components have been warmed up.""" return self._warmed_up @property def running(self) -> bool: + """Return True if the launcher is currently running.""" return self._running @property def ready(self) -> bool: + """Return True if the launcher is warmed up and running.""" return self.warmed_up and self.running async def launch(self, **kwargs) -> "Agent": @@ -184,6 +239,28 @@ async def start_session( created_by: Optional[Any] = None, video_track_override_path: Optional[str] = None, ) -> AgentSession: + """ + Start a new agent session for a call. + + Creates a new agent, joins the specified call, and returns an AgentSession + object to track the session. + + Args: + call_id: Unique identifier for the call to join. + call_type: Type of call. Default is "default". + created_by: Optional metadata about who/what created this session. + video_track_override_path: Optional path to a video file to use + instead of a live video track. + + Returns: + An AgentSession object representing the new session. + + Raises: + MaxConcurrentSessionsExceeded: If the maximum number of concurrent + sessions has been reached. + MaxSessionsPerCallExceeded: If the maximum number of sessions for + this call_id has been reached. + """ sessions_total = len(self._sessions) if len(self._sessions) == self._max_concurrent_sessions: raise MaxConcurrentSessionsExceeded( @@ -258,6 +335,15 @@ async def close_session(self, session_id: str, wait: bool = False) -> bool: return True def get_session(self, session_id: str) -> Optional[AgentSession]: + """ + Get a session by its ID. + + Args: + session_id: The session ID to look up. + + Returns: + The AgentSession if found, None otherwise. + """ return self._sessions.get(session_id) async def _warmup_agent(self, agent: "Agent") -> None: @@ -299,40 +385,55 @@ async def _warmup_agent(self, agent: "Agent") -> None: if warmup_tasks: await asyncio.gather(*warmup_tasks) - async def _cleanup_idle_agents(self) -> None: - if not self._agent_idle_timeout: + async def _cleanup_idle_sessions(self) -> None: + if not self._agent_idle_timeout and not self._max_session_duration_seconds: return while self._running: # Collect idle agents first to close them all at once - idle_agents = [] + to_close = [] for session in self._sessions.values(): agent = session.agent - agent_idle_for = agent.idle_for() - if agent_idle_for >= self._agent_idle_timeout: + on_call_for = agent.on_call_for() + idle_for = agent.idle_for() + if idle_for >= self._agent_idle_timeout: + logger.info( + f'Closing session "{session.id}" with ' + f'user_id "{agent.agent_user.id}" after being ' + f"idle for {round(idle_for, 2)}s " + f"(idle timeout is {self._agent_idle_timeout}s)" + ) + to_close.append(agent) + elif on_call_for >= self._max_session_duration_seconds: logger.info( - f'Agent with user_id "{agent.agent_user.id}" is idle for {round(agent_idle_for, 2)}s, ' - f"closing it after {self._agent_idle_timeout}s timeout" + f'Closing session "{session.id}" with user_id "{agent.agent_user.id}" ' + f"after reaching the maximum session " + f"duration of {self._max_session_duration_seconds}s" ) - idle_agents.append(agent) + to_close.append(agent) - if idle_agents: - coros = [asyncio.shield(a.close()) for a in idle_agents] + if to_close: + coros = [ + asyncio.shield(self.close_session(s.id, wait=False)) + for s in to_close + ] result = await asyncio.shield( asyncio.gather(*coros, return_exceptions=True) ) - for agent, r in zip(idle_agents, result): + for agent, r in zip(to_close, result): if isinstance(r, Exception): logger.error( - f"Failed to close idle agent with user_id {agent.agent_user.id}", + f"Failed to close agent with user_id {agent.agent_user.id}", exc_info=r, ) await asyncio.sleep(self._cleanup_interval) - async def __aenter__(self): + async def __aenter__(self) -> "AgentLauncher": + """Enter the async context manager, starting the launcher.""" await self.start() return self - async def __aexit__(self, exc_type, exc_val, exc_tb): + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + """Exit the async context manager, stopping the launcher.""" await self.stop() diff --git a/agents-core/vision_agents/core/agents/agents.py b/agents-core/vision_agents/core/agents/agents.py index 2d3c69475..26f310d7b 100644 --- a/agents-core/vision_agents/core/agents/agents.py +++ b/agents-core/vision_agents/core/agents/agents.py @@ -652,6 +652,18 @@ def idle_for(self) -> float: idle_since_adjusted = max(idle_since, self._joined_at) return time.time() - idle_since_adjusted + def on_call_for(self) -> float: + """ + Return the number of seconds for how long the agent has been on the call. + Returns 0.0 if the agent has not joined a call yet. + + Returns: + Duration in seconds since the agent joined the call, or 0.0 if not on a call. + """ + if not self._joined_at: + return 0.0 + return time.time() - self._joined_at + async def finish(self): """ Wait for the call to end gracefully. diff --git a/tests/test_agents/test_agent_launcher.py b/tests/test_agents/test_agent_launcher.py index 6711fd63b..64bdfb02f 100644 --- a/tests/test_agents/test_agent_launcher.py +++ b/tests/test_agents/test_agent_launcher.py @@ -110,8 +110,8 @@ async def create_agent(**kwargs) -> Agent: await asyncio.sleep(2) # The agents must be closed - assert session1.agent.closed - assert session2.agent.closed + assert session1.finished + assert session2.finished async def test_idle_sessions_alive_with_idle_timeout_zero(self, stream_edge_mock): llm = DummyLLM() @@ -567,3 +567,89 @@ async def create_agent(**kwargs) -> Agent: sessions.append(session) assert len(sessions) == 10 + + async def test_max_session_duration_seconds_invalid(self, stream_edge_mock): + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=DummyLLM(), + tts=DummyTTS(), + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + with pytest.raises( + ValueError, match="max_session_duration_seconds must be > 0" + ): + AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_session_duration_seconds=0, + ) + + with pytest.raises( + ValueError, match="max_session_duration_seconds must be > 0" + ): + AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_session_duration_seconds=-1, + ) + + async def test_max_session_duration_exceeded(self, stream_edge_mock): + llm = DummyLLM() + tts = DummyTTS() + + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=llm, + tts=tts, + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + launcher = AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_session_duration_seconds=1.0, + agent_idle_timeout=0, # Disable idle timeout + cleanup_interval=0.5, + ) + with patch.object(Agent, "on_call_for", return_value=10): + async with launcher: + session1 = await launcher.start_session(call_id="1") + session2 = await launcher.start_session(call_id="2") + # Sleep to let the launcher clean up the sessions + await asyncio.sleep(2) + + # The sessions must be closed due to max duration exceeded + assert session1.finished + assert session2.finished + + async def test_sessions_alive_with_max_session_duration_none(self, stream_edge_mock): + llm = DummyLLM() + tts = DummyTTS() + + async def create_agent(**kwargs) -> Agent: + return Agent( + llm=llm, + tts=tts, + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + launcher = AgentLauncher( + create_agent=create_agent, + join_call=join_call_noop, + max_session_duration_seconds=None, + agent_idle_timeout=0, # Disable idle timeout + ) + with patch.object(Agent, "on_call_for", return_value=10): + async with launcher: + agent1 = await launcher.launch() + agent2 = await launcher.launch() + # Sleep to give cleanup a chance to run + await asyncio.sleep(2) + + # The agents must NOT be closed because max_session_duration_seconds=None + assert not agent1.closed + assert not agent2.closed From 567b25d06bff33374e355e4609cb1dfced5c54cb Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Wed, 21 Jan 2026 19:18:21 +0100 Subject: [PATCH 6/9] Fix tests --- .../core/agents/agent_launcher.py | 17 ++- tests/test_agents/test_agent_launcher.py | 118 ++++++++++-------- 2 files changed, 77 insertions(+), 58 deletions(-) diff --git a/agents-core/vision_agents/core/agents/agent_launcher.py b/agents-core/vision_agents/core/agents/agent_launcher.py index cb3ed1321..13e99413f 100644 --- a/agents-core/vision_agents/core/agents/agent_launcher.py +++ b/agents-core/vision_agents/core/agents/agent_launcher.py @@ -262,14 +262,20 @@ async def start_session( this call_id has been reached. """ sessions_total = len(self._sessions) - if len(self._sessions) == self._max_concurrent_sessions: + if ( + self._max_concurrent_sessions + and len(self._sessions) == self._max_concurrent_sessions + ): raise MaxConcurrentSessionsExceeded( f"Maximum concurrent sessions exceeded:" f" {sessions_total}/{self._max_concurrent_sessions} sessions active" ) call_sessions_total = len(self._calls.get(call_id, set())) - if call_sessions_total == self._max_sessions_per_call: + if ( + self._max_sessions_per_call + and call_sessions_total == self._max_sessions_per_call + ): raise MaxSessionsPerCallExceeded( f"Maximum sessions exceeded for call " f"'{call_id}': {call_sessions_total}/{self._max_sessions_per_call}" @@ -388,6 +394,9 @@ async def _warmup_agent(self, agent: "Agent") -> None: async def _cleanup_idle_sessions(self) -> None: if not self._agent_idle_timeout and not self._max_session_duration_seconds: return + max_session_duration_seconds = self._max_session_duration_seconds or float( + "inf" + ) while self._running: # Collect idle agents first to close them all at once @@ -404,11 +413,11 @@ async def _cleanup_idle_sessions(self) -> None: f"(idle timeout is {self._agent_idle_timeout}s)" ) to_close.append(agent) - elif on_call_for >= self._max_session_duration_seconds: + elif on_call_for >= max_session_duration_seconds: logger.info( f'Closing session "{session.id}" with user_id "{agent.agent_user.id}" ' f"after reaching the maximum session " - f"duration of {self._max_session_duration_seconds}s" + f"duration of {max_session_duration_seconds}s" ) to_close.append(agent) diff --git a/tests/test_agents/test_agent_launcher.py b/tests/test_agents/test_agent_launcher.py index 64bdfb02f..cad82dd27 100644 --- a/tests/test_agents/test_agent_launcher.py +++ b/tests/test_agents/test_agent_launcher.py @@ -79,8 +79,9 @@ async def create_agent(**kwargs) -> Agent: ) launcher = AgentLauncher(create_agent=create_agent, join_call=join_call_noop) - agent = await launcher.launch() - assert agent + async with launcher: + agent = await launcher.launch() + assert agent async def test_idle_sessions_stopped(self, stream_edge_mock): llm = DummyLLM() @@ -109,9 +110,9 @@ async def create_agent(**kwargs) -> Agent: # Sleep 2s to let the launcher clean up the agents await asyncio.sleep(2) - # The agents must be closed - assert session1.finished - assert session2.finished + # The agents must be closed + assert session1.finished + assert session2.finished async def test_idle_sessions_alive_with_idle_timeout_zero(self, stream_edge_mock): llm = DummyLLM() @@ -129,19 +130,20 @@ async def create_agent(**kwargs) -> Agent: create_agent=create_agent, join_call=join_call_noop, agent_idle_timeout=0, + cleanup_interval=0.5, ) with patch.object(Agent, "idle_for", return_value=10): # Start the launcher internals async with launcher: # Launch a couple of idle agents - agent1 = await launcher.launch() - agent2 = await launcher.launch() + session1 = await launcher.start_session(call_id="call") + session2 = await launcher.start_session(call_id="call") # Sleep 2s to let the launcher clean up the agents await asyncio.sleep(2) - # The agents must not be closed because agent_idle_timeout=0 - assert not agent1.closed - assert not agent2.closed + # The agents must not be closed because agent_idle_timeout=0 + assert not session1.finished + assert not session2.finished async def test_active_agents_alive(self, stream_edge_mock): llm = DummyLLM() @@ -165,14 +167,14 @@ async def create_agent(**kwargs) -> Agent: # Start the launcher internals async with launcher: # Launch a couple of active agents (idle_for=0) - agent1 = await launcher.launch() - agent2 = await launcher.launch() + session1 = await launcher.start_session(call_id="call") + session2 = await launcher.start_session(call_id="call") # Sleep 2s to let the launcher clean up the agents await asyncio.sleep(2) - # The agents must not be closed - assert not agent1.closed - assert not agent2.closed + # The agents must not be closed + assert not session1.finished + assert not session2.finished async def test_start_session(self, stream_edge_mock): llm = DummyLLM() @@ -192,21 +194,22 @@ async def join_call( await asyncio.sleep(2) launcher = AgentLauncher(create_agent=create_agent, join_call=join_call) - session = await launcher.start_session(call_id="test", call_type="default") - assert session - assert session.id - assert session.call_id - assert session.agent - assert session.started_at - assert session.created_by is None - assert not session.finished - - assert launcher.get_session(session_id=session.id) - - # Wait for session to stop (it just sleeps) - await session.wait() - assert session.finished - assert not launcher.get_session(session_id=session.id) + async with launcher: + session = await launcher.start_session(call_id="test", call_type="default") + assert session + assert session.id + assert session.call_id + assert session.agent + assert session.started_at + assert session.created_by is None + assert not session.finished + + assert launcher.get_session(session_id=session.id) + + # Wait for session to stop (it just sleeps) + await session.wait() + assert session.finished + assert not launcher.get_session(session_id=session.id) async def test_close_session_exists(self, stream_edge_mock): llm = DummyLLM() @@ -226,13 +229,14 @@ async def join_call( await asyncio.sleep(10) launcher = AgentLauncher(create_agent=create_agent, join_call=join_call) - session = await launcher.start_session(call_id="test", call_type="default") - assert session + async with launcher: + session = await launcher.start_session(call_id="test", call_type="default") + assert session - await launcher.close_session(session_id=session.id, wait=True) - assert session.finished - assert session.task.done() - assert not launcher.get_session(session_id=session.id) + await launcher.close_session(session_id=session.id, wait=True) + assert session.finished + assert session.task.done() + assert not launcher.get_session(session_id=session.id) async def test_close_session_doesnt_exist(self, stream_edge_mock): llm = DummyLLM() @@ -273,8 +277,9 @@ async def join_call( await asyncio.sleep(10) launcher = AgentLauncher(create_agent=create_agent, join_call=join_call) - session = launcher.get_session(session_id="session-id") - assert session is None + async with launcher: + session = launcher.get_session(session_id="session-id") + assert session is None async def test_stop_multiple_sessions(self, stream_edge_mock): llm = DummyLLM() @@ -299,6 +304,7 @@ async def join_call( session2 = await launcher.start_session(call_id="test", call_type="default") session3 = await launcher.start_session(call_id="test", call_type="default") + # Sessions must be stopped when the launcher context manager exits assert session1.finished assert session2.finished assert session3.finished @@ -321,13 +327,14 @@ async def join_call( await asyncio.sleep(1) launcher = AgentLauncher(create_agent=create_agent, join_call=join_call) - session = await launcher.start_session(call_id="test", call_type="default") - assert session + async with launcher: + session = await launcher.start_session(call_id="test", call_type="default") + assert session - await session.wait() - assert session.finished - # The session becomes unavailable after it's done - assert launcher.get_session(session_id=session.id) is None + await session.wait() + assert session.finished + # The session becomes unavailable after it's done + assert launcher.get_session(session_id=session.id) is None async def test_session_cleaned_up_after_cancel(self, stream_edge_mock): llm = DummyLLM() @@ -621,11 +628,13 @@ async def create_agent(**kwargs) -> Agent: # Sleep to let the launcher clean up the sessions await asyncio.sleep(2) - # The sessions must be closed due to max duration exceeded - assert session1.finished - assert session2.finished + # The sessions must be closed due to max duration exceeded + assert session1.finished + assert session2.finished - async def test_sessions_alive_with_max_session_duration_none(self, stream_edge_mock): + async def test_sessions_alive_with_max_session_duration_none( + self, stream_edge_mock + ): llm = DummyLLM() tts = DummyTTS() @@ -641,15 +650,16 @@ async def create_agent(**kwargs) -> Agent: create_agent=create_agent, join_call=join_call_noop, max_session_duration_seconds=None, - agent_idle_timeout=0, # Disable idle timeout + agent_idle_timeout=10, + cleanup_interval=0.5, ) with patch.object(Agent, "on_call_for", return_value=10): async with launcher: - agent1 = await launcher.launch() - agent2 = await launcher.launch() + session1 = await launcher.start_session(call_id="1") + session2 = await launcher.start_session(call_id="2") # Sleep to give cleanup a chance to run await asyncio.sleep(2) - # The agents must NOT be closed because max_session_duration_seconds=None - assert not agent1.closed - assert not agent2.closed + # The agents must NOT be closed because max_session_duration_seconds=None + assert not session1.finished + assert not session2.finished From e5cceadd3ab92955d5db13761909fabb37014c1d Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Thu, 22 Jan 2026 13:00:02 +0100 Subject: [PATCH 7/9] Add limits to Runner API --- .../core/agents/agent_launcher.py | 7 +- .../vision_agents/core/runner/http/api.py | 19 +++++ tests/test_agents/test_runner.py | 75 +++++++++++++++---- 3 files changed, 81 insertions(+), 20 deletions(-) diff --git a/agents-core/vision_agents/core/agents/agent_launcher.py b/agents-core/vision_agents/core/agents/agent_launcher.py index 13e99413f..cfee3f25c 100644 --- a/agents-core/vision_agents/core/agents/agent_launcher.py +++ b/agents-core/vision_agents/core/agents/agent_launcher.py @@ -261,14 +261,12 @@ async def start_session( MaxSessionsPerCallExceeded: If the maximum number of sessions for this call_id has been reached. """ - sessions_total = len(self._sessions) if ( self._max_concurrent_sessions and len(self._sessions) == self._max_concurrent_sessions ): raise MaxConcurrentSessionsExceeded( - f"Maximum concurrent sessions exceeded:" - f" {sessions_total}/{self._max_concurrent_sessions} sessions active" + f"Reached maximum concurrent sessions of {self._max_concurrent_sessions}" ) call_sessions_total = len(self._calls.get(call_id, set())) @@ -277,8 +275,7 @@ async def start_session( and call_sessions_total == self._max_sessions_per_call ): raise MaxSessionsPerCallExceeded( - f"Maximum sessions exceeded for call " - f"'{call_id}': {call_sessions_total}/{self._max_sessions_per_call}" + f"Reached maximum sessions per call of {self._max_sessions_per_call}" ) agent: "Agent" = await self.launch() diff --git a/agents-core/vision_agents/core/runner/http/api.py b/agents-core/vision_agents/core/runner/http/api.py index 0986f0b94..ffe01777b 100644 --- a/agents-core/vision_agents/core/runner/http/api.py +++ b/agents-core/vision_agents/core/runner/http/api.py @@ -7,6 +7,7 @@ from fastapi.responses import Response from vision_agents.core import AgentLauncher from vision_agents.core.agents.agent_launcher import AgentSession +from vision_agents.core.agents.exceptions import SessionLimitExceeded from .dependencies import ( can_close_session, @@ -50,6 +51,22 @@ async def lifespan(app: FastAPI): status_code=status.HTTP_201_CREATED, summary="Join call with an agent", description="Start a new agent and have it join the specified call.", + responses={ + 201: { + "description": "Session created successfully", + "model": StartSessionResponse, + }, + 429: { + "description": "Session limits exceeded", + "content": { + "application/json": { + "example": { + "detail": "Reached maximum concurrent sessions of X", + } + } + }, + }, + }, dependencies=[Depends(can_start_session)], ) async def start_session( @@ -63,6 +80,8 @@ async def start_session( session = await launcher.start_session( call_id=request.call_id, call_type=request.call_type, created_by=user ) + except SessionLimitExceeded as e: + raise HTTPException(status_code=429, detail=str(e)) from e except Exception as e: logger.exception("Failed to start agent") raise HTTPException( diff --git a/tests/test_agents/test_runner.py b/tests/test_agents/test_runner.py index cd5f94e6b..815a516fa 100644 --- a/tests/test_agents/test_runner.py +++ b/tests/test_agents/test_runner.py @@ -33,28 +33,37 @@ async def simple_response(self, *_, **__) -> LLMResponseEvent[Any]: async def on_warmup(self) -> bool: return True - async def on_warmed_up(self, *_) -> None: + def on_warmed_up(self, *_) -> None: self.warmed_up = True @pytest.fixture() -async def agent_launcher(): - async def create_agent(**kwargs) -> Agent: - stream_edge_mock = MagicMock() - stream_edge_mock.events = EventManager() - - return Agent( - llm=DummyLLM(), - tts=DummyTTS(), - edge=stream_edge_mock, - agent_user=User(name="test"), +def agent_launcher_factory(): + def factory(**launcher_kwargs) -> AgentLauncher: + async def create_agent(**kwargs) -> Agent: + stream_edge_mock = MagicMock() + stream_edge_mock.events = EventManager() + + return Agent( + llm=DummyLLM(), + tts=DummyTTS(), + edge=stream_edge_mock, + agent_user=User(name="test"), + ) + + async def join_call(*args, **kwargs): + await asyncio.sleep(10) + + return AgentLauncher( + create_agent=create_agent, join_call=join_call, **launcher_kwargs ) - async def join_call(*args, **kwargs): - await asyncio.sleep(10) + return factory - launcher = AgentLauncher(create_agent=create_agent, join_call=join_call) - return launcher + +@pytest.fixture() +def agent_launcher(agent_launcher_factory): + return agent_launcher_factory() @pytest.fixture() @@ -403,3 +412,39 @@ def hello_world(): resp = await client.get("/hello-world") assert resp.status_code == 200 assert resp.content.decode() == "Hello world" + + async def test_start_session_max_concurrent_sessions_exceeded( + self, agent_launcher_factory, test_client_factory + ) -> None: + launcher = agent_launcher_factory(max_concurrent_sessions=1) + runner = Runner(launcher=launcher) + + async with test_client_factory(runner) as client: + resp = await client.post( + "/sessions", json={"call_id": "test-1", "call_type": "default"} + ) + assert resp.status_code == 201 + + resp = await client.post( + "/sessions", json={"call_id": "test-2", "call_type": "default"} + ) + assert resp.status_code == 429 + assert "Reached maximum concurrent sessions of" in resp.json()["detail"] + + async def test_start_session_max_sessions_per_call_exceeded( + self, agent_launcher_factory, test_client_factory + ) -> None: + launcher = agent_launcher_factory(max_sessions_per_call=1) + runner = Runner(launcher=launcher) + + async with test_client_factory(runner) as client: + resp = await client.post( + "/sessions", json={"call_id": "test", "call_type": "default"} + ) + assert resp.status_code == 201 + + resp = await client.post( + "/sessions", json={"call_id": "test", "call_type": "default"} + ) + assert resp.status_code == 429 + assert "Reached maximum sessions per call of" in resp.json()["detail"] From b898fa938a1b1ad204e8e3e54b9bd6b525ad8d9f Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Thu, 22 Jan 2026 13:18:17 +0100 Subject: [PATCH 8/9] Fix possible limit overflow in `AgentLauncher.start_session` --- .../core/agents/agent_launcher.py | 86 ++++++++++--------- 1 file changed, 44 insertions(+), 42 deletions(-) diff --git a/agents-core/vision_agents/core/agents/agent_launcher.py b/agents-core/vision_agents/core/agents/agent_launcher.py index cfee3f25c..73c92e06e 100644 --- a/agents-core/vision_agents/core/agents/agent_launcher.py +++ b/agents-core/vision_agents/core/agents/agent_launcher.py @@ -115,6 +115,7 @@ def __init__( self._join_call = join_call self._warmup_lock = asyncio.Lock() self._warmup_cache = WarmupCache() + self._start_lock = asyncio.Lock() if max_concurrent_sessions is not None and max_concurrent_sessions <= 0: raise ValueError("max_concurrent_sessions must be > 0 or None") @@ -261,52 +262,53 @@ async def start_session( MaxSessionsPerCallExceeded: If the maximum number of sessions for this call_id has been reached. """ - if ( - self._max_concurrent_sessions - and len(self._sessions) == self._max_concurrent_sessions - ): - raise MaxConcurrentSessionsExceeded( - f"Reached maximum concurrent sessions of {self._max_concurrent_sessions}" - ) + async with self._start_lock: + if ( + self._max_concurrent_sessions + and len(self._sessions) >= self._max_concurrent_sessions + ): + raise MaxConcurrentSessionsExceeded( + f"Reached maximum concurrent sessions of {self._max_concurrent_sessions}" + ) - call_sessions_total = len(self._calls.get(call_id, set())) - if ( - self._max_sessions_per_call - and call_sessions_total == self._max_sessions_per_call - ): - raise MaxSessionsPerCallExceeded( - f"Reached maximum sessions per call of {self._max_sessions_per_call}" - ) + call_sessions_total = len(self._calls.get(call_id, set())) + if ( + self._max_sessions_per_call + and call_sessions_total >= self._max_sessions_per_call + ): + raise MaxSessionsPerCallExceeded( + f"Reached maximum sessions per call of {self._max_sessions_per_call}" + ) - agent: "Agent" = await self.launch() - if video_track_override_path: - agent.set_video_track_override_path(video_track_override_path) + agent: "Agent" = await self.launch() + if video_track_override_path: + agent.set_video_track_override_path(video_track_override_path) - task = asyncio.create_task( - self._join_call(agent, call_type, call_id), name=f"agent-{agent.id}" - ) + task = asyncio.create_task( + self._join_call(agent, call_type, call_id), name=f"agent-{agent.id}" + ) - # Remove the session when the task is done - # or when the AgentSession is garbage-collected - # in case the done callback wasn't fired - def _finalizer(session_id_: str, call_id_: str, *_): - session_ = self._sessions.pop(session_id_, None) - if session_ is not None: - call_sessions = self._calls.get(call_id_, set()) - if call_sessions: - call_sessions.discard(session_id_) - - task.add_done_callback(partial(_finalizer, agent.id, call_id)) - session = AgentSession( - agent=agent, - task=task, - started_at=datetime.now(timezone.utc), - call_id=call_id, - created_by=created_by, - ) - self._sessions[agent.id] = session - self._calls.setdefault(call_id, set()).add(agent.id) - logger.info(f"Start agent session with id {session.id}") + # Remove the session when the task is done + # or when the AgentSession is garbage-collected + # in case the done callback wasn't fired + def _finalizer(session_id_: str, call_id_: str, *_): + session_ = self._sessions.pop(session_id_, None) + if session_ is not None: + call_sessions = self._calls.get(call_id_, set()) + if call_sessions: + call_sessions.discard(session_id_) + + task.add_done_callback(partial(_finalizer, agent.id, call_id)) + session = AgentSession( + agent=agent, + task=task, + started_at=datetime.now(timezone.utc), + call_id=call_id, + created_by=created_by, + ) + self._sessions[agent.id] = session + self._calls.setdefault(call_id, set()).add(agent.id) + logger.info(f"Started agent session with id {session.id}") return session async def close_session(self, session_id: str, wait: bool = False) -> bool: From f45964e4e4a73efe0b6eab288d54eab3aeafc706 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Thu, 22 Jan 2026 13:23:06 +0100 Subject: [PATCH 9/9] Fix idleness check in `AgentLauncher` --- agents-core/vision_agents/core/agents/agent_launcher.py | 2 +- tests/test_agents/test_agent_launcher.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/agents-core/vision_agents/core/agents/agent_launcher.py b/agents-core/vision_agents/core/agents/agent_launcher.py index 73c92e06e..4f677e759 100644 --- a/agents-core/vision_agents/core/agents/agent_launcher.py +++ b/agents-core/vision_agents/core/agents/agent_launcher.py @@ -404,7 +404,7 @@ async def _cleanup_idle_sessions(self) -> None: agent = session.agent on_call_for = agent.on_call_for() idle_for = agent.idle_for() - if idle_for >= self._agent_idle_timeout: + if 0 < self._agent_idle_timeout <= idle_for: logger.info( f'Closing session "{session.id}" with ' f'user_id "{agent.agent_user.id}" after being ' diff --git a/tests/test_agents/test_agent_launcher.py b/tests/test_agents/test_agent_launcher.py index cad82dd27..9dfa44006 100644 --- a/tests/test_agents/test_agent_launcher.py +++ b/tests/test_agents/test_agent_launcher.py @@ -114,7 +114,10 @@ async def create_agent(**kwargs) -> Agent: assert session1.finished assert session2.finished - async def test_idle_sessions_alive_with_idle_timeout_zero(self, stream_edge_mock): + @pytest.mark.parametrize("idle_for", [0, 10]) + async def test_idle_sessions_alive_with_idle_timeout_zero( + self, stream_edge_mock, idle_for: float + ): llm = DummyLLM() tts = DummyTTS() @@ -132,7 +135,7 @@ async def create_agent(**kwargs) -> Agent: agent_idle_timeout=0, cleanup_interval=0.5, ) - with patch.object(Agent, "idle_for", return_value=10): + with patch.object(Agent, "idle_for", return_value=idle_for): # Start the launcher internals async with launcher: # Launch a couple of idle agents