Skip to content

Commit 3af3e64

Browse files
committed
fix(cloud-agent): serialize concurrent codex prompts to fix usage accounting
1 parent 4fee8b5 commit 3af3e64

2 files changed

Lines changed: 104 additions & 0 deletions

File tree

packages/agent/src/adapters/codex/codex-agent.test.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,92 @@ describe("CodexAcpAgent", () => {
209209
});
210210
});
211211

212+
it("serializes concurrent prompts so usage accumulators are not wiped mid-turn", async () => {
213+
const { agent } = createAgent();
214+
mockCodexConnection.newSession.mockResolvedValue({
215+
sessionId: "session-1",
216+
modes: { currentModeId: "auto", availableModes: [] },
217+
configOptions: [],
218+
} satisfies Partial<NewSessionResponse>);
219+
await agent.newSession({
220+
cwd: process.cwd(),
221+
_meta: { taskRunId: "run-1" },
222+
} as never);
223+
224+
const callOrder: string[] = [];
225+
let releaseA: () => void;
226+
const aStarted = new Promise<void>((resolve) => {
227+
releaseA = resolve;
228+
});
229+
let allowAResolve: () => void;
230+
const aHold = new Promise<void>((resolve) => {
231+
allowAResolve = resolve;
232+
});
233+
234+
mockCodexConnection.prompt.mockImplementationOnce(async () => {
235+
callOrder.push("A:start");
236+
releaseA();
237+
await aHold;
238+
callOrder.push("A:end");
239+
return { stopReason: "end_turn" };
240+
});
241+
mockCodexConnection.prompt.mockImplementationOnce(async () => {
242+
callOrder.push("B:start");
243+
return { stopReason: "end_turn" };
244+
});
245+
246+
const promptA = agent.prompt({
247+
sessionId: "session-1",
248+
prompt: [{ type: "text", text: "A" }],
249+
} as never);
250+
251+
await aStarted;
252+
253+
const promptB = agent.prompt({
254+
sessionId: "session-1",
255+
prompt: [{ type: "text", text: "B" }],
256+
} as never);
257+
258+
// B must not have started while A is still in-flight.
259+
expect(callOrder).toEqual(["A:start"]);
260+
261+
allowAResolve!();
262+
await Promise.all([promptA, promptB]);
263+
264+
expect(callOrder).toEqual(["A:start", "A:end", "B:start"]);
265+
});
266+
267+
it("does not let a failing prompt block subsequent prompts", async () => {
268+
const { agent } = createAgent();
269+
mockCodexConnection.newSession.mockResolvedValue({
270+
sessionId: "session-1",
271+
modes: { currentModeId: "auto", availableModes: [] },
272+
configOptions: [],
273+
} satisfies Partial<NewSessionResponse>);
274+
await agent.newSession({
275+
cwd: process.cwd(),
276+
} as never);
277+
278+
mockCodexConnection.prompt.mockRejectedValueOnce(new Error("boom"));
279+
mockCodexConnection.prompt.mockResolvedValueOnce({
280+
stopReason: "end_turn",
281+
});
282+
283+
await expect(
284+
agent.prompt({
285+
sessionId: "session-1",
286+
prompt: [{ type: "text", text: "A" }],
287+
} as never),
288+
).rejects.toThrow("boom");
289+
290+
await expect(
291+
agent.prompt({
292+
sessionId: "session-1",
293+
prompt: [{ type: "text", text: "B" }],
294+
} as never),
295+
).resolves.toEqual({ stopReason: "end_turn" });
296+
});
297+
212298
it("broadcasts user prompt as user_message_chunk before delegating to codex-acp", async () => {
213299
const { agent, client } = createAgent();
214300
// Seed an active session so prompt() has the state it expects.

packages/agent/src/adapters/codex/codex-agent.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,17 @@ export class CodexAcpAgent extends BaseAcpAgent {
145145
private codexProcess: CodexProcess;
146146
private codexConnection: ClientSideConnection;
147147
private sessionState: CodexSessionState;
148+
/**
149+
* FIFO serializer for prompt() calls. codex-acp and codex-rs themselves
150+
* serialize submissions at the conversation level, but our adapter
151+
* accumulates per-turn usage into sessionState.accumulatedUsage via the
152+
* codex-client sessionUpdate handler. If two prompts ran concurrently on
153+
* the JS side, the second's resetUsage() would wipe out the first's
154+
* in-flight counters and both TURN_COMPLETE notifications would report
155+
* garbled totals. Serializing on the JS side keeps the accumulator
156+
* single-owner.
157+
*/
158+
private promptMutex: Promise<unknown> = Promise.resolve();
148159

149160
constructor(client: AgentSideConnection, options: CodexAcpAgentOptions) {
150161
super(client);
@@ -397,6 +408,13 @@ export class CodexAcpAgent extends BaseAcpAgent {
397408
}
398409

399410
async prompt(params: PromptRequest): Promise<PromptResponse> {
411+
const previous = this.promptMutex;
412+
const next = previous.catch(() => {}).then(() => this.runPrompt(params));
413+
this.promptMutex = next;
414+
return next;
415+
}
416+
417+
private async runPrompt(params: PromptRequest): Promise<PromptResponse> {
400418
this.session.cancelled = false;
401419
this.session.interruptReason = undefined;
402420
resetUsage(this.sessionState);

0 commit comments

Comments
 (0)