Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions packages/agent/src/adapters/codex/codex-agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,92 @@ describe("CodexAcpAgent", () => {
});
});

it("serializes concurrent prompts so usage accumulators are not wiped mid-turn", async () => {
const { agent } = createAgent();
mockCodexConnection.newSession.mockResolvedValue({
sessionId: "session-1",
modes: { currentModeId: "auto", availableModes: [] },
configOptions: [],
} satisfies Partial<NewSessionResponse>);
await agent.newSession({
cwd: process.cwd(),
_meta: { taskRunId: "run-1" },
} as never);

const callOrder: string[] = [];
let releaseA: () => void;
const aStarted = new Promise<void>((resolve) => {
releaseA = resolve;
});
let allowAResolve: () => void;
const aHold = new Promise<void>((resolve) => {
allowAResolve = resolve;
});

mockCodexConnection.prompt.mockImplementationOnce(async () => {
callOrder.push("A:start");
releaseA();
await aHold;
callOrder.push("A:end");
return { stopReason: "end_turn" };
});
mockCodexConnection.prompt.mockImplementationOnce(async () => {
callOrder.push("B:start");
return { stopReason: "end_turn" };
});

const promptA = agent.prompt({
sessionId: "session-1",
prompt: [{ type: "text", text: "A" }],
} as never);

await aStarted;

const promptB = agent.prompt({
sessionId: "session-1",
prompt: [{ type: "text", text: "B" }],
} as never);

// B must not have started while A is still in-flight.
expect(callOrder).toEqual(["A:start"]);

allowAResolve!();
await Promise.all([promptA, promptB]);

expect(callOrder).toEqual(["A:start", "A:end", "B:start"]);
});

it("does not let a failing prompt block subsequent prompts", async () => {
const { agent } = createAgent();
mockCodexConnection.newSession.mockResolvedValue({
sessionId: "session-1",
modes: { currentModeId: "auto", availableModes: [] },
configOptions: [],
} satisfies Partial<NewSessionResponse>);
await agent.newSession({
cwd: process.cwd(),
} as never);

mockCodexConnection.prompt.mockRejectedValueOnce(new Error("boom"));
mockCodexConnection.prompt.mockResolvedValueOnce({
stopReason: "end_turn",
});

await expect(
agent.prompt({
sessionId: "session-1",
prompt: [{ type: "text", text: "A" }],
} as never),
).rejects.toThrow("boom");

await expect(
agent.prompt({
sessionId: "session-1",
prompt: [{ type: "text", text: "B" }],
} as never),
).resolves.toEqual({ stopReason: "end_turn" });
});

it("broadcasts user prompt as user_message_chunk before delegating to codex-acp", async () => {
const { agent, client } = createAgent();
// Seed an active session so prompt() has the state it expects.
Expand Down
18 changes: 18 additions & 0 deletions packages/agent/src/adapters/codex/codex-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,17 @@ export class CodexAcpAgent extends BaseAcpAgent {
private codexProcess: CodexProcess;
private codexConnection: ClientSideConnection;
private sessionState: CodexSessionState;
/**
* FIFO serializer for prompt() calls. codex-acp and codex-rs themselves
* serialize submissions at the conversation level, but our adapter
* accumulates per-turn usage into sessionState.accumulatedUsage via the
* codex-client sessionUpdate handler. If two prompts ran concurrently on
* the JS side, the second's resetUsage() would wipe out the first's
* in-flight counters and both TURN_COMPLETE notifications would report
* garbled totals. Serializing on the JS side keeps the accumulator
* single-owner.
*/
private promptMutex: Promise<unknown> = Promise.resolve();

constructor(client: AgentSideConnection, options: CodexAcpAgentOptions) {
super(client);
Expand Down Expand Up @@ -397,6 +408,13 @@ export class CodexAcpAgent extends BaseAcpAgent {
}

async prompt(params: PromptRequest): Promise<PromptResponse> {
const previous = this.promptMutex;
const next = previous.catch(() => {}).then(() => this.runPrompt(params));
this.promptMutex = next;
return next;
}

private async runPrompt(params: PromptRequest): Promise<PromptResponse> {
this.session.cancelled = false;
this.session.interruptReason = undefined;
resetUsage(this.sessionState);
Expand Down
Loading