Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,18 @@ jobs:
- uses: actions/checkout@v4
- uses: oven-sh/setup-bun@v2
- run: bun install --frozen-lockfile
- name: Setup test config
run: |
mkdir -p ~/.agentara
cat > ~/.agentara/config.yaml << 'EOF'
agents:
default:
type: claude-code
model: claude-sonnet-4-6
tasking:
max_retries: 3
messaging:
default_channel_id: test-channel
channels: []
EOF
- run: bun test
76 changes: 60 additions & 16 deletions src/community/anthropic/claude-agent-runner.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
config,
createLogger,
extractTextContent,
type MessageContent,
type ToolMessage,
Expand All @@ -10,6 +11,18 @@ import {
type UserMessage,
} from "@/shared";

const logger = createLogger("claude-agent-runner");

/**
* Error thrown when the agent runner is aborted.
*/
export class AgentAbortError extends Error {
constructor(message = "Agent execution was aborted") {
super(message);
this.name = "AgentAbortError";
}
}

/**
* The agent runner for Claude Code CLI.
*/
Expand All @@ -22,6 +35,7 @@ export class ClaudeAgentRunner implements AgentRunner {
): AsyncIterableIterator<SystemMessage | AssistantMessage | ToolMessage> {
const sessionId = message.session_id;
const isNew = options?.isNewSession ?? false;
const signal = options?.signal;
const textContentOfUserMessage = JSON.stringify(
extractTextContent(message),
);
Expand All @@ -43,6 +57,22 @@ export class ClaudeAgentRunner implements AgentRunner {
},
stderr: "pipe",
});

// Handle abort signal
let aborted = false;
const abortHandler = () => {
aborted = true;
logger.info({ session_id: sessionId }, "killing Claude Code process");
proc.kill();
};
if (signal) {
if (signal.aborted) {
proc.kill();
throw new AgentAbortError();
}
signal.addEventListener("abort", abortHandler, { once: true });
}

const decoder = new TextDecoder();
const stderrChunks: Uint8Array[] = [];
const stderrPipe = proc.stderr.pipeTo(
Expand All @@ -54,27 +84,41 @@ export class ClaudeAgentRunner implements AgentRunner {
);
let buffer = "";
let stdoutRaw = "";
for await (const chunk of proc.stdout) {
const decoded = decoder.decode(chunk, { stream: true });
buffer += decoded;
stdoutRaw += decoded;
const lines = buffer.split("\n");
buffer = lines.pop()!;
for (const line of lines) {
if (line.trim()) {
const parsed = this._parseStreamLine(line.trim(), sessionId);
if (parsed) {
yield parsed;
try {
for await (const chunk of proc.stdout) {
if (aborted) {
break;
}
const decoded = decoder.decode(chunk, { stream: true });
buffer += decoded;
stdoutRaw += decoded;
const lines = buffer.split("\n");
buffer = lines.pop()!;
for (const line of lines) {
if (line.trim()) {
const parsed = this._parseStreamLine(line.trim(), sessionId);
if (parsed) {
yield parsed;
}
}
}
}
}
if (buffer.trim()) {
const parsed = this._parseStreamLine(buffer.trim(), sessionId);
if (parsed) {
yield parsed;
if (!aborted && buffer.trim()) {
const parsed = this._parseStreamLine(buffer.trim(), sessionId);
if (parsed) {
yield parsed;
}
}
} finally {
if (signal) {
signal.removeEventListener("abort", abortHandler);
}
}

if (aborted) {
throw new AgentAbortError();
}

const exitCode = await proc.exited;
await stderrPipe;
if (exitCode !== 0) {
Expand Down
12 changes: 12 additions & 0 deletions src/community/feishu/messaging/message-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ export class FeishuMessageChannel
await this._inboundClient.start({
eventDispatcher: new EventDispatcher({}).register({
"im.message.receive_v1": this._handleMessageReceive,
"im.message.recalled_v1": this._handleMessageRecall,
}),
});
}
Expand Down Expand Up @@ -563,6 +564,17 @@ export class FeishuMessageChannel
this.emit("message:inbound", userMessage);
};

private _handleMessageRecall = async (data: {
message_id?: string;
chat_id?: string;
recall_time?: string;
recall_type?: string;
}) => {
if (!data.message_id) return;
this._logger.info({ message_id: data.message_id }, "message recalled");
this.emit("message:recalled", data.message_id, this.id);
};

private _threadIdToSessionId = new Map<string, string>();

/** Persist a thread→session mapping to DB and update the in-memory cache. */
Expand Down
71 changes: 55 additions & 16 deletions src/community/openai/codex-agent-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ import {

const logger = createLogger("codex-agent-runner");

/**
* Error thrown when the agent runner is aborted.
*/
export class AgentAbortError extends Error {
constructor(message = "Agent execution was aborted") {
super(message);
this.name = "AgentAbortError";
}
}

/**
* The agent runner for OpenAI Codex CLI.
*
Expand All @@ -33,6 +43,7 @@ export class CodexAgentRunner implements AgentRunner {
): AsyncIterableIterator<SystemMessage | AssistantMessage | ToolMessage> {
const sessionId = message.session_id;
const isNew = options?.isNewSession ?? false;
const signal = options?.signal;
const resumeId = options.runnerSessionId ?? sessionId;
const textContentOfUserMessage = JSON.stringify(
extractTextContent(message),
Expand All @@ -56,6 +67,21 @@ export class CodexAgentRunner implements AgentRunner {
stderr: "pipe",
});

// Handle abort signal
let aborted = false;
const abortHandler = () => {
aborted = true;
logger.info({ session_id: sessionId }, "killing Codex CLI process");
proc.kill();
};
if (signal) {
if (signal.aborted) {
proc.kill();
throw new AgentAbortError();
}
signal.addEventListener("abort", abortHandler, { once: true });
}

const decoder = new TextDecoder();
const stderrChunks: Uint8Array[] = [];
const stderrPipe = proc.stderr.pipeTo(
Expand All @@ -68,27 +94,40 @@ export class CodexAgentRunner implements AgentRunner {

let buffer = "";
let stdoutRaw = "";
for await (const chunk of proc.stdout) {
const decoded = decoder.decode(chunk, { stream: true });
buffer += decoded;
stdoutRaw += decoded;
const lines = buffer.split("\n");
buffer = lines.pop()!;
for (const line of lines) {
if (line.trim()) {
const messages = this._parseStreamLine(line.trim(), sessionId);
for (const msg of messages) {
yield msg;
try {
for await (const chunk of proc.stdout) {
if (aborted) {
break;
}
const decoded = decoder.decode(chunk, { stream: true });
buffer += decoded;
stdoutRaw += decoded;
const lines = buffer.split("\n");
buffer = lines.pop()!;
for (const line of lines) {
if (line.trim()) {
const messages = this._parseStreamLine(line.trim(), sessionId);
for (const msg of messages) {
yield msg;
}
}
}
}
}

if (buffer.trim()) {
const messages = this._parseStreamLine(buffer.trim(), sessionId);
for (const msg of messages) {
yield msg;
if (!aborted && buffer.trim()) {
const messages = this._parseStreamLine(buffer.trim(), sessionId);
for (const msg of messages) {
yield msg;
}
}
} finally {
if (signal) {
signal.removeEventListener("abort", abortHandler);
}
}

if (aborted) {
throw new AgentAbortError();
}

const exitCode = await proc.exited;
Expand Down
50 changes: 48 additions & 2 deletions src/kernel/kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class Kernel {
);
}
this._messageGateway.on("message:inbound", this._handleInboundMessage);
this._messageGateway.on("message:recalled", this._handleMessageRecall);
}

/**
Expand All @@ -111,17 +112,61 @@ class Kernel {
}

private _handleInboundMessage = async (message: UserMessage) => {
const text = extractTextContent(message).trim();

// Handle /stop command
if (text === "/stop") {
await this._handleStopCommand(message);
return;
}

const task: InboundMessageTaskPayload = {
type: "inbound_message",
message,
};
await this._taskDispatcher.dispatch(message.session_id, task);
};

private _handleStopCommand = async (message: UserMessage) => {
const sessionId = message.session_id;
const runningTaskId =
this._taskDispatcher.getRunningTaskForSession(sessionId);

if (runningTaskId) {
await this._taskDispatcher.deleteTask(runningTaskId);
await this._messageGateway.replyMessage(message.id, {
role: "assistant",
session_id: sessionId,
content: [{ type: "text", text: "Task stopped." }],
});
} else {
await this._messageGateway.replyMessage(message.id, {
role: "assistant",
session_id: sessionId,
content: [{ type: "text", text: "No running task found." }],
});
}
};

private _handleMessageRecall = async (
messageId: string,
channelId: string,
) => {
const taskId = this._taskDispatcher.getTaskByMessageId(messageId);
if (taskId) {
await this._taskDispatcher.deleteTask(taskId);
this._logger.info(
{ message_id: messageId, task_id: taskId, channel_id: channelId },
"task stopped due to message recall",
);
}
};

private _handleInboundMessageTask = async (
taskId: string,
sessionId: string,
payload: InboundMessageTaskPayload,
signal?: AbortSignal,
) => {
const inboundMessage = payload.message;
const session = await this._sessionManager.resolveSession(sessionId, {
Expand All @@ -146,7 +191,7 @@ class Kernel {
},
);
contents = [];
const stream = await session.stream(inboundMessage);
const stream = await session.stream(inboundMessage, { signal });
let lastMessage: AssistantMessage | undefined;
for await (const message of stream) {
if (message.role === "assistant") {
Expand Down Expand Up @@ -175,6 +220,7 @@ class Kernel {
_taskId: string,
sessionId: string,
payload: ScheduledTaskPayload,
signal?: AbortSignal,
) => {
const payload_without_instruction: { instruction?: string } = {
...payload,
Expand All @@ -201,7 +247,7 @@ ${payload.instruction}`,
firstMessage: userMessage,
});
delete payload_without_instruction.instruction;
const assistantMessage = await session.run(userMessage);
const assistantMessage = await session.run(userMessage, { signal });
if (extractTextContent(assistantMessage).includes("[SKIPPED]")) {
return;
}
Expand Down
3 changes: 3 additions & 0 deletions src/kernel/messaging/multi-channel-message-gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ export class MultiChannelMessageGateway
channel.on("message:inbound", (message: UserMessage) => {
this._handleInboundMessage(channel.id, message);
});
channel.on("message:recalled", (messageId: string, channelId: string) => {
this.emit("message:recalled", messageId, channelId);
});
this._logger.info(`Registered channel: ${channel.id}`);
}

Expand Down
Loading
Loading