From 4f6f25a04c4f1d425c41a38abbe8a6809f3a0146 Mon Sep 17 00:00:00 2001 From: Pushkar Kathayat Date: Sat, 14 Mar 2026 10:14:03 +0530 Subject: [PATCH] Implement steer() method for SDK query interface Use pi-agent-core's native agent.steer() API to allow redirecting an agent mid-conversation. The steer message is queued and picked up between tool calls by the agent loop, avoiding the need for custom abort/re-prompt logic. - Hoist agent reference to outer scope so steer() can access it - Implement steer() using agent.steer() with UserMessage format - Add steer() tests (throws before agent loads, method exists) - Add examples/test-steer.ts for manual SDK steer verification --- examples/test-steer.ts | 80 ++++++++++++++++++++++++++++++++++++++++++ src/sdk.ts | 15 ++++++-- test/sdk.test.ts | 30 ++++++++++++++++ 3 files changed, 122 insertions(+), 3 deletions(-) create mode 100644 examples/test-steer.ts diff --git a/examples/test-steer.ts b/examples/test-steer.ts new file mode 100644 index 0000000..1e7e038 --- /dev/null +++ b/examples/test-steer.ts @@ -0,0 +1,80 @@ +/** + * Test the steer() SDK method. + * + * Strategy: Use a prompt that forces tool usage (e.g. "read a file"), + * so the agent loop stays alive long enough for steer() to be picked up + * between tool calls. + * + * Usage: + * export OPENAI_API_KEY="sk-..." + * node --experimental-strip-types --experimental-detect-module examples/test-steer.ts + */ + +import { query } from "../dist/exports.js"; + +async function main() { + const agentDir = new URL("../agents/assistant", import.meta.url).pathname; + + console.log("Starting query...\n"); + const q = query({ + prompt: "Read the file SOUL.md and then read RULES.md and summarize both files.", + dir: agentDir, + }); + + let steered = false; + + // Steer after 3 seconds — the agent should still be processing tool calls + setTimeout(() => { + if (!steered) { + steered = true; + console.log("\n[STEERING] → 'Stop. Forget the files. Just say: steering works!'\n"); + q.steer("Stop. Forget the files. Just say exactly this: STEERING WORKS!"); + } + }, 3000); + + for await (const msg of q) { + if (msg.type === "system" && msg.subtype === "session_start") { + console.log(`[session started]`); + } + + if (msg.type === "delta" && msg.deltaType === "text") { + process.stdout.write(msg.content); + } + + if (msg.type === "assistant") { + console.log(`\n[assistant message, stopReason=${msg.stopReason}]`); + } + + if (msg.type === "tool_use") { + console.log(`\n[tool call: ${msg.toolName}]`); + } + + if (msg.type === "tool_result") { + console.log(`[tool result: ${msg.toolName}, ${msg.content.length} chars]`); + } + + if (msg.type === "user") { + console.log(`\n[user steer: "${msg.content}"]`); + } + + if (msg.type === "system" && msg.subtype === "session_end") { + console.log(`\n[session ended]`); + } + + if (msg.type === "system" && msg.subtype === "error") { + console.log(`\n[error: ${msg.content}]`); + } + } + + console.log(`\nTotal messages: ${q.messages().length}`); + const userMsgs = q.messages().filter((m: any) => m.type === "user"); + console.log(`Steer messages: ${userMsgs.length}`); + if (userMsgs.length > 0) { + console.log("Steer content:", userMsgs.map((m: any) => m.content)); + } +} + +main().catch((err) => { + console.error("Error:", err.message); + process.exit(1); +}); diff --git a/src/sdk.ts b/src/sdk.ts index 0e51f50..f80ccd1 100644 --- a/src/sdk.ts +++ b/src/sdk.ts @@ -110,6 +110,7 @@ export function query(options: QueryOptions): Query { // These are set once the agent is loaded (async init below) let _sessionId = options.sessionId ?? ""; let _manifest: AgentManifest | null = null; + let _agent: Agent | null = null; // Accumulate streaming deltas for the current message let accText = ""; @@ -280,6 +281,7 @@ export function query(options: QueryOptions): Query { ...modelOptions, }, }); + _agent = agent; // 9. Subscribe to events and map to GCMessage agent.subscribe((event: AgentEvent) => { @@ -467,9 +469,16 @@ export function query(options: QueryOptions): Query { ac.abort(); }, - steer(_message: string) { - // Steering requires agent reference — for now this is a placeholder. - // Full steering support would require exposing the Agent instance. + steer(message: string) { + if (!_agent) { + throw new Error("Agent not yet loaded — cannot steer before the query starts streaming"); + } + pushMsg({ type: "user", content: message }); + _agent.steer({ + role: "user", + content: message, + timestamp: Date.now(), + }); }, sessionId() { diff --git a/test/sdk.test.ts b/test/sdk.test.ts index a71fcc6..ff692ce 100644 --- a/test/sdk.test.ts +++ b/test/sdk.test.ts @@ -201,6 +201,36 @@ describe("wrapToolWithProgrammaticHooks()", () => { }); }); +// ── steer() ─────────────────────────────────────────────────────────── + +describe("steer()", () => { + it("throws when agent is not loaded yet", () => { + const q = query({ + prompt: "hello", + dir: "/nonexistent/path", + }); + + assert.throws(() => { + q.steer("focus on security instead"); + }, /Agent not yet loaded/); + + // Clean up + q.return(); + }); + + it("steer method exists on Query object", () => { + const q = query({ + prompt: "hello", + dir: "/nonexistent/path", + }); + + assert.equal(typeof q.steer, "function"); + + // Clean up + q.return(); + }); +}); + // ── query() error handling ───────────────────────────────────────────── describe("query()", () => {