From 1ba067c76d613977b1b351b4630b6559c5d3cca4 Mon Sep 17 00:00:00 2001
From: Alessandro Pogliaghi
Date: Fri, 17 Apr 2026 15:27:55 +0100
Subject: [PATCH] fix(cloud-agent): serialize concurrent codex prompts to fix usage accounting

---
Review notes (free-form text in this section is not part of the commit):

* prompt() now chains every call onto promptMutex, so runPrompt() bodies
  execute one at a time, in call order. The predecessor's rejection is
  swallowed only for chaining purposes (previous.catch(() => {})); each
  caller still receives the outcome of its own runPrompt() through `next`.
* Two tests are added: the first pins the order in which the mocked
  codexConnection.prompt is entered ("A:start", "A:end", "B:start"), the
  second checks that a rejected prompt does not prevent the following
  prompt from resolving.
* NOTE(review): the intermediate expect(callOrder).toEqual(["A:start"])
  runs with no await after agent.prompt(B) is called, and prompt() only
  schedules runPrompt() through .then(), so "B:start" cannot have been
  recorded yet at that point. The ordering is effectively pinned by the
  final expectation; consider yielding to the event loop before the
  intermediate check.
* NOTE(review): several type-argument lists look lost in transit
  (`satisfies Partial)`, `new Promise((resolve)`, `promptMutex: Promise`,
  `Promise {`). Confirm against the original commit before applying.
* NOTE(review): line breaks and indentation were reconstructed from a
  whitespace-collapsed copy, assuming 2-space indentation - TODO confirm
  the context lines against the target files, or apply with whitespace
  tolerance.

 .../src/adapters/codex/codex-agent.test.ts    | 86 +++++++++++++++++++
 .../agent/src/adapters/codex/codex-agent.ts   | 18 ++++
 2 files changed, 104 insertions(+)

diff --git a/packages/agent/src/adapters/codex/codex-agent.test.ts b/packages/agent/src/adapters/codex/codex-agent.test.ts
index 9f4bf5c2d..d915addd4 100644
--- a/packages/agent/src/adapters/codex/codex-agent.test.ts
+++ b/packages/agent/src/adapters/codex/codex-agent.test.ts
@@ -209,6 +209,92 @@ describe("CodexAcpAgent", () => {
     });
   });
 
+  it("serializes concurrent prompts so usage accumulators are not wiped mid-turn", async () => {
+    const { agent } = createAgent();
+    mockCodexConnection.newSession.mockResolvedValue({
+      sessionId: "session-1",
+      modes: { currentModeId: "auto", availableModes: [] },
+      configOptions: [],
+    } satisfies Partial);
+    await agent.newSession({
+      cwd: process.cwd(),
+      _meta: { taskRunId: "run-1" },
+    } as never);
+
+    const callOrder: string[] = [];
+    let releaseA: () => void; // resolves aStarted; called by mock A once it runs
+    const aStarted = new Promise((resolve) => {
+      releaseA = resolve;
+    });
+    let allowAResolve: () => void; // resolves aHold, letting mock A finish
+    const aHold = new Promise((resolve) => {
+      allowAResolve = resolve;
+    });
+
+    mockCodexConnection.prompt.mockImplementationOnce(async () => {
+      callOrder.push("A:start");
+      releaseA();
+      await aHold;
+      callOrder.push("A:end");
+      return { stopReason: "end_turn" };
+    });
+    mockCodexConnection.prompt.mockImplementationOnce(async () => {
+      callOrder.push("B:start");
+      return { stopReason: "end_turn" };
+    });
+
+    const promptA = agent.prompt({
+      sessionId: "session-1",
+      prompt: [{ type: "text", text: "A" }],
+    } as never);
+
+    await aStarted; // mock A has recorded "A:start" and is parked on aHold
+
+    const promptB = agent.prompt({
+      sessionId: "session-1",
+      prompt: [{ type: "text", text: "B" }],
+    } as never);
+
+    // B must not have started while A is still in-flight.
+    expect(callOrder).toEqual(["A:start"]);
+
+    allowAResolve!(); // assigned synchronously by the Promise executor above
+    await Promise.all([promptA, promptB]);
+
+    expect(callOrder).toEqual(["A:start", "A:end", "B:start"]);
+  });
+
+  it("does not let a failing prompt block subsequent prompts", async () => {
+    const { agent } = createAgent();
+    mockCodexConnection.newSession.mockResolvedValue({
+      sessionId: "session-1",
+      modes: { currentModeId: "auto", availableModes: [] },
+      configOptions: [],
+    } satisfies Partial);
+    await agent.newSession({
+      cwd: process.cwd(),
+    } as never);
+
+    mockCodexConnection.prompt.mockRejectedValueOnce(new Error("boom"));
+    mockCodexConnection.prompt.mockResolvedValueOnce({
+      stopReason: "end_turn",
+    });
+
+    await expect(
+      agent.prompt({
+        sessionId: "session-1",
+        prompt: [{ type: "text", text: "A" }],
+      } as never),
+    ).rejects.toThrow("boom");
+
+    await expect(
+      agent.prompt({
+        sessionId: "session-1",
+        prompt: [{ type: "text", text: "B" }],
+      } as never),
+    ).resolves.toEqual({ stopReason: "end_turn" });
+  });
+
   it("broadcasts user prompt as user_message_chunk before delegating to codex-acp", async () => {
     const { agent, client } = createAgent();
     // Seed an active session so prompt() has the state it expects.
diff --git a/packages/agent/src/adapters/codex/codex-agent.ts b/packages/agent/src/adapters/codex/codex-agent.ts
index c975fa7af..0dcbf0f78 100644
--- a/packages/agent/src/adapters/codex/codex-agent.ts
+++ b/packages/agent/src/adapters/codex/codex-agent.ts
@@ -145,6 +145,17 @@ export class CodexAcpAgent extends BaseAcpAgent {
   private codexProcess: CodexProcess;
   private codexConnection: ClientSideConnection;
   private sessionState: CodexSessionState;
+  /**
+   * FIFO serializer for prompt() calls. codex-acp and codex-rs themselves
+   * serialize submissions at the conversation level, but our adapter
+   * accumulates per-turn usage into sessionState.accumulatedUsage via the
+   * codex-client sessionUpdate handler. If two prompts ran concurrently on
+   * the JS side, the second's resetUsage() would wipe out the first's
+   * in-flight counters and both TURN_COMPLETE notifications would report
+   * garbled totals. Serializing on the JS side keeps the accumulator
+   * single-owner.
+   */
+  private promptMutex: Promise = Promise.resolve();
 
   constructor(client: AgentSideConnection, options: CodexAcpAgentOptions) {
     super(client);
@@ -397,6 +408,13 @@ export class CodexAcpAgent extends BaseAcpAgent {
   }
 
   async prompt(params: PromptRequest): Promise {
+    const previous = this.promptMutex; // current tail of the queue
+    const next = previous.catch(() => {}).then(() => this.runPrompt(params));
+    this.promptMutex = next; // the next prompt() call queues behind this one
+    return next; // the caller still sees this prompt's own rejection
+  }
+
+  private async runPrompt(params: PromptRequest): Promise {
     this.session.cancelled = false;
     this.session.interruptReason = undefined;
     resetUsage(this.sessionState);