Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions runtime_manager/worker_main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import ast
import hashlib
import json
import os
import re
Expand Down Expand Up @@ -252,6 +253,7 @@ def command_reader() -> None:
)
listener.start()

llm_debug_hook_registrations: list[tuple[Any, str, Any]] = []
tool_started_at: dict[str, float] = {}
seen_artifact_ids: set[str] = set()
tool_lock = threading.Lock()
Expand Down Expand Up @@ -372,6 +374,9 @@ def on_thinking(message: str | None) -> None:
)
register_gateway_notify(approval_session_key, approval_notify)
runtime_llm_config = _resolve_runtime_llm_config(request)
model = runtime_llm_config["model"]
provider = runtime_llm_config["provider"]
base_url = runtime_llm_config["base_url"]

system_prompt = _compose_effective_system_prompt(
request,
Expand Down Expand Up @@ -410,6 +415,12 @@ def on_thinking(message: str | None) -> None:
)
_apply_runtime_llm_config_to_agent(agent, runtime_llm_config)
_AGENT_HOLDER["agent"] = agent
llm_debug_hook_registrations = _register_runtime_llm_debug_hooks(
run_id=run_id,
request=request,
system_prompt=system_prompt,
emit_fn=emit,
)

emit({"event": "run.running", "run_id": run_id, "timestamp": time.time()})
result = agent.run_conversation(
Expand Down Expand Up @@ -463,6 +474,7 @@ def on_thinking(message: str | None) -> None:
)
return 1
finally:
_unregister_runtime_llm_debug_hooks(llm_debug_hook_registrations)
try:
unregister_gateway_notify(approval_session_key)
except Exception:
Expand Down Expand Up @@ -615,6 +627,245 @@ def _base_url_host(value: Any) -> str:
return parsed.netloc or parsed.path.split("/", 1)[0]


def _register_runtime_llm_debug_hooks(
*,
run_id: str,
request: dict[str, Any],
system_prompt: str | None,
emit_fn,
) -> list[tuple[Any, str, Any]]:
try:
from hermes_cli.plugins import get_plugin_manager
except Exception:
return []

manager = get_plugin_manager()
registrations: list[tuple[Any, str, Any]] = []

def on_pre_api_request(**kwargs: Any) -> None:
event = _llm_request_metadata_event(
kwargs,
run_id=run_id,
request=request,
system_prompt=system_prompt,
)
if event:
emit_fn(event)

def on_post_api_request(**kwargs: Any) -> None:
event = _llm_response_metadata_event(kwargs, run_id=run_id)
if event:
emit_fn(event)

for hook_name, callback in (
("pre_api_request", on_pre_api_request),
("post_api_request", on_post_api_request),
):
hooks = getattr(manager, "_hooks", None)
if not isinstance(hooks, dict):
return registrations
hooks.setdefault(hook_name, []).append(callback)
registrations.append((manager, hook_name, callback))
return registrations


def _unregister_runtime_llm_debug_hooks(registrations: list[tuple[Any, str, Any]]) -> None:
for manager, hook_name, callback in registrations:
try:
hooks = getattr(manager, "_hooks", None)
callbacks = hooks.get(hook_name) if isinstance(hooks, dict) else None
if isinstance(callbacks, list) and callback in callbacks:
callbacks.remove(callback)
except Exception:
continue


def _llm_request_metadata_event(
hook_kwargs: dict[str, Any],
*,
run_id: str,
request: dict[str, Any],
system_prompt: str | None,
) -> dict[str, Any]:
body = _hook_request_body(hook_kwargs)
tools = body.get("tools")
if not isinstance(tools, list):
tools = []

prompt_text = system_prompt or ""
skills = _normalize_string_list(request.get("skills"))
return {
"event": "llm.request.metadata",
"run_id": run_id,
"timestamp": time.time(),
"api_request_id": str(hook_kwargs.get("api_request_id") or ""),
"turn_id": str(hook_kwargs.get("turn_id") or ""),
"api_call_count": _coerce_positive_int(hook_kwargs.get("api_call_count")),
"provider": str(hook_kwargs.get("provider") or ""),
"model": str(hook_kwargs.get("model") or ""),
"base_url_host": _base_url_host(hook_kwargs.get("base_url")),
"api_mode": str(hook_kwargs.get("api_mode") or ""),
"message_count": _coerce_positive_int(hook_kwargs.get("message_count")),
"request_message_count": _message_count_from_body(body),
"request_char_count": _coerce_positive_int(hook_kwargs.get("request_char_count")),
"approx_input_tokens": _coerce_positive_int(hook_kwargs.get("approx_input_tokens")),
"max_tokens": _coerce_positive_int(hook_kwargs.get("max_tokens")),
"skills_count": len(skills),
"skill_names": skills,
"skill_prompt_presence": {
skill: bool(skill and skill in prompt_text)
for skill in skills
},
"system_prompt_chars": len(prompt_text),
"system_prompt_sha256": _sha256_hex(prompt_text) if prompt_text else "",
"enabled_toolsets": _normalize_string_list(request.get("enabled_toolsets")),
"disabled_toolsets": _normalize_string_list(request.get("disabled_toolsets")),
"tool_count": _coerce_positive_int(hook_kwargs.get("tool_count")),
"tools_count": len(tools),
"tool_names": _tool_names_from_schemas(tools),
"tool_choice": _summarize_tool_choice(body),
}


def _llm_response_metadata_event(
hook_kwargs: dict[str, Any],
*,
run_id: str,
) -> dict[str, Any]:
assistant_message = hook_kwargs.get("assistant_message")
return {
"event": "llm.response.metadata",
"run_id": run_id,
"timestamp": time.time(),
"api_request_id": str(hook_kwargs.get("api_request_id") or ""),
"turn_id": str(hook_kwargs.get("turn_id") or ""),
"api_call_count": _coerce_positive_int(hook_kwargs.get("api_call_count")),
"provider": str(hook_kwargs.get("provider") or ""),
"model": str(hook_kwargs.get("model") or ""),
"base_url_host": _base_url_host(hook_kwargs.get("base_url")),
"api_mode": str(hook_kwargs.get("api_mode") or ""),
"api_duration": _coerce_nonnegative_float(hook_kwargs.get("api_duration")),
"finish_reason": str(hook_kwargs.get("finish_reason") or ""),
"response_model": str(hook_kwargs.get("response_model") or ""),
"message_count": _coerce_positive_int(hook_kwargs.get("message_count")),
"assistant_content_chars": _coerce_positive_int(
hook_kwargs.get("assistant_content_chars")
),
"assistant_tool_calls_count": _coerce_positive_int(
hook_kwargs.get("assistant_tool_call_count")
),
"assistant_tool_names": _tool_names_from_calls(
getattr(assistant_message, "tool_calls", None)
),
"usage": hook_kwargs.get("usage") if isinstance(hook_kwargs.get("usage"), dict) else {},
}


def _hook_request_body(hook_kwargs: dict[str, Any]) -> dict[str, Any]:
request_payload = hook_kwargs.get("request")
if not isinstance(request_payload, dict):
return {}
body = request_payload.get("body")
return body if isinstance(body, dict) else {}


def _message_count_from_body(body: dict[str, Any]) -> int:
for key in ("messages", "input"):
value = body.get(key)
if isinstance(value, list):
return len(value)
return 0


def _tool_names_from_schemas(tools: list[Any], *, limit: int = 32) -> list[str]:
names: list[str] = []
for item in tools:
name = _tool_name_from_schema(item)
if name:
names.append(name)
if len(names) >= limit:
break
return names


def _tool_name_from_schema(value: Any) -> str:
if not isinstance(value, dict):
return ""
function = value.get("function")
if isinstance(function, dict) and isinstance(function.get("name"), str):
return function["name"]
for key in ("name", "tool_name"):
item = value.get(key)
if isinstance(item, str) and item.strip():
return item.strip()
return ""


def _tool_names_from_calls(value: Any, *, limit: int = 32) -> list[str]:
if not isinstance(value, list):
return []
names: list[str] = []
for item in value:
name = _tool_name_from_call(item)
if name:
names.append(name)
if len(names) >= limit:
break
return names


def _tool_name_from_call(value: Any) -> str:
function = getattr(value, "function", None)
name = getattr(function, "name", None)
if isinstance(name, str) and name.strip():
return name.strip()
if isinstance(value, dict):
function = value.get("function")
if isinstance(function, dict) and isinstance(function.get("name"), str):
return function["name"].strip()
if isinstance(value.get("name"), str):
return value["name"].strip()
return ""


def _summarize_tool_choice(body: dict[str, Any]) -> str:
if "tool_choice" not in body:
return "omitted"
value = body.get("tool_choice")
if value is None:
return "null"
if isinstance(value, str):
return value.strip() or "empty"
if isinstance(value, dict):
function = value.get("function")
name = function.get("name") if isinstance(function, dict) else None
if isinstance(name, str) and name.strip():
return f"function:{name.strip()}"
item_type = value.get("type")
if isinstance(item_type, str) and item_type.strip():
return f"type:{item_type.strip()}"
return "object"
return type(value).__name__


def _sha256_hex(value: str) -> str:
return hashlib.sha256(value.encode("utf-8", errors="replace")).hexdigest()


def _coerce_nonnegative_float(value: Any) -> float:
if isinstance(value, bool) or value is None:
return 0.0
if isinstance(value, (int, float)):
return float(value) if value >= 0 else 0.0
if isinstance(value, str):
try:
parsed = float(value.strip())
except ValueError:
return 0.0
return parsed if parsed >= 0 else 0.0
return 0.0


def _safe_status_event_text(value: Any, *, limit: int = 512) -> str:
text = str(value or "")
text = _SENSITIVE_STATUS_VALUE_PATTERN.sub(r"\1\2<redacted>", text)
Expand Down
98 changes: 98 additions & 0 deletions tests/runtime_manager/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,104 @@ def __init__(self):
}


def test_runtime_worker_llm_request_metadata_is_safe_and_actionable():
from runtime_manager.worker_main import _llm_request_metadata_event

event = _llm_request_metadata_event(
{
"api_request_id": "turn-1:api:1",
"turn_id": "turn-1",
"api_call_count": 1,
"provider": "custom",
"model": "qwen3.6-35b-a3b",
"base_url": "https://models.example/v1",
"api_mode": "chat_completions",
"message_count": 3,
"request_char_count": 12345,
"approx_input_tokens": 6789,
"max_tokens": 8192,
"tool_count": 6,
"request": {
"body": {
"messages": [
{"role": "system", "content": "SECRET PROMPT"},
{"role": "user", "content": "diagnose"},
],
"tools": [
{
"type": "function",
"function": {
"name": "terminal",
"parameters": {"properties": {"command": {"description": "SECRET SCHEMA"}}},
},
},
{"type": "function", "function": {"name": "read_file"}},
],
"tool_choice": "auto",
"api_key": "secret-key",
}
},
},
run_id="run-1",
request={
"skills": ["dba-diagnose-oracle"],
"enabled_toolsets": ["terminal", "file"],
},
system_prompt="BASE\n\ndba-diagnose-oracle\n\nSECRET PROMPT",
)

assert event["event"] == "llm.request.metadata"
assert event["run_id"] == "run-1"
assert event["skills_count"] == 1
assert event["skill_names"] == ["dba-diagnose-oracle"]
assert event["skill_prompt_presence"] == {"dba-diagnose-oracle": True}
assert event["system_prompt_chars"] > 0
assert len(event["system_prompt_sha256"]) == 64
assert event["tools_count"] == 2
assert event["tool_names"] == ["terminal", "read_file"]
assert event["tool_choice"] == "auto"
assert event["enabled_toolsets"] == ["terminal", "file"]
encoded = json.dumps(event, ensure_ascii=False)
assert "SECRET PROMPT" not in encoded
assert "SECRET SCHEMA" not in encoded
assert "secret-key" not in encoded


def test_runtime_worker_llm_response_metadata_is_safe_and_actionable():
from types import SimpleNamespace

from runtime_manager.worker_main import _llm_response_metadata_event

tool_call = SimpleNamespace(function=SimpleNamespace(name="terminal"))
event = _llm_response_metadata_event(
{
"api_request_id": "turn-1:api:1",
"turn_id": "turn-1",
"api_call_count": 1,
"provider": "custom",
"model": "qwen3.6-35b-a3b",
"base_url": "https://models.example/v1",
"api_mode": "chat_completions",
"api_duration": 1.25,
"finish_reason": "tool_calls",
"response_model": "qwen3.6-35b-a3b",
"message_count": 3,
"assistant_content_chars": 0,
"assistant_tool_call_count": 1,
"assistant_message": SimpleNamespace(tool_calls=[tool_call]),
"usage": {"prompt_tokens": 100, "completion_tokens": 20},
},
run_id="run-1",
)

assert event["event"] == "llm.response.metadata"
assert event["run_id"] == "run-1"
assert event["finish_reason"] == "tool_calls"
assert event["assistant_tool_calls_count"] == 1
assert event["assistant_tool_names"] == ["terminal"]
assert event["usage"] == {"prompt_tokens": 100, "completion_tokens": 20}


def test_runtime_worker_projects_compression_status_as_structured_event():
from runtime_manager.worker_main import _compression_event_from_status

Expand Down
Loading