From a66c99b10836a86df0e6dc688edeb651421870aa Mon Sep 17 00:00:00 2001 From: AliAlharmoodi Date: Sat, 14 Mar 2026 04:01:46 +0000 Subject: [PATCH 1/2] Add AgentWatch tool handling for Codex long-running jobs --- apps/server/src/agentWatch.test.ts | 44 +++ apps/server/src/agentWatch.ts | 308 ++++++++++++++++++ apps/server/src/codexAppServerManager.test.ts | 53 +++ apps/server/src/codexAppServerManager.ts | 52 +++ 4 files changed, 457 insertions(+) create mode 100644 apps/server/src/agentWatch.test.ts create mode 100644 apps/server/src/agentWatch.ts diff --git a/apps/server/src/agentWatch.test.ts b/apps/server/src/agentWatch.test.ts new file mode 100644 index 000000000..b30e59c19 --- /dev/null +++ b/apps/server/src/agentWatch.test.ts @@ -0,0 +1,44 @@ +import { describe, expect, it } from "vitest"; + +import { AgentWatch } from "./agentWatch"; + +describe("AgentWatch", () => { + it("starts a detached job and reports non-zero exits for inspection", async () => { + const watch = new AgentWatch(20); + + try { + const started = watch.start({ + command: "sleep 0.05; echo boom; exit 17", + staleAfterMs: 10_000, + }); + + await new Promise((resolve) => setTimeout(resolve, 220)); + + const status = watch.status(started.jobId); + expect(status.status).toBe("exited"); + expect(status.exitCode).toBe(17); + expect(status.shouldInspect).toBe(true); + expect(status.conditions.some((condition) => condition.code === "non_zero_exit")).toBe(true); + } finally { + watch.dispose(); + } + }); + + it("returns only flagged jobs by default when polling", async () => { + const watch = new AgentWatch(20); + + try { + watch.start({ + command: "sleep 0.2; echo done", + staleAfterMs: 10_000, + }); + + await new Promise((resolve) => setTimeout(resolve, 40)); + + const poll = watch.poll(); + expect(poll.jobs).toHaveLength(0); + } finally { + watch.dispose(); + } + }); +}); diff --git a/apps/server/src/agentWatch.ts b/apps/server/src/agentWatch.ts new file mode 100644 index 000000000..44c7f43cc --- /dev/null +++ b/apps/server/src/agentWatch.ts @@ -0,0 +1,308 @@ +import { closeSync, mkdtempSync, openSync, readFileSync, statSync } from "node:fs"; +import { tmpdir } from "node:os"; +import path from "node:path"; +import { spawn } from "node:child_process"; +import { randomUUID } from "node:crypto"; + +export type AgentWatchConditionCode = + | "stale_output" + | "non_zero_exit" + | "abnormal_exit" + | "missing_job"; + +export interface AgentWatchCondition { + code: AgentWatchConditionCode; + message: string; +} + +export interface AgentWatchJobSnapshot { + jobId: string; + label: string; + command: string; + cwd: string; + pid: number; + status: "running" | "exited"; + exitCode?: number; + startedAt: string; + finishedAt?: string; + lastOutputAt?: string; + outputFreshnessMs?: number; + shouldInspect: boolean; + conditions: AgentWatchCondition[]; +} + +interface AgentWatchJob { + jobId: string; + label: string; + command: string; + cwd: string; + pid: number; + logPath: string; + staleAfterMs: number; + startedAt: number; + finishedAt?: number; + exitCode?: number; + lastOutputAt?: number; +} + +const DEFAULT_STALE_AFTER_MS = 90_000; +const DEFAULT_WATCHDOG_INTERVAL_MS = 5_000; +const EXIT_MARKER_PREFIX = "__AGENTWATCH_EXIT_CODE:"; + +function nowIso(ms: number): string { + return new Date(ms).toISOString(); +} + +function isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +function parseExitCodeFromLog(logPath: string): number | undefined { + try { + const content = readFileSync(logPath, "utf8"); + const match = content.match(/__AGENTWATCH_EXIT_CODE:(\d+)/g); + if (!match || match.length === 0) { + return undefined; + } + const latest = match[match.length - 1]; + const parsed = Number.parseInt(latest.slice(EXIT_MARKER_PREFIX.length), 10); + return Number.isInteger(parsed) ? parsed : undefined; + } catch { + return undefined; + } +} + +function tailLog(logPath: string, lines: number): string { + try { + const content = readFileSync(logPath, "utf8"); + const parsedLines = content.split(/\r?\n/g); + return parsedLines.slice(-Math.max(1, lines)).join("\n").trim(); + } catch { + return ""; + } +} + +export interface AgentWatchStartInput { + command: string; + cwd?: string; + label?: string; + staleAfterMs?: number; +} + +export interface AgentWatchPollInput { + jobId?: string; + includeHealthy?: boolean; +} + +export class AgentWatch { + private readonly jobs = new Map(); + private readonly runtimeDir: string; + private readonly interval: ReturnType; + + constructor(watchdogIntervalMs = DEFAULT_WATCHDOG_INTERVAL_MS) { + this.runtimeDir = mkdtempSync(path.join(tmpdir(), "agentwatch-")); + this.interval = setInterval(() => { + this.tick(); + }, watchdogIntervalMs); + this.interval.unref(); + } + + dispose(): void { + clearInterval(this.interval); + } + + start(input: AgentWatchStartInput): AgentWatchJobSnapshot { + const command = input.command.trim(); + if (command.length === 0) { + throw new Error("AgentWatch start requires a non-empty command."); + } + + const jobId = randomUUID(); + const cwd = input.cwd ?? process.cwd(); + const logPath = path.join(this.runtimeDir, `${jobId}.log`); + const logFd = openSync(logPath, "a"); + + const wrappedCommand = `${command}; __agentwatch_exit=$?; echo "${EXIT_MARKER_PREFIX}$__agentwatch_exit"`; + + const child = spawn("bash", ["-lc", wrappedCommand], { + cwd, + detached: true, + stdio: ["ignore", logFd, logFd], + env: process.env, + }); + child.unref(); + closeSync(logFd); + + const job: AgentWatchJob = { + jobId, + label: input.label?.trim() || `job-${jobId.slice(0, 8)}`, + command, + cwd, + pid: child.pid ?? -1, + logPath, + staleAfterMs: input.staleAfterMs ?? DEFAULT_STALE_AFTER_MS, + startedAt: Date.now(), + }; + + this.jobs.set(jobId, job); + this.tick(); + return this.toSnapshot(job, Date.now()); + } + + status(jobId: string): AgentWatchJobSnapshot { + this.tick(); + const job = this.jobs.get(jobId); + if (!job) { + return { + jobId, + label: "unknown", + command: "", + cwd: "", + pid: -1, + status: "exited", + startedAt: nowIso(Date.now()), + shouldInspect: true, + conditions: [{ code: "missing_job", message: `No AgentWatch job found for id ${jobId}.` }], + }; + } + + return this.toSnapshot(job, Date.now()); + } + + poll(input: AgentWatchPollInput = {}): { jobs: AgentWatchJobSnapshot[] } { + this.tick(); + const now = Date.now(); + + if (input.jobId) { + const snapshot = this.status(input.jobId); + if (!input.includeHealthy && !snapshot.shouldInspect) { + return { jobs: [] }; + } + return { jobs: [snapshot] }; + } + + const snapshots = Array.from(this.jobs.values()).map((job) => this.toSnapshot(job, now)); + return { + jobs: input.includeHealthy ? snapshots : snapshots.filter((job) => job.shouldInspect), + }; + } + + tail(jobId: string, lines = 50): { jobId: string; output: string } { + const job = this.jobs.get(jobId); + if (!job) { + return { jobId, output: "" }; + } + + return { + jobId, + output: tailLog(job.logPath, lines), + }; + } + + private tick(): void { + const now = Date.now(); + for (const job of this.jobs.values()) { + try { + const stats = statSync(job.logPath); + job.lastOutputAt = stats.mtimeMs; + } catch { + // log may not exist yet + } + + if (job.finishedAt !== undefined) { + continue; + } + + if (!isProcessAlive(job.pid)) { + job.finishedAt = now; + job.exitCode = parseExitCodeFromLog(job.logPath); + } + } + } + + private toSnapshot(job: AgentWatchJob, now: number): AgentWatchJobSnapshot { + const status = job.finishedAt === undefined ? "running" : "exited"; + const outputFreshnessMs = + job.lastOutputAt !== undefined ? Math.max(0, Math.round(now - job.lastOutputAt)) : undefined; + + const conditions: AgentWatchCondition[] = []; + + if (status === "running" && outputFreshnessMs !== undefined && outputFreshnessMs > job.staleAfterMs) { + conditions.push({ + code: "stale_output", + message: `No terminal output for ${outputFreshnessMs}ms (threshold ${job.staleAfterMs}ms).`, + }); + } + + if (status === "exited") { + if (job.exitCode === undefined) { + conditions.push({ + code: "abnormal_exit", + message: "Process exited without a recorded exit code.", + }); + } else if (job.exitCode !== 0) { + conditions.push({ + code: "non_zero_exit", + message: `Process exited with code ${job.exitCode}.`, + }); + } + } + + return { + jobId: job.jobId, + label: job.label, + command: job.command, + cwd: job.cwd, + pid: job.pid, + status, + ...(job.exitCode !== undefined ? { exitCode: job.exitCode } : {}), + startedAt: nowIso(job.startedAt), + ...(job.finishedAt !== undefined ? { finishedAt: nowIso(job.finishedAt) } : {}), + ...(job.lastOutputAt !== undefined ? { lastOutputAt: nowIso(job.lastOutputAt) } : {}), + ...(outputFreshnessMs !== undefined ? { outputFreshnessMs } : {}), + shouldInspect: conditions.length > 0, + conditions, + }; + } + + handleToolCall(toolName: string, args: Record | undefined): unknown { + if (toolName === "agentwatch.start") { + return { + job: this.start({ + command: typeof args?.command === "string" ? args.command : "", + ...(typeof args?.cwd === "string" ? { cwd: args.cwd } : {}), + ...(typeof args?.label === "string" ? { label: args.label } : {}), + ...(typeof args?.staleAfterMs === "number" ? { staleAfterMs: args.staleAfterMs } : {}), + }), + }; + } + + if (toolName === "agentwatch.status") { + if (typeof args?.jobId !== "string") { + throw new Error("agentwatch.status requires jobId."); + } + return { job: this.status(args.jobId) }; + } + + if (toolName === "agentwatch.poll") { + return this.poll({ + ...(typeof args?.jobId === "string" ? { jobId: args.jobId } : {}), + ...(typeof args?.includeHealthy === "boolean" ? { includeHealthy: args.includeHealthy } : {}), + }); + } + + if (toolName === "agentwatch.tail") { + if (typeof args?.jobId !== "string") { + throw new Error("agentwatch.tail requires jobId."); + } + return this.tail(args.jobId, typeof args?.lines === "number" ? args.lines : 50); + } + + throw new Error(`Unsupported tool: ${toolName}`); + } +} diff --git a/apps/server/src/codexAppServerManager.test.ts b/apps/server/src/codexAppServerManager.test.ts index cea8df0a0..9bbe59a87 100644 --- a/apps/server/src/codexAppServerManager.test.ts +++ b/apps/server/src/codexAppServerManager.test.ts @@ -748,6 +748,59 @@ describe("respondToUserInput", () => { }); }); + + +describe("handleServerRequest tool calls", () => { + it("routes item/tool/call to AgentWatch and writes a JSON-RPC result", () => { + const manager = new CodexAppServerManager(); + const context = { + session: { + sessionId: "sess_1", + provider: "codex", + status: "ready", + threadId: asThreadId("thread_1"), + resumeCursor: { threadId: "thread_1" }, + runtimeMode: "full-access", + createdAt: "2026-02-10T00:00:00.000Z", + updatedAt: "2026-02-10T00:00:00.000Z", + }, + pendingApprovals: new Map(), + pendingUserInputs: new Map(), + }; + + const writeMessage = vi + .spyOn(manager as unknown as { writeMessage: (...args: unknown[]) => void }, "writeMessage") + .mockImplementation(() => {}); + + const handleToolCall = vi + .spyOn( + (manager as unknown as { agentWatch: { handleToolCall: (...args: unknown[]) => unknown } }) + .agentWatch, + "handleToolCall", + ) + .mockReturnValue({ jobs: [] }); + + ( + manager as unknown as { + handleServerRequest: (context: typeof context, request: Record) => void; + } + ).handleServerRequest(context, { + jsonrpc: "2.0", + id: 3, + method: "item/tool/call", + params: { + toolName: "agentwatch.poll", + arguments: { includeHealthy: false }, + }, + }); + + expect(handleToolCall).toHaveBeenCalledWith("agentwatch.poll", { includeHealthy: false }); + expect(writeMessage).toHaveBeenCalledWith(context, { + id: 3, + result: { jobs: [] }, + }); + }); +}); describe.skipIf(!process.env.CODEX_BINARY_PATH)("startSession live Codex resume", () => { it("keeps prior thread history when resuming with a changed runtime mode", async () => { const workspaceDir = mkdtempSync(path.join(os.tmpdir(), "codex-live-resume-")); diff --git a/apps/server/src/codexAppServerManager.ts b/apps/server/src/codexAppServerManager.ts index a8a8ce460..b2604343f 100644 --- a/apps/server/src/codexAppServerManager.ts +++ b/apps/server/src/codexAppServerManager.ts @@ -27,6 +27,7 @@ import { isCodexCliVersionSupported, parseCodexCliVersion, } from "./provider/codexCliVersion"; +import { AgentWatch } from "./agentWatch"; type PendingRequestKey = string; @@ -1015,6 +1016,7 @@ export class CodexAppServerManager extends EventEmitter | undefined { + if (!value) { + return undefined; + } + + try { + const parsed = JSON.parse(value); + return this.readObject(parsed); + } catch { + return undefined; + } + } + private readObject(value: unknown, key?: string): Record | undefined { const target = key === undefined From 664fb015c2ac7f52a12a248741a0bfb0bc5378b0 Mon Sep 17 00:00:00 2001 From: AliAlharmoodi Date: Sat, 14 Mar 2026 19:15:51 +0000 Subject: [PATCH 2/2] Fix AgentWatch core typecheck regressions --- apps/server/src/agentWatch.ts | 20 +++++++++++++++---- apps/server/src/codexAppServerManager.test.ts | 5 ++--- apps/server/src/codexAppServerManager.ts | 4 +++- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/apps/server/src/agentWatch.ts b/apps/server/src/agentWatch.ts index 44c7f43cc..2142f4ef6 100644 --- a/apps/server/src/agentWatch.ts +++ b/apps/server/src/agentWatch.ts @@ -69,7 +69,10 @@ function parseExitCodeFromLog(logPath: string): number | undefined { if (!match || match.length === 0) { return undefined; } - const latest = match[match.length - 1]; + const latest = match.at(-1); + if (!latest) { + return undefined; + } const parsed = Number.parseInt(latest.slice(EXIT_MARKER_PREFIX.length), 10); return Number.isInteger(parsed) ? parsed : undefined; } catch { @@ -220,7 +223,10 @@ export class AgentWatch { if (!isProcessAlive(job.pid)) { job.finishedAt = now; - job.exitCode = parseExitCodeFromLog(job.logPath); + const exitCode = parseExitCodeFromLog(job.logPath); + if (exitCode !== undefined) { + job.exitCode = exitCode; + } } } } @@ -232,7 +238,11 @@ export class AgentWatch { const conditions: AgentWatchCondition[] = []; - if (status === "running" && outputFreshnessMs !== undefined && outputFreshnessMs > job.staleAfterMs) { + if ( + status === "running" && + outputFreshnessMs !== undefined && + outputFreshnessMs > job.staleAfterMs + ) { conditions.push({ code: "stale_output", message: `No terminal output for ${outputFreshnessMs}ms (threshold ${job.staleAfterMs}ms).`, @@ -292,7 +302,9 @@ export class AgentWatch { if (toolName === "agentwatch.poll") { return this.poll({ ...(typeof args?.jobId === "string" ? { jobId: args.jobId } : {}), - ...(typeof args?.includeHealthy === "boolean" ? { includeHealthy: args.includeHealthy } : {}), + ...(typeof args?.includeHealthy === "boolean" + ? { includeHealthy: args.includeHealthy } + : {}), }); } diff --git a/apps/server/src/codexAppServerManager.test.ts b/apps/server/src/codexAppServerManager.test.ts index 9bbe59a87..758b2161e 100644 --- a/apps/server/src/codexAppServerManager.test.ts +++ b/apps/server/src/codexAppServerManager.test.ts @@ -748,8 +748,6 @@ describe("respondToUserInput", () => { }); }); - - describe("handleServerRequest tool calls", () => { it("routes item/tool/call to AgentWatch and writes a JSON-RPC result", () => { const manager = new CodexAppServerManager(); @@ -767,6 +765,7 @@ describe("handleServerRequest tool calls", () => { pendingApprovals: new Map(), pendingUserInputs: new Map(), }; + type TestContext = typeof context; const writeMessage = vi .spyOn(manager as unknown as { writeMessage: (...args: unknown[]) => void }, "writeMessage") @@ -782,7 +781,7 @@ describe("handleServerRequest tool calls", () => { ( manager as unknown as { - handleServerRequest: (context: typeof context, request: Record) => void; + handleServerRequest: (context: TestContext, request: Record) => void; } ).handleServerRequest(context, { jsonrpc: "2.0", diff --git a/apps/server/src/codexAppServerManager.ts b/apps/server/src/codexAppServerManager.ts index b2604343f..10b0fe66e 100644 --- a/apps/server/src/codexAppServerManager.ts +++ b/apps/server/src/codexAppServerManager.ts @@ -515,6 +515,7 @@ export interface CodexAppServerManagerEvents { export class CodexAppServerManager extends EventEmitter { private readonly sessions = new Map(); + private readonly agentWatch = new AgentWatch(); private runPromise: (effect: Effect.Effect) => Promise; constructor(services?: ServiceMap.ServiceMap) { @@ -1219,7 +1220,8 @@ export class CodexAppServerManager extends EventEmitter