-
Notifications
You must be signed in to change notification settings - Fork 14
Clean up paired bridge messaging #56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| import { mkdirSync, watch } from "node:fs"; | ||
| import { basename } from "node:path"; | ||
| import { claudeChannelServerName } from "./bridge-config"; | ||
| import { BRIDGE_SERVER as BRIDGE_SERVER_VALUE } from "./bridge-constants"; | ||
| import { | ||
|
|
@@ -28,8 +30,8 @@ import { | |
| import { LOOP_VERSION } from "./constants"; | ||
| import type { Agent } from "./types"; | ||
|
|
||
| const CHANNEL_POLL_DELAY_MS = 500; | ||
| const CLAUDE_CHANNEL_CAPABILITY = "claude/channel"; | ||
| const CLAUDE_CHANNEL_FALLBACK_SWEEP_MS = 2000; | ||
| const CONTENT_LENGTH_RE = /Content-Length:\s*(\d+)/i; | ||
| const CONTENT_LENGTH_PREFIX = "content-length:"; | ||
| const DEFAULT_PROTOCOL_VERSION = "2024-11-05"; | ||
|
|
@@ -142,7 +144,7 @@ const handleReceiveMessagesTool = ( | |
| }); | ||
| }; | ||
|
|
||
| const handleSendToAgentTool = async ( | ||
| const handleSendMessageTool = async ( | ||
| id: JsonRpcRequest["id"], | ||
| runDir: string, | ||
| source: Agent, | ||
|
|
@@ -154,7 +156,7 @@ const handleSendToAgentTool = async ( | |
| writeError( | ||
| id, | ||
| MCP_INVALID_PARAMS, | ||
| "send_to_agent requires a non-empty target" | ||
| "send_message requires a non-empty target" | ||
| ); | ||
| return; | ||
| } | ||
|
|
@@ -171,15 +173,15 @@ const handleSendToAgentTool = async ( | |
| writeError( | ||
| id, | ||
| MCP_INVALID_PARAMS, | ||
| "send_to_agent requires a non-empty message" | ||
| "send_message requires a non-empty message" | ||
| ); | ||
| return; | ||
| } | ||
| if (target === source) { | ||
| writeError( | ||
| id, | ||
| MCP_INVALID_PARAMS, | ||
| "send_to_agent cannot target the current agent" | ||
| "send_message cannot target the current agent" | ||
| ); | ||
| return; | ||
| } | ||
|
|
@@ -244,12 +246,21 @@ const handleToolCall = async ( | |
| return; | ||
| } | ||
|
|
||
| if (name !== "send_to_agent") { | ||
| if (name === "send_to_agent") { | ||
| writeError( | ||
| id, | ||
| MCP_INVALID_PARAMS, | ||
| 'Unknown tool: send_to_agent. Use "send_message" instead.' | ||
| ); | ||
| return; | ||
| } | ||
|
|
||
| if (name !== "send_message") { | ||
| writeError(id, MCP_INVALID_PARAMS, `Unknown tool: ${name}`); | ||
| return; | ||
| } | ||
|
|
||
| await handleSendToAgentTool(id, runDir, source, args); | ||
| await handleSendMessageTool(id, runDir, source, args); | ||
| }; | ||
|
|
||
| const requestedProtocolVersion = (request: JsonRpcRequest): string => | ||
|
|
@@ -312,7 +323,7 @@ const handleBridgeRequest = async ( | |
| tools: [ | ||
| { | ||
| annotations: MUTATING_TOOL_ANNOTATIONS, | ||
| description: "Send an explicit message to the paired agent.", | ||
| description: "Send a direct message to the paired agent.", | ||
| inputSchema: { | ||
| additionalProperties: false, | ||
| properties: { | ||
|
|
@@ -325,12 +336,12 @@ const handleBridgeRequest = async ( | |
| required: ["target", "message"], | ||
| type: "object", | ||
| }, | ||
| name: "send_to_agent", | ||
| name: "send_message", | ||
| }, | ||
| { | ||
| annotations: READ_ONLY_TOOL_ANNOTATIONS, | ||
| description: | ||
| "Inspect the current paired run and pending bridge messages.", | ||
| "Inspect the current paired run and pending bridge state when delivery looks stuck.", | ||
| inputSchema: { | ||
| additionalProperties: false, | ||
| properties: {}, | ||
|
|
@@ -341,7 +352,7 @@ const handleBridgeRequest = async ( | |
| { | ||
| annotations: RECEIVE_MESSAGES_TOOL_ANNOTATIONS, | ||
| description: | ||
| "Read and clear pending bridge messages addressed to you.", | ||
| "Read and clear pending bridge messages addressed to you when delivery looks stuck.", | ||
| inputSchema: { | ||
| additionalProperties: false, | ||
| properties: {}, | ||
|
|
@@ -481,12 +492,24 @@ const consumeFrames = ( | |
| process.stdin.on("error", reject); | ||
| }); | ||
|
|
||
| const isBridgeWatchEvent = ( | ||
| runDir: string, | ||
| filename: string | Buffer | null | ||
| ): boolean => { | ||
| if (!filename) { | ||
| return true; | ||
| } | ||
| return filename.toString() === basename(bridgePath(runDir)); | ||
| }; | ||
|
|
||
| export const runBridgeMcpServer = async ( | ||
| runDir: string, | ||
| source: Agent | ||
| ): Promise<void> => { | ||
| let channelReady = false; | ||
| let bridgeWatcher: { close: () => void } | undefined; | ||
| let closed = false; | ||
| let fallbackSweep: ReturnType<typeof setTimeout> | undefined; | ||
| let flushQueue: Promise<void> = Promise.resolve(); | ||
| let requestQueue: Promise<void> = Promise.resolve(); | ||
| const queueClaudeFlush = (): Promise<void> => { | ||
|
|
@@ -499,39 +522,74 @@ export const runBridgeMcpServer = async ( | |
| flushQueue = flushQueue.then(next, next); | ||
| return flushQueue; | ||
| }; | ||
| const pollClaudeChannel = async (): Promise<void> => { | ||
| while (!closed) { | ||
| await queueClaudeFlush(); | ||
|
|
||
| const triggerClaudeFlush = (): void => { | ||
| queueClaudeFlush().catch(() => undefined); | ||
| }; | ||
|
|
||
| const clearClaudeSweep = (): void => { | ||
| if (!fallbackSweep) { | ||
| return; | ||
| } | ||
| clearTimeout(fallbackSweep); | ||
| fallbackSweep = undefined; | ||
| }; | ||
|
|
||
| const scheduleClaudeSweep = (): void => { | ||
| if (!(source === "claude" && channelReady) || closed || fallbackSweep) { | ||
| return; | ||
| } | ||
| fallbackSweep = setTimeout(() => { | ||
| fallbackSweep = undefined; | ||
| if (closed) { | ||
| return; | ||
| } | ||
| await new Promise((resolve) => { | ||
| setTimeout(resolve, CHANNEL_POLL_DELAY_MS); | ||
| }); | ||
| } | ||
| triggerClaudeFlush(); | ||
| scheduleClaudeSweep(); | ||
| }, CLAUDE_CHANNEL_FALLBACK_SWEEP_MS); | ||
| fallbackSweep.unref?.(); | ||
| }; | ||
|
|
||
| process.stdin.resume(); | ||
| const poller = source === "claude" ? pollClaudeChannel() : Promise.resolve(); | ||
| await consumeFrames( | ||
| (request) => { | ||
| const handleRequest = async (): Promise<void> => { | ||
| if (request.method === "notifications/initialized") { | ||
| channelReady = true; | ||
| if (source === "claude") { | ||
| mkdirSync(runDir, { recursive: true }); | ||
| try { | ||
| bridgeWatcher = watch(runDir, (_eventType, filename) => { | ||
| if (!isBridgeWatchEvent(runDir, filename)) { | ||
| return; | ||
| } | ||
| await handleBridgeRequest(runDir, source, request); | ||
| await queueClaudeFlush(); | ||
| }; | ||
| requestQueue = requestQueue.then(handleRequest, handleRequest); | ||
| }, | ||
| () => { | ||
| closed = true; | ||
| triggerClaudeFlush(); | ||
| }); | ||
|
Comment on lines
+556
to
+561
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| } catch { | ||
| bridgeWatcher = undefined; | ||
| } | ||
| ); | ||
| closed = true; | ||
| } | ||
|
|
||
| process.stdin.resume(); | ||
| try { | ||
| await consumeFrames( | ||
| (request) => { | ||
| const handleRequest = async (): Promise<void> => { | ||
| if (request.method === "notifications/initialized") { | ||
| channelReady = true; | ||
| scheduleClaudeSweep(); | ||
| } | ||
| await handleBridgeRequest(runDir, source, request); | ||
| await queueClaudeFlush(); | ||
| }; | ||
| requestQueue = requestQueue.then(handleRequest, handleRequest); | ||
| }, | ||
| () => { | ||
| closed = true; | ||
| } | ||
| ); | ||
| } finally { | ||
| closed = true; | ||
| bridgeWatcher?.close(); | ||
| clearClaudeSweep(); | ||
| } | ||
|
|
||
| await requestQueue; | ||
| await queueClaudeFlush(); | ||
| await poller; | ||
| }; | ||
|
|
||
| export const bridgeInternals = { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fallback sweep logic could potentially lead to multiple overlapping timers if
scheduleClaudeSweepis called concurrently beforefallbackSweepis assigned. While the current usage innotifications/initializedseems safe, adding a check to ensure any existing timer is cleared before scheduling a new one would be more robust.