Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/adapters/cli/pi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export function createPiAdapter(pathOverride?: string): CliAdapter {
},

completionPattern: undefined,
busyPattern: /Working\.\.\./,
readyPattern: undefined,
systemHints: BOTMUX_SHELL_HINTS,
altScreen: true,
Expand Down
11 changes: 11 additions & 0 deletions src/adapters/cli/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ export interface CliAdapter {
/** Completion marker regex (beyond generic quiescence). undefined = quiescence only. */
readonly completionPattern?: RegExp;

/** Busy marker regex — matches when the CLI is explicitly rendering a
* still-running state. Used for re-attached persistent sessions where there
* may be no new PTY output: if the current screen does NOT match this marker,
* the worker may safely let quiescence mark the session idle. */
readonly busyPattern?: RegExp;

/** Ready marker regex — matches when the CLI's input prompt is rendered and
* functional. When set, the idle detector suppresses quiescence-based idle
* until this pattern appears in the PTY output. Checked every cycle (reset
Expand Down Expand Up @@ -210,6 +216,11 @@ export interface CliAdapter {
* correct for both shapes. */
readonly supportsTypeAhead?: boolean;

/** When true, worker may squash additional queued Lark messages into the
* pending tail instead of preserving one botmux turn per queued message.
* Keep this opt-in: most adapters rely on distinct turnId / card routing. */
readonly mergeQueuedInput?: boolean;

/** Whether CLI uses alternate screen buffer */
readonly altScreen: boolean;

Expand Down
15 changes: 15 additions & 0 deletions src/utils/pending-input-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export interface PendingCliInput {
content: string;
turnId?: string;
}

export function mergeQueuedCliInput(
pending: PendingCliInput[],
next: PendingCliInput,
): boolean {
const tail = pending[pending.length - 1];
if (!tail) return false;
tail.content = `${tail.content}\n\n${next.content}`;
tail.turnId = next.turnId ?? tail.turnId;
return true;
}
98 changes: 94 additions & 4 deletions src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { drainTranscript, joinAssistantText, trailingAssistantText, findJsonlCon
import { BridgeTurnQueue, makeFingerprint, normaliseForFingerprint } from './services/bridge-turn-queue.js';
import { shouldSuppressBridgeEmit, type BridgeSendMarker } from './services/bridge-fallback-gate.js';
import { shouldWriteNow } from './utils/input-gate.js';
import { mergeQueuedCliInput, type PendingCliInput } from './utils/pending-input-queue.js';
import { ReadyGate, shouldArmReadyGate } from './utils/ready-gate.js';
import { InflightInputTracker } from './core/inflight-input-tracker.js';
import {
Expand Down Expand Up @@ -109,6 +110,10 @@ let consecutiveInWorkerRestarts = 0;
* lifecycle so a 4× crash loop does not spam the Lark thread with 4 copies
* of the same warning. */
let resumeFallbackNotified = false;
const IDLE_PROBE_INTERVAL_MS = 3_500;
const IDLE_PROBE_MAX_ATTEMPTS = 24;
let busyPatternIdleProbeTimer: ReturnType<typeof setTimeout> | null = null;
let reattachIdleProbeTimer: ReturnType<typeof setTimeout> | null = null;
/** The effectiveResume flag used by the most recent spawnCli call. Written
* immediately after the two-tier fallback check so late-attach timers
* (hermes, cursor, etc.) can read THE SAME semantics the spawn used,
Expand Down Expand Up @@ -218,7 +223,7 @@ function releaseReadyGate(reason: string): void {
settleThenFlush(Date.now());
}
}
const pendingMessages: Array<{ content: string; turnId?: string }> = [];
const pendingMessages: PendingCliInput[] = [];
/** Inputs written to the CLI whose turn hasn't completed — re-queued across a
* CLI crash so a submit-time death can't silently eat user messages. */
const inflightInputs = new InflightInputTracker();
Expand Down Expand Up @@ -2761,6 +2766,7 @@ function onPtyData(data: string): void {

function markPromptReady(): void {
if (isPromptReady) return; // guard against duplicate calls
stopBusyPatternIdleProbe();
// Ready-gate: a startup selector's ❯ (cjadk et al.) falsely matches
// readyPattern → the IdleDetector fires idle while the CLI is NOT actually at
// its input box. Hold off declaring ready until the SessionStart hook signal
Expand Down Expand Up @@ -3022,6 +3028,7 @@ async function flushPending(): Promise<void> {
let result: Awaited<ReturnType<typeof cliAdapter.writeInput>> | undefined;
try {
result = await cliAdapter.writeInput(backend, msg);
scheduleBusyPatternIdleProbe(`${cliName()} post-submit`);
} catch (err: any) {
log(`writeInput threw: ${err?.message ?? err}`);
// If the CLI exited mid-write the backend already fired onExit (which
Expand Down Expand Up @@ -3064,7 +3071,19 @@ async function flushPending(): Promise<void> {

function sendToPty(content: string, turnId?: string): void {
if (!backend || !cliAdapter) return;
pendingMessages.push({ content, turnId });
const next = { content, turnId };
const shouldMergeQueued = !isFlushing && !shouldWriteNow({
isPromptReady,
isFlushing,
supportsTypeAhead: cliAdapter.supportsTypeAhead === true,
awaitingFirstPrompt,
}) && cliAdapter.mergeQueuedInput === true;
const mergedQueued = shouldMergeQueued && mergeQueuedCliInput(pendingMessages, next);
if (mergedQueued) {
log(`Merged queued message (${pendingMessages.length} pending): "${content.substring(0, 80)}" — ${cliName()} ${awaitingFirstPrompt ? 'still booting' : 'is busy'}`);
} else {
pendingMessages.push(next);
}
// User-override semantics: a fresh Lark message while a TUI prompt is "active"
// takes precedence over the AI-detected prompt. The screen analyzer can be
// wrong (false positive on a question that has no rendered options) and a
Expand All @@ -3089,10 +3108,10 @@ function sendToPty(content: string, turnId?: string): void {
// delivers queued messages instead. See input-gate.ts; this fixes dispatch's
// brief reaching Codex before its first idle and never landing.
if (shouldWriteNow({ isPromptReady, isFlushing, supportsTypeAhead: cliAdapter.supportsTypeAhead === true, awaitingFirstPrompt })) {
log(`Writing to PTY: "${content.substring(0, 80)}"`);
if (!mergedQueued) log(`Writing to PTY: "${content.substring(0, 80)}"`);
flushPending(); // fire-and-forget async; no-op if already flushing
} else {
log(`Queued message (${pendingMessages.length} pending): "${content.substring(0, 80)}" — ${cliName()} ${awaitingFirstPrompt ? 'still booting' : 'is busy'}`);
if (!mergedQueued) log(`Queued message (${pendingMessages.length} pending): "${content.substring(0, 80)}" — ${cliName()} ${awaitingFirstPrompt ? 'still booting' : 'is busy'}`);
}
}

Expand Down Expand Up @@ -3328,6 +3347,74 @@ function seedBackendScreen(source: string, be: Pick<SessionBackend, 'captureCurr
}
}

function captureBackendScreen(be: Pick<SessionBackend, 'captureCurrentScreen' | 'captureViewport'>): string {
return be.captureViewport?.() ?? be.captureCurrentScreen?.() ?? '';
}

function probeBusyPatternIdle(
source: string,
be: Pick<SessionBackend, 'captureCurrentScreen' | 'captureViewport'>,
): boolean {
try {
const content = captureBackendScreen(be);
if (!content) return false;
if (cliAdapter?.busyPattern) {
if (cliAdapter.busyPattern.test(content)) return false;
log(`${source} idle probe: busy marker absent, marking prompt ready`);
markPromptReady();
return true;
}
} catch (err: any) {
log(`${source} idle probe captureCurrentScreen failed: ${err.message}`);
}
return false;
}

function scheduleReattachIdleProbe(source: string, be: Pick<SessionBackend, 'captureCurrentScreen' | 'captureViewport'>): void {
stopReattachIdleProbe();
if (!cliAdapter?.busyPattern || (!be.captureCurrentScreen && !be.captureViewport)) return;
reattachIdleProbeTimer = setTimeout(() => {
reattachIdleProbeTimer = null;
if (backend !== be || !awaitingFirstPrompt || isPromptReady) return;
probeBusyPatternIdle(source, be);
}, IDLE_PROBE_INTERVAL_MS);
reattachIdleProbeTimer.unref?.();
}

function stopReattachIdleProbe(): void {
if (reattachIdleProbeTimer) {
clearTimeout(reattachIdleProbeTimer);
reattachIdleProbeTimer = null;
}
}

function stopBusyPatternIdleProbe(): void {
if (busyPatternIdleProbeTimer) {
clearTimeout(busyPatternIdleProbeTimer);
busyPatternIdleProbeTimer = null;
}
}

function scheduleBusyPatternIdleProbe(source: string): void {
stopBusyPatternIdleProbe();
if (!cliAdapter?.busyPattern || (!backend?.captureCurrentScreen && !backend?.captureViewport)) return;

let attempts = 0;
const tick = () => {
busyPatternIdleProbeTimer = null;
if (!backend || isPromptReady) return;
attempts += 1;
if (probeBusyPatternIdle(source, backend)) return;
if (attempts < IDLE_PROBE_MAX_ATTEMPTS && !isPromptReady) {
busyPatternIdleProbeTimer = setTimeout(tick, IDLE_PROBE_INTERVAL_MS);
busyPatternIdleProbeTimer.unref?.();
}
};

busyPatternIdleProbeTimer = setTimeout(tick, IDLE_PROBE_INTERVAL_MS);
busyPatternIdleProbeTimer.unref?.();
}

function spawnCli(cfg: Extract<DaemonToWorker, { type: 'init' }>): void {
// Re-deliver inputs that were in-flight when the previous CLI died (see
// backend.onExit). killCli() already wiped pendingMessages, so these go to
Expand Down Expand Up @@ -4082,6 +4169,7 @@ function spawnCli(cfg: Extract<DaemonToWorker, { type: 'init' }>): void {
if (isPipeMode && backend && 'isReattach' in backend && backend.isReattach) {
log(`Re-attached to existing ${effectiveBackendType} session via pipe backend: ${persistentSessionName}`);
seedBackendScreen(`${effectiveBackendType} reattach`, backend);
scheduleReattachIdleProbe(`${effectiveBackendType} reattach`, backend);
}

// Fallback: if the CLI takes too long to show its prompt (e.g. slow
Expand All @@ -4108,6 +4196,8 @@ function spawnCli(cfg: Extract<DaemonToWorker, { type: 'init' }>): void {
function killCli(): void {
idleDetector?.dispose();
idleDetector = null;
stopReattachIdleProbe();
stopBusyPatternIdleProbe();
// Cancel any pending ready-gate fallback / settle timers; spawnCli re-arms on respawn.
if (readySignalTimer) { clearTimeout(readySignalTimer); readySignalTimer = null; }
if (readyFlushSettleTimer) { clearTimeout(readyFlushSettleTimer); readyFlushSettleTimer = null; }
Expand Down
23 changes: 23 additions & 0 deletions test/worker-pipe-initial-screen-order.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,27 @@ describe('worker pipe initial screen ordering', () => {
const idleIdx = source.indexOf('// Set up idle detection');
expect(captureIdx).toBeGreaterThan(idleIdx);
});

it('runs a busy-pattern idle probe after each submitted input', () => {
const source = readFileSync(join(process.cwd(), 'src/worker.ts'), 'utf8');
const writeIdx = source.indexOf('result = await cliAdapter.writeInput(backend, msg);');
const probeIdx = source.indexOf('scheduleBusyPatternIdleProbe(`${cliName()} post-submit`);');
const helperIdx = source.indexOf('function scheduleBusyPatternIdleProbe(source: string): void');

expect(helperIdx).toBeGreaterThan(-1);
expect(writeIdx).toBeGreaterThan(-1);
expect(probeIdx).toBeGreaterThan(writeIdx);
});

it('limits the reattach idle probe to adapters with a busy marker', () => {
const source = readFileSync(join(process.cwd(), 'src/worker.ts'), 'utf8');
const helperStart = source.indexOf('function scheduleReattachIdleProbe');
const helperEnd = source.indexOf('function stopReattachIdleProbe');
const helper = source.slice(helperStart, helperEnd);

expect(helperStart).toBeGreaterThan(-1);
expect(helper).toContain('if (!cliAdapter?.busyPattern || (!be.captureCurrentScreen && !be.captureViewport)) return;');
expect(helper).toContain('if (backend !== be || !awaitingFirstPrompt || isPromptReady) return;');
expect(helper).not.toContain('pendingMessages.length > 0');
});
});
19 changes: 19 additions & 0 deletions test/worker-queue-merge.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { describe, expect, it } from 'vitest';
import { mergeQueuedCliInput } from '../src/utils/pending-input-queue.js';

describe('mergeQueuedCliInput', () => {
it('returns false when there is no queued message to merge into', () => {
const pending: Array<{ content: string; turnId?: string }> = [];

expect(mergeQueuedCliInput(pending, { content: 'next', turnId: 't2' })).toBe(false);
expect(pending).toEqual([]);
});

it('merges incremental queued messages into the pending tail', () => {
const pending = [{ content: 'first', turnId: 't1' }];

expect(mergeQueuedCliInput(pending, { content: 'second', turnId: 't2' })).toBe(true);

expect(pending).toEqual([{ content: 'first\n\nsecond', turnId: 't2' }]);
});
});
24 changes: 23 additions & 1 deletion test/write-input.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import { createOpenCodeAdapter } from '../src/adapters/cli/opencode.js';
import { createMtrAdapter } from '../src/adapters/cli/mtr.js';
import { createHermesAdapter } from '../src/adapters/cli/hermes.js';
import { createMiraAdapter } from '../src/adapters/cli/mira.js';
import { createPiAdapter } from '../src/adapters/cli/pi.js';
import type { CliAdapter, PtyHandle } from '../src/adapters/cli/types.js';
import { appendFileSync, mkdirSync, rmSync, writeFileSync } from 'node:fs';
import { homedir, platform } from 'node:os';
Expand Down Expand Up @@ -171,13 +172,14 @@ const HUMAN_TYPING_ADAPTERS: AdapterEntry[] = [
];

/** Adapters that use tmux pasteText (load-buffer + paste-buffer -d) with
* delayed Enter — CoCo / Trae CLI and Codex. See coco.ts for the Trae 0.120.31
* delayed Enter — CoCo / Trae CLI, Codex, and Pi. See coco.ts for the Trae 0.120.31
* burst bug, and codex.ts for the per-line-submit bug bracketed paste fixes
* (Codex 0.134+ handles bracketed paste correctly — the old "Codex exits on
* bracketed paste" note was true only for a much earlier build). */
const PASTE_BUFFER_ADAPTERS: AdapterEntry[] = [
['coco', createCocoAdapter('/bin/coco')],
['codex', createCodexAdapter('/bin/codex')],
['pi', createPiAdapter('/bin/pi')],
];

/** Adapters that wrap content in bracketed-paste markers (\x1b[200~ ... \x1b[201~)
Expand Down Expand Up @@ -491,6 +493,26 @@ describe('supportsTypeAhead flag', () => {
expect(createCodexAdapter('/bin/codex').supportsTypeAhead).toBe(true);
});

it('pi: undefined (uses busy marker probes instead of type-ahead)', () => {
expect(createPiAdapter('/bin/pi').supportsTypeAhead).toBeUndefined();
});

it('pi: exposes Working... as the explicit busy marker', () => {
const adapter = createPiAdapter('/bin/pi');
expect(adapter.busyPattern?.test('⠙ Working...')).toBe(true);
expect(adapter.busyPattern?.test('已完成,等待下一条输入')).toBe(false);
});

it('pi: does not squash queued botmux turns', () => {
expect(createPiAdapter('/bin/pi').mergeQueuedInput).toBeUndefined();
});

it('non-pi type-ahead adapters do not squash queued botmux turns', () => {
expect(createClaudeCodeAdapter('/bin/claude').mergeQueuedInput).toBeUndefined();
expect(createCocoAdapter('/bin/coco').mergeQueuedInput).toBeUndefined();
expect(createCodexAdapter('/bin/codex').mergeQueuedInput).toBeUndefined();
});

it.each(PLAIN_ADAPTERS.filter(([name]) => name !== 'codex'))('%s: undefined (default behavior)', (_name, adapter) => {
expect(adapter.supportsTypeAhead).toBeUndefined();
});
Expand Down