Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions src/kai/codex_exec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Codex `exec --json` NDJSON parsing helpers shared by one-shot callers.

The codex CLI's one-shot mode (`codex exec --json`) emits NDJSON events on
stdout - one JSON object per line, each tagged by a top-level `type`
field from the ThreadEvent enum. One-shot callers (triage.py, review.py,
and any future codex-driven agent) all need to recover the final
agent_message text from that stream, so the parser lives here rather
than being duplicated per caller.

This module is intentionally tiny and dependency-free: no Kai-side
imports, no I/O, no logging. It exists so the two callers above can
share one definition site without one importing from the other (which
would couple unrelated agent surfaces; review.py has no reason to know
about triage.py and vice versa).

Distinct from `codex.py`, which manages the persistent codex app-server
subprocess for conversational use. That module speaks the JSON-RPC
`thread/turn/item` protocol; this one parses the dot-separated event
stream `codex exec --json` writes to stdout in one-shot mode. Same
underlying data model, different wire encodings, different callers,
different lifecycles - hence the separate file.
"""

import json


def extract_codex_text(stdout: str) -> str:
"""
Walk codex's NDJSON event stream and return the agent message text.

`codex exec --json` emits one JSON event per line. Each event has
a top-level `type` tag from the ThreadEvent enum:
`thread.started`, `turn.started`, `turn.completed`, `turn.failed`,
`item.started`, `item.updated`, `item.completed`, `error`. (Note
the DOT separator; the app-server protocol uses slashes
instead. Same data model, different wire encoding.)

Callers only care about the agent's final natural-language
response. The `item.completed` event for an agent_message item
carries the full consolidated text:

{"type": "item.completed",
"item": {"id": "...", "type": "agent_message", "text": "..."}}

Schema reference: codex-rs/exec/src/exec_events.rs in the codex
repo. The `ThreadItemDetails` enum is `#[serde(tag = "type",
rename_all = "snake_case")]` so the discriminator is
`"agent_message"` (snake_case), and `text` is a flat field on
the item object (the inner enum is `#[serde(flatten)]`).

A streaming run may emit `item.updated` events for the same
agent_message id before its `item.completed`. We trust the
completed event as authoritative; if no completed event arrived
(e.g. truncated stream) we fall back to the latest updated text
so the caller gets something rather than nothing.

Schema-drift posture: a future codex release that adds new event
types or item types must not break extraction. Unknown shapes
are silently skipped. A `turn.failed` event short-circuits to an
empty result so the caller raises a clearer error than a partial
body would.

Args:
stdout: The full stdout from `codex exec --json`.

Returns:
The agent_message text from the last `item.completed`
event, or the latest `item.updated` text as a fallback.
Empty string if no agent_message was emitted.
"""
completed_text: str | None = None
latest_updated_text: str | None = None
for line in stdout.splitlines():
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
event_type = obj.get("type")
# `turn.failed` is a terminal failure - no body text to
# extract; let the caller see an empty string and surface
# a clearer error than a half-event would.
if event_type == "turn.failed":
return ""
if event_type in ("item.completed", "item.updated"):
text = _recover_agent_message_text(obj)
if text is None:
continue
if event_type == "item.completed":
completed_text = text
else:
latest_updated_text = text
if completed_text is not None:
return completed_text.strip()
if latest_updated_text is not None:
return latest_updated_text.strip()
return ""


def _recover_agent_message_text(obj: dict) -> str | None:
"""
Pull `item.text` from a codex exec event when the item is an
agent_message; return None otherwise.

The wire shape is `{..., "item": {"id": "...", "type": "agent_message", "text": "..."}}`
because ThreadItemDetails is serde-flattened onto ThreadItem.
"""
item = obj.get("item")
if not isinstance(item, dict):
return None
if item.get("type") != "agent_message":
return None
text = item.get("text")
if isinstance(text, str):
return text
return None
132 changes: 120 additions & 12 deletions src/kai/review.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
PR review agent - one-shot subprocess (Claude or Goose) for automated code review.
PR review agent - one-shot subprocess (Claude, Codex, or Goose) for automated code review.

Provides functionality to:
1. Fetch PR diffs and metadata via the GitHub CLI
Expand All @@ -19,9 +19,11 @@
re-evaluate a prior finding.

The LLM subprocess runs in one-shot mode (non-interactive, no tools, no
streaming): `claude --print` for Claude, `goose run -i -` for Goose.
The prompt goes in via stdin to handle large diffs without hitting shell
argument length limits. Output is captured as plain text.
streaming): `claude --print` for Claude, `codex exec --json` for Codex,
`goose run -i -` for Goose. The prompt goes in via stdin to handle
large diffs without hitting shell argument length limits. Output is
captured as plain text (NDJSON for codex; the agent_message text is
extracted by kai.codex_exec.extract_codex_text).
"""

import asyncio
Expand All @@ -35,6 +37,7 @@

import aiohttp

from kai.codex_exec import extract_codex_text
from kai.config import ModelRole, get_model_for, resolve_claude_user
from kai.prompt_utils import make_boundary

Expand Down Expand Up @@ -673,31 +676,136 @@ async def run_review(
"""
Spawn a one-shot LLM subprocess to perform the review.

Dispatches to either Claude (`claude --print`) or Goose
(`goose run -i -`) based on agent_backend. Both read from stdin
and write plain text to stdout; the prompt and output handling
are identical regardless of backend.
Dispatches to Claude (`claude --print`), Codex (`codex exec
--json`), or Goose (`goose run -i -`) based on agent_backend.
All three read the prompt from stdin and return a single string
of review text; the prompt and output handling are identical
regardless of backend.

Claude path: supports sudo -u for OS-level isolation, process
group kills for cleanup, and per-run budget caps.

Codex path: sudo -H -u when claude_user is set (codex needs
HOME pointing at the user's home so it reads the right
~/.codex/auth.json); subprocess emits NDJSON which is collapsed
to the final agent_message text by extract_codex_text.

Goose path: no sudo (Goose has no user isolation), simple
proc.kill() for cleanup, --max-turns 1 as the safety limit.

Args:
prompt: The complete review prompt (from build_review_prompt).
claude_user: Optional OS user to run Claude as (via sudo -u).
Ignored when agent_backend is "goose".
agent_backend: Which LLM backend to use ("claude" or "goose").
claude_user: Optional OS user to run the subprocess as (via
sudo -u for claude, sudo -H -u for codex). Ignored when
agent_backend is "goose".
agent_backend: Which LLM backend to use ("claude", "codex",
or "goose").
provider: LLM provider name (e.g. "anthropic", "openai").
Only used when agent_backend is "goose".
Only used when agent_backend is "goose"; codex always
uses openai, claude always uses anthropic.

Returns:
The review text output from the LLM.

Raises:
RuntimeError: If the subprocess fails or times out.
"""
if agent_backend == "codex":
# Codex one-shot mode: --json emits NDJSON events on stdout
# (thread.started, turn.started, item.*, turn.completed,
# turn.failed, error). The final agent message text is
# recovered by walking the events and accumulating any text
# content; extract_codex_text in kai.codex_exec is the
# schema-defensive parser shared with triage.py.
# Per-user OAuth isolation: when claude_user is set (the
# webhook handler passes the same os_user value to both
# backends through this parameter), wrap the codex argv in
# `sudo -H -u <user>` so codex reads ~<user>/.codex/auth.json
# instead of the service user's home. The parameter name is
# claude-historical; rename is out of scope for this fix.
# No --max-budget-usd: codex on subscription auth has no
# per-call billing; runaway protection comes from
# timeout_s at the asyncio.wait_for below.
review_model = get_model_for(
ModelRole.PR_REVIEW,
agent_backend,
override=os.environ.get("PR_REVIEW_MODEL_CODEX", ""),
)
# Pin the absolute codex path when CODEX_BIN is set; same
# rationale as codex.py and triage.py - sudo cannot resolve
# bare `codex` when the binary lives in a per-os_user home
# that isn't on the service user's PATH. Falls back to bare
# "codex" for installs where codex is on PATH.
codex_bin = os.environ.get("CODEX_BIN") or "codex"
codex_cmd = [
codex_bin,
"exec",
"--json",
"--model",
review_model,
]

# Resolve self-sudo: skip the sudo wrap when claude_user
# matches the bot process user. Mirrors the triage codex
# branch's pattern.
effective_user = resolve_claude_user(claude_user)
if effective_user:
# -H sets HOME to <effective_user>'s pw entry so codex
# reads auth from the right home. --preserve-env passes
# KAI_WEBHOOK_SECRET through sudo's env_reset (the SETENV:
# sudoers rule allows this). Same shape claude uses.
cmd = [
"sudo",
"-H",
"-u",
effective_user,
"--preserve-env=KAI_WEBHOOK_SECRET",
"--",
] + codex_cmd
else:
cmd = codex_cmd

proc = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
# New session group in cross-user mode so killing sudo
# also kills the codex grandchild (mirrors claude branch
# and the triage codex branch).
start_new_session=bool(effective_user),
)

try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(input=prompt.encode()),
timeout=timeout_s,
)
except TimeoutError:
# Kill the subprocess tree if it exceeds the timeout. In
# cross-user mode (effective_user set) the process is in
# a new group (PGID == PID); kill the group so both sudo
# and the codex grandchild die, preventing orphans.
if effective_user:
try:
os.killpg(proc.pid, signal.SIGKILL)
except ProcessLookupError:
pass # Already dead
else:
proc.kill()
await proc.wait()
raise RuntimeError(f"Review subprocess timed out after {timeout_s}s") from None

if proc.returncode != 0:
error = stderr.decode().strip()
raise RuntimeError(f"Review subprocess failed (exit {proc.returncode}): {error}")

# Codex emits NDJSON; extract the final agent message text and
# return it (mirrors the contract the claude / goose branches
# already satisfy: a single string the caller hands back to
# the webhook handler for posting as a PR comment).
return extract_codex_text(stdout.decode())

if agent_backend == "goose":
if not provider:
raise ValueError(
Expand Down
Loading
Loading