Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,27 @@ const convo = createConversation(bus, {
});
```

### Events (recommended)

`createConversation()` returns a handle that can emit typed events. This is the easiest way to attach multiple independent listeners (CLI output, logging, persistence, UI) without composing callbacks.

```typescript
const convo = createConversation(bus, {
participants: ["agent1", "agent2"],
topic: "Discussion topic",
maxTurns: 10,
});

// Stream tokens
convo.on("token", ({ chunk }) => process.stdout.write(chunk));

// Turn boundaries
convo.on("turnComplete", ({ turn }) => console.log(`\n---\n${turn.speaker}: ${turn.content}`));

// State changes
convo.on("state", ({ state }) => console.log("State:", state));
```

#### ConversationOptions

| Option | Type | Default | Description |
Expand All @@ -109,6 +130,8 @@ const convo = createConversation(bus, {
| `onTurnComplete` | `(turn: ChatMessage) => void` | — | Called when a turn finishes |
| `onStateChange` | `(state: LoopState) => void` | — | Called when conversation state changes |

Note: the callback options above are still supported for backward compatibility, but events are preferred if you need more than one listener.

### `attachInteractiveConsole(convo, config?)`

Attaches readline interface for CLI interaction. Provides real-time message injection and interrupt capabilities.
Expand Down
66 changes: 66 additions & 0 deletions examples/classroom.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { createChatBus, createConversation } from "../src/index.js";
import { anthropicAdapter } from "../src/adapters/anthropic.js";

const colors: Record<string, string> = {
teacher: "\x1b[34m", // Blue
student: "\x1b[32m", // Green
reset: "\x1b[0m",
};
function colorize(name: string): string {
const c = colors[name] || "";
return `${c}[${name}]${colors.reset}`;
}

const bus = createChatBus();

bus.register({
name: "teacher",
type: "llm",
system:
"You are a friendly math teacher. Teach step-by-step, ask one short question at a time, and keep answers concise.",
adapter: anthropicAdapter({
model: "claude-haiku-4-5-20251001",
maxTokens: 180,
}),
});

bus.register({
name: "student",
type: "llm",
system:
"You are a curious student. Answer briefly, show your work, and ask for clarification when confused.",
adapter: anthropicAdapter({
model: "claude-haiku-4-5-20251001",
maxTokens: 180,
}),
});

const convo = createConversation(bus, {
participants: ["teacher", "student"],
topic:
"Teach addition using a simple example: 7 + 5. The teacher should explain, then ask the student to solve 9 + 6.",
maxTurns: 6,
delayMs: 500,
});

// Use the new typed events API
convo.on("turnStart", ({ speaker }) => {
process.stdout.write(`\n${colorize(speaker)} `);
});

convo.on("token", ({ chunk }) => {
process.stdout.write(chunk);
});

convo.on("turnComplete", ({ turn }) => {
process.stdout.write(`\n${"─".repeat(50)}\n`);
if (turn.partial) process.stdout.write("⚠️ (partial — interrupted)\n");
});

convo.on("stopped", ({ reason }) => {
process.stdout.write(`\n✅ Stopped: ${reason}\n`);
});

console.log("🏫 Classroom: Addition (teacher ↔ student)");
const history = await convo.start();
console.log(`\n📜 History: ${history.length} messages`);
96 changes: 96 additions & 0 deletions examples/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import * as readline from "node:readline";
import { createChatBus, createConversation } from "../src/index.js";
import { anthropicAdapter } from "../src/adapters/anthropic.js";

const bus = createChatBus();

bus.register({
name: "optimist",
type: "llm",
system: "You are an optimistic thinker. Keep responses to 2 sentences max.",
adapter: anthropicAdapter({
model: "claude-haiku-4-5-20251001",
maxTokens: 150,
}),
});

bus.register({
name: "skeptic",
type: "llm",
system: "You are a critical skeptic. Keep responses to 2 sentences max.",
adapter: anthropicAdapter({
model: "claude-haiku-4-5-20251001",
maxTokens: 150,
}),
});

const convo = createConversation(bus, {
participants: ["optimist", "skeptic"],
topic: "Will AI make software developers more productive?",
maxTurns: 6,
delayMs: 2000,
});

// ── Events (preferred): multiple independent listeners

let currentTurnSpeaker: string | null = null;
convo.on("turnStart", ({ speaker }) => {
currentTurnSpeaker = speaker;
process.stdout.write(`\n[${speaker}] `);
});

convo.on("token", ({ chunk }) => {
process.stdout.write(chunk);
});

convo.on("turnComplete", ({ turn }) => {
process.stdout.write(`\n${"─".repeat(50)}\n`);
if (turn.partial) process.stdout.write("⚠️ (partial — interrupted)\n");
});

convo.on("state", ({ state }) => {
if (state === "stopped") process.stdout.write("\n✅ Conversation ended.\n");
});

// Example of once(): run only on the first completed turn
convo.once("turnComplete", ({ turn }) => {
process.stdout.write(`(first turn by ${turn.speaker} completed)\n`);
});

// Example of unsubscribe(): stop printing state transitions after first one
const unsubscribeStateAfterFirst = convo.on("state", ({ state }) => {
process.stdout.write(`(state changed: ${state})\n`);
unsubscribeStateAfterFirst();
});

// ── readline — keyboard interrupt and injection
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});

rl.on("line", (input) => {
const msg = input.trim();
if (!msg) return;
const result = convo.send(msg);
if (result.intent === "interrupt") {
console.log("\n⚡ Interrupted — your message injected.");
} else {
console.log("\n💬 Message injected.");
}
});

rl.on("SIGINT", () => {
console.log("\n🛑 Stopping...");
convo.stop();
rl.close();
});

console.log("🎬 Topic: Will AI make software developers more productive?");
console.log("💡 Type + Enter to interrupt anytime. Ctrl+C to stop.\n");

const history = await convo.start();

console.log(`\n📜 History: ${history.length} messages`);
rl.close();

48 changes: 42 additions & 6 deletions src/conversation.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import { randomUUID } from "node:crypto";
import type {
ChatMessage,
ConversationOptions,
ConversationHandle,
LoopState,
SendResult,
TurnContext,
ConversationEventMap,
ConversationStoppedReason,
HumanAwaitingReason,
} from "./types.js";
import type { ChatBus } from "./bus.js";
import { createMessageStore } from "./history.js";
import { createAbortManager } from "./manager.js";
import { createTypedEmitter } from "./emitter.js";

export function createConversation(
bus: ChatBus,
Expand Down Expand Up @@ -47,9 +50,12 @@ export function createConversation(

const store = createMessageStore();
const manager = createAbortManager();
const events = createTypedEmitter<ConversationEventMap>();

let _state: LoopState = "idle";
let _stopped = false;
let _stopReason: ConversationStoppedReason | null = null;
let _stopTurnIndex: number | null = null;

// The promise/resolve pair for human input (inject or interrupt).
// When the loop needs human input, it awaits _humanInputPromise.
Expand All @@ -72,14 +78,19 @@ export function createConversation(

function setState(next: LoopState) {
_state = next;
events.emit("state", { state: next });
onStateChange?.(next);
}

function waitForHuman(): Promise<string> {
function waitForHuman(
reason: HumanAwaitingReason,
turnIndex: number,
): Promise<string> {
_humanInputPromise = new Promise<string>((resolve) => {
_humanInputResolve = resolve;
});
setState("awaiting-human");
events.emit("humanAwaiting", { reason, turnIndex });
return _humanInputPromise;
}

Expand Down Expand Up @@ -123,7 +134,7 @@ export function createConversation(

// ── Human turn ────────────────────────────────────────────────────────
if (agent.type === "human") {
const humanMsg = await waitForHuman();
const humanMsg = await waitForHuman("humanTurn", turnIndex);
if (_stopped) break;
if (humanMsg.trim()) {
appendHuman(humanMsg, turnIndex);
Expand All @@ -132,26 +143,31 @@ export function createConversation(
}

// ── LLM turn ──────────────────────────────────────────────────────────
events.emit("turnStart", { speaker: speakerName, turnIndex });
setState("streaming");

const projected = store.project(speakerName, agent.system);
const controller = manager.create(turnIndex);

let accumulated = "";
let wasAborted = false;
let fatalError: unknown = null;

try {
for await (const chunk of agent.adapter!.generate(
projected,
controller.signal,
)) {
accumulated += chunk;
events.emit("token", { speaker: speakerName, chunk, turnIndex });
onToken?.(chunk, speakerName);

// Stop-sequence detection — strip it before saving.
if (stopSequence && accumulated.includes(stopSequence)) {
accumulated = accumulated.replace(stopSequence, "").trimEnd();
_stopped = true;
_stopReason = "stopSequence";
_stopTurnIndex = turnIndex;
break;
}

Expand All @@ -164,12 +180,22 @@ export function createConversation(
} catch (err: unknown) {
// AbortError is expected — a human interrupted mid-stream.
const isAbort = err instanceof Error && err.name === "AbortError";
if (!isAbort) throw err;
wasAborted = true;
if (isAbort) {
wasAborted = true;
} else {
fatalError = err;
}
}

manager.release(turnIndex);

if (fatalError !== null) {
events.emit("error", { error: fatalError, speaker: speakerName, turnIndex });
_stopped = true;
if (_stopReason === null) _stopReason = "stop";
if (_stopTurnIndex === null) _stopTurnIndex = turnIndex;
}

// Commit the turn — partial if aborted or interrupted.
const isPartial = wasAborted || _pendingInterrupt !== null;

Expand All @@ -181,6 +207,7 @@ export function createConversation(
...(isPartial ? { partial: true } : {}),
});

events.emit("turnComplete", { turn });
onTurnComplete?.(turn);

// If a human interrupted mid-stream, inject their message now.
Expand Down Expand Up @@ -216,7 +243,7 @@ export function createConversation(
history: store.all(),
};
if (pauseCondition(ctx)) {
const humanMsg = await waitForHuman();
const humanMsg = await waitForHuman("pauseCondition", turnIndex);
if (_stopped) break;
// Only append if user typed something (interactive mode: empty = skip)
if (humanMsg.trim()) {
Expand All @@ -227,6 +254,10 @@ export function createConversation(
}

setState("stopped");
if (_stopReason === null) {
_stopReason = _stopped ? "stop" : "maxTurns";
}
events.emit("stopped", { reason: _stopReason, turnIndex: _stopTurnIndex });
return store.all();
}

Expand Down Expand Up @@ -264,6 +295,8 @@ export function createConversation(

function stop(): void {
_stopped = true;
if (_stopReason === null) _stopReason = "stop";
if (_stopTurnIndex === null) _stopTurnIndex = manager.activeTurnIndex();
manager.abort();
// If the loop is waiting for human input, resolve with empty string
// so the await unblocks. The loop checks _stopped immediately after.
Expand All @@ -278,6 +311,9 @@ export function createConversation(
start,
send,
stop,
on: events.on,
off: events.off,
once: events.once,
get state() {
return _state;
},
Expand Down
Loading
Loading