From d4003128c609a31312791bd9234e895393f64768 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 29 Mar 2026 22:37:05 +0000 Subject: [PATCH 1/5] Fix stalled execution completion handling Co-authored-by: Aiden Bai --- apps/cli/src/utils/run-test.ts | 6 +- apps/cli/tests/ci-reporter.test.ts | 14 ++ packages/shared/src/models.ts | 135 +++++++++++++++----- packages/shared/tests/dynamic-steps.test.ts | 65 ++++++++++ packages/supervisor/src/executor.ts | 127 ++++++++++++++---- 5 files changed, 293 insertions(+), 54 deletions(-) diff --git a/apps/cli/src/utils/run-test.ts b/apps/cli/src/utils/run-test.ts index 58c33c104..b785d8db3 100644 --- a/apps/cli/src/utils/run-test.ts +++ b/apps/cli/src/utils/run-test.ts @@ -94,21 +94,24 @@ export const runHeadless = (options: HeadlessRunOptions) => for (const event of executed.events) { if (seenEvents.has(event.id)) continue; seenEvents.add(event.id); - lastOutputAt = Date.now(); switch (event._tag) { case "RunStarted": + lastOutputAt = Date.now(); ciReporter.planTitle(event.plan.title, Option.getOrUndefined(event.plan.baseUrl)); break; case "StepStarted": + lastOutputAt = Date.now(); ciReporter.stepStarted(event.title); break; case "StepCompleted": { + lastOutputAt = Date.now(); const step = executed.steps.find((step) => step.id === event.stepId); const elapsed = step ? getStepElapsedMs(step) : undefined; ciReporter.stepCompleted(event.summary, elapsed); break; } case "StepFailed": { + lastOutputAt = Date.now(); const failedStep = executed.steps.find((step) => step.id === event.stepId); const failedTitle = failedStep?.title ?? event.stepId; const failedElapsed = failedStep ? getStepElapsedMs(failedStep) : undefined; @@ -116,6 +119,7 @@ export const runHeadless = (options: HeadlessRunOptions) => break; } case "StepSkipped": { + lastOutputAt = Date.now(); const skippedStep = executed.steps.find((step) => step.id === event.stepId); const skippedTitle = skippedStep?.title ?? event.stepId; ciReporter.stepSkipped(skippedTitle, event.reason); diff --git a/apps/cli/tests/ci-reporter.test.ts b/apps/cli/tests/ci-reporter.test.ts index eaa6e6b08..00505984d 100644 --- a/apps/cli/tests/ci-reporter.test.ts +++ b/apps/cli/tests/ci-reporter.test.ts @@ -313,5 +313,19 @@ describe("createCiReporter", () => { expect(output).toContain("Still running"); expect(output).toContain("2m"); }); + + it("remains the only visible progress signal when no step events are printed", () => { + const reporter = createCiReporter({ + version: "1.0.0", + agent: "claude", + timeoutMs: undefined, + isGitHubActions: false, + }); + reporter.heartbeat(120_000); + const output = stderrText(); + expect(output).toContain("Still running"); + expect(output).not.toContain("STEP_START|"); + expect(output).not.toContain("RUN_COMPLETED|"); + }); }); }); diff --git a/packages/shared/src/models.ts b/packages/shared/src/models.ts index 71a0e364d..2e8e07596 100644 --- a/packages/shared/src/models.ts +++ b/packages/shared/src/models.ts @@ -594,14 +594,16 @@ const serializeToolResult = (value: unknown): string => { }; const parseMarker = (line: string): ExecutionEvent | undefined => { - const pipeIndex = line.indexOf("|"); + const normalizedLine = line.endsWith("\r") ? line.slice(0, -1) : line; + const pipeIndex = normalizedLine.indexOf("|"); if (pipeIndex === -1) return undefined; - const marker = line.slice(0, pipeIndex); - const rest = line.slice(pipeIndex + 1); + const marker = normalizedLine.slice(0, pipeIndex); + const rest = normalizedLine.slice(pipeIndex + 1); const secondPipeIndex = rest.indexOf("|"); - const first = secondPipeIndex === -1 ? rest : rest.slice(0, secondPipeIndex); - const second = secondPipeIndex === -1 ? "" : rest.slice(secondPipeIndex + 1); + if (secondPipeIndex === -1) return undefined; + const first = rest.slice(0, secondPipeIndex); + const second = rest.slice(secondPipeIndex + 1); if (marker === "STEP_START") { return new StepStarted({ stepId: StepId.makeUnsafe(first), title: second }); @@ -669,6 +671,83 @@ export class Update extends Schema.Class("@supervisor/Update")({ receivedAt: Schema.DateTimeUtc, }) {} +interface ProcessedTextBlock { + readonly events: readonly ExecutionEvent[]; + readonly markers: readonly ExecutionEvent[]; +} + +const buildTextEvent = ( + textEventTag: "AgentText" | "AgentThinking", + text: string, +): AgentText | AgentThinking => + textEventTag === "AgentText" ? new AgentText({ text }) : new AgentThinking({ text }); + +const processTextBlock = ( + textEventTag: "AgentText" | "AgentThinking", + text: string, + includeTrailingPartialLine: boolean, +): ProcessedTextBlock => { + const events: ExecutionEvent[] = []; + const markers: ExecutionEvent[] = []; + let bufferedText = ""; + + const flushBufferedText = () => { + if (bufferedText.length === 0) return; + events.push(buildTextEvent(textEventTag, bufferedText)); + bufferedText = ""; + }; + + const processLine = (line: string, hasTrailingNewline: boolean) => { + const marker = parseMarker(line); + if (marker !== undefined) { + flushBufferedText(); + events.push(marker); + markers.push(marker); + return; + } + bufferedText += line; + if (hasTrailingNewline) { + bufferedText += "\n"; + } + }; + + let completeLines: readonly string[] = []; + let trailingLine = ""; + + if (text.endsWith("\n")) { + completeLines = text.slice(0, -1).split("\n"); + } else { + const parts = text.split("\n"); + trailingLine = parts.pop() ?? ""; + completeLines = parts; + } + + for (const completeLine of completeLines) { + processLine(completeLine, true); + } + + if (includeTrailingPartialLine) { + processLine(trailingLine, false); + } else { + bufferedText += trailingLine; + } + + flushBufferedText(); + + return { events, markers }; +}; + +const applyMarkersToPlan = ( + executed: ExecutedTestPlan, + markers: readonly ExecutionEvent[], +): ExecutedTestPlan => { + let result = executed; + for (const marker of markers) { + result = result.applyMarker(marker); + } + return result; +}; + export class PullRequest extends Schema.Class("@supervisor/PullRequest")({ number: Schema.Number, url: Schema.String, @@ -753,38 +832,40 @@ export class ExecutedTestPlan extends TestPlan.extend( if (update.content.type !== "text" || update.content.text === undefined) return this; const lastEvent = this.events.at(-1); if (lastEvent?._tag === "AgentThinking") { - return new ExecutedTestPlan({ + const processed = processTextBlock("AgentThinking", lastEvent.text + update.content.text, false); + const withEvents = new ExecutedTestPlan({ ...this, - events: [ - ...this.events.slice(0, -1), - new AgentThinking({ text: lastEvent.text + update.content.text }), - ], + events: [...this.events.slice(0, -1), ...processed.events], }); + return applyMarkersToPlan(withEvents, processed.markers); } const base = this.finalizeTextBlock(); - return new ExecutedTestPlan({ + const processed = processTextBlock("AgentThinking", update.content.text, false); + const withEvents = new ExecutedTestPlan({ ...base, - events: [...base.events, new AgentThinking({ text: update.content.text })], + events: [...base.events, ...processed.events], }); + return applyMarkersToPlan(withEvents, processed.markers); } if (update.sessionUpdate === "agent_message_chunk") { if (update.content.type !== "text" || update.content.text === undefined) return this; const lastEvent = this.events.at(-1); if (lastEvent?._tag === "AgentText") { - return new ExecutedTestPlan({ + const processed = processTextBlock("AgentText", lastEvent.text + update.content.text, false); + const withEvents = new ExecutedTestPlan({ ...this, - events: [ - ...this.events.slice(0, -1), - new AgentText({ text: lastEvent.text + update.content.text }), - ], + events: [...this.events.slice(0, -1), ...processed.events], }); + return applyMarkersToPlan(withEvents, processed.markers); } const base = this.finalizeTextBlock(); - return new ExecutedTestPlan({ + const processed = processTextBlock("AgentText", update.content.text, false); + const withEvents = new ExecutedTestPlan({ ...base, - events: [...base.events, new AgentText({ text: update.content.text })], + events: [...base.events, ...processed.events], }); + return applyMarkersToPlan(withEvents, processed.markers); } if (update.sessionUpdate === "tool_call") { @@ -859,19 +940,13 @@ export class ExecutedTestPlan extends TestPlan.extend( finalizeTextBlock(): ExecutedTestPlan { const lastEvent = this.events.at(-1); if (lastEvent?._tag !== "AgentText" && lastEvent?._tag !== "AgentThinking") return this; - const foundMarkers = lastEvent.text - .split("\n") - .map(parseMarker) - .filter(Predicate.isNotUndefined); - if (foundMarkers.length === 0) return this; - let result: ExecutedTestPlan = new ExecutedTestPlan({ + const processed = processTextBlock(lastEvent._tag, lastEvent.text, true); + if (processed.markers.length === 0) return this; + const withEvents = new ExecutedTestPlan({ ...this, - events: [...this.events, ...foundMarkers], + events: [...this.events.slice(0, -1), ...processed.events], }); - for (const marker of foundMarkers) { - result = result.applyMarker(marker); - } - return result; + return applyMarkersToPlan(withEvents, processed.markers); } applyMarker(marker: ExecutionEvent): ExecutedTestPlan { diff --git a/packages/shared/tests/dynamic-steps.test.ts b/packages/shared/tests/dynamic-steps.test.ts index 516a80ea5..93242eaf9 100644 --- a/packages/shared/tests/dynamic-steps.test.ts +++ b/packages/shared/tests/dynamic-steps.test.ts @@ -256,6 +256,71 @@ describe("dynamic step discovery", () => { expect(finalized.steps[0].status).toBe("failed"); expect(finalized.hasRunFinished).toBe(true); }); + + it("addEvent parses complete marker lines immediately and strips them from AgentText", () => { + let executed = makeEmptyExecuted(); + + const chunk = (text: string) => + new AcpAgentMessageChunk({ + sessionUpdate: "agent_message_chunk", + content: { type: "text" as const, text }, + }); + + executed = executed.addEvent(chunk("Planning next action...\nSTEP_START|step-01|Open login page\n")); + + expect(executed.steps.length).toBe(1); + expect(executed.steps[0].title).toBe("Open login page"); + expect(executed.steps[0].status).toBe("active"); + expect(executed.events.some((event) => event._tag === "StepStarted")).toBe(true); + + const agentTextEvents = executed.events.filter( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(agentTextEvents).toHaveLength(1); + expect(agentTextEvents[0].text).toBe("Planning next action...\n"); + expect(agentTextEvents[0].text.includes("STEP_START|step-01|Open login page")).toBe(false); + + executed = executed.addEvent(chunk("STEP_DONE|step-01|Login page loaded\nRUN_COMPLETED|passed|Completed successfully\n")); + + expect(executed.steps[0].status).toBe("passed"); + expect(executed.hasRunFinished).toBe(true); + const runFinished = executed.events.find( + (event): event is RunFinished => event._tag === "RunFinished", + ); + expect(runFinished?.summary).toBe("Completed successfully"); + + const textAfterCompletion = executed.events.filter( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(textAfterCompletion).toHaveLength(1); + expect(textAfterCompletion[0].text).toBe("Planning next action...\n"); + }); + + it("keeps partial marker text buffered until the line is complete", () => { + let executed = makeEmptyExecuted(); + + const chunk = (text: string) => + new AcpAgentMessageChunk({ + sessionUpdate: "agent_message_chunk", + content: { type: "text" as const, text }, + }); + + executed = executed.addEvent(chunk("STEP_START|step-01|Open")); + expect(executed.steps).toHaveLength(0); + expect(executed.hasRunFinished).toBe(false); + const partialText = executed.events.findLast( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(partialText?.text).toBe("STEP_START|step-01|Open"); + + executed = executed.addEvent(chunk(" login page\n")); + expect(executed.steps).toHaveLength(1); + expect(executed.steps[0].title).toBe("Open login page"); + const remainingText = executed.events.filter( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(remainingText).toHaveLength(0); + }); }); describe("run completion detection", () => { diff --git a/packages/supervisor/src/executor.ts b/packages/supervisor/src/executor.ts index 86f4e6d46..3a532eaed 100644 --- a/packages/supervisor/src/executor.ts +++ b/packages/supervisor/src/executor.ts @@ -7,7 +7,7 @@ import { Agent, AgentStreamOptions, } from "@expect/agent"; -import { Effect, Layer, Option, Schema, ServiceMap, Stream } from "effect"; +import { Effect, Fiber, Layer, Option, Queue, Ref, Schema, ServiceMap, Stream } from "effect"; import { type ChangesFor, type ChangedFile, @@ -170,28 +170,109 @@ export class Executor extends ServiceMap.Service()("@supervisor/Execut mcpEnv, }); - return agent.stream(streamOptions).pipe( - Stream.mapAccum( - (): ExecutorAccumState => ({ - plan: initial, - allTerminalSince: undefined, - }), - (state, part) => { - const updated = state.plan.addEvent(part); - const terminalTimestamp = resolveTerminalTimestamp(updated, state.allTerminalSince); - const finalized = - terminalTimestamp !== undefined && - !updated.hasRunFinished && - Date.now() - terminalTimestamp >= ALL_STEPS_TERMINAL_GRACE_MS - ? updated.synthesizeRunFinished() - : updated; - - return [{ plan: finalized, allTerminalSince: terminalTimestamp }, [finalized]] as const; - }, - ), - Stream.takeUntil((executed) => executed.hasRunFinished), - Stream.mapError((reason) => new ExecutionError({ reason })), - ); + return yield* Effect.gen(function* () { + const outputQueue = yield* Queue.unbounded(); + const terminalGraceFiberRef = yield* Ref.make>>( + Option.none(), + ); + + const emit = (executed: ExecutedTestPlan) => Queue.offer(outputQueue, executed); + yield* emit(initial); + const cancelTerminalGraceWatcher = Effect.gen(function* () { + const currentFiber = yield* Ref.get(terminalGraceFiberRef); + if (Option.isNone(currentFiber)) return; + yield* Fiber.interrupt(currentFiber.value); + yield* Ref.set(terminalGraceFiberRef, Option.none()); + }); + const updateTerminalGraceWatcher = (executed: ExecutedTestPlan) => + Effect.gen(function* () { + if (executed.hasRunFinished || !executed.allStepsTerminal) { + yield* cancelTerminalGraceWatcher; + return; + } + + const currentFiber = yield* Ref.get(terminalGraceFiberRef); + if (Option.isSome(currentFiber)) { + return; + } + + const graceFiber = yield* Effect.gen(function* () { + yield* Effect.sleep(`${ALL_STEPS_TERMINAL_GRACE_MS} millis`); + yield* emit(executed.synthesizeRunFinished()); + yield* Queue.end(outputQueue); + yield* Ref.set(terminalGraceFiberRef, Option.none()); + }).pipe(Effect.forkScoped); + + yield* Ref.set(terminalGraceFiberRef, Option.some(graceFiber)); + }); + + const runStream = agent.stream(streamOptions).pipe( + Stream.runFoldEffect( + { + plan: initial, + allTerminalSince: undefined, + } satisfies ExecutorAccumState, + (state, part) => + Effect.gen(function* () { + const updated = state.plan.addEvent(part); + const terminalTimestamp = resolveTerminalTimestamp(updated, state.allTerminalSince); + const finalized = + terminalTimestamp !== undefined && + !updated.hasRunFinished && + Date.now() - terminalTimestamp >= ALL_STEPS_TERMINAL_GRACE_MS + ? updated.synthesizeRunFinished() + : updated; + + yield* emit(finalized); + yield* updateTerminalGraceWatcher(finalized); + if (finalized.hasRunFinished) { + yield* Queue.end(outputQueue); + } + + return { + plan: finalized, + allTerminalSince: terminalTimestamp, + } satisfies ExecutorAccumState; + }), + ), + Effect.flatMap((finalState) => + Effect.gen(function* () { + const finalizedPlan = finalState.plan.finalizeTextBlock(); + const finalizedTerminalTimestamp = resolveTerminalTimestamp( + finalizedPlan, + finalState.allTerminalSince, + ); + const completedPlan = + finalizedTerminalTimestamp !== undefined && !finalizedPlan.hasRunFinished + ? finalizedPlan.synthesizeRunFinished() + : finalizedPlan; + + if (completedPlan !== finalState.plan) { + yield* emit(completedPlan); + } + yield* updateTerminalGraceWatcher(completedPlan); + yield* Queue.end(outputQueue); + }), + ), + Effect.catchTag("AcpStreamError", (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + ), + Effect.catchTag("AcpSessionCreateError", (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + ), + Effect.catchTag("AcpProviderUnauthenticatedError", (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + ), + Effect.catchTag("AcpProviderUsageLimitError", (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + ), + Effect.forkScoped, + ); + + yield* runStream; + + return Stream.fromQueue(outputQueue); + }).pipe(Stream.unwrap); }, Stream.unwrap); return { execute } as const; From 08e87cd50876c47045e35196ba648afac5c7e484 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 29 Mar 2026 22:42:47 +0000 Subject: [PATCH 2/5] Improve executor completion watchdog Co-authored-by: Aiden Bai --- packages/supervisor/src/executor.ts | 99 ++++++++++++++--------------- 1 file changed, 47 insertions(+), 52 deletions(-) diff --git a/packages/supervisor/src/executor.ts b/packages/supervisor/src/executor.ts index 3a532eaed..e2b179a50 100644 --- a/packages/supervisor/src/executor.ts +++ b/packages/supervisor/src/executor.ts @@ -7,7 +7,7 @@ import { Agent, AgentStreamOptions, } from "@expect/agent"; -import { Effect, Fiber, Layer, Option, Queue, Ref, Schema, ServiceMap, Stream } from "effect"; +import { Cause, Effect, Fiber, Layer, Option, Queue, Ref, Schema, ServiceMap, Stream } from "effect"; import { type ChangesFor, type ChangedFile, @@ -170,48 +170,49 @@ export class Executor extends ServiceMap.Service()("@supervisor/Execut mcpEnv, }); - return yield* Effect.gen(function* () { - const outputQueue = yield* Queue.unbounded(); - const terminalGraceFiberRef = yield* Ref.make>>( - Option.none(), - ); + const outputQueue = yield* Queue.unbounded>(); + const terminalGraceFiberRef = yield* Ref.make>>(Option.none()); - const emit = (executed: ExecutedTestPlan) => Queue.offer(outputQueue, executed); - yield* emit(initial); - const cancelTerminalGraceWatcher = Effect.gen(function* () { - const currentFiber = yield* Ref.get(terminalGraceFiberRef); - if (Option.isNone(currentFiber)) return; - yield* Fiber.interrupt(currentFiber.value); - yield* Ref.set(terminalGraceFiberRef, Option.none()); - }); - const updateTerminalGraceWatcher = (executed: ExecutedTestPlan) => - Effect.gen(function* () { - if (executed.hasRunFinished || !executed.allStepsTerminal) { - yield* cancelTerminalGraceWatcher; - return; - } + const emit = (executed: ExecutedTestPlan) => Queue.offer(outputQueue, executed); + yield* emit(initial); + + const cancelTerminalGraceWatcher = Effect.gen(function* () { + const currentFiber = yield* Ref.get(terminalGraceFiberRef); + if (Option.isNone(currentFiber)) return; + yield* Fiber.interrupt(currentFiber.value); + yield* Ref.set(terminalGraceFiberRef, Option.none()); + }); - const currentFiber = yield* Ref.get(terminalGraceFiberRef); - if (Option.isSome(currentFiber)) { - return; - } + const updateTerminalGraceWatcher = (executed: ExecutedTestPlan) => + Effect.gen(function* () { + if (executed.hasRunFinished || !executed.allStepsTerminal) { + yield* cancelTerminalGraceWatcher; + return; + } - const graceFiber = yield* Effect.gen(function* () { - yield* Effect.sleep(`${ALL_STEPS_TERMINAL_GRACE_MS} millis`); - yield* emit(executed.synthesizeRunFinished()); - yield* Queue.end(outputQueue); - yield* Ref.set(terminalGraceFiberRef, Option.none()); - }).pipe(Effect.forkScoped); + const currentFiber = yield* Ref.get(terminalGraceFiberRef); + if (Option.isSome(currentFiber)) { + return; + } - yield* Ref.set(terminalGraceFiberRef, Option.some(graceFiber)); - }); + const graceFiber = yield* Effect.gen(function* () { + yield* Effect.sleep(`${ALL_STEPS_TERMINAL_GRACE_MS} millis`); + yield* emit(executed.synthesizeRunFinished()); + yield* Queue.end(outputQueue); + yield* Ref.set(terminalGraceFiberRef, Option.none()); + }).pipe(Effect.forkScoped); - const runStream = agent.stream(streamOptions).pipe( + yield* Ref.set(terminalGraceFiberRef, Option.some(graceFiber)); + }); + + yield* agent + .stream(streamOptions) + .pipe( Stream.runFoldEffect( - { + (): ExecutorAccumState => ({ plan: initial, allTerminalSince: undefined, - } satisfies ExecutorAccumState, + }), (state, part) => Effect.gen(function* () { const updated = state.plan.addEvent(part); @@ -232,7 +233,7 @@ export class Executor extends ServiceMap.Service()("@supervisor/Execut return { plan: finalized, allTerminalSince: terminalTimestamp, - } satisfies ExecutorAccumState; + }; }), ), Effect.flatMap((finalState) => @@ -254,25 +255,19 @@ export class Executor extends ServiceMap.Service()("@supervisor/Execut yield* Queue.end(outputQueue); }), ), - Effect.catchTag("AcpStreamError", (reason) => - Queue.fail(outputQueue, new ExecutionError({ reason })), - ), - Effect.catchTag("AcpSessionCreateError", (reason) => - Queue.fail(outputQueue, new ExecutionError({ reason })), - ), - Effect.catchTag("AcpProviderUnauthenticatedError", (reason) => - Queue.fail(outputQueue, new ExecutionError({ reason })), - ), - Effect.catchTag("AcpProviderUsageLimitError", (reason) => - Queue.fail(outputQueue, new ExecutionError({ reason })), - ), + Effect.catchTags({ + AcpStreamError: (reason) => Queue.fail(outputQueue, new ExecutionError({ reason })), + AcpSessionCreateError: (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + AcpProviderUnauthenticatedError: (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + AcpProviderUsageLimitError: (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + }), Effect.forkScoped, ); - yield* runStream; - - return Stream.fromQueue(outputQueue); - }).pipe(Stream.unwrap); + return Stream.fromQueue(outputQueue); }, Stream.unwrap); return { execute } as const; From 3eb499bae41b60cab376fe736a6c506d8624fcfe Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 29 Mar 2026 22:44:46 +0000 Subject: [PATCH 3/5] Format shared marker handling changes Co-authored-by: Aiden Bai --- packages/shared/src/models.ts | 12 ++++++++++-- packages/shared/tests/dynamic-steps.test.ts | 8 ++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/packages/shared/src/models.ts b/packages/shared/src/models.ts index 2e8e07596..8e4fc6ad6 100644 --- a/packages/shared/src/models.ts +++ b/packages/shared/src/models.ts @@ -832,7 +832,11 @@ export class ExecutedTestPlan extends TestPlan.extend( if (update.content.type !== "text" || update.content.text === undefined) return this; const lastEvent = this.events.at(-1); if (lastEvent?._tag === "AgentThinking") { - const processed = processTextBlock("AgentThinking", lastEvent.text + update.content.text, false); + const processed = processTextBlock( + "AgentThinking", + lastEvent.text + update.content.text, + false, + ); const withEvents = new ExecutedTestPlan({ ...this, events: [...this.events.slice(0, -1), ...processed.events], @@ -852,7 +856,11 @@ export class ExecutedTestPlan extends TestPlan.extend( if (update.content.type !== "text" || update.content.text === undefined) return this; const lastEvent = this.events.at(-1); if (lastEvent?._tag === "AgentText") { - const processed = processTextBlock("AgentText", lastEvent.text + update.content.text, false); + const processed = processTextBlock( + "AgentText", + lastEvent.text + update.content.text, + false, + ); const withEvents = new ExecutedTestPlan({ ...this, events: [...this.events.slice(0, -1), ...processed.events], diff --git a/packages/shared/tests/dynamic-steps.test.ts b/packages/shared/tests/dynamic-steps.test.ts index 93242eaf9..f3037098c 100644 --- a/packages/shared/tests/dynamic-steps.test.ts +++ b/packages/shared/tests/dynamic-steps.test.ts @@ -266,7 +266,9 @@ describe("dynamic step discovery", () => { content: { type: "text" as const, text }, }); - executed = executed.addEvent(chunk("Planning next action...\nSTEP_START|step-01|Open login page\n")); + executed = executed.addEvent( + chunk("Planning next action...\nSTEP_START|step-01|Open login page\n"), + ); expect(executed.steps.length).toBe(1); expect(executed.steps[0].title).toBe("Open login page"); @@ -280,7 +282,9 @@ describe("dynamic step discovery", () => { expect(agentTextEvents[0].text).toBe("Planning next action...\n"); expect(agentTextEvents[0].text.includes("STEP_START|step-01|Open login page")).toBe(false); - executed = executed.addEvent(chunk("STEP_DONE|step-01|Login page loaded\nRUN_COMPLETED|passed|Completed successfully\n")); + executed = executed.addEvent( + chunk("STEP_DONE|step-01|Login page loaded\nRUN_COMPLETED|passed|Completed successfully\n"), + ); expect(executed.steps[0].status).toBe("passed"); expect(executed.hasRunFinished).toBe(true); From 1823ec1bd03092999199a95efbe4907238c95b69 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 29 Mar 2026 22:46:41 +0000 Subject: [PATCH 4/5] Format executor completion changes Co-authored-by: Aiden Bai --- packages/supervisor/src/executor.ts | 132 +++++++++++++++------------- 1 file changed, 73 insertions(+), 59 deletions(-) diff --git a/packages/supervisor/src/executor.ts b/packages/supervisor/src/executor.ts index e2b179a50..458b6c0ff 100644 --- a/packages/supervisor/src/executor.ts +++ b/packages/supervisor/src/executor.ts @@ -7,7 +7,18 @@ import { Agent, AgentStreamOptions, } from "@expect/agent"; -import { Cause, Effect, Fiber, Layer, Option, Queue, Ref, Schema, ServiceMap, Stream } from "effect"; +import { + Cause, + Effect, + Fiber, + Layer, + Option, + Queue, + Ref, + Schema, + ServiceMap, + Stream, +} from "effect"; import { type ChangesFor, type ChangedFile, @@ -170,8 +181,13 @@ export class Executor extends ServiceMap.Service()("@supervisor/Execut mcpEnv, }); - const outputQueue = yield* Queue.unbounded>(); - const terminalGraceFiberRef = yield* Ref.make>>(Option.none()); + const outputQueue = yield* Queue.unbounded< + ExecutedTestPlan, + ExecutionError | Cause.Done + >(); + const terminalGraceFiberRef = yield* Ref.make>>( + Option.none(), + ); const emit = (executed: ExecutedTestPlan) => Queue.offer(outputQueue, executed); yield* emit(initial); @@ -205,67 +221,65 @@ export class Executor extends ServiceMap.Service()("@supervisor/Execut yield* Ref.set(terminalGraceFiberRef, Option.some(graceFiber)); }); - yield* agent - .stream(streamOptions) - .pipe( - Stream.runFoldEffect( - (): ExecutorAccumState => ({ - plan: initial, - allTerminalSince: undefined, - }), - (state, part) => - Effect.gen(function* () { - const updated = state.plan.addEvent(part); - const terminalTimestamp = resolveTerminalTimestamp(updated, state.allTerminalSince); - const finalized = - terminalTimestamp !== undefined && - !updated.hasRunFinished && - Date.now() - terminalTimestamp >= ALL_STEPS_TERMINAL_GRACE_MS - ? updated.synthesizeRunFinished() - : updated; - - yield* emit(finalized); - yield* updateTerminalGraceWatcher(finalized); - if (finalized.hasRunFinished) { - yield* Queue.end(outputQueue); - } - - return { - plan: finalized, - allTerminalSince: terminalTimestamp, - }; - }), - ), - Effect.flatMap((finalState) => + yield* agent.stream(streamOptions).pipe( + Stream.runFoldEffect( + (): ExecutorAccumState => ({ + plan: initial, + allTerminalSince: undefined, + }), + (state, part) => Effect.gen(function* () { - const finalizedPlan = finalState.plan.finalizeTextBlock(); - const finalizedTerminalTimestamp = resolveTerminalTimestamp( - finalizedPlan, - finalState.allTerminalSince, - ); - const completedPlan = - finalizedTerminalTimestamp !== undefined && !finalizedPlan.hasRunFinished - ? finalizedPlan.synthesizeRunFinished() - : finalizedPlan; + const updated = state.plan.addEvent(part); + const terminalTimestamp = resolveTerminalTimestamp(updated, state.allTerminalSince); + const finalized = + terminalTimestamp !== undefined && + !updated.hasRunFinished && + Date.now() - terminalTimestamp >= ALL_STEPS_TERMINAL_GRACE_MS + ? updated.synthesizeRunFinished() + : updated; - if (completedPlan !== finalState.plan) { - yield* emit(completedPlan); + yield* emit(finalized); + yield* updateTerminalGraceWatcher(finalized); + if (finalized.hasRunFinished) { + yield* Queue.end(outputQueue); } - yield* updateTerminalGraceWatcher(completedPlan); - yield* Queue.end(outputQueue); + + return { + plan: finalized, + allTerminalSince: terminalTimestamp, + }; }), - ), - Effect.catchTags({ - AcpStreamError: (reason) => Queue.fail(outputQueue, new ExecutionError({ reason })), - AcpSessionCreateError: (reason) => - Queue.fail(outputQueue, new ExecutionError({ reason })), - AcpProviderUnauthenticatedError: (reason) => - Queue.fail(outputQueue, new ExecutionError({ reason })), - AcpProviderUsageLimitError: (reason) => - Queue.fail(outputQueue, new ExecutionError({ reason })), + ), + Effect.flatMap((finalState) => + Effect.gen(function* () { + const finalizedPlan = finalState.plan.finalizeTextBlock(); + const finalizedTerminalTimestamp = resolveTerminalTimestamp( + finalizedPlan, + finalState.allTerminalSince, + ); + const completedPlan = + finalizedTerminalTimestamp !== undefined && !finalizedPlan.hasRunFinished + ? finalizedPlan.synthesizeRunFinished() + : finalizedPlan; + + if (completedPlan !== finalState.plan) { + yield* emit(completedPlan); + } + yield* updateTerminalGraceWatcher(completedPlan); + yield* Queue.end(outputQueue); }), - Effect.forkScoped, - ); + ), + Effect.catchTags({ + AcpStreamError: (reason) => Queue.fail(outputQueue, new ExecutionError({ reason })), + AcpSessionCreateError: (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + AcpProviderUnauthenticatedError: (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + AcpProviderUsageLimitError: (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + }), + Effect.forkScoped, + ); return Stream.fromQueue(outputQueue); }, Stream.unwrap); From 0394efb4fee135fcdfa12fe1539b98e0de7fbcb8 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 29 Mar 2026 23:28:16 +0000 Subject: [PATCH 5/5] Handle leaked markers and JS-only playwright prompts Co-authored-by: Aiden Bai --- packages/shared/src/models.ts | 60 +++++++++++++++++++-- packages/shared/src/prompts.ts | 1 + packages/shared/tests/dynamic-steps.test.ts | 46 ++++++++++++++++ packages/shared/tests/prompts.test.ts | 7 +++ 4 files changed, 110 insertions(+), 4 deletions(-) diff --git a/packages/shared/src/models.ts b/packages/shared/src/models.ts index 8e4fc6ad6..cc66ae142 100644 --- a/packages/shared/src/models.ts +++ b/packages/shared/src/models.ts @@ -676,12 +676,53 @@ interface ProcessedTextBlock { readonly markers: readonly ExecutionEvent[]; } +interface MarkerMatch { + readonly marker: ExecutionEvent; + readonly start: number; + readonly end: number; +} + const buildTextEvent = ( textEventTag: "AgentText" | "AgentThinking", text: string, ): AgentText | AgentThinking => textEventTag === "AgentText" ? new AgentText({ text }) : new AgentThinking({ text }); +const MARKER_PREFIXES = [ + "STEP_START|", + "STEP_DONE|", + "ASSERTION_FAILED|", + "STEP_SKIPPED|", + "RUN_COMPLETED|", +] as const; + +const markerBoundaryAfter = (line: string, start: number): number => { + let earliestBoundary = line.length; + for (const prefix of MARKER_PREFIXES) { + const nextBoundary = line.indexOf(prefix, start + 1); + if (nextBoundary !== -1 && nextBoundary < earliestBoundary) { + earliestBoundary = nextBoundary; + } + } + return earliestBoundary; +}; + +const findMarkerInLine = (line: string): MarkerMatch | undefined => { + for (const prefix of MARKER_PREFIXES) { + const start = line.indexOf(prefix); + if (start === -1) continue; + const end = markerBoundaryAfter(line, start); + const marker = parseMarker(line.slice(start, end)); + if (marker === undefined) continue; + return { + marker, + start, + end, + }; + } + return undefined; +}; + const processTextBlock = ( textEventTag: "AgentText" | "AgentThinking", text: string, @@ -698,11 +739,22 @@ const processTextBlock = ( }; const processLine = (line: string, hasTrailingNewline: boolean) => { - const marker = parseMarker(line); - if (marker !== undefined) { + const markerMatch = findMarkerInLine(line); + if (markerMatch !== undefined) { + const prefixText = line.slice(0, markerMatch.start); + if (prefixText.length > 0) { + bufferedText += prefixText; + if (hasTrailingNewline) { + bufferedText += "\n"; + } + } flushBufferedText(); - events.push(marker); - markers.push(marker); + events.push(markerMatch.marker); + markers.push(markerMatch.marker); + const suffixText = line.slice(markerMatch.end); + if (suffixText.length > 0) { + processLine(suffixText, hasTrailingNewline); + } return; } bufferedText += line; diff --git a/packages/shared/src/prompts.ts b/packages/shared/src/prompts.ts index b2c0b1cc2..b52c1b4d0 100644 --- a/packages/shared/src/prompts.ts +++ b/packages/shared/src/prompts.ts @@ -121,6 +121,7 @@ export const buildExecutionPrompt = (options: ExecutionPromptOptions): string => "", "1. open — Launch a browser and navigate to a URL.", "2. playwright — Execute Playwright code in Node. Globals: page (Page), context (BrowserContext), browser (Browser), ref(id) (resolves a snapshot ref like 'e4' to a Playwright Locator). Supports await. Return a value to get it back as JSON.", + " IMPORTANT: playwright snippets run as plain JavaScript, not TypeScript. Never use TS-only syntax such as `as`, type annotations, interfaces, enums, or generics inside a playwright call.", "3. screenshot — Capture page state. Set mode: 'snapshot' (ARIA accessibility tree, default and preferred), 'screenshot' (PNG image), or 'annotated' (PNG with numbered labels on interactive elements).", "4. console_logs — Get browser console messages. Filter by type ('error', 'warning', 'log'). Use after navigation or interactions to catch errors.", "5. network_requests — Get captured network requests. Filter by method, URL substring, or resource type ('xhr', 'fetch', 'document').", diff --git a/packages/shared/tests/dynamic-steps.test.ts b/packages/shared/tests/dynamic-steps.test.ts index f3037098c..1e24d97f8 100644 --- a/packages/shared/tests/dynamic-steps.test.ts +++ b/packages/shared/tests/dynamic-steps.test.ts @@ -325,6 +325,52 @@ describe("dynamic step discovery", () => { ); expect(remainingText).toHaveLength(0); }); + + it("splits leaked inline markers from the preceding summary text", () => { + let executed = makeEmptyExecuted(); + + const chunk = (text: string) => + new AcpAgentMessageChunk({ + sessionUpdate: "agent_message_chunk", + content: { type: "text" as const, text }, + }); + + executed = executed.addEvent( + chunk( + "Recovered after hiccupSTEP_START|step-04|Retry submission with corrected JS payload\n", + ), + ); + + expect(executed.steps).toHaveLength(1); + expect(executed.steps[0].id).toBe("step-04"); + expect(executed.steps[0].title).toBe("Retry submission with corrected JS payload"); + + const textEvents = executed.events.filter( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(textEvents).toHaveLength(1); + expect(textEvents[0].text).toBe("Recovered after hiccup\n"); + + executed = executed.addEvent( + chunk( + "Corrected payload capturedSTEP_DONE|step-04|Submission confirmedSTEP_START|step-05|Verify success state remains stable\n", + ), + ); + + expect(executed.steps).toHaveLength(2); + expect(executed.steps[0].status).toBe("active"); + expect(executed.steps[1].status).toBe("active"); + expect(executed.steps[1].title).toBe("Verify success state remains stable"); + + const updatedTextEvents = executed.events.filter( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(updatedTextEvents).toHaveLength(2); + expect(updatedTextEvents[1].text).toBe( + "Corrected payload capturedSTEP_DONE|step-04|Submission confirmed\n", + ); + expect(updatedTextEvents[1].text.includes("STEP_START|")).toBe(false); + }); }); describe("run completion detection", () => { diff --git a/packages/shared/tests/prompts.test.ts b/packages/shared/tests/prompts.test.ts index f92f1248e..065d4ee74 100644 --- a/packages/shared/tests/prompts.test.ts +++ b/packages/shared/tests/prompts.test.ts @@ -41,6 +41,13 @@ describe("buildExecutionPrompt", () => { expect(prompt).toContain("close — Close the browser"); }); + it("requires plain JavaScript in playwright snippets", () => { + const prompt = buildExecutionPrompt(makeDefaultOptions()); + expect(prompt).toContain("playwright snippets run as plain JavaScript, not TypeScript"); + expect(prompt).toContain("Never use TS-only syntax"); + expect(prompt).toContain("`as`, type annotations, interfaces, enums, or generics"); + }); + it("includes step marker protocol", () => { const prompt = buildExecutionPrompt(makeDefaultOptions()); expect(prompt).toContain("STEP_START||");