From 9dac019b9c7eae01b977f91dce6fcaf0f8b88f93 Mon Sep 17 00:00:00 2001 From: Daniel Ellison Date: Fri, 15 May 2026 12:22:27 -0400 Subject: [PATCH 1/3] fix(codex): rewrite to real app-server thread/turn/item protocol The codex backend and triage path were modeled on goose's ACP session/* + agent_message_chunk vocabulary, but the actual codex app-server protocol uses a distinct thread/turn/item vocabulary. The persistent backend's handshake failed with `unknown variant session/new` (codex enumerated 50+ valid method names instead), and the triage path's NDJSON parser looked for fields the codex exec --json schema never emits. Per the operator's go-ahead, this PR rewrites both surfaces against the schema in codex-rs/app-server/README.md and codex-rs/exec/src/exec_events.rs in the openai/codex repo. App-server (codex.py): - Handshake is now initialize -> initialized notification -> thread/start. protocolVersion field dropped from initialize (not in the schema). initialize.capabilities.optOutNotificationMethods suppresses remoteControl/status/changed, mcpServer/startupStatus/updated, thread/started, thread/tokenUsage/updated. - _session_id holds the codex thread.id for ABC consistency. - send loop uses turn/start with input array of {type:"text",text} blocks, optional model override per-turn. - Event parser: item/started, item/agentMessage/delta (concatenate delta per itemId), item/completed (authoritative for agentMessage), turn/completed (terminal), error notification (terminal). - _write_notification helper for the initialized step. codex exec --json (triage.py): - _extract_codex_text now walks events keyed on top-level type discriminator (thread.started / turn.started / item.started / item.updated / item.completed / turn.completed / turn.failed / error). For item.completed of an agent_message item, returns item.text. Falls back to latest item.updated if no completed arrives. turn.failed short-circuits to empty string. 
- _recover_terminal_text / _recover_chunk_text removed; the new _recover_agent_message_text replaces them with the right shape. Tests updated to match the real schemas; full suite 3084 pass. --- src/kai/codex.py | 322 ++++++++++++++++++++++++++++++------------- src/kai/triage.py | 167 +++++++++------------- tests/test_codex.py | 230 +++++++++++++++++-------------- tests/test_triage.py | 179 ++++++++++-------------- 4 files changed, 490 insertions(+), 408 deletions(-) diff --git a/src/kai/codex.py b/src/kai/codex.py index 22caa5d..222218c 100644 --- a/src/kai/codex.py +++ b/src/kai/codex.py @@ -1,33 +1,40 @@ """ OpenAI Codex CLI subprocess backend. -Implements the AgentBackend ABC for Codex's JSON-RPC 2.0 protocol over -a persistent `codex app-server` subprocess. Structurally equivalent to -GooseBackend: one subprocess per user, kept alive across messages, -restarted on /new or workspace switch. The wire protocol is JSON-RPC -2.0 (same family as goose's ACP), but the message types and event -vocabulary are codex-specific. - -The codex protocol (per pinned codex CLI version): - Startup: initialize -> session/new (handshake) - Input: session/prompt (JSON-RPC request with sessionId + prompt) - Output: session/update (streaming notifications, no id field) - Finish: JSON-RPC result with matching id - -Schema-drift posture: the documented codex event names are known to be -out of sync with actual CLI output (openai/codex#4776). This module -matches the goose ACP envelope (session/update notifications carrying -an agent_message_chunk payload) because that envelope is what JSON-RPC -2.0 servers conventionally emit; the first smoke test against a real -codex binary will reveal whatever variations exist for the pinned -version, and the unknown-event branches log at DEBUG and skip rather -than aborting. The codex CLI version is captured in install metadata -so a future bump triggers a re-pin pass. - -This module does NOT take a dependency on OpenAI's Codex Python SDK. 
-The decision matches goose.py: keeping the wire protocol in our own -code keeps Kai's release schedule decoupled from a vendor SDK's -versioning and avoids inheriting transitive dependencies. +Implements the AgentBackend ABC for the `codex app-server` JSON-RPC +protocol. Persistent subprocess per user, kept alive across messages, +restarted on /new or workspace switch. The wire framing is newline- +delimited JSON (NDJSON); each message is a JSON-RPC 2.0 envelope (the +`"jsonrpc":"2.0"` header may be omitted on the wire). + +The codex app-server protocol uses its own thread/turn/item vocabulary +- NOT the session/* / agent_message_chunk shape goose uses. The +authoritative reference is `codex-rs/app-server/README.md` in the +openai/codex repo: + + Handshake (per connection): + 1. client `initialize` request (clientInfo + capabilities) + 2. server response (userAgent, codexHome, platformFamily, platformOs) + 3. client `initialized` notification (no id) + 4. client `thread/start` request -> thread object with thread.id + + Per-message: + - client `turn/start` request (threadId, input[], optional model/cwd) + - server streams notifications: item/started -> N x + item/agentMessage/delta -> item/completed -> turn/completed. + - text is accumulated from item/agentMessage/delta `delta` fields + per itemId; the item/completed event carries the authoritative + full text and overrides our delta accumulation when present. + +Schema-drift posture: unknown notification methods and unrecognized +item types are logged at DEBUG and skipped rather than aborting the +stream. A future codex release that adds new item types must not +break the conversational stream. The codex CLI version is captured +in install metadata so a bump triggers a re-pin pass. + +This module does NOT depend on OpenAI's Codex Python SDK; the wire +protocol is implemented directly to keep Kai's release schedule +decoupled from a vendor SDK's versioning. 
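The handshake sequence the docstring above walks through can be sketched as plain NDJSON framing. This is a minimal illustration, not the backend's actual `_write_rpc`/`_write_notification` plumbing; the version string and the omission of `capabilities` are simplifications.

```python
import json


def handshake_messages(workspace: str) -> list[str]:
    """Client-side handshake frames, in send order:
    initialize (request, id=1), initialized (notification, no id),
    thread/start (request, id=2). One JSON object per line."""
    frames = [
        {"jsonrpc": "2.0", "id": 1, "method": "initialize",
         "params": {"clientInfo": {"name": "kai", "version": "0.0.0"}}},
        # Notification: no id field, so the server sends no response.
        {"jsonrpc": "2.0", "method": "initialized"},
        {"jsonrpc": "2.0", "id": 2, "method": "thread/start",
         "params": {"cwd": workspace}},
    ]
    return [json.dumps(f) + "\n" for f in frames]
```

The server responds only to the two id-bearing requests; the `initialized` notification is fire-and-forget, which is why the mock-process fixtures later in this patch feed exactly two stdout lines for three stdin writes.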
""" import asyncio @@ -75,9 +82,11 @@ class CodexBackend(AgentBackend): AgentBackend implementation for OpenAI Codex CLI's JSON-RPC protocol. Manages the lifecycle of a persistent `codex app-server` subprocess: - starting with a two-step handshake (initialize + session/new), - sending prompts via session/prompt, streaming responses from - session/update notifications, and killing/restarting on demand. + starting with the initialize / initialized / thread/start handshake, + sending prompts via turn/start, streaming responses from the + item/* and turn/* notifications until turn/completed arrives, and + killing/restarting on demand. self._session_id stores the codex + `thread.id` for ABC consistency with the other backends. All message sends are serialized via an internal asyncio lock to prevent interleaving. Tool auto-approval is handled by codex's own @@ -292,42 +301,62 @@ async def _ensure_started(self) -> None: self._effective_codex_user = effective_codex_user self._inner_codex_pid = None - # Step 1: initialize - establish protocol version + # Handshake per the codex app-server protocol: + # 1. Client `initialize` request (clientInfo + optional + # capabilities). NO protocolVersion field; the server + # reports the protocol it speaks via its response and + # via per-method error messages, not a version echo. + # 2. Server response: userAgent, codexHome, platformFamily, + # platformOs. We do not need any of these at runtime; + # reading the result is purely a handshake gate. + # 3. Client `initialized` notification (no id). Required + # before any other request on the connection. Skipping + # this would have all subsequent calls rejected with + # "Not initialized". + # 4. Client `thread/start` request. Returns a thread object + # whose `id` field is the persistent session handle the + # bot uses for every subsequent turn/start. 
+ # The optOutNotificationMethods list suppresses streams the + # bot does not consume - remoteControl status pings, MCP + # startup chatter, and thread/started which we already get + # via the thread/start response itself. self._next_id = 1 await self._write_rpc( "initialize", { - "protocolVersion": "v1", "clientInfo": {"name": "kai", "version": kai.__version__}, + "capabilities": { + "optOutNotificationMethods": [ + "remoteControl/status/changed", + "mcpServer/startupStatus/updated", + "thread/started", + "thread/tokenUsage/updated", + ], + }, }, ) await self._read_result(expected_id=1) - # Step 2: session/new - create a session with workspace cwd. - # Field shape mirrors the goose ACP convention; if the pinned - # codex version's actual schema differs, the smoke test will - # surface the error and this call needs adjustment. - await self._write_rpc( - "session/new", - { - "cwd": str(self.workspace), - }, - ) + await self._write_notification("initialized") + + thread_params: dict = {"cwd": str(self.workspace)} + if self.model: + thread_params["model"] = self.model + await self._write_rpc("thread/start", thread_params) result = await self._read_result(expected_id=2) - # Accept either camelCase or snake_case to tolerate codex CLI - # schema variants observed across versions. Both absent is a - # loud failure: a None session_id would otherwise flow into - # the next session/prompt as "sessionId": None and surface as - # a confusing downstream prompt error rather than a clear - # handshake mismatch. Fail at the boundary instead. - session_id = result.get("sessionId") or result.get("session_id") - if not session_id: + + # The thread.id is the conversational handle for all + # subsequent turn/start calls. We reuse the existing + # self._session_id attribute (ABC-mandated) to hold it; + # naming stays "session_id" for cross-backend consistency + # but the value is codex's thread UUID. 
+ thread = result.get("thread", {}) + thread_id = thread.get("id") + if not thread_id: raise RuntimeError( - "Codex session/new returned no session id " - "(expected 'sessionId' or 'session_id' in result); " - "pinned codex CLI schema may differ from this build." + "Codex thread/start returned no thread.id; the pinned codex CLI may have an incompatible schema." ) - self._session_id = session_id + self._session_id = thread_id self._fresh_session = True async def _drain_stderr(self) -> None: @@ -351,13 +380,14 @@ async def _drain_stderr(self) -> None: # ── JSON-RPC helpers ─────────────────────────────────────────── - async def _write_rpc(self, method: str, params: dict) -> None: + async def _write_rpc(self, method: str, params: dict) -> int: """ Write a JSON-RPC 2.0 request to the subprocess stdin. Increments the monotonic request ID, serializes the message - with a trailing newline, and flushes. Raises RuntimeError if - the process or its stdin pipe is gone. + with a trailing newline, and flushes. Returns the request id + so callers can correlate the response. Raises if the process + or its stdin pipe is gone. """ assert self._proc is not None and self._proc.stdin is not None request_id = self._next_id @@ -375,6 +405,22 @@ async def _write_rpc(self, method: str, params: dict) -> None: ) self._proc.stdin.write(msg.encode()) await self._proc.stdin.drain() + return request_id + + async def _write_notification(self, method: str, params: dict | None = None) -> None: + """ + Write a JSON-RPC 2.0 notification (no id, no response expected). + + Used for the `initialized` handshake step: the codex app-server + spec requires the client to send `initialized` between the + `initialize` response and any other request on that connection. 
+ """ + assert self._proc is not None and self._proc.stdin is not None + body: dict = {"jsonrpc": "2.0", "method": method} + if params is not None: + body["params"] = params + self._proc.stdin.write((json.dumps(body) + "\n").encode()) + await self._proc.stdin.drain() async def _read_result(self, expected_id: int) -> dict: """ @@ -503,20 +549,26 @@ async def _send_locked(self, prompt: str | list, chat_id: int | None = None) -> if not rpc_prompt: rpc_prompt = [{"type": "text", "text": "(empty prompt)"}] - # Send the session/prompt request + # Send the turn/start request. The codex app-server protocol + # uses `input` (array of typed content blocks) plus `threadId`; + # the JSON-RPC response carries the new turn's id, and the + # actual model output streams as item/* and turn/* notifications + # until a final turn/completed arrives. assert self._proc is not None assert self._proc.stdin is not None assert self._proc.stdout is not None try: - prompt_id = self._next_id - await self._write_rpc( - "session/prompt", - { - "sessionId": self._session_id, - "prompt": rpc_prompt, - }, - ) + turn_params: dict = { + "threadId": self._session_id, + "input": rpc_prompt, + } + # Pin model per-turn so workspace_config overrides or + # /model switches take effect on the next message without + # restarting the thread. + if self.model: + turn_params["model"] = self.model + prompt_id = await self._write_rpc("turn/start", turn_params) except OSError as e: log.error("Failed to write to Codex process: %s", e) await self._kill() @@ -606,42 +658,114 @@ async def _send_locked(self, prompt: str | list, chat_id: int | None = None) -> log.debug("Skipping non-JSON line: %s", line[:200]) continue - # Streaming notifications (no id field) - accumulate - # message chunks; log everything else at DEBUG and - # skip. The agent_message_chunk envelope mirrors goose - # ACP; the smoke test confirms or corrects this name. 
- if msg.get("method") == "session/update": - update = msg.get("params", {}).get("update", {}) - event_type = update.get("sessionUpdate") or update.get("event") or update.get("type") - if event_type == "agent_message_chunk": - text = update.get("content", {}).get("text", "") - if text: - accumulated += text + # Codex app-server event vocabulary (per the protocol + # README at codex-rs/app-server/README.md): + # + # - turn/start RESPONSE: matched by id, returns the + # turn object. We acknowledge it but do not finish + # here; the turn is still streaming. + # - turn/started NOTIFICATION: not handled explicitly; + # it falls through to the unknown-method branch + # below and is logged at DEBUG. (Only thread/started + # and the other three methods in the initialize + # opt-out list are suppressed at the source.) + # - item/started: full ThreadItem with type and id. + # We only care about agentMessage items for the + # conversational stream; other item types (reasoning, + # commandExecution, fileChange, etc.) are logged at + # DEBUG. We capture the agentMessage item id so + # subsequent deltas can be tied back to the right + # stream. + # - item/agentMessage/delta: streaming text chunks. + # Concatenate `delta` per itemId. Codex emits text + # in roughly token-sized chunks. + # - item/completed: authoritative final state of the + # item. For agentMessage this carries the full + # accumulated `text`; we trust this over our own + # concatenation in case any delta was missed. + # - turn/completed: terminal event. Carries turn + # status (`completed` / `interrupted` / `failed`) + # and an optional error payload on failure. This + # is the signal to yield the final StreamEvent. + # - error notification: mid-turn error; may precede + # a failed turn/completed. We treat it as terminal.
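The accumulation rules enumerated in that comment block reduce to a small fold over decoded notifications. A sketch, assuming the stream has already been JSON-decoded (`fold_turn_events` is illustrative, not a function in this patch):

```python
def fold_turn_events(events: list[dict]) -> tuple[str, bool]:
    """Fold item/turn notifications into (final_text, turn_succeeded):
    concatenate agentMessage deltas, let an agentMessage item/completed
    override the concatenation, and stop at turn/completed."""
    text = ""
    for msg in events:
        method = msg.get("method")
        params = msg.get("params", {})
        if method == "item/agentMessage/delta":
            text += params.get("delta", "")
        elif method == "item/completed":
            item = params.get("item", {})
            if item.get("type") == "agentMessage" and item.get("text"):
                text = item["text"]  # authoritative full text wins
        elif method == "turn/completed":
            return text, params.get("turn", {}).get("status") == "completed"
    return text, False  # stream ended without a terminal event
```

The real parser additionally yields a StreamEvent per delta and handles id-matched responses and error notifications; this sketch only captures the text-accumulation invariant.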
+ method = msg.get("method") + if method == "item/started": + item = msg.get("params", {}).get("item", {}) + log.debug("Codex: item/started type=%s id=%s", item.get("type"), item.get("id")) + + elif method == "item/agentMessage/delta": + delta_text = msg.get("params", {}).get("delta", "") + if delta_text: + accumulated += delta_text + yield StreamEvent(text_so_far=accumulated) + + elif method == "item/completed": + item = msg.get("params", {}).get("item", {}) + if item.get("type") == "agentMessage": + # Authoritative full text; trust it over our + # delta-accumulation in case any delta was + # missed or arrived out of order. + final_text = item.get("text", "") + if final_text and final_text != accumulated: + accumulated = final_text yield StreamEvent(text_so_far=accumulated) else: - # Unknown event - log and skip. Schema-drift - # defense: a future codex version emitting a - # new event type (e.g. tool_call_update) does - # not break the conversational stream. - log.debug("Codex: skipping session/update event=%s", event_type) - - # Final result for our prompt (has matching id) - elif msg.get("id") == prompt_id and "result" in msg: - response = AgentResponse( - success=True, - text=accumulated, - session_id=self._session_id, - cost_usd=0.0, # subscription auth; codex reports no per-call cost - duration_ms=0, + log.debug("Codex: item/completed type=%s", item.get("type")) + + elif method == "turn/completed": + turn = msg.get("params", {}).get("turn", {}) + status = turn.get("status") + if status == "completed": + yield StreamEvent( + text_so_far=accumulated, + done=True, + response=AgentResponse( + success=True, + text=accumulated, + session_id=self._session_id, + cost_usd=0.0, + duration_ms=0, + ), + ) + return + # Non-completed terminal: interrupted or failed. + # turn.error carries the diagnostic when present. 
+ err_obj = turn.get("error") or {} + err_msg = err_obj.get("message") or f"Codex turn ended with status={status}" + yield StreamEvent( + text_so_far=accumulated, + done=True, + response=AgentResponse( + success=False, + text=accumulated, + error=err_msg, + ), ) + return + + elif method == "error": + # Mid-turn error notification. Treat as terminal; + # the subsequent turn/completed with status=failed + # would be redundant. + err_obj = msg.get("params", {}).get("error", {}) + err_msg = err_obj.get("message") or "Codex error" yield StreamEvent( text_so_far=accumulated, done=True, - response=response, + response=AgentResponse( + success=False, + text=accumulated, + error=err_msg, + ), ) return - # JSON-RPC error for our prompt + # Response to our turn/start (id-matched). The result + # carries the initial turn object; we acknowledge it + # but the stream continues until turn/completed. + elif msg.get("id") == prompt_id and "result" in msg: + log.debug("Codex: turn/start acknowledged for id=%s", prompt_id) + + # JSON-RPC error matched on our turn/start id - request + # never made it to streaming, so finish here. elif msg.get("id") == prompt_id and "error" in msg: err = msg["error"].get("message", "unknown codex error") yield StreamEvent( @@ -656,9 +780,15 @@ async def _send_locked(self, prompt: str | list, chat_id: int | None = None) -> return else: - # Unknown top-level message shape. Schema-drift - # defense: log and skip rather than abort. - log.debug("Codex: skipping unrecognized message id=%s method=%s", msg.get("id"), msg.get("method")) + # Unknown method or unmatched id. Schema-drift + # defense: a new codex release adding extra + # notification types must not break the + # conversational stream. 
+ log.debug( + "Codex: skipping unrecognized message id=%s method=%s", + msg.get("id"), + method, + ) except Exception as e: log.exception("Unexpected error reading Codex stream") diff --git a/src/kai/triage.py b/src/kai/triage.py index 779ee95..c66066a 100644 --- a/src/kai/triage.py +++ b/src/kai/triage.py @@ -628,46 +628,48 @@ def _extract_codex_text(stdout: str) -> str: """ Walk codex's NDJSON event stream and return the agent message text. - `codex exec --json` emits one JSON event per line. A streaming run - can produce two representations of the same message: incremental - chunks (delta events) and a single terminal/complete event that - carries the full consolidated text. Accumulating across both - representations would double the message; this parser prefers - the terminal text and only falls back to delta accumulation when - no terminal event appears in the stream. - - The exact event name and field path are not yet pinned by smoke - test, so this parser is defensive about field names: - - - Each line is attempted as JSON; non-JSON lines are skipped. - - Terminal/complete events are recognized by their FIELD PATH - rather than by an event-name string. Any of these signals a - complete message: top-level "content" as a string, top-level - "content" as a list of {"type":"text", "text":...} blocks, or - "item.content" with the same list shape. When multiple - terminal events appear, the last one wins (matches the - streaming convention that "completed" supersedes prior partials). - - Chunk/delta events use "delta.text" or a top-level "text" - field. Chunks accumulate in order. Used only when the stream - contains no terminal event. - - On any genuine schema mismatch the result is the empty string; - `_parse_triage_json` then raises a clearer error than a - doubled-up partial would. - - The smoke test against a real codex CLI will reveal which of - these field paths actually fire for the pinned version; the - others remain as fallbacks for schema drift. 
+ `codex exec --json` emits one JSON event per line. Each event has + a top-level `type` tag from the ThreadEvent enum: + `thread.started`, `turn.started`, `turn.completed`, `turn.failed`, + `item.started`, `item.updated`, `item.completed`, `error`. (Note + the DOT separator; the app-server protocol uses slashes + instead. Same data model, different wire encoding.) + + For triage we only care about the agent's final natural-language + response. The `item.completed` event for an agent_message item + carries the full consolidated text: + + {"type": "item.completed", + "item": {"id": "...", "type": "agent_message", "text": "..."}} + + Schema reference: codex-rs/exec/src/exec_events.rs in the codex + repo. The `ThreadItemDetails` enum is `#[serde(tag = "type", + rename_all = "snake_case")]` so the discriminator is + `"agent_message"` (snake_case), and `text` is a flat field on + the item object (the inner enum is `#[serde(flatten)]`). + + A streaming run may emit `item.updated` events for the same + agent_message id before its `item.completed`. We trust the + completed event as authoritative; if no completed event arrived + (e.g. truncated stream) we fall back to the latest updated text + so triage gets something rather than nothing. + + Schema-drift posture: a future codex release that adds new event + types or item types must not break extraction. Unknown shapes + are silently skipped. A `turn.failed` event short-circuits to an + empty result so the caller raises a clearer error than a partial + body would. Args: stdout: The full stdout from `codex exec --json`. Returns: - The agent message text. Terminal text if any terminal event - was found; otherwise the accumulated delta/chunk text. Empty - string if no recognizable text was found in any event. + The agent_message text from the last `item.completed` + event, or the latest `item.updated` text as a fallback. + Empty string if no agent_message was emitted. 
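Those extraction rules amount to a single pass over the NDJSON lines. A standalone sketch (hypothetical `extract_agent_message`; it mirrors the documented rules but is not the project's `_extract_codex_text`):

```python
import json


def extract_agent_message(stdout: str) -> str:
    """Prefer the last item.completed agent_message text, fall back to
    the latest item.updated text, short-circuit on turn.failed."""
    completed, updated = None, None
    for line in stdout.splitlines():
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON noise
        if obj.get("type") == "turn.failed":
            return ""
        item = obj.get("item") or {}
        if item.get("type") != "agent_message":
            continue
        if obj.get("type") == "item.completed":
            completed = item.get("text")
        elif obj.get("type") == "item.updated":
            updated = item.get("text")
    return (completed or updated or "").strip()
```

Note the flat `item.text` access: because the Rust enum is serde-flattened, there is no nested `content` wrapper to unwrap.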
""" - terminal_text: str | None = None - accumulated_chunks: list[str] = [] + completed_text: str | None = None + latest_updated_text: str | None = None for line in stdout.splitlines(): line = line.strip() if not line: @@ -676,80 +678,41 @@ def _extract_codex_text(stdout: str) -> str: obj = json.loads(line) except json.JSONDecodeError: continue - # Terminal event wins; the most recent terminal event - # supersedes earlier ones (a streaming run that emits - # interim consolidated events before the final one ends - # up with the latest version). - terminal = _recover_terminal_text(obj) - if terminal is not None: - terminal_text = terminal - continue - # Delta/chunk events accumulate, but only matter when no - # terminal event ever fires. - chunk = _recover_chunk_text(obj) - if chunk is not None: - accumulated_chunks.append(chunk) - if terminal_text is not None: - return terminal_text.strip() - return "".join(accumulated_chunks).strip() + event_type = obj.get("type") + # `turn.failed` is a terminal failure - no body text to + # extract; let the caller see an empty string and surface + # a clearer error than a half-event would. + if event_type == "turn.failed": + return "" + if event_type in ("item.completed", "item.updated"): + text = _recover_agent_message_text(obj) + if text is None: + continue + if event_type == "item.completed": + completed_text = text + else: + latest_updated_text = text + if completed_text is not None: + return completed_text.strip() + if latest_updated_text is not None: + return latest_updated_text.strip() + return "" -def _recover_terminal_text(obj: dict) -> str | None: +def _recover_agent_message_text(obj: dict) -> str | None: """ - Pull a complete/terminal agent message from one codex event. - - Recognized field paths for a complete message: - - Top-level "content" as a string (single-block shape). - - Top-level "content" as a list of {"type":"text", "text":...} blocks. 
- - "item.content" with the same list-of-blocks shape (events - wrapped in an "item" object). + Pull `item.text` from a codex exec event when the item is an + agent_message; return None otherwise. - Returns the consolidated text string, or None if the event has - none of these paths (and is therefore either a chunk/delta or - pure metadata). + The wire shape is `{..., "item": {"id": "...", "type": "agent_message", "text": "..."}}` + because ThreadItemDetails is serde-flattened onto ThreadItem. """ - content = obj.get("content") - if isinstance(content, str): - return content - if isinstance(content, list): - parts = [ - block["text"] - for block in content - if isinstance(block, dict) and block.get("type") == "text" and isinstance(block.get("text"), str) - ] - if parts: - return "".join(parts) item = obj.get("item") - if isinstance(item, dict): - i_content = item.get("content") - if isinstance(i_content, list): - parts = [ - block["text"] - for block in i_content - if isinstance(block, dict) and block.get("type") == "text" and isinstance(block.get("text"), str) - ] - if parts: - return "".join(parts) - return None - - -def _recover_chunk_text(obj: dict) -> str | None: - """ - Pull a streaming chunk/delta text fragment from one codex event. - - Recognized field paths for an incremental chunk: - - "delta.text" (the conventional streaming-delta shape). - - Top-level "text" string (an alternate shape some CLI versions emit). - - Returns the fragment text, or None if the event has no chunk-style - text field. 
- """ - delta = obj.get("delta") - if isinstance(delta, dict): - d_text = delta.get("text") - if isinstance(d_text, str): - return d_text - text = obj.get("text") + if not isinstance(item, dict): + return None + if item.get("type") != "agent_message": + return None + text = item.get("text") if isinstance(text, str): return text return None diff --git a/tests/test_codex.py b/tests/test_codex.py index e98d297..1cf5da3 100644 --- a/tests/test_codex.py +++ b/tests/test_codex.py @@ -66,64 +66,90 @@ def _initialize_result() -> bytes: ) -def _session_new_result(session_id: str = "codex-session-1") -> bytes: - """Build the server's response to a session/new request.""" +def _thread_start_result(thread_id: str = "codex-thread-1") -> bytes: + """Build the server's response to a thread/start request.""" return _json_line( { "jsonrpc": "2.0", "id": 2, - "result": {"sessionId": session_id}, + "result": { + "thread": { + "id": thread_id, + "sessionId": thread_id, + "modelProvider": "openai", + "cwd": "/tmp/test-workspace", + }, + "model": "gpt-5.4-mini", + }, } ) -def _agent_message_chunk(text: str, session_id: str = "codex-session-1") -> bytes: - """ - Build a session/update notification carrying an agent_message_chunk. +def _agent_message_delta(text: str, item_id: str = "item-1") -> bytes: + """Build an item/agentMessage/delta notification (streaming text chunk).""" + return _json_line( + { + "jsonrpc": "2.0", + "method": "item/agentMessage/delta", + "params": {"itemId": item_id, "delta": text}, + } + ) - Schema mirrors goose ACP. The pinned codex CLI version's actual - event names may differ; the schema-drift defense in the parser - treats unknown event types as skip-and-log, not as errors. 
- """ + +def _item_started_agent(item_id: str = "item-1") -> bytes: + """Build an item/started notification for an agentMessage item.""" return _json_line( { "jsonrpc": "2.0", - "method": "session/update", - "params": { - "sessionId": session_id, - "update": { - "sessionUpdate": "agent_message_chunk", - "content": {"type": "text", "text": text}, - }, - }, + "method": "item/started", + "params": {"item": {"id": item_id, "type": "agentMessage", "text": ""}}, } ) -def _unknown_event(event_name: str, session_id: str = "codex-session-1") -> bytes: - """Build a session/update with a deliberately unrecognized event type.""" +def _item_completed_agent(text: str, item_id: str = "item-1") -> bytes: + """Build an item/completed notification for an agentMessage item.""" return _json_line( { "jsonrpc": "2.0", - "method": "session/update", - "params": { - "sessionId": session_id, - "update": { - "sessionUpdate": event_name, - "content": {"type": "text", "text": "should be ignored"}, - }, - }, + "method": "item/completed", + "params": {"item": {"id": item_id, "type": "agentMessage", "text": text}}, + } + ) + + +def _unknown_event(method_name: str) -> bytes: + """Build a deliberately unrecognized notification.""" + return _json_line( + { + "jsonrpc": "2.0", + "method": method_name, + "params": {"foo": "bar"}, } ) -def _completion_result(prompt_id: int = 3) -> bytes: - """Build a completion result for the given prompt id.""" +def _turn_completed(status: str = "completed", error_msg: str | None = None) -> bytes: + """Build a terminal turn/completed notification.""" + turn: dict = {"id": "turn-1", "status": status, "items": []} + if error_msg is not None: + turn["error"] = {"message": error_msg} + return _json_line( + { + "jsonrpc": "2.0", + "method": "turn/completed", + "params": {"turn": turn}, + } + ) + + +def _turn_start_ack(prompt_id: int = 3) -> bytes: + """Build the JSON-RPC response to a turn/start (acknowledgement, not terminal).""" return _json_line( { "jsonrpc": "2.0", 
"id": prompt_id, - "result": {"stopReason": "end_turn"}, + "result": {"turn": {"id": "turn-1", "status": "inProgress", "items": [], "error": None}}, } ) @@ -157,9 +183,15 @@ def _make_mock_proc(stdout_lines: list[bytes]) -> MagicMock: return proc -def _handshake_lines(session_id: str = "codex-session-1") -> list[bytes]: - """Return the two stdout lines for a successful handshake.""" - return [_initialize_result(), _session_new_result(session_id)] +def _handshake_lines(thread_id: str = "codex-thread-1") -> list[bytes]: + """Return the two stdout lines for a successful handshake. + + The codex app-server handshake is: + 1. client `initialize` -> server response (id=1) + 2. client `initialized` notification (no response) + 3. client `thread/start` -> server response (id=2) + """ + return [_initialize_result(), _thread_start_result(thread_id)] async def _collect_events(c: CodexBackend, prompt: str | list = "test") -> list[StreamEvent]: @@ -277,11 +309,11 @@ def test_session_id_none(self): class TestHandshake: - """Verify _ensure_started() runs the initialize + session/new handshake.""" + """Verify _ensure_started() runs the initialize + initialized + thread/start handshake.""" @pytest.mark.asyncio async def test_successful_handshake(self): - """Handshake sets _session_id from session/new result.""" + """Handshake sets _session_id from thread/start result.thread.id.""" c = _make_codex() proc = _make_mock_proc(_handshake_lines("test-codex-42")) @@ -292,19 +324,33 @@ async def test_successful_handshake(self): assert c._fresh_session is True assert c.is_alive is True - # Verify the two handshake messages were written + # Three stdin writes: initialize (request), initialized + # (notification, no id), thread/start (request). 
writes = proc.stdin.write.call_args_list - assert len(writes) == 2 + assert len(writes) == 3 init_msg = json.loads(writes[0][0][0].decode()) assert init_msg["method"] == "initialize" assert init_msg["id"] == 1 assert init_msg["params"]["clientInfo"]["name"] == "kai" - - session_msg = json.loads(writes[1][0][0].decode()) - assert session_msg["method"] == "session/new" - assert session_msg["id"] == 2 - assert session_msg["params"]["cwd"] == "/tmp/test-workspace" + # protocolVersion must NOT be sent; the field is not part of + # the codex app-server initialize schema. + assert "protocolVersion" not in init_msg["params"] + # opt-out list ships in capabilities to suppress noisy + # notifications we never consume. + opt_out = init_msg["params"]["capabilities"]["optOutNotificationMethods"] + assert "remoteControl/status/changed" in opt_out + assert "mcpServer/startupStatus/updated" in opt_out + + initialized_msg = json.loads(writes[1][0][0].decode()) + assert initialized_msg["method"] == "initialized" + assert "id" not in initialized_msg # Notifications carry no id. + + thread_msg = json.loads(writes[2][0][0].decode()) + assert thread_msg["method"] == "thread/start" + assert thread_msg["id"] == 2 + assert thread_msg["params"]["cwd"] == "/tmp/test-workspace" + assert thread_msg["params"]["model"] == "gpt-5.4" @pytest.mark.asyncio async def test_argv_invokes_codex_app_server(self): @@ -366,54 +412,30 @@ async def test_handshake_eof_raises(self): await c._ensure_started() @pytest.mark.asyncio - async def test_handshake_missing_session_id_raises(self): + async def test_handshake_missing_thread_id_raises(self): """ - session/new without a recognizable session-id key raises - RuntimeError at the handshake boundary. - - A silent None session_id would otherwise flow into the next - session/prompt as `"sessionId": None`, producing a confusing - downstream prompt error instead of a clear handshake mismatch. - Fail loudly at the schema boundary. 
+ thread/start without a thread.id raises RuntimeError at the + handshake boundary. A silent None thread_id would otherwise + flow into turn/start as `"threadId": None`, producing a + confusing downstream error instead of a clear handshake + mismatch. Fail loudly at the schema boundary. """ - bad_session_result = _json_line( + bad_thread_result = _json_line( { "jsonrpc": "2.0", "id": 2, - "result": {"someOtherKey": "value"}, + "result": {"thread": {"sessionId": "no-id-here"}}, } ) - proc = _make_mock_proc([_initialize_result(), bad_session_result]) + proc = _make_mock_proc([_initialize_result(), bad_thread_result]) c = _make_codex() with ( patch("asyncio.create_subprocess_exec", AsyncMock(return_value=proc)), - pytest.raises(RuntimeError, match="no session id"), + pytest.raises(RuntimeError, match=r"no thread\.id"), ): await c._ensure_started() - @pytest.mark.asyncio - async def test_handshake_accepts_snake_case_session_id(self): - """ - session/new result with `session_id` (snake_case) is accepted - alongside camelCase `sessionId`. Tolerates codex CLI schema - variants observed across versions. 
- """ - snake_case_result = _json_line( - { - "jsonrpc": "2.0", - "id": 2, - "result": {"session_id": "snake-case-session"}, - } - ) - proc = _make_mock_proc([_initialize_result(), snake_case_result]) - c = _make_codex() - - with patch("asyncio.create_subprocess_exec", AsyncMock(return_value=proc)): - await c._ensure_started() - - assert c._session_id == "snake-case-session" - @pytest.mark.asyncio async def test_model_env_var_set(self): """CODEX_MODEL env var is set during startup.""" @@ -474,9 +496,9 @@ async def test_agent_message_chunk_accumulation(self): c = _make_codex() c._proc = _make_mock_proc( [ - _agent_message_chunk("Hello"), - _agent_message_chunk(" world"), - _completion_result(prompt_id=3), + _agent_message_delta("Hello"), + _agent_message_delta(" world"), + _turn_completed("completed"), ] ) c._session_id = "test-session" @@ -506,8 +528,8 @@ async def test_unknown_event_type_skipped(self): c._proc = _make_mock_proc( [ _unknown_event("future.unknown.thing"), - _agent_message_chunk("real text"), - _completion_result(prompt_id=3), + _agent_message_delta("real text"), + _turn_completed("completed"), ] ) c._session_id = "test-session" @@ -529,8 +551,8 @@ async def test_non_json_lines_skipped(self): c._proc = _make_mock_proc( [ b"some random output\n", - _agent_message_chunk("hello"), - _completion_result(prompt_id=3), + _agent_message_delta("hello"), + _turn_completed("completed"), ] ) c._session_id = "test-session" @@ -568,8 +590,8 @@ async def test_alternate_event_field_names_skipped(self): c._proc = _make_mock_proc( [ weird_event, - _agent_message_chunk("real"), - _completion_result(prompt_id=3), + _agent_message_delta("real"), + _turn_completed("completed"), ] ) c._session_id = "test-session" @@ -593,8 +615,8 @@ async def test_completion_yields_done_event(self): c = _make_codex() c._proc = _make_mock_proc( [ - _agent_message_chunk("answer"), - _completion_result(prompt_id=3), + _agent_message_delta("answer"), + _turn_completed("completed"), ] ) 
c._session_id = "test-session" @@ -617,7 +639,7 @@ async def test_eof_with_accumulated_text(self): c = _make_codex() c._proc = _make_mock_proc( [ - _agent_message_chunk("partial"), + _agent_message_delta("partial"), b"", # EOF ] ) @@ -677,7 +699,7 @@ async def test_error_after_partial_text(self): c = _make_codex() c._proc = _make_mock_proc( [ - _agent_message_chunk("partial"), + _agent_message_delta("partial"), _error_result(prompt_id=3, message="cut off"), ] ) @@ -710,8 +732,8 @@ async def test_fresh_session_injects_context(self): ) c._proc = _make_mock_proc( [ - _agent_message_chunk("ok"), - _completion_result(prompt_id=3), + _agent_message_delta("ok"), + _turn_completed("completed"), ] ) c._session_id = "test-session" @@ -723,7 +745,7 @@ async def test_fresh_session_injects_context(self): write_calls = c._proc.stdin.write.call_args_list prompt_msg = json.loads(write_calls[-1][0][0].decode()) - prompt_text = prompt_msg["params"]["prompt"][0]["text"] + prompt_text = prompt_msg["params"]["input"][0]["text"] assert prompt_text.startswith("[CONTEXT]") assert "hello" in prompt_text @@ -733,8 +755,8 @@ async def test_second_send_no_context(self): c = _make_codex(workspace=Path("/tmp/ws"), home_workspace=Path("/tmp/ws")) c._proc = _make_mock_proc( [ - _agent_message_chunk("ok"), - _completion_result(prompt_id=3), + _agent_message_delta("ok"), + _turn_completed("completed"), ] ) c._session_id = "test-session" @@ -757,7 +779,7 @@ class TestPromptCoercion: async def test_string_prompt(self): """A string prompt becomes a single text block.""" c = _make_codex() - c._proc = _make_mock_proc([_agent_message_chunk("ok"), _completion_result(prompt_id=3)]) + c._proc = _make_mock_proc([_agent_message_delta("ok"), _turn_completed("completed")]) c._session_id = "test-session" c._fresh_session = False c._next_id = 3 @@ -766,13 +788,13 @@ async def test_string_prompt(self): write_calls = c._proc.stdin.write.call_args_list prompt_msg = json.loads(write_calls[-1][0][0].decode()) - assert 
prompt_msg["params"]["prompt"] == [{"type": "text", "text": "hello"}] + assert prompt_msg["params"]["input"] == [{"type": "text", "text": "hello"}] @pytest.mark.asyncio async def test_list_prompt_text_only(self): """A list of text blocks passes through unchanged.""" c = _make_codex() - c._proc = _make_mock_proc([_agent_message_chunk("ok"), _completion_result(prompt_id=3)]) + c._proc = _make_mock_proc([_agent_message_delta("ok"), _turn_completed("completed")]) c._session_id = "test-session" c._fresh_session = False c._next_id = 3 @@ -782,13 +804,13 @@ async def test_list_prompt_text_only(self): write_calls = c._proc.stdin.write.call_args_list prompt_msg = json.loads(write_calls[-1][0][0].decode()) - assert prompt_msg["params"]["prompt"] == blocks + assert prompt_msg["params"]["input"] == blocks @pytest.mark.asyncio async def test_list_prompt_drops_non_text_blocks(self): """Non-text blocks are dropped with a warning; text blocks preserved.""" c = _make_codex() - c._proc = _make_mock_proc([_agent_message_chunk("ok"), _completion_result(prompt_id=3)]) + c._proc = _make_mock_proc([_agent_message_delta("ok"), _turn_completed("completed")]) c._session_id = "test-session" c._fresh_session = False c._next_id = 3 @@ -802,13 +824,13 @@ async def test_list_prompt_drops_non_text_blocks(self): write_calls = c._proc.stdin.write.call_args_list prompt_msg = json.loads(write_calls[-1][0][0].decode()) # Image block dropped; text block kept - assert prompt_msg["params"]["prompt"] == [{"type": "text", "text": "keep"}] + assert prompt_msg["params"]["input"] == [{"type": "text", "text": "keep"}] @pytest.mark.asyncio async def test_empty_list_prompt_synthesizes_placeholder(self): """An all-non-text list yields a single placeholder text block.""" c = _make_codex() - c._proc = _make_mock_proc([_agent_message_chunk("ok"), _completion_result(prompt_id=3)]) + c._proc = _make_mock_proc([_agent_message_delta("ok"), _turn_completed("completed")]) c._session_id = "test-session" c._fresh_session = 
False c._next_id = 3 @@ -818,7 +840,7 @@ async def test_empty_list_prompt_synthesizes_placeholder(self): write_calls = c._proc.stdin.write.call_args_list prompt_msg = json.loads(write_calls[-1][0][0].decode()) - assert prompt_msg["params"]["prompt"] == [{"type": "text", "text": "(empty prompt)"}] + assert prompt_msg["params"]["input"] == [{"type": "text", "text": "(empty prompt)"}] # ── Restart / force_kill / shutdown / change_workspace ──────────── @@ -960,7 +982,7 @@ class TestSendLock: async def test_lock_serializes_sends(self): """The lock is held across the entire send() generator.""" c = _make_codex() - c._proc = _make_mock_proc([_agent_message_chunk("first"), _completion_result(prompt_id=3)]) + c._proc = _make_mock_proc([_agent_message_delta("first"), _turn_completed("completed")]) c._session_id = "test-session" c._fresh_session = False c._next_id = 3 diff --git a/tests/test_triage.py b/tests/test_triage.py index 5dacc4c..6eee44d 100644 --- a/tests/test_triage.py +++ b/tests/test_triage.py @@ -664,19 +664,19 @@ class TestRunTriageCodex: @staticmethod def _codex_ndjson(text: str) -> str: """ - Build a minimal NDJSON stream that _extract_codex_text resolves - to the given final text. Uses the "content" list-of-blocks - shape (the JSON-RPC convention goose ACP uses) since the - actual codex schema is not yet pinned. + Build a minimal NDJSON stream that mirrors the real + `codex exec --json` schema (codex-rs/exec/src/exec_events.rs): + each event has a top-level `type` tag; item.completed for an + agent_message item carries the full consolidated `text`. 
""" events = [ - {"event": "thread.started"}, - {"event": "turn.started"}, + {"type": "thread.started", "thread_id": "thr_test"}, + {"type": "turn.started"}, { - "event": "item.message.completed", - "content": [{"type": "text", "text": text}], + "type": "item.completed", + "item": {"id": "item_1", "type": "agent_message", "text": text}, }, - {"event": "turn.completed"}, + {"type": "turn.completed", "usage": {"input_tokens": 0, "output_tokens": 0}}, ] return "\n".join(json.dumps(e) for e in events) + "\n" @@ -811,32 +811,35 @@ async def test_codex_subprocess_failure_raises(self): await run_triage("prompt", agent_backend="codex") @pytest.mark.asyncio - async def test_codex_handles_streaming_deltas_plus_terminal(self): + async def test_codex_completed_supersedes_updated(self): """ End-to-end through run_triage: a stream that contains both - delta chunks AND a terminal consolidated message returns the - terminal JSON exactly once, parseable by _parse_triage_json. - - This is the integration counterpart to - TestExtractCodexText::test_terminal_text_wins_over_accumulated_deltas. - Without the terminal-wins rule, the triage path would return - a doubled JSON string (`{"labels":[]}{"labels":[]}`) and the - downstream parser would fail on every codex run that streams. + item.updated events (interim consolidated text) AND a final + item.completed returns the completed text exactly once, + parseable by _parse_triage_json. + + Without the completed-wins rule, the triage path could + accumulate or return stale interim text and the downstream + parser would fail on every codex run that streams. 
""" expected_json = '{"labels": ["bug"], "summary": "ok"}' events = [ - {"event": "delta", "delta": {"text": '{"labels":'}}, - {"event": "delta", "delta": {"text": ' ["bug"], "summary": "ok"}'}}, + {"type": "thread.started", "thread_id": "thr_test"}, + {"type": "turn.started"}, + {"type": "item.started", "item": {"id": "i1", "type": "agent_message", "text": ""}}, + {"type": "item.updated", "item": {"id": "i1", "type": "agent_message", "text": '{"labels":'}}, { - "event": "item.message.completed", - "item": {"content": [{"type": "text", "text": expected_json}]}, + "type": "item.updated", + "item": {"id": "i1", "type": "agent_message", "text": '{"labels": ["bug"], "summa'}, }, + {"type": "item.completed", "item": {"id": "i1", "type": "agent_message", "text": expected_json}}, + {"type": "turn.completed", "usage": {"input_tokens": 0, "output_tokens": 0}}, ] stream = "\n".join(json.dumps(e) for e in events) + "\n" mock_proc = _mock_subprocess(stdout=stream) with patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc): result = await run_triage("prompt", agent_backend="codex") - # Exactly the terminal JSON, not deltas + terminal concatenated. + # Exactly the completed text, not interim updates concatenated. assert result == expected_json @@ -844,11 +847,11 @@ class TestExtractCodexText: """ Unit tests for the _extract_codex_text NDJSON parser. - The codex CLI schema is not yet pinned by smoke test; the parser - accepts multiple field paths (top-level "text"; "content" as - string or list-of-blocks; "delta.text"; "item.content") so a - schema variant the docs do not describe still yields the agent - message. These tests lock each path independently. + The codex exec --json schema (codex-rs/exec/src/exec_events.rs): + each event has top-level `type` discriminator; agent_message + items carry their full text in `item.text`. item.completed is + authoritative; item.updated is interim. turn.failed terminates + extraction with an empty result. 
""" def test_empty_input(self): @@ -857,110 +860,74 @@ def test_empty_input(self): def test_skips_non_json_lines(self): """Non-JSON lines are silently skipped.""" - stream = "not-json\n" + json.dumps({"text": "hello"}) + "\n" - assert _extract_codex_text(stream) == "hello" - - def test_extracts_top_level_text(self): - """Events with a top-level 'text' field contribute that string.""" - stream = json.dumps({"event": "delta", "text": "abc"}) + "\n" - assert _extract_codex_text(stream) == "abc" - - def test_extracts_delta_text(self): - """Events with 'delta.text' (alternate streaming shape) work.""" - stream = json.dumps({"event": "delta", "delta": {"text": "abc"}}) + "\n" - assert _extract_codex_text(stream) == "abc" - - def test_extracts_content_string(self): - """Events with 'content' as a string contribute that string.""" - stream = json.dumps({"event": "msg", "content": "hello"}) + "\n" + valid = json.dumps({"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": "hello"}}) + stream = "not-json\n" + valid + "\n" assert _extract_codex_text(stream) == "hello" - def test_extracts_content_list_of_text_blocks(self): - """Events with 'content' as a list of {type:text, text:...} blocks work.""" - event = { - "event": "msg", - "content": [ - {"type": "text", "text": "part-A"}, - {"type": "text", "text": " part-B"}, - ], - } + def test_extracts_item_completed_agent_message(self): + """item.completed for an agent_message returns item.text.""" + event = {"type": "item.completed", "item": {"id": "i1", "type": "agent_message", "text": "hello"}} stream = json.dumps(event) + "\n" - assert _extract_codex_text(stream) == "part-A part-B" + assert _extract_codex_text(stream) == "hello" - def test_extracts_item_content(self): - """Events with text inside 'item.content[...]' work.""" - event = {"event": "item.message.completed", "item": {"content": [{"type": "text", "text": "from item"}]}} + def test_ignores_non_agent_message_items(self): + """item.completed 
for non-agent_message items (e.g. reasoning) contributes nothing.""" + event = {"type": "item.completed", "item": {"id": "i1", "type": "reasoning", "text": "thinking..."}} stream = json.dumps(event) + "\n" - assert _extract_codex_text(stream) == "from item" - - def test_ignores_unknown_event_types(self): - """An event with no recognizable text payload contributes nothing.""" - stream = json.dumps({"event": "thread.started", "metadata": {"foo": "bar"}}) + "\n" assert _extract_codex_text(stream) == "" - def test_accumulates_across_events(self): - """Text from multiple events accumulates in order.""" + def test_ignores_lifecycle_events(self): + """thread.started, turn.started, turn.completed contribute nothing.""" events = [ - {"event": "delta", "text": "Hello"}, - {"event": "delta", "text": " "}, - {"event": "delta", "text": "world"}, + {"type": "thread.started", "thread_id": "thr_1"}, + {"type": "turn.started"}, + {"type": "turn.completed", "usage": {"input_tokens": 0, "output_tokens": 0}}, ] stream = "\n".join(json.dumps(e) for e in events) + "\n" - assert _extract_codex_text(stream) == "Hello world" + assert _extract_codex_text(stream) == "" def test_strips_outer_whitespace(self): """The accumulated result has leading/trailing whitespace stripped.""" - stream = json.dumps({"text": " hello "}) + "\n" + event = {"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": " hello "}} + stream = json.dumps(event) + "\n" assert _extract_codex_text(stream) == "hello" - def test_terminal_text_wins_over_accumulated_deltas(self): - """ - A stream with both delta chunks and a terminal consolidated - message returns the terminal text exactly once, NOT the - deltas concatenated with the terminal. 
- - Without the terminal-wins rule, the parser would yield - `"HelloHello"` for the worked example below: triage's JSON - parser then fails because `{"labels":[]}{"labels":[]}` is - not a single JSON object, breaking the triage path on - every codex run that streams. This regression guard - protects against a future refactor that re-introduces - accumulate-across-representations. - """ + def test_completed_wins_over_updated(self): + """item.completed text supersedes any earlier item.updated text.""" events = [ - {"event": "delta", "delta": {"text": "Hel"}}, - {"event": "delta", "delta": {"text": "lo"}}, - {"event": "item.message.completed", "item": {"content": [{"type": "text", "text": "Hello"}]}}, + {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "partial"}}, + {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "partial more"}}, + {"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": "FINAL"}}, ] stream = "\n".join(json.dumps(e) for e in events) + "\n" - assert _extract_codex_text(stream) == "Hello" + assert _extract_codex_text(stream) == "FINAL" - def test_last_terminal_wins_on_multiple_terminals(self): - """ - When a stream emits more than one terminal/complete event - (e.g. an interim consolidated text followed by a final one), - the most recent terminal text wins. Mirrors the streaming - convention that "completed" supersedes prior partials. 
- """ + def test_last_completed_wins(self): + """Multiple item.completed events (rare): last one wins.""" events = [ - {"item": {"content": [{"type": "text", "text": "first"}]}}, - {"item": {"content": [{"type": "text", "text": "final"}]}}, + {"type": "item.completed", "item": {"id": "i1", "type": "agent_message", "text": "first"}}, + {"type": "item.completed", "item": {"id": "i2", "type": "agent_message", "text": "second"}}, ] stream = "\n".join(json.dumps(e) for e in events) + "\n" - assert _extract_codex_text(stream) == "final" + assert _extract_codex_text(stream) == "second" - def test_deltas_only_when_no_terminal(self): - """ - With no terminal event, delta chunks accumulate as the result. - Locks the fallback behavior the terminal-wins rule does not - short-circuit when no terminal text was emitted. - """ + def test_updated_fallback_when_no_completed(self): + """If only item.updated events arrive (truncated stream), use the latest one.""" events = [ - {"event": "delta", "delta": {"text": "Hel"}}, - {"event": "delta", "delta": {"text": "lo"}}, + {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "first"}}, + {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "second"}}, ] stream = "\n".join(json.dumps(e) for e in events) + "\n" - assert _extract_codex_text(stream) == "Hello" + assert _extract_codex_text(stream) == "second" + + def test_turn_failed_short_circuits(self): + """A turn.failed event terminates extraction with the empty string.""" + events = [ + {"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": "ignored"}}, + {"type": "turn.failed", "error": {"message": "model unavailable"}}, + ] + stream = "\n".join(json.dumps(e) for e in events) + "\n" + assert _extract_codex_text(stream) == "" class TestResolveGooseModelTriage: From c4ca1f93d5d8a61112484a0718d34a8ca01b4a7b Mon Sep 17 00:00:00 2001 From: Daniel Ellison Date: Fri, 15 May 2026 12:48:26 -0400 Subject: [PATCH 2/3] 
fix(codex): honor CODEX_BIN at runtime, persist it to /etc/kai/env Smoke-test discovery: sudo from the kai service user uses kai's PATH to resolve bare command names, and on multi-user installs codex typically lives in a per-os_user home (e.g. /Users/daniel/.npm-global/bin) that is NOT on the service user's PATH. The bot's bare `codex` spawn then fails with "a password is required" because sudo cannot find a binary to match the sudoers rule against. - src/kai/codex.py: argv[0] now reads from CODEX_BIN env var, falling back to bare "codex" for single-user installs where it's on PATH. - src/kai/triage.py: same lever for the codex exec --json triage path. - src/kai/install.py: persist CODEX_BIN to /etc/kai/env when set at install time, so the running bot picks up the same absolute path the sudoers rule names. - Tests lock both branches: bare "codex" when env unset, full path when set. --- src/kai/codex.py | 11 ++++++++++- src/kai/install.py | 13 +++++++++++++ src/kai/triage.py | 8 +++++++- tests/test_codex.py | 26 +++++++++++++++++++++++++- tests/test_triage.py | 24 +++++++++++++++++++++++- 5 files changed, 78 insertions(+), 4 deletions(-) diff --git a/src/kai/codex.py b/src/kai/codex.py index 222218c..6f9483c 100644 --- a/src/kai/codex.py +++ b/src/kai/codex.py @@ -238,7 +238,16 @@ async def _ensure_started(self) -> None: # an unnecessary self-sudo) and the value unchanged otherwise. effective_codex_user = resolve_claude_user(self.codex_user) - codex_argv = ["codex", "app-server"] + # Resolve the codex binary path. When `codex` is not on the + # service user's PATH (multi-user installs where codex lives + # in a per-os_user home, e.g. /Users/daniel/.npm-global/bin), + # sudo cannot find the bare name and the spawn dies with + # "a password is required". The CODEX_BIN env var lets the + # install (or operator) pin an absolute path that the sudoers + # rule also names exactly. Falls back to bare "codex" so + # single-user installs with codex on PATH still work. 
+ codex_bin = os.environ.get("CODEX_BIN") or "codex" + codex_argv = [codex_bin, "app-server"] if effective_codex_user: # -H sets HOME to the codex user's pw entry so codex reads # auth from ~/.codex/auth.json, not the bot's diff --git a/src/kai/install.py b/src/kai/install.py index 39393ef..25c7621 100644 --- a/src/kai/install.py +++ b/src/kai/install.py @@ -1152,6 +1152,19 @@ def _cmd_config() -> None: env["CODEX_AUTH_MODE"] = codex_auth_mode if codex_api_key: env["OPENAI_API_KEY"] = codex_api_key + # Persist the resolved codex binary path so the running bot + # invokes the same absolute path as the sudoers rule. Sudo + # uses the service user's PATH to resolve bare command names, + # and on multi-user installs codex usually lives in a per- + # os_user home that isn't on the service PATH - the spawn + # then fails with "a password is required" because sudo + # can't find a binary to match the rule against. Only + # emitted when explicitly set via CODEX_BIN at install time, + # otherwise the runtime falls back to bare "codex" (correct + # for single-user installs where it's on PATH). + codex_bin = os.environ.get("CODEX_BIN") + if codex_bin: + env["CODEX_BIN"] = codex_bin # Remove stale renamed keys if present - leaving both the old and # new key causes silent confusion (the deprecation warning is diff --git a/src/kai/triage.py b/src/kai/triage.py index c66066a..0311d57 100644 --- a/src/kai/triage.py +++ b/src/kai/triage.py @@ -433,8 +433,14 @@ async def run_triage( agent_backend, override=os.environ.get("ISSUE_TRIAGE_MODEL_CODEX", ""), ) + # Pin the absolute codex path when CODEX_BIN is set; same + # rationale as codex.py - sudo cannot resolve bare `codex` + # when the binary lives in a per-os_user home that isn't on + # the service user's PATH. Falls back to bare "codex" for + # installs where codex is on PATH.
+ codex_bin = os.environ.get("CODEX_BIN") or "codex" codex_cmd = [ - "codex", + codex_bin, "exec", "--json", "--model", diff --git a/tests/test_codex.py b/tests/test_codex.py index 1cf5da3..25d1e08 100644 --- a/tests/test_codex.py +++ b/tests/test_codex.py @@ -362,13 +362,37 @@ async def test_argv_invokes_codex_app_server(self): c = _make_codex() proc = _make_mock_proc(_handshake_lines()) - with patch("asyncio.create_subprocess_exec", AsyncMock(return_value=proc)) as mock_exec: + with ( + patch.dict(os.environ, {}, clear=False), + patch("asyncio.create_subprocess_exec", AsyncMock(return_value=proc)) as mock_exec, + ): + os.environ.pop("CODEX_BIN", None) await c._ensure_started() argv = mock_exec.call_args[0] assert argv[0] == "codex" assert argv[1] == "app-server" + @pytest.mark.asyncio + async def test_argv_uses_codex_bin_env_var(self): + """ + CODEX_BIN env var overrides the bare "codex" argv[0]. Locks + the absolute-path invocation needed when codex lives in a + per-os_user home not on the service user's PATH. + """ + c = _make_codex() + proc = _make_mock_proc(_handshake_lines()) + + with ( + patch.dict(os.environ, {"CODEX_BIN": "/Users/daniel/.npm-global/bin/codex"}), + patch("asyncio.create_subprocess_exec", AsyncMock(return_value=proc)) as mock_exec, + ): + await c._ensure_started() + + argv = mock_exec.call_args[0] + assert argv[0] == "/Users/daniel/.npm-global/bin/codex" + assert argv[1] == "app-server" + @pytest.mark.asyncio async def test_skips_when_alive(self): """_ensure_started() is a no-op when the process is already running.""" diff --git a/tests/test_triage.py b/tests/test_triage.py index 6eee44d..a72b9f1 100644 --- a/tests/test_triage.py +++ b/tests/test_triage.py @@ -688,7 +688,11 @@ async def test_codex_argv_uses_codex_exec(self): boundary: the codex triage branch never spawns claude. 
""" mock_proc = _mock_subprocess(stdout=self._codex_ndjson('{"labels": []}')) - with patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + with ( + patch.dict(os.environ, {}, clear=False), + patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec, + ): + os.environ.pop("CODEX_BIN", None) await run_triage("prompt", agent_backend="codex") cmd = mock_exec.call_args[0] assert cmd[0] == "codex" @@ -696,6 +700,24 @@ async def test_codex_argv_uses_codex_exec(self): assert "--json" in cmd assert "--print" not in cmd # No claude flag + @pytest.mark.asyncio + async def test_codex_argv_uses_codex_bin_env_var(self): + """ + CODEX_BIN env var overrides bare "codex" in the triage argv. + Same install-time lever the persistent backend honors; needed + for multi-user installs where codex lives in a per-os_user + home not on the service user's PATH. + """ + mock_proc = _mock_subprocess(stdout=self._codex_ndjson("{}")) + with ( + patch.dict(os.environ, {"CODEX_BIN": "/Users/daniel/.npm-global/bin/codex"}), + patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec, + ): + await run_triage("prompt", agent_backend="codex") + cmd = mock_exec.call_args[0] + assert cmd[0] == "/Users/daniel/.npm-global/bin/codex" + assert cmd[1] == "exec" + @pytest.mark.asyncio async def test_codex_argv_uses_registry_model(self): """ From 177d813d8e1b71137b31ffe29e91fb2613a1ca0f Mon Sep 17 00:00:00 2001 From: Daniel Ellison Date: Fri, 15 May 2026 12:54:06 -0400 Subject: [PATCH 3/3] fix(install): inject CODEX_BIN at apply time, not just from wizard Previous commit wrote CODEX_BIN inside _cmd_config (the wizard), so operators running 'sudo CODEX_BIN=... kai install apply' bypassed it - the wizard had never seen the var. Apply now reads CODEX_BIN from the environment after loading install.conf and injects it into the env dict before _apply_secrets writes /etc/kai/env. 
Apply-time env wins over any stale install.conf value so the operator's explicit override is honored. --- src/kai/install.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/kai/install.py b/src/kai/install.py index 25c7621..25e286e 100644 --- a/src/kai/install.py +++ b/src/kai/install.py @@ -2986,6 +2986,17 @@ def _cmd_apply() -> None: _apply_models(install_path, dry_run) # -- Step 5: Write secrets -- + # Inject CODEX_BIN from the apply-time env so a multi-user + # codex install can pin an absolute codex path without + # round-tripping through the wizard. Apply-time env wins over + # any stale value already in install.conf so the operator's + # explicit `sudo CODEX_BIN=... kai install apply` is honored. + # Only codex installs care; on other backends the var is + # ignored at runtime so writing it is harmless but noisy - + # skip the write to keep /etc/kai/env clean. + env_codex_bin = os.environ.get("CODEX_BIN") + if env_codex_bin and env.get("AGENT_BACKEND") == "codex": + env["CODEX_BIN"] = env_codex_bin _apply_secrets(env, dry_run) # -- Step 6: Deploy Goose config (if backend=goose) --
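The CODEX_BIN resolution that commits 2 and 3 install in both spawn paths can be sketched as a single helper. This is an illustrative sketch, not the kai source; `codex_argv` is a hypothetical name, but the fallback rule (absolute path from the env var when set, bare `"codex"` otherwise) follows the tests above.

```python
import os

def codex_argv(subcommand: list[str]) -> list[str]:
    """Build a codex invocation, preferring an install-pinned path.

    CODEX_BIN (hypothetical sketch of the env var both backends read)
    carries the absolute path the sudoers rule names; falling back to
    bare "codex" keeps single-user installs with codex on PATH working.
    """
    codex_bin = os.environ.get("CODEX_BIN") or "codex"
    return [codex_bin, *subcommand]
```

The same lever serves both the persistent backend (`["app-server"]`) and the triage path (`["exec", "--json", ...]`), which is why the patch persists a single env var rather than a per-surface setting.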
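The three stdin frames the handshake tests lock (initialize request, `initialized` notification, `thread/start` request) can be sketched as NDJSON payload builders. A minimal sketch under the schema the tests assert, not the kai implementation; `handshake_frames` is a hypothetical helper name.

```python
import json

def handshake_frames(cwd: str, model: str) -> list[bytes]:
    """Build the three newline-delimited JSON-RPC 2.0 handshake frames."""
    initialize = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            # No protocolVersion: the field is not part of the codex
            # app-server initialize schema.
            "clientInfo": {"name": "kai"},
            "capabilities": {
                # Suppress notifications the client never consumes.
                "optOutNotificationMethods": [
                    "remoteControl/status/changed",
                    "mcpServer/startupStatus/updated",
                    "thread/started",
                    "thread/tokenUsage/updated",
                ]
            },
        },
    }
    # Notification: no "id" key, so the server sends no response;
    # only initialize and thread/start produce stdout result lines.
    initialized = {"jsonrpc": "2.0", "method": "initialized"}
    thread_start = {
        "jsonrpc": "2.0",
        "id": 2,
        "method": "thread/start",
        "params": {"cwd": cwd, "model": model},
    }
    return [(json.dumps(m) + "\n").encode() for m in (initialize, initialized, thread_start)]
```

The server's `thread/start` response carries `result.thread.id`, which the backend stores as its session id; a missing `thread.id` fails loudly at this boundary rather than flowing a `None` into `turn/start`.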
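The extraction rule the triage tests lock (item.completed is authoritative, item.updated is an interim fallback, turn.failed short-circuits to empty) can be sketched as a small parser. This is a re-implementation sketch of the behavior the test suite above asserts, not the kai `_extract_codex_text` source itself.

```python
import json

def extract_codex_text(ndjson: str) -> str:
    """Extract the agent message from a codex exec --json NDJSON stream."""
    completed: str | None = None  # last item.completed agent_message text
    updated: str | None = None    # latest item.updated agent_message text
    for line in ndjson.splitlines():
        try:
            event = json.loads(line)
        except ValueError:
            continue  # non-JSON lines are skipped silently
        if not isinstance(event, dict):
            continue
        etype = event.get("type")
        if etype == "turn.failed":
            # A failed turn yields nothing, even if completed text arrived.
            return ""
        item = event.get("item")
        if not isinstance(item, dict) or item.get("type") != "agent_message":
            continue  # lifecycle events and reasoning/tool items contribute nothing
        if etype == "item.completed":
            completed = item.get("text", "")
        elif etype == "item.updated":
            updated = item.get("text", "")
    # Completed text wins; fall back to the latest interim text when the
    # stream was truncated before item.completed arrived.
    text = completed if completed is not None else (updated or "")
    return text.strip()
```

The completed-wins rule is what keeps the triage path from returning doubled or stale JSON when codex streams interim consolidated text before the final message.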