diff --git a/runtime_manager/worker_main.py b/runtime_manager/worker_main.py index 8349ff9d4138..d53e7ce25d94 100644 --- a/runtime_manager/worker_main.py +++ b/runtime_manager/worker_main.py @@ -65,6 +65,12 @@ def emit(event: dict[str, Any]) -> None: sys.stdout.flush() +def log_event(event: dict[str, Any]) -> None: + with _OUTPUT_LOCK: + sys.stderr.write(json.dumps(event, ensure_ascii=False) + "\n") + sys.stderr.flush() + + def main() -> int: first_line = sys.stdin.readline() if not first_line: @@ -651,11 +657,13 @@ def on_pre_api_request(**kwargs: Any) -> None: ) if event: emit_fn(event) + _log_llm_metadata_event(event, request=request) def on_post_api_request(**kwargs: Any) -> None: event = _llm_response_metadata_event(kwargs, run_id=run_id) if event: emit_fn(event) + _log_llm_metadata_event(event, request=request) for hook_name, callback in ( ("pre_api_request", on_pre_api_request), @@ -761,6 +769,87 @@ def _llm_response_metadata_event( } +def _log_llm_metadata_event(event: dict[str, Any], *, request: dict[str, Any]) -> None: + try: + log_event(_llm_metadata_log_record(event, request=request)) + except Exception: + return + + +def _llm_metadata_log_record(event: dict[str, Any], *, request: dict[str, Any]) -> dict[str, Any]: + record: dict[str, Any] = { + "message": "runtime.llm.metadata", + "event": str(event.get("event") or ""), + "run_id": str(event.get("run_id") or ""), + "session_id": str(request.get("session_id") or request.get("conversation_id") or ""), + "user_id": str(request.get("user_id") or ""), + "conversation_id": str(request.get("conversation_id") or ""), + "timestamp": event.get("timestamp"), + "api_request_id": str(event.get("api_request_id") or ""), + "turn_id": str(event.get("turn_id") or ""), + "api_call_count": _coerce_positive_int(event.get("api_call_count")), + "provider": str(event.get("provider") or ""), + "model": str(event.get("model") or ""), + "base_url_host": str(event.get("base_url_host") or ""), + "api_mode": str(event.get("api_mode") or ""), + } + + if event.get("event") == "llm.request.metadata": + record.update( + { + "skills_count": _coerce_positive_int(event.get("skills_count")), + "skill_names": _normalize_string_list(event.get("skill_names")), + "skill_prompt_presence": event.get("skill_prompt_presence") + if isinstance(event.get("skill_prompt_presence"), dict) + else {}, + "enabled_toolsets": _normalize_string_list(event.get("enabled_toolsets")), + "disabled_toolsets": _normalize_string_list(event.get("disabled_toolsets")), + "tools_count": _coerce_positive_int(event.get("tools_count")), + "tool_names": _normalize_string_list(event.get("tool_names")), + "tool_choice": str(event.get("tool_choice") or ""), + "request_message_count": _coerce_positive_int( + event.get("request_message_count") + ), + "request_char_count": _coerce_positive_int(event.get("request_char_count")), + "approx_input_tokens": _coerce_positive_int(event.get("approx_input_tokens")), + "max_tokens": _coerce_positive_int(event.get("max_tokens")), + "system_prompt_chars": _coerce_positive_int(event.get("system_prompt_chars")), + "system_prompt_sha256": str(event.get("system_prompt_sha256") or ""), + } + ) + elif event.get("event") == "llm.response.metadata": + record.update( + { + "finish_reason": str(event.get("finish_reason") or ""), + "assistant_tool_calls_count": _coerce_positive_int( + event.get("assistant_tool_calls_count") + ), + "assistant_tool_names": _normalize_string_list( + event.get("assistant_tool_names") + ), + "assistant_content_chars": _coerce_positive_int( + event.get("assistant_content_chars") + ), + "response_model": str(event.get("response_model") or ""), + "output_tokens": _usage_output_tokens(event.get("usage")), + } + ) + return record + + +def _usage_output_tokens(value: Any) -> int: + if not isinstance(value, dict): + return 0 + return _coerce_positive_int( + _first_present( + value.get("completion_tokens"), + value.get("output_tokens"), + value.get("completionTokens"), + value.get("outputTokens"), + ) + ) + + def _hook_request_body(hook_kwargs: dict[str, Any]) -> dict[str, Any]: request_payload = hook_kwargs.get("request") if not isinstance(request_payload, dict): diff --git a/tests/runtime_manager/test_registry.py b/tests/runtime_manager/test_registry.py index 54c423cba1d3..7509c1e18f99 100644 --- a/tests/runtime_manager/test_registry.py +++ b/tests/runtime_manager/test_registry.py @@ -240,6 +240,80 @@ def test_runtime_worker_llm_request_metadata_is_safe_and_actionable(): assert "secret-key" not in encoded +def test_runtime_worker_llm_request_metadata_log_record_is_safe_and_greppable(): + from runtime_manager.worker_main import ( + _llm_metadata_log_record, + _llm_request_metadata_event, + ) + + event = _llm_request_metadata_event( + { + "api_request_id": "turn-1:api:1", + "turn_id": "turn-1", + "api_call_count": 1, + "provider": "custom", + "model": "qwen3.6-35b-a3b", + "base_url": "https://models.example/v1", + "request_char_count": 12345, + "approx_input_tokens": 6789, + "request": { + "body": { + "messages": [{"role": "user", "content": "SECRET USER INPUT"}], + "tools": [ + { + "type": "function", + "function": { + "name": "terminal", + "parameters": {"properties": {"command": {"description": "SECRET SCHEMA"}}}, + }, + } + ], + "tool_choice": "auto", + "api_key": "secret-key", + } + }, + }, + run_id="run-1", + request={ + "user_id": "user-1", + "conversation_id": "conv-1", + "session_id": "session-1", + "message": "SECRET REQUEST", + "skills": ["dba-diagnose-oracle"], + "enabled_toolsets": ["terminal", "file"], + }, + system_prompt="BASE\n\ndba-diagnose-oracle\n\nSECRET PROMPT", + ) + + record = _llm_metadata_log_record( + event, + request={ + "user_id": "user-1", + "conversation_id": "conv-1", + "session_id": "session-1", + "message": "SECRET REQUEST", + }, + ) + + assert record["message"] == "runtime.llm.metadata" + assert record["event"] == "llm.request.metadata" + assert record["run_id"] == "run-1" + assert record["session_id"] == "session-1" + assert record["user_id"] == "user-1" + assert record["conversation_id"] == "conv-1" + assert record["skill_names"] == ["dba-diagnose-oracle"] + assert record["tools_count"] == 1 + assert record["tool_names"] == ["terminal"] + assert record["tool_choice"] == "auto" + + encoded = json.dumps(record, ensure_ascii=False) + assert "SECRET USER INPUT" not in encoded + assert "SECRET REQUEST" not in encoded + assert "SECRET PROMPT" not in encoded + assert "SECRET SCHEMA" not in encoded + assert "secret-key" not in encoded + + def test_runtime_worker_llm_response_metadata_is_safe_and_actionable(): from types import SimpleNamespace @@ -275,6 +349,55 @@ def test_runtime_worker_llm_response_metadata_is_safe_and_actionable(): assert event["usage"] == {"prompt_tokens": 100, "completion_tokens": 20} +def test_runtime_worker_llm_response_metadata_log_record_is_safe_and_greppable(): + from types import SimpleNamespace + + from runtime_manager.worker_main import ( + _llm_metadata_log_record, + _llm_response_metadata_event, + ) + + tool_call = SimpleNamespace(function=SimpleNamespace(name="terminal")) + event = _llm_response_metadata_event( + { + "api_request_id": "turn-1:api:1", + "turn_id": "turn-1", + "api_call_count": 1, + "provider": "custom", + "model": "qwen3.6-35b-a3b", + "base_url": "https://models.example/v1", + "api_mode": "chat_completions", + "finish_reason": "tool_calls", + "response_model": "qwen3.6-35b-a3b", + "assistant_content_chars": 0, + "assistant_tool_call_count": 1, + "assistant_message": SimpleNamespace(tool_calls=[tool_call]), + "usage": {"prompt_tokens": 100, "completion_tokens": 20}, + }, + run_id="run-1", + ) + + record = _llm_metadata_log_record( + event, + request={ + "user_id": "user-1", + "conversation_id": "conv-1", + "session_id": "session-1", + "message": "SECRET REQUEST", + }, + ) + + assert record["message"] == "runtime.llm.metadata" + assert record["event"] == "llm.response.metadata" + assert record["run_id"] == "run-1" + assert record["session_id"] == "session-1" + assert record["finish_reason"] == "tool_calls" + assert record["assistant_tool_calls_count"] == 1 + assert record["assistant_tool_names"] == ["terminal"] + assert record["output_tokens"] == 20 + assert "SECRET REQUEST" not in json.dumps(record, ensure_ascii=False) + + def test_runtime_worker_projects_compression_status_as_structured_event(): from runtime_manager.worker_main import _compression_event_from_status