diff --git a/src/adapters/cli/pi.ts b/src/adapters/cli/pi.ts index 9cfff0a3..9f3709bd 100644 --- a/src/adapters/cli/pi.ts +++ b/src/adapters/cli/pi.ts @@ -41,6 +41,7 @@ export function createPiAdapter(pathOverride?: string): CliAdapter { }, completionPattern: undefined, + busyPattern: /Working\.\.\./, readyPattern: undefined, systemHints: BOTMUX_SHELL_HINTS, altScreen: true, diff --git a/src/adapters/cli/types.ts b/src/adapters/cli/types.ts index 1817bbd7..9e2db363 100644 --- a/src/adapters/cli/types.ts +++ b/src/adapters/cli/types.ts @@ -172,6 +172,12 @@ export interface CliAdapter { /** Completion marker regex (beyond generic quiescence). undefined = quiescence only. */ readonly completionPattern?: RegExp; + /** Busy marker regex — matches when the CLI is explicitly rendering a + * still-running state. Used for re-attached persistent sessions where there + * may be no new PTY output: if the current screen does NOT match this marker, + * the worker may safely let quiescence mark the session idle. */ + readonly busyPattern?: RegExp; + /** Ready marker regex — matches when the CLI's input prompt is rendered and * functional. When set, the idle detector suppresses quiescence-based idle * until this pattern appears in the PTY output. Checked every cycle (reset @@ -210,6 +216,11 @@ export interface CliAdapter { * correct for both shapes. */ readonly supportsTypeAhead?: boolean; + /** When true, worker may squash additional queued Lark messages into the + * pending tail instead of preserving one botmux turn per queued message. + * Keep this opt-in: most adapters rely on distinct turnId / card routing. */ + readonly mergeQueuedInput?: boolean; + /** Whether CLI uses alternate screen buffer */ readonly altScreen: boolean; diff --git a/src/utils/pending-input-queue.ts b/src/utils/pending-input-queue.ts new file mode 100644 index 00000000..1ba2ea89 --- /dev/null +++ b/src/utils/pending-input-queue.ts @@ -0,0 +1,15 @@ +export interface PendingCliInput { + content: string; + turnId?: string; +} + +export function mergeQueuedCliInput( + pending: PendingCliInput[], + next: PendingCliInput, +): boolean { + const tail = pending[pending.length - 1]; + if (!tail) return false; + tail.content = `${tail.content}\n\n${next.content}`; + tail.turnId = next.turnId ?? tail.turnId; + return true; +} diff --git a/src/worker.ts b/src/worker.ts index 35629e7b..76c8a93d 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -20,6 +20,7 @@ import { drainTranscript, joinAssistantText, trailingAssistantText, findJsonlCon import { BridgeTurnQueue, makeFingerprint, normaliseForFingerprint } from './services/bridge-turn-queue.js'; import { shouldSuppressBridgeEmit, type BridgeSendMarker } from './services/bridge-fallback-gate.js'; import { shouldWriteNow } from './utils/input-gate.js'; +import { mergeQueuedCliInput, type PendingCliInput } from './utils/pending-input-queue.js'; import { ReadyGate, shouldArmReadyGate } from './utils/ready-gate.js'; import { InflightInputTracker } from './core/inflight-input-tracker.js'; import { @@ -109,6 +110,10 @@ let consecutiveInWorkerRestarts = 0; * lifecycle so a 4× crash loop does not spam the Lark thread with 4 copies * of the same warning. */ let resumeFallbackNotified = false; +const IDLE_PROBE_INTERVAL_MS = 3_500; +const IDLE_PROBE_MAX_ATTEMPTS = 24; +let busyPatternIdleProbeTimer: ReturnType | null = null; +let reattachIdleProbeTimer: ReturnType | null = null; /** The effectiveResume flag used by the most recent spawnCli call. Written * immediately after the two-tier fallback check so late-attach timers * (hermes, cursor, etc.) can read THE SAME semantics the spawn used, @@ -218,7 +223,7 @@ function releaseReadyGate(reason: string): void { settleThenFlush(Date.now()); } } -const pendingMessages: Array<{ content: string; turnId?: string }> = []; +const pendingMessages: PendingCliInput[] = []; /** Inputs written to the CLI whose turn hasn't completed — re-queued across a * CLI crash so a submit-time death can't silently eat user messages. */ const inflightInputs = new InflightInputTracker(); @@ -2761,6 +2766,7 @@ function onPtyData(data: string): void { function markPromptReady(): void { if (isPromptReady) return; // guard against duplicate calls + stopBusyPatternIdleProbe(); // Ready-gate: a startup selector's ❯ (cjadk et al.) falsely matches // readyPattern → the IdleDetector fires idle while the CLI is NOT actually at // its input box. Hold off declaring ready until the SessionStart hook signal @@ -3022,6 +3028,7 @@ async function flushPending(): Promise { let result: Awaited> | undefined; try { result = await cliAdapter.writeInput(backend, msg); + scheduleBusyPatternIdleProbe(`${cliName()} post-submit`); } catch (err: any) { log(`writeInput threw: ${err?.message ?? err}`); // If the CLI exited mid-write the backend already fired onExit (which @@ -3064,7 +3071,19 @@ async function flushPending(): Promise { function sendToPty(content: string, turnId?: string): void { if (!backend || !cliAdapter) return; - pendingMessages.push({ content, turnId }); + const next = { content, turnId }; + const shouldMergeQueued = !isFlushing && !shouldWriteNow({ + isPromptReady, + isFlushing, + supportsTypeAhead: cliAdapter.supportsTypeAhead === true, + awaitingFirstPrompt, + }) && cliAdapter.mergeQueuedInput === true; + const mergedQueued = shouldMergeQueued && mergeQueuedCliInput(pendingMessages, next); + if (mergedQueued) { + log(`Merged queued message (${pendingMessages.length} pending): "${content.substring(0, 80)}" — ${cliName()} ${awaitingFirstPrompt ? 'still booting' : 'is busy'}`); + } else { + pendingMessages.push(next); + } // User-override semantics: a fresh Lark message while a TUI prompt is "active" // takes precedence over the AI-detected prompt. The screen analyzer can be // wrong (false positive on a question that has no rendered options) and a @@ -3089,10 +3108,10 @@ function sendToPty(content: string, turnId?: string): void { // delivers queued messages instead. See input-gate.ts; this fixes dispatch's // brief reaching Codex before its first idle and never landing. if (shouldWriteNow({ isPromptReady, isFlushing, supportsTypeAhead: cliAdapter.supportsTypeAhead === true, awaitingFirstPrompt })) { - log(`Writing to PTY: "${content.substring(0, 80)}"`); + if (!mergedQueued) log(`Writing to PTY: "${content.substring(0, 80)}"`); flushPending(); // fire-and-forget async; no-op if already flushing } else { - log(`Queued message (${pendingMessages.length} pending): "${content.substring(0, 80)}" — ${cliName()} ${awaitingFirstPrompt ? 'still booting' : 'is busy'}`); + if (!mergedQueued) log(`Queued message (${pendingMessages.length} pending): "${content.substring(0, 80)}" — ${cliName()} ${awaitingFirstPrompt ? 'still booting' : 'is busy'}`); } } @@ -3328,6 +3347,74 @@ function seedBackendScreen(source: string, be: Pick): string { + return be.captureViewport?.() ?? be.captureCurrentScreen?.() ?? ''; +} + +function probeBusyPatternIdle( + source: string, + be: Pick, +): boolean { + try { + const content = captureBackendScreen(be); + if (!content) return false; + if (cliAdapter?.busyPattern) { + if (cliAdapter.busyPattern.test(content)) return false; + log(`${source} idle probe: busy marker absent, marking prompt ready`); + markPromptReady(); + return true; + } + } catch (err: any) { + log(`${source} idle probe captureCurrentScreen failed: ${err.message}`); + } + return false; +} + +function scheduleReattachIdleProbe(source: string, be: Pick): void { + stopReattachIdleProbe(); + if (!cliAdapter?.busyPattern || (!be.captureCurrentScreen && !be.captureViewport)) return; + reattachIdleProbeTimer = setTimeout(() => { + reattachIdleProbeTimer = null; + if (backend !== be || !awaitingFirstPrompt || isPromptReady) return; + probeBusyPatternIdle(source, be); + }, IDLE_PROBE_INTERVAL_MS); + reattachIdleProbeTimer.unref?.(); +} + +function stopReattachIdleProbe(): void { + if (reattachIdleProbeTimer) { + clearTimeout(reattachIdleProbeTimer); + reattachIdleProbeTimer = null; + } +} + +function stopBusyPatternIdleProbe(): void { + if (busyPatternIdleProbeTimer) { + clearTimeout(busyPatternIdleProbeTimer); + busyPatternIdleProbeTimer = null; + } +} + +function scheduleBusyPatternIdleProbe(source: string): void { + stopBusyPatternIdleProbe(); + if (!cliAdapter?.busyPattern || (!backend?.captureCurrentScreen && !backend?.captureViewport)) return; + + let attempts = 0; + const tick = () => { + busyPatternIdleProbeTimer = null; + if (!backend || isPromptReady) return; + attempts += 1; + if (probeBusyPatternIdle(source, backend)) return; + if (attempts < IDLE_PROBE_MAX_ATTEMPTS && !isPromptReady) { + busyPatternIdleProbeTimer = setTimeout(tick, IDLE_PROBE_INTERVAL_MS); + busyPatternIdleProbeTimer.unref?.(); + } + }; + + busyPatternIdleProbeTimer = setTimeout(tick, IDLE_PROBE_INTERVAL_MS); + busyPatternIdleProbeTimer.unref?.(); +} + function spawnCli(cfg: Extract): void { // Re-deliver inputs that were in-flight when the previous CLI died (see // backend.onExit). killCli() already wiped pendingMessages, so these go to @@ -4082,6 +4169,7 @@ function spawnCli(cfg: Extract): void { if (isPipeMode && backend && 'isReattach' in backend && backend.isReattach) { log(`Re-attached to existing ${effectiveBackendType} session via pipe backend: ${persistentSessionName}`); seedBackendScreen(`${effectiveBackendType} reattach`, backend); + scheduleReattachIdleProbe(`${effectiveBackendType} reattach`, backend); } // Fallback: if the CLI takes too long to show its prompt (e.g. slow @@ -4108,6 +4196,8 @@ function spawnCli(cfg: Extract): void { function killCli(): void { idleDetector?.dispose(); idleDetector = null; + stopReattachIdleProbe(); + stopBusyPatternIdleProbe(); // Cancel any pending ready-gate fallback / settle timers; spawnCli re-arms on respawn. if (readySignalTimer) { clearTimeout(readySignalTimer); readySignalTimer = null; } if (readyFlushSettleTimer) { clearTimeout(readyFlushSettleTimer); readyFlushSettleTimer = null; } diff --git a/test/worker-pipe-initial-screen-order.test.ts b/test/worker-pipe-initial-screen-order.test.ts index f872baaa..6f60f5c8 100644 --- a/test/worker-pipe-initial-screen-order.test.ts +++ b/test/worker-pipe-initial-screen-order.test.ts @@ -14,4 +14,27 @@ describe('worker pipe initial screen ordering', () => { const idleIdx = source.indexOf('// Set up idle detection'); expect(captureIdx).toBeGreaterThan(idleIdx); }); + + it('runs a busy-pattern idle probe after each submitted input', () => { + const source = readFileSync(join(process.cwd(), 'src/worker.ts'), 'utf8'); + const writeIdx = source.indexOf('result = await cliAdapter.writeInput(backend, msg);'); + const probeIdx = source.indexOf('scheduleBusyPatternIdleProbe(`${cliName()} post-submit`);'); + const helperIdx = source.indexOf('function scheduleBusyPatternIdleProbe(source: string): void'); + + expect(helperIdx).toBeGreaterThan(-1); + expect(writeIdx).toBeGreaterThan(-1); + expect(probeIdx).toBeGreaterThan(writeIdx); + }); + + it('limits the reattach idle probe to adapters with a busy marker', () => { + const source = readFileSync(join(process.cwd(), 'src/worker.ts'), 'utf8'); + const helperStart = source.indexOf('function scheduleReattachIdleProbe'); + const helperEnd = source.indexOf('function stopReattachIdleProbe'); + const helper = source.slice(helperStart, helperEnd); + + expect(helperStart).toBeGreaterThan(-1); + expect(helper).toContain('if (!cliAdapter?.busyPattern || (!be.captureCurrentScreen && !be.captureViewport)) return;'); + expect(helper).toContain('if (backend !== be || !awaitingFirstPrompt || isPromptReady) return;'); + expect(helper).not.toContain('pendingMessages.length > 0'); + }); }); diff --git a/test/worker-queue-merge.test.ts b/test/worker-queue-merge.test.ts new file mode 100644 index 00000000..89698388 --- /dev/null +++ b/test/worker-queue-merge.test.ts @@ -0,0 +1,19 @@ +import { describe, expect, it } from 'vitest'; +import { mergeQueuedCliInput } from '../src/utils/pending-input-queue.js'; + +describe('mergeQueuedCliInput', () => { + it('returns false when there is no queued message to merge into', () => { + const pending: Array<{ content: string; turnId?: string }> = []; + + expect(mergeQueuedCliInput(pending, { content: 'next', turnId: 't2' })).toBe(false); + expect(pending).toEqual([]); + }); + + it('merges incremental queued messages into the pending tail', () => { + const pending = [{ content: 'first', turnId: 't1' }]; + + expect(mergeQueuedCliInput(pending, { content: 'second', turnId: 't2' })).toBe(true); + + expect(pending).toEqual([{ content: 'first\n\nsecond', turnId: 't2' }]); + }); +}); diff --git a/test/write-input.test.ts b/test/write-input.test.ts index cd13d92d..23fa2151 100644 --- a/test/write-input.test.ts +++ b/test/write-input.test.ts @@ -46,6 +46,7 @@ import { createOpenCodeAdapter } from '../src/adapters/cli/opencode.js'; import { createMtrAdapter } from '../src/adapters/cli/mtr.js'; import { createHermesAdapter } from '../src/adapters/cli/hermes.js'; import { createMiraAdapter } from '../src/adapters/cli/mira.js'; +import { createPiAdapter } from '../src/adapters/cli/pi.js'; import type { CliAdapter, PtyHandle } from '../src/adapters/cli/types.js'; import { appendFileSync, mkdirSync, rmSync, writeFileSync } from 'node:fs'; import { homedir, platform } from 'node:os'; @@ -171,13 +172,14 @@ const HUMAN_TYPING_ADAPTERS: AdapterEntry[] = [ ]; /** Adapters that use tmux pasteText (load-buffer + paste-buffer -d) with - * delayed Enter — CoCo / Trae CLI and Codex. See coco.ts for the Trae 0.120.31 + * delayed Enter — CoCo / Trae CLI, Codex, and Pi. See coco.ts for the Trae 0.120.31 * burst bug, and codex.ts for the per-line-submit bug bracketed paste fixes * (Codex 0.134+ handles bracketed paste correctly — the old "Codex exits on * bracketed paste" note was true only for a much earlier build). */ const PASTE_BUFFER_ADAPTERS: AdapterEntry[] = [ ['coco', createCocoAdapter('/bin/coco')], ['codex', createCodexAdapter('/bin/codex')], + ['pi', createPiAdapter('/bin/pi')], ]; /** Adapters that wrap content in bracketed-paste markers (\x1b[200~ ... \x1b[201~) @@ -491,6 +493,26 @@ describe('supportsTypeAhead flag', () => { expect(createCodexAdapter('/bin/codex').supportsTypeAhead).toBe(true); }); + it('pi: undefined (uses busy marker probes instead of type-ahead)', () => { + expect(createPiAdapter('/bin/pi').supportsTypeAhead).toBeUndefined(); + }); + + it('pi: exposes Working... as the explicit busy marker', () => { + const adapter = createPiAdapter('/bin/pi'); + expect(adapter.busyPattern?.test('⠙ Working...')).toBe(true); + expect(adapter.busyPattern?.test('已完成,等待下一条输入')).toBe(false); + }); + + it('pi: does not squash queued botmux turns', () => { + expect(createPiAdapter('/bin/pi').mergeQueuedInput).toBeUndefined(); + }); + + it('non-pi type-ahead adapters do not squash queued botmux turns', () => { + expect(createClaudeCodeAdapter('/bin/claude').mergeQueuedInput).toBeUndefined(); + expect(createCocoAdapter('/bin/coco').mergeQueuedInput).toBeUndefined(); + expect(createCodexAdapter('/bin/codex').mergeQueuedInput).toBeUndefined(); + }); + it.each(PLAIN_ADAPTERS.filter(([name]) => name !== 'codex'))('%s: undefined (default behavior)', (_name, adapter) => { expect(adapter.supportsTypeAhead).toBeUndefined(); });