Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions runtime_manager/worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ def emit(event: dict[str, Any]) -> None:
sys.stdout.flush()


def log_event(event: dict[str, Any]) -> None:
with _OUTPUT_LOCK:
sys.stderr.write(json.dumps(event, ensure_ascii=False) + "\n")
sys.stderr.flush()


def main() -> int:
first_line = sys.stdin.readline()
if not first_line:
Expand Down Expand Up @@ -651,11 +657,13 @@ def on_pre_api_request(**kwargs: Any) -> None:
)
if event:
emit_fn(event)
_log_llm_metadata_event(event, request=request)

def on_post_api_request(**kwargs: Any) -> None:
event = _llm_response_metadata_event(kwargs, run_id=run_id)
if event:
emit_fn(event)
_log_llm_metadata_event(event, request=request)

for hook_name, callback in (
("pre_api_request", on_pre_api_request),
Expand Down Expand Up @@ -761,6 +769,87 @@ def _llm_response_metadata_event(
}


def _log_llm_metadata_event(event: dict[str, Any], *, request: dict[str, Any]) -> None:
try:
log_event(_llm_metadata_log_record(event, request=request))
except Exception:
return


def _llm_metadata_log_record(event: dict[str, Any], *, request: dict[str, Any]) -> dict[str, Any]:
record: dict[str, Any] = {
"message": "runtime.llm.metadata",
"event": str(event.get("event") or ""),
"run_id": str(event.get("run_id") or ""),
"session_id": str(request.get("session_id") or request.get("conversation_id") or ""),
"user_id": str(request.get("user_id") or ""),
"conversation_id": str(request.get("conversation_id") or ""),
"timestamp": event.get("timestamp"),
"api_request_id": str(event.get("api_request_id") or ""),
"turn_id": str(event.get("turn_id") or ""),
"api_call_count": _coerce_positive_int(event.get("api_call_count")),
"provider": str(event.get("provider") or ""),
"model": str(event.get("model") or ""),
"base_url_host": str(event.get("base_url_host") or ""),
"api_mode": str(event.get("api_mode") or ""),
}

if event.get("event") == "llm.request.metadata":
record.update(
{
"skills_count": _coerce_positive_int(event.get("skills_count")),
"skill_names": _normalize_string_list(event.get("skill_names")),
"skill_prompt_presence": event.get("skill_prompt_presence")
if isinstance(event.get("skill_prompt_presence"), dict)
else {},
"enabled_toolsets": _normalize_string_list(event.get("enabled_toolsets")),
"disabled_toolsets": _normalize_string_list(event.get("disabled_toolsets")),
"tools_count": _coerce_positive_int(event.get("tools_count")),
"tool_names": _normalize_string_list(event.get("tool_names")),
"tool_choice": str(event.get("tool_choice") or ""),
"request_message_count": _coerce_positive_int(
event.get("request_message_count")
),
"request_char_count": _coerce_positive_int(event.get("request_char_count")),
"approx_input_tokens": _coerce_positive_int(event.get("approx_input_tokens")),
"max_tokens": _coerce_positive_int(event.get("max_tokens")),
"system_prompt_chars": _coerce_positive_int(event.get("system_prompt_chars")),
"system_prompt_sha256": str(event.get("system_prompt_sha256") or ""),
}
)
elif event.get("event") == "llm.response.metadata":
record.update(
{
"finish_reason": str(event.get("finish_reason") or ""),
"assistant_tool_calls_count": _coerce_positive_int(
event.get("assistant_tool_calls_count")
),
"assistant_tool_names": _normalize_string_list(
event.get("assistant_tool_names")
),
"assistant_content_chars": _coerce_positive_int(
event.get("assistant_content_chars")
),
"response_model": str(event.get("response_model") or ""),
"output_tokens": _usage_output_tokens(event.get("usage")),
}
)
return record


def _usage_output_tokens(value: Any) -> int:
if not isinstance(value, dict):
return 0
return _coerce_positive_int(
_first_present(
value.get("completion_tokens"),
value.get("output_tokens"),
value.get("completionTokens"),
value.get("outputTokens"),
)
)


def _hook_request_body(hook_kwargs: dict[str, Any]) -> dict[str, Any]:
request_payload = hook_kwargs.get("request")
if not isinstance(request_payload, dict):
Expand Down
123 changes: 123 additions & 0 deletions tests/runtime_manager/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,80 @@ def test_runtime_worker_llm_request_metadata_is_safe_and_actionable():
assert "secret-key" not in encoded


def test_runtime_worker_llm_request_metadata_log_record_is_safe_and_greppable():
from runtime_manager.worker_main import (
_llm_metadata_log_record,
_llm_request_metadata_event,
)

event = _llm_request_metadata_event(
{
"api_request_id": "turn-1:api:1",
"turn_id": "turn-1",
"api_call_count": 1,
"provider": "custom",
"model": "qwen3.6-35b-a3b",
"base_url": "https://models.example/v1",
"request_char_count": 12345,
"approx_input_tokens": 6789,
"request": {
"body": {
"messages": [{"role": "user", "content": "SECRET USER INPUT"}],
"tools": [
{
"type": "function",
"function": {
"name": "terminal",
"parameters": {"properties": {"command": {"description": "SECRET SCHEMA"}}},
},
}
],
"tool_choice": "auto",
"api_key": "secret-key",
}
},
},
run_id="run-1",
request={
"user_id": "user-1",
"conversation_id": "conv-1",
"session_id": "session-1",
"message": "SECRET REQUEST",
"skills": ["dba-diagnose-oracle"],
"enabled_toolsets": ["terminal", "file"],
},
system_prompt="BASE\n\ndba-diagnose-oracle\n\nSECRET PROMPT",
)

record = _llm_metadata_log_record(
event,
request={
"user_id": "user-1",
"conversation_id": "conv-1",
"session_id": "session-1",
"message": "SECRET REQUEST",
},
)

assert record["message"] == "runtime.llm.metadata"
assert record["event"] == "llm.request.metadata"
assert record["run_id"] == "run-1"
assert record["session_id"] == "session-1"
assert record["user_id"] == "user-1"
assert record["conversation_id"] == "conv-1"
assert record["skill_names"] == ["dba-diagnose-oracle"]
assert record["tools_count"] == 1
assert record["tool_names"] == ["terminal"]
assert record["tool_choice"] == "auto"

encoded = json.dumps(record, ensure_ascii=False)
assert "SECRET USER INPUT" not in encoded
assert "SECRET REQUEST" not in encoded
assert "SECRET PROMPT" not in encoded
assert "SECRET SCHEMA" not in encoded
assert "secret-key" not in encoded


def test_runtime_worker_llm_response_metadata_is_safe_and_actionable():
from types import SimpleNamespace

Expand Down Expand Up @@ -275,6 +349,55 @@ def test_runtime_worker_llm_response_metadata_is_safe_and_actionable():
assert event["usage"] == {"prompt_tokens": 100, "completion_tokens": 20}


def test_runtime_worker_llm_response_metadata_log_record_is_safe_and_greppable():
from types import SimpleNamespace

from runtime_manager.worker_main import (
_llm_metadata_log_record,
_llm_response_metadata_event,
)

tool_call = SimpleNamespace(function=SimpleNamespace(name="terminal"))
event = _llm_response_metadata_event(
{
"api_request_id": "turn-1:api:1",
"turn_id": "turn-1",
"api_call_count": 1,
"provider": "custom",
"model": "qwen3.6-35b-a3b",
"base_url": "https://models.example/v1",
"api_mode": "chat_completions",
"finish_reason": "tool_calls",
"response_model": "qwen3.6-35b-a3b",
"assistant_content_chars": 0,
"assistant_tool_call_count": 1,
"assistant_message": SimpleNamespace(tool_calls=[tool_call]),
"usage": {"prompt_tokens": 100, "completion_tokens": 20},
},
run_id="run-1",
)

record = _llm_metadata_log_record(
event,
request={
"user_id": "user-1",
"conversation_id": "conv-1",
"session_id": "session-1",
"message": "SECRET REQUEST",
},
)

assert record["message"] == "runtime.llm.metadata"
assert record["event"] == "llm.response.metadata"
assert record["run_id"] == "run-1"
assert record["session_id"] == "session-1"
assert record["finish_reason"] == "tool_calls"
assert record["assistant_tool_calls_count"] == 1
assert record["assistant_tool_names"] == ["terminal"]
assert record["output_tokens"] == 20
assert "SECRET REQUEST" not in json.dumps(record, ensure_ascii=False)


def test_runtime_worker_projects_compression_status_as_structured_event():
from runtime_manager.worker_main import _compression_event_from_status

Expand Down
Loading