diff --git a/apps/cli/src/utils/run-test.ts b/apps/cli/src/utils/run-test.ts index 58c33c104..b785d8db3 100644 --- a/apps/cli/src/utils/run-test.ts +++ b/apps/cli/src/utils/run-test.ts @@ -94,21 +94,24 @@ export const runHeadless = (options: HeadlessRunOptions) => for (const event of executed.events) { if (seenEvents.has(event.id)) continue; seenEvents.add(event.id); - lastOutputAt = Date.now(); switch (event._tag) { case "RunStarted": + lastOutputAt = Date.now(); ciReporter.planTitle(event.plan.title, Option.getOrUndefined(event.plan.baseUrl)); break; case "StepStarted": + lastOutputAt = Date.now(); ciReporter.stepStarted(event.title); break; case "StepCompleted": { + lastOutputAt = Date.now(); const step = executed.steps.find((step) => step.id === event.stepId); const elapsed = step ? getStepElapsedMs(step) : undefined; ciReporter.stepCompleted(event.summary, elapsed); break; } case "StepFailed": { + lastOutputAt = Date.now(); const failedStep = executed.steps.find((step) => step.id === event.stepId); const failedTitle = failedStep?.title ?? event.stepId; const failedElapsed = failedStep ? getStepElapsedMs(failedStep) : undefined; @@ -116,6 +119,7 @@ export const runHeadless = (options: HeadlessRunOptions) => break; } case "StepSkipped": { + lastOutputAt = Date.now(); const skippedStep = executed.steps.find((step) => step.id === event.stepId); const skippedTitle = skippedStep?.title ?? event.stepId; ciReporter.stepSkipped(skippedTitle, event.reason); diff --git a/apps/cli/tests/ci-reporter.test.ts b/apps/cli/tests/ci-reporter.test.ts index eaa6e6b08..00505984d 100644 --- a/apps/cli/tests/ci-reporter.test.ts +++ b/apps/cli/tests/ci-reporter.test.ts @@ -313,5 +313,19 @@ describe("createCiReporter", () => { expect(output).toContain("Still running"); expect(output).toContain("2m"); }); + + it("remains the only visible progress signal when no step events are printed", () => { + const reporter = createCiReporter({ + version: "1.0.0", + agent: "claude", + timeoutMs: undefined, + isGitHubActions: false, + }); + reporter.heartbeat(120_000); + const output = stderrText(); + expect(output).toContain("Still running"); + expect(output).not.toContain("STEP_START|"); + expect(output).not.toContain("RUN_COMPLETED|"); + }); }); }); diff --git a/packages/shared/src/models.ts b/packages/shared/src/models.ts index 71a0e364d..cc66ae142 100644 --- a/packages/shared/src/models.ts +++ b/packages/shared/src/models.ts @@ -594,14 +594,16 @@ const serializeToolResult = (value: unknown): string => { }; const parseMarker = (line: string): ExecutionEvent | undefined => { - const pipeIndex = line.indexOf("|"); + const normalizedLine = line.endsWith("\r") ? line.slice(0, -1) : line; + const pipeIndex = normalizedLine.indexOf("|"); if (pipeIndex === -1) return undefined; - const marker = line.slice(0, pipeIndex); - const rest = line.slice(pipeIndex + 1); + const marker = normalizedLine.slice(0, pipeIndex); + const rest = normalizedLine.slice(pipeIndex + 1); const secondPipeIndex = rest.indexOf("|"); - const first = secondPipeIndex === -1 ? rest : rest.slice(0, secondPipeIndex); - const second = secondPipeIndex === -1 ? "" : rest.slice(secondPipeIndex + 1); + if (secondPipeIndex === -1) return undefined; + const first = rest.slice(0, secondPipeIndex); + const second = rest.slice(secondPipeIndex + 1); if (marker === "STEP_START") { return new StepStarted({ stepId: StepId.makeUnsafe(first), title: second }); @@ -669,6 +671,135 @@ export class Update extends Schema.Class("@supervisor/Update")({ receivedAt: Schema.DateTimeUtc, }) {} +interface ProcessedTextBlock { + readonly events: readonly ExecutionEvent[]; + readonly markers: readonly ExecutionEvent[]; +} + +interface MarkerMatch { + readonly marker: ExecutionEvent; + readonly start: number; + readonly end: number; +} + +const buildTextEvent = ( + textEventTag: "AgentText" | "AgentThinking", + text: string, +): AgentText | AgentThinking => + textEventTag === "AgentText" ? new AgentText({ text }) : new AgentThinking({ text }); + +const MARKER_PREFIXES = [ + "STEP_START|", + "STEP_DONE|", + "ASSERTION_FAILED|", + "STEP_SKIPPED|", + "RUN_COMPLETED|", +] as const; + +const markerBoundaryAfter = (line: string, start: number): number => { + let earliestBoundary = line.length; + for (const prefix of MARKER_PREFIXES) { + const nextBoundary = line.indexOf(prefix, start + 1); + if (nextBoundary !== -1 && nextBoundary < earliestBoundary) { + earliestBoundary = nextBoundary; + } + } + return earliestBoundary; +}; + +const findMarkerInLine = (line: string): MarkerMatch | undefined => { + for (const prefix of MARKER_PREFIXES) { + const start = line.indexOf(prefix); + if (start === -1) continue; + const end = markerBoundaryAfter(line, start); + const marker = parseMarker(line.slice(start, end)); + if (marker === undefined) continue; + return { + marker, + start, + end, + }; + } + return undefined; +}; + +const processTextBlock = ( + textEventTag: "AgentText" | "AgentThinking", + text: string, + includeTrailingPartialLine: boolean, +): ProcessedTextBlock => { + const events: ExecutionEvent[] = []; + const markers: ExecutionEvent[] = []; + let bufferedText = ""; + + const flushBufferedText = () => { + if (bufferedText.length === 0) return; + events.push(buildTextEvent(textEventTag, bufferedText)); + bufferedText = ""; + }; + + const processLine = (line: string, hasTrailingNewline: boolean) => { + const markerMatch = findMarkerInLine(line); + if (markerMatch !== undefined) { + const prefixText = line.slice(0, markerMatch.start); + if (prefixText.length > 0) { + bufferedText += prefixText; + if (hasTrailingNewline) { + bufferedText += "\n"; + } + } + flushBufferedText(); + events.push(markerMatch.marker); + markers.push(markerMatch.marker); + const suffixText = line.slice(markerMatch.end); + if (suffixText.length > 0) { + processLine(suffixText, hasTrailingNewline); + } + return; + } + bufferedText += line; + if (hasTrailingNewline) { + bufferedText += "\n"; + } + }; + + let completeLines: readonly string[] = []; + let trailingLine = ""; + + if (text.endsWith("\n")) { + completeLines = text.slice(0, -1).split("\n"); + } else { + const parts = text.split("\n"); + trailingLine = parts.pop() ?? ""; + completeLines = parts; + } + + for (const completeLine of completeLines) { + processLine(completeLine, true); + } + + if (includeTrailingPartialLine) { + processLine(trailingLine, false); + } else { + bufferedText += trailingLine; + } + + flushBufferedText(); + + return { events, markers }; +}; + +const applyMarkersToPlan = ( + executed: ExecutedTestPlan, + markers: readonly ExecutionEvent[], +): ExecutedTestPlan => { + let result = executed; + for (const marker of markers) { + result = result.applyMarker(marker); + } + return result; +}; + export class PullRequest extends Schema.Class("@supervisor/PullRequest")({ number: Schema.Number, url: Schema.String, @@ -753,38 +884,48 @@ export class ExecutedTestPlan extends TestPlan.extend( if (update.content.type !== "text" || update.content.text === undefined) return this; const lastEvent = this.events.at(-1); if (lastEvent?._tag === "AgentThinking") { - return new ExecutedTestPlan({ + const processed = processTextBlock( + "AgentThinking", + lastEvent.text + update.content.text, + false, + ); + const withEvents = new ExecutedTestPlan({ ...this, - events: [ - ...this.events.slice(0, -1), - new AgentThinking({ text: lastEvent.text + update.content.text }), - ], + events: [...this.events.slice(0, -1), ...processed.events], }); + return applyMarkersToPlan(withEvents, processed.markers); } const base = this.finalizeTextBlock(); - return new ExecutedTestPlan({ + const processed = processTextBlock("AgentThinking", update.content.text, false); + const withEvents = new ExecutedTestPlan({ ...base, - events: [...base.events, new AgentThinking({ text: update.content.text })], + events: [...base.events, ...processed.events], }); + return applyMarkersToPlan(withEvents, processed.markers); } if (update.sessionUpdate === "agent_message_chunk") { if (update.content.type !== "text" || update.content.text === undefined) return this; const lastEvent = this.events.at(-1); if (lastEvent?._tag === "AgentText") { - return new ExecutedTestPlan({ + const processed = processTextBlock( + "AgentText", + lastEvent.text + update.content.text, + false, + ); + const withEvents = new ExecutedTestPlan({ ...this, - events: [ - ...this.events.slice(0, -1), - new AgentText({ text: lastEvent.text + update.content.text }), - ], + events: [...this.events.slice(0, -1), ...processed.events], }); + return applyMarkersToPlan(withEvents, processed.markers); } const base = this.finalizeTextBlock(); - return new ExecutedTestPlan({ + const processed = processTextBlock("AgentText", update.content.text, false); + const withEvents = new ExecutedTestPlan({ ...base, - events: [...base.events, new AgentText({ text: update.content.text })], + events: [...base.events, ...processed.events], }); + return applyMarkersToPlan(withEvents, processed.markers); } if (update.sessionUpdate === "tool_call") { @@ -859,19 +1000,13 @@ export class ExecutedTestPlan extends TestPlan.extend( finalizeTextBlock(): ExecutedTestPlan { const lastEvent = this.events.at(-1); if (lastEvent?._tag !== "AgentText" && lastEvent?._tag !== "AgentThinking") return this; - const foundMarkers = lastEvent.text - .split("\n") - .map(parseMarker) - .filter(Predicate.isNotUndefined); - if (foundMarkers.length === 0) return this; - let result: ExecutedTestPlan = new ExecutedTestPlan({ + const processed = processTextBlock(lastEvent._tag, lastEvent.text, true); + if (processed.markers.length === 0) return this; + const withEvents = new ExecutedTestPlan({ ...this, - events: [...this.events, ...foundMarkers], + events: [...this.events.slice(0, -1), ...processed.events], }); - for (const marker of foundMarkers) { - result = result.applyMarker(marker); - } - return result; + return applyMarkersToPlan(withEvents, processed.markers); } applyMarker(marker: ExecutionEvent): ExecutedTestPlan { diff --git a/packages/shared/src/prompts.ts b/packages/shared/src/prompts.ts index b2c0b1cc2..b52c1b4d0 100644 --- a/packages/shared/src/prompts.ts +++ b/packages/shared/src/prompts.ts @@ -121,6 +121,7 @@ export const buildExecutionPrompt = (options: ExecutionPromptOptions): string => "", "1. open — Launch a browser and navigate to a URL.", "2. playwright — Execute Playwright code in Node. Globals: page (Page), context (BrowserContext), browser (Browser), ref(id) (resolves a snapshot ref like 'e4' to a Playwright Locator). Supports await. Return a value to get it back as JSON.", + " IMPORTANT: playwright snippets run as plain JavaScript, not TypeScript. Never use TS-only syntax such as `as`, type annotations, interfaces, enums, or generics inside a playwright call.", "3. screenshot — Capture page state. Set mode: 'snapshot' (ARIA accessibility tree, default and preferred), 'screenshot' (PNG image), or 'annotated' (PNG with numbered labels on interactive elements).", "4. console_logs — Get browser console messages. Filter by type ('error', 'warning', 'log'). Use after navigation or interactions to catch errors.", "5. network_requests — Get captured network requests. Filter by method, URL substring, or resource type ('xhr', 'fetch', 'document').", diff --git a/packages/shared/tests/dynamic-steps.test.ts b/packages/shared/tests/dynamic-steps.test.ts index 516a80ea5..1e24d97f8 100644 --- a/packages/shared/tests/dynamic-steps.test.ts +++ b/packages/shared/tests/dynamic-steps.test.ts @@ -256,6 +256,121 @@ describe("dynamic step discovery", () => { expect(finalized.steps[0].status).toBe("failed"); expect(finalized.hasRunFinished).toBe(true); }); + + it("addEvent parses complete marker lines immediately and strips them from AgentText", () => { + let executed = makeEmptyExecuted(); + + const chunk = (text: string) => + new AcpAgentMessageChunk({ + sessionUpdate: "agent_message_chunk", + content: { type: "text" as const, text }, + }); + + executed = executed.addEvent( + chunk("Planning next action...\nSTEP_START|step-01|Open login page\n"), + ); + + expect(executed.steps.length).toBe(1); + expect(executed.steps[0].title).toBe("Open login page"); + expect(executed.steps[0].status).toBe("active"); + expect(executed.events.some((event) => event._tag === "StepStarted")).toBe(true); + + const agentTextEvents = executed.events.filter( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(agentTextEvents).toHaveLength(1); + expect(agentTextEvents[0].text).toBe("Planning next action...\n"); + expect(agentTextEvents[0].text.includes("STEP_START|step-01|Open login page")).toBe(false); + + executed = executed.addEvent( + chunk("STEP_DONE|step-01|Login page loaded\nRUN_COMPLETED|passed|Completed successfully\n"), + ); + + expect(executed.steps[0].status).toBe("passed"); + expect(executed.hasRunFinished).toBe(true); + const runFinished = executed.events.find( + (event): event is RunFinished => event._tag === "RunFinished", + ); + expect(runFinished?.summary).toBe("Completed successfully"); + + const textAfterCompletion = executed.events.filter( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(textAfterCompletion).toHaveLength(1); + expect(textAfterCompletion[0].text).toBe("Planning next action...\n"); + }); + + it("keeps partial marker text buffered until the line is complete", () => { + let executed = makeEmptyExecuted(); + + const chunk = (text: string) => + new AcpAgentMessageChunk({ + sessionUpdate: "agent_message_chunk", + content: { type: "text" as const, text }, + }); + + executed = executed.addEvent(chunk("STEP_START|step-01|Open")); + expect(executed.steps).toHaveLength(0); + expect(executed.hasRunFinished).toBe(false); + const partialText = executed.events.findLast( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(partialText?.text).toBe("STEP_START|step-01|Open"); + + executed = executed.addEvent(chunk(" login page\n")); + expect(executed.steps).toHaveLength(1); + expect(executed.steps[0].title).toBe("Open login page"); + const remainingText = executed.events.filter( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(remainingText).toHaveLength(0); + }); + + it("splits leaked inline markers from the preceding summary text", () => { + let executed = makeEmptyExecuted(); + + const chunk = (text: string) => + new AcpAgentMessageChunk({ + sessionUpdate: "agent_message_chunk", + content: { type: "text" as const, text }, + }); + + executed = executed.addEvent( + chunk( + "Recovered after hiccupSTEP_START|step-04|Retry submission with corrected JS payload\n", + ), + ); + + expect(executed.steps).toHaveLength(1); + expect(executed.steps[0].id).toBe("step-04"); + expect(executed.steps[0].title).toBe("Retry submission with corrected JS payload"); + + const textEvents = executed.events.filter( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(textEvents).toHaveLength(1); + expect(textEvents[0].text).toBe("Recovered after hiccup\n"); + + executed = executed.addEvent( + chunk( + "Corrected payload capturedSTEP_DONE|step-04|Submission confirmedSTEP_START|step-05|Verify success state remains stable\n", + ), + ); + + expect(executed.steps).toHaveLength(2); + expect(executed.steps[0].status).toBe("active"); + expect(executed.steps[1].status).toBe("active"); + expect(executed.steps[1].title).toBe("Verify success state remains stable"); + + const updatedTextEvents = executed.events.filter( + (event): event is AgentText => event._tag === "AgentText", + ); + expect(updatedTextEvents).toHaveLength(2); + expect(updatedTextEvents[1].text).toBe( + "Corrected payload capturedSTEP_DONE|step-04|Submission confirmed\n", + ); + expect(updatedTextEvents[1].text.includes("STEP_START|")).toBe(false); + }); }); describe("run completion detection", () => { diff --git a/packages/shared/tests/prompts.test.ts b/packages/shared/tests/prompts.test.ts index f92f1248e..065d4ee74 100644 --- a/packages/shared/tests/prompts.test.ts +++ b/packages/shared/tests/prompts.test.ts @@ -41,6 +41,13 @@ describe("buildExecutionPrompt", () => { expect(prompt).toContain("close — Close the browser"); }); + it("requires plain JavaScript in playwright snippets", () => { + const prompt = buildExecutionPrompt(makeDefaultOptions()); + expect(prompt).toContain("playwright snippets run as plain JavaScript, not TypeScript"); + expect(prompt).toContain("Never use TS-only syntax"); + expect(prompt).toContain("`as`, type annotations, interfaces, enums, or generics"); + }); + it("includes step marker protocol", () => { const prompt = buildExecutionPrompt(makeDefaultOptions()); expect(prompt).toContain("STEP_START||"); diff --git a/packages/supervisor/src/executor.ts b/packages/supervisor/src/executor.ts index 86f4e6d46..458b6c0ff 100644 --- a/packages/supervisor/src/executor.ts +++ b/packages/supervisor/src/executor.ts @@ -7,7 +7,18 @@ import { Agent, AgentStreamOptions, } from "@expect/agent"; -import { Effect, Layer, Option, Schema, ServiceMap, Stream } from "effect"; +import { + Cause, + Effect, + Fiber, + Layer, + Option, + Queue, + Ref, + Schema, + ServiceMap, + Stream, +} from "effect"; import { type ChangesFor, type ChangedFile, @@ -170,28 +181,107 @@ export class Executor extends ServiceMap.Service()("@supervisor/Execut mcpEnv, }); - return agent.stream(streamOptions).pipe( - Stream.mapAccum( + const outputQueue = yield* Queue.unbounded< + ExecutedTestPlan, + ExecutionError | Cause.Done + >(); + const terminalGraceFiberRef = yield* Ref.make>>( + Option.none(), + ); + + const emit = (executed: ExecutedTestPlan) => Queue.offer(outputQueue, executed); + yield* emit(initial); + + const cancelTerminalGraceWatcher = Effect.gen(function* () { + const currentFiber = yield* Ref.get(terminalGraceFiberRef); + if (Option.isNone(currentFiber)) return; + yield* Fiber.interrupt(currentFiber.value); + yield* Ref.set(terminalGraceFiberRef, Option.none()); + }); + + const updateTerminalGraceWatcher = (executed: ExecutedTestPlan) => + Effect.gen(function* () { + if (executed.hasRunFinished || !executed.allStepsTerminal) { + yield* cancelTerminalGraceWatcher; + return; + } + + const currentFiber = yield* Ref.get(terminalGraceFiberRef); + if (Option.isSome(currentFiber)) { + return; + } + + const graceFiber = yield* Effect.gen(function* () { + yield* Effect.sleep(`${ALL_STEPS_TERMINAL_GRACE_MS} millis`); + yield* emit(executed.synthesizeRunFinished()); + yield* Queue.end(outputQueue); + yield* Ref.set(terminalGraceFiberRef, Option.none()); + }).pipe(Effect.forkScoped); + + yield* Ref.set(terminalGraceFiberRef, Option.some(graceFiber)); + }); + + yield* agent.stream(streamOptions).pipe( + Stream.runFoldEffect( (): ExecutorAccumState => ({ plan: initial, allTerminalSince: undefined, }), - (state, part) => { - const updated = state.plan.addEvent(part); - const terminalTimestamp = resolveTerminalTimestamp(updated, state.allTerminalSince); - const finalized = - terminalTimestamp !== undefined && - !updated.hasRunFinished && - Date.now() - terminalTimestamp >= ALL_STEPS_TERMINAL_GRACE_MS - ? updated.synthesizeRunFinished() - : updated; - - return [{ plan: finalized, allTerminalSince: terminalTimestamp }, [finalized]] as const; - }, + (state, part) => + Effect.gen(function* () { + const updated = state.plan.addEvent(part); + const terminalTimestamp = resolveTerminalTimestamp(updated, state.allTerminalSince); + const finalized = + terminalTimestamp !== undefined && + !updated.hasRunFinished && + Date.now() - terminalTimestamp >= ALL_STEPS_TERMINAL_GRACE_MS + ? updated.synthesizeRunFinished() + : updated; + + yield* emit(finalized); + yield* updateTerminalGraceWatcher(finalized); + if (finalized.hasRunFinished) { + yield* Queue.end(outputQueue); + } + + return { + plan: finalized, + allTerminalSince: terminalTimestamp, + }; + }), + ), + Effect.flatMap((finalState) => + Effect.gen(function* () { + const finalizedPlan = finalState.plan.finalizeTextBlock(); + const finalizedTerminalTimestamp = resolveTerminalTimestamp( + finalizedPlan, + finalState.allTerminalSince, + ); + const completedPlan = + finalizedTerminalTimestamp !== undefined && !finalizedPlan.hasRunFinished + ? finalizedPlan.synthesizeRunFinished() + : finalizedPlan; + + if (completedPlan !== finalState.plan) { + yield* emit(completedPlan); + } + yield* updateTerminalGraceWatcher(completedPlan); + yield* Queue.end(outputQueue); + }), ), - Stream.takeUntil((executed) => executed.hasRunFinished), - Stream.mapError((reason) => new ExecutionError({ reason })), + Effect.catchTags({ + AcpStreamError: (reason) => Queue.fail(outputQueue, new ExecutionError({ reason })), + AcpSessionCreateError: (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + AcpProviderUnauthenticatedError: (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + AcpProviderUsageLimitError: (reason) => + Queue.fail(outputQueue, new ExecutionError({ reason })), + }), + Effect.forkScoped, ); + + return Stream.fromQueue(outputQueue); }, Stream.unwrap); return { execute } as const;