From 119b8ea1a4ab8887296ae45cb10dacf0fb81acd1 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Mon, 8 Jun 2026 00:53:16 -0700 Subject: [PATCH 1/5] feat(bqaa): ADK 2.0 minimum producer cut for #293 v5 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the customer-driven mid-June producer-only subset of the ADK 2.0 observability work tracked in #293 (parent #190). The customer needs these specific fields visible in BigQuery before their ADK 2.0 production cutover; full v15 contract (#190) lands incrementally. This change is producer-only. No consumer-side SDK / typed-view work is included — the customer reads base-table JSON directly during the mid-June window. What lands ---------- A1/A2 — every ADK-enriched row carries the attributes.adk envelope: attributes.adk.schema_version (_ADK_ENVELOPE_SCHEMA_VERSION = "1") attributes.adk.app_name (from InvocationContext.session.app_name) A3 — rows from on_event_callback additionally carry: attributes.adk.source_event_id (Event.id, reliable join key) Note: never fabricated — callback rows without an originating Event leave it JSON-null. C1 — attributes.adk.node = {path, run_id, parent_path}. parent_path is derived from path; default-empty path (NodeInfo.path = "") is preserved verbatim with parent_path = null, no synthesis. C2 — attributes.adk.branch (absent stays JSON null). C3 — attributes.adk.scope = null | {id, kind} per #198 / #293 v5 derivation order: (1) None → null, (2) name@run_id / path/name@run_id → node_run, (3) any other non-empty string → function_call (model-provided FC IDs match here), (4) empty/non-string → unknown with warning. C4 — emit AGENT_TRANSFER from event.actions.transfer_to_agent. Payload pinned: from_agent = event.author, to_agent = the target. Verified against EventActions.transfer_to_agent which stores the target only. C5 — emit EVENT_COMPACTION from event.actions.compaction. 
Float start_timestamp / end_timestamp preserved with fractional precision (consumer view conversion deferred). C6 — emit AGENT_STATE_CHECKPOINT when actions.agent_state is not None OR actions.end_of_agent is True. Allows {agent_state: null, end_of_agent: true} payloads. Inline payload only; GCS offload for oversized state deferred. C7 — emit TOOL_PAUSED for each event.long_running_tool_ids id, with attributes.pause_kind (derived from function_call NAME via _HITL_PAUSE_KIND_MAP — hitl_* for adk_request_*, tool otherwise) and attributes.function_call_id. HITL routing is unchanged: HITL function_responses stay on HITL_*_COMPLETED, NEVER emit TOOL_COMPLETED. Non-HITL function_responses arriving via on_user_message_callback emit TOOL_COMPLETED with pause_kind='tool' so the customer can pair (TOOL_PAUSED ↔ TOOL_COMPLETED) on (app_name, user_id, session_id, function_call_id) directly in SQL. Pause registry / pause_orphan semantics deferred to #206. C8 — attributes.adk.{route, render_ui_widgets, rewind_before_invocation_id} mirror EventActions, flat-with-prefix per #203 (matches the rest of the attributes.adk.* envelope convention). D1 — delete the deprecated on_state_change_callback stub (never called by ADK 2.0; verified no callers). Compatibility ------------- * AGENT_RESPONSE retains the legacy flat extras (source_event_id, source_event_author, source_event_branch) for backward compat. The canonical keys are now under attributes.adk.*. * The HITL test fixtures use Mock events without long_running_tool_ids or .id; the envelope helper is defensive against missing attrs. * No EventData / _log_event signature change. Added one optional field EventData.source_event: Optional[Event] = None — a minimal B0 (#194) step. Callbacks that have access to the source Event pass it through; others leave it None (and the envelope correctly leaves A3/C1/C2/C3 null on those rows). 
Tests ----- 257 plugin tests pass (238 existing + 19 new): * envelope shape on event-originating and non-event-originating rows * node parent_path derivation with both empty and nested paths * _derive_scope for None, bare node, path/node, FC IDs, empty string * C4/C5/C6 emit paths * C5 fractional float-epoch precision round-trip * C6 both-shape coverage + id-stabilization regression guard (Event.model_post_init auto-assigns id even when constructor omits it) * C7 TOOL_PAUSED pause_kind derivation for non-HITL and HITL * C7 HITL non-routing: HITL function_response → HITL_*_COMPLETED only, NEVER TOOL_COMPLETED * C7 user-message TOOL_COMPLETED with pause_kind='tool' * C8 flat-with-prefix route / rewind_before_invocation_id * D1: on_state_change_callback removed from the public surface Refs: #293 (v5), #190, #194, #196, #197, #198, #199, #200, #201, #202, --- .../bigquery_agent_analytics_plugin.py | 363 ++++++++++++-- .../test_bigquery_agent_analytics_plugin.py | 470 ++++++++++++++++++ 2 files changed, 790 insertions(+), 43 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 0e1dddcec3..425d3bc89e 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -73,6 +73,7 @@ if TYPE_CHECKING: from ..agents.invocation_context import InvocationContext + from ..events.event import Event logger: logging.Logger = logging.getLogger("google_adk." + __name__) tracer = trace.get_tracer( @@ -84,12 +85,57 @@ _SCHEMA_VERSION = "1" _SCHEMA_VERSION_LABEL_KEY = "adk_schema_version" +# ADK 2.0 envelope version. Stamped onto every ADK-enriched row as +# ``attributes.adk.schema_version``. Independent of the BigQuery row +# schema version above — this names the producer's ADK 2.0 attribute +# contract so downstream consumers can gate on it. 
+_ADK_ENVELOPE_SCHEMA_VERSION = "1" + _HITL_EVENT_MAP = MappingProxyType({ "adk_request_credential": "HITL_CREDENTIAL_REQUEST", "adk_request_confirmation": "HITL_CONFIRMATION_REQUEST", "adk_request_input": "HITL_INPUT_REQUEST", }) +# Companion to _HITL_EVENT_MAP (same function-call-name keys) for the C7 +# pause_kind discriminator. The id→name lookup in #199/#293 v5 routes +# ``adk_request_credential`` → ``hitl_credential`` etc.; everything else is ``tool``. +_HITL_PAUSE_KIND_MAP = MappingProxyType({ + "adk_request_credential": "hitl_credential", + "adk_request_confirmation": "hitl_confirmation", + "adk_request_input": "hitl_input", +}) + + +def _derive_scope( + isolation_scope: Optional[str], +) -> Optional[dict[str, str]]: + """Derives ``attributes.adk.scope`` per the #198 / #293 v5 rule. + + Order is fixed: (1) None → null; (2) node-shape (``name@run_id`` or + ``parent/name@run_id``) → ``node_run``; (3) any other non-empty + string → ``function_call`` (model-provided FC IDs like ``call_*`` and + ``toolu_*`` legitimately match here); (4) empty/non-string → ``unknown`` + with a warning. Steps 2 and 3 are intentionally ordered: a bare + ``name@run_id`` must classify as ``node_run`` first, not as + ``function_call`` by fall-through. + """ + if isolation_scope is None: + return None + if not isinstance(isolation_scope, str) or not isolation_scope: + logger.warning( + "Unexpected isolation_scope shape: %r; classifying as 'unknown'", + isolation_scope, + ) + return {"id": str(isolation_scope), "kind": "unknown"} + # Node-shape: last segment contains '@'. The full string may also be + # path-prefixed (e.g. ``wf/A@1/B@2``). + last_segment = isolation_scope.rsplit("/", 1)[-1] + if "@" in last_segment: + return {"id": isolation_scope, "kind": "node_run"} + return {"id": isolation_scope, "kind": "function_call"} + + # Track all living plugin instances so the fork handler can reset # them proactively in the child, before _ensure_started runs. 
_LIVE_PLUGINS: weakref.WeakSet = weakref.WeakSet() @@ -1968,6 +2014,12 @@ class EventData: error_message: Optional[str] = None extra_attributes: dict[str, Any] = field(default_factory=dict) trace_id_override: Optional[str] = None + # ADK 2.0 envelope: callbacks that hold the source Event pass it here + # so ``_log_event`` can stamp ``attributes.adk.{source_event_id, node, + # branch, scope, ...}``. Leave None for rows that don't originate from + # an Event — the envelope helper omits those keys entirely + # rather than synthesizing fake identity (per #293 v5 A3 contract). + source_event: Optional["Event"] = None class BigQueryAgentAnalyticsPlugin(BasePlugin): @@ -2754,6 +2806,108 @@ def _extract_latency( latency_json["time_to_first_token_ms"] = event_data.time_to_first_token_ms return latency_json or None + def _build_adk_envelope( + self, + callback_context: CallbackContext, + source_event: Optional["Event"], + ) -> dict[str, Any]: + """Builds the ``attributes.adk`` envelope per #293 v5. + + A1 / A2 (``schema_version``, ``app_name``) stamp on every ADK-enriched + row regardless of origin. A3 / C1 / C2 / C3 (``source_event_id``, + ``node``, ``branch``, ``scope``) and C8 (``route``, + ``render_ui_widgets``, ``rewind_before_invocation_id``) only stamp + when a source Event is provided — callback-only rows omit these + keys entirely rather than synthesizing fake identity. + """ + adk: dict[str, Any] = { + "schema_version": _ADK_ENVELOPE_SCHEMA_VERSION, + } + try: + adk["app_name"] = callback_context._invocation_context.session.app_name + except Exception: + adk["app_name"] = None + + if source_event is None: + return adk + + # Every getattr below is defensive: source_event is "anything the + # caller hands us", which in test suites can be a Mock. Best-effort + # enrichment means "leave null on missing attrs", never crash the + # row. 
+ try: + source_event_id = getattr(source_event, "id", None) + if source_event_id: + adk["source_event_id"] = source_event_id # A3 + except Exception: + pass + + # C1: node = {path, run_id, parent_path}. NodeInfo.path defaults to + # the empty string in current ADK (events/event.py:45); preserve it + # verbatim and emit parent_path = null rather than synthesizing a + # fake workflow node from an empty path. + try: + node_info = getattr(source_event, "node_info", None) + if node_info is not None and hasattr(node_info, "path"): + path = getattr(node_info, "path", "") or "" + run_id = getattr(node_info, "run_id", None) + parent_path = None + if path and "/" in path: + parent_path = path.rsplit("/", 1)[0] + adk["node"] = { + "path": path, + "run_id": run_id, + "parent_path": parent_path, + } + except Exception: + pass + + # C2: branch — absent stays JSON null (no sentinel string). + try: + if hasattr(source_event, "branch"): + adk["branch"] = source_event.branch + except Exception: + pass + + # C3: scope shape derivation (per #293 v5 / #198). Order matters: + # node-shape patterns must be checked before falling through to + # function_call so bare ``name@run_id`` doesn't misclassify. + try: + if hasattr(source_event, "isolation_scope"): + adk["scope"] = _derive_scope(source_event.isolation_scope) + except Exception: + pass + + # C8: raw EventActions mirror (flat under attributes.adk per #203). + # Stamp only when actually set so JSON doesn't bloat with nulls. 
+ try: + actions = getattr(source_event, "actions", None) + except Exception: + actions = None + if actions is not None: + try: + route = getattr(actions, "route", None) + if route is not None: + adk["route"] = route + except Exception: + pass + try: + widgets = getattr(actions, "render_ui_widgets", None) + if widgets is not None: + adk["render_ui_widgets"] = [ + w.model_dump() if hasattr(w, "model_dump") else w for w in widgets + ] + except Exception: + pass + try: + rewind = getattr(actions, "rewind_before_invocation_id", None) + if rewind is not None: + adk["rewind_before_invocation_id"] = rewind + except Exception: + pass + + return adk + def _enrich_attributes( self, event_data: EventData, @@ -2763,12 +2917,15 @@ def _enrich_attributes( Reads ``model``, ``model_version``, and ``usage_metadata`` from *event_data*, copies ``extra_attributes``, then adds session metadata - and custom tags. + and custom tags. Also stamps the ``adk`` envelope (#293 v5). Returns: A new dict ready for JSON serialization into the attributes column. """ attrs: dict[str, Any] = dict(event_data.extra_attributes) + attrs["adk"] = self._build_adk_envelope( + callback_context, event_data.source_event + ) attrs["root_agent_name"] = TraceManager.get_root_agent_name() if event_data.model: @@ -2921,9 +3078,14 @@ async def on_user_message_callback( ) -> None: """Parity with V1: Logs USER_MESSAGE_RECEIVED event. - Also detects HITL completion responses (user-sent - ``FunctionResponse`` parts with ``adk_request_*`` names) and emits - dedicated ``HITL_*_COMPLETED`` events. + Also detects: + * HITL completion responses (user-sent ``FunctionResponse`` parts + with ``adk_request_*`` names) → ``HITL_*_COMPLETED``. + * Non-HITL ``FunctionResponse`` parts from a user message → these + are the long-running tool completions for tools that paused via + ``TOOL_PAUSED``. 
Emitted as ``TOOL_COMPLETED`` with + ``pause_kind = 'tool'`` and ``function_call_id`` so the customer + can join the pair from BigQuery (#293 v5 C7 pair-key subset). Args: invocation_context: The context of the current invocation. @@ -2937,26 +3099,49 @@ async def on_user_message_callback( raw_content=user_message, ) - # Detect HITL completion responses in the user message. + # Detect completion responses in the user message. if user_message and user_message.parts: for part in user_message.parts: - if part.function_response: - hitl_event = _HITL_EVENT_MAP.get(part.function_response.name) - if hitl_event: - resp_truncated, is_truncated = _recursive_smart_truncate( - part.function_response.response or {}, - self.config.max_content_length, - ) - content_dict = { - "tool": part.function_response.name, - "result": resp_truncated, - } - await self._log_event( - hitl_event + "_COMPLETED", - callback_ctx, - raw_content=content_dict, - is_truncated=is_truncated, - ) + if not part.function_response: + continue + hitl_event = _HITL_EVENT_MAP.get(part.function_response.name) + resp_truncated, is_truncated = _recursive_smart_truncate( + part.function_response.response or {}, + self.config.max_content_length, + ) + content_dict = { + "tool": part.function_response.name, + "result": resp_truncated, + } + if hitl_event: + # HITL completions stay on the HITL_*_COMPLETED stream — they + # MUST NOT also emit TOOL_COMPLETED (per #293 v5 C7). + await self._log_event( + hitl_event + "_COMPLETED", + callback_ctx, + raw_content=content_dict, + is_truncated=is_truncated, + ) + else: + # Non-HITL function_response arriving via a user message is + # by construction a long-running tool completion: regular + # tool calls complete inside the agent run via + # after_tool_callback, so a function_response inside a user + # message is the resume side of a previously-paused tool. + # Stamp the pair keys; pause_orphan / registry semantics are + # deferred to #206. 
+ await self._log_event( + "TOOL_COMPLETED", + callback_ctx, + raw_content=content_dict, + is_truncated=is_truncated, + event_data=EventData( + extra_attributes={ + "pause_kind": "tool", + "function_call_id": part.function_response.id, + }, + ), + ) @_safe_callback async def on_event_callback( @@ -2999,11 +3184,84 @@ async def on_event_callback( "STATE_DELTA", callback_ctx, event_data=EventData( - extra_attributes={"state_delta": dict(event.actions.state_delta)} + source_event=event, + extra_attributes={"state_delta": dict(event.actions.state_delta)}, ), ) - # --- HITL event logging --- + # --- AGENT_TRANSFER (C4 / #200) --- + # actions.transfer_to_agent stores the *target* agent only + # (events/event_actions.py:75); from_agent is pinned to event.author + # per #293 v5 C4. Never fabricate authors on non-Event paths. + if event.actions.transfer_to_agent: + await self._log_event( + "AGENT_TRANSFER", + callback_ctx, + raw_content={ + "from_agent": event.author, + "to_agent": event.actions.transfer_to_agent, + }, + event_data=EventData(source_event=event), + ) + + # --- EVENT_COMPACTION (C5 / #201) --- + # EventCompaction.start_timestamp / end_timestamp are float epoch + # seconds. Preserve fractional precision here; consumer view + # conversion is deferred. + compaction = event.actions.compaction + if compaction is not None: + compacted_content, compaction_truncated = self._format_content_safely( + compaction.compacted_content + ) + await self._log_event( + "EVENT_COMPACTION", + callback_ctx, + raw_content={ + "start_timestamp": compaction.start_timestamp, + "end_timestamp": compaction.end_timestamp, + "compacted_content": compacted_content, + }, + is_truncated=compaction_truncated, + event_data=EventData(source_event=event), + ) + + # --- AGENT_STATE_CHECKPOINT (C6 / #202) --- + # Fires when *either* agent_state is set or end_of_agent is True; + # supports {agent_state: None, end_of_agent: True} payloads. 
+ # Inline payload only — oversized-state GCS offload deferred. + if ( + event.actions.agent_state is not None + or event.actions.end_of_agent is True + ): + agent_state_dict, agent_state_truncated = ( + _recursive_smart_truncate( + event.actions.agent_state, + self.config.max_content_length, + ) + if event.actions.agent_state is not None + else (None, False) + ) + await self._log_event( + "AGENT_STATE_CHECKPOINT", + callback_ctx, + raw_content={ + "agent_state": agent_state_dict, + "end_of_agent": bool(event.actions.end_of_agent), + }, + is_truncated=agent_state_truncated, + event_data=EventData(source_event=event), + ) + + # --- HITL + TOOL_PAUSED (C7 / #199 — pair-key subset) + per-part + # iteration over event.content.parts --- + # TOOL_PAUSED fires per long_running_tool_id; pause_kind is derived + # via the id→name lookup against _HITL_PAUSE_KIND_MAP, so a HITL + # long-running call carries pause_kind = 'hitl_*' and a regular + # long-running tool carries pause_kind = 'tool'. function_call_id + # joins to the downstream TOOL_COMPLETED via the user message path. + # Use getattr so the existing Mock-based HITL test fixtures still + # work — they construct events without setting long_running_tool_ids. + long_running_ids = set(getattr(event, "long_running_tool_ids", None) or ()) if event.content and event.content.parts: for part in event.content.parts: # Detect HITL function calls (request events). @@ -3023,8 +3281,38 @@ async def on_event_callback( callback_ctx, raw_content=content_dict, is_truncated=is_truncated, + event_data=EventData(source_event=event), + ) + # C7: per-id TOOL_PAUSED emit. pause_kind derives from the + # function_call NAME (#199 / #293 v5) — looking it up against + # the id value would misclassify every HITL pause as 'tool'. 
+ if part.function_call.id in long_running_ids: + pause_kind = _HITL_PAUSE_KIND_MAP.get( + part.function_call.name, "tool" + ) + args_truncated, is_truncated = _recursive_smart_truncate( + part.function_call.args or {}, + self.config.max_content_length, + ) + await self._log_event( + "TOOL_PAUSED", + callback_ctx, + raw_content={ + "tool": part.function_call.name, + "args": args_truncated, + }, + is_truncated=is_truncated, + event_data=EventData( + source_event=event, + extra_attributes={ + "pause_kind": pause_kind, + "function_call_id": part.function_call.id, + }, + ), ) - # Detect HITL function responses (completion events). + # Detect HITL function responses (completion events). HITL + # function responses route ONLY here, never to TOOL_COMPLETED + # (per #293 v5 C7 / verified at this file's HITL test suite). if part.function_response: hitl_event = _HITL_EVENT_MAP.get(part.function_response.name) if hitl_event: @@ -3041,6 +3329,7 @@ async def on_event_callback( callback_ctx, raw_content=content_dict, is_truncated=is_truncated, + event_data=EventData(source_event=event), ) # --- A2A interaction logging --- @@ -3076,6 +3365,7 @@ async def on_event_callback( raw_content=content_dict, is_truncated=is_truncated or content_truncated, event_data=EventData( + source_event=event, extra_attributes={ "a2a_metadata": a2a_truncated, }, @@ -3112,12 +3402,17 @@ async def on_event_callback( role=event.content.role, parts=visible_parts ) formatted, truncated = self._format_content_safely(visible_content) + # source_event=event carries the ADK envelope (A3 / node / + # branch / scope). The flat ``source_event_*`` extras are + # retained for backward compat with existing AGENT_RESPONSE + # consumers; the canonical keys are under ``attributes.adk.*``. 
await self._log_event( "AGENT_RESPONSE", callback_ctx, raw_content={"response": formatted}, is_truncated=truncated, event_data=EventData( + source_event=event, extra_attributes={ "source_event_id": event.id, "source_event_author": event.author, @@ -3128,24 +3423,6 @@ async def on_event_callback( return None - async def on_state_change_callback( - self, - *, - callback_context: CallbackContext, - state_delta: dict[str, Any], - ) -> None: - """Deprecated: use on_event_callback instead. - - This method is retained for API compatibility but is never invoked - by the framework (not in BasePlugin, PluginManager, or Runner). - State deltas are now captured via on_event_callback. - """ - logger.warning( - "on_state_change_callback is deprecated and never called by" - " the framework. State deltas are captured via" - " on_event_callback." - ) - @_safe_callback async def before_run_callback( self, *, invocation_context: "InvocationContext" diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index 46cd233ef2..d072929aa3 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -8005,3 +8005,473 @@ async def test_skips_executable_code_only_events( ) await asyncio.sleep(0.05) assert mock_write_client.append_rows.call_count == 0 + + + +# ----------------------------------------------------------------------------- +# ADK 2.0 minimum producer cut (#293 v5) +# +# Coverage matrix: +# A1 / A2 attributes.adk.{schema_version, app_name} on every row +# A3 attributes.adk.source_event_id on Event-originating rows +# C1 attributes.adk.node {path, run_id, parent_path} +# C2 attributes.adk.branch +# C3 attributes.adk.scope {id, kind} +# C4 AGENT_TRANSFER emit +# C5 EVENT_COMPACTION emit (preserves fractional float epoch) +# C6 AGENT_STATE_CHECKPOINT emit (both shapes) + id-stabilization +# C7 TOOL_PAUSED with 
pause_kind / function_call_id +# HITL non-routing to TOOL_COMPLETED +# user-message TOOL_COMPLETED with pause_kind='tool' +# C8 attributes.adk.{route, render_ui_widgets, rewind_before_invocation_id} +# D1 on_state_change_callback removed +# ----------------------------------------------------------------------------- + + +def test_derive_scope_unscoped(): + """C3: None isolation_scope → scope = null.""" + assert bigquery_agent_analytics_plugin._derive_scope(None) is None + + +def test_derive_scope_node_run_bare(): + """C3: bare 'name@run_id' classifies as node_run (not function_call).""" + scope = bigquery_agent_analytics_plugin._derive_scope("loopA@42") + assert scope == {"id": "loopA@42", "kind": "node_run"} + + +def test_derive_scope_node_run_path(): + """C3: 'parent/name@run_id' classifies as node_run.""" + scope = bigquery_agent_analytics_plugin._derive_scope("wf/A@1/B@2") + assert scope == {"id": "wf/A@1/B@2", "kind": "node_run"} + + +def test_derive_scope_function_call_provider_id(): + """C3: model-provided FC IDs (call_*, toolu_*) classify as function_call.""" + for fc_id in ("call_abc123", "toolu_xyz", "adk-fc-1"): + scope = bigquery_agent_analytics_plugin._derive_scope(fc_id) + assert scope == {"id": fc_id, "kind": "function_call"}, fc_id + + +def test_derive_scope_empty_string_unknown(): + """C3: empty/non-string anomalies classify as unknown.""" + scope = bigquery_agent_analytics_plugin._derive_scope("") + assert scope == {"id": "", "kind": "unknown"} + + +def test_d1_on_state_change_callback_removed(): + """D1: the deprecated stub is gone from the public surface.""" + assert not hasattr( + bigquery_agent_analytics_plugin.BigQueryAgentAnalyticsPlugin, + "on_state_change_callback", + ) + + +class TestAdkEnvelope: + """A1 / A2 / A3 / C1 / C2 / C3 / C8 envelope shape on emitted rows.""" + + @pytest.mark.asyncio + async def test_envelope_on_non_event_row( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + 
"""USER_MESSAGE_RECEIVED has no source Event → A1/A2 only, A3/C1/C2/C3 null.""" + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_user_message_callback( + invocation_context=invocation_context, + user_message=types.Content(role="user", parts=[types.Part(text="hi")]), + ) + await asyncio.sleep(0.01) + log_entry = await _get_captured_event_dict_async( + mock_write_client, dummy_arrow_schema + ) + _assert_common_fields(log_entry, "USER_MESSAGE_RECEIVED") + attributes = json.loads(log_entry["attributes"]) + adk = attributes["adk"] + # A1: schema_version always present. + assert adk["schema_version"] == ( + bigquery_agent_analytics_plugin._ADK_ENVELOPE_SCHEMA_VERSION + ) + # A2: app_name always present (from session). + assert adk["app_name"] == "test_app" + # A3 / C1 / C2 / C3 absent on rows without an originating Event. + assert "source_event_id" not in adk + assert "node" not in adk + assert "branch" not in adk + assert "scope" not in adk + + @pytest.mark.asyncio + async def test_envelope_on_event_row( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """STATE_DELTA from on_event_callback carries the full envelope.""" + state_delta = {"k": "v"} + event = event_lib.Event( + author="agent_a", + branch="branch-x", + actions=event_actions_lib.EventActions(state_delta=state_delta), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + log_entry = await _get_captured_event_dict_async( + mock_write_client, dummy_arrow_schema + ) + _assert_common_fields(log_entry, "STATE_DELTA") + attributes = json.loads(log_entry["attributes"]) + adk = attributes["adk"] + assert adk["schema_version"] == ( + bigquery_agent_analytics_plugin._ADK_ENVELOPE_SCHEMA_VERSION + ) + assert adk["app_name"] == "test_app" + # A3: real Event.id 
(model_post_init auto-assigns a UUID). + assert adk["source_event_id"] == event.id + assert len(event.id) == 36 # sanity + # C2: branch passthrough. + assert adk["branch"] == "branch-x" + # C1: node defaults to path="" with parent_path=null (no synthesis). + assert adk["node"]["path"] == "" + assert adk["node"]["parent_path"] is None + + @pytest.mark.asyncio + async def test_envelope_node_with_parent_path( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C1: parent_path is everything before the final '/segment@run_id'.""" + event = event_lib.Event( + author="agent_b", + actions=event_actions_lib.EventActions(state_delta={"k": "v"}), + ) + event.node_info.path = "wf/A@1/B@2" + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + log_entry = await _get_captured_event_dict_async( + mock_write_client, dummy_arrow_schema + ) + adk = json.loads(log_entry["attributes"])["adk"] + assert adk["node"]["path"] == "wf/A@1/B@2" + assert adk["node"]["parent_path"] == "wf/A@1" + + +class TestC4AgentTransfer: + + @pytest.mark.asyncio + async def test_agent_transfer_emits_from_to_payload( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + event = event_lib.Event( + author="root_agent", + actions=event_actions_lib.EventActions( + transfer_to_agent="specialist_agent" + ), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + transfers = [r for r in rows if r["event_type"] == "AGENT_TRANSFER"] + assert len(transfers) == 1 + content = json.loads(transfers[0]["content"]) + assert content == { + "from_agent": 
"root_agent", + "to_agent": "specialist_agent", + } + + +class TestC5EventCompaction: + + @pytest.mark.asyncio + async def test_event_compaction_preserves_float_precision( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C5: fractional float-epoch seconds must survive the producer.""" + compaction = event_actions_lib.EventCompaction( + start_timestamp=1700000000.125, + end_timestamp=1700000003.875, + compacted_content=types.Content( + role="model", parts=[types.Part(text="summary")] + ), + ) + event = event_lib.Event( + author="agent", + actions=event_actions_lib.EventActions(compaction=compaction), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + compactions = [r for r in rows if r["event_type"] == "EVENT_COMPACTION"] + assert len(compactions) == 1 + content = json.loads(compactions[0]["content"]) + assert content["start_timestamp"] == 1700000000.125 + assert content["end_timestamp"] == 1700000003.875 + assert content["start_timestamp"] != int(content["start_timestamp"]) + + +class TestC6AgentStateCheckpoint: + + @pytest.mark.asyncio + async def test_checkpoint_state_only( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """{agent_state: {...}, end_of_agent: None} emits a CHECKPOINT row.""" + event = event_lib.Event( + author="agent", + actions=event_actions_lib.EventActions( + agent_state={"step": 3, "ctx": "abc"} + ), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + cps = [r for r in rows if 
r["event_type"] == "AGENT_STATE_CHECKPOINT"] + assert len(cps) == 1 + content = json.loads(cps[0]["content"]) + assert content["agent_state"] == {"step": 3, "ctx": "abc"} + assert content["end_of_agent"] is False + + @pytest.mark.asyncio + async def test_checkpoint_end_of_agent_only( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """{agent_state: None, end_of_agent: True} is a valid CHECKPOINT shape.""" + event = event_lib.Event( + author="agent", + actions=event_actions_lib.EventActions(end_of_agent=True), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + cps = [r for r in rows if r["event_type"] == "AGENT_STATE_CHECKPOINT"] + assert len(cps) == 1 + content = json.loads(cps[0]["content"]) + assert content["agent_state"] is None + assert content["end_of_agent"] is True + + @pytest.mark.asyncio + async def test_checkpoint_carries_real_source_event_id( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """v3 regression guard: Event.model_post_init auto-assigns id, so a + checkpoint Event constructed without explicit id still surfaces a real + 36-char UUID in attributes.adk.source_event_id.""" + event = event_lib.Event( + author="agent", + actions=event_actions_lib.EventActions(end_of_agent=True), + ) + assert event.id and len(event.id) == 36 + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + cps = [r for r in rows if r["event_type"] == "AGENT_STATE_CHECKPOINT"] + assert len(cps) == 1 + adk = 
json.loads(cps[0]["attributes"])["adk"] + assert adk["source_event_id"] == event.id + + +class TestC7ToolPauseAndComplete: + + @pytest.mark.asyncio + async def test_tool_paused_non_hitl_pause_kind_tool( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + fc = types.FunctionCall( + id="call-1", name="long_running_search", args={"q": "x"} + ) + event = event_lib.Event( + author="agent", + content=types.Content( + role="model", parts=[types.Part(function_call=fc)] + ), + long_running_tool_ids={"call-1"}, + actions=event_actions_lib.EventActions(), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + pauses = [r for r in rows if r["event_type"] == "TOOL_PAUSED"] + assert len(pauses) == 1 + adk = json.loads(pauses[0]["attributes"])["adk"] + extras = json.loads(pauses[0]["attributes"]) + assert extras["pause_kind"] == "tool" + assert extras["function_call_id"] == "call-1" + + @pytest.mark.asyncio + async def test_tool_paused_hitl_pause_kind( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C7: HITL long-running call → pause_kind derived from NAME, not id.""" + fc = types.FunctionCall( + id="call-hitl-1", name="adk_request_confirmation", args={} + ) + event = event_lib.Event( + author="agent", + content=types.Content( + role="model", parts=[types.Part(function_call=fc)] + ), + long_running_tool_ids={"call-hitl-1"}, + actions=event_actions_lib.EventActions(), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + 
pauses = [r for r in rows if r["event_type"] == "TOOL_PAUSED"] + assert len(pauses) == 1 + extras = json.loads(pauses[0]["attributes"]) + assert extras["pause_kind"] == "hitl_confirmation" + assert extras["function_call_id"] == "call-hitl-1" + + @pytest.mark.asyncio + async def test_user_message_function_response_emits_tool_completed( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C7: non-HITL function_response in a user message → TOOL_COMPLETED + with pause_kind='tool' (this is the long-running resume path).""" + fr = types.FunctionResponse( + id="call-1", name="long_running_search", response={"hits": 7} + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_user_message_callback( + invocation_context=invocation_context, + user_message=types.Content( + role="user", parts=[types.Part(function_response=fr)] + ), + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + completed = [r for r in rows if r["event_type"] == "TOOL_COMPLETED"] + assert len(completed) == 1 + extras = json.loads(completed[0]["attributes"]) + assert extras["pause_kind"] == "tool" + assert extras["function_call_id"] == "call-1" + + @pytest.mark.asyncio + async def test_hitl_user_message_does_not_emit_tool_completed( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C7 HITL non-routing: an adk_request_confirmation function_response in + a user message emits ONLY HITL_CONFIRMATION_REQUEST_COMPLETED, never + TOOL_COMPLETED.""" + fr = types.FunctionResponse( + id="call-hitl-1", + name="adk_request_confirmation", + response={"approved": True}, + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_user_message_callback( + invocation_context=invocation_context, + user_message=types.Content( + role="user", 
parts=[types.Part(function_response=fr)] + ), + ) + await asyncio.sleep(0.01) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + types_emitted = {r["event_type"] for r in rows} + assert "HITL_CONFIRMATION_REQUEST_COMPLETED" in types_emitted + assert "TOOL_COMPLETED" not in types_emitted + + +class TestC8ActionAttributes: + + @pytest.mark.asyncio + async def test_route_and_rewind_flat_under_attributes_adk( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """C8 (#203): route / rewind_before_invocation_id mirror under + attributes.adk.* (flat-with-prefix, NOT nested under .actions.).""" + event = event_lib.Event( + author="agent", + actions=event_actions_lib.EventActions( + state_delta={"k": "v"}, # to ensure an emit happens + route="branch_b", + rewind_before_invocation_id="inv-earlier", + ), + ) + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + await asyncio.sleep(0.01) + log_entry = await _get_captured_event_dict_async( + mock_write_client, dummy_arrow_schema + ) + adk = json.loads(log_entry["attributes"])["adk"] + # Flat-with-prefix per #203 / #293 v5. + assert adk["route"] == "branch_b" + assert adk["rewind_before_invocation_id"] == "inv-earlier" + # Not nested under .actions. + assert "actions" not in adk From 236b790fdd2fa470f9c9a03e95c4cd6613d71b6d Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Mon, 8 Jun 2026 01:15:53 -0700 Subject: [PATCH 2/5] fix(bqaa): put C7 pair keys under attributes.adk (not top-level) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Caught in review of caohy1988/adk-python#6: the C7 pair keys (pause_kind, function_call_id) were being passed via EventData.extra_attributes, which _enrich_attributes() copies at the top of attrs *before* attrs["adk"] = _build_adk_envelope(...). 
That landed them at attributes.pause_kind / attributes.function_call_id, not attributes.adk.pause_kind / attributes.adk.function_call_id. The customer SQL pinned in #293 v5 acceptance #3 is: JSON_VALUE(attributes, '$.adk.function_call_id') = JSON_VALUE(...) so the pair join would have returned null on every row. This commit makes the contract match the SQL. Changes: * EventData gains adk_extras: dict[str, Any], a sibling of extra_attributes that lives INSIDE attributes.adk. * _enrich_attributes merges adk_extras into the envelope after _build_adk_envelope (envelope wins on conflict — producer-derived identity fields like source_event_id are the source of truth). * The two emit sites (TOOL_PAUSED in on_event_callback, TOOL_COMPLETED in on_user_message_callback) pass the pair keys via adk_extras= instead of extra_attributes=. * The three C7 tests are updated to assert json.loads(row["attributes"])["adk"]["pause_kind"] etc., locking in the right shape this time. Full plugin suite: 252 passed. --- .../bigquery_agent_analytics_plugin.py | 20 ++++++++++++++++--- .../test_bigquery_agent_analytics_plugin.py | 20 +++++++++---------- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 425d3bc89e..17013accd7 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -2020,6 +2020,12 @@ class EventData: # an Event — the envelope helper writes those attributes as null # rather than synthesizing fake identity (per #293 v5 A3 contract). source_event: Optional["Event"] = None + # Producer-supplied extras that belong INSIDE ``attributes.adk`` (not + # at the top level of ``attributes``). 
C7's pair keys + # (``pause_kind`` / ``function_call_id``) ride here so consumer SQL + # like ``JSON_VALUE(attributes, '$.adk.function_call_id')`` lands at + # the right JSON path per #293 v5. + adk_extras: dict[str, Any] = field(default_factory=dict) class BigQueryAgentAnalyticsPlugin(BasePlugin): @@ -2923,9 +2929,17 @@ def _enrich_attributes( A new dict ready for JSON serialization into the attributes column. """ attrs: dict[str, Any] = dict(event_data.extra_attributes) - attrs["adk"] = self._build_adk_envelope( + adk_envelope = self._build_adk_envelope( callback_context, event_data.source_event ) + # Merge producer-supplied adk_extras (#293 v5 C7 pair keys etc.) + # INTO the adk envelope so consumer SQL on + # ``$.adk.pause_kind`` / ``$.adk.function_call_id`` resolves. + # adk_envelope wins on key conflict — producer-derived envelope + # is the source of truth for identity fields like source_event_id. + for k, v in event_data.adk_extras.items(): + adk_envelope.setdefault(k, v) + attrs["adk"] = adk_envelope attrs["root_agent_name"] = TraceManager.get_root_agent_name() if event_data.model: @@ -3136,7 +3150,7 @@ async def on_user_message_callback( raw_content=content_dict, is_truncated=is_truncated, event_data=EventData( - extra_attributes={ + adk_extras={ "pause_kind": "tool", "function_call_id": part.function_response.id, }, @@ -3304,7 +3318,7 @@ async def on_event_callback( is_truncated=is_truncated, event_data=EventData( source_event=event, - extra_attributes={ + adk_extras={ "pause_kind": pause_kind, "function_call_id": part.function_call.id, }, diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index d072929aa3..720979e1b0 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -8007,7 +8007,6 @@ async def test_skips_executable_code_only_events( assert 
mock_write_client.append_rows.call_count == 0 - # ----------------------------------------------------------------------------- # ADK 2.0 minimum producer cut (#293 v5) # @@ -8346,10 +8345,11 @@ async def test_tool_paused_non_hitl_pause_kind_tool( rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) pauses = [r for r in rows if r["event_type"] == "TOOL_PAUSED"] assert len(pauses) == 1 + # C7 pair keys live UNDER ``attributes.adk`` so the consumer SQL on + # ``JSON_VALUE(attributes, '$.adk.function_call_id')`` resolves. adk = json.loads(pauses[0]["attributes"])["adk"] - extras = json.loads(pauses[0]["attributes"]) - assert extras["pause_kind"] == "tool" - assert extras["function_call_id"] == "call-1" + assert adk["pause_kind"] == "tool" + assert adk["function_call_id"] == "call-1" @pytest.mark.asyncio async def test_tool_paused_hitl_pause_kind( @@ -8379,9 +8379,9 @@ async def test_tool_paused_hitl_pause_kind( rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) pauses = [r for r in rows if r["event_type"] == "TOOL_PAUSED"] assert len(pauses) == 1 - extras = json.loads(pauses[0]["attributes"]) - assert extras["pause_kind"] == "hitl_confirmation" - assert extras["function_call_id"] == "call-hitl-1" + adk = json.loads(pauses[0]["attributes"])["adk"] + assert adk["pause_kind"] == "hitl_confirmation" + assert adk["function_call_id"] == "call-hitl-1" @pytest.mark.asyncio async def test_user_message_function_response_emits_tool_completed( @@ -8407,9 +8407,9 @@ async def test_user_message_function_response_emits_tool_completed( rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) completed = [r for r in rows if r["event_type"] == "TOOL_COMPLETED"] assert len(completed) == 1 - extras = json.loads(completed[0]["attributes"]) - assert extras["pause_kind"] == "tool" - assert extras["function_call_id"] == "call-1" + adk = json.loads(completed[0]["attributes"])["adk"] + assert adk["pause_kind"] == "tool" + 
assert adk["function_call_id"] == "call-1" @pytest.mark.asyncio async def test_hitl_user_message_does_not_emit_tool_completed( From 8d8eb0539728be5d6e2502d5ff6db9bc3c96e42e Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Mon, 8 Jun 2026 02:22:07 -0700 Subject: [PATCH 3/5] =?UTF-8?q?docs(bqaa):=20fix=20=5Fbuild=5Fadk=5Fenvelo?= =?UTF-8?q?pe=20docstring=20=E2=80=94=20keys=20are=20omitted,=20not=20JSON?= =?UTF-8?q?-null?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The docstring claimed callback-only rows leave A3/C1/C2/C3/C8 keys 'JSON null'. The implementation actually returns early when source_event is None (line 2837-2838) so those keys are absent from the envelope, not written as null. Behavior is correct (and what the #293 v5 contract intends). Updating the docstring to match — and noting that because the surrounding column is BigQuery JSON, an omitted key resolves to SQL NULL via JSON_VALUE(attributes, '$.adk.<key>'), so consumer SQL using 'IS NOT NULL' to gate Event-originating rows works without the producer writing explicit JSON nulls. Caught by the RFC #97 review against the haiyuan-eng-google SDK repo; no code change required, docstring-only fix. --- .../adk/plugins/bigquery_agent_analytics_plugin.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 17013accd7..ef844eb478 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -2823,8 +2823,13 @@ def _build_adk_envelope( row regardless of origin. A3 / C1 / C2 / C3 (``source_event_id``, ``node``, ``branch``, ``scope``) and C8 (``route``, ``render_ui_widgets``, ``rewind_before_invocation_id``) only stamp - when a source Event is provided — callback-only rows leave them - JSON null rather than synthesizing fake identity.
+ when a source Event is provided — callback-only rows **omit** those + keys from the envelope rather than synthesizing fake identity. Since + the surrounding column is BigQuery JSON, an omitted key resolves to + SQL NULL via ``JSON_VALUE(attributes, '$.adk.')``; consumers + using ``JSON_VALUE(...) IS NOT NULL`` to gate on Event-originating + rows therefore work correctly without the producer writing explicit + JSON nulls. """ adk: dict[str, Any] = { "schema_version": _ADK_ENVELOPE_SCHEMA_VERSION, From fa20827cc282e84fba0f29a5044618c9740ad25a Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Mon, 8 Jun 2026 02:26:16 -0700 Subject: [PATCH 4/5] docs(bqaa): align EventData.source_event comment with omitted-vs-null contract Last stale 'writes those attributes as null' reference in the producer code. Behavior is unchanged; the helper omits the keys (return early at :2837 when source_event is None) and JSON_VALUE on the BigQuery JSON column resolves an omitted key to SQL NULL, so consumer gating with 'IS NOT NULL' works without explicit JSON nulls. Caught by the RFC #97 final review pass; matches the corrected _build_adk_envelope docstring in 8d8eb05. --- src/google/adk/plugins/bigquery_agent_analytics_plugin.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index ef844eb478..181b582771 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -2017,8 +2017,11 @@ class EventData: # ADK 2.0 envelope: callbacks that hold the source Event pass it here # so ``_log_event`` can stamp ``attributes.adk.{source_event_id, node, # branch, scope, ...}``. Leave None for rows that don't originate from - # an Event — the envelope helper writes those attributes as null - # rather than synthesizing fake identity (per #293 v5 A3 contract). 
+ # an Event — the envelope helper omits those keys rather than + # synthesizing fake identity (per #293 v5 A3 contract). Because the + # surrounding column is BigQuery JSON, an omitted key resolves to SQL + # NULL via ``JSON_VALUE(attributes, '$.adk.')``, so consumer + # gating with ``... IS NOT NULL`` works without explicit JSON nulls. source_event: Optional["Event"] = None # Producer-supplied extras that belong INSIDE ``attributes.adk`` (not # at the top level of ``attributes``). C7's pair keys From ea7a16af967af983a7a5a9ac6032968c9dcdc65f Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Mon, 8 Jun 2026 02:33:44 -0700 Subject: [PATCH 5/5] docs(bqaa): drop GitHub-issue references from docstrings/comments Following review feedback that docstrings shouldn't reference GitHub issue numbers or PR review-thread revisions. The technical substance (contract names like 'A1/A2 envelope', 'C7 pair-key emit', 'flat-with- prefix', 'HITL non-routing') stays where it aids navigation; only the '#NNN' and 'v5' annotations come out. 20 sites swept across the plugin module and test file. Behavior and test names unchanged; suite still 252/252. The existing '#4645' reference in workflow plumbing is left alone -- it was not introduced by this change. --- .../bigquery_agent_analytics_plugin.py | 48 +++++++++---------- .../test_bigquery_agent_analytics_plugin.py | 6 +-- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 181b582771..beeef70b99 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -97,9 +97,9 @@ "adk_request_input": "HITL_INPUT_REQUEST", }) -# Reverse of _HITL_EVENT_MAP for the C7 pause_kind discriminator. The -# id→name lookup in #199/#293 v5 routes ``adk_request_credential`` → -# ``hitl_credential`` etc.; everything else is ``tool``. 
+# Reverse of _HITL_EVENT_MAP for the long-running-tool pause_kind +# discriminator. The id→name lookup routes ``adk_request_credential`` +# → ``hitl_credential`` etc.; everything else is ``tool``. _HITL_PAUSE_KIND_MAP = MappingProxyType({ "adk_request_credential": "hitl_credential", "adk_request_confirmation": "hitl_confirmation", @@ -110,7 +110,7 @@ def _derive_scope( isolation_scope: Optional[str], ) -> Optional[dict[str, str]]: - """Derives ``attributes.adk.scope`` per the #198 / #293 v5 rule. + """Derives ``attributes.adk.scope`` from an Event's isolation_scope. Order is fixed: (1) None → null; (2) node-shape (``name@run_id`` or ``parent/name@run_id``) → ``node_run``; (3) any other non-empty @@ -2018,7 +2018,7 @@ class EventData: # so ``_log_event`` can stamp ``attributes.adk.{source_event_id, node, # branch, scope, ...}``. Leave None for rows that don't originate from # an Event — the envelope helper omits those keys rather than - # synthesizing fake identity (per #293 v5 A3 contract). Because the + # synthesizing fake identity. Because the # surrounding column is BigQuery JSON, an omitted key resolves to SQL # NULL via ``JSON_VALUE(attributes, '$.adk.')``, so consumer # gating with ``... IS NOT NULL`` works without explicit JSON nulls. @@ -2027,7 +2027,7 @@ class EventData: # at the top level of ``attributes``). C7's pair keys # (``pause_kind`` / ``function_call_id``) ride here so consumer SQL # like ``JSON_VALUE(attributes, '$.adk.function_call_id')`` lands at - # the right JSON path per #293 v5. + # the right JSON path. adk_extras: dict[str, Any] = field(default_factory=dict) @@ -2820,7 +2820,7 @@ def _build_adk_envelope( callback_context: CallbackContext, source_event: Optional["Event"], ) -> dict[str, Any]: - """Builds the ``attributes.adk`` envelope per #293 v5. + """Builds the ``attributes.adk`` envelope. A1 / A2 (``schema_version``, ``app_name``) stamp on every ADK-enriched row regardless of origin. 
A3 / C1 / C2 / C3 (``source_event_id``, @@ -2883,7 +2883,7 @@ def _build_adk_envelope( except Exception: pass - # C3: scope shape derivation (per #190 v5 / #198). Order matters: + # Scope shape derivation. Order matters: # node-shape patterns must be checked before falling through to # function_call so bare ``name@run_id`` doesn't misclassify. try: @@ -2892,7 +2892,7 @@ def _build_adk_envelope( except Exception: pass - # C8: raw EventActions mirror (flat under attributes.adk per #203). + # Raw EventActions mirror (flat under attributes.adk). # Stamp only when actually set so JSON doesn't bloat with nulls. try: actions = getattr(source_event, "actions", None) @@ -2931,7 +2931,7 @@ def _enrich_attributes( Reads ``model``, ``model_version``, and ``usage_metadata`` from *event_data*, copies ``extra_attributes``, then adds session metadata - and custom tags. Also stamps the ``adk`` envelope (#293 v5). + and custom tags. Also stamps the ``adk`` envelope. Returns: A new dict ready for JSON serialization into the attributes column. @@ -2940,7 +2940,7 @@ def _enrich_attributes( adk_envelope = self._build_adk_envelope( callback_context, event_data.source_event ) - # Merge producer-supplied adk_extras (#293 v5 C7 pair keys etc.) + # Merge producer-supplied adk_extras (long-running pair keys etc.) # INTO the adk envelope so consumer SQL on # ``$.adk.pause_kind`` / ``$.adk.function_call_id`` resolves. # adk_envelope wins on key conflict — producer-derived envelope @@ -3107,7 +3107,7 @@ async def on_user_message_callback( are the long-running tool completions for tools that paused via ``TOOL_PAUSED``. Emitted as ``TOOL_COMPLETED`` with ``pause_kind = 'tool'`` and ``function_call_id`` so the customer - can join the pair from BigQuery (#293 v5 C7 pair-key subset). + can join the pair from BigQuery. Args: invocation_context: The context of the current invocation. 
@@ -3137,7 +3137,7 @@ async def on_user_message_callback( } if hitl_event: # HITL completions stay on the HITL_*_COMPLETED stream — they - # MUST NOT also emit TOOL_COMPLETED (per #293 v5 C7). + # MUST NOT also emit TOOL_COMPLETED. await self._log_event( hitl_event + "_COMPLETED", callback_ctx, @@ -3150,8 +3150,8 @@ async def on_user_message_callback( # tool calls complete inside the agent run via # after_tool_callback, so a function_response inside a user # message is the resume side of a previously-paused tool. - # Stamp the pair keys; pause_orphan / registry semantics are - # deferred to #206. + # Stamp the pair keys; pause_orphan / registry semantics + # are intentionally deferred. await self._log_event( "TOOL_COMPLETED", callback_ctx, @@ -3211,10 +3211,10 @@ async def on_event_callback( ), ) - # --- AGENT_TRANSFER (C4 / #200) --- + # --- AGENT_TRANSFER --- # actions.transfer_to_agent stores the *target* agent only # (events/event_actions.py:75); from_agent is pinned to event.author - # per #293 v5 C4. Never fabricate authors on non-Event paths. + # by contract. Never fabricate authors on non-Event paths. if event.actions.transfer_to_agent: await self._log_event( "AGENT_TRANSFER", @@ -3226,7 +3226,7 @@ async def on_event_callback( event_data=EventData(source_event=event), ) - # --- EVENT_COMPACTION (C5 / #201) --- + # --- EVENT_COMPACTION --- # EventCompaction.start_timestamp / end_timestamp are float epoch # seconds. Preserve fractional precision here; consumer view # conversion is deferred. @@ -3247,7 +3247,7 @@ async def on_event_callback( event_data=EventData(source_event=event), ) - # --- AGENT_STATE_CHECKPOINT (C6 / #202) --- + # --- AGENT_STATE_CHECKPOINT --- # Fires when *either* agent_state is set or end_of_agent is True; # supports {agent_state: None, end_of_agent: True} payloads. # Inline payload only — oversized-state GCS offload deferred. 
@@ -3274,7 +3274,7 @@ async def on_event_callback( event_data=EventData(source_event=event), ) - # --- HITL + TOOL_PAUSED (C7 / #199 — pair-key subset) + per-part + # --- HITL + TOOL_PAUSED (pair-key emit) + per-part # iteration over event.content.parts --- # TOOL_PAUSED fires per long_running_tool_id; pause_kind is derived # via the id→name lookup against _HITL_PAUSE_KIND_MAP, so a HITL @@ -3305,9 +3305,9 @@ async def on_event_callback( is_truncated=is_truncated, event_data=EventData(source_event=event), ) - # C7: per-id TOOL_PAUSED emit. pause_kind derives from the - # function_call NAME (#199 / #293 v5) — looking it up against - # the id value would misclassify every HITL pause as 'tool'. + # Per-id TOOL_PAUSED emit. pause_kind derives from the + # function_call NAME — looking it up against the id value + # would misclassify every HITL pause as 'tool'. if part.function_call.id in long_running_ids: pause_kind = _HITL_PAUSE_KIND_MAP.get( part.function_call.name, "tool" @@ -3334,7 +3334,7 @@ async def on_event_callback( ) # Detect HITL function responses (completion events). HITL # function responses route ONLY here, never to TOOL_COMPLETED - # (per #293 v5 C7 / verified at this file's HITL test suite). + # (verified by this file's HITL test suite). 
if part.function_response: hitl_event = _HITL_EVENT_MAP.get(part.function_response.name) if hitl_event: diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index 720979e1b0..e2f7f87e91 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -8008,7 +8008,7 @@ async def test_skips_executable_code_only_events( # ----------------------------------------------------------------------------- -# ADK 2.0 minimum producer cut (#293 v5) +# ADK 2.0 minimum producer cut # # Coverage matrix: # A1 / A2 attributes.adk.{schema_version, app_name} on every row @@ -8451,7 +8451,7 @@ async def test_route_and_rewind_flat_under_attributes_adk( invocation_context, dummy_arrow_schema, ): - """C8 (#203): route / rewind_before_invocation_id mirror under + """C8: route / rewind_before_invocation_id mirror under attributes.adk.* (flat-with-prefix, NOT nested under .actions.).""" event = event_lib.Event( author="agent", @@ -8470,7 +8470,7 @@ async def test_route_and_rewind_flat_under_attributes_adk( mock_write_client, dummy_arrow_schema ) adk = json.loads(log_entry["attributes"])["adk"] - # Flat-with-prefix per #203 / #293 v5. + # Flat-with-prefix mirror under attributes.adk.*. assert adk["route"] == "branch_b" assert adk["rewind_before_invocation_id"] == "inv-earlier" # Not nested under .actions.