diff --git a/package.json b/package.json index 547df39..995236c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "openclaw-node", - "version": "0.12.1", + "version": "0.13.0", "description": "Node.js client for the OpenClaw Gateway WebSocket protocol", "main": "dist/index.js", "module": "dist/index.mjs", diff --git a/src/__tests__/agent-wait.test.ts b/src/__tests__/agent-wait.test.ts new file mode 100644 index 0000000..7e3b6bf --- /dev/null +++ b/src/__tests__/agent-wait.test.ts @@ -0,0 +1,149 @@ +import * as fs from "fs"; +import * as path from "path"; +import * as os from "os"; +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { OpenClawClient } from "../client"; +import type { AgentWaitResult } from "../types"; +import { installMockWebSocket, getMockWs, completeHandshake } from "./helpers/mock-ws"; + +describe("agentWait", () => { + let client: OpenClawClient; + let tmpDir: string; + + beforeEach(async () => { + installMockWebSocket(); + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-agent-wait-test-")); + client = new OpenClawClient({ + url: "ws://localhost:18789", + deviceIdentityPath: path.join(tmpDir, "device-identity.json"), + autoReconnect: false, + }); + await completeHandshake(client); + }); + + afterEach(async () => { + await client.disconnect(); + vi.restoreAllMocks(); + fs.rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("sends agent.wait with runId and timeoutMs when timeoutMs is given", async () => { + const ws = getMockWs(); + const sentBefore = ws.sent.length; + + const waitPromise = client.agentWait("run-123", { timeoutMs: 5000 }); + + const sentMsg = JSON.parse(ws.sent[sentBefore]); + expect(sentMsg.type).toBe("req"); + expect(sentMsg.method).toBe("agent.wait"); + expect(sentMsg.params).toEqual({ runId: "run-123", timeoutMs: 5000 }); + + ws.simulateMessage({ + type: "res", + id: sentMsg.id, + ok: true, + payload: { status: "pending" }, + }); + + await waitPromise; + }); + + it("sends params without timeoutMs when omitted", async () => { + const ws = getMockWs(); + const sentBefore = ws.sent.length; + + const waitPromise = client.agentWait("run-456"); + + const sentMsg = JSON.parse(ws.sent[sentBefore]); + expect(sentMsg.type).toBe("req"); + expect(sentMsg.method).toBe("agent.wait"); + expect(sentMsg.params).toEqual({ runId: "run-456" }); + expect("timeoutMs" in sentMsg.params).toBe(false); + + ws.simulateMessage({ + type: "res", + id: sentMsg.id, + ok: true, + payload: { status: "pending" }, + }); + + await waitPromise; + }); + + it("forwards an explicit timeoutMs of 0 (not dropped as falsy)", async () => { + const ws = getMockWs(); + const sentBefore = ws.sent.length; + + const waitPromise = client.agentWait("run-zero", { timeoutMs: 0 }); + + const sentMsg = JSON.parse(ws.sent[sentBefore]); + expect(sentMsg.params).toEqual({ runId: "run-zero", timeoutMs: 0 }); + expect(sentMsg.params.timeoutMs).toBe(0); + + ws.simulateMessage({ + type: "res", + id: sentMsg.id, + ok: true, + payload: { status: "pending" }, + }); + + await waitPromise; + }); + + it("resolves to the response payload typed as AgentWaitResult", async () => { + const ws = getMockWs(); + const sentBefore = ws.sent.length; + + const waitPromise = client.agentWait("run-789", { timeoutMs: 1000 }); + + const sentMsg = JSON.parse(ws.sent[sentBefore]); + const payload: AgentWaitResult = { + status: "pending", + runId: "run-789", + livenessState: "working", + endedAt: undefined, + }; + + ws.simulateMessage({ + type: "res", + id: sentMsg.id, + ok: true, + payload, + }); + + const result = await waitPromise; + expect(result).toEqual(payload); + expect(result.status).toBe("pending"); + expect(result.livenessState).toBe("working"); + }); + + it("resolves a terminal run with endedAt and stopReason", async () => { + const ws = getMockWs(); + const sentBefore = ws.sent.length; + + const waitPromise = client.agentWait("run-done", { timeoutMs: 1000 }); + + const sentMsg = JSON.parse(ws.sent[sentBefore]); + const payload: AgentWaitResult = { + status: "ok", + runId: "run-done", + endedAt: 1718539200000, + stopReason: "completed", + yielded: false, + providerStarted: true, + }; + + ws.simulateMessage({ + type: "res", + id: sentMsg.id, + ok: true, + payload, + }); + + const result = await waitPromise; + expect(result).toEqual(payload); + expect(result.status).toBe("ok"); + expect(result.endedAt).toBe(1718539200000); + expect(result.stopReason).toBe("completed"); + }); +}); diff --git a/src/__tests__/session-message-subscription.test.ts b/src/__tests__/session-message-subscription.test.ts new file mode 100644 index 0000000..e031b0a --- /dev/null +++ b/src/__tests__/session-message-subscription.test.ts @@ -0,0 +1,253 @@ +import * as fs from "fs"; +import * as path from "path"; +import * as os from "os"; +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { OpenClawClient } from "../client"; +import type { ProtocolEvent } from "../types"; +import { installMockWebSocket, getMockWs, completeHandshake } from "./helpers/mock-ws"; + +const KEY = "agent:agt-1:direct:usr-1"; + +describe("sessions.subscribeMessages", () => { + let client: OpenClawClient; + let tmpDir: string; + + beforeEach(async () => { + installMockWebSocket(); + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-sub-test-")); + client = new OpenClawClient({ + url: "ws://localhost:18789", + deviceIdentityPath: path.join(tmpDir, "device-identity.json"), + autoReconnect: false, + }); + await completeHandshake(client); + }); + + afterEach(async () => { + await client.disconnect(); + vi.restoreAllMocks(); + fs.rmSync(tmpDir, { recursive: true, force: true }); + }); + + /** Subscribe and resolve the subscribe response; returns the handle + captured events. */ + async function subscribe(opts?: { agentId?: string }) { + const ws = getMockWs(); + const sentBefore = ws.sent.length; + const events: ProtocolEvent[] = []; + const subPromise = client.sessions.subscribeMessages(KEY, (e) => events.push(e), opts); + + const subReq = JSON.parse(ws.sent[sentBefore]); + ws.simulateMessage({ + type: "res", + id: subReq.id, + ok: true, + payload: { subscribed: true, key: KEY }, + }); + const handle = await subPromise; + return { ws, events, handle, subReq }; + } + + it("sends sessions.messages.subscribe with the key (no agentId when omitted)", async () => { + const { subReq } = await subscribe(); + expect(subReq.type).toBe("req"); + expect(subReq.method).toBe("sessions.messages.subscribe"); + expect(subReq.params).toEqual({ key: KEY }); + expect("agentId" in subReq.params).toBe(false); + }); + + it("includes agentId when given", async () => { + const { subReq } = await subscribe({ agentId: "agt-1" }); + expect(subReq.params).toEqual({ key: KEY, agentId: "agt-1" }); + }); + + it("delivers session.message snapshot events for the subscribed session", async () => { + const { ws, events } = await subscribe(); + ws.simulateMessage({ + type: "event", + event: "session.message", + payload: { sessionKey: KEY, message: { role: "assistant", content: "hi" }, messageSeq: 3 }, + }); + expect(events).toHaveLength(1); + expect(events[0].event).toBe("session.message"); + expect((events[0].payload as { messageSeq?: number }).messageSeq).toBe(3); + }); + + it("delivers word-for-word agent deltas for the subscribed session", async () => { + const { ws, events } = await subscribe(); + ws.simulateMessage({ + type: "event", + event: "agent", + payload: { + sessionKey: KEY, + runId: "run-1", + stream: "assistant", + data: { delta: "Hel", text: "Hel" }, + }, + }); + expect(events).toHaveLength(1); + expect(events[0].event).toBe("agent"); + expect((events[0].payload as { data?: { delta?: string } }).data?.delta).toBe("Hel"); + }); + + it("ignores events for a different session", async () => { + const { ws, events } = await subscribe(); + ws.simulateMessage({ + type: "event", + event: "session.message", + payload: { sessionKey: "agent:other:direct:usr-9", message: {} }, + }); + expect(events).toHaveLength(0); + }); + + it("unsubscribe sends sessions.messages.unsubscribe and stops delivering events", async () => { + const { ws, events, handle } = await subscribe(); + + ws.simulateMessage({ + type: "event", + event: "session.message", + payload: { sessionKey: KEY, message: { content: "first" } }, + }); + expect(events).toHaveLength(1); + + const sentBefore = ws.sent.length; + const unsubPromise = handle.unsubscribe(); + const unsubReq = JSON.parse(ws.sent[sentBefore]); + expect(unsubReq.method).toBe("sessions.messages.unsubscribe"); + expect(unsubReq.params).toEqual({ key: KEY }); + ws.simulateMessage({ type: "res", id: unsubReq.id, ok: true, payload: { subscribed: false } }); + await unsubPromise; + + ws.simulateMessage({ + type: "event", + event: "session.message", + payload: { sessionKey: KEY, message: { content: "after-unsub" } }, + }); + expect(events).toHaveLength(1); + }); + + it("delivers events tagged with the Gateway's canonical key", async () => { + const canonical = "agent:agt-1:direct:usr-1#canonical"; + const ws = getMockWs(); + const sentBefore = ws.sent.length; + const events: ProtocolEvent[] = []; + const subPromise = client.sessions.subscribeMessages(KEY, (e) => events.push(e)); + + const subReq = JSON.parse(ws.sent[sentBefore]); + // Gateway echoes a DIFFERENT canonical key than the caller supplied. + ws.simulateMessage({ + type: "res", + id: subReq.id, + ok: true, + payload: { subscribed: true, key: canonical }, + }); + await subPromise; + + ws.simulateMessage({ + type: "event", + event: "agent", + payload: { sessionKey: canonical, runId: "r1", stream: "assistant", data: { delta: "hi" } }, + }); + expect(events).toHaveLength(1); + expect((events[0].payload as { sessionKey?: string }).sessionKey).toBe(canonical); + }); + + it("isolates concurrent subscriptions to different sessions", async () => { + const keyB = "agent:agt-2:direct:usr-2"; + const ws = getMockWs(); + + const aEvents: ProtocolEvent[] = []; + let s = ws.sent.length; + const subA = client.sessions.subscribeMessages(KEY, (e) => aEvents.push(e)); + const aReq = JSON.parse(ws.sent[s]); + ws.simulateMessage({ + type: "res", + id: aReq.id, + ok: true, + payload: { subscribed: true, key: KEY }, + }); + await subA; + + const bEvents: ProtocolEvent[] = []; + s = ws.sent.length; + const subB = client.sessions.subscribeMessages(keyB, (e) => bEvents.push(e)); + const bReq = JSON.parse(ws.sent[s]); + ws.simulateMessage({ + type: "res", + id: bReq.id, + ok: true, + payload: { subscribed: true, key: keyB }, + }); + const handleB = await subB; + + ws.simulateMessage({ + type: "event", + event: "session.message", + payload: { sessionKey: KEY, message: { content: "for A" } }, + }); + expect(aEvents).toHaveLength(1); + expect(bEvents).toHaveLength(0); + + // Unsubscribing B must not affect A. + s = ws.sent.length; + const unsubP = handleB.unsubscribe(); + const unsubReq = JSON.parse(ws.sent[s]); + ws.simulateMessage({ type: "res", id: unsubReq.id, ok: true, payload: { subscribed: false } }); + await unsubP; + + ws.simulateMessage({ + type: "event", + event: "session.message", + payload: { sessionKey: KEY, message: { content: "for A again" } }, + }); + expect(aEvents).toHaveLength(2); + expect(bEvents).toHaveLength(0); + }); + + it("removes the listener if the subscribe request rejects", async () => { + const ws = getMockWs(); + const sentBefore = ws.sent.length; + const events: ProtocolEvent[] = []; + const subPromise = client.sessions.subscribeMessages(KEY, (e) => events.push(e)); + + const subReq = JSON.parse(ws.sent[sentBefore]); + ws.simulateMessage({ + type: "res", + id: subReq.id, + ok: false, + error: { message: "subscribe denied" }, + }); + await expect(subPromise).rejects.toThrow("subscribe denied"); + + // Listener must have been removed — a matching event is ignored. + ws.simulateMessage({ + type: "event", + event: "session.message", + payload: { sessionKey: KEY, message: { content: "should be ignored" } }, + }); + expect(events).toHaveLength(0); + }); + + it("unsubscribe never throws even when the unsubscribe RPC fails", async () => { + // Same catch path as a send on an already-closed socket during teardown. + const { ws, events, handle } = await subscribe(); + + const sentBefore = ws.sent.length; + const unsubPromise = handle.unsubscribe(); + const unsubReq = JSON.parse(ws.sent[sentBefore]); + ws.simulateMessage({ + type: "res", + id: unsubReq.id, + ok: false, + error: { message: "unsubscribe failed" }, + }); + await expect(unsubPromise).resolves.toBeUndefined(); + + // Listener is still removed despite the failed RPC. + ws.simulateMessage({ + type: "event", + event: "session.message", + payload: { sessionKey: KEY, message: { content: "after-failed-unsub" } }, + }); + expect(events).toHaveLength(0); + }); +}); diff --git a/src/__tests__/sessions-describe.test.ts b/src/__tests__/sessions-describe.test.ts new file mode 100644 index 0000000..dd38897 --- /dev/null +++ b/src/__tests__/sessions-describe.test.ts @@ -0,0 +1,114 @@ +import * as fs from "fs"; +import * as path from "path"; +import * as os from "os"; +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { OpenClawClient } from "../client"; +import type { SessionDescribeResult } from "../types"; +import { installMockWebSocket, getMockWs, completeHandshake } from "./helpers/mock-ws"; + +describe("sessions.describe", () => { + let client: OpenClawClient; + let tmpDir: string; + + beforeEach(async () => { + installMockWebSocket(); + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-sessions-describe-test-")); + client = new OpenClawClient({ + url: "ws://localhost:18789", + deviceIdentityPath: path.join(tmpDir, "device-identity.json"), + autoReconnect: false, + }); + await completeHandshake(client); + }); + + afterEach(async () => { + await client.disconnect(); + vi.restoreAllMocks(); + fs.rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("sends sessions.describe with key and no agentId when omitted", async () => { + const ws = getMockWs(); + const sentBefore = ws.sent.length; + + const describePromise = client.sessions.describe("agent:agt-1:direct:usr-1"); + + const sentMsg = JSON.parse(ws.sent[sentBefore]); + expect(sentMsg.type).toBe("req"); + expect(sentMsg.method).toBe("sessions.describe"); + expect(sentMsg.params).toEqual({ key: "agent:agt-1:direct:usr-1" }); + expect("agentId" in sentMsg.params).toBe(false); + + ws.simulateMessage({ + type: "res", + id: sentMsg.id, + ok: true, + payload: { key: "agent:agt-1:direct:usr-1", exists: true }, + }); + + await describePromise; + }); + + it("includes agentId when given", async () => { + const ws = getMockWs(); + const sentBefore = ws.sent.length; + + const describePromise = client.sessions.describe("agent:agt-1:direct:usr-1", { + agentId: "agt-1", + }); + + const sentMsg = JSON.parse(ws.sent[sentBefore]); + expect(sentMsg.method).toBe("sessions.describe"); + expect(sentMsg.params).toEqual({ key: "agent:agt-1:direct:usr-1", agentId: "agt-1" }); + + ws.simulateMessage({ + type: "res", + id: sentMsg.id, + ok: true, + payload: { key: "agent:agt-1:direct:usr-1", exists: true }, + }); + + await describePromise; + }); + + it("resolves to the response payload typed as SessionDescribeResult", async () => { + const ws = getMockWs(); + const sentBefore = ws.sent.length; + + const describePromise = client.sessions.describe("agent:agt-1:direct:usr-1"); + + const sentMsg = JSON.parse(ws.sent[sentBefore]); + const payload: SessionDescribeResult = { + key: "agent:agt-1:direct:usr-1", + exists: true, + status: "processing", + activeRunId: "run-1", + lastActivityAt: 1718539200000, + }; + + ws.simulateMessage({ type: "res", id: sentMsg.id, ok: true, payload }); + + const result = await describePromise; + expect(result).toEqual(payload); + expect(result.exists).toBe(true); + expect(result.activeRunId).toBe("run-1"); + }); + + it("resolves a not-found session", async () => { + const ws = getMockWs(); + const sentBefore = ws.sent.length; + + const describePromise = client.sessions.describe("agent:gone:direct:usr-1"); + + const sentMsg = JSON.parse(ws.sent[sentBefore]); + ws.simulateMessage({ + type: "res", + id: sentMsg.id, + ok: true, + payload: { exists: false }, + }); + + const result = await describePromise; + expect(result.exists).toBe(false); + }); +}); diff --git a/src/client.ts b/src/client.ts index fd7d4b4..24c09d7 100644 --- a/src/client.ts +++ b/src/client.ts @@ -2,6 +2,8 @@ import { EventEmitter } from "events"; import { isValidProtocolMessage, type AgentsListResult, + type AgentWaitResult, + type SessionDescribeResult, type ChatAttachment, type ClientRole, type ConnectParams, @@ -708,6 +710,73 @@ export class OpenClawClient extends EventEmitter { }); return res.payload; }, + describe: async (key: string, opts?: { agentId?: string }): Promise => { + const res = await this.request("sessions.describe", { + key, + ...(opts?.agentId !== undefined && { agentId: opts.agentId }), + }); + return res.payload as unknown as SessionDescribeResult; + }, + /** + * Subscribe to a session's live event stream via `sessions.messages.subscribe`. + * + * The Gateway then pushes every event for this session to this connection — + * such as word-for-word assistant deltas (`event: "agent"`), transcript + * snapshots (`event: "session.message"`), tool events, and lifecycle/state + * changes — regardless of which connection triggered the run. `handler` is + * invoked with each `ProtocolEvent` whose `payload.sessionKey` matches the + * subscribed session (the caller-supplied key OR the canonical key the + * Gateway echoes in the subscribe response — so canonicalization can't + * silently drop events). + * + * The listener is registered before the subscribe request is sent, so no + * event is missed in the subscribe window. The subscription does NOT survive + * a reconnect (the Gateway drops it when the socket closes) — re-subscribe on + * the client's `connected` event if you need it to persist. Call the returned + * `unsubscribe()` to stop; it always removes the local listener and makes a + * best-effort unsubscribe RPC (it never throws, so teardown is safe even when + * the socket is already closed). + */ + subscribeMessages: async ( + key: string, + handler: (event: ProtocolEvent) => void, + opts?: { agentId?: string }, + ): Promise<{ unsubscribe: () => Promise }> => { + // Match both the caller-supplied key and the Gateway's canonical key + // (added once the subscribe response arrives) so a canonicalized + // `sessionKey` on pushed events is still delivered. + const matchKeys = new Set([key]); + const listener = (event: ProtocolEvent) => { + const sessionKey = (event.payload as { sessionKey?: string } | undefined)?.sessionKey; + if (sessionKey !== undefined && matchKeys.has(sessionKey)) handler(event); + }; + this.on("event", listener); + const params = { + key, + ...(opts?.agentId !== undefined && { agentId: opts.agentId }), + }; + let res: ProtocolResponse; + try { + res = await this.request("sessions.messages.subscribe", params); + } catch (err) { + this.off("event", listener); + throw err; + } + const canonicalKey = (res.payload as { key?: string } | undefined)?.key; + if (typeof canonicalKey === "string") matchKeys.add(canonicalKey); + return { + unsubscribe: async () => { + this.off("event", listener); + // Best-effort: the local listener is already gone, so a closed socket + // (e.g. during teardown) must not turn cleanup into a thrown error. + try { + await this.request("sessions.messages.unsubscribe", params); + } catch { + // ignore — nothing left to clean up locally + } + }, + }; + }, }; /** @@ -801,6 +870,27 @@ export class OpenClawClient extends EventEmitter { }, }; + /** + * Wait for a run to reach a terminal state via the `agent.wait` RPC. + * + * This is the Gateway's authoritative run-liveness oracle: given a `runId` it + * long-polls and returns the run's current or terminal state. While the run + * is still going the result has `status: "pending"`; once it ends the result + * has `status: "ok"` with `endedAt` and `stopReason` set. If the poll window + * elapses first the result has `status: "timeout"`. + * + * @param runId - The run to wait on. + * @param opts.timeoutMs - How long the Gateway should long-poll before + * returning a "timeout" result. Omit to use the Gateway default. + */ + async agentWait(runId: string, opts?: { timeoutMs?: number }): Promise { + const res = await this.request("agent.wait", { + runId, + ...(opts?.timeoutMs !== undefined && { timeoutMs: opts.timeoutMs }), + }); + return res.payload as unknown as AgentWaitResult; + } + /** * Send a raw protocol request and wait for the response. */ diff --git a/src/index.ts b/src/index.ts index 4dd5708..de3dd6c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -20,4 +20,6 @@ export type { AgentIdentity, AgentSummary, AgentsListResult, + AgentWaitResult, + SessionDescribeResult, } from "./types"; diff --git a/src/types.ts b/src/types.ts index f7e616f..3fb0434 100644 --- a/src/types.ts +++ b/src/types.ts @@ -209,6 +209,46 @@ export interface AgentsListResult { agents: AgentSummary[]; } +/** + * Result of `agent.wait`. The Gateway's authoritative run-liveness oracle: + * given a `runId`, it long-polls and reports the run's current or terminal + * state. `status` is the long-poll outcome ("pending" while still running, + * "ok" once it ended, "timeout" if the poll window elapsed first, "error" on + * failure). Terminal fields (`endedAt`, `stopReason`) are only set once the run + * has finished. Fields beyond `status` are best-effort and may vary by Gateway + * version. + */ +export interface AgentWaitResult { + status: "ok" | "pending" | "timeout" | "error"; + runId?: string; + /** Epoch millis when the run finished; set once the run has ended. */ + endedAt?: number; + stopReason?: string; + livenessState?: "working" | "paused" | "blocked" | "abandoned"; + yielded?: boolean; + timeoutPhase?: string; + providerStarted?: boolean; +} + +/** + * Result of `sessions.describe`. The Gateway's view of a single session: whether + * it exists, its current state, and any in-flight run. Use it to answer "does + * this session exist / is a run active?" without polling history. The Gateway + * owns the exact shape — these fields are best-effort and may vary by Gateway + * version. + */ +export interface SessionDescribeResult { + key?: string; + /** Whether the Gateway knows this session. */ + exists?: boolean; + /** Current session state, e.g. "idle" | "processing" | "waiting". */ + status?: string; + /** The in-flight run for this session, or null when idle. */ + activeRunId?: string | null; + /** Epoch millis of the last activity on this session. */ + lastActivityAt?: number; +} + // --- Config --- export interface ConfigGetResult {