diff --git a/assets/aft.schema.json b/assets/aft.schema.json index ecdbafb6..9d6b52b6 100644 --- a/assets/aft.schema.json +++ b/assets/aft.schema.json @@ -157,6 +157,30 @@ "minimum": 5000, "default": 8000, "description": "How long foreground bash blocks before auto-promoting the task to background, in milliseconds. Minimum 5000; values below the floor are clamped up." + }, + "deferred_completion_fallback_ms": { + "type": "integer", + "minimum": 1, + "default": 500, + "description": "Safety-net delay before a completion deferred for same-turn delivery wakes the session without waiting for OpenCode session.idle." + }, + "wake_retry_max_attempts": { + "type": "integer", + "minimum": 1, + "default": 5, + "description": "Maximum prompt-delivery retry attempts before wake delivery hard-stops and leaves completions pending for explicit in-turn drain." + }, + "wake_debounce_step_ms": { + "type": "integer", + "minimum": 1, + "default": 200, + "description": "Initial wake debounce delay and retry backoff floor for background-completion prompts." + }, + "wake_debounce_cap_ms": { + "type": "integer", + "minimum": 1, + "default": 1000, + "description": "Maximum wake debounce delay and retry backoff cap for background-completion prompts." } }, "additionalProperties": false diff --git a/packages/opencode-plugin/src/__tests__/bg-notifications.test.ts b/packages/opencode-plugin/src/__tests__/bg-notifications.test.ts index 1e0d457c..f97de48b 100644 --- a/packages/opencode-plugin/src/__tests__/bg-notifications.test.ts +++ b/packages/opencode-plugin/src/__tests__/bg-notifications.test.ts @@ -48,7 +48,11 @@ mock.module("../logger.js", () => ({ // That's the post-v0.29 behavior introduced when we removed the // `--port 0` nudge — see shared/live-server-client.ts. let liveServerClient: unknown = null; -let lastLiveServerArgs: { serverUrl: string; directory: string } | null = null; +let lastLiveServerArgs: { + serverUrl: string; + directory: string; + headers?: Record; +} | null = null; let liveServerAvailable = true; // Per-URL availability map — must behave like the real // live-server-client implementation so the live-server-client unit @@ -62,24 +66,37 @@ function normalizeServerUrl(serverUrl: string): string { return serverUrl; } } +function serverAuthHeaders(): Record | undefined { + const password = process.env.OPENCODE_SERVER_PASSWORD; + if (!password) return undefined; + const username = process.env.OPENCODE_SERVER_USERNAME ?? "opencode"; + return { + Authorization: `Basic ${Buffer.from(`${username}:${password}`).toString("base64")}`, + }; +} function setTestLiveServerClient(client: unknown): void { liveServerClient = client; } function setTestLiveServerAvailable(available: boolean): void { liveServerAvailable = available; } -function getLastLiveServerArgs(): { serverUrl: string; directory: string } | null { +function getLastLiveServerArgs(): { + serverUrl: string; + directory: string; + headers?: Record; +} | null { return lastLiveServerArgs; } mock.module("../shared/live-server-client.js", () => ({ - getLiveServerClient: (serverUrl: string, directory: string) => { - lastLiveServerArgs = { serverUrl, directory }; + getLiveServerClient: (serverUrl: string, directory: string, headers?: Record) => { + lastLiveServerArgs = { serverUrl, directory, ...(headers ? { headers } : {}) }; if (!liveServerClient) { throw new Error("test did not configure a live-server client via setTestLiveServerClient()"); } return liveServerClient; }, - useLiveServerWake: (serverUrl?: string) => { + useLiveServerWake: (serverUrl?: string, enabled = true) => { + if (!enabled) return false; if (!serverUrl) return liveServerAvailable; const keyed = perUrlAvailability.get(normalizeServerUrl(serverUrl)); if (keyed !== undefined) return keyed; @@ -118,14 +135,17 @@ mock.module("../shared/live-server-client.js", () => ({ perUrlAvailability.clear(); return false; } - // Mirror the real implementation enough that the unit-test fetch stubs - // drive this code path correctly: hit the URL, accept 2xx/401/403, - // reject 404/5xx and network errors. + // Mirror real implementation enough that unit-test fetch stubs drive + // this code path correctly: hit URL, accept only 2xx, reject 401/403, + // 404/5xx, and network errors. let reachable = false; try { const probeUrl = new URL("/session", serverUrl).toString(); - const res = await globalThis.fetch(probeUrl, { method: "GET" }); - reachable = res.ok || res.status === 401 || res.status === 403; + const res = await globalThis.fetch(probeUrl, { + method: "GET", + headers: serverAuthHeaders(), + }); + reachable = res.ok; } catch { reachable = false; } @@ -152,6 +172,7 @@ afterAll(() => { }); import { + DEFERRED_COMPLETION_FALLBACK_MS, __resetBgNotificationStateForTests, appendInTurnBgCompletions, consumeBgCompletion, @@ -159,6 +180,7 @@ import { formatSystemReminder, handleIdleBgCompletions, handlePushedBgCompletion, + handlePushedBgLongRunning, ingestBgCompletions, markBgCompletionDelivered, markExplicitControl, @@ -179,6 +201,7 @@ beforeEach(() => { sessionWarnSpy.mockClear(); liveServerClient = null; lastLiveServerArgs = null; + perUrlAvailability.clear(); // Default to live-server-available so existing tests keep exercising // the workaround path. Tests covering the fallback flip this to false. liveServerAvailable = true; @@ -189,33 +212,46 @@ afterEach(() => { }); /** - * Configure the live-server client mock to return `{ session: { promptAsync } }`, - * optionally with a `messages` stub so prompt-context resolution works. + * Configure live-server client mock. `prompt` is preferred wake method; tests + * may also provide `promptAsync` for compatibility assertions. */ -function installLiveServerClient( - promptAsync: (input: unknown) => Promise | unknown, - messages?: unknown[], -): void { +function installLiveServerClient(options: { + prompt?: (input: unknown) => Promise | unknown; + promptAsync?: (input: unknown) => Promise | unknown; + messages?: unknown[]; +}): void { setTestLiveServerClient({ session: { - promptAsync, - ...(messages !== undefined ? { messages: async () => ({ data: messages }) } : {}), + ...(options.prompt ? { prompt: options.prompt } : {}), + ...(options.promptAsync ? { promptAsync: options.promptAsync } : {}), + ...(options.messages !== undefined + ? { messages: async () => ({ data: options.messages }) } + : {}), }, }); } /** - * Build a stub plugin-context client shaped like OpenCode's `input.client`. - * Returned so individual tests can read `.session.promptAsync.mock.calls` - * to assert whether the in-process wake fallback fired. + * Build stub plugin-context client shaped like OpenCode's `input.client`. + * Returned so tests can inspect `.session.prompt` / `.session.promptAsync`. */ function makeClient( - promptAsync: ReturnType, + methods: { + prompt?: ReturnType; + promptAsync?: ReturnType; + }, messages?: unknown[], -): { session: { promptAsync: typeof promptAsync; messages?: () => Promise<{ data: unknown[] }> } } { +): { + session: { + prompt?: ReturnType; + promptAsync?: ReturnType; + messages?: () => Promise<{ data: unknown[] }>; + }; +} { return { session: { - promptAsync, + ...(methods.prompt ? { prompt: methods.prompt } : {}), + ...(methods.promptAsync ? { promptAsync: methods.promptAsync } : {}), ...(messages !== undefined ? { messages: async () => ({ data: messages }) } : {}), }, }; @@ -258,7 +294,18 @@ describe("OpenCode background notifications", () => { }, ]), ).toBe( - "\n[BACKGROUND BASH COMPLETED]\n- task d2ed3a9e (exit 0, 1m 23s)\n- task 4f5b71c2 (timed out, 30s)\n", + "\n[BACKGROUND BASH FAILED]\n- task 4f5b71c2 (timed out, 30s)\n\n\n[BACKGROUND BASH COMPLETED]\n- task d2ed3a9e (exit 0, 1m 23s)\n", + ); + }); + + test("formats urgent failures separately from normal completions", () => { + expect( + formatSystemReminder([ + { task_id: "ok-1", status: "completed", exit_code: 0, command: "true" }, + { task_id: "fail-1", status: "failed", exit_code: 1, command: "false" }, + ]), + ).toBe( + "\n[BACKGROUND BASH FAILED]\n- task fail-1 (exit 1)\n\n\n[BACKGROUND BASH COMPLETED]\n- task ok-1 (exit 0)\n", ); }); @@ -420,14 +467,15 @@ describe("OpenCode background notifications", () => { ]); }); - test("turn-end wake sends one promptAsync message with reminder", async () => { + test("turn-end wake uses live session.prompt, not promptAsync", async () => { trackBgTask("s1", "task-1"); const { ctx } = harness(() => ({ success: true, bg_completions: [completion("task-1", "npm test")], })); + const prompt = mock(async () => {}); const promptAsync = mock(async () => {}); - installLiveServerClient(promptAsync); + installLiveServerClient({ prompt, promptAsync }); await handleIdleBgCompletions({ ctx, @@ -436,10 +484,11 @@ describe("OpenCode background notifications", () => { client: {}, serverUrl: TEST_SERVER_URL, }); - await waitForMockCallCount(promptAsync, 1); + await waitForMockCallCount(prompt, 1); - expect(promptAsync).toHaveBeenCalledTimes(1); - const payload = promptAsync.mock.calls[0][0] as { + expect(prompt).toHaveBeenCalledTimes(1); + expect(promptAsync).toHaveBeenCalledTimes(0); + const payload = prompt.mock.calls[0][0] as { body: { noReply: boolean; parts: Array<{ text: string }> }; }; expect(payload.body.noReply).toBe(false); @@ -449,7 +498,181 @@ describe("OpenCode background notifications", () => { expect(getLastLiveServerArgs()).toEqual({ serverUrl: TEST_SERVER_URL, directory: "/tmp/project", + headers: expect.objectContaining({ + "x-aft-delivery-id": expect.any(String), + }), + }); + }); + + test("turn-end wake preserves session method this binding for class-style prompt", async () => { + setTestLiveServerAvailable(false); + trackBgTask("s1", "task-1"); + const { ctx } = harness(() => ({ + success: true, + bg_completions: [completion("task-1", "npm test")], + })); + + class BoundSession { + calls: Array<{ + path: { id: string }; + body: { parts: Array<{ text: string }> }; + throwOnError?: boolean; + }> = []; + + async prompt(input: { + path: { id: string }; + body: { parts: Array<{ text: string }> }; + throwOnError?: boolean; + }) { + if (!(this instanceof BoundSession)) { + throw new Error("prompt lost this binding"); + } + this.calls.push(input); + } + } + + const session = new BoundSession(); + const fallbackClient = { session }; + + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + + await waitUntil(() => session.calls.length === 1); + expect(session.calls).toHaveLength(1); + expect(session.calls[0]?.path.id).toBe("s1"); + expect(session.calls[0]?.throwOnError).toBe(true); + expect(session.calls[0]?.body.parts[0]?.text).toContain("- task task-1 (exit 0)"); + expect( + sessionWarnSpy.mock.calls.some((call) => String(call[1]).includes("lost this binding")), + ).toBe(false); + }); + + test("live prompt sdk-style non-2xx demotes live server, falls back, then acks fallback", async () => { + trackBgTask("s1", "task-1"); + const send = mock(async (command: string) => + command === "bash_drain_completions" + ? { success: true, bg_completions: [completion("task-1", "npm test")] } + : { success: true, acked_task_ids: ["task-1"] }, + ); + const { ctx } = harness(send); + const fallbackPrompt = mock(async () => undefined); + const fallbackClient = makeClient({ prompt: fallbackPrompt }); + const livePrompt = mock(async () => ({ + error: { message: "missing route" }, + response: { ok: false, status: 404, statusText: "Not Found" }, + })); + installLiveServerClient({ prompt: livePrompt }); + + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + await waitForMockCallCount(fallbackPrompt, 1); + + expect(livePrompt).toHaveBeenCalledTimes(1); + expect(fallbackPrompt).toHaveBeenCalledTimes(1); + expect(send.mock.calls.filter((call) => call[0] === "bash_ack_completions")).toHaveLength(1); + const fallbackEvent = findTraceEvent("bash_completion_wake_live_server_fallback"); + expect(fallbackEvent).toBeDefined(); + expect(String(fallbackEvent?.error)).toContain("HTTP 404 Not Found"); + expect(String(fallbackEvent?.error)).toContain("missing route"); + }); + + test("in-process sdk-style non-2xx failure does not ack completion", async () => { + setTestLiveServerAvailable(false); + trackBgTask("s1", "task-1"); + const send = mock(async (command: string) => + command === "bash_drain_completions" + ? { success: true, bg_completions: [completion("task-1", "npm test")] } + : { success: true, acked_task_ids: ["task-1"] }, + ); + const { ctx } = harness(send); + const prompt = mock(async () => ({ + error: "bad request body", + response: { ok: false, status: 400, statusText: "Bad Request" }, + })); + const fallbackClient = makeClient({ prompt }); + + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + await waitForMockCallCount(prompt, 1); + + expect(send.mock.calls.some((call) => call[0] === "bash_ack_completions")).toBe(false); + expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(1); + expect(sessionBgStates.get("s1")?.debounceTimer).not.toBeNull(); + const errorEvent = findTraceEvent("bash_completion_wake_send_error"); + expect(errorEvent).toBeDefined(); + expect(String(errorEvent?.error)).toContain("HTTP 400 Bad Request"); + expect(String(errorEvent?.error)).toContain("bad request body"); + }); + + test("idle wake keeps debounce timer ref'd so autonomous completion reminder can fire", async () => { + trackBgTask("s1", "task-1"); + const { ctx } = harness(() => ({ + success: true, + bg_completions: [completion("task-1", "npm test")], + })); + const prompt = mock(async () => {}); + installLiveServerClient({ prompt }); + + const unrefSpy = await withSetTimeoutUnrefSpy(async () => { + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: {}, + serverUrl: TEST_SERVER_URL, + }); + }); + + expect(unrefSpy).not.toBeNull(); + expect(unrefSpy?.mock.calls).toHaveLength(0); + await waitForMockCallCount(prompt, 1); + }); + + test("live wake acks only after session.prompt resolves", async () => { + trackBgTask("s1", "task-1"); + let resolvePrompt: (() => void) | undefined; + const prompt = mock( + () => + new Promise((resolve) => { + resolvePrompt = resolve; + }), + ); + const send = mock(async (command: string) => + command === "bash_drain_completions" + ? { success: true, bg_completions: [completion("task-1", "npm test")] } + : { success: true, acked_task_ids: ["task-1"] }, + ); + const { ctx } = harness(send); + installLiveServerClient({ prompt }); + + const wakePromise = handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: {}, + serverUrl: TEST_SERVER_URL, }); + await waitForMockCallCount(prompt, 1); + expect(send.mock.calls.some((call) => call[0] === "bash_ack_completions")).toBe(false); + + resolvePrompt?.(); + await wakePromise; + await waitUntil(() => send.mock.calls.some((call) => call[0] === "bash_ack_completions")); }); test("turn-end wake forwards resolved agent + model + variant to preserve prefix cache", async () => { @@ -459,17 +682,20 @@ describe("OpenCode background notifications", () => { bg_completions: [completion("task-1", "npm test")], })); const promptAsync = mock(async () => {}); - installLiveServerClient(promptAsync, [ - { - info: { - role: "assistant", - agent: "build", - providerID: "anthropic", - modelID: "claude-opus-4-7", - variant: "thinking", + installLiveServerClient({ + prompt: promptAsync, + messages: [ + { + info: { + role: "assistant", + agent: "build", + providerID: "anthropic", + modelID: "claude-opus-4-7", + variant: "thinking", + }, }, - }, - ]); + ], + }); await handleIdleBgCompletions({ ctx, @@ -507,7 +733,7 @@ describe("OpenCode background notifications", () => { const promptAsync = mock(async () => {}); // Empty session — no prior messages, so the resolver returns null and // the wake should go out without forging a fake model. - installLiveServerClient(promptAsync, []); + installLiveServerClient({ prompt: promptAsync, messages: [] }); await handleIdleBgCompletions({ ctx, @@ -557,24 +783,6 @@ describe("OpenCode background notifications", () => { expect(state?.pendingPatternMatches[0]?.reason).toBe("task_exit"); }); - test("emptying pending queues resets wake hard-stop retry state", () => { - trackBgTask("s1", "task-1"); - ingestBgCompletions("s1", [completion("task-1", "npm test")]); - const state = sessionBgStates.get("s1"); - expect(state?.pendingCompletions).toHaveLength(1); - if (!state) throw new Error("missing state"); - state.retryDelayMs = 1000; - state.wakeRetryAttempts = 5; - state.wakeHardStopped = true; - - consumeBgCompletion("s1", "task-1"); - - expect(state.pendingCompletions).toHaveLength(0); - expect(state.retryDelayMs).toBeNull(); - expect(state.wakeRetryAttempts).toBe(0); - expect(state.wakeHardStopped).toBe(false); - }); - test("markExplicitControl retroactively converts already-pending completion to pattern match", () => { // Race: bash spawns → trackBgTask, completion push frame arrives → // ingestBgCompletions queues into pendingCompletions, THEN bash_watch @@ -657,7 +865,7 @@ describe("OpenCode background notifications", () => { })); const { ctx } = harness(send); const promptAsync = mock(async () => {}); - installLiveServerClient(promptAsync); + installLiveServerClient({ prompt: promptAsync }); await handleIdleBgCompletions({ ctx, directory: "/tmp/project", @@ -691,7 +899,7 @@ describe("OpenCode background notifications", () => { trackBgTask("s1", "task-1"); const { ctx } = harness(() => ({ success: true, bg_completions: [] })); const promptAsync = mock(async () => {}); - installLiveServerClient(promptAsync); + installLiveServerClient({ prompt: promptAsync }); await handlePushedBgCompletion( { @@ -708,6 +916,7 @@ describe("OpenCode background notifications", () => { expect(promptAsync).toHaveBeenCalledTimes(0); expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(1); expect(sessionBgStates.get("s1")?.debounceTimer).toBeNull(); + expect(sessionBgStates.get("s1")?.deferredCompletionTimer).not.toBeNull(); markTaskWaiting("s1", "task-1"); await sleep(300); @@ -716,52 +925,44 @@ describe("OpenCode background notifications", () => { expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(0); }); - test("buffers push completion received before task tracking", async () => { + test("same-turn deferred completion falls back without idle", async () => { + setTestLiveServerAvailable(false); + trackBgTask("s1", "task-1"); const { ctx } = harness(() => ({ success: true, bg_completions: [] })); const promptAsync = mock(async () => {}); - installLiveServerClient(promptAsync); + const fallbackClient = makeClient({ promptAsync }); await handlePushedBgCompletion( { ctx, directory: "/tmp/project", sessionID: "s1", - client: {}, + client: fallbackClient, serverUrl: TEST_SERVER_URL, }, completion("task-1", "npm test"), ); - trackBgTask("s1", "task-1"); - await handleIdleBgCompletions({ - ctx, - directory: "/tmp/project", - sessionID: "s1", - client: {}, - serverUrl: TEST_SERVER_URL, - }); - await waitForMockCallCount(promptAsync, 1); + await sleep(300); + + expect(promptAsync).toHaveBeenCalledTimes(0); + await waitForMockCallCount(promptAsync, 1, DEFERRED_COMPLETION_FALLBACK_MS + 1000); expect(promptAsync).toHaveBeenCalledTimes(1); - const text = (promptAsync.mock.calls[0][0] as { body: { parts: Array<{ text: string }> } }).body - .parts[0].text; - expect(text).toContain("- task task-1 (exit 0)"); + expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(0); + expect(sessionBgStates.get("s1")?.deferredCompletionTimer).toBeNull(); }); - test("failed wake keeps pending completions and retries", async () => { + test("in-turn append drains deferred completion before fallback without duplicate wake", async () => { setTestLiveServerAvailable(false); trackBgTask("s1", "task-1"); - const { ctx } = harness(() => ({ success: true, bg_completions: [] })); - const promptAsync = mock(async () => { - throw new Error("send failed"); - }); - const fallbackClient = makeClient(promptAsync); - await handleIdleBgCompletions({ - ctx, - directory: "/tmp/project", - sessionID: "s1", - client: fallbackClient, - serverUrl: TEST_SERVER_URL, - }); + const send = mock(async (command: string) => + command === "bash_ack_completions" + ? { success: true, acked_task_ids: ["task-1"] } + : { success: true, bg_completions: [] }, + ); + const { ctx } = harness(send); + const promptAsync = mock(async () => {}); + const fallbackClient = makeClient({ promptAsync }); await handlePushedBgCompletion( { @@ -773,28 +974,22 @@ describe("OpenCode background notifications", () => { }, completion("task-1", "npm test"), ); - await waitForMockCallCount(promptAsync, 1); - expect(promptAsync).toHaveBeenCalledTimes(1); - expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(1); - expect(sessionBgStates.get("s1")?.debounceTimer).not.toBeNull(); + const output = { output: "tool output" }; + await appendInTurnBgCompletions({ ctx, directory: "/tmp/project", sessionID: "s1" }, output); + await sleep(DEFERRED_COMPLETION_FALLBACK_MS + 150); + + expect(output.output).toContain("task-1"); + expect(promptAsync).toHaveBeenCalledTimes(0); + expect(send.mock.calls.filter((call) => call[0] === "bash_ack_completions")).toHaveLength(1); }); - test("failed wake hard-stops after capped retries", async () => { + test("markTaskWaiting consumes deferred completion before fallback without duplicate wake", async () => { setTestLiveServerAvailable(false); trackBgTask("s1", "task-1"); const { ctx } = harness(() => ({ success: true, bg_completions: [] })); - const promptAsync = mock(async () => { - throw new Error("send failed"); - }); - const fallbackClient = makeClient(promptAsync); - await handleIdleBgCompletions({ - ctx, - directory: "/tmp/project", - sessionID: "s1", - client: fallbackClient, - serverUrl: TEST_SERVER_URL, - }); + const promptAsync = mock(async () => {}); + const fallbackClient = makeClient({ promptAsync }); await handlePushedBgCompletion( { @@ -806,61 +1001,731 @@ describe("OpenCode background notifications", () => { }, completion("task-1", "npm test"), ); - await waitUntil( - () => promptAsync.mock.calls.length >= 5 && sessionBgStates.get("s1")?.debounceTimer === null, - 10_000, - ); + markTaskWaiting("s1", "task-1"); + await sleep(DEFERRED_COMPLETION_FALLBACK_MS + 150); - expect(promptAsync).toHaveBeenCalledTimes(5); - expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(1); - expect(sessionBgStates.get("s1")?.debounceTimer).toBeNull(); + expect(promptAsync).toHaveBeenCalledTimes(0); + expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(0); + expect(sessionBgStates.get("s1")?.deferredCompletionTimer).toBeNull(); }); - test("post-idle push completion still wakes even when bridge is busy with non-agent RPC", async () => { - // Regression: previously bailed on `isActive()` (bridge.hasPendingRequests()) - // which returned true for the TUI status poll, orphaning the completion when - // no other trigger fired. Once the spawn turn has gone idle, the wake must - // still be scheduled. + test("staggered deferred fallback wakes matured task only", async () => { + setTestLiveServerAvailable(false); + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + const promptAsync = mock(async () => {}); + const fallbackClient = makeClient({ promptAsync }); + trackBgTask("s1", "task-1"); + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + completion("task-1", "cmd-1"), + ); + await sleep(250); + + trackBgTask("s1", "task-2"); + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + completion("task-2", "cmd-2"), + ); + + await waitForMockCallCount(promptAsync, 1, DEFERRED_COMPLETION_FALLBACK_MS + 1000); + const firstText = ( + promptAsync.mock.calls[0]?.[0] as { body: { parts: Array<{ text: string }> } } + ).body.parts[0].text; + expect(firstText).toContain("task-1"); + expect(firstText).not.toContain("task-2"); + + await waitForMockCallCount(promptAsync, 2, DEFERRED_COMPLETION_FALLBACK_MS + 1000); + const secondText = ( + promptAsync.mock.calls[1]?.[0] as { body: { parts: Array<{ text: string }> } } + ).body.parts[0].text; + expect(secondText).toContain("task-2"); + expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(0); + }); + + test("buffered unknown completion promoted by trackBgTask gets deferred fallback", async () => { + setTestLiveServerAvailable(false); const { ctx } = harness(() => ({ success: true, bg_completions: [] })); const promptAsync = mock(async () => {}); - installLiveServerClient(promptAsync); - await handleIdleBgCompletions({ - ctx, - directory: "/tmp/project", - sessionID: "s1", - client: {}, - serverUrl: TEST_SERVER_URL, - }); + const fallbackClient = makeClient({ promptAsync }); await handlePushedBgCompletion( { ctx, directory: "/tmp/project", sessionID: "s1", - client: {}, + client: fallbackClient, serverUrl: TEST_SERVER_URL, }, completion("task-1", "npm test"), ); - await waitForMockCallCount(promptAsync, 1); + trackBgTask("s1", "task-1"); + + expect(sessionBgStates.get("s1")?.deferredCompletionTimer).not.toBeNull(); + await waitForMockCallCount(promptAsync, 1, DEFERRED_COMPLETION_FALLBACK_MS + 1000); expect(promptAsync).toHaveBeenCalledTimes(1); - const text = (promptAsync.mock.calls[0][0] as { body: { parts: Array<{ text: string }> } }).body - .parts[0].text; - expect(text).toContain("task-1"); expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(0); }); - test("coalesces three idle completions into one notification", async () => { - const responses = [ - { success: true, bg_completions: [completion("task-1", "one")] }, - { success: true, bg_completions: [completion("task-2", "two")] }, - { success: true, bg_completions: [completion("task-3", "three")] }, - ]; - const { ctx } = harness(() => responses.shift() ?? { success: true, bg_completions: [] }); - const promptAsync = mock(async () => {}); - installLiveServerClient(promptAsync); + test("trackBgTask buffered promotion resets hard-stop and wakes on fallback", async () => { + setTestLiveServerAvailable(false); + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + const promptAsync = mock(async () => {}); + const fallbackClient = makeClient({ promptAsync }); + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + completion("task-1", "npm test"), + ); + + const state = sessionBgStates.get("s1"); + expect(state).toBeDefined(); + if (!state) throw new Error("missing session state"); + state.wakeHardStopped = true; + state.wakeRetryAttempts = 5; + state.retryDelayMs = 1234; + + trackBgTask("s1", "task-1"); + + expect(state.wakeHardStopped).toBe(false); + expect(state.wakeRetryAttempts).toBe(0); + expect(state.retryDelayMs).toBeNull(); + await waitForMockCallCount(promptAsync, 1, DEFERRED_COMPLETION_FALLBACK_MS + 1000); + + expect(promptAsync).toHaveBeenCalledTimes(1); + }); + + test("trackBgTask buffered promotion prunes stale long-running reminder for same task", async () => { + setTestLiveServerAvailable(false); + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + const promptAsync = mock(async () => {}); + const fallbackClient = makeClient({ promptAsync }); + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + completion("task-1", "npm test"), + ); + + const state = sessionBgStates.get("s1"); + expect(state).toBeDefined(); + if (!state) throw new Error("missing session state"); + state.pendingLongRunning.push({ + task_id: "task-1", + session_id: "s1", + command: "npm test", + elapsed_ms: 30_000, + }); + + trackBgTask("s1", "task-1"); + + expect(state.pendingLongRunning).toHaveLength(0); + await waitForMockCallCount(promptAsync, 1, DEFERRED_COMPLETION_FALLBACK_MS + 1000); + + const text = (promptAsync.mock.calls[0]?.[0] as { body: { parts: Array<{ text: string }> } }) + .body.parts[0].text; + expect(text).toContain("[BACKGROUND BASH COMPLETED]"); + expect(text).not.toContain("[BACKGROUND BASH STILL RUNNING]"); + expect(text).not.toContain("still running after"); + }); + + test("buffers push completion received before task tracking", async () => { + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + const promptAsync = mock(async () => {}); + installLiveServerClient({ prompt: promptAsync }); + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: {}, + serverUrl: TEST_SERVER_URL, + }, + completion("task-1", "npm test"), + ); + trackBgTask("s1", "task-1"); + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: {}, + serverUrl: TEST_SERVER_URL, + }); + await waitForMockCallCount(promptAsync, 1); + + expect(promptAsync).toHaveBeenCalledTimes(1); + const text = (promptAsync.mock.calls[0][0] as { body: { parts: Array<{ text: string }> } }).body + .parts[0].text; + expect(text).toContain("- task task-1 (exit 0)"); + }); + + test("idle boundary promotes orphaned unknown completion and delivers it once", async () => { + const send = mock(async (command: string) => + command === "bash_ack_completions" + ? { success: true, acked_task_ids: ["task-orphan"] } + : { success: true, bg_completions: [] }, + ); + const { ctx } = harness(send); + const promptAsync = mock(async () => {}); + installLiveServerClient({ prompt: promptAsync }); + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: {}, + serverUrl: TEST_SERVER_URL, + }, + completion("task-orphan", "npm test"), + ); + await sleep(300); + + expect(promptAsync).toHaveBeenCalledTimes(0); + expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(0); + + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: {}, + serverUrl: TEST_SERVER_URL, + }); + await waitForMockCallCount(promptAsync, 1); + + expect(promptAsync).toHaveBeenCalledTimes(1); + const text = (promptAsync.mock.calls[0][0] as { body: { parts: Array<{ text: string }> } }).body + .parts[0].text; + expect(text).toContain("- task task-orphan (exit 0)"); + expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(0); + expect(sessionBgStates.get("s1")?.unknownCompletions).toHaveLength(0); + expect(send.mock.calls.filter((call) => call[0] === "bash_ack_completions")).toHaveLength(1); + }); + + test("buffered unknown completion respects late explicit-control promotion path", async () => { + const send = mock(async (command: string) => + command === "bash_ack_completions" + ? { success: true, acked_task_ids: ["task-explicit"] } + : { success: true, bg_completions: [] }, + ); + const { ctx } = harness(send); + const output = { output: "watch registered" }; + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: {}, + serverUrl: TEST_SERVER_URL, + }, + completion("task-explicit", "npm test"), + ); + + markExplicitControl("s1", "task-explicit", false); + await appendInTurnBgCompletions({ ctx, directory: "/tmp/project", sessionID: "s1" }, output); + + expect(output.output).toContain("[BG BASH NOTIFY]"); + expect(output.output).toContain("- task task-explicit exited:"); + expect(output.output).not.toContain("[BACKGROUND BASH COMPLETED]"); + expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(0); + expect(sessionBgStates.get("s1")?.pendingPatternMatches).toHaveLength(0); + expect(sessionBgStates.get("s1")?.unknownCompletions).toHaveLength(0); + expect(send).toHaveBeenCalledWith("bash_ack_completions", { + session_id: "s1", + task_ids: ["task-explicit"], + }); + }); + + test("buffered unknown completion is dropped after markTaskWaiting consumed path", async () => { + const send = mock(async (command: string) => + command === "bash_ack_completions" + ? { success: true, acked_task_ids: ["task-waiting"] } + : { success: true, bg_completions: [] }, + ); + const { ctx } = harness(send); + const promptAsync = mock(async () => {}); + installLiveServerClient({ prompt: promptAsync }); + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: {}, + serverUrl: TEST_SERVER_URL, + }, + completion("task-waiting", "npm test"), + ); + + markTaskWaiting("s1", "task-waiting"); + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: {}, + serverUrl: TEST_SERVER_URL, + }); + await sleep(300); + + expect(promptAsync).toHaveBeenCalledTimes(0); + expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(0); + expect(sessionBgStates.get("s1")?.pendingPatternMatches).toHaveLength(0); + expect(sessionBgStates.get("s1")?.unknownCompletions).toHaveLength(0); + expect(send.mock.calls.filter((call) => call[0] === "bash_ack_completions")).toHaveLength(0); + }); + + test("failed wake keeps pending completions and retries", async () => { + setTestLiveServerAvailable(false); + trackBgTask("s1", "task-1"); + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + const promptAsync = mock(async () => { + throw new Error("send failed"); + }); + const fallbackClient = makeClient({ promptAsync }); + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + completion("task-1", "npm test"), + ); + await waitForMockCallCount(promptAsync, 1); + + expect(promptAsync).toHaveBeenCalledTimes(1); + expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(1); + expect(sessionBgStates.get("s1")?.debounceTimer).not.toBeNull(); + }); + + test("failed wake hard-stops after capped retries", async () => { + setTestLiveServerAvailable(false); + trackBgTask("s1", "task-1"); + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + const promptAsync = mock(async () => { + throw new Error("send failed"); + }); + const fallbackClient = makeClient({ promptAsync }); + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + completion("task-1", "npm test"), + ); + await waitUntil( + () => promptAsync.mock.calls.length >= 5 && sessionBgStates.get("s1")?.debounceTimer === null, + 10_000, + ); + + expect(promptAsync).toHaveBeenCalledTimes(5); + expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(1); + expect(sessionBgStates.get("s1")?.debounceTimer).toBeNull(); + }); + + test("timer reminder hard-stops, then same-task completion push recovers without still-running text", async () => { + setTestLiveServerAvailable(false); + let shouldFail = true; + const promptAsync = mock(async () => { + if (shouldFail) throw new Error("send failed"); + }); + const fallbackClient = makeClient({ promptAsync }); + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + trackBgTask("s1", "task-1"); + + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + await handlePushedBgLongRunning( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + { task_id: "task-1", session_id: "s1", command: "npm test", elapsed_ms: 30_000 }, + ); + await waitUntil( + () => promptAsync.mock.calls.length >= 5 && sessionBgStates.get("s1")?.debounceTimer === null, + 10_000, + ); + expect(sessionBgStates.get("s1")?.wakeHardStopped).toBe(true); + + shouldFail = false; + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + completion("task-1", "npm test"), + ); + await waitForMockCallCount(promptAsync, 6, 2_000); + + const text = ( + promptAsync.mock.calls.at(-1)?.[0] as { body: { parts: Array<{ text: string }> } } + ).body.parts[0].text; + expect(text).toContain("[BACKGROUND BASH COMPLETED]"); + expect(text).not.toContain("[BACKGROUND BASH STILL RUNNING]"); + expect(sessionBgStates.get("s1")?.pendingLongRunning).toHaveLength(0); + }); + + test("timer reminder hard-stops, then urgent failure recovers immediately", async () => { + setTestLiveServerAvailable(false); + let shouldFail = true; + const promptAsync = mock(async () => { + if (shouldFail) throw new Error("send failed"); + }); + const fallbackClient = makeClient({ promptAsync }); + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + trackBgTask("s1", "task-1"); + + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + await handlePushedBgLongRunning( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + { task_id: "task-1", session_id: "s1", command: "npm test", elapsed_ms: 30_000 }, + ); + await waitUntil( + () => promptAsync.mock.calls.length >= 5 && sessionBgStates.get("s1")?.debounceTimer === null, + 10_000, + ); + + shouldFail = false; + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + { task_id: "task-1", status: "failed", exit_code: 1, command: "npm test" }, + ); + await waitForMockCallCount(promptAsync, 6, 500); + + const text = ( + promptAsync.mock.calls.at(-1)?.[0] as { body: { parts: Array<{ text: string }> } } + ).body.parts[0].text; + expect(text).toContain("[BACKGROUND BASH FAILED]"); + expect(text).not.toContain("[BACKGROUND BASH STILL RUNNING]"); + }); + + test("drained completion path also recovers after timer hard-stop", async () => { + setTestLiveServerAvailable(false); + let shouldFail = true; + let drainReturnsCompletion = false; + const promptAsync = mock(async () => { + if (shouldFail) throw new Error("send failed"); + }); + const fallbackClient = makeClient({ promptAsync }); + const { ctx } = harness((command) => { + if (command === "bash_drain_completions") { + return { + success: true, + bg_completions: drainReturnsCompletion ? [completion("task-1", "npm test")] : [], + }; + } + return { success: true, acked_task_ids: ["task-1"] }; + }); + + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + await handlePushedBgLongRunning( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + { task_id: "task-1", session_id: "s1", command: "npm test", elapsed_ms: 30_000 }, + ); + await waitUntil( + () => promptAsync.mock.calls.length >= 5 && sessionBgStates.get("s1")?.debounceTimer === null, + 10_000, + ); + + shouldFail = false; + drainReturnsCompletion = true; + trackBgTask("s1", "task-1"); + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + await waitForMockCallCount(promptAsync, 6, 2_000); + + const text = ( + promptAsync.mock.calls.at(-1)?.[0] as { body: { parts: Array<{ text: string }> } } + ).body.parts[0].text; + expect(text).toContain("[BACKGROUND BASH COMPLETED]"); + expect(text).not.toContain("[BACKGROUND BASH STILL RUNNING]"); + }); + + test("terminal completion prunes stale long-running state", async () => { + setTestLiveServerAvailable(false); + const promptAsync = mock(async () => {}); + const fallbackClient = makeClient({ promptAsync }); + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + trackBgTask("s1", "task-1"); + + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + await handlePushedBgLongRunning( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + { task_id: "task-1", session_id: "s1", command: "npm test", elapsed_ms: 30_000 }, + ); + await waitForMockCallCount(promptAsync, 1, 2_000); + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + completion("task-1", "npm test"), + ); + await waitForMockCallCount(promptAsync, 2, 2_000); + + const text = ( + promptAsync.mock.calls.at(-1)?.[0] as { body: { parts: Array<{ text: string }> } } + ).body.parts[0].text; + expect(text).toContain("[BACKGROUND BASH COMPLETED]"); + expect(text).not.toContain("still running after"); + expect(sessionBgStates.get("s1")?.pendingLongRunning).toHaveLength(0); + }); + + test("long-running wake clears completion deferral so later completion push wakes again", async () => { + setTestLiveServerAvailable(false); + const promptAsync = mock(async () => {}); + const fallbackClient = makeClient({ promptAsync }); + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + trackBgTask("s1", "task-1"); + + await handlePushedBgLongRunning( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + { task_id: "task-1", session_id: "s1", command: "npm test", elapsed_ms: 30_000 }, + ); + await waitForMockCallCount(promptAsync, 1, 2_000); + + const firstText = ( + promptAsync.mock.calls[0]?.[0] as { body: { parts: Array<{ text: string }> } } + ).body.parts[0].text; + expect(firstText).toContain("[BACKGROUND BASH STILL RUNNING]"); + expect(firstText).not.toContain("[BACKGROUND BASH COMPLETED]"); + expect(sessionBgStates.get("s1")?.wakeDeferredTaskIds.has("task-1")).toBe(false); + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + completion("task-1", "npm test"), + ); + await waitForMockCallCount(promptAsync, 2, 2_000); + + const secondText = ( + promptAsync.mock.calls[1]?.[0] as { body: { parts: Array<{ text: string }> } } + ).body.parts[0].text; + expect(secondText).toContain("[BACKGROUND BASH COMPLETED]"); + expect(secondText).not.toContain("[BACKGROUND BASH STILL RUNNING]"); + }); + + test("inline consume path also clears hard-stop and stale long-running state", () => { + trackBgTask("s1", "task-inline"); + const state = sessionBgStates.get("s1"); + expect(state).toBeDefined(); + + state!.pendingLongRunning.push({ + task_id: "task-inline", + session_id: "s1", + command: "sleep 40", + elapsed_ms: 40_000, + }); + state!.wakeHardStopped = true; + state!.wakeRetryAttempts = 5; + state!.retryDelayMs = 1000; + + consumeBgCompletion("s1", "task-inline"); + + expect(state!.wakeHardStopped).toBe(false); + expect(state!.wakeRetryAttempts).toBe(0); + expect(state!.retryDelayMs).toBeNull(); + expect(state!.pendingLongRunning).toHaveLength(0); + }); + + test("post-idle push completion still wakes even when bridge is busy with non-agent RPC", async () => { + // Regression: previously bailed on `isActive()` (bridge.hasPendingRequests()) + // which returned true for the TUI status poll, orphaning the completion when + // no other trigger fired. Once the spawn turn has gone idle, the wake must + // still be scheduled. + trackBgTask("s1", "task-1"); + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + const promptAsync = mock(async () => {}); + installLiveServerClient({ prompt: promptAsync }); + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: {}, + serverUrl: TEST_SERVER_URL, + }); + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: {}, + serverUrl: TEST_SERVER_URL, + }, + completion("task-1", "npm test"), + ); + await waitForMockCallCount(promptAsync, 1); + + expect(promptAsync).toHaveBeenCalledTimes(1); + const text = (promptAsync.mock.calls[0][0] as { body: { parts: Array<{ text: string }> } }).body + .parts[0].text; + expect(text).toContain("task-1"); + expect(sessionBgStates.get("s1")?.pendingCompletions).toHaveLength(0); + }); + + test("urgent terminal failure wakes without normal debounce delay", async () => { + setTestLiveServerAvailable(false); + trackBgTask("s1", "task-urgent"); + const { ctx } = harness(() => ({ success: true, bg_completions: [] })); + const promptAsync = mock(async () => {}); + const fallbackClient = makeClient({ promptAsync }); + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + + await handlePushedBgCompletion( + { + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }, + { task_id: "task-urgent", status: "failed", exit_code: 1, command: "npm test" }, + ); + await waitForMockCallCount(promptAsync, 1, 250); + + expect(promptAsync).toHaveBeenCalledTimes(1); + const text = (promptAsync.mock.calls[0][0] as { body: { parts: Array<{ text: string }> } }).body + .parts[0].text; + expect(text).toContain("[BACKGROUND BASH FAILED]"); + expect(text).not.toContain("[BACKGROUND BASH COMPLETED]"); + }); + + test("coalesces three idle completions into one notification", async () => { + const responses = [ + { success: true, bg_completions: [completion("task-1", "one")] }, + { success: true, bg_completions: [completion("task-2", "two")] }, + { success: true, bg_completions: [completion("task-3", "three")] }, + ]; + const { ctx } = harness(() => responses.shift() ?? { success: true, bg_completions: [] }); + const promptAsync = mock(async () => {}); + installLiveServerClient({ prompt: promptAsync }); for (const taskId of ["task-1", "task-2", "task-3"]) trackBgTask("s1", taskId); await handleIdleBgCompletions({ @@ -911,7 +1776,7 @@ describe("OpenCode background notifications", () => { bg_completions: [completion(`task-${++index}`, `cmd-${index}`)], })); const promptAsync = mock(async () => {}); - installLiveServerClient(promptAsync); + installLiveServerClient({ prompt: promptAsync }); const started = Date.now(); for (let task = 1; task <= 6; task++) trackBgTask("s1", `task-${task}`); @@ -940,7 +1805,7 @@ describe("OpenCode background notifications", () => { test("second pushed background completion wakes without chat message reset", async () => { const promptAsync = mock(async () => {}); - installLiveServerClient(promptAsync); + installLiveServerClient({ prompt: promptAsync }); let responses: BridgeResponse[] = [ { success: true, bg_completions: [completion("task-1", "one")] }, ]; @@ -1057,22 +1922,22 @@ describe("OpenCode background notifications", () => { // path reads the cached decision via `useLiveServerWake()` each time // a reminder fires. // - // • `true` — POST through `createOpencodeClient(input.serverUrl)`. - // Works around anomalyco/opencode#28202 (no duplicate runs). - // • `false` — POST through `drainContext.client.session.promptAsync`. - // Accepts the upstream bug so wakes still arrive instead - // of being indefinitely queued + dropped via wake_hard_stop. + // • `true` — wake through `createOpencodeClient(input.serverUrl)` using + // live `session.prompt(...)` delivery proof. + // • `false` — fall back to `drainContext.client.session.prompt(...)`, or + // degrade to `.promptAsync` only when prompt is missing. - test("live-server wake uses createOpencodeClient and tags trace as live-server", async () => { + test("live-server wake uses session.prompt and tags trace as live-server", async () => { setTestLiveServerAvailable(true); trackBgTask("s1", "task-1"); const { ctx } = harness(() => ({ success: true, bg_completions: [completion("task-1", "npm test")], })); + const livePrompt = mock(async () => {}); const livePromptAsync = mock(async () => {}); - installLiveServerClient(livePromptAsync); - const fallbackClient = makeClient(mock(async () => {})); + installLiveServerClient({ prompt: livePrompt, promptAsync: livePromptAsync }); + const fallbackClient = makeClient({ promptAsync: mock(async () => {}) }); await handleIdleBgCompletions({ ctx, @@ -1081,25 +1946,37 @@ describe("OpenCode background notifications", () => { client: fallbackClient, serverUrl: TEST_SERVER_URL, }); - await waitForMockCallCount(livePromptAsync, 1); + await waitForMockCallCount(livePrompt, 1); // The live-server client was used; the fallback client was NOT. - expect(livePromptAsync).toHaveBeenCalledTimes(1); + expect(livePrompt).toHaveBeenCalledTimes(1); + expect(livePromptAsync).toHaveBeenCalledTimes(0); expect(fallbackClient.session.promptAsync).toHaveBeenCalledTimes(0); - const startMeta = findTraceEvent("bash_completion_wake_prompt_async_start"); + const startMeta = findTraceEvent("bash_completion_wake_send_start"); expect(startMeta).toBeDefined(); expect(startMeta?.wake_client_path).toBe("live-server"); + expect(startMeta?.wake_client_method).toBe("prompt"); expect(typeof startMeta?.delivery_id).toBe("string"); + expect(startMeta?.correlation_header).toBe("x-aft-delivery-id"); expect(startMeta?.task_ids).toEqual(["task-1"]); // The factory saw the serverUrl + directory we configured. expect(getLastLiveServerArgs()).toEqual({ serverUrl: TEST_SERVER_URL, directory: "/tmp/project", + headers: { + "x-aft-delivery-id": startMeta?.delivery_id as string, + }, }); + + const okLogLine = sessionLogSpy.mock.calls.find( + (call) => + (call[2] as { event?: string } | undefined)?.event === "bash_completion_wake_send_ok", + ); + expect(okLogLine?.[1]).toContain("wake send resolved"); }); - test("live-server failure falls back in-process and demotes subsequent wakes", async () => { + test("live prompt failure falls back in-process and demotes subsequent wakes", async () => { setTestLiveServerAvailable(true); const responses: BridgeResponse[] = [ { success: true, bg_completions: [completion("task-1", "npm test")] }, @@ -1111,11 +1988,11 @@ describe("OpenCode background notifications", () => { : { success: true, acked_task_ids: [] }, ); const { ctx } = harness(send); - const livePromptAsync = mock(async () => { + const livePrompt = mock(async () => { throw new Error("connect ECONNREFUSED 127.0.0.1"); }); - installLiveServerClient(livePromptAsync); - const fallbackClient = makeClient(mock(async () => {})); + installLiveServerClient({ prompt: livePrompt }); + const fallbackClient = makeClient({ prompt: mock(async () => {}) }); trackBgTask("s1", "task-1"); await handleIdleBgCompletions({ @@ -1125,10 +2002,10 @@ describe("OpenCode background notifications", () => { client: fallbackClient, serverUrl: TEST_SERVER_URL, }); - await waitForMockCallCount(fallbackClient.session.promptAsync, 1); + await waitForMockCallCount(fallbackClient.session.prompt!, 1); - expect(livePromptAsync).toHaveBeenCalledTimes(1); - expect(fallbackClient.session.promptAsync).toHaveBeenCalledTimes(1); + expect(livePrompt).toHaveBeenCalledTimes(1); + expect(fallbackClient.session.prompt).toHaveBeenCalledTimes(1); // Production code calls setLiveServerWakeAvailable(serverUrl, false) // (per-URL form), so check the per-URL availability map directly. expect(perUrlAvailability.get(normalizeServerUrl(TEST_SERVER_URL))).toBe(false); @@ -1141,9 +2018,9 @@ describe("OpenCode background notifications", () => { const debugEvents = sessionDebugSpy.mock.calls.map( (call) => (call[2] as { event?: string } | undefined)?.event, ); - expect(debugEvents).toContain("bash_completion_wake_prompt_async_error"); + expect(debugEvents).toContain("bash_completion_wake_send_error"); expect(debugEvents).toContain("bash_completion_wake_live_server_fallback"); - expect(warnEvents).not.toContain("bash_completion_wake_prompt_async_error"); + expect(warnEvents).not.toContain("bash_completion_wake_send_error"); expect(warnEvents).not.toContain("bash_completion_wake_live_server_fallback"); trackBgTask("s1", "task-2"); @@ -1154,13 +2031,38 @@ describe("OpenCode background notifications", () => { client: fallbackClient, serverUrl: TEST_SERVER_URL, }); - await waitForMockCallCount(fallbackClient.session.promptAsync, 2); + await waitForMockCallCount(fallbackClient.session.prompt!, 2); + + expect(livePrompt).toHaveBeenCalledTimes(1); + expect(fallbackClient.session.prompt).toHaveBeenCalledTimes(2); + }); + + test("live client missing prompt does not call live promptAsync; falls back and demotes", async () => { + setTestLiveServerAvailable(true); + trackBgTask("s1", "task-1"); + const { ctx } = harness(() => ({ + success: true, + bg_completions: [completion("task-1", "npm test")], + })); + const livePromptAsync = mock(async () => {}); + installLiveServerClient({ promptAsync: livePromptAsync }); + const fallbackClient = makeClient({ prompt: mock(async () => {}) }); + + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + await waitForMockCallCount(fallbackClient.session.prompt!, 1); - expect(livePromptAsync).toHaveBeenCalledTimes(1); - expect(fallbackClient.session.promptAsync).toHaveBeenCalledTimes(2); + expect(livePromptAsync).toHaveBeenCalledTimes(0); + expect(fallbackClient.session.prompt).toHaveBeenCalledTimes(1); + expect(perUrlAvailability.get(normalizeServerUrl(TEST_SERVER_URL))).toBe(false); }); - test("in-process fallback wake uses drainContext.client and tags trace accordingly", async () => { + test("in-process fallback prefers prompt when available and tags trace accordingly", async () => { // When the live HTTP listener was unreachable at startup, // bg-notifications must use the plugin-provided in-process client so // wakes still arrive — at the cost of the upstream duplicate-runner @@ -1173,8 +2075,11 @@ describe("OpenCode background notifications", () => { bg_completions: [completion("task-1", "npm test")], })); const livePromptAsync = mock(async () => {}); - installLiveServerClient(livePromptAsync); - const fallbackClient = makeClient(mock(async () => {})); + installLiveServerClient({ prompt: livePromptAsync }); + const fallbackClient = makeClient({ + prompt: mock(async () => {}), + promptAsync: mock(async () => {}), + }); await handleIdleBgCompletions({ ctx, @@ -1183,21 +2088,46 @@ describe("OpenCode background notifications", () => { client: fallbackClient, serverUrl: TEST_SERVER_URL, }); - await waitForMockCallCount(fallbackClient.session.promptAsync, 1); + await waitForMockCallCount(fallbackClient.session.prompt!, 1); // The fallback client was used; the live-server factory was NOT // consulted at all (no probe of getLastLiveServerArgs). - expect(fallbackClient.session.promptAsync).toHaveBeenCalledTimes(1); + expect(fallbackClient.session.prompt).toHaveBeenCalledTimes(1); + expect(fallbackClient.session.promptAsync).toHaveBeenCalledTimes(0); expect(livePromptAsync).toHaveBeenCalledTimes(0); expect(getLastLiveServerArgs()).toBeNull(); - const startMeta = findTraceEvent("bash_completion_wake_prompt_async_start"); + const startMeta = findTraceEvent("bash_completion_wake_send_start"); expect(startMeta).toBeDefined(); expect(startMeta?.wake_client_path).toBe("in-process-fallback"); + expect(startMeta?.wake_client_method).toBe("prompt"); expect(typeof startMeta?.delivery_id).toBe("string"); expect(startMeta?.task_ids).toEqual(["task-1"]); }); + test("in-process fallback uses promptAsync only if prompt missing", async () => { + setTestLiveServerAvailable(false); + trackBgTask("s1", "task-1"); + const { ctx } = harness(() => ({ + success: true, + bg_completions: [completion("task-1", "npm test")], + })); + const fallbackClient = makeClient({ promptAsync: mock(async () => {}) }); + + await handleIdleBgCompletions({ + ctx, + directory: "/tmp/project", + sessionID: "s1", + client: fallbackClient, + serverUrl: TEST_SERVER_URL, + }); + await waitForMockCallCount(fallbackClient.session.promptAsync!, 1); + + expect(findTraceEvent("bash_completion_wake_degraded_delivery")?.wake_client_method).toBe( + "promptAsync", + ); + }); + test("in-process fallback without client emits diagnostic and queues for retry", async () => { // If the live-server probe said false AND the drainContext somehow // arrived without a client, the wake has no transport at all. The @@ -1208,7 +2138,7 @@ describe("OpenCode background notifications", () => { trackBgTask("s1", "task-1"); const { ctx } = harness(() => ({ success: true, bg_completions: [] })); const livePromptAsync = mock(async () => {}); - installLiveServerClient(livePromptAsync); + installLiveServerClient({ prompt: livePromptAsync }); await handleIdleBgCompletions({ ctx, directory: "/tmp/project", @@ -1289,3 +2219,24 @@ async function waitUntil( function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } + +async function withSetTimeoutUnrefSpy(run: () => Promise): Promise> { + const originalSetTimeout = globalThis.setTimeout; + let unrefSpy: ReturnType | null = null; + globalThis.setTimeout = ((handler: TimerHandler, timeout?: number, ...args: unknown[]) => { + const timer = originalSetTimeout(handler, timeout, ...args); + if (timer && typeof (timer as NodeJS.Timeout).unref === "function") { + const realUnref = (timer as NodeJS.Timeout).unref.bind(timer as NodeJS.Timeout); + unrefSpy = mock((...unrefArgs: unknown[]) => realUnref(...unrefArgs)); + (timer as NodeJS.Timeout).unref = unrefSpy as unknown as NodeJS.Timeout["unref"]; + } + return timer; + }) as typeof globalThis.setTimeout; + try { + await run(); + } finally { + globalThis.setTimeout = originalSetTimeout; + } + if (!unrefSpy) throw new Error("expected setTimeout to return timer with unref()"); + return unrefSpy; +} diff --git a/packages/opencode-plugin/src/__tests__/e2e/bg-notifications.test.ts b/packages/opencode-plugin/src/__tests__/e2e/bg-notifications.test.ts index 3996c405..8c0244b3 100644 --- a/packages/opencode-plugin/src/__tests__/e2e/bg-notifications.test.ts +++ b/packages/opencode-plugin/src/__tests__/e2e/bg-notifications.test.ts @@ -1,18 +1,18 @@ /// import { afterAll, afterEach, beforeAll, describe, expect, mock, test } from "bun:test"; -import { BridgePool } from "@cortexkit/aft-bridge"; +import { BridgePool } from "../../../../aft-bridge/src/index.js"; import type { ToolContext } from "@opencode-ai/plugin"; -// Mock the live-server SDK factory + wake-availability decision so the -// wake path can route promptAsync to a test stub. The real implementation +// Mock live-server SDK factory + wake-availability decision so wake path can +// route sync `prompt` to test stub. Real implementation // builds a `createOpencodeClient` pointed at `input.serverUrl`, which is // not available in this real-bridge e2e harness (no OpenCode HTTP server // fixture). // -// Post-v0.29, when `useLiveServerWake()` returns false, the wake path -// falls back to `drainContext.client.session.promptAsync`. We pin it to -// `true` here so this e2e keeps exercising the workaround path; a +// When `useLiveServerWake()` returns false, wake path falls back to +// `drainContext.client.session.prompt(...)` or degraded `.promptAsync`. +// We pin it to `true` here so this e2e keeps exercising live wake path; a // dedicated unit test covers the fallback branch in // `__tests__/bg-notifications.test.ts`. let e2eLiveServerClient: unknown = null; @@ -47,6 +47,8 @@ import { __resetBgNotificationStateForTests, appendInTurnBgCompletions, handleIdleBgCompletions, + handlePushedBgCompletion, + handlePushedBgLongRunning, sessionBgStates, trackBgTask, } from "../../bg-notifications.js"; @@ -133,7 +135,7 @@ maybeDescribe("e2e bg notifications (OpenCode adapter + bridge + Rust)", () => { expect(output.output).not.toContain(": printf done"); }); - test("turn-end wake sends promptAsync through OpenCode client", async () => { + test("turn-end wake sends prompt through OpenCode client", async () => { const { h, ctx, bash } = await pluginHarness(); const taskId = await spawnBackground(h, bash, "printf idle-done"); const promptCalls: unknown[] = []; @@ -142,7 +144,7 @@ maybeDescribe("e2e bg notifications (OpenCode adapter + bridge + Rust)", () => { // try to reach `serverUrl` over HTTP — see anomalyco/opencode#28202. setE2ELiveServerClient({ session: { - promptAsync: async (payload: unknown) => { + prompt: async (payload: unknown) => { promptCalls.push(payload); }, messages: async () => ({ data: [] }), @@ -168,6 +170,111 @@ maybeDescribe("e2e bg notifications (OpenCode adapter + bridge + Rust)", () => { expect(text).toContain(" idle-done"); expect(text).not.toContain(": printf idle-done"); }); + + test("turn-end urgent failure wake uses failed reminder class", async () => { + const { h, ctx, bash } = await pluginHarness(); + const taskId = await spawnBackground(h, bash, "sh -c 'printf boom >&2; exit 1'"); + const promptCalls: unknown[] = []; + setE2ELiveServerClient({ + session: { + prompt: async (payload: unknown) => { + promptCalls.push(payload); + }, + messages: async () => ({ data: [] }), + }, + }); + + await waitUntil(async () => { + await handleIdleBgCompletions({ + ctx, + directory: h.tempDir, + sessionID: "e2e-session", + client: {}, + serverUrl: "http://127.0.0.1:0/", + }); + return promptCalls.length > 0 || hasScheduledBgWake(); + }); + await waitUntil(() => promptCalls.length > 0, 5_000); + + const text = (promptCalls[0] as { body: { parts: Array<{ text: string }> } }).body.parts[0] + .text; + expect(text).toContain("[BACKGROUND BASH FAILED]"); + expect(text).toContain(`- task ${taskId} (exit 1)`); + expect(text).toContain("boom"); + }); + + test("autonomous long-running wake followed by completion wake fires twice", async () => { + const { h, ctx, bash } = await pluginHarness(); + const taskId = await spawnBackground(h, bash, "sh -c 'sleep 0.2; printf done'"); + const promptCalls: unknown[] = []; + setE2ELiveServerClient({ + session: { + prompt: async (payload: unknown) => { + promptCalls.push(payload); + }, + messages: async () => ({ data: [] }), + }, + }); + + await handlePushedBgLongRunning( + { + ctx, + directory: h.tempDir, + sessionID: "e2e-session", + client: {}, + serverUrl: "http://127.0.0.1:0/", + }, + { + task_id: taskId, + session_id: "e2e-session", + command: "sleep 0.2; printf done", + elapsed_ms: 30_000, + }, + ); + await waitUntil(() => promptCalls.length >= 1, 5_000); + + const firstText = (promptCalls[0] as { body: { parts: Array<{ text: string }> } }).body.parts[0] + .text; + expect(firstText).toContain("[BACKGROUND BASH STILL RUNNING]"); + expect(firstText).not.toContain("[BACKGROUND BASH COMPLETED]"); + expect(sessionBgStates.get("e2e-session")?.wakeDeferredTaskIds.has(taskId)).toBe(false); + + const bridge = ctx.pool.getActiveBridgeForRoot(h.tempDir) ?? ctx.pool.getBridge(h.tempDir); + let pending: unknown; + await waitUntil(async () => { + const response = await bridge.send("bash_drain_completions", { session_id: "e2e-session" }); + const completions = Array.isArray((response as { bg_completions?: unknown[] }).bg_completions) + ? (response as { bg_completions: unknown[] }).bg_completions + : []; + pending = completions.find( + (completion) => + !!completion && + typeof completion === "object" && + (completion as { task_id?: unknown }).task_id === taskId, + ); + return pending !== undefined; + }, 5_000); + + expect(pending).toBeDefined(); + + await handlePushedBgCompletion( + { + ctx, + directory: h.tempDir, + sessionID: "e2e-session", + client: {}, + serverUrl: "http://127.0.0.1:0/", + }, + pending, + ); + await waitUntil(() => promptCalls.length >= 2, 5_000); + + const secondText = (promptCalls[1] as { body: { parts: Array<{ text: string }> } }).body + .parts[0].text; + expect(secondText).toContain("[BACKGROUND BASH COMPLETED]"); + expect(secondText).not.toContain("[BACKGROUND BASH STILL RUNNING]"); + expect(secondText).toContain(`- task ${taskId} (exit 0)`); + }); }); async function spawnBackground( diff --git a/packages/opencode-plugin/src/__tests__/live-server-client.test.ts b/packages/opencode-plugin/src/__tests__/live-server-client.test.ts index 44856dc6..5d1ca9cc 100644 --- a/packages/opencode-plugin/src/__tests__/live-server-client.test.ts +++ b/packages/opencode-plugin/src/__tests__/live-server-client.test.ts @@ -9,9 +9,21 @@ import { } from "../shared/live-server-client.js"; const originalFetch = globalThis.fetch; +const originalServerPassword = process.env.OPENCODE_SERVER_PASSWORD; +const originalServerUsername = process.env.OPENCODE_SERVER_USERNAME; afterEach(() => { globalThis.fetch = originalFetch; + if (originalServerPassword === undefined) { + delete process.env.OPENCODE_SERVER_PASSWORD; + } else { + process.env.OPENCODE_SERVER_PASSWORD = originalServerPassword; + } + if (originalServerUsername === undefined) { + delete process.env.OPENCODE_SERVER_USERNAME; + } else { + process.env.OPENCODE_SERVER_USERNAME = originalServerUsername; + } __resetLiveServerWakeForTests(); }); @@ -22,17 +34,34 @@ describe("probeServerReachable", () => { await expect(probeServerReachable("http://127.0.0.1:4096/")).resolves.toBe(true); }); - test("accepts auth-protected listeners but rejects plain-TUI 404s", async () => { + test("rejects 401/403 auth-protected listeners without usable env auth", async () => { stubFetch(401); - await expect(probeServerReachable("http://127.0.0.1:4096/")).resolves.toBe(true); + await expect(probeServerReachable("http://127.0.0.1:4096/")).resolves.toBe(false); stubFetch(403); - await expect(probeServerReachable("http://127.0.0.1:4097/")).resolves.toBe(true); + await expect(probeServerReachable("http://127.0.0.1:4097/")).resolves.toBe(false); stubFetch(404); await expect(probeServerReachable("http://127.0.0.1:4098/")).resolves.toBe(false); }); + test("sends env-derived Authorization header and accepts 2xx when auth succeeds", async () => { + process.env.OPENCODE_SERVER_USERNAME = "oracle"; + process.env.OPENCODE_SERVER_PASSWORD = "secret"; + + let callCount = 0; + testFetch(async (_input, init) => { + callCount += 1; + expect(init?.headers).toEqual({ + Authorization: `Basic ${Buffer.from("oracle:secret").toString("base64")}`, + }); + return new Response(null, { status: 204 }); + }); + + await expect(probeServerReachable("http://127.0.0.1:4096/")).resolves.toBe(true); + expect(callCount).toBe(1); + }); + test("records reachability per serverUrl", async () => { setLiveServerWakeAvailable("http://127.0.0.1:4096/", true); setLiveServerWakeAvailable("http://127.0.0.1:4097/", false); @@ -49,8 +78,19 @@ describe("probeServerReachable", () => { expect(useLiveServerWake("http://127.0.0.1:4096/")).toBe(true); expect(useLiveServerWake("http://127.0.0.1:4097/")).toBe(false); }); + + test("disabled config forces in-process fallback decision", () => { + setLiveServerWakeAvailable("http://127.0.0.1:4096/", true); + + expect(useLiveServerWake("http://127.0.0.1:4096/", false)).toBe(false); + }); }); function stubFetch(status: number): void { globalThis.fetch = (async () => new Response(null, { status })) as typeof fetch; } + +function testFetch(fn: typeof fetch): typeof fetch { + globalThis.fetch = fn; + return fn; +} diff --git a/packages/opencode-plugin/src/__tests__/resolve-bash-config.test.ts b/packages/opencode-plugin/src/__tests__/resolve-bash-config.test.ts index f032ba45..617a951a 100644 --- a/packages/opencode-plugin/src/__tests__/resolve-bash-config.test.ts +++ b/packages/opencode-plugin/src/__tests__/resolve-bash-config.test.ts @@ -198,4 +198,22 @@ describe("resolveBashConfig", () => { ); expect(r.long_running_reminder_interval_ms).toBe(1000); }); + + test("wake delivery tuning on top-level carries through", () => { + const r = resolveBashConfig( + cfg({ + bash: { + deferred_completion_fallback_ms: 750, + wake_retry_max_attempts: 3, + wake_debounce_step_ms: 100, + wake_debounce_cap_ms: 900, + }, + }), + ); + expect(r.deferred_completion_fallback_ms).toBe(750); + expect(r.wake_retry_max_attempts).toBe(3); + expect(r.wake_debounce_step_ms).toBe(100); + expect(r.wake_debounce_cap_ms).toBe(900); + }); + }); diff --git a/packages/opencode-plugin/src/bg-notifications.ts b/packages/opencode-plugin/src/bg-notifications.ts index 16dc69c8..f53584c1 100644 --- a/packages/opencode-plugin/src/bg-notifications.ts +++ b/packages/opencode-plugin/src/bg-notifications.ts @@ -38,6 +38,8 @@ export interface BgCompletion { output_path?: string; } +type ReminderClass = "completion" | "urgent_failure" | "timer" | "pattern_match"; + export interface PatternMatchEntry { task_id: string; session_id: string; @@ -69,6 +71,7 @@ type SessionBgState = { firstCompletionAt: number | null; scheduledFireAt: number | null; scheduledCompletionCount: number; + scheduledReminderClass: ReminderClass | null; retryDelayMs: number | null; wakeRetryAttempts: number; wakeHardStopped: boolean; @@ -82,6 +85,9 @@ type SessionBgState = { * append and the next session.idle still deliver normally. */ wakeDeferredTaskIds: Set; + deferredCompletionTimer: NodeJS.Timeout | null; + deferredCompletionDueByTask: Map; + deferredCompletionContext: (DrainContext & { client: unknown }) | null; /** * Task IDs whose completions were consumed inline by an explicit * `bash_status({ exit: true, ... })` wait. The bash_completed push @@ -107,7 +113,8 @@ export const sessionBgStates: Map = new Map(); export const SESSION_BG_STATE_IDLE_TTL_MS = 60 * 60 * 1000; const DEBOUNCE_STEP_MS = 200; const DEBOUNCE_CAP_MS = 1000; -const MAX_WAKE_SEND_ATTEMPTS = 5; +export const DEFERRED_COMPLETION_FALLBACK_MS = 500; +export const WAKE_RETRY_MAX_ATTEMPTS = 5; const UNKNOWN_COMPLETION_TTL_MS = 5000; const UNKNOWN_COMPLETION_CAP = 32; const DEFAULT_SESSION_ID = "__default__"; @@ -119,12 +126,13 @@ interface DrainContext { sessionID: string; /** * Plugin-provided OpenCode SDK client (`input.client`). The wake path - * uses this as a fallback when `useLiveServerWake()` is false — i.e. - * the live HTTP listener was unreachable when probed at plugin init, - * so `getLiveServerClient(...)` cannot be built. Falling back here - * accepts the upstream `promptAsync` runner-split bug - * (anomalyco/opencode#28202; duplicate "stop" messages) in exchange - * for wakes still arriving at all in plain-TUI sessions. + * uses this only as fallback when `useLiveServerWake()` is false or when + * live HTTP wake fails. Preferred wake path is live listener because + * synchronous `session.prompt(...)` resolution is our delivery proof that + * OpenCode accepted wake work. Fallback here keeps wakes flowing when live + * path is unavailable, but `promptAsync` still has false-ack semantics and + * upstream runner-split bug (anomalyco/opencode#28202; duplicate "stop" + * messages). * * Typed `unknown` because the real `@opencode-ai/sdk` `OpencodeClient` * has a narrower, generated `promptAsync` signature than the loose @@ -142,15 +150,83 @@ interface DrainContext { * wake path falls back to `client` above; this URL is unused. */ serverUrl?: string; + deferredCompletionFallbackMs?: number; + wakeRetryMaxAttempts?: number; + wakeDebounceStepMs?: number; + wakeDebounceCapMs?: number; } interface OpenCodeClient { session?: { + prompt?: (input: unknown) => Promise | unknown; promptAsync?: (input: unknown) => Promise | unknown; messages?: (input: { path: { id: string } }) => Promise<{ data?: unknown[] }>; }; } +interface WakeCorrelationMeta { + deliveryID: string; + requestHeaders?: Record; +} + +type SdkPromptResponseLike = { + error?: unknown; + response?: { + ok?: boolean; + status?: number; + statusText?: string; + }; +}; + +function createWakeCorrelationMeta( + clientPath: "live-server" | "in-process-fallback", +): WakeCorrelationMeta { + const deliveryID = `aftdel_${randomUUID()}`; + return { + deliveryID, + requestHeaders: + clientPath === "live-server" + ? { + "x-aft-delivery-id": deliveryID, + } + : undefined, + }; +} + +function formatWakePromptFailure(result: unknown): string | null { + if (!result || typeof result !== "object") return null; + const candidate = result as SdkPromptResponseLike & { request?: unknown }; + const hasError = "error" in candidate && candidate.error != null; + const response = candidate.response; + const responseOk = response?.ok; + const hasBadResponse = responseOk === false; + if (!hasError && !hasBadResponse) return null; + + const status = typeof response?.status === "number" ? response.status : undefined; + const statusText = typeof response?.statusText === "string" ? response.statusText : undefined; + let detail: string | undefined; + if (typeof candidate.error === "string") { + detail = candidate.error; + } else if (candidate.error instanceof Error) { + detail = candidate.error.message; + } else if (candidate.error != null) { + try { + detail = JSON.stringify(candidate.error); + } catch { + detail = String(candidate.error); + } + } + + const parts = ["wake prompt returned error"]; + if (status !== undefined) { + parts.push(`HTTP ${status}${statusText ? ` ${statusText}` : ""}`); + } else if (statusText) { + parts.push(statusText); + } + if (detail) parts.push(detail); + return parts.join(": "); +} + /** * Mark a bg task's completion as consumed by an explicit bash_status wait. * Removes it from pendingCompletions so the next wake/in-turn drain @@ -163,7 +239,9 @@ export function consumeBgCompletion(sessionID: string | undefined, taskId: strin // entry there to drop it. const state = stateFor(sessionID); state.pendingCompletions = state.pendingCompletions.filter((c) => c.task_id !== taskId); - state.wakeDeferredTaskIds.delete(taskId); + prunePendingLongRunningForTask(state, taskId); + resetWakeHardStopForRecovery(state); + clearDeferredCompletionForTask(state, taskId); if (!state.consumedTaskIds.has(taskId)) { state.consumedTaskIds.add(taskId); state.consumedTaskOrder.push(taskId); @@ -205,7 +283,7 @@ export async function markBgCompletionDelivered( export function markTaskWaiting(sessionID: string | undefined, taskId: string): void { const state = stateFor(sessionID); state.pendingCompletions = state.pendingCompletions.filter((c) => c.task_id !== taskId); - state.wakeDeferredTaskIds.delete(taskId); + clearDeferredCompletionForTask(state, taskId); if (state.consumedTaskIds.has(taskId)) { clearWakeTimerIfNoPending(state); return; @@ -231,7 +309,7 @@ export function markTaskWaiting(sessionID: string | undefined, taskId: string): */ export function unmarkTaskWaiting(sessionID: string | undefined, taskId: string): void { const state = stateFor(sessionID); - state.wakeDeferredTaskIds.delete(taskId); + clearDeferredCompletionForTask(state, taskId); if (!state.consumedTaskIds.has(taskId)) return; state.consumedTaskIds.delete(taskId); const idx = state.consumedTaskOrder.indexOf(taskId); @@ -247,11 +325,14 @@ export function trackBgTask(sessionID: string | undefined, taskId: string): void (entry) => entry.completion.task_id !== taskId, ); if (buffered.length > 0) { + const seenTaskIds = new Set(state.pendingCompletions.map((pending) => pending.task_id)); for (const entry of buffered) { - if (!state.pendingCompletions.some((pending) => pending.task_id === taskId)) { - state.pendingCompletions.push(entry.completion); - } + acceptTerminalCompletion(state, entry.completion); + if (seenTaskIds.has(entry.completion.task_id)) continue; + state.pendingCompletions.push(entry.completion); + seenTaskIds.add(entry.completion.task_id); } + scheduleDeferredCompletionFallback(state, taskId); return; } state.outstandingTaskIds.add(taskId); @@ -275,8 +356,8 @@ export function markExplicitControl( if (idx >= 0) { const completion = state.pendingCompletions[idx]; state.pendingCompletions.splice(idx, 1); - queuePendingPatternMatch(state, completionToExitPattern(completion, true)); - state.wakeDeferredTaskIds.delete(taskId); + acceptTerminalExitPattern(state, completion); + clearDeferredCompletionForTask(state, taskId); } } @@ -349,7 +430,7 @@ export function ingestBgCompletions( if (state.explicitControlTasks.has(completion.task_id)) { state.outstandingTaskIds.delete(completion.task_id); state.explicitControlTasks.delete(completion.task_id); - queuePendingPatternMatch(state, completionToExitPattern(completion, true)); + acceptTerminalExitPattern(state, completion); continue; } if (!state.outstandingTaskIds.has(completion.task_id)) { @@ -357,6 +438,7 @@ export function ingestBgCompletions( continue; } state.outstandingTaskIds.delete(completion.task_id); + acceptTerminalCompletion(state, completion); if ( !state.pendingCompletions.some((pending) => pending.task_id === completion.task_id) && !accepted.some((pending) => pending.task_id === completion.task_id) @@ -372,7 +454,18 @@ export async function handlePushedBgCompletion( drainContext: DrainContext & { client: unknown }, completion: unknown, ): Promise { + const state = stateFor(drainContext.sessionID); + const taskId = isBgCompletion(completion) ? completion.task_id : undefined; + sessionDebug(drainContext.sessionID, `${LOG_PREFIX} push completion`, { + event: "bash_completion_push_ingress", + kind: "completion", + task_id: taskId ?? null, + deferred: taskId ? state.wakeDeferredTaskIds.has(taskId) : null, + outstanding: taskId ? state.outstandingTaskIds.has(taskId) : null, + }); ingestBgCompletions(drainContext.sessionID, [completion]); + state.deferredCompletionContext = drainContext; + scheduleDeferredCompletionFallback(state, taskId); await triggerWakeIfPending(drainContext, true, false); } @@ -380,16 +473,47 @@ export async function handlePushedBgLongRunning( drainContext: DrainContext & { client: unknown }, reminder: BgLongRunningReminder, ): Promise { - stateFor(drainContext.sessionID).pendingLongRunning.push(reminder); + const state = stateFor(drainContext.sessionID); + sessionDebug(drainContext.sessionID, `${LOG_PREFIX} push long-running`, { + event: "bash_completion_push_ingress", + kind: "long_running", + task_id: reminder.task_id, + deferred: state.wakeDeferredTaskIds.has(reminder.task_id), + outstanding: state.outstandingTaskIds.has(reminder.task_id), + }); + state.pendingLongRunning.push(reminder); await triggerWakeIfPending(drainContext, true); } +function resetWakeHardStopForRecovery(state: SessionBgState): void { + state.wakeHardStopped = false; + state.wakeRetryAttempts = 0; + state.retryDelayMs = null; +} + +function prunePendingLongRunningForTask(state: SessionBgState, taskId: string): void { + state.pendingLongRunning = state.pendingLongRunning.filter( + (reminder) => reminder.task_id !== taskId, + ); +} + +function acceptTerminalCompletion(state: SessionBgState, completion: BgCompletion): void { + prunePendingLongRunningForTask(state, completion.task_id); + resetWakeHardStopForRecovery(state); +} + +function acceptTerminalExitPattern(state: SessionBgState, completion: BgCompletion): void { + acceptTerminalCompletion(state, completion); + queuePendingPatternMatch(state, completionToExitPattern(completion, true)); +} + export async function appendInTurnBgCompletions( drainContext: DrainContext, output: { output?: string } | undefined, ): Promise { if (!output) return; const state = stateFor(drainContext.sessionID); + promoteBufferedUnknownCompletions(state, Date.now()); if ( state.outstandingTaskIds.size === 0 && state.pendingCompletions.length === 0 && @@ -428,7 +552,7 @@ export async function appendInTurnBgCompletions( ); output.output = appendReminder(output.output ?? "", reminder); // Trace #7 of 7: reminder went out as part of an existing tool result - // instead of through promptAsync. NO wake_prompt_async_start event + // instead of through wake client call. NO bash_completion_wake_send_start event // accompanies this branch — that's the diagnostic signal that the // reminder reached the model via tool-result piggyback. sessionLog(drainContext.sessionID, `${LOG_PREFIX} in-turn append`, { @@ -440,11 +564,10 @@ export async function appendInTurnBgCompletions( }); state.pendingCompletions = []; for (const completion of deliveredCompletions) { - state.wakeDeferredTaskIds.delete(completion.task_id); + clearDeferredCompletionForTask(state, completion.task_id); } state.pendingLongRunning = []; state.pendingPatternMatches = []; - state.retryDelayMs = null; state.wakeRetryAttempts = 0; state.wakeHardStopped = false; await ackCompletions(drainContext, completionAcks); @@ -453,13 +576,24 @@ export async function appendInTurnBgCompletions( // build an empty-body system-reminder ("[BACKGROUND BASH STILL RUNNING]" // with no bullets) since the timer reads `state.pendingLongRunning` // again at fire time. - clearWakeTimerIfNoPending(state); + if (state.debounceTimer) { + clearTimeout(state.debounceTimer); + state.debounceTimer = null; + state.firstCompletionAt = null; + state.scheduledFireAt = null; + state.scheduledCompletionCount = 0; + state.scheduledReminderClass = null; + } } export async function handleIdleBgCompletions( drainContext: DrainContext & { client: unknown }, ): Promise { - stateFor(drainContext.sessionID).wakeDeferredTaskIds.clear(); + const state = stateFor(drainContext.sessionID); + state.deferredCompletionContext = drainContext; + promoteBufferedUnknownCompletions(state, Date.now()); + state.wakeDeferredTaskIds.clear(); + clearDeferredCompletionState(state); await triggerWakeIfPending(drainContext, false, true); } @@ -484,7 +618,24 @@ async function triggerWakeIfPending( await drainCompletions(drainContext); } routeExplicitControlCompletions(state); - if (!hasWakeEligiblePending(state, includeDeferredCompletions)) return; + if (!hasWakeEligiblePending(state, includeDeferredCompletions)) { + const singleKnownTaskId = + state.pendingCompletions[0]?.task_id ?? + state.pendingLongRunning[0]?.task_id ?? + state.pendingPatternMatches[0]?.task_id ?? + null; + sessionDebug(drainContext.sessionID, `${LOG_PREFIX} wake skipped; no eligible pending`, { + event: "bash_completion_wake_no_eligible_pending", + include_deferred_completions: includeDeferredCompletions, + pending_completions: state.pendingCompletions.length, + wake_eligible_completions: wakeEligibleCompletions(state, includeDeferredCompletions).length, + pending_long_running: state.pendingLongRunning.length, + pending_pattern_matches: state.pendingPatternMatches.length, + task_id: singleKnownTaskId, + deferred: singleKnownTaskId ? state.wakeDeferredTaskIds.has(singleKnownTaskId) : null, + }); + return; + } scheduleWake( state, @@ -513,9 +664,13 @@ async function triggerWakeIfPending( const sendPrompt = async ( client: OpenCodeClient, clientPath: "live-server" | "in-process-fallback", + method: "prompt" | "promptAsync", + correlation: WakeCorrelationMeta = createWakeCorrelationMeta(clientPath), ): Promise => { - if (typeof client.session?.promptAsync !== "function") { - throw new Error(`wake client.session.promptAsync is unavailable (path=${clientPath})`); + const promptMethod = client.session?.[method]; + const session = client.session; + if (typeof promptMethod !== "function") { + throw new Error(`wake client.session.${method} is unavailable (path=${clientPath})`); } // Pass the previous turn's prompt context (agent + model + variant) // explicitly. OpenCode's `createUserMessage` resolves variant @@ -539,28 +694,33 @@ async function triggerWakeIfPending( } if (promptContext?.variant) body.variant = promptContext.variant; - // Trace #3 of 7: about to call promptAsync. The deliveryID uniquely - // identifies this single promptAsync invocation across the rest of + // Trace #3 of 7: about to call wake client method. The deliveryID uniquely + // identifies this single wake invocation across the rest of // the trace chain (#3 start → #4 ok / #5 error → #6 ack_ok). One // deliveryID = one HTTP POST to OpenCode's session prompt endpoint. // When the DB shows multiple assistant children but logs show one // start event with this deliveryID, the duplication is downstream // of AFT. - const deliveryID = `aftdel_${randomUUID()}`; + const { deliveryID, requestHeaders } = correlation; const wakeMeta = { delivery_id: deliveryID, + correlation_header: + requestHeaders && Object.keys(requestHeaders).length > 0 + ? Object.keys(requestHeaders)[0] + : null, attempt: state.wakeRetryAttempts + 1, task_ids: taskIDs, directory: drainContext.directory, reminder_sha256: hashReminder(reminder), reminder_chars: reminder.length, - // `live-server` = wake POSTed through `createOpencodeClient` aimed - // at `input.serverUrl` (anomalyco/opencode#28202 workaround, no - // duplicate runs). `in-process-fallback` = wake POSTed through - // `input.client.session.promptAsync` because the live listener - // wasn't reachable at startup or failed mid-session; this accepts - // the upstream bug so wakes still arrive instead of hard-stopping. + // `live-server` = wake sent through `createOpencodeClient` aimed at + // `input.serverUrl`; `prompt` resolution on this path is delivery + // proof that live listener accepted prompt. `in-process-fallback` = + // wake sent through plugin-provided client because live listener was + // unavailable or failed mid-session; this path prefers sync `prompt` + // too, but may degrade to `promptAsync` when older client shape lacks it. wake_client_path: clientPath, + wake_client_method: method, prompt_context: promptContext ? { agent: promptContext.agent, @@ -574,45 +734,56 @@ async function triggerWakeIfPending( } : null, }; - sessionLog(drainContext.sessionID, `${LOG_PREFIX} wake promptAsync start`, { - event: "bash_completion_wake_prompt_async_start", + sessionLog(drainContext.sessionID, `${LOG_PREFIX} wake send start`, { + event: "bash_completion_wake_send_start", ...wakeMeta, }); try { - await client.session.promptAsync({ + const result = await promptMethod.call(session, { path: { id: drainContext.sessionID }, body, + throwOnError: true, }); + const sdkFailure = formatWakePromptFailure(result); + if (sdkFailure) throw new Error(sdkFailure); } catch (err) { - // Trace #5 of 7: promptAsync rejected. Counted toward - // MAX_WAKE_SEND_ATTEMPTS by the catch in scheduleWake unless a + // Trace #5 of 7: wake client method rejected. Counted toward + // wake_retry_max_attempts by the catch in scheduleWake unless a // live-server failure can be delivered by the in-process fallback // below. Re-throw so the retry/fallback path runs. const logPromptError = clientPath === "live-server" ? sessionDebug : sessionWarn; - logPromptError(drainContext.sessionID, `${LOG_PREFIX} wake promptAsync error`, { - event: "bash_completion_wake_prompt_async_error", + logPromptError(drainContext.sessionID, `${LOG_PREFIX} wake send error`, { + event: "bash_completion_wake_send_error", delivery_id: deliveryID, attempt: state.wakeRetryAttempts + 1, task_ids: taskIDs, wake_client_path: clientPath, + wake_client_method: method, error: err instanceof Error ? err.message : String(err), }); throw err; } - // Trace #4 of 7: promptAsync resolved. OpenCode has accepted the - // synthetic user message and will run the agent turn. A subsequent - // assistant child with finish="stop" should appear in OpenCode's - // DB for this parent user message; if MORE than one appears for - // the same parent + reminder_sha256, the duplication is in the - // OpenCode runner, not in AFT (only one promptAsync call exists - // with this deliveryID in the log). - sessionLog(drainContext.sessionID, `${LOG_PREFIX} wake promptAsync ok`, { - event: "bash_completion_wake_prompt_async_ok", + // Trace #4 of 7: wake client method resolved. For live-server `prompt`, + // this is delivery proof that OpenCode accepted wake on live listener. + // For degraded `promptAsync`, resolution is weaker transport-only proof. + sessionLog(drainContext.sessionID, `${LOG_PREFIX} wake send resolved`, { + event: "bash_completion_wake_send_ok", delivery_id: deliveryID, attempt: state.wakeRetryAttempts + 1, task_ids: taskIDs, wake_client_path: clientPath, + wake_client_method: method, }); + if (method === "promptAsync") { + sessionWarn(drainContext.sessionID, `${LOG_PREFIX} wake degraded delivery`, { + event: "bash_completion_wake_degraded_delivery", + delivery_id: deliveryID, + attempt: state.wakeRetryAttempts + 1, + task_ids: taskIDs, + wake_client_path: clientPath, + wake_client_method: method, + }); + } return deliveryID; }; @@ -624,12 +795,25 @@ async function triggerWakeIfPending( // in-process client before spending the scheduler retry budget. if (useLiveServerWake(drainContext.serverUrl) && drainContext.serverUrl) { try { + const { deliveryID, requestHeaders } = createWakeCorrelationMeta("live-server"); const liveClient = getLiveServerClient( drainContext.serverUrl, drainContext.directory, + requestHeaders, ) as OpenCodeClient; - const deliveryID = await sendPrompt(liveClient, "live-server"); - await ackCompletions(drainContext, deliveredCompletions, deliveryID); + const deliveryIDResolved = await sendPrompt(liveClient, "live-server", "prompt", { + deliveryID, + requestHeaders, + }); + if (deliveryIDResolved !== deliveryID) { + sessionDebug(drainContext.sessionID, `${LOG_PREFIX} delivery correlation mismatch`, { + event: "bash_completion_wake_delivery_correlation_mismatch", + expected_delivery_id: deliveryID, + actual_delivery_id: deliveryIDResolved, + wake_client_path: "live-server", + }); + } + await ackCompletions(drainContext, deliveredCompletions, deliveryIDResolved); return; } catch (err) { setLiveServerWakeAvailable(drainContext.serverUrl, false); @@ -650,7 +834,13 @@ async function triggerWakeIfPending( }, ); const fallbackClient = getInProcessClient(); - const deliveryID = await sendPrompt(fallbackClient, "in-process-fallback"); + const fallbackMethod = + typeof fallbackClient.session?.prompt === "function" ? "prompt" : "promptAsync"; + const deliveryID = await sendPrompt( + fallbackClient, + "in-process-fallback", + fallbackMethod, + ); // This delivery succeeded by switching transports; do not carry // over retry attempts spent on the now-demoted live-server path. state.retryDelayMs = null; @@ -662,31 +852,35 @@ async function triggerWakeIfPending( } const fallbackClient = getInProcessClient(); - const deliveryID = await sendPrompt(fallbackClient, "in-process-fallback"); + const fallbackMethod = + typeof fallbackClient.session?.prompt === "function" ? "prompt" : "promptAsync"; + const deliveryID = await sendPrompt(fallbackClient, "in-process-fallback", fallbackMethod); await ackCompletions(drainContext, deliveredCompletions, deliveryID); }, (err, hardStopped) => { sessionWarn( drainContext.sessionID, hardStopped - ? `${LOG_PREFIX} wake send failed ${MAX_WAKE_SEND_ATTEMPTS} times; stopping retries: ${err instanceof Error ? err.message : String(err)}` + ? `${LOG_PREFIX} wake send failed ${(drainContext.wakeRetryMaxAttempts ?? WAKE_RETRY_MAX_ATTEMPTS)} times; stopping retries: ${err instanceof Error ? err.message : String(err)}` : `${LOG_PREFIX} wake send failed: ${err instanceof Error ? err.message : String(err)}`, ); }, drainContext.sessionID, includeDeferredCompletions, + drainContext.wakeRetryMaxAttempts ?? WAKE_RETRY_MAX_ATTEMPTS, + drainContext.wakeDebounceStepMs ?? DEBOUNCE_STEP_MS, + drainContext.wakeDebounceCapMs ?? DEBOUNCE_CAP_MS, ); } export function formatSystemReminder(completions: readonly BgCompletion[]): string { - const bullets = completions.map((completion) => formatCompletion(completion)).join("\n"); - // Only point at bash_status when at least one completion is truncated; - // for fully-captured short outputs the agent already has the full result. - const anyTruncated = completions.some((c) => c.output_truncated === true); - const tail = anyTruncated - ? `\n\nFor truncated tasks, use bash_status({ taskId: "..." }) to retrieve full output.` - : ""; - return `\n[BACKGROUND BASH COMPLETED]\n${bullets}${tail}\n`; + const urgent = completions.filter(isUrgentCompletion); + const normal = completions.filter((completion) => !isUrgentCompletion(completion)); + const sections: string[] = []; + if (urgent.length > 0) sections.push(renderCompletionSection("BACKGROUND BASH FAILED", urgent)); + if (normal.length > 0) + sections.push(renderCompletionSection("BACKGROUND BASH COMPLETED", normal)); + return sections.join("\n"); } export function formatLongRunningReminder(reminders: readonly BgLongRunningReminder[]): string { @@ -743,6 +937,7 @@ export function extractSessionID(value: unknown): string | undefined { export function __resetBgNotificationStateForTests(): void { for (const state of sessionBgStates.values()) { if (state.debounceTimer) clearTimeout(state.debounceTimer); + if (state.deferredCompletionTimer) clearTimeout(state.deferredCompletionTimer); } sessionBgStates.clear(); } @@ -760,7 +955,12 @@ async function drainCompletions({ ctx, directory, sessionID }: DrainContext): Pr return; } state.forcedDrainCompleted = true; - ingestDrainedBgCompletions(sessionID, response.bg_completions); + const accepted = ingestDrainedBgCompletions(sessionID, response.bg_completions); + sessionDebug(sessionID, `${LOG_PREFIX} drain ok`, { + event: "bash_completion_drain_ok", + accepted_count: accepted.length, + accepted_task_ids: accepted.map((completion) => completion.task_id), + }); } catch (err) { sessionWarn( sessionID, @@ -832,20 +1032,95 @@ function wakeEligibleCompletions( function clearWakeTimerIfNoPending(state: SessionBgState): void { if ( - state.pendingCompletions.length > 0 || - state.pendingLongRunning.length > 0 || - state.pendingPatternMatches.length > 0 + state.pendingCompletions.length === 0 && + state.pendingLongRunning.length === 0 && + state.pendingPatternMatches.length === 0 && + state.debounceTimer ) { + clearTimeout(state.debounceTimer); + state.debounceTimer = null; + state.firstCompletionAt = null; + state.scheduledFireAt = null; + state.scheduledCompletionCount = 0; + state.scheduledReminderClass = null; + } + if (state.pendingCompletions.length === 0) clearDeferredCompletionState(state); +} + +function clearDeferredCompletionTimer(state: SessionBgState): void { + if (state.deferredCompletionTimer) clearTimeout(state.deferredCompletionTimer); + state.deferredCompletionTimer = null; +} + +function clearDeferredCompletionState(state: SessionBgState): void { + clearDeferredCompletionTimer(state); + state.deferredCompletionDueByTask.clear(); +} + +function clearDeferredCompletionForTask(state: SessionBgState, taskId: string): void { + state.wakeDeferredTaskIds.delete(taskId); + state.deferredCompletionDueByTask.delete(taskId); + if (state.deferredCompletionDueByTask.size === 0) { + clearDeferredCompletionTimer(state); return; } - if (state.debounceTimer) clearTimeout(state.debounceTimer); - state.debounceTimer = null; - state.firstCompletionAt = null; - state.scheduledFireAt = null; - state.scheduledCompletionCount = 0; - state.retryDelayMs = null; - state.wakeRetryAttempts = 0; - state.wakeHardStopped = false; + scheduleDeferredCompletionFallback(state); +} + +function scheduleDeferredCompletionFallback( + state: SessionBgState, + taskId?: string, + now = Date.now(), +): void { + if (taskId) { + const pending = state.pendingCompletions.some((completion) => completion.task_id === taskId); + if (!pending || !state.wakeDeferredTaskIds.has(taskId)) { + state.deferredCompletionDueByTask.delete(taskId); + } else { + const fallbackMs = + state.deferredCompletionContext?.deferredCompletionFallbackMs ?? + DEFERRED_COMPLETION_FALLBACK_MS; + state.deferredCompletionDueByTask.set(taskId, now + fallbackMs); + } + } + + for (const dueTaskId of [...state.deferredCompletionDueByTask.keys()]) { + if ( + !state.wakeDeferredTaskIds.has(dueTaskId) || + !state.pendingCompletions.some((completion) => completion.task_id === dueTaskId) + ) { + state.deferredCompletionDueByTask.delete(dueTaskId); + } + } + + if (state.deferredCompletionDueByTask.size === 0 || !state.deferredCompletionContext) { + clearDeferredCompletionTimer(state); + return; + } + + const nextDueAt = Math.min(...state.deferredCompletionDueByTask.values()); + const delay = Math.max(0, nextDueAt - now); + clearDeferredCompletionTimer(state); + state.deferredCompletionTimer = setTimeout(() => { + const fireNow = Date.now(); + const maturedTaskIds: string[] = []; + for (const [dueTaskId, dueAt] of state.deferredCompletionDueByTask) { + if (dueAt <= fireNow) maturedTaskIds.push(dueTaskId); + } + for (const maturedTaskId of maturedTaskIds) { + state.deferredCompletionDueByTask.delete(maturedTaskId); + state.wakeDeferredTaskIds.delete(maturedTaskId); + } + if (state.deferredCompletionDueByTask.size === 0) { + state.deferredCompletionTimer = null; + } else { + scheduleDeferredCompletionFallback(state, undefined, fireNow); + } + if (maturedTaskIds.length === 0) return; + const context = state.deferredCompletionContext; + if (!context) return; + void triggerWakeIfPending(context, true, false); + }, delay); } function scheduleWake( @@ -854,8 +1129,19 @@ function scheduleWake( onSendFailure: (err: unknown, hardStopped: boolean) => void, sessionID?: string, includeDeferredCompletions = true, + maxWakeSendAttempts = WAKE_RETRY_MAX_ATTEMPTS, + debounceStepMs = DEBOUNCE_STEP_MS, + debounceCapMs = DEBOUNCE_CAP_MS, ): void { - if (state.wakeHardStopped) return; + if (state.wakeHardStopped) { + sessionDebug(sessionID, `${LOG_PREFIX} wake hard-stopped`, { + event: "bash_completion_wake_hard_stopped", + pending_completions: state.pendingCompletions.length, + pending_long_running: state.pendingLongRunning.length, + pending_pattern_matches: state.pendingPatternMatches.length, + }); + return; + } // Race model: JS state changes are synchronous; awaits only happen before scheduling // drains and during final prompt delivery. Multiple hook invocations can interleave // only at those awaits, so we gate timer extension on the pending completion count. @@ -864,20 +1150,27 @@ function scheduleWake( wakeEligibleCompletions(state, includeDeferredCompletions).length + state.pendingLongRunning.length + state.pendingPatternMatches.length; - if (state.debounceTimer && pendingCount <= state.scheduledCompletionCount) { + const reminderClass = reminderClassForPending(state, includeDeferredCompletions); + if (!reminderClass) return; + if ( + state.debounceTimer && + pendingCount <= state.scheduledCompletionCount && + reminderPriority(reminderClass) <= reminderPriority(state.scheduledReminderClass) + ) { return; } if (state.firstCompletionAt === null) { state.firstCompletionAt = now; - state.scheduledFireAt = now + DEBOUNCE_STEP_MS; + state.scheduledFireAt = now + debounceDelayForReminderClass(reminderClass, debounceStepMs); } else { const previousFireAt = state.scheduledFireAt ?? now; - state.scheduledFireAt = Math.min( - previousFireAt + DEBOUNCE_STEP_MS, - state.firstCompletionAt + DEBOUNCE_CAP_MS, - ); + state.scheduledFireAt = + reminderClass === "urgent_failure" + ? now + : Math.min(previousFireAt + debounceStepMs, state.firstCompletionAt + debounceCapMs); } state.scheduledCompletionCount = pendingCount; + state.scheduledReminderClass = reminderClass; if (state.debounceTimer) clearTimeout(state.debounceTimer); const delay = state.retryDelayMs ?? Math.max(0, (state.scheduledFireAt ?? now) - now); @@ -885,7 +1178,7 @@ function scheduleWake( // Trace #1 of 7 for the wake-delivery chain. Pairs with bash_completion_wake_fire. // When the OpenCode DB later shows N assistant children for one parent // user message, the matching count of wake_scheduled / wake_fire / - // wake_prompt_async_start events for the same task_ids tells us whether + // wake_send_start events for same task_ids tells us whether // AFT submitted the prompt once or N times. See // .alfonso/incident-reports/2026-05-21-bash-reminder-duplicate-runs.md. sessionLog(sessionID, `${LOG_PREFIX} wake scheduled`, { @@ -905,6 +1198,7 @@ function scheduleWake( state.firstCompletionAt = null; state.scheduledFireAt = null; state.scheduledCompletionCount = 0; + state.scheduledReminderClass = null; // Defensive: if another path (e.g. appendInTurnBgCompletions) drained the // pending arrays between schedule and fire and didn't cancel us, just // skip — don't ship an empty "[BACKGROUND BASH STILL RUNNING]" shell. @@ -921,9 +1215,9 @@ function scheduleWake( ); // Trace #2 of 7: timer actually fired and we captured a non-empty - // pending set. The matching wake_prompt_async_start MUST follow within - // ~milliseconds — its absence means sendWake threw synchronously - // before reaching client.session.promptAsync. + // pending set. Matching wake_send_start MUST follow within + // ~milliseconds — absence means sendWake threw synchronously + // before reaching client.session.prompt / promptAsync. sessionLog(sessionID, `${LOG_PREFIX} wake fire`, { event: "bash_completion_wake_fire", task_ids: pending.map((c) => c.task_id), @@ -934,10 +1228,12 @@ function scheduleWake( }); const deliveredTaskIds = new Set(pending.map((completion) => completion.task_id)); + const longRunningTaskIds = new Set(pendingLongRunning.map((reminder) => reminder.task_id)); state.pendingCompletions = state.pendingCompletions.filter( (completion) => !deliveredTaskIds.has(completion.task_id), ); - for (const taskId of deliveredTaskIds) state.wakeDeferredTaskIds.delete(taskId); + for (const taskId of deliveredTaskIds) clearDeferredCompletionForTask(state, taskId); + for (const taskId of longRunningTaskIds) clearDeferredCompletionForTask(state, taskId); state.pendingLongRunning = []; state.pendingPatternMatches = []; const completionAcks = completionAcksForDelivery(pending, pendingPatternMatches); @@ -946,24 +1242,33 @@ function scheduleWake( state.retryDelayMs = null; state.wakeRetryAttempts = 0; state.wakeHardStopped = false; + state.scheduledReminderClass = null; }) .catch((err) => { state.pendingCompletions = [...pending, ...state.pendingCompletions]; state.pendingLongRunning = [...pendingLongRunning, ...state.pendingLongRunning]; state.pendingPatternMatches = [...pendingPatternMatches, ...state.pendingPatternMatches]; state.wakeRetryAttempts += 1; - if (state.wakeRetryAttempts >= MAX_WAKE_SEND_ATTEMPTS) { + if (state.wakeRetryAttempts >= maxWakeSendAttempts) { state.retryDelayMs = null; state.wakeHardStopped = true; onSendFailure(err, true); return; } - state.retryDelayMs = Math.min((delay || DEBOUNCE_STEP_MS) * 2, DEBOUNCE_CAP_MS); + state.retryDelayMs = Math.min((delay || debounceStepMs) * 2, debounceCapMs); onSendFailure(err, false); - scheduleWake(state, sendWake, onSendFailure, sessionID, includeDeferredCompletions); + scheduleWake( + state, + sendWake, + onSendFailure, + sessionID, + includeDeferredCompletions, + maxWakeSendAttempts, + debounceStepMs, + debounceCapMs, + ); }); }, delay); - state.debounceTimer.unref?.(); } function _getSessionState(sessionID: string | undefined): SessionBgState | undefined { @@ -987,12 +1292,16 @@ function stateFor(sessionID: string | undefined): SessionBgState { firstCompletionAt: null, scheduledFireAt: null, scheduledCompletionCount: 0, + scheduledReminderClass: null, retryDelayMs: null, wakeRetryAttempts: 0, wakeHardStopped: false, forcedDrainCompleted: false, unknownCompletions: [], wakeDeferredTaskIds: new Set(), + deferredCompletionTimer: null, + deferredCompletionDueByTask: new Map(), + deferredCompletionContext: null, consumedTaskIds: new Set(), consumedTaskOrder: [], lastSeenAt: now, @@ -1016,12 +1325,13 @@ function ingestDrainedBgCompletions( state.outstandingTaskIds.delete(completion.task_id); if (state.explicitControlTasks.has(completion.task_id)) { state.explicitControlTasks.delete(completion.task_id); - queuePendingPatternMatch(state, completionToExitPattern(completion, true)); + acceptTerminalExitPattern(state, completion); continue; } // Suppress completions for tasks already consumed inline by a // bash_status wait (same dedupe as ingestBgCompletions push path). if (state.consumedTaskIds.has(completion.task_id)) continue; + acceptTerminalCompletion(state, completion); if ( !state.pendingCompletions.some((pending) => pending.task_id === completion.task_id) && !accepted.some((pending) => pending.task_id === completion.task_id) @@ -1039,6 +1349,7 @@ function cleanupIdleSessionStates(now: number): void { if (state.outstandingTaskIds.size > 0) continue; if (state.lastSeenAt >= cutoff) continue; if (state.debounceTimer) clearTimeout(state.debounceTimer); + clearDeferredCompletionState(state); sessionBgStates.delete(sessionID); } } @@ -1061,6 +1372,36 @@ function pruneUnknownCompletions(state: SessionBgState, now: number): void { ); } +function promoteBufferedUnknownCompletions(state: SessionBgState, now: number): void { + pruneUnknownCompletions(state, now); + if (state.unknownCompletions.length === 0) return; + + const remaining: Array<{ completion: BgCompletion; receivedAt: number }> = []; + const promoted: BgCompletion[] = []; + const seenTaskIds = new Set(state.pendingCompletions.map((pending) => pending.task_id)); + + for (const entry of state.unknownCompletions) { + const completion = entry.completion; + state.outstandingTaskIds.delete(completion.task_id); + + if (state.consumedTaskIds.has(completion.task_id)) continue; + + if (state.explicitControlTasks.has(completion.task_id)) { + state.explicitControlTasks.delete(completion.task_id); + acceptTerminalExitPattern(state, completion); + continue; + } + + acceptTerminalCompletion(state, completion); + if (seenTaskIds.has(completion.task_id)) continue; + promoted.push(completion); + seenTaskIds.add(completion.task_id); + } + + state.unknownCompletions = remaining; + if (promoted.length > 0) state.pendingCompletions.push(...promoted); +} + function completionToExitPattern( completion: BgCompletion, ackCompletionOnDelivery = false, @@ -1132,6 +1473,52 @@ function formatCompletion(completion: BgCompletion): string { return previewBlock ? `${header}\n${previewBlock}` : header; } +function isUrgentCompletion(completion: BgCompletion): boolean { + return ["failed", "timed_out", "timeout", "killed"].includes(completion.status); +} + +function renderCompletionSection(header: string, completions: readonly BgCompletion[]): string { + const bullets = completions.map((completion) => formatCompletion(completion)).join("\n"); + const anyTruncated = completions.some((c) => c.output_truncated === true); + const tail = anyTruncated + ? `\n\nFor truncated tasks, use bash_status({ taskId: "..." }) to retrieve full output.` + : ""; + return `\n[${header}]\n${bullets}${tail}\n`; +} + +function reminderClassForPending( + state: SessionBgState, + includeDeferredCompletions: boolean, +): ReminderClass | null { + const completions = wakeEligibleCompletions(state, includeDeferredCompletions); + if (completions.some(isUrgentCompletion)) return "urgent_failure"; + if (state.pendingLongRunning.length > 0) return "timer"; + if (completions.length > 0) return "completion"; + if (state.pendingPatternMatches.length > 0) return "pattern_match"; + return null; +} + +function reminderPriority(reminderClass: ReminderClass | null): number { + switch (reminderClass) { + case "urgent_failure": + return 3; + case "timer": + return 2; + case "completion": + case "pattern_match": + return 1; + default: + return 0; + } +} + +function debounceDelayForReminderClass( + reminderClass: ReminderClass, + debounceStepMs = DEBOUNCE_STEP_MS, +): number { + return reminderClass === "urgent_failure" ? 0 : debounceStepMs; +} + function formatOutputPreview(completion: BgCompletion): string { // Strip ANSI escape sequences defensively — most output passes through bash // compressors first, but raw stdout from non-compressed commands may still diff --git a/packages/opencode-plugin/src/config.ts b/packages/opencode-plugin/src/config.ts index f4aa79b5..07a9c469 100644 --- a/packages/opencode-plugin/src/config.ts +++ b/packages/opencode-plugin/src/config.ts @@ -167,6 +167,25 @@ const BashFeaturesSchema = z.object({ subagent_background: z.boolean().optional(), long_running_reminder_enabled: z.boolean().optional(), long_running_reminder_interval_ms: z.number().int().positive().optional(), + /** + * Safety-net delay before a completion that was deferred for same-turn + * delivery wakes the session without waiting for OpenCode session.idle. + * Default 500ms. + */ + deferred_completion_fallback_ms: z.number().int().positive().optional(), + /** + * Maximum prompt-delivery retry attempts before wake delivery hard-stops + * and leaves completions pending for explicit in-turn drain. Default 5. + */ + wake_retry_max_attempts: z.number().int().positive().optional(), + /** + * Initial wake debounce delay and retry backoff floor. Default 200ms. + */ + wake_debounce_step_ms: z.number().int().positive().optional(), + /** + * Maximum wake debounce delay and retry backoff cap. Default 1000ms. + */ + wake_debounce_cap_ms: z.number().int().positive().optional(), /** * How long foreground bash blocks before auto-promoting the task to * background. Default 8000ms; values below the 5000ms floor are clamped up. @@ -460,6 +479,10 @@ export interface ResolvedBashConfig { background: boolean; /** See BashFeaturesSchema.subagent_background. Default false. */ subagent_background: boolean; + deferred_completion_fallback_ms: number; + wake_retry_max_attempts: number; + wake_debounce_step_ms: number; + wake_debounce_cap_ms: number; long_running_reminder_enabled?: boolean; long_running_reminder_interval_ms?: number; /** @@ -523,6 +546,14 @@ export function resolveBashConfig(config: AftConfig): ResolvedBashConfig { FOREGROUND_WAIT_WINDOW_MIN_MS, rawForegroundWait ?? FOREGROUND_WAIT_WINDOW_DEFAULT_MS, ); + const deferredCompletionFallbackMs = + typeof top === "object" && top !== null ? (top.deferred_completion_fallback_ms ?? 500) : 500; + const wakeRetryMaxAttempts = + typeof top === "object" && top !== null ? (top.wake_retry_max_attempts ?? 5) : 5; + const wakeDebounceStepMs = + typeof top === "object" && top !== null ? (top.wake_debounce_step_ms ?? 200) : 200; + const wakeDebounceCapMs = + typeof top === "object" && top !== null ? (top.wake_debounce_cap_ms ?? 1000) : 1000; const base: ResolvedBashConfig = { enabled: false, @@ -533,6 +564,10 @@ export function resolveBashConfig(config: AftConfig): ResolvedBashConfig { long_running_reminder_enabled: reminderEnabled, long_running_reminder_interval_ms: reminderInterval, foreground_wait_window_ms: foregroundWaitWindowMs, + deferred_completion_fallback_ms: deferredCompletionFallbackMs, + wake_retry_max_attempts: wakeRetryMaxAttempts, + wake_debounce_step_ms: wakeDebounceStepMs, + wake_debounce_cap_ms: wakeDebounceCapMs, }; // Top-level wins over legacy when both are present. diff --git a/packages/opencode-plugin/src/index.ts b/packages/opencode-plugin/src/index.ts index 35431223..4d9b6e81 100644 --- a/packages/opencode-plugin/src/index.ts +++ b/packages/opencode-plugin/src/index.ts @@ -21,7 +21,7 @@ import { handlePushedBgLongRunning, handlePushedPatternMatch, } from "./bg-notifications.js"; -import { loadAftConfig, resolveProjectOverridesForConfigure } from "./config.js"; +import { loadAftConfig, resolveBashConfig, resolveProjectOverridesForConfigure } from "./config.js"; import { enqueueConfigureWarningsForSession, flushConfigureWarningsOnIdle, @@ -518,6 +518,7 @@ async function initializePluginForDirectory(input: Parameters[0]) { // from that bridge, so draining/acking against a session-dir cache fallback // can target the wrong project on cold/stale cache. const sessionDir = bridge.getCwd(); + const projectBashConfig = resolveBashConfig(loadAftConfig(sessionDir)); void handlePushedBgCompletion( { ctx, @@ -529,12 +530,17 @@ async function initializePluginForDirectory(input: Parameters[0]) { // wake transport selection (anomalyco/opencode#28202). client: input.client, serverUrl: input.serverUrl?.toString(), + deferredCompletionFallbackMs: projectBashConfig.deferred_completion_fallback_ms, + wakeRetryMaxAttempts: projectBashConfig.wake_retry_max_attempts, + wakeDebounceStepMs: projectBashConfig.wake_debounce_step_ms, + wakeDebounceCapMs: projectBashConfig.wake_debounce_cap_ms, }, completion, ); }, onBashLongRunning: (reminder, bridge) => { const sessionDir = bridge.getCwd(); + const projectBashConfig = resolveBashConfig(loadAftConfig(sessionDir)); void handlePushedBgLongRunning( { ctx, @@ -544,12 +550,17 @@ async function initializePluginForDirectory(input: Parameters[0]) { // wake transport selection. client: input.client, serverUrl: input.serverUrl?.toString(), + deferredCompletionFallbackMs: projectBashConfig.deferred_completion_fallback_ms, + wakeRetryMaxAttempts: projectBashConfig.wake_retry_max_attempts, + wakeDebounceStepMs: projectBashConfig.wake_debounce_step_ms, + wakeDebounceCapMs: projectBashConfig.wake_debounce_cap_ms, }, reminder, ); }, onBashPatternMatch: (frame, bridge) => { const sessionDir = bridge.getCwd(); + const projectBashConfig = resolveBashConfig(loadAftConfig(sessionDir)); void handlePushedPatternMatch( { ctx, @@ -557,6 +568,10 @@ async function initializePluginForDirectory(input: Parameters[0]) { sessionID: frame.session_id, client: input.client, serverUrl: input.serverUrl?.toString(), + deferredCompletionFallbackMs: projectBashConfig.deferred_completion_fallback_ms, + wakeRetryMaxAttempts: projectBashConfig.wake_retry_max_attempts, + wakeDebounceStepMs: projectBashConfig.wake_debounce_step_ms, + wakeDebounceCapMs: projectBashConfig.wake_debounce_cap_ms, }, frame, ); @@ -977,12 +992,17 @@ async function initializePluginForDirectory(input: Parameters[0]) { // for `-s` resumes from another folder. const sessionDir = (await getSessionDirectory(input.client, sessionID, input.directory)) ?? input.directory; + const projectBashConfig = resolveBashConfig(loadAftConfig(sessionDir)); await handleIdleBgCompletions({ ctx, directory: sessionDir, sessionID, client: input.client, serverUrl: input.serverUrl?.toString(), + deferredCompletionFallbackMs: projectBashConfig.deferred_completion_fallback_ms, + wakeRetryMaxAttempts: projectBashConfig.wake_retry_max_attempts, + wakeDebounceStepMs: projectBashConfig.wake_debounce_step_ms, + wakeDebounceCapMs: projectBashConfig.wake_debounce_cap_ms, }); await flushConfigureWarningsOnIdle(sessionID); }, diff --git a/packages/opencode-plugin/src/shared/live-server-client.ts b/packages/opencode-plugin/src/shared/live-server-client.ts index c3801f02..3fec2f61 100644 --- a/packages/opencode-plugin/src/shared/live-server-client.ts +++ b/packages/opencode-plugin/src/shared/live-server-client.ts @@ -1,5 +1,5 @@ /** - * Workaround helper for the OpenCode plugin promptAsync runner-split bug + * Workaround helper for OpenCode plugin wake delivery bug * (https://github.com/anomalyco/opencode/issues/28202). * * OpenCode's plugin-provided `input.client` is constructed with @@ -13,22 +13,23 @@ * user parent — what users see as duplicate "stop" messages after every * background-bash completion reminder. * - * The workaround is to bypass `input.client` for the wake path and build - * a separate `createOpencodeClient` configured to hit `input.serverUrl` - * via `globalThis.fetch`. That client enters the same live listener the - * UI uses, so the active session's `SessionRunState` is the one that - * resolves `ensureRunning` and overlapping turns coalesce correctly. + * Workaround is to bypass `input.client` for wake path and build separate + * `createOpencodeClient` configured to hit `input.serverUrl` via + * `globalThis.fetch`. That client enters same live listener UI uses, so + * active session's `SessionRunState` is one that resolves `ensureRunning` + * and overlapping turns coalesce correctly. Live wake also uses sync + * `session.prompt(...)`, and resolution there is delivery proof that + * OpenCode listener accepted prompt work. * * The workaround only works when the live HTTP listener is actually * reachable. OpenCode Desktop (Electron+Node) and TUI launched with * `opencode --port 0` bind a real API listener; plain TUI binds an * internal-only listener that 404s for `/session/*`. We probe once at * plugin init and cache the result by `serverUrl`. When that server is - * unreachable, the wake path silently uses the in-process - * `input.client.session.promptAsync`, which keeps wakes flowing (at the - * cost of the upstream duplicate-runner bug) instead of producing no - * notification at all or nagging the user to relaunch with a different - * flag. + * unreachable, wake path must fall back to plugin-provided in-process + * client. That fallback keeps wakes flowing, but only live listener path + * gives sync delivery proof; degraded `promptAsync` fallback may still + * acknowledge before OpenCode persists or handles prompt. * * Tracked upstream as anomalyco/opencode#28202. When OpenCode fixes the * runtime split, this helper and its single consumer in `bg-notifications.ts` @@ -38,6 +39,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk"; export type LiveServerClient = ReturnType; +type RequestHeaders = Record; /** * Cache key is `${serverUrl}|${directory}`. Both are stable per OpenCode @@ -74,6 +76,17 @@ function serverAuthHeaders(): Record | undefined { }; } +function mergeHeaders( + base: RequestHeaders | undefined, + extra: RequestHeaders | undefined, +): RequestHeaders | undefined { + if (!base && !extra) return undefined; + return { + ...(base ?? {}), + ...(extra ?? {}), + }; +} + /** * Return a cached `createOpencodeClient` pointed at the live HTTP listener * for the given `(serverUrl, directory)` pair. One client object is reused @@ -84,14 +97,26 @@ function serverAuthHeaders(): Record | undefined { * but we set it on purpose so anyone reading this code (or grepping for the * bug fix) can see that we intentionally chose the live HTTP transport. */ -export function getLiveServerClient(serverUrl: string, directory: string): LiveServerClient { +export function getLiveServerClient( + serverUrl: string, + directory: string, + headers?: RequestHeaders, +): LiveServerClient { + if (headers && Object.keys(headers).length > 0) { + return createOpencodeClient({ + baseUrl: serverUrl, + directory, + headers: mergeHeaders(serverAuthHeaders(), headers), + fetch: globalThis.fetch, + }); + } const key = cacheKey(serverUrl, directory); const cached = clientCache.get(key); if (cached) return cached; const client = createOpencodeClient({ baseUrl: serverUrl, directory, - headers: serverAuthHeaders(), + headers: mergeHeaders(serverAuthHeaders(), headers), fetch: globalThis.fetch, }); clientCache.set(key, client); @@ -122,9 +147,10 @@ let legacyLiveServerWakeAvailable = false; /** * Probe whether `serverUrl` serves OpenCode's HTTP API within `timeoutMs`. - * Returns `true` only when `/session` proves the API is usable: any 2xx - * response is reachable, and 401/403 also count as reachable because an - * auth-protected listener still exists. Returns `false` for 404 (plain + * Returns `true` only when `/session` proves AFT can actually use API: only + * `res.ok` counts as reachable. 401/403 are rejected unless env-derived + * Authorization header makes response OK, because AFT cannot satisfy host + * `ServerAuth` by any other path. Returns `false` for 401/403/404 (plain * TUI's internal listener), 5xx, connection refused, DNS failure, timeout, * malformed URL, or undefined URL. * @@ -153,7 +179,7 @@ export async function probeServerReachable( headers: serverAuthHeaders(), signal: controller.signal, }); - reachable = res.ok || res.status === 401 || res.status === 403; + reachable = res.ok; } catch { reachable = false; } finally { @@ -190,11 +216,12 @@ export function setLiveServerWakeAvailable( /** * Read the cached probe decision for `serverUrl`. `true` means the wake path - * should use `getLiveServerClient(serverUrl, directory)` and POST through - * the live HTTP listener. `false` means fall back to the in-process client - * passed via plugin context (`input.client`). + * should use `getLiveServerClient(serverUrl, directory)` and wake through + * live HTTP listener with sync `session.prompt(...)`. `false` means fall + * back to in-process client passed via plugin context (`input.client`). */ -export function useLiveServerWake(serverUrl?: string): boolean { +export function useLiveServerWake(serverUrl?: string, enabled = true): boolean { + if (!enabled) return false; if (!serverUrl) return legacyLiveServerWakeAvailable; return liveServerWakeAvailableByServerUrl.get(normalizeServerUrl(serverUrl)) ?? false; }