diff --git a/README.md b/README.md index ebead81..bb30d1e 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,27 @@ const convo = createConversation(bus, { }); ``` +### Events (recommended) + +`createConversation()` returns a handle that can emit typed events. This is the easiest way to attach multiple independent listeners (CLI output, logging, persistence, UI) without composing callbacks. + +```typescript +const convo = createConversation(bus, { + participants: ["agent1", "agent2"], + topic: "Discussion topic", + maxTurns: 10, +}); + +// Stream tokens +convo.on("token", ({ chunk }) => process.stdout.write(chunk)); + +// Turn boundaries +convo.on("turnComplete", ({ turn }) => console.log(`\n---\n${turn.speaker}: ${turn.content}`)); + +// State changes +convo.on("state", ({ state }) => console.log("State:", state)); +``` + #### ConversationOptions | Option | Type | Default | Description | @@ -109,6 +130,8 @@ const convo = createConversation(bus, { | `onTurnComplete` | `(turn: ChatMessage) => void` | — | Called when a turn finishes | | `onStateChange` | `(state: LoopState) => void` | — | Called when conversation state changes | +Note: the callback options above are still supported for backward compatibility, but events are preferred if you need more than one listener. + ### `attachInteractiveConsole(convo, config?)` Attaches readline interface for CLI interaction. Provides real-time message injection and interrupt capabilities. diff --git a/examples/classroom.ts b/examples/classroom.ts new file mode 100644 index 0000000..807e37d --- /dev/null +++ b/examples/classroom.ts @@ -0,0 +1,66 @@ +import { createChatBus, createConversation } from "../src/index.js"; +import { anthropicAdapter } from "../src/adapters/anthropic.js"; + +const colors: Record = { + teacher: "\x1b[34m", // Blue + student: "\x1b[32m", // Green + reset: "\x1b[0m", +}; +function colorize(name: string): string { + const c = colors[name] || ""; + return `${c}[${name}]${colors.reset}`; +} + +const bus = createChatBus(); + +bus.register({ + name: "teacher", + type: "llm", + system: + "You are a friendly math teacher. Teach step-by-step, ask one short question at a time, and keep answers concise.", + adapter: anthropicAdapter({ + model: "claude-haiku-4-5-20251001", + maxTokens: 180, + }), +}); + +bus.register({ + name: "student", + type: "llm", + system: + "You are a curious student. Answer briefly, show your work, and ask for clarification when confused.", + adapter: anthropicAdapter({ + model: "claude-haiku-4-5-20251001", + maxTokens: 180, + }), +}); + +const convo = createConversation(bus, { + participants: ["teacher", "student"], + topic: + "Teach addition using a simple example: 7 + 5. The teacher should explain, then ask the student to solve 9 + 6.", + maxTurns: 6, + delayMs: 500, +}); + +// Use the new typed events API +convo.on("turnStart", ({ speaker }) => { + process.stdout.write(`\n${colorize(speaker)} `); +}); + +convo.on("token", ({ chunk }) => { + process.stdout.write(chunk); +}); + +convo.on("turnComplete", ({ turn }) => { + process.stdout.write(`\n${"─".repeat(50)}\n`); + if (turn.partial) process.stdout.write("⚠️ (partial — interrupted)\n"); +}); + +convo.on("stopped", ({ reason }) => { + process.stdout.write(`\n✅ Stopped: ${reason}\n`); +}); + +console.log("🏫 Classroom: Addition (teacher ↔ student)"); +const history = await convo.start(); +console.log(`\n📜 History: ${history.length} messages`); diff --git a/examples/events.ts b/examples/events.ts new file mode 100644 index 0000000..dce433c --- /dev/null +++ b/examples/events.ts @@ -0,0 +1,96 @@ +import * as readline from "node:readline"; +import { createChatBus, createConversation } from "../src/index.js"; +import { anthropicAdapter } from "../src/adapters/anthropic.js"; + +const bus = createChatBus(); + +bus.register({ + name: "optimist", + type: "llm", + system: "You are an optimistic thinker. Keep responses to 2 sentences max.", + adapter: anthropicAdapter({ + model: "claude-haiku-4-5-20251001", + maxTokens: 150, + }), +}); + +bus.register({ + name: "skeptic", + type: "llm", + system: "You are a critical skeptic. Keep responses to 2 sentences max.", + adapter: anthropicAdapter({ + model: "claude-haiku-4-5-20251001", + maxTokens: 150, + }), +}); + +const convo = createConversation(bus, { + participants: ["optimist", "skeptic"], + topic: "Will AI make software developers more productive?", + maxTurns: 6, + delayMs: 2000, +}); + +// ── Events (preferred): multiple independent listeners + +let currentTurnSpeaker: string | null = null; +convo.on("turnStart", ({ speaker }) => { + currentTurnSpeaker = speaker; + process.stdout.write(`\n[${speaker}] `); +}); + +convo.on("token", ({ chunk }) => { + process.stdout.write(chunk); +}); + +convo.on("turnComplete", ({ turn }) => { + process.stdout.write(`\n${"─".repeat(50)}\n`); + if (turn.partial) process.stdout.write("⚠️ (partial — interrupted)\n"); +}); + +convo.on("state", ({ state }) => { + if (state === "stopped") process.stdout.write("\n✅ Conversation ended.\n"); +}); + +// Example of once(): run only on the first completed turn +convo.once("turnComplete", ({ turn }) => { + process.stdout.write(`(first turn by ${turn.speaker} completed)\n`); +}); + +// Example of unsubscribe(): stop printing state transitions after first one +const unsubscribeStateAfterFirst = convo.on("state", ({ state }) => { + process.stdout.write(`(state changed: ${state})\n`); + unsubscribeStateAfterFirst(); +}); + +// ── readline — keyboard interrupt and injection +const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, +}); + +rl.on("line", (input) => { + const msg = input.trim(); + if (!msg) return; + const result = convo.send(msg); + if (result.intent === "interrupt") { + console.log("\n⚡ Interrupted — your message injected."); + } else { + console.log("\n💬 Message injected."); + } +}); + +rl.on("SIGINT", () => { + console.log("\n🛑 Stopping..."); + convo.stop(); + rl.close(); +}); + +console.log("🎬 Topic: Will AI make software developers more productive?"); +console.log("💡 Type + Enter to interrupt anytime. Ctrl+C to stop.\n"); + +const history = await convo.start(); + +console.log(`\n📜 History: ${history.length} messages`); +rl.close(); + diff --git a/src/conversation.ts b/src/conversation.ts index 6f9f436..e0f8ace 100644 --- a/src/conversation.ts +++ b/src/conversation.ts @@ -1,4 +1,3 @@ -import { randomUUID } from "node:crypto"; import type { ChatMessage, ConversationOptions, @@ -6,10 +5,14 @@ import type { LoopState, SendResult, TurnContext, + ConversationEventMap, + ConversationStoppedReason, + HumanAwaitingReason, } from "./types.js"; import type { ChatBus } from "./bus.js"; import { createMessageStore } from "./history.js"; import { createAbortManager } from "./manager.js"; +import { createTypedEmitter } from "./emitter.js"; export function createConversation( bus: ChatBus, @@ -47,9 +50,12 @@ export function createConversation( const store = createMessageStore(); const manager = createAbortManager(); + const events = createTypedEmitter(); let _state: LoopState = "idle"; let _stopped = false; + let _stopReason: ConversationStoppedReason | null = null; + let _stopTurnIndex: number | null = null; // The promise/resolve pair for human input (inject or interrupt). // When the loop needs human input, it awaits _humanInputPromise. @@ -72,14 +78,19 @@ export function createConversation( function setState(next: LoopState) { _state = next; + events.emit("state", { state: next }); onStateChange?.(next); } - function waitForHuman(): Promise { + function waitForHuman( + reason: HumanAwaitingReason, + turnIndex: number, + ): Promise { _humanInputPromise = new Promise((resolve) => { _humanInputResolve = resolve; }); setState("awaiting-human"); + events.emit("humanAwaiting", { reason, turnIndex }); return _humanInputPromise; } @@ -123,7 +134,7 @@ export function createConversation( // ── Human turn ──────────────────────────────────────────────────────── if (agent.type === "human") { - const humanMsg = await waitForHuman(); + const humanMsg = await waitForHuman("humanTurn", turnIndex); if (_stopped) break; if (humanMsg.trim()) { appendHuman(humanMsg, turnIndex); @@ -132,6 +143,7 @@ export function createConversation( } // ── LLM turn ────────────────────────────────────────────────────────── + events.emit("turnStart", { speaker: speakerName, turnIndex }); setState("streaming"); const projected = store.project(speakerName, agent.system); @@ -139,6 +151,7 @@ export function createConversation( let accumulated = ""; let wasAborted = false; + let fatalError: unknown = null; try { for await (const chunk of agent.adapter!.generate( @@ -146,12 +159,15 @@ export function createConversation( controller.signal, )) { accumulated += chunk; + events.emit("token", { speaker: speakerName, chunk, turnIndex }); onToken?.(chunk, speakerName); // Stop-sequence detection — strip it before saving. if (stopSequence && accumulated.includes(stopSequence)) { accumulated = accumulated.replace(stopSequence, "").trimEnd(); _stopped = true; + _stopReason = "stopSequence"; + _stopTurnIndex = turnIndex; break; } @@ -164,12 +180,22 @@ export function createConversation( } catch (err: unknown) { // AbortError is expected — a human interrupted mid-stream. const isAbort = err instanceof Error && err.name === "AbortError"; - if (!isAbort) throw err; - wasAborted = true; + if (isAbort) { + wasAborted = true; + } else { + fatalError = err; + } } manager.release(turnIndex); + if (fatalError !== null) { + events.emit("error", { error: fatalError, speaker: speakerName, turnIndex }); + _stopped = true; + if (_stopReason === null) _stopReason = "stop"; + if (_stopTurnIndex === null) _stopTurnIndex = turnIndex; + } + // Commit the turn — partial if aborted or interrupted. const isPartial = wasAborted || _pendingInterrupt !== null; @@ -181,6 +207,7 @@ export function createConversation( ...(isPartial ? { partial: true } : {}), }); + events.emit("turnComplete", { turn }); onTurnComplete?.(turn); // If a human interrupted mid-stream, inject their message now. @@ -216,7 +243,7 @@ export function createConversation( history: store.all(), }; if (pauseCondition(ctx)) { - const humanMsg = await waitForHuman(); + const humanMsg = await waitForHuman("pauseCondition", turnIndex); if (_stopped) break; // Only append if user typed something (interactive mode: empty = skip) if (humanMsg.trim()) { @@ -227,6 +254,10 @@ export function createConversation( } setState("stopped"); + if (_stopReason === null) { + _stopReason = _stopped ? "stop" : "maxTurns"; + } + events.emit("stopped", { reason: _stopReason, turnIndex: _stopTurnIndex }); return store.all(); } @@ -264,6 +295,8 @@ export function createConversation( function stop(): void { _stopped = true; + if (_stopReason === null) _stopReason = "stop"; + if (_stopTurnIndex === null) _stopTurnIndex = manager.activeTurnIndex(); manager.abort(); // If the loop is waiting for human input, resolve with empty string // so the await unblocks. The loop checks _stopped immediately after. @@ -278,6 +311,9 @@ export function createConversation( start, send, stop, + on: events.on, + off: events.off, + once: events.once, get state() { return _state; }, diff --git a/src/emitter.ts b/src/emitter.ts new file mode 100644 index 0000000..16757b1 --- /dev/null +++ b/src/emitter.ts @@ -0,0 +1,57 @@ +export type Handler = (payload: Payload) => void; + +export type TypedEmitter> = { + on(event: K, handler: Handler): () => void; + off(event: K, handler: Handler): void; + once(event: K, handler: Handler): () => void; + emit(event: K, payload: EventMap[K]): void; +}; + +export function createTypedEmitter>(): TypedEmitter { + const listeners = new Map>>(); + + function on( + event: K, + handler: Handler, + ): () => void { + const set = listeners.get(event) ?? new Set>(); + set.add(handler); + listeners.set(event, set); + return () => off(event, handler); + } + + function off(event: K, handler: Handler): void { + const set = listeners.get(event); + if (!set) return; + set.delete(handler as Handler); + if (set.size === 0) listeners.delete(event); + } + + function once( + event: K, + handler: Handler, + ): () => void { + const unsub = on(event, (payload) => { + unsub(); + handler(payload); + }); + return unsub; + } + + function emit(event: K, payload: EventMap[K]): void { + const set = listeners.get(event); + if (!set || set.size === 0) return; + // Snapshot to prevent mutation during emit from affecting iteration order. + const snapshot = [...set]; + for (const fn of snapshot) { + try { + fn(payload); + } catch { + // Listener errors are isolated by design. + } + } + } + + return { on, off, once, emit }; +} + diff --git a/src/types.ts b/src/types.ts index 5c000dd..b320db0 100644 --- a/src/types.ts +++ b/src/types.ts @@ -80,6 +80,30 @@ export type ConversationOptions = { onStateChange?: (state: LoopState) => void; }; +// ─── Conversation Events ────────────────────────────────────────────────────── + +export type ConversationStoppedReason = "maxTurns" | "stop" | "stopSequence"; +export type HumanAwaitingReason = "humanTurn" | "pauseCondition"; + +export type ConversationEventMap = { + state: { state: LoopState }; + token: { speaker: string; chunk: string; turnIndex: number }; + turnStart: { speaker: string; turnIndex: number }; + turnComplete: { turn: ChatMessage }; + humanAwaiting: { turnIndex: number; reason: HumanAwaitingReason }; + stopped: { reason: ConversationStoppedReason; turnIndex: number | null }; + error: { error: unknown; speaker?: string; turnIndex?: number }; +}; + +export type ConversationEventName = keyof ConversationEventMap; + +export type ConversationEvent = + { name: K } & ConversationEventMap[K]; + +export type ConversationEventHandler = ( + payload: ConversationEventMap[K], +) => void; + // ─── Conversation Handle // What createConversation() returns. @@ -89,4 +113,17 @@ export type ConversationHandle = { stop(): void; readonly state: LoopState; readonly history: ChatMessage[]; + + on( + event: K, + handler: ConversationEventHandler, + ): () => void; + off( + event: K, + handler: ConversationEventHandler, + ): void; + once( + event: K, + handler: ConversationEventHandler, + ): () => void; }; diff --git a/tests/events.test.ts b/tests/events.test.ts new file mode 100644 index 0000000..e1c1019 --- /dev/null +++ b/tests/events.test.ts @@ -0,0 +1,106 @@ +import { describe, it, expect } from "vitest"; +import { createChatBus } from "../src/bus.js"; +import { createConversation } from "../src/conversation.js"; +import type { AgentAdapter, ChatAgent } from "../src/types.js"; + +function makeBus(agents: ChatAgent[]) { + const bus = createChatBus(); + for (const a of agents) bus.register(a); + return bus; +} + +describe("Conversation events", () => { + it("emits token events and keeps onToken working", async () => { + const tokensFromEvent: string[] = []; + const tokensFromCallback: string[] = []; + + const mockAdapter: AgentAdapter = { + async *generate(_messages, signal) { + for (const char of "ABC") { + if (signal.aborted) break; + yield char; + } + }, + }; + + const bus = makeBus([ + { name: "ai", type: "llm", adapter: mockAdapter }, + ]); + + const convo = createConversation(bus, { + participants: ["ai", "ai"], + topic: "test", + maxTurns: 2, + onToken: (chunk) => tokensFromCallback.push(chunk), + }); + + convo.on("token", ({ chunk }) => tokensFromEvent.push(chunk)); + + await convo.start(); + + expect(tokensFromEvent.join("")).toBe("ABCABC"); + expect(tokensFromCallback.join("")).toBe("ABCABC"); + }); + + it("supports multiple listeners, off(), and once()", async () => { + const adapter: AgentAdapter = { + async *generate() { + yield "X"; + }, + }; + + const bus = makeBus([{ name: "ai", type: "llm", adapter }]); + const convo = createConversation(bus, { + participants: ["ai", "ai"], + topic: "t", + maxTurns: 2, + }); + + const order: string[] = []; + + const h1 = () => order.push("h1"); + const h2 = () => order.push("h2"); + const h3 = () => order.push("h3"); + + convo.on("turnComplete", h1); + const unsub2 = convo.on("turnComplete", h2); + convo.once("turnComplete", h3); + + // Remove h2 before running. + unsub2(); + + await convo.start(); + + // Two turns, h1 should fire twice, h3 once. + expect(order).toEqual(["h1", "h3", "h1"]); + }); + + it("emits state events and keeps onStateChange working", async () => { + const statesFromEvent: string[] = []; + const statesFromCallback: string[] = []; + + const adapter: AgentAdapter = { + async *generate() { + yield "ok"; + }, + }; + + const bus = makeBus([{ name: "ai", type: "llm", adapter }]); + const convo = createConversation(bus, { + participants: ["ai", "ai"], + topic: "t", + maxTurns: 2, + onStateChange: (s) => statesFromCallback.push(s), + }); + + convo.on("state", ({ state }) => statesFromEvent.push(state)); + + await convo.start(); + + expect(statesFromEvent).toContain("streaming"); + expect(statesFromEvent.at(-1)).toBe("stopped"); + expect(statesFromCallback).toContain("streaming"); + expect(statesFromCallback.at(-1)).toBe("stopped"); + }); +}); +