Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/loop/bridge-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { buildLaunchArgv } from "./launch";
import type { Agent } from "./types";

const CODEX_AUTO_APPROVED_BRIDGE_TOOLS = [
"send_to_agent",
"send_message",
"bridge_status",
"receive_messages",
] as const;
Expand Down
4 changes: 2 additions & 2 deletions src/loop/bridge-guidance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ export const receiveMessagesStuckGuidance =
'Use "bridge_status" or "receive_messages" only if delivery looks stuck.';

export const sendToClaudeGuidance = (): string =>
`Use "send_to_agent" with ${bridgeTargetLiteral("claude")} for Claude-facing messages, not a human-facing message.`;
`Use "send_message" with ${bridgeTargetLiteral("claude")} for Claude-facing messages, not a human-facing message.`;

export const sendProactiveCodexGuidance = (): string =>
`Use "send_to_agent" with ${bridgeTargetLiteral("codex")} for Codex-facing messages, including replies to inbound Codex channel messages; do not send Codex-facing responses as a human-facing message.`;
`Use "send_message" with ${bridgeTargetLiteral("codex")} for Codex-facing messages, including replies to inbound Codex channel messages; do not send Codex-facing responses as a human-facing message.`;

export const claudeChannelInstructions = (): string =>
[
Expand Down
2 changes: 1 addition & 1 deletion src/loop/bridge-message-format.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ export const formatCodexBridgeMessage = (
};

export const normalizeBridgeMessage = (message: string): string =>
message.trim().replace(BRIDGE_PREFIX_RE, "").replace(/\s+/g, " ");
message.trim().replace(BRIDGE_PREFIX_RE, "").replace(/\s+/g, " ").trim();
128 changes: 93 additions & 35 deletions src/loop/bridge.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { mkdirSync, watch } from "node:fs";
import { basename } from "node:path";
import { claudeChannelServerName } from "./bridge-config";
import { BRIDGE_SERVER as BRIDGE_SERVER_VALUE } from "./bridge-constants";
import {
Expand Down Expand Up @@ -28,8 +30,8 @@ import {
import { LOOP_VERSION } from "./constants";
import type { Agent } from "./types";

const CHANNEL_POLL_DELAY_MS = 500;
const CLAUDE_CHANNEL_CAPABILITY = "claude/channel";
const CLAUDE_CHANNEL_FALLBACK_SWEEP_MS = 2000;
const CONTENT_LENGTH_RE = /Content-Length:\s*(\d+)/i;
const CONTENT_LENGTH_PREFIX = "content-length:";
const DEFAULT_PROTOCOL_VERSION = "2024-11-05";
Expand Down Expand Up @@ -142,7 +144,7 @@ const handleReceiveMessagesTool = (
});
};

const handleSendToAgentTool = async (
const handleSendMessageTool = async (
id: JsonRpcRequest["id"],
runDir: string,
source: Agent,
Expand All @@ -154,7 +156,7 @@ const handleSendToAgentTool = async (
writeError(
id,
MCP_INVALID_PARAMS,
"send_to_agent requires a non-empty target"
"send_message requires a non-empty target"
);
return;
}
Expand All @@ -171,15 +173,15 @@ const handleSendToAgentTool = async (
writeError(
id,
MCP_INVALID_PARAMS,
"send_to_agent requires a non-empty message"
"send_message requires a non-empty message"
);
return;
}
if (target === source) {
writeError(
id,
MCP_INVALID_PARAMS,
"send_to_agent cannot target the current agent"
"send_message cannot target the current agent"
);
return;
}
Expand Down Expand Up @@ -244,12 +246,21 @@ const handleToolCall = async (
return;
}

if (name !== "send_to_agent") {
if (name === "send_to_agent") {
writeError(
id,
MCP_INVALID_PARAMS,
'Unknown tool: send_to_agent. Use "send_message" instead.'
);
return;
}

if (name !== "send_message") {
writeError(id, MCP_INVALID_PARAMS, `Unknown tool: ${name}`);
return;
}

await handleSendToAgentTool(id, runDir, source, args);
await handleSendMessageTool(id, runDir, source, args);
};

const requestedProtocolVersion = (request: JsonRpcRequest): string =>
Expand Down Expand Up @@ -312,7 +323,7 @@ const handleBridgeRequest = async (
tools: [
{
annotations: MUTATING_TOOL_ANNOTATIONS,
description: "Send an explicit message to the paired agent.",
description: "Send a direct message to the paired agent.",
inputSchema: {
additionalProperties: false,
properties: {
Expand All @@ -325,12 +336,12 @@ const handleBridgeRequest = async (
required: ["target", "message"],
type: "object",
},
name: "send_to_agent",
name: "send_message",
},
{
annotations: READ_ONLY_TOOL_ANNOTATIONS,
description:
"Inspect the current paired run and pending bridge messages.",
"Inspect the current paired run and pending bridge state when delivery looks stuck.",
inputSchema: {
additionalProperties: false,
properties: {},
Expand All @@ -341,7 +352,7 @@ const handleBridgeRequest = async (
{
annotations: RECEIVE_MESSAGES_TOOL_ANNOTATIONS,
description:
"Read and clear pending bridge messages addressed to you.",
"Read and clear pending bridge messages addressed to you when delivery looks stuck.",
inputSchema: {
additionalProperties: false,
properties: {},
Expand Down Expand Up @@ -481,12 +492,24 @@ const consumeFrames = (
process.stdin.on("error", reject);
});

const isBridgeWatchEvent = (
runDir: string,
filename: string | Buffer | null
): boolean => {
if (!filename) {
return true;
}
return filename.toString() === basename(bridgePath(runDir));
};

export const runBridgeMcpServer = async (
runDir: string,
source: Agent
): Promise<void> => {
let channelReady = false;
let bridgeWatcher: { close: () => void } | undefined;
let closed = false;
let fallbackSweep: ReturnType<typeof setTimeout> | undefined;
let flushQueue: Promise<void> = Promise.resolve();
let requestQueue: Promise<void> = Promise.resolve();
const queueClaudeFlush = (): Promise<void> => {
Expand All @@ -499,39 +522,74 @@ export const runBridgeMcpServer = async (
flushQueue = flushQueue.then(next, next);
return flushQueue;
};
const pollClaudeChannel = async (): Promise<void> => {
while (!closed) {
await queueClaudeFlush();

const triggerClaudeFlush = (): void => {
queueClaudeFlush().catch(() => undefined);
};

const clearClaudeSweep = (): void => {
if (!fallbackSweep) {
return;
}
clearTimeout(fallbackSweep);
fallbackSweep = undefined;
};

const scheduleClaudeSweep = (): void => {
if (!(source === "claude" && channelReady) || closed || fallbackSweep) {
return;
}
fallbackSweep = setTimeout(() => {
fallbackSweep = undefined;
if (closed) {
return;
}
await new Promise((resolve) => {
setTimeout(resolve, CHANNEL_POLL_DELAY_MS);
});
}
triggerClaudeFlush();
scheduleClaudeSweep();
}, CLAUDE_CHANNEL_FALLBACK_SWEEP_MS);
fallbackSweep.unref?.();
};
Comment on lines +538 to 551
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The fallback sweep logic could potentially lead to multiple overlapping timers if scheduleClaudeSweep is called concurrently before fallbackSweep is assigned. While the current usage in notifications/initialized seems safe, adding a check to ensure any existing timer is cleared before scheduling a new one would be more robust.


process.stdin.resume();
const poller = source === "claude" ? pollClaudeChannel() : Promise.resolve();
await consumeFrames(
(request) => {
const handleRequest = async (): Promise<void> => {
if (request.method === "notifications/initialized") {
channelReady = true;
if (source === "claude") {
mkdirSync(runDir, { recursive: true });
try {
bridgeWatcher = watch(runDir, (_eventType, filename) => {
if (!isBridgeWatchEvent(runDir, filename)) {
return;
}
await handleBridgeRequest(runDir, source, request);
await queueClaudeFlush();
};
requestQueue = requestQueue.then(handleRequest, handleRequest);
},
() => {
closed = true;
triggerClaudeFlush();
});
Comment on lines +556 to +561
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The fs.watch callback is triggered on every file system event in the runDir. Since triggerClaudeFlush initiates a promise chain that reads the entire bridge inbox, frequent events (e.g., multiple rapid writes to the bridge file) could lead to redundant processing. Consider adding a small debounce to triggerClaudeFlush to improve efficiency during bursts of activity.

} catch {
bridgeWatcher = undefined;
}
);
closed = true;
}

process.stdin.resume();
try {
await consumeFrames(
(request) => {
const handleRequest = async (): Promise<void> => {
if (request.method === "notifications/initialized") {
channelReady = true;
scheduleClaudeSweep();
}
await handleBridgeRequest(runDir, source, request);
await queueClaudeFlush();
};
requestQueue = requestQueue.then(handleRequest, handleRequest);
},
() => {
closed = true;
}
);
} finally {
closed = true;
bridgeWatcher?.close();
clearClaudeSweep();
}

await requestQueue;
await queueClaudeFlush();
await poller;
};

export const bridgeInternals = {
Expand Down
25 changes: 16 additions & 9 deletions src/loop/paired-loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,15 @@ const bridgeGuidance = (agent: Agent): string => {
const target = agent === "claude" ? "codex" : "claude";
return [
"Paired mode:",
`You are in a persistent Claude/Codex pair. Use the MCP tool "send_to_agent" with ${bridgeTargetLiteral(target)} when you want ${peer} to act, review, or answer.`,
'Do not ask the human to relay messages between agents or answer the human on the other agent\'s behalf. Use "bridge_status" if you need the current bridge state.',
'If "bridge_status" shows pending messages addressed to you, call "receive_messages" to read them.',
`You are in a persistent Claude/Codex pair. Use the MCP tool "send_message" with ${bridgeTargetLiteral(target)} when you want ${peer} to act, review, or answer.`,
'Do not ask the human to relay messages between agents or answer the human on the other agent\'s behalf. Use "bridge_status" only if delivery looks stuck.',
'Use "receive_messages" only if "bridge_status" shows pending messages addressed to you and direct delivery looks stuck.',
].join("\n");
};

const bridgeToolGuidance = [
'You can use the MCP tools "send_to_agent", "bridge_status", and "receive_messages" for direct Claude/Codex coordination.',
'You can use the MCP tools "send_message", "bridge_status", and "receive_messages" for direct Claude/Codex coordination.',
'Only use "bridge_status" or "receive_messages" when delivery looks stuck.',
"Do not ask the human to relay messages between agents.",
].join("\n");

Expand All @@ -116,7 +117,7 @@ const reviewDeliveryGuidance = (reviewer: Agent, opts: Options): string => {
return "If review is needed, keep the actionable notes in your review body before the final review signal.";
}

return `If review is needed, send the actionable notes to ${capitalize(opts.agent)} with "send_to_agent" using ${bridgeTargetLiteral(opts.agent)} before returning your final review signal.`;
return `If review is needed, send the actionable notes to ${capitalize(opts.agent)} with "send_message" using ${bridgeTargetLiteral(opts.agent)} before returning your final review signal.`;
};

const reviewToolGuidance = (reviewer: Agent, opts: Options): string =>
Expand Down Expand Up @@ -151,19 +152,25 @@ const reviewBridgePrompt = (
.filter(Boolean)
.join("\n\n");

const forwardBridgePrompt = (source: Agent, message: string): string =>
const forwardBridgePrompt = ({
message,
source,
}: {
message: string;
source: Agent;
}): string =>
(source === "claude"
? [
formatCodexBridgeMessage(source, message),
"Treat this as direct agent-to-agent coordination. Do not reply to the human.",
'Send a message to the other agent with "send_to_agent" only when you have something useful for them to act on.',
'Send a message to the other agent with "send_message" only when you have something useful for them to act on.',
"Do not acknowledge receipt without new information.",
]
: [
`Message from ${capitalize(source)} via the loop bridge:`,
message.trim(),
"Treat this as direct agent-to-agent coordination. Do not reply to the human.",
'Send a message to the other agent with "send_to_agent" only when you have something useful for them to act on.',
'Send a message to the other agent with "send_message" only when you have something useful for them to act on.',
"Do not acknowledge receipt without new information.",
]
).join("\n\n");
Expand Down Expand Up @@ -284,7 +291,7 @@ const drainBridge = async (
const result = await tryRunPairedAgent(
state,
message.target,
forwardBridgePrompt(message.source, message.message)
forwardBridgePrompt(message)
);
if (!result) {
return { deliveredToPrimary };
Expand Down
6 changes: 3 additions & 3 deletions src/loop/tmux.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,14 @@ const buildPrimaryPrompt = (
const parts = [
`Agent-to-agent pair programming: you are the primary ${capitalize(opts.agent)} agent for this run.`,
`Task:\n${task.trim()}`,
`Your peer is ${peer}. Do the initial pass yourself, then use "send_to_agent" when you want review or targeted help from ${peer}.`,
`Your peer is ${peer}. Do the initial pass yourself, then use "send_message" when you want review or targeted help from ${peer}.`,
];
appendProofPrompt(parts, opts.proof);
parts.push(SPAWN_TEAM_WITH_WORKTREE_ISOLATION);
parts.push(pairedBridgeGuidance(opts.agent, runId, serverName));
parts.push(pairedWorkflowGuidance(opts, opts.agent));
parts.push(
`${peer} should send a short ready message. Wait briefly if it arrives, then inspect the repo and start. Ask ${peer} for review once you have concrete work or a specific question.`
`Inspect the repo and start. Ask ${peer} for review once you have concrete work or a specific question.`
);
return parts.join("\n\n");
};
Expand Down Expand Up @@ -245,7 +245,7 @@ const buildInteractivePrimaryPrompt = (
const parts = [
`Agent-to-agent pair programming: you are the primary ${capitalize(opts.agent)} agent for this run.`,
"No task has been assigned yet.",
`Your peer is ${peer}. Use "send_to_agent" for review or help once the human gives you a task.`,
`Your peer is ${peer}. Use "send_message" for review or help once the human gives you a task.`,
];
appendProofPrompt(parts, opts.proof);
parts.push(
Expand Down
Loading
Loading