From c00e11175da81a086262c297a757a874969ee43e Mon Sep 17 00:00:00 2001 From: Daniel Ellison Date: Fri, 15 May 2026 17:49:59 -0400 Subject: [PATCH] feat(review): codex branch for one-shot PR review Phase 4 of the codex backend epic. Adds the codex branch to run_review so GitHub webhook-driven PR reviews run through codex on a codex-active install: - run_review codex branch invokes `codex exec --json --model <model>` with the model from get_model_for(ModelRole.PR_REVIEW, "codex", override=os.environ.get("PR_REVIEW_MODEL_CODEX", "")). sudo -H -u <user> wrap when claude_user is set so codex reads ~/.codex/auth.json instead of the service user's home. start_new_session in cross-user mode so a timeout kill collects both sudo and the codex grandchild. CODEX_BIN env override honored so installs where codex lives in a per-os_user home (not on the service user's PATH) still resolve the absolute binary. - New kai.codex_exec module hosts the schema-defensive NDJSON parser (extract_codex_text + private _recover_agent_message_text). Both triage.py and review.py import from there so review.py does not reach into triage.py for a generic codex helper. Codex re-review of #489 made canonical helpers an explicit aesthetic preference. - Codex re-review of #489 also caught that get_effective_provider needed the codex->openai rule; that fix landed in #489. No new config plumbing needed here; the webhook handler already threads per-user agent_backend / provider into review_pr and run_review. - Tests: TestRunReviewCodex (11 tests) mirrors TestRunTriageCodex argv/sudo/timeout/extraction coverage; TestExtractCodexText moved from test_triage.py to test_codex_exec.py (location follows implementation). 
--- src/kai/codex_exec.py | 119 ++++++++++++++++++++++ src/kai/review.py | 132 +++++++++++++++++++++--- src/kai/triage.py | 103 ++----------------- tests/test_codex_exec.py | 98 ++++++++++++++++++ tests/test_review.py | 215 +++++++++++++++++++++++++++++++++++++++ tests/test_triage.py | 92 +---------------- 6 files changed, 560 insertions(+), 199 deletions(-) create mode 100644 src/kai/codex_exec.py create mode 100644 tests/test_codex_exec.py diff --git a/src/kai/codex_exec.py b/src/kai/codex_exec.py new file mode 100644 index 0000000..c8ba86b --- /dev/null +++ b/src/kai/codex_exec.py @@ -0,0 +1,119 @@ +""" +Codex `exec --json` NDJSON parsing helpers shared by one-shot callers. + +The codex CLI's one-shot mode (`codex exec --json`) emits NDJSON events on +stdout - one JSON object per line, each tagged by a top-level `type` +field from the ThreadEvent enum. One-shot callers (triage.py, review.py, +and any future codex-driven agent) all need to recover the final +agent_message text from that stream, so the parser lives here rather +than being duplicated per caller. + +This module is intentionally tiny and dependency-free: no Kai-side +imports, no I/O, no logging. It exists so the two callers above can +share one definition site without one importing from the other (which +would couple unrelated agent surfaces; review.py has no reason to know +about triage.py and vice versa). + +Distinct from `codex.py`, which manages the persistent codex app-server +subprocess for conversational use. That module speaks the JSON-RPC +`thread/turn/item` protocol; this one parses the dot-separated event +stream `codex exec --json` writes to stdout in one-shot mode. Same +underlying data model, different wire encodings, different callers, +different lifecycles - hence the separate file. +""" + +import json + + +def extract_codex_text(stdout: str) -> str: + """ + Walk codex's NDJSON event stream and return the agent message text. + + `codex exec --json` emits one JSON event per line. 
Each event has + a top-level `type` tag from the ThreadEvent enum: + `thread.started`, `turn.started`, `turn.completed`, `turn.failed`, + `item.started`, `item.updated`, `item.completed`, `error`. (Note + the DOT separator; the app-server protocol uses slashes + instead. Same data model, different wire encoding.) + + Callers only care about the agent's final natural-language + response. The `item.completed` event for an agent_message item + carries the full consolidated text: + + {"type": "item.completed", + "item": {"id": "...", "type": "agent_message", "text": "..."}} + + Schema reference: codex-rs/exec/src/exec_events.rs in the codex + repo. The `ThreadItemDetails` enum is `#[serde(tag = "type", + rename_all = "snake_case")]` so the discriminator is + `"agent_message"` (snake_case), and `text` is a flat field on + the item object (the inner enum is `#[serde(flatten)]`). + + A streaming run may emit `item.updated` events for the same + agent_message id before its `item.completed`. We trust the + completed event as authoritative; if no completed event arrived + (e.g. truncated stream) we fall back to the latest updated text + so the caller gets something rather than nothing. + + Schema-drift posture: a future codex release that adds new event + types or item types must not break extraction. Unknown shapes + are silently skipped. A `turn.failed` event short-circuits to an + empty result so the caller raises a clearer error than a partial + body would. + + Args: + stdout: The full stdout from `codex exec --json`. + + Returns: + The agent_message text from the last `item.completed` + event, or the latest `item.updated` text as a fallback. + Empty string if no agent_message was emitted. 
+ """ + completed_text: str | None = None + latest_updated_text: str | None = None + for line in stdout.splitlines(): + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError: + continue + event_type = obj.get("type") + # `turn.failed` is a terminal failure - no body text to + # extract; let the caller see an empty string and surface + # a clearer error than a half-event would. + if event_type == "turn.failed": + return "" + if event_type in ("item.completed", "item.updated"): + text = _recover_agent_message_text(obj) + if text is None: + continue + if event_type == "item.completed": + completed_text = text + else: + latest_updated_text = text + if completed_text is not None: + return completed_text.strip() + if latest_updated_text is not None: + return latest_updated_text.strip() + return "" + + +def _recover_agent_message_text(obj: dict) -> str | None: + """ + Pull `item.text` from a codex exec event when the item is an + agent_message; return None otherwise. + + The wire shape is `{..., "item": {"id": "...", "type": "agent_message", "text": "..."}}` + because ThreadItemDetails is serde-flattened onto ThreadItem. + """ + item = obj.get("item") + if not isinstance(item, dict): + return None + if item.get("type") != "agent_message": + return None + text = item.get("text") + if isinstance(text, str): + return text + return None diff --git a/src/kai/review.py b/src/kai/review.py index 57f4d46..410d776 100644 --- a/src/kai/review.py +++ b/src/kai/review.py @@ -1,5 +1,5 @@ """ -PR review agent - one-shot subprocess (Claude or Goose) for automated code review. +PR review agent - one-shot subprocess (Claude, Codex, or Goose) for automated code review. Provides functionality to: 1. Fetch PR diffs and metadata via the GitHub CLI @@ -19,9 +19,11 @@ re-evaluate a prior finding. The LLM subprocess runs in one-shot mode (non-interactive, no tools, no -streaming): `claude --print` for Claude, `goose run -i -` for Goose. 
-The prompt goes in via stdin to handle large diffs without hitting shell -argument length limits. Output is captured as plain text. +streaming): `claude --print` for Claude, `codex exec --json` for Codex, +`goose run -i -` for Goose. The prompt goes in via stdin to handle +large diffs without hitting shell argument length limits. Output is +captured as plain text (NDJSON for codex; the agent_message text is +extracted by kai.codex_exec.extract_codex_text). """ import asyncio @@ -35,6 +37,7 @@ import aiohttp +from kai.codex_exec import extract_codex_text from kai.config import ModelRole, get_model_for, resolve_claude_user from kai.prompt_utils import make_boundary @@ -673,24 +676,33 @@ async def run_review( """ Spawn a one-shot LLM subprocess to perform the review. - Dispatches to either Claude (`claude --print`) or Goose - (`goose run -i -`) based on agent_backend. Both read from stdin - and write plain text to stdout; the prompt and output handling - are identical regardless of backend. + Dispatches to Claude (`claude --print`), Codex (`codex exec + --json`), or Goose (`goose run -i -`) based on agent_backend. + All three read the prompt from stdin and return a single string + of review text; the prompt and output handling are identical + regardless of backend. Claude path: supports sudo -u for OS-level isolation, process group kills for cleanup, and per-run budget caps. + Codex path: sudo -H -u when claude_user is set (codex needs + HOME pointing at the user's home so it reads the right + ~/.codex/auth.json); subprocess emits NDJSON which is collapsed + to the final agent_message text by extract_codex_text. + Goose path: no sudo (Goose has no user isolation), simple proc.kill() for cleanup, --max-turns 1 as the safety limit. Args: prompt: The complete review prompt (from build_review_prompt). - claude_user: Optional OS user to run Claude as (via sudo -u). - Ignored when agent_backend is "goose". - agent_backend: Which LLM backend to use ("claude" or "goose"). 
+ claude_user: Optional OS user to run the subprocess as (via + sudo -u for claude, sudo -H -u for codex). Ignored when + agent_backend is "goose". + agent_backend: Which LLM backend to use ("claude", "codex", + or "goose"). provider: LLM provider name (e.g. "anthropic", "openai"). - Only used when agent_backend is "goose". + Only used when agent_backend is "goose"; codex always + uses openai, claude always uses anthropic. Returns: The review text output from the LLM. @@ -698,6 +710,102 @@ async def run_review( Raises: RuntimeError: If the subprocess fails or times out. """ + if agent_backend == "codex": + # Codex one-shot mode: --json emits NDJSON events on stdout + # (thread.started, turn.started, item.*, turn.completed, + # turn.failed, error). The final agent message text is + # recovered by walking the events and keeping the last + # agent_message text; extract_codex_text in kai.codex_exec is the + # schema-defensive parser shared with triage.py. + # Per-user OAuth isolation: when claude_user is set (the + # webhook handler passes the same os_user value to both + # backends through this parameter), wrap the codex argv in + # `sudo -H -u <user>` so codex reads ~/.codex/auth.json + # instead of the service user's home. The parameter name is + # claude-historical; rename is out of scope for this fix. + # No --max-budget-usd: codex on subscription auth has no + # per-call billing; runaway protection comes from + # timeout_s at the asyncio.wait_for below. + review_model = get_model_for( + ModelRole.PR_REVIEW, + agent_backend, + override=os.environ.get("PR_REVIEW_MODEL_CODEX", ""), + ) + # Pin the absolute codex path when CODEX_BIN is set; same + # rationale as codex.py and triage.py - sudo cannot resolve + # bare `codex` when the binary lives in a per-os_user home + # that isn't on the service user's PATH. Falls back to bare + # "codex" for installs where codex is on PATH. 
+ codex_bin = os.environ.get("CODEX_BIN") or "codex" + codex_cmd = [ + codex_bin, + "exec", + "--json", + "--model", + review_model, + ] + + # Resolve self-sudo: skip the sudo wrap when claude_user + # matches the bot process user. Mirrors the triage codex + # branch's pattern. + effective_user = resolve_claude_user(claude_user) + if effective_user: + # -H sets HOME to <effective_user>'s pw entry so codex + # reads auth from the right home. --preserve-env passes + # KAI_WEBHOOK_SECRET through sudo's env_reset (the SETENV: + # sudoers rule allows this). Same shape claude uses. + cmd = [ + "sudo", + "-H", + "-u", + effective_user, + "--preserve-env=KAI_WEBHOOK_SECRET", + "--", + ] + codex_cmd + else: + cmd = codex_cmd + + proc = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + # New session group in cross-user mode so killing sudo + # also kills the codex grandchild (mirrors claude branch + # and the triage codex branch). + start_new_session=bool(effective_user), + ) + + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(input=prompt.encode()), + timeout=timeout_s, + ) + except TimeoutError: + # Kill the subprocess tree if it exceeds the timeout. In + # cross-user mode (effective_user set) the process is in + # a new group (PGID == PID); kill the group so both sudo + # and the codex grandchild die, preventing orphans. 
+ if effective_user: + try: + os.killpg(proc.pid, signal.SIGKILL) + except ProcessLookupError: + pass # Already dead + else: + proc.kill() + await proc.wait() + raise RuntimeError(f"Review subprocess timed out after {timeout_s}s") from None + + if proc.returncode != 0: + error = stderr.decode().strip() + raise RuntimeError(f"Review subprocess failed (exit {proc.returncode}): {error}") + + # Codex emits NDJSON; extract the final agent message text and + # return it (mirrors the contract the claude / goose branches + # already satisfy: a single string the caller hands back to + # the webhook handler for posting as a PR comment). + return extract_codex_text(stdout.decode()) + if agent_backend == "goose": if not provider: raise ValueError( diff --git a/src/kai/triage.py b/src/kai/triage.py index 0311d57..0435291 100644 --- a/src/kai/triage.py +++ b/src/kai/triage.py @@ -35,6 +35,7 @@ import aiohttp +from kai.codex_exec import extract_codex_text from kai.config import ModelRole, get_model_for, resolve_claude_user from kai.prompt_utils import make_boundary @@ -418,7 +419,7 @@ async def run_triage( # (thread.started, turn.started, item.*, turn.completed, error). # The final agent message text is recovered by walking the # events and accumulating any text content; see - # _extract_codex_text() below for the schema-defensive parser. + # extract_codex_text (kai.codex_exec) for the schema-defensive parser. # Per-user OAuth isolation: when claude_user is set (the # webhook handler passes the same os_user value to both # backends through this parameter), wrap the codex argv in @@ -503,8 +504,10 @@ async def run_triage( # Codex emits NDJSON; extract the final agent message text and # return it (mirrors the contract the claude / goose branches # already satisfy: a single string the caller hands to - # _parse_triage_json). - return _extract_codex_text(stdout.decode()) + # _parse_triage_json). 
extract_codex_text lives in codex_exec + # so review.py can share the same parser without importing + # from triage.py. + return extract_codex_text(stdout.decode()) if agent_backend == "goose": if not provider: @@ -630,100 +633,6 @@ async def run_triage( return stdout.decode().strip() -def _extract_codex_text(stdout: str) -> str: - """ - Walk codex's NDJSON event stream and return the agent message text. - - `codex exec --json` emits one JSON event per line. Each event has - a top-level `type` tag from the ThreadEvent enum: - `thread.started`, `turn.started`, `turn.completed`, `turn.failed`, - `item.started`, `item.updated`, `item.completed`, `error`. (Note - the DOT separator; the app-server protocol uses slashes - instead. Same data model, different wire encoding.) - - For triage we only care about the agent's final natural-language - response. The `item.completed` event for an agent_message item - carries the full consolidated text: - - {"type": "item.completed", - "item": {"id": "...", "type": "agent_message", "text": "..."}} - - Schema reference: codex-rs/exec/src/exec_events.rs in the codex - repo. The `ThreadItemDetails` enum is `#[serde(tag = "type", - rename_all = "snake_case")]` so the discriminator is - `"agent_message"` (snake_case), and `text` is a flat field on - the item object (the inner enum is `#[serde(flatten)]`). - - A streaming run may emit `item.updated` events for the same - agent_message id before its `item.completed`. We trust the - completed event as authoritative; if no completed event arrived - (e.g. truncated stream) we fall back to the latest updated text - so triage gets something rather than nothing. - - Schema-drift posture: a future codex release that adds new event - types or item types must not break extraction. Unknown shapes - are silently skipped. A `turn.failed` event short-circuits to an - empty result so the caller raises a clearer error than a partial - body would. 
- - Args: - stdout: The full stdout from `codex exec --json`. - - Returns: - The agent_message text from the last `item.completed` - event, or the latest `item.updated` text as a fallback. - Empty string if no agent_message was emitted. - """ - completed_text: str | None = None - latest_updated_text: str | None = None - for line in stdout.splitlines(): - line = line.strip() - if not line: - continue - try: - obj = json.loads(line) - except json.JSONDecodeError: - continue - event_type = obj.get("type") - # `turn.failed` is a terminal failure - no body text to - # extract; let the caller see an empty string and surface - # a clearer error than a half-event would. - if event_type == "turn.failed": - return "" - if event_type in ("item.completed", "item.updated"): - text = _recover_agent_message_text(obj) - if text is None: - continue - if event_type == "item.completed": - completed_text = text - else: - latest_updated_text = text - if completed_text is not None: - return completed_text.strip() - if latest_updated_text is not None: - return latest_updated_text.strip() - return "" - - -def _recover_agent_message_text(obj: dict) -> str | None: - """ - Pull `item.text` from a codex exec event when the item is an - agent_message; return None otherwise. - - The wire shape is `{..., "item": {"id": "...", "type": "agent_message", "text": "..."}}` - because ThreadItemDetails is serde-flattened onto ThreadItem. - """ - item = obj.get("item") - if not isinstance(item, dict): - return None - if item.get("type") != "agent_message": - return None - text = item.get("text") - if isinstance(text, str): - return text - return None - - def _parse_triage_json(raw: str) -> dict: """ Parse Claude's triage response, stripping markdown fencing if present. 
diff --git a/tests/test_codex_exec.py b/tests/test_codex_exec.py new file mode 100644 index 0000000..f604964 --- /dev/null +++ b/tests/test_codex_exec.py @@ -0,0 +1,98 @@ +""" +Tests for kai.codex_exec - shared NDJSON parser for codex one-shot +callers (triage, review, and any future codex-driven one-shot agent). + +The codex exec --json schema (codex-rs/exec/src/exec_events.rs): +each event has top-level `type` discriminator; agent_message +items carry their full text in `item.text`. item.completed is +authoritative; item.updated is interim. turn.failed terminates +extraction with an empty result. + +These tests originally lived in test_triage.py alongside the helper. +Moved here when the helper was promoted to a shared module so the +test file's location matches the implementation file's location; +no behavior change. +""" + +import json + +from kai.codex_exec import extract_codex_text + + +class TestExtractCodexText: + """Unit tests for the extract_codex_text NDJSON parser.""" + + def test_empty_input(self): + """An empty stream returns the empty string.""" + assert extract_codex_text("") == "" + + def test_skips_non_json_lines(self): + """Non-JSON lines are silently skipped.""" + valid = json.dumps({"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": "hello"}}) + stream = "not-json\n" + valid + "\n" + assert extract_codex_text(stream) == "hello" + + def test_extracts_item_completed_agent_message(self): + """item.completed for an agent_message returns item.text.""" + event = {"type": "item.completed", "item": {"id": "i1", "type": "agent_message", "text": "hello"}} + stream = json.dumps(event) + "\n" + assert extract_codex_text(stream) == "hello" + + def test_ignores_non_agent_message_items(self): + """item.completed for non-agent_message items (e.g. 
reasoning) contributes nothing.""" + event = {"type": "item.completed", "item": {"id": "i1", "type": "reasoning", "text": "thinking..."}} + stream = json.dumps(event) + "\n" + assert extract_codex_text(stream) == "" + + def test_ignores_lifecycle_events(self): + """thread.started, turn.started, turn.completed contribute nothing.""" + events = [ + {"type": "thread.started", "thread_id": "thr_1"}, + {"type": "turn.started"}, + {"type": "turn.completed", "usage": {"input_tokens": 0, "output_tokens": 0}}, + ] + stream = "\n".join(json.dumps(e) for e in events) + "\n" + assert extract_codex_text(stream) == "" + + def test_strips_outer_whitespace(self): + """The accumulated result has leading/trailing whitespace stripped.""" + event = {"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": " hello "}} + stream = json.dumps(event) + "\n" + assert extract_codex_text(stream) == "hello" + + def test_completed_wins_over_updated(self): + """item.completed text supersedes any earlier item.updated text.""" + events = [ + {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "partial"}}, + {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "partial more"}}, + {"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": "FINAL"}}, + ] + stream = "\n".join(json.dumps(e) for e in events) + "\n" + assert extract_codex_text(stream) == "FINAL" + + def test_last_completed_wins(self): + """Multiple item.completed events (rare): last one wins.""" + events = [ + {"type": "item.completed", "item": {"id": "i1", "type": "agent_message", "text": "first"}}, + {"type": "item.completed", "item": {"id": "i2", "type": "agent_message", "text": "second"}}, + ] + stream = "\n".join(json.dumps(e) for e in events) + "\n" + assert extract_codex_text(stream) == "second" + + def test_updated_fallback_when_no_completed(self): + """If only item.updated events arrive (truncated stream), use the latest one.""" + 
events = [ + {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "first"}}, + {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "second"}}, + ] + stream = "\n".join(json.dumps(e) for e in events) + "\n" + assert extract_codex_text(stream) == "second" + + def test_turn_failed_short_circuits(self): + """A turn.failed event terminates extraction with the empty string.""" + events = [ + {"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": "ignored"}}, + {"type": "turn.failed", "error": {"message": "model unavailable"}}, + ] + stream = "\n".join(json.dumps(e) for e in events) + "\n" + assert extract_codex_text(stream) == "" diff --git a/tests/test_review.py b/tests/test_review.py index 31dbd69..c388b0c 100644 --- a/tests/test_review.py +++ b/tests/test_review.py @@ -715,6 +715,221 @@ async def test_prompt_sent_via_stdin(self): assert call_kwargs[1]["input"] == b"the review prompt" +# ── run_review (Codex backend) ───────────────────────────────────── + + +class TestRunReviewCodex: + """ + Tests for the codex branch of run_review. + + Mirrors TestRunTriageCodex in test_triage.py: the two codex + branches share the same pattern (codex exec --json + sudo -H -u + wrap + NDJSON parse) and the same kai.codex_exec parser. Locks + the "no overlap" guarantee at the subprocess boundary - a codex- + backed review never spawns claude or goose. + """ + + @staticmethod + def _codex_ndjson(text: str) -> bytes: + """ + Build a minimal NDJSON stream that mirrors the real + `codex exec --json` schema: each event has a top-level `type` + tag; item.completed for an agent_message carries the full + consolidated `text` field. Returns bytes (the review tests' + _mock_process expects bytes via communicate()). 
+ """ + events = [ + {"type": "thread.started", "thread_id": "thr_test"}, + {"type": "turn.started"}, + { + "type": "item.completed", + "item": {"id": "item_1", "type": "agent_message", "text": text}, + }, + {"type": "turn.completed", "usage": {"input_tokens": 0, "output_tokens": 0}}, + ] + return ("\n".join(json.dumps(e) for e in events) + "\n").encode() + + @pytest.mark.asyncio + async def test_codex_argv_uses_codex_exec(self): + """ + Argv is `codex exec --json --model `, never claude or + goose. Locks the "no overlap" guarantee at the subprocess + boundary: the codex review branch never spawns claude. + """ + mock_proc = _mock_process(stdout=self._codex_ndjson("review body")) + with ( + patch.dict(os.environ, {}, clear=False), + patch("kai.review.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec, + ): + os.environ.pop("CODEX_BIN", None) + await run_review("prompt", agent_backend="codex") + cmd = mock_exec.call_args[0] + assert cmd[0] == "codex" + assert cmd[1] == "exec" + assert "--json" in cmd + assert "--print" not in cmd # No claude flag + + @pytest.mark.asyncio + async def test_codex_argv_uses_codex_bin_env_var(self): + """ + CODEX_BIN env var overrides bare "codex" in the review argv. + Same install-time lever the persistent backend and the codex + triage branch honor; needed for multi-user installs where + codex lives in a per-os_user home not on the service user's + PATH. 
+ """ + mock_proc = _mock_process(stdout=self._codex_ndjson("")) + with ( + patch.dict(os.environ, {"CODEX_BIN": "/Users/daniel/.npm-global/bin/codex"}), + patch("kai.review.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec, + ): + await run_review("prompt", agent_backend="codex") + cmd = mock_exec.call_args[0] + assert cmd[0] == "/Users/daniel/.npm-global/bin/codex" + assert cmd[1] == "exec" + + @pytest.mark.asyncio + async def test_codex_argv_uses_registry_model(self): + """ + With agent_backend=codex and no env override, the --model + argv slot matches the registry's (codex, PR_REVIEW) row + ("gpt-5.4-mini"). Locks the registry as the source of truth + for the codex side. + """ + mock_proc = _mock_process(stdout=self._codex_ndjson("")) + with patch("kai.review.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await run_review("prompt", agent_backend="codex") + cmd = mock_exec.call_args[0] + i = cmd.index("--model") + assert cmd[i + 1] == "gpt-5.4-mini" + + @pytest.mark.asyncio + async def test_codex_env_override_honored_at_call_site(self, monkeypatch): + """ + PR_REVIEW_MODEL_CODEX in the environment overrides the + registry value at the call site. Same end-to-end env-override + wiring as the claude path. + """ + monkeypatch.setenv("PR_REVIEW_MODEL_CODEX", "gpt-5.4") + mock_proc = _mock_process(stdout=self._codex_ndjson("")) + with patch("kai.review.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await run_review("prompt", agent_backend="codex") + cmd = mock_exec.call_args[0] + i = cmd.index("--model") + assert cmd[i + 1] == "gpt-5.4" + + @pytest.mark.asyncio + async def test_codex_no_sudo_when_user_unset(self): + """ + With no claude_user passed, codex runs as the bot process + user directly: argv begins with "codex", no "sudo" wrap. 
+ """ + mock_proc = _mock_process(stdout=self._codex_ndjson("")) + with patch("kai.review.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await run_review("prompt", agent_backend="codex") + cmd = mock_exec.call_args[0] + assert cmd[0] == "codex" + assert "sudo" not in cmd + + @pytest.mark.asyncio + async def test_codex_wraps_sudo_when_user_set(self): + """ + With claude_user set to a non-self user, codex argv is wrapped + in `sudo -H -u --preserve-env=KAI_WEBHOOK_SECRET --`. + The per-user os_user lever is what makes a multi-user install + spawn codex as each user, reading their per-user + ~/.codex/auth.json. + + Same fake-username trick as the triage codex tests: pass a + username implausible enough to never match the test runner's + user, so resolve_claude_user does NOT short-circuit to no-sudo. + """ + mock_proc = _mock_process(stdout=self._codex_ndjson("")) + with patch("kai.review.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await run_review("prompt", agent_backend="codex", claude_user="ci-fake-user") + cmd = mock_exec.call_args[0] + assert cmd[0] == "sudo" + assert "-H" in cmd + i = cmd.index("-u") + assert cmd[i + 1] == "ci-fake-user" + assert any(arg.startswith("--preserve-env=") and "KAI_WEBHOOK_SECRET" in arg for arg in cmd) + assert "codex" in cmd + codex_i = cmd.index("codex") + assert cmd[codex_i + 1] == "exec" + + @pytest.mark.asyncio + async def test_codex_no_max_budget_flag(self): + """ + --max-budget-usd is not emitted on the codex branch. Codex on + subscription auth has no per-call billing; runaway protection + comes from the asyncio.wait_for timeout. Mirror of the + existing claude-side absence assertion. 
+ """ + mock_proc = _mock_process(stdout=self._codex_ndjson("")) + with patch("kai.review.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await run_review("prompt", agent_backend="codex") + cmd = mock_exec.call_args[0] + assert "--max-budget-usd" not in cmd + + @pytest.mark.asyncio + async def test_codex_extracts_final_text_from_ndjson(self): + """ + Return value is the agent message text extracted from the + NDJSON event stream, not the raw stdout. The downstream + webhook handler posts the returned text directly as a PR + comment; it must not see multi-line NDJSON. + """ + body = "## Review\n\nLooks good." + mock_proc = _mock_process(stdout=self._codex_ndjson(body)) + with patch("kai.review.asyncio.create_subprocess_exec", return_value=mock_proc): + result = await run_review("prompt", agent_backend="codex") + assert result == body + + @pytest.mark.asyncio + async def test_codex_subprocess_failure_raises(self): + """Non-zero exit from codex raises RuntimeError with stderr.""" + mock_proc = _mock_process(returncode=1, stderr=b"auth failed") + with ( + patch("kai.review.asyncio.create_subprocess_exec", return_value=mock_proc), + pytest.raises(RuntimeError, match="auth failed"), + ): + await run_review("prompt", agent_backend="codex") + + @pytest.mark.asyncio + async def test_codex_timeout_kills_process_group(self): + """ + On TimeoutError with claude_user set, the subprocess group is + killed via os.killpg(PID, SIGKILL) so both sudo and the codex + grandchild die. Mirror of the claude branch's process-group + teardown. 
+ """ + proc = AsyncMock() + proc.communicate = AsyncMock(side_effect=TimeoutError()) + proc.kill = MagicMock() + proc.wait = AsyncMock() + proc.pid = 99999 # Fake PID for killpg + with ( + patch("kai.review.asyncio.create_subprocess_exec", return_value=proc), + patch("kai.review.os.killpg") as mock_killpg, + pytest.raises(RuntimeError, match="timed out"), + ): + await run_review("prompt", agent_backend="codex", claude_user="ci-fake-user", timeout_s=1) + mock_killpg.assert_called_once_with(99999, signal.SIGKILL) + + @pytest.mark.asyncio + async def test_codex_stdin_carries_prompt(self): + """ + Codex one-shot mode reads the review prompt from stdin (the + prompt is too large for argv on real diffs). Mirrors the + existing claude- and goose-side stdin assertions. + """ + mock_proc = _mock_process(stdout=self._codex_ndjson("")) + with patch("kai.review.asyncio.create_subprocess_exec", return_value=mock_proc): + await run_review("the review prompt", agent_backend="codex") + mock_proc.communicate.assert_called_once() + assert mock_proc.communicate.call_args.kwargs["input"] == b"the review prompt" + + # ── run_review (Goose backend) ───────────────────────────────────── diff --git a/tests/test_triage.py b/tests/test_triage.py index a72b9f1..31aed3b 100644 --- a/tests/test_triage.py +++ b/tests/test_triage.py @@ -12,7 +12,6 @@ from kai.triage import ( _GOOSE_AGENT_MODELS, IssueMetadata, - _extract_codex_text, _parse_triage_json, _resolve_goose_model, _sanitize_search_query, @@ -657,7 +656,7 @@ class TestRunTriageCodex: Tests for the codex branch of run_triage. The codex branch invokes `codex exec --json` and parses NDJSON - output via _extract_codex_text. No sudo wrap; subscription auth + output via extract_codex_text. No sudo wrap; subscription auth uses the service user's own ~/.codex/auth.json. 
""" @@ -818,7 +817,7 @@ async def test_codex_extracts_final_text_from_ndjson(self): mock_proc = _mock_subprocess(stdout=self._codex_ndjson(expected_json)) with patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc): result = await run_triage("prompt", agent_backend="codex") - # _extract_codex_text strips, so the result is the expected + # extract_codex_text strips, so the result is the expected # JSON string with no leading/trailing whitespace. assert result == expected_json @@ -865,93 +864,6 @@ async def test_codex_completed_supersedes_updated(self): assert result == expected_json -class TestExtractCodexText: - """ - Unit tests for the _extract_codex_text NDJSON parser. - - The codex exec --json schema (codex-rs/exec/src/exec_events.rs): - each event has top-level `type` discriminator; agent_message - items carry their full text in `item.text`. item.completed is - authoritative; item.updated is interim. turn.failed terminates - extraction with an empty result. - """ - - def test_empty_input(self): - """An empty stream returns the empty string.""" - assert _extract_codex_text("") == "" - - def test_skips_non_json_lines(self): - """Non-JSON lines are silently skipped.""" - valid = json.dumps({"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": "hello"}}) - stream = "not-json\n" + valid + "\n" - assert _extract_codex_text(stream) == "hello" - - def test_extracts_item_completed_agent_message(self): - """item.completed for an agent_message returns item.text.""" - event = {"type": "item.completed", "item": {"id": "i1", "type": "agent_message", "text": "hello"}} - stream = json.dumps(event) + "\n" - assert _extract_codex_text(stream) == "hello" - - def test_ignores_non_agent_message_items(self): - """item.completed for non-agent_message items (e.g. 
reasoning) contributes nothing.""" - event = {"type": "item.completed", "item": {"id": "i1", "type": "reasoning", "text": "thinking..."}} - stream = json.dumps(event) + "\n" - assert _extract_codex_text(stream) == "" - - def test_ignores_lifecycle_events(self): - """thread.started, turn.started, turn.completed contribute nothing.""" - events = [ - {"type": "thread.started", "thread_id": "thr_1"}, - {"type": "turn.started"}, - {"type": "turn.completed", "usage": {"input_tokens": 0, "output_tokens": 0}}, - ] - stream = "\n".join(json.dumps(e) for e in events) + "\n" - assert _extract_codex_text(stream) == "" - - def test_strips_outer_whitespace(self): - """The accumulated result has leading/trailing whitespace stripped.""" - event = {"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": " hello "}} - stream = json.dumps(event) + "\n" - assert _extract_codex_text(stream) == "hello" - - def test_completed_wins_over_updated(self): - """item.completed text supersedes any earlier item.updated text.""" - events = [ - {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "partial"}}, - {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "partial more"}}, - {"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": "FINAL"}}, - ] - stream = "\n".join(json.dumps(e) for e in events) + "\n" - assert _extract_codex_text(stream) == "FINAL" - - def test_last_completed_wins(self): - """Multiple item.completed events (rare): last one wins.""" - events = [ - {"type": "item.completed", "item": {"id": "i1", "type": "agent_message", "text": "first"}}, - {"type": "item.completed", "item": {"id": "i2", "type": "agent_message", "text": "second"}}, - ] - stream = "\n".join(json.dumps(e) for e in events) + "\n" - assert _extract_codex_text(stream) == "second" - - def test_updated_fallback_when_no_completed(self): - """If only item.updated events arrive (truncated stream), use the latest one.""" - 
events = [ - {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "first"}}, - {"type": "item.updated", "item": {"id": "i", "type": "agent_message", "text": "second"}}, - ] - stream = "\n".join(json.dumps(e) for e in events) + "\n" - assert _extract_codex_text(stream) == "second" - - def test_turn_failed_short_circuits(self): - """A turn.failed event terminates extraction with the empty string.""" - events = [ - {"type": "item.completed", "item": {"id": "i", "type": "agent_message", "text": "ignored"}}, - {"type": "turn.failed", "error": {"message": "model unavailable"}}, - ] - stream = "\n".join(json.dumps(e) for e in events) + "\n" - assert _extract_codex_text(stream) == "" - - class TestResolveGooseModelTriage: """Tests for _resolve_goose_model in triage.py."""