Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion lib/dispatch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ export async function dispatchTask(
// Compute session key deterministically (avoids waiting for gateway)
// Slot name provides both collision prevention and human-readable identity
const botName = slotName(project.name, role, level, slotIndex);
const sessionKey = `agent:${agentId ?? "unknown"}:subagent:${project.name}-${role}-${level}-${botName.toLowerCase()}`;
// Use project.slug (always lowercase) to build session key.
// project.name may have mixed case (e.g. "UpMoltWork"), which caused heartbeat
// mismatches when the gateway stores session keys in lowercase format.
const projectKey = (project.slug ?? project.name).toLowerCase();
const sessionKey = `agent:${agentId ?? "unknown"}:subagent:${projectKey}-${role}-${level}-${botName.toLowerCase()}`;

// Clear stale session key if it doesn't match the current deterministic key
// (handles migration from old numeric format like ...-0 to name-based ...-Cordelia)
Expand Down Expand Up @@ -272,6 +276,7 @@ export async function dispatchTask(
runtime,
accountId: notifyTarget?.accountId,
runCommand: rc,
messageThreadId: notifyTarget?.messageThreadId,
},
).catch((err) => {
auditLog(workspaceDir, "dispatch_warning", {
Expand All @@ -294,6 +299,7 @@ export async function dispatchTask(
dispatchTimeoutMs: timeouts.dispatchMs,
extraSystemPrompt: roleInstructions.trim() || undefined,
runCommand: rc,
notifyTarget,
});

// Step 5: Update worker state
Expand Down
201 changes: 201 additions & 0 deletions lib/dispatch/notify.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/**
* Tests for notification delivery fallbacks.
*
* Run with: npx tsx --test lib/dispatch/notify.test.ts
*/
import { after, before, describe, it } from "node:test";
import assert from "node:assert";
import { mkdtemp, readFile, rm } from "node:fs/promises";
import { join } from "node:path";
import { tmpdir } from "node:os";
type CommandOptions = { timeoutMs?: number };
type SpawnResult = { stdout: string; stderr: string; code: number; signal: string | null; killed: boolean; termination: string };
import { notify } from "./notify.js";

describe("notify", () => {
let tempDir: string;

before(async () => {
tempDir = await mkdtemp(join(tmpdir(), "devclaw-notify-test-"));
});

after(async () => {
await rm(tempDir, { recursive: true, force: true });
});

it("falls back to CLI when the Telegram runtime sender is unavailable", async () => {
const calls: Array<{ args: string[]; timeoutMs?: number }> = [];

const ok = await notify(
{
type: "workerStart",
project: "devclaw",
issueId: 7,
issueTitle: "Fix Telegram worker notifications",
issueUrl: "https://example.com/issues/7",
role: "developer",
level: "senior",
name: "firstlight",
sessionAction: "spawn",
},
{
workspaceDir: tempDir,
channelId: "-100123",
channel: "telegram",
runtime: { channel: {} } as any,
runCommand: async (args, opts): Promise<SpawnResult> => {
const options = typeof opts === "number" ? { timeoutMs: opts } : opts as CommandOptions;
calls.push({ args, timeoutMs: options?.timeoutMs });
return {
stdout: "",
stderr: "",
code: 0,
signal: null,
killed: false,
termination: "exit",
};
},
},
);

assert.equal(ok, true);
assert.equal(calls.length, 1);
assert.deepEqual(calls[0]?.args.slice(0, 6), [
"openclaw",
"message",
"send",
"--channel",
"telegram",
"--target",
]);
assert.equal(calls[0]?.args[6], "-100123");
assert.equal(calls[0]?.timeoutMs, 30_000);
});

it("falls back to CLI when the runtime sender throws", async () => {
const calls: Array<{ args: string[]; timeoutMs?: number }> = [];

const ok = await notify(
{
type: "workerComplete",
project: "devclaw",
issueId: 7,
issueUrl: "https://example.com/issues/7",
role: "developer",
result: "done",
},
{
workspaceDir: tempDir,
channelId: "-100123",
channel: "telegram",
runtime: {
channel: {
telegram: {
sendMessageTelegram: async () => {
throw new Error("telegram runtime unavailable");
},
},
},
} as any,
runCommand: async (args, opts): Promise<SpawnResult> => {
const options = typeof opts === "number" ? { timeoutMs: opts } : opts as CommandOptions;
calls.push({ args, timeoutMs: options?.timeoutMs });
return {
stdout: "",
stderr: "",
code: 0,
signal: null,
killed: false,
termination: "exit",
};
},
},
);

assert.equal(ok, true);
assert.equal(calls.length, 1);

const auditLog = await readFile(join(tempDir, "devclaw", "log", "audit.log"), "utf-8");
assert.match(auditLog, /"event":"notify_runtime_error"/);
assert.match(auditLog, /telegram runtime unavailable/);
assert.match(auditLog, /"event":"notify_delivery"/);
assert.match(auditLog, /"delivery":"cli-fallback"/);
});

it("passes messageThreadId through the Telegram CLI fallback", async () => {
const calls: Array<{ args: string[]; timeoutMs?: number }> = [];

const ok = await notify(
{
type: "workerComplete",
project: "devclaw",
issueId: 7,
issueUrl: "https://example.com/issues/7",
role: "developer",
result: "refine",
},
{
workspaceDir: tempDir,
channelId: "-100123",
channel: "telegram",
runtime: {
channel: {
telegram: {
sendMessageTelegram: async () => {
throw new Error("telegram runtime unavailable");
},
},
},
} as any,
runCommand: async (args, opts): Promise<SpawnResult> => {
const options = typeof opts === "number" ? { timeoutMs: opts } : opts as CommandOptions;
calls.push({ args, timeoutMs: options?.timeoutMs });
return {
stdout: "",
stderr: "",
code: 0,
signal: null,
killed: false,
termination: "exit",
};
},
messageThreadId: 176,
},
);

assert.equal(ok, true);
assert.equal(calls.length, 1);
assert.ok(calls[0]?.args.includes("--thread-id"));
assert.ok(calls[0]?.args.includes("176"));

const auditLog = await readFile(join(tempDir, "devclaw", "log", "audit.log"), "utf-8");
assert.match(auditLog, /"delivery":"cli-fallback"/);
assert.match(auditLog, /"messageThreadId":176/);
});

it("logs notify_error and returns false when no sender is available", async () => {
const ok = await notify(
{
type: "workerComplete",
project: "devclaw",
issueId: 7,
issueUrl: "https://example.com/issues/7",
role: "developer",
result: "done",
},
{
workspaceDir: tempDir,
channelId: "-100123",
channel: "telegram",
runtime: { channel: {} } as any,
},
);

assert.equal(ok, false);

const auditLog = await readFile(join(tempDir, "devclaw", "log", "audit.log"), "utf-8");
assert.match(auditLog, /"event":"notify_error"/);
assert.match(auditLog, /"delivery":"failed"/);
assert.match(auditLog, /No runtime sender available for channel telegram/);
});
});
115 changes: 72 additions & 43 deletions lib/dispatch/notify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,59 +262,86 @@ async function sendMessage(
runtime?: PluginRuntime,
accountId?: string,
runCommand?: RunCommand,
messageThreadId?: number,
): Promise<boolean> {
try {
// Use runtime API when available (avoids CLI subprocess timeouts)
if (runtime) {
const runtimeChannel = runtime?.channel as Record<string, any> | undefined;
const runtimeSender =
channel === "telegram" ? runtimeChannel?.telegram?.sendMessageTelegram :
channel === "whatsapp" ? runtimeChannel?.whatsapp?.sendMessageWhatsApp :
channel === "discord" ? runtimeChannel?.discord?.sendMessageDiscord :
channel === "slack" ? runtimeChannel?.slack?.sendMessageSlack :
channel === "signal" ? runtimeChannel?.signal?.sendMessageSignal :
undefined;

if (runtimeSender) {
try {
if (channel === "telegram") {
// Cast to any to bypass TypeScript type limitation; disableWebPagePreview is valid in Telegram API
await runtime.channel.telegram.sendMessageTelegram(target, message, { silent: true, disableWebPagePreview: true, accountId } as any);
return true;
}
if (channel === "whatsapp") {
await runtime.channel.whatsapp.sendMessageWhatsApp(target, message, { verbose: false, accountId });
return true;
}
if (channel === "discord") {
await runtime.channel.discord.sendMessageDiscord(target, message, { accountId });
return true;
const telegramOpts: Record<string, unknown> = { silent: true, disableWebPagePreview: true, accountId };
if (messageThreadId != null) telegramOpts.messageThreadId = messageThreadId;
await runtimeSender(target, message, telegramOpts as any);
} else if (channel === "whatsapp") {
await runtimeSender(target, message, { verbose: false, accountId });
} else {
await runtimeSender(target, message, { accountId });
}
if (channel === "slack") {
await runtime.channel.slack.sendMessageSlack(target, message, { accountId });
return true;
}
if (channel === "signal") {
await runtime.channel.signal.sendMessageSignal(target, message, { accountId });
return true;
}
}

// Fallback: use CLI (for unsupported channels or when runtime isn't available)
if (!runCommand) throw new Error("runCommand is required when runtime is not available");
const rc = runCommand;
// Note: openclaw message send CLI doesn't expose disable_web_page_preview flag.
// The runtime API path (above) handles it; CLI fallback won't suppress previews.
await rc(
[
"openclaw",
"message",
"send",
"--channel",
await auditLog(workspaceDir, "notify_delivery", {
target,
channel,
"--target",
delivery: "runtime",
});
return true;
} catch (err) {
await auditLog(workspaceDir, "notify_runtime_error", {
target,
"--message",
message,
"--json",
],
{ timeoutMs: 30_000 },
);
channel,
error: (err as Error).message,
});
}
}

if (!runCommand) {
await auditLog(workspaceDir, "notify_error", {
target,
channel,
delivery: "failed",
error: `No runtime sender available for channel ${channel} and runCommand is not available`,
});
return false;
}

try {
const rc = runCommand;
const argv = [
"openclaw",
"message",
"send",
"--channel",
channel,
"--target",
target,
"--message",
message,
];
if (channel === "telegram" && messageThreadId != null) {
argv.push("--thread-id", String(messageThreadId));
}
argv.push("--json");

await rc(argv, { timeoutMs: 30_000 });

await auditLog(workspaceDir, "notify_delivery", {
target,
channel,
delivery: "cli-fallback",
...(channel === "telegram" && messageThreadId != null ? { messageThreadId } : {}),
});
return true;
} catch (err) {
// Log but don't throw — notifications shouldn't break the main flow
await auditLog(workspaceDir, "notify_error", {
target,
channel,
delivery: "failed",
error: (err as Error).message,
});
return false;
Expand All @@ -341,6 +368,8 @@ export async function notify(
accountId?: string;
/** Injected runCommand for dependency injection. */
runCommand?: RunCommand;
/** Optional Telegram topic for forum routing. */
messageThreadId?: number;
},
): Promise<boolean> {
if (opts.config?.[event.type] === false) return true;
Expand All @@ -364,7 +393,7 @@ export async function notify(
message,
});

return sendMessage(target, message, channel, opts.workspaceDir, opts.runtime, opts.accountId, opts.runCommand);
return sendMessage(target, message, channel, opts.workspaceDir, opts.runtime, opts.accountId, opts.runCommand, opts.messageThreadId);
}

/**
Expand Down
1 change: 1 addition & 0 deletions lib/projects/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export type Channel = {
name: string; // e.g. "primary", "dev-chat"
events: string[]; // e.g. ["*"] for all, ["workerComplete"] for filtered
accountId?: string; // Optional account ID for multi-account setups
messageThreadId?: number; // Optional Telegram forum topic ID for scoped routing
};

/**
Expand Down
Loading