# Runtime LLM debug hooks (runtime_manager/worker_main.py).
#
# These helpers emit two metadata-only events per LLM API call,
# "llm.request.metadata" and "llm.response.metadata".  They report counts,
# names and a SHA-256 digest of the system prompt; message content, tool
# parameter schemas and other request-body keys are never copied.
#
# Integration in the worker's run function, as shown by the patch:
#   * `llm_debug_hook_registrations` is initialised to an empty list up front;
#   * `_register_runtime_llm_debug_hooks(run_id=..., request=...,
#     system_prompt=..., emit_fn=emit)` is called right after the agent is
#     stored in `_AGENT_HOLDER`, before "run.running" is emitted;
#   * `_unregister_runtime_llm_debug_hooks(...)` is the first statement of the
#     `finally` clause.
# NOTE(review): the patch also binds `model`, `provider` and `base_url` from
# `runtime_llm_config` right after it is resolved; no use of those locals is
# visible in the patch -- confirm they are needed, since the subscripts raise
# KeyError when a key is absent.
#
# Sibling helpers defined elsewhere in worker_main.py and used below:
# `_coerce_positive_int`, `_base_url_host`, `_normalize_string_list`.
# The event builders are covered by
# tests/runtime_manager/test_registry.py
# (test_runtime_worker_llm_request_metadata_is_safe_and_actionable and
# test_runtime_worker_llm_response_metadata_is_safe_and_actionable).

import hashlib
import logging
import math
import time
from itertools import islice
from typing import Any

_LLM_DEBUG_LOG = logging.getLogger(__name__)


def _register_runtime_llm_debug_hooks(
    *,
    run_id: str,
    request: dict[str, Any],
    system_prompt: str | None,
    emit_fn,
) -> list[tuple[Any, str, Any]]:
    """Attach metadata-emitting callbacks to the plugin manager's API hooks.

    Args:
        run_id: Identifier copied into every emitted event.
        request: The run request; its ``skills`` and toolset lists are
            summarised in the request event.
        system_prompt: Effective system prompt (only its length and digest
            are reported).
        emit_fn: Callable receiving each event dict.

    Returns:
        ``(manager, hook_name, callback)`` triples to hand to
        ``_unregister_runtime_llm_debug_hooks``.  Empty when the plugin
        manager is unavailable, has no dict-shaped ``_hooks`` registry, or
        registration fails part-way.
    """
    # Diagnostics are optional: neither a missing plugin package nor a failing
    # manager factory may abort the run.
    try:
        from hermes_cli.plugins import get_plugin_manager

        manager = get_plugin_manager()
    except Exception:
        return []

    hooks = getattr(manager, "_hooks", None)
    if not isinstance(hooks, dict):
        return []

    def _emit_best_effort(hook_name: str, build_event) -> None:
        # A diagnostics failure must never propagate into the LLM call path.
        try:
            emit_fn(build_event())
        except Exception:
            _LLM_DEBUG_LOG.debug(
                "runtime llm debug hook %r failed", hook_name, exc_info=True
            )

    def on_pre_api_request(**kwargs: Any) -> None:
        _emit_best_effort(
            "pre_api_request",
            lambda: _llm_request_metadata_event(
                kwargs,
                run_id=run_id,
                request=request,
                system_prompt=system_prompt,
            ),
        )

    def on_post_api_request(**kwargs: Any) -> None:
        _emit_best_effort(
            "post_api_request",
            lambda: _llm_response_metadata_event(kwargs, run_id=run_id),
        )

    registrations: list[tuple[Any, str, Any]] = []
    try:
        for hook_name, callback in (
            ("pre_api_request", on_pre_api_request),
            ("post_api_request", on_post_api_request),
        ):
            hooks.setdefault(hook_name, []).append(callback)
            registrations.append((manager, hook_name, callback))
    except Exception:
        # Roll back a partial registration; otherwise the caller would never
        # learn about the callback that was already attached.
        _unregister_runtime_llm_debug_hooks(registrations)
        return []
    return registrations


def _unregister_runtime_llm_debug_hooks(
    registrations: list[tuple[Any, str, Any]],
) -> None:
    """Detach callbacks previously attached by the register helper.

    Best-effort: a registration whose manager no longer exposes the expected
    ``_hooks`` dict/list shape is skipped, and one failing removal does not
    prevent the remaining ones.
    """
    for manager, hook_name, callback in registrations:
        try:
            hooks = getattr(manager, "_hooks", None)
            callbacks = hooks.get(hook_name) if isinstance(hooks, dict) else None
            if isinstance(callbacks, list) and callback in callbacks:
                callbacks.remove(callback)
        except Exception:
            continue


def _llm_request_metadata_event(
    hook_kwargs: dict[str, Any],
    *,
    run_id: str,
    request: dict[str, Any],
    system_prompt: str | None,
) -> dict[str, Any]:
    """Build the ``llm.request.metadata`` event for one outgoing API call.

    Only counts, names and a digest are included; nothing from the message
    list or the tool parameter schemas is copied.

    Args:
        hook_kwargs: Keyword arguments the ``pre_api_request`` hook received.
        run_id: Identifier of the current run.
        request: The run request (source of skills and toolset lists).
        system_prompt: Effective system prompt, or ``None``.

    Returns:
        The event dict.
    """
    body = _hook_request_body(hook_kwargs)
    tools = body.get("tools")
    if not isinstance(tools, list):
        tools = []

    prompt_text = system_prompt or ""
    skills = _normalize_string_list(request.get("skills"))
    return {
        "event": "llm.request.metadata",
        "run_id": run_id,
        "timestamp": time.time(),
        "api_request_id": str(hook_kwargs.get("api_request_id") or ""),
        "turn_id": str(hook_kwargs.get("turn_id") or ""),
        "api_call_count": _coerce_positive_int(hook_kwargs.get("api_call_count")),
        "provider": str(hook_kwargs.get("provider") or ""),
        "model": str(hook_kwargs.get("model") or ""),
        "base_url_host": _base_url_host(hook_kwargs.get("base_url")),
        "api_mode": str(hook_kwargs.get("api_mode") or ""),
        # `message_count` is what the hook reported; `request_message_count`
        # is measured from the request body itself.
        "message_count": _coerce_positive_int(hook_kwargs.get("message_count")),
        "request_message_count": _message_count_from_body(body),
        "request_char_count": _coerce_positive_int(
            hook_kwargs.get("request_char_count")
        ),
        "approx_input_tokens": _coerce_positive_int(
            hook_kwargs.get("approx_input_tokens")
        ),
        "max_tokens": _coerce_positive_int(hook_kwargs.get("max_tokens")),
        "skills_count": len(skills),
        "skill_names": skills,
        # Plain substring check: does the skill name occur in the prompt?
        "skill_prompt_presence": {
            skill: bool(skill and skill in prompt_text) for skill in skills
        },
        "system_prompt_chars": len(prompt_text),
        "system_prompt_sha256": _sha256_hex(prompt_text) if prompt_text else "",
        "enabled_toolsets": _normalize_string_list(request.get("enabled_toolsets")),
        "disabled_toolsets": _normalize_string_list(
            request.get("disabled_toolsets")
        ),
        # `tool_count` is what the hook reported; `tools_count` is measured
        # from the request body's `tools` list.
        "tool_count": _coerce_positive_int(hook_kwargs.get("tool_count")),
        "tools_count": len(tools),
        "tool_names": _tool_names_from_schemas(tools),
        "tool_choice": _summarize_tool_choice(body),
    }


def _llm_response_metadata_event(
    hook_kwargs: dict[str, Any],
    *,
    run_id: str,
) -> dict[str, Any]:
    """Build the ``llm.response.metadata`` event for one completed API call.

    Args:
        hook_kwargs: Keyword arguments the ``post_api_request`` hook received.
        run_id: Identifier of the current run.

    Returns:
        The event dict.  ``usage`` is a shallow copy of the hook's dict (or
        ``{}`` when it is not a dict), so later mutation by the caller does
        not alter an event that was already emitted.
    """
    assistant_message = hook_kwargs.get("assistant_message")
    # The message may be an SDK object or a plain dict; read `tool_calls`
    # from either shape.
    if isinstance(assistant_message, dict):
        tool_calls = assistant_message.get("tool_calls")
    else:
        tool_calls = getattr(assistant_message, "tool_calls", None)

    usage = hook_kwargs.get("usage")
    return {
        "event": "llm.response.metadata",
        "run_id": run_id,
        "timestamp": time.time(),
        "api_request_id": str(hook_kwargs.get("api_request_id") or ""),
        "turn_id": str(hook_kwargs.get("turn_id") or ""),
        "api_call_count": _coerce_positive_int(hook_kwargs.get("api_call_count")),
        "provider": str(hook_kwargs.get("provider") or ""),
        "model": str(hook_kwargs.get("model") or ""),
        "base_url_host": _base_url_host(hook_kwargs.get("base_url")),
        "api_mode": str(hook_kwargs.get("api_mode") or ""),
        "api_duration": _coerce_nonnegative_float(hook_kwargs.get("api_duration")),
        "finish_reason": str(hook_kwargs.get("finish_reason") or ""),
        "response_model": str(hook_kwargs.get("response_model") or ""),
        "message_count": _coerce_positive_int(hook_kwargs.get("message_count")),
        "assistant_content_chars": _coerce_positive_int(
            hook_kwargs.get("assistant_content_chars")
        ),
        # The event key is plural ("calls") while the hook kwarg is singular.
        "assistant_tool_calls_count": _coerce_positive_int(
            hook_kwargs.get("assistant_tool_call_count")
        ),
        "assistant_tool_names": _tool_names_from_calls(tool_calls),
        "usage": dict(usage) if isinstance(usage, dict) else {},
    }


def _hook_request_body(hook_kwargs: dict[str, Any]) -> dict[str, Any]:
    """Return ``hook_kwargs["request"]["body"]`` when both levels are dicts, else ``{}``."""
    request_payload = hook_kwargs.get("request")
    if not isinstance(request_payload, dict):
        return {}
    body = request_payload.get("body")
    return body if isinstance(body, dict) else {}


def _message_count_from_body(body: dict[str, Any]) -> int:
    """Return the length of the first list found under ``messages`` or ``input``, else 0."""
    for key in ("messages", "input"):
        value = body.get(key)
        if isinstance(value, list):
            return len(value)
    return 0


def _clean_name(value: Any) -> str:
    """Return *value* stripped when it is a non-blank string, else ``""``."""
    if isinstance(value, str):
        return value.strip()
    return ""


def _first_names(items: Any, extract, limit: int) -> list[str]:
    """Collect at most *limit* non-empty names produced by ``extract(item)``."""
    names = (extract(item) for item in items)
    return list(islice((name for name in names if name), max(limit, 0)))


def _tool_names_from_schemas(tools: list[Any], *, limit: int = 32) -> list[str]:
    """Return up to *limit* tool names from a request's tool schema list."""
    return _first_names(tools, _tool_name_from_schema, limit)


def _tool_name_from_schema(value: Any) -> str:
    """Extract a tool name from one tool schema entry.

    Looks at ``function.name`` first, then the top-level ``name`` and
    ``tool_name`` keys.  Every candidate is stripped, and a blank or
    non-string candidate falls through to the next one.  Returns ``""`` when
    *value* is not a dict or no candidate is usable.
    """
    if not isinstance(value, dict):
        return ""
    function = value.get("function")
    nested = function.get("name") if isinstance(function, dict) else None
    for candidate in (nested, value.get("name"), value.get("tool_name")):
        name = _clean_name(candidate)
        if name:
            return name
    return ""


def _tool_names_from_calls(value: Any, *, limit: int = 32) -> list[str]:
    """Return up to *limit* tool names from an assistant message's tool calls.

    Accepts a list or tuple of calls; any other value yields ``[]``.
    """
    if not isinstance(value, (list, tuple)):
        return []
    return _first_names(value, _tool_name_from_call, limit)


def _tool_name_from_call(value: Any) -> str:
    """Extract the tool name from one tool call (attribute- or dict-shaped).

    For both shapes the nested ``function.name`` is preferred and the
    top-level ``name`` is the fallback; blank or non-string candidates are
    skipped.  Returns ``""`` when nothing usable is found.
    """
    if isinstance(value, dict):
        function = value.get("function")
        nested = function.get("name") if isinstance(function, dict) else None
        candidates = (nested, value.get("name"))
    else:
        function = getattr(value, "function", None)
        candidates = (getattr(function, "name", None), getattr(value, "name", None))
    for candidate in candidates:
        name = _clean_name(candidate)
        if name:
            return name
    return ""


def _summarize_tool_choice(body: dict[str, Any]) -> str:
    """Describe the request's ``tool_choice`` as a short label.

    Returns ``"omitted"`` when the key is absent, ``"null"`` for ``None``,
    the stripped string (or ``"empty"`` when blank) for a string,
    ``"function:<name>"`` / ``"type:<type>"`` / ``"object"`` for a dict, and
    the value's type name for anything else.
    """
    if "tool_choice" not in body:
        return "omitted"
    value = body.get("tool_choice")
    if value is None:
        return "null"
    if isinstance(value, str):
        return value.strip() or "empty"
    if isinstance(value, dict):
        function = value.get("function")
        name = function.get("name") if isinstance(function, dict) else None
        if isinstance(name, str) and name.strip():
            return f"function:{name.strip()}"
        item_type = value.get("type")
        if isinstance(item_type, str) and item_type.strip():
            return f"type:{item_type.strip()}"
        return "object"
    return type(value).__name__


def _sha256_hex(value: str) -> str:
    """Return the hex SHA-256 digest of *value* encoded as UTF-8.

    ``errors="replace"`` keeps unencodable characters (such as lone
    surrogates) from raising.
    """
    return hashlib.sha256(value.encode("utf-8", errors="replace")).hexdigest()


def _coerce_nonnegative_float(value: Any) -> float:
    """Coerce *value* to a finite, non-negative float, defaulting to ``0.0``.

    Booleans, ``None``, unsupported types, unparsable strings, negative
    numbers, NaN, infinities and ints too large for a float all yield
    ``0.0``.  Non-finite values are rejected because ``json.dumps`` would
    otherwise emit ``Infinity`` / ``NaN``, which are not valid JSON.
    """
    if isinstance(value, bool) or value is None:
        return 0.0
    if isinstance(value, str):
        value = value.strip()
    elif not isinstance(value, (int, float)):
        return 0.0
    try:
        parsed = float(value)
    except (ValueError, OverflowError):
        return 0.0
    if math.isfinite(parsed) and parsed >= 0:
        return parsed
    return 0.0