Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions src/google/adk/agents/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,20 @@ async def run_async(
if ctx.end_invocation:
return

async with Aclosing(self._run_async_impl(ctx)) as agen:
async for event in agen:
yield event
try:
async with Aclosing(self._run_async_impl(ctx)) as agen:
async for event in agen:
yield event
except Exception as agent_error:
# Notify plugins that this agent run failed before re-raising.
# after_agent_callback is intentionally skipped so plugins can
# distinguish a clean completion from a fatal failure.
await ctx.plugin_manager.run_on_agent_error_callback(
agent=self,
callback_context=CallbackContext(ctx),
error=agent_error,
)
raise

if ctx.end_invocation:
return
Expand Down Expand Up @@ -326,9 +337,18 @@ async def run_live(
if ctx.end_invocation:
return

async with Aclosing(self._run_live_impl(ctx)) as agen:
async for event in agen:
yield event
try:
async with Aclosing(self._run_live_impl(ctx)) as agen:
async for event in agen:
yield event
except Exception as agent_error:
# Notify plugins that this live agent run failed before re-raising.
await ctx.plugin_manager.run_on_agent_error_callback(
agent=self,
callback_context=CallbackContext(ctx),
error=agent_error,
)
raise

if event := await self._handle_after_agent_callback(ctx):
yield event
Expand Down
60 changes: 60 additions & 0 deletions src/google/adk/plugins/base_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,63 @@ async def on_tool_error_callback(
allows the original error to be raised.
"""
pass

async def on_agent_error_callback(
self,
*,
agent: BaseAgent,
callback_context: CallbackContext,
error: Exception,
) -> None:
"""Callback executed when an unhandled exception escapes an agent's run.

This callback fires when an exception propagates out of the agent's
``_run_async_impl`` or ``_run_live_impl`` before ``after_agent_callback``
has had a chance to execute. It is intended purely for observability
(logging, metrics, tracing) — the original exception is always re-raised
after all registered plugins have been notified.

Unlike ``on_tool_error_callback`` and ``on_model_error_callback``, this
callback cannot swallow or replace the error; it always returns ``None``.

Args:
agent: The agent instance whose execution raised the exception.
callback_context: The callback context for the failed agent invocation.
error: The exception that was raised.

Returns:
None. The return value is ignored; the exception is re-raised by the
framework regardless.
"""
pass

async def on_run_error_callback(
self,
*,
invocation_context: InvocationContext,
error: Exception,
) -> None:
"""Callback executed when an unhandled exception escapes a runner invocation.

This callback fires when an exception propagates out of the runner's main
execution loop before ``after_run_callback`` has had a chance to execute.
It is intended purely for observability (logging, metrics, tracing) — the
original exception is always re-raised after all registered plugins have
been notified.

This fills the gap where a fatal error (e.g. an unrecoverable model crash
or tool exception) would otherwise cause the invocation to disappear from
observability sinks without ever emitting a terminal event.

Unlike ``on_tool_error_callback`` and ``on_model_error_callback``, this
callback cannot swallow or replace the error; it always returns ``None``.

Args:
invocation_context: The context for the entire invocation.
error: The exception that escaped the runner's execution loop.

Returns:
None. The return value is ignored; the exception is re-raised by the
framework regardless.
"""
pass
77 changes: 77 additions & 0 deletions src/google/adk/plugins/plugin_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
"after_model_callback",
"on_tool_error_callback",
"on_model_error_callback",
"on_agent_error_callback",
"on_run_error_callback",
]

logger = logging.getLogger("google_adk." + __name__)
Expand Down Expand Up @@ -257,6 +259,46 @@ async def run_on_tool_error_callback(
error=error,
)

async def run_on_agent_error_callback(
self,
*,
agent: BaseAgent,
callback_context: CallbackContext,
error: Exception,
) -> None:
"""Runs the ``on_agent_error_callback`` for all plugins.

All registered plugins are notified even if an earlier plugin raises —
failures in individual plugins are logged but do not prevent subsequent
plugins from being called. The original agent error is never suppressed
by this method.
"""
await self._run_error_callbacks(
"on_agent_error_callback",
agent=agent,
callback_context=callback_context,
error=error,
)

async def run_on_run_error_callback(
self,
*,
invocation_context: InvocationContext,
error: Exception,
) -> None:
"""Runs the ``on_run_error_callback`` for all plugins.

All registered plugins are notified even if an earlier plugin raises —
failures in individual plugins are logged but do not prevent subsequent
plugins from being called. The original runner error is never suppressed
by this method.
"""
await self._run_error_callbacks(
"on_run_error_callback",
invocation_context=invocation_context,
error=error,
)

async def _run_callbacks(
self, callback_name: PluginCallbackName, **kwargs: Any
) -> Optional[Any]:
Expand Down Expand Up @@ -306,6 +348,41 @@ async def _run_callbacks(

return None

async def _run_error_callbacks(
self, callback_name: PluginCallbackName, **kwargs: Any
) -> None:
"""Executes an error-notification callback for **all** registered plugins.

Unlike ``_run_callbacks``, this method does **not** stop on the first
non-``None`` return value. Error callbacks are pure observers — every
plugin deserves a chance to record the failure even if an earlier plugin
in the chain itself encounters an error.

Individual plugin failures are logged but do not prevent subsequent
plugins from being called, and they do not propagate to the caller. The
underlying framework error that triggered this notification is always
re-raised by the caller independently.

Args:
callback_name: The name of the error callback method to execute.
**kwargs: Keyword arguments forwarded to each plugin's callback.
"""
for plugin in self.plugins:
callback_method = getattr(plugin, callback_name)
try:
await callback_method(**kwargs)
except Exception as e:
# Log but continue — a broken observability plugin must not hide the
# original error from the framework or prevent other plugins from
# receiving the notification.
logger.error(
"Error in plugin '%s' during '%s' callback: %s",
plugin.name,
callback_name,
e,
exc_info=True,
)

async def close(self) -> None:
"""Calls the close method on all registered plugins concurrently.

Expand Down
137 changes: 74 additions & 63 deletions src/google/adk/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,73 +849,84 @@ async def _exec_with_plugin(
buffered_events: list[Event] = []
is_transcribing: bool = False

async with Aclosing(execute_fn(invocation_context)) as agen:
async for event in agen:
_apply_run_config_custom_metadata(
event, invocation_context.run_config
)
if is_live_call:
if event.partial and _is_transcription(event):
is_transcribing = True
if is_transcribing and _is_tool_call_or_response(event):
# only buffer function call and function response event which is
# non-partial
buffered_events.append(event)
continue
# Note for live/bidi: for audio response, it's considered as
# non-partial event(event.partial=None)
# event.partial=False and event.partial=None are considered as
# non-partial event; event.partial=True is considered as partial
# event.
if event.partial is not True:
if _is_transcription(event) and (
_has_non_empty_transcription_text(event.input_transcription)
or _has_non_empty_transcription_text(
event.output_transcription
try:
async with Aclosing(execute_fn(invocation_context)) as agen:
async for event in agen:
_apply_run_config_custom_metadata(
event, invocation_context.run_config
)
if is_live_call:
if event.partial and _is_transcription(event):
is_transcribing = True
if is_transcribing and _is_tool_call_or_response(event):
# only buffer function call and function response event which is
# non-partial
buffered_events.append(event)
continue
# Note for live/bidi: for audio response, it's considered as
# non-partial event(event.partial=None)
# event.partial=False and event.partial=None are considered as
# non-partial event; event.partial=True is considered as partial
# event.
if event.partial is not True:
if _is_transcription(event) and (
_has_non_empty_transcription_text(event.input_transcription)
or _has_non_empty_transcription_text(
event.output_transcription
)
):
# transcription end signal, append buffered events
is_transcribing = False
logger.debug(
'Appending transcription finished event: %s', event
)
):
# transcription end signal, append buffered events
is_transcribing = False
logger.debug(
'Appending transcription finished event: %s', event
if self._should_append_event(event, is_live_call):
await self.session_service.append_event(
session=session, event=event
)

for buffered_event in buffered_events:
logger.debug('Appending buffered event: %s', buffered_event)
await self.session_service.append_event(
session=session, event=buffered_event
)
yield buffered_event # yield buffered events to caller
buffered_events = []
else:
# non-transcription event or empty transcription event, for
# example, event that stores blob reference, should be appended.
if self._should_append_event(event, is_live_call):
logger.debug('Appending non-buffered event: %s', event)
await self.session_service.append_event(
session=session, event=event
)
else:
if event.partial is not True:
await self.session_service.append_event(
session=session, event=event
)
if self._should_append_event(event, is_live_call):
await self.session_service.append_event(
session=session, event=event
)

for buffered_event in buffered_events:
logger.debug('Appending buffered event: %s', buffered_event)
await self.session_service.append_event(
session=session, event=buffered_event
)
yield buffered_event # yield buffered events to caller
buffered_events = []
else:
# non-transcription event or empty transcription event, for
# example, event that stores blob reference, should be appended.
if self._should_append_event(event, is_live_call):
logger.debug('Appending non-buffered event: %s', event)
await self.session_service.append_event(
session=session, event=event
)
else:
if event.partial is not True:
await self.session_service.append_event(
session=session, event=event
)

# Step 3: Run the on_event callbacks to optionally modify the event.
modified_event = await plugin_manager.run_on_event_callback(
invocation_context=invocation_context, event=event
)
if modified_event:
_apply_run_config_custom_metadata(
modified_event, invocation_context.run_config
# Step 3: Run the on_event callbacks to optionally modify the event.
modified_event = await plugin_manager.run_on_event_callback(
invocation_context=invocation_context, event=event
)
yield modified_event
else:
yield event
if modified_event:
_apply_run_config_custom_metadata(
modified_event, invocation_context.run_config
)
yield modified_event
else:
yield event
except Exception as run_error:
# Step 3b: Notify all plugins that this invocation failed.
# The callback is fire-and-forget — it cannot suppress the error.
# after_run_callback is intentionally skipped on the error path so
# that plugins can distinguish clean completions from fatal failures.
await plugin_manager.run_on_run_error_callback(
invocation_context=invocation_context,
error=run_error,
)
raise

# Step 4: Run the after_run callbacks to perform global cleanup tasks or
# finalizing logs and metrics data.
Expand Down
Loading