Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions src/supervisor/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,109 @@ describe("SupervisorRuntime thread input", () => {
);
});

it("flushes buffered runtime events before emitting a structured turn-end idle", () => {
const emitted: Array<Record<string, unknown>> = [];
const runtime = makeRuntime((event) => {
emitted.push(event as Record<string, unknown>);
});

(
runtime as unknown as {
spawnThread: (input: Record<string, unknown>) => { status: string };
}
).spawnThread({
threadId: "thread-gui-flush",
agentKind: "codex",
adapter: {
kind: "codex",
label: "Codex",
capabilities: {
models: [{ id: "gpt-5.4", label: "5.4" }],
efforts: ["low"],
modelEfforts: {},
modes: ["agent"],
approvalPolicies: [{ id: "on-request", label: "On Request" }],
sandboxModes: [{ id: "read-only", label: "Read Only" }],
supportsResume: true,
supportsDirectInput: true,
liveInputMode: "server",
presentationMode: "gui",
},
},
projectLocation: {
kind: "windows",
path: "C:\\repo",
},
config: {
model: "gpt-5.4",
},
initialSize: {
cols: 120,
rows: 30,
},
launchPrompt: "",
structuredSession: {
launchOptions: {},
setListener: vi.fn<(listener: unknown) => void>(),
dispose: vi.fn<() => Promise<void>>().mockResolvedValue(undefined),
startTurn: vi.fn<() => Promise<void>>().mockResolvedValue(undefined),
},
presentationMode: "gui",
});

const session = (
runtime as unknown as {
sessions: Map<
string,
{
status: string;
attention: string;
structuredSession: { setListener: ReturnType<typeof vi.fn> };
}
>;
}
).sessions.get("thread-gui-flush")!;
session.status = "working";
session.attention = "working";

const listener = (session.structuredSession.setListener as ReturnType<typeof vi.fn>).mock
.calls[0]?.[0] as {
onUpdate: (update: { status: string; attention: string }) => void;
onRuntimeEvent: (event: RuntimeEvent) => void;
};

// Isolate the turn-end sequence from any spawn-time emits.
emitted.length = 0;

// The turn's final assistant delta is appended to the 16ms runtime-event
// batch buffer (timer not yet fired — no emit yet).
listener.onRuntimeEvent({
type: "content.delta",
threadId: "thread-gui-flush",
itemId: "assistant-1",
stream: "assistant_text",
delta: "done",
});

// Turn completes: the structured session reports idle. The status
// `thread-state` is emitted immediately, so without flushing first it would
// overtake the still-buffered runtime event on the wire and let the renderer
// re-open the GUI turn to "working".
listener.onUpdate({ status: "idle", attention: "none" });

const runtimeIdx = emitted.findIndex(
(e) =>
e.type === "thread-runtime-event" ||
e.type === "thread-runtime-events" ||
e.type === "thread-runtime-events-multi",
);
const idleIdx = emitted.findIndex((e) => e.type === "thread-state" && e.status === "idle");

expect(runtimeIdx).toBeGreaterThanOrEqual(0);
expect(idleIdx).toBeGreaterThanOrEqual(0);
expect(runtimeIdx).toBeLessThan(idleIdx);
});

it("drains a pending steer when the working turn fails with error status", async () => {
const runtime = makeRuntime(() => undefined);
const startTurn = vi.fn<() => Promise<void>>().mockResolvedValue(undefined);
Expand Down
11 changes: 11 additions & 0 deletions src/supervisor/runtime/threadSessionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1703,6 +1703,17 @@ export class ThreadSessionManager {
session.suppressInitialStructuredIdle = undefined;
}

// Wire-ordering: `thread-state` (status) events are emitted to the
// renderer immediately, but this session's runtime events are batched
// (RuntimeEventBuffer, ~16ms). Without flushing here, a turn-end `idle`
// can overtake the turn's final runtime events on the IPC wire; those
// trailing events then land after `idle` in the renderer and re-open the
// GUI turn to "working" via reopenGuiTurnForLiveRuntimeActivity, leaving a
// stale "working" until the next snapshot reconcile (on thread switch).
// Flushing first guarantees the renderer applies the final events before
// the status change, mirroring its own flushPendingRuntimeEventsSync.
this.flushRuntimeEvents();

this.outputPipeline.updateState(
session,
update.status,
Expand Down