Skip to content

Commit d5557e2

Browse files
fix(agents): await cancelled tasks in _merge_agent_run_pre_3_11 to prevent aclose() RuntimeError
On Python 3.10, ParallelAgent uses _merge_agent_run_pre_3_11 instead of asyncio.TaskGroup. When a sub-agent raises, the finally block cancelled all tasks but did not await them. This left the internal process_an_agent coroutines still executing their own finally blocks (which hold references to the sub-agent async generators) when _run_async_impl subsequently called aclose() on those generators, raising: RuntimeError: aclose(): asynchronous generator is already running Fix: add asyncio.gather(*tasks, return_exceptions=True) after cancellation so that all tasks — and their generator cleanup — complete before the caller can invoke aclose(). Fixes #5297
1 parent 3e282d2 commit d5557e2

2 files changed

Lines changed: 37 additions & 0 deletions

File tree

src/google/adk/agents/parallel_agent.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ async def process_an_agent(events_for_one_agent):
145145
finally:
146146
for task in tasks:
147147
task.cancel()
148+
await asyncio.gather(*tasks, return_exceptions=True)
148149

149150

150151
class ParallelAgent(BaseAgent):

tests/unittests/agents/test_parallel_agent.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from google.adk.agents.base_agent import BaseAgent
2121
from google.adk.agents.base_agent import BaseAgentState
2222
from google.adk.agents.invocation_context import InvocationContext
23+
from google.adk.agents.parallel_agent import _merge_agent_run_pre_3_11
2324
from google.adk.agents.parallel_agent import ParallelAgent
2425
from google.adk.agents.sequential_agent import SequentialAgent
2526
from google.adk.agents.sequential_agent import SequentialAgentState
@@ -373,3 +374,38 @@ async def test_stop_agent_if_sub_agent_fails(
373374
async for _ in agen:
374375
# The infinite agent could iterate a few times depending on scheduling.
375376
pass
377+
378+
379+
async def _slow_agent_with_cleanup_delay():
380+
"""Async generator that sleeps in its finally block to simulate cleanup."""
381+
try:
382+
await asyncio.sleep(10)
383+
yield 'slow-event'
384+
finally:
385+
await asyncio.sleep(0.05)
386+
387+
388+
async def _failing_agent():
389+
"""Async generator that raises after a short delay."""
390+
await asyncio.sleep(0.01)
391+
raise ValueError('simulated sub-agent failure')
392+
yield # pragma: no cover
393+
394+
395+
@pytest.mark.asyncio
396+
async def test_merge_agent_run_pre_3_11_no_aclose_error_on_failure():
397+
"""Regression test for Python 3.10 RuntimeError: aclose() already running.
398+
399+
_merge_agent_run_pre_3_11 must await all cancelled tasks before returning so
400+
that generators are fully released before the caller invokes aclose() on them.
401+
"""
402+
agent_runs = [_slow_agent_with_cleanup_delay(), _failing_agent()]
403+
404+
with pytest.raises(ValueError, match='simulated sub-agent failure'):
405+
async for _ in _merge_agent_run_pre_3_11(agent_runs):
406+
pass
407+
408+
# If tasks were not properly awaited, aclose() on a still-running generator
409+
# would raise RuntimeError here.
410+
for agen in agent_runs:
411+
await agen.aclose()

0 commit comments

Comments
 (0)