diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 5ebc31d3e2..c1a768c401 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -73,6 +73,7 @@ if TYPE_CHECKING: from ..agents.invocation_context import InvocationContext + from ..events.event import Event logger: logging.Logger = logging.getLogger("google_adk." + __name__) tracer = trace.get_tracer( @@ -84,12 +85,57 @@ _SCHEMA_VERSION = "1" _SCHEMA_VERSION_LABEL_KEY = "adk_schema_version" +# ADK 2.0 envelope version. Stamped onto every ADK-enriched row as +# ``attributes.adk.schema_version``. Independent of the BigQuery row +# schema version above — this names the producer's ADK 2.0 attribute +# contract so downstream consumers can gate on it. +_ADK_ENVELOPE_SCHEMA_VERSION = "1" + _HITL_EVENT_MAP = MappingProxyType({ "adk_request_credential": "HITL_CREDENTIAL_REQUEST", "adk_request_confirmation": "HITL_CONFIRMATION_REQUEST", "adk_request_input": "HITL_INPUT_REQUEST", }) +# Reverse of _HITL_EVENT_MAP for the C7 pause_kind discriminator. The +# id→name lookup in #199/#293 v5 routes ``adk_request_credential`` → +# ``hitl_credential`` etc.; everything else is ``tool``. +_HITL_PAUSE_KIND_MAP = MappingProxyType({ + "adk_request_credential": "hitl_credential", + "adk_request_confirmation": "hitl_confirmation", + "adk_request_input": "hitl_input", +}) + + +def _derive_scope( + isolation_scope: Optional[str], +) -> Optional[dict[str, str]]: + """Derives ``attributes.adk.scope`` per the #198 / #293 v5 rule. + + Order is fixed: (1) None → null; (2) node-shape (``name@run_id`` or + ``parent/name@run_id``) → ``node_run``; (3) any other non-empty + string → ``function_call`` (model-provided FC IDs like ``call_*`` and + ``toolu_*`` legitimately match here); (4) empty/non-string → ``unknown`` + with a warning. Steps 2 and 3 are intentionally ordered: a bare + ``name@run_id`` must classify as ``node_run`` first, not as + ``function_call`` by fall-through. + """ + if isolation_scope is None: + return None + if not isinstance(isolation_scope, str) or not isolation_scope: + logger.warning( + "Unexpected isolation_scope shape: %r; classifying as 'unknown'", + isolation_scope, + ) + return {"id": str(isolation_scope), "kind": "unknown"} + # Node-shape: last segment contains '@'. The full string may also be + # path-prefixed (e.g. ``wf/A@1/B@2``). + last_segment = isolation_scope.rsplit("/", 1)[-1] + if "@" in last_segment: + return {"id": isolation_scope, "kind": "node_run"} + return {"id": isolation_scope, "kind": "function_call"} + + # Track all living plugin instances so the fork handler can reset # them proactively in the child, before _ensure_started runs. _LIVE_PLUGINS: weakref.WeakSet = weakref.WeakSet() @@ -2021,6 +2067,12 @@ class EventData: error_message: Optional[str] = None extra_attributes: dict[str, Any] = field(default_factory=dict) trace_id_override: Optional[str] = None + # ADK 2.0 envelope: callbacks that hold the source Event pass it here + # so ``_log_event`` can stamp ``attributes.adk.{source_event_id, node, + # branch, scope, ...}``. Leave None for rows that don't originate from + # an Event — the envelope helper writes those attributes as null + # rather than synthesizing fake identity (per #293 v5 A3 contract). + source_event: Optional["Event"] = None class BigQueryAgentAnalyticsPlugin(BasePlugin): @@ -2825,6 +2877,108 @@ def _extract_latency( latency_json["time_to_first_token_ms"] = event_data.time_to_first_token_ms return latency_json or None + def _build_adk_envelope( + self, + callback_context: CallbackContext, + source_event: Optional["Event"], + ) -> dict[str, Any]: + """Builds the ``attributes.adk`` envelope per #293 v5. + + A1 / A2 (``schema_version``, ``app_name``) stamp on every ADK-enriched + row regardless of origin. A3 / C1 / C2 / C3 (``source_event_id``, + ``node``, ``branch``, ``scope``) and C8 (``route``, + ``render_ui_widgets``, ``rewind_before_invocation_id``) only stamp + when a source Event is provided — callback-only rows leave them + JSON null rather than synthesizing fake identity. + """ + adk: dict[str, Any] = { + "schema_version": _ADK_ENVELOPE_SCHEMA_VERSION, + } + try: + adk["app_name"] = callback_context._invocation_context.session.app_name + except Exception: + adk["app_name"] = None + + if source_event is None: + return adk + + # Every getattr below is defensive: source_event is "anything the + # caller hands us", which in test suites can be a Mock. Best-effort + # enrichment means "leave null on missing attrs", never crash the + # row. + try: + source_event_id = getattr(source_event, "id", None) + if source_event_id: + adk["source_event_id"] = source_event_id # A3 + except Exception: + pass + + # C1: node = {path, run_id, parent_path}. NodeInfo.path defaults to + # the empty string in current ADK (events/event.py:45); preserve it + # verbatim and emit parent_path = null rather than synthesizing a + # fake workflow node from an empty path. + try: + node_info = getattr(source_event, "node_info", None) + if node_info is not None and hasattr(node_info, "path"): + path = getattr(node_info, "path", "") or "" + run_id = getattr(node_info, "run_id", None) + parent_path = None + if path and "/" in path: + parent_path = path.rsplit("/", 1)[0] + adk["node"] = { + "path": path, + "run_id": run_id, + "parent_path": parent_path, + } + except Exception: + pass + + # C2: branch — absent stays JSON null (no sentinel string). + try: + if hasattr(source_event, "branch"): + adk["branch"] = source_event.branch + except Exception: + pass + + # C3: scope shape derivation (per #190 v5 / #198). Order matters: + # node-shape patterns must be checked before falling through to + # function_call so bare ``name@run_id`` doesn't misclassify. + try: + if hasattr(source_event, "isolation_scope"): + adk["scope"] = _derive_scope(source_event.isolation_scope) + except Exception: + pass + + # C8: raw EventActions mirror (flat under attributes.adk per #203). + # Stamp only when actually set so JSON doesn't bloat with nulls. + try: + actions = getattr(source_event, "actions", None) + except Exception: + actions = None + if actions is not None: + try: + route = getattr(actions, "route", None) + if route is not None: + adk["route"] = route + except Exception: + pass + try: + widgets = getattr(actions, "render_ui_widgets", None) + if widgets is not None: + adk["render_ui_widgets"] = [ + w.model_dump() if hasattr(w, "model_dump") else w for w in widgets + ] + except Exception: + pass + try: + rewind = getattr(actions, "rewind_before_invocation_id", None) + if rewind is not None: + adk["rewind_before_invocation_id"] = rewind + except Exception: + pass + + return adk + def _enrich_attributes( self, event_data: EventData, @@ -2834,12 +2988,15 @@ def _enrich_attributes( Reads ``model``, ``model_version``, and ``usage_metadata`` from *event_data*, copies ``extra_attributes``, then adds session metadata - and custom tags. + and custom tags. Also stamps the ``adk`` envelope (#293 v5). Returns: A new dict ready for JSON serialization into the attributes column. """ attrs: dict[str, Any] = dict(event_data.extra_attributes) + attrs["adk"] = self._build_adk_envelope( + callback_context, event_data.source_event + ) attrs["root_agent_name"] = TraceManager.get_root_agent_name() if event_data.model: @@ -2992,9 +3149,14 @@ async def on_user_message_callback( ) -> None: """Parity with V1: Logs USER_MESSAGE_RECEIVED event. - Also detects HITL completion responses (user-sent - ``FunctionResponse`` parts with ``adk_request_*`` names) and emits - dedicated ``HITL_*_COMPLETED`` events. + Also detects: + * HITL completion responses (user-sent ``FunctionResponse`` parts + with ``adk_request_*`` names) → ``HITL_*_COMPLETED``. + * Non-HITL ``FunctionResponse`` parts from a user message → these + are the long-running tool completions for tools that paused via + ``TOOL_PAUSED``. Emitted as ``TOOL_COMPLETED`` with + ``pause_kind = 'tool'`` and ``function_call_id`` so the customer + can join the pair from BigQuery (#293 v5 C7 pair-key subset). Args: invocation_context: The context of the current invocation. @@ -3008,26 +3170,49 @@ async def on_user_message_callback( raw_content=user_message, ) - # Detect HITL completion responses in the user message. + # Detect completion responses in the user message. if user_message and user_message.parts: for part in user_message.parts: - if part.function_response: - hitl_event = _HITL_EVENT_MAP.get(part.function_response.name) - if hitl_event: - resp_truncated, is_truncated = _recursive_smart_truncate( - part.function_response.response or {}, - self.config.max_content_length, - ) - content_dict = { - "tool": part.function_response.name, - "result": resp_truncated, - } - await self._log_event( - hitl_event + "_COMPLETED", - callback_ctx, - raw_content=content_dict, - is_truncated=is_truncated, - ) + if not part.function_response: + continue + hitl_event = _HITL_EVENT_MAP.get(part.function_response.name) + resp_truncated, is_truncated = _recursive_smart_truncate( + part.function_response.response or {}, + self.config.max_content_length, + ) + content_dict = { + "tool": part.function_response.name, + "result": resp_truncated, + } + if hitl_event: + # HITL completions stay on the HITL_*_COMPLETED stream — they + # MUST NOT also emit TOOL_COMPLETED (per #293 v5 C7). + await self._log_event( + hitl_event + "_COMPLETED", + callback_ctx, + raw_content=content_dict, + is_truncated=is_truncated, + ) + else: + # Non-HITL function_response arriving via a user message is + # by construction a long-running tool completion: regular + # tool calls complete inside the agent run via + # after_tool_callback, so a function_response inside a user + # message is the resume side of a previously-paused tool. + # Stamp the pair keys; pause_orphan / registry semantics are + # deferred to #206. + await self._log_event( + "TOOL_COMPLETED", + callback_ctx, + raw_content=content_dict, + is_truncated=is_truncated, + event_data=EventData( + extra_attributes={ + "pause_kind": "tool", + "function_call_id": part.function_response.id, + }, + ), + ) @_safe_callback async def on_event_callback( @@ -3070,11 +3255,84 @@ async def on_event_callback( "STATE_DELTA", callback_ctx, event_data=EventData( - extra_attributes={"state_delta": dict(event.actions.state_delta)} + source_event=event, + extra_attributes={"state_delta": dict(event.actions.state_delta)}, ), ) - # --- HITL event logging --- + # --- AGENT_TRANSFER (C4 / #200) --- + # actions.transfer_to_agent stores the *target* agent only + # (events/event_actions.py:75); from_agent is pinned to event.author + # per #293 v5 C4. Never fabricate authors on non-Event paths. + if event.actions.transfer_to_agent: + await self._log_event( + "AGENT_TRANSFER", + callback_ctx, + raw_content={ + "from_agent": event.author, + "to_agent": event.actions.transfer_to_agent, + }, + event_data=EventData(source_event=event), + ) + + # --- EVENT_COMPACTION (C5 / #201) --- + # EventCompaction.start_timestamp / end_timestamp are float epoch + # seconds. Preserve fractional precision here; consumer view + # conversion is deferred. + compaction = event.actions.compaction + if compaction is not None: + compacted_content, compaction_truncated = self._format_content_safely( + compaction.compacted_content + ) + await self._log_event( + "EVENT_COMPACTION", + callback_ctx, + raw_content={ + "start_timestamp": compaction.start_timestamp, + "end_timestamp": compaction.end_timestamp, + "compacted_content": compacted_content, + }, + is_truncated=compaction_truncated, + event_data=EventData(source_event=event), + ) + + # --- AGENT_STATE_CHECKPOINT (C6 / #202) --- + # Fires when *either* agent_state is set or end_of_agent is True; + # supports {agent_state: None, end_of_agent: True} payloads. + # Inline payload only — oversized-state GCS offload deferred. + if ( + event.actions.agent_state is not None + or event.actions.end_of_agent is True + ): + agent_state_dict, agent_state_truncated = ( + _recursive_smart_truncate( + event.actions.agent_state, + self.config.max_content_length, + ) + if event.actions.agent_state is not None + else (None, False) + ) + await self._log_event( + "AGENT_STATE_CHECKPOINT", + callback_ctx, + raw_content={ + "agent_state": agent_state_dict, + "end_of_agent": bool(event.actions.end_of_agent), + }, + is_truncated=agent_state_truncated, + event_data=EventData(source_event=event), + ) + + # --- HITL + TOOL_PAUSED (C7 / #199 — pair-key subset) + per-part + # iteration over event.content.parts --- + # TOOL_PAUSED fires per long_running_tool_id; pause_kind is derived + # via the id→name lookup against _HITL_PAUSE_KIND_MAP, so a HITL + # long-running call carries pause_kind = 'hitl_*' and a regular + # long-running tool carries pause_kind = 'tool'. function_call_id + # joins to the downstream TOOL_COMPLETED via the user message path. + # Use getattr so the existing Mock-based HITL test fixtures still + # work — they construct events without setting long_running_tool_ids. + long_running_ids = set(getattr(event, "long_running_tool_ids", None) or ()) if event.content and event.content.parts: for part in event.content.parts: # Detect HITL function calls (request events). @@ -3094,8 +3352,38 @@ async def on_event_callback( callback_ctx, raw_content=content_dict, is_truncated=is_truncated, + event_data=EventData(source_event=event), + ) + # C7: per-id TOOL_PAUSED emit. pause_kind derives from the + # function_call NAME (#199 / #293 v5) — looking it up against + # the id value would misclassify every HITL pause as 'tool'. + if part.function_call.id in long_running_ids: + pause_kind = _HITL_PAUSE_KIND_MAP.get( + part.function_call.name, "tool" + ) + args_truncated, is_truncated = _recursive_smart_truncate( + part.function_call.args or {}, + self.config.max_content_length, + ) + await self._log_event( + "TOOL_PAUSED", + callback_ctx, + raw_content={ + "tool": part.function_call.name, + "args": args_truncated, + }, + is_truncated=is_truncated, + event_data=EventData( + source_event=event, + extra_attributes={ + "pause_kind": pause_kind, + "function_call_id": part.function_call.id, + }, + ), ) - # Detect HITL function responses (completion events). + # Detect HITL function responses (completion events). HITL + # function responses route ONLY here, never to TOOL_COMPLETED + # (per #293 v5 C7 / verified at this file's HITL test suite). if part.function_response: hitl_event = _HITL_EVENT_MAP.get(part.function_response.name) if hitl_event: @@ -3112,6 +3400,7 @@ async def on_event_callback( callback_ctx, raw_content=content_dict, is_truncated=is_truncated, + event_data=EventData(source_event=event), ) # --- A2A interaction logging --- @@ -3147,6 +3436,7 @@ async def on_event_callback( raw_content=content_dict, is_truncated=is_truncated or content_truncated, event_data=EventData( + source_event=event, extra_attributes={ "a2a_metadata": a2a_truncated, }, @@ -3183,12 +3473,17 @@ async def on_event_callback( role=event.content.role, parts=visible_parts ) formatted, truncated = self._format_content_safely(visible_content) + # source_event=event carries the ADK envelope (A3 / node / + # branch / scope). The flat ``source_event_*`` extras are + # retained for backward compat with existing AGENT_RESPONSE + # consumers; the canonical keys are under ``attributes.adk.*``. await self._log_event( "AGENT_RESPONSE", callback_ctx, raw_content={"response": formatted}, is_truncated=truncated, event_data=EventData( + source_event=event, extra_attributes={ "source_event_id": event.id, "source_event_author": event.author, @@ -3199,24 +3494,6 @@ async def on_event_callback( return None - async def on_state_change_callback( - self, - *, - callback_context: CallbackContext, - state_delta: dict[str, Any], - ) -> None: - """Deprecated: use on_event_callback instead. - - This method is retained for API compatibility but is never invoked - by the framework (not in BasePlugin, PluginManager, or Runner). - State deltas are now captured via on_event_callback. - """ - logger.warning( - "on_state_change_callback is deprecated and never called by" - " the framework. State deltas are captured via" - " on_event_callback." - ) - @_safe_callback async def before_run_callback( self, *, invocation_context: "InvocationContext" diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index d265f605dd..31226422bc 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -8125,3 +8125,472 @@ def test_plugin_get_drop_stats_empty_without_processor(self): project_id=PROJECT_ID, dataset_id=DATASET_ID, table_id=TABLE_ID ) assert plugin.get_drop_stats() == {} + + +# ----------------------------------------------------------------------------- +# ADK 2.0 minimum producer cut (#293 v5) +# +# Coverage matrix: +# A1 / A2 attributes.adk.{schema_version, app_name} on every row +# A3 attributes.adk.source_event_id on Event-originating rows +# C1 attributes.adk.node {path, run_id, parent_path} +# C2 attributes.adk.branch +# C3 attributes.adk.scope {id, kind} +# C4 AGENT_TRANSFER emit +# C5 EVENT_COMPACTION emit (preserves fractional float epoch) +# C6 AGENT_STATE_CHECKPOINT emit (both shapes) + id-stabilization +# C7 TOOL_PAUSED with pause_kind / function_call_id +# HITL non-routing to TOOL_COMPLETED +# user-message TOOL_COMPLETED with pause_kind='tool' +# C8 attributes.adk.{route, render_ui_widgets, rewind_before_invocation_id} +# D1 on_state_change_callback removed +# ----------------------------------------------------------------------------- + + +def test_derive_scope_unscoped(): + """C3: None isolation_scope → scope = null.""" + assert bigquery_agent_analytics_plugin._derive_scope(None) is None + + +def test_derive_scope_node_run_bare(): + """C3: bare 'name@run_id' classifies as node_run (not function_call).""" + scope = bigquery_agent_analytics_plugin._derive_scope("loopA@42") + assert scope == {"id": "loopA@42", "kind": "node_run"} + + +def test_derive_scope_node_run_path(): + """C3: 'parent/name@run_id' classifies as node_run.""" + scope = bigquery_agent_analytics_plugin._derive_scope("wf/A@1/B@2") + assert scope == {"id": "wf/A@1/B@2", "kind": "node_run"} + + +def test_derive_scope_function_call_provider_id(): + """C3: model-provided FC IDs (call_*, toolu_*) classify as function_call.""" + for fc_id in ("call_abc123", "toolu_xyz", "adk-fc-1"): + scope = bigquery_agent_analytics_plugin._derive_scope(fc_id) + assert scope == {"id": fc_id, "kind": "function_call"}, fc_id + + +def test_derive_scope_empty_string_unknown(): + """C3: empty/non-string anomalies classify as unknown.""" + scope = bigquery_agent_analytics_plugin._derive_scope("") + assert scope == {"id": "", "kind": "unknown"} + + +def test_d1_on_state_change_callback_removed(): + """D1: the deprecated stub is gone from the public surface.""" + assert not hasattr( + bigquery_agent_analytics_plugin.BigQueryAgentAnalyticsPlugin, + "on_state_change_callback", + ) + + +class TestAdkEnvelope: + """A1 / A2 / A3 / C1 / C2 / C3 / C8 envelope shape on emitted rows.""" + + @pytest.mark.asyncio + async def test_envelope_on_non_event_row( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """USER_MESSAGE_RECEIVED has no source Event → A1/A2 only, A3/C1/C2/C3 null.""" + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_user_message_callback( + invocation_context=invocation_context, + user_message=types.Content(role="user", parts=[types.Part(text="hi")]), + ) + await asyncio.sleep(0.01) + log_entry = await _get_captured_event_dict_async( + mock_write_client, dummy_arrow_schema + ) + _assert_common_fields(log_entry, "USER_MESSAGE_RECEIVED") + attributes = json.loads(log_entry["attributes"]) + adk = attributes["adk"] + # A1: schema_version always present. + assert adk["schema_version"] == ( + bigquery_agent_analytics_plugin._ADK_ENVELOPE_SCHEMA_VERSION + ) + # A2: app_name always present (from session). + assert adk["app_name"] == "test_app" + # A3 / C1 / C2 / C3 absent on rows without an originating Event. + assert "source_event_id" not in adk + assert "node" not in adk + assert "branch" not in adk + assert "scope" not in adk + + @pytest.mark.asyncio + async def test_envelope_on_event_row( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """STATE_DELTA from on_event_callback carries the full envelope.""" + state_delta = {"k": "v"} + event = event_lib.Event( + author="agent_a", + branch="branch-x", + actions=event_actions_lib.EventActions(state_delta=state_delta), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + log_entry = await _get_captured_event_dict_async( + mock_write_client, dummy_arrow_schema + ) + _assert_common_fields(log_entry, "STATE_DELTA") + attributes = json.loads(log_entry["attributes"]) + adk = attributes["adk"] + assert adk["schema_version"] == ( + bigquery_agent_analytics_plugin._ADK_ENVELOPE_SCHEMA_VERSION + ) + assert adk["app_name"] == "test_app" + # A3: real Event.id (model_post_init auto-assigns a UUID). + assert adk["source_event_id"] == event.id + assert len(event.id) == 36 # sanity + # C2: branch passthrough. + assert adk["branch"] == "branch-x" + # C1: node defaults to path="" with parent_path=null (no synthesis). + assert adk["node"]["path"] == "" + assert adk["node"]["parent_path"] is None + + @pytest.mark.asyncio + async def test_envelope_node_with_parent_path( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C1: parent_path is everything before the final '/segment@run_id'.""" + event = event_lib.Event( + author="agent_b", + actions=event_actions_lib.EventActions(state_delta={"k": "v"}), + ) + event.node_info.path = "wf/A@1/B@2" + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + log_entry = await _get_captured_event_dict_async( + mock_write_client, dummy_arrow_schema + ) + adk = json.loads(log_entry["attributes"])["adk"] + assert adk["node"]["path"] == "wf/A@1/B@2" + assert adk["node"]["parent_path"] == "wf/A@1" + + +class TestC4AgentTransfer: + + @pytest.mark.asyncio + async def test_agent_transfer_emits_from_to_payload( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + event = event_lib.Event( + author="root_agent", + actions=event_actions_lib.EventActions( + transfer_to_agent="specialist_agent" + ), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + transfers = [r for r in rows if r["event_type"] == "AGENT_TRANSFER"] + assert len(transfers) == 1 + content = json.loads(transfers[0]["content"]) + assert content == { + "from_agent": "root_agent", + "to_agent": "specialist_agent", + } + + +class TestC5EventCompaction: + + @pytest.mark.asyncio + async def test_event_compaction_preserves_float_precision( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C5: fractional float-epoch seconds must survive the producer.""" + compaction = event_actions_lib.EventCompaction( + start_timestamp=1700000000.125, + end_timestamp=1700000003.875, + compacted_content=types.Content( + role="model", parts=[types.Part(text="summary")] + ), + ) + event = event_lib.Event( + author="agent", + actions=event_actions_lib.EventActions(compaction=compaction), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + compactions = [r for r in rows if r["event_type"] == "EVENT_COMPACTION"] + assert len(compactions) == 1 + content = json.loads(compactions[0]["content"]) + assert content["start_timestamp"] == 1700000000.125 + assert content["end_timestamp"] == 1700000003.875 + assert content["start_timestamp"] != int(content["start_timestamp"]) + + +class TestC6AgentStateCheckpoint: + + @pytest.mark.asyncio + async def test_checkpoint_state_only( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """{agent_state: {...}, end_of_agent: None} emits a CHECKPOINT row.""" + event = event_lib.Event( + author="agent", + actions=event_actions_lib.EventActions( + agent_state={"step": 3, "ctx": "abc"} + ), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + cps = [r for r in rows if r["event_type"] == "AGENT_STATE_CHECKPOINT"] + assert len(cps) == 1 + content = json.loads(cps[0]["content"]) + assert content["agent_state"] == {"step": 3, "ctx": "abc"} + assert content["end_of_agent"] is False + + @pytest.mark.asyncio + async def test_checkpoint_end_of_agent_only( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """{agent_state: None, end_of_agent: True} is a valid CHECKPOINT shape.""" + event = event_lib.Event( + author="agent", + actions=event_actions_lib.EventActions(end_of_agent=True), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + cps = [r for r in rows if r["event_type"] == "AGENT_STATE_CHECKPOINT"] + assert len(cps) == 1 + content = json.loads(cps[0]["content"]) + assert content["agent_state"] is None + assert content["end_of_agent"] is True + + @pytest.mark.asyncio + async def test_checkpoint_carries_real_source_event_id( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """v3 regression guard: Event.model_post_init auto-assigns id, so a + checkpoint Event constructed without explicit id still surfaces a real + 36-char UUID in attributes.adk.source_event_id.""" + event = event_lib.Event( + author="agent", + actions=event_actions_lib.EventActions(end_of_agent=True), + ) + assert event.id and len(event.id) == 36 + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + cps = [r for r in rows if r["event_type"] == "AGENT_STATE_CHECKPOINT"] + assert len(cps) == 1 + adk = json.loads(cps[0]["attributes"])["adk"] + assert adk["source_event_id"] == event.id + + +class TestC7ToolPauseAndComplete: + + @pytest.mark.asyncio + async def test_tool_paused_non_hitl_pause_kind_tool( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + fc = types.FunctionCall( + id="call-1", name="long_running_search", args={"q": "x"} + ) + event = event_lib.Event( + author="agent", + content=types.Content( + role="model", parts=[types.Part(function_call=fc)] + ), + long_running_tool_ids={"call-1"}, + actions=event_actions_lib.EventActions(), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + pauses = [r for r in rows if r["event_type"] == "TOOL_PAUSED"] + assert len(pauses) == 1 + adk = json.loads(pauses[0]["attributes"])["adk"] + extras = json.loads(pauses[0]["attributes"]) + assert extras["pause_kind"] == "tool" + assert extras["function_call_id"] == "call-1" + + @pytest.mark.asyncio + async def test_tool_paused_hitl_pause_kind( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C7: HITL long-running call → pause_kind derived from NAME, not id.""" + fc = types.FunctionCall( + id="call-hitl-1", name="adk_request_confirmation", args={} + ) + event = event_lib.Event( + author="agent", + content=types.Content( + role="model", parts=[types.Part(function_call=fc)] + ), + long_running_tool_ids={"call-hitl-1"}, + actions=event_actions_lib.EventActions(), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + pauses = [r for r in rows if r["event_type"] == "TOOL_PAUSED"] + assert len(pauses) == 1 + extras = json.loads(pauses[0]["attributes"]) + assert extras["pause_kind"] == "hitl_confirmation" + assert extras["function_call_id"] == "call-hitl-1" + + @pytest.mark.asyncio + async def test_user_message_function_response_emits_tool_completed( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C7: non-HITL function_response in a user message → TOOL_COMPLETED + with pause_kind='tool' (this is the long-running resume path).""" + fr = types.FunctionResponse( + id="call-1", name="long_running_search", response={"hits": 7} + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_user_message_callback( + invocation_context=invocation_context, + user_message=types.Content( + role="user", parts=[types.Part(function_response=fr)] + ), + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + completed = [r for r in rows if r["event_type"] == "TOOL_COMPLETED"] + assert len(completed) == 1 + extras = json.loads(completed[0]["attributes"]) + assert extras["pause_kind"] == "tool" + assert extras["function_call_id"] == "call-1" + + @pytest.mark.asyncio + async def test_hitl_user_message_does_not_emit_tool_completed( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C7 HITL non-routing: an adk_request_confirmation function_response in + a user message emits ONLY HITL_CONFIRMATION_REQUEST_COMPLETED, never + TOOL_COMPLETED.""" + fr = types.FunctionResponse( + id="call-hitl-1", + name="adk_request_confirmation", + response={"approved": True}, + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_user_message_callback( + invocation_context=invocation_context, + user_message=types.Content( + role="user", parts=[types.Part(function_response=fr)] + ), + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + types_emitted = {r["event_type"] for r in rows} + assert "HITL_CONFIRMATION_REQUEST_COMPLETED" in types_emitted + assert "TOOL_COMPLETED" not in types_emitted + + +class TestC8ActionAttributes: + + @pytest.mark.asyncio + async def test_route_and_rewind_flat_under_attributes_adk( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C8 (#203): route / rewind_before_invocation_id mirror under + attributes.adk.* (flat-with-prefix, NOT nested under .actions.).""" + event = event_lib.Event( + author="agent", + actions=event_actions_lib.EventActions( + state_delta={"k": "v"}, # to ensure an emit happens + route="branch_b", + rewind_before_invocation_id="inv-earlier", + ), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + log_entry = await _get_captured_event_dict_async( + mock_write_client, dummy_arrow_schema + ) + adk = json.loads(log_entry["attributes"])["adk"] + # Flat-with-prefix per #203 / #293 v5. + assert adk["route"] == "branch_b" + assert adk["rewind_before_invocation_id"] == "inv-earlier" + # Not nested under .actions. + assert "actions" not in adk