From b310791040c2d1653aba23d320c5a32428c109a5 Mon Sep 17 00:00:00 2001 From: fujiwaranosai850 Date: Sat, 2 May 2026 01:24:40 +0000 Subject: [PATCH 1/2] fix: reconcile telegram notification export on upstream --- lib/dispatch/index.ts | 1 + lib/dispatch/notify.test.ts | 201 ++++++++++++++++++++++++++++++ lib/dispatch/notify.ts | 115 ++++++++++------- lib/projects/types.ts | 1 + lib/services/pipeline.e2e.test.ts | 105 ++++++++++++++++ lib/services/pipeline.ts | 135 +++++++++++--------- lib/workflow/labels.ts | 4 +- 7 files changed, 459 insertions(+), 103 deletions(-) create mode 100644 lib/dispatch/notify.test.ts diff --git a/lib/dispatch/index.ts b/lib/dispatch/index.ts index 4efd8ca7..9e418017 100644 --- a/lib/dispatch/index.ts +++ b/lib/dispatch/index.ts @@ -272,6 +272,7 @@ export async function dispatchTask( runtime, accountId: notifyTarget?.accountId, runCommand: rc, + messageThreadId: notifyTarget?.messageThreadId, }, ).catch((err) => { auditLog(workspaceDir, "dispatch_warning", { diff --git a/lib/dispatch/notify.test.ts b/lib/dispatch/notify.test.ts new file mode 100644 index 00000000..a818e3a9 --- /dev/null +++ b/lib/dispatch/notify.test.ts @@ -0,0 +1,201 @@ +/** + * Tests for notification delivery fallbacks. + * + * Run with: npx tsx --test lib/dispatch/notify.test.ts + */ +import { after, before, describe, it } from "node:test"; +import assert from "node:assert"; +import { mkdtemp, readFile, rm } from "node:fs/promises"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +type CommandOptions = { timeoutMs?: number }; +type SpawnResult = { stdout: string; stderr: string; code: number; signal: string | null; killed: boolean; termination: string }; +import { notify } from "./notify.js"; + +describe("notify", () => { + let tempDir: string; + + before(async () => { + tempDir = await mkdtemp(join(tmpdir(), "devclaw-notify-test-")); + }); + + after(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + it("falls back to CLI when the Telegram runtime sender is unavailable", async () => { + const calls: Array<{ args: string[]; timeoutMs?: number }> = []; + + const ok = await notify( + { + type: "workerStart", + project: "devclaw", + issueId: 7, + issueTitle: "Fix Telegram worker notifications", + issueUrl: "https://example.com/issues/7", + role: "developer", + level: "senior", + name: "firstlight", + sessionAction: "spawn", + }, + { + workspaceDir: tempDir, + channelId: "-100123", + channel: "telegram", + runtime: { channel: {} } as any, + runCommand: async (args, opts): Promise => { + const options = typeof opts === "number" ? { timeoutMs: opts } : opts as CommandOptions; + calls.push({ args, timeoutMs: options?.timeoutMs }); + return { + stdout: "", + stderr: "", + code: 0, + signal: null, + killed: false, + termination: "exit", + }; + }, + }, + ); + + assert.equal(ok, true); + assert.equal(calls.length, 1); + assert.deepEqual(calls[0]?.args.slice(0, 6), [ + "openclaw", + "message", + "send", + "--channel", + "telegram", + "--target", + ]); + assert.equal(calls[0]?.args[6], "-100123"); + assert.equal(calls[0]?.timeoutMs, 30_000); + }); + + it("falls back to CLI when the runtime sender throws", async () => { + const calls: Array<{ args: string[]; timeoutMs?: number }> = []; + + const ok = await notify( + { + type: "workerComplete", + project: "devclaw", + issueId: 7, + issueUrl: "https://example.com/issues/7", + role: "developer", + result: "done", + }, + { + workspaceDir: tempDir, + channelId: "-100123", + channel: "telegram", + runtime: { + channel: { + telegram: { + sendMessageTelegram: async () => { + throw new Error("telegram runtime unavailable"); + }, + }, + }, + } as any, + runCommand: async (args, opts): Promise => { + const options = typeof opts === "number" ? { timeoutMs: opts } : opts as CommandOptions; + calls.push({ args, timeoutMs: options?.timeoutMs }); + return { + stdout: "", + stderr: "", + code: 0, + signal: null, + killed: false, + termination: "exit", + }; + }, + }, + ); + + assert.equal(ok, true); + assert.equal(calls.length, 1); + + const auditLog = await readFile(join(tempDir, "devclaw", "log", "audit.log"), "utf-8"); + assert.match(auditLog, /"event":"notify_runtime_error"/); + assert.match(auditLog, /telegram runtime unavailable/); + assert.match(auditLog, /"event":"notify_delivery"/); + assert.match(auditLog, /"delivery":"cli-fallback"/); + }); + + it("passes messageThreadId through the Telegram CLI fallback", async () => { + const calls: Array<{ args: string[]; timeoutMs?: number }> = []; + + const ok = await notify( + { + type: "workerComplete", + project: "devclaw", + issueId: 7, + issueUrl: "https://example.com/issues/7", + role: "developer", + result: "refine", + }, + { + workspaceDir: tempDir, + channelId: "-100123", + channel: "telegram", + runtime: { + channel: { + telegram: { + sendMessageTelegram: async () => { + throw new Error("telegram runtime unavailable"); + }, + }, + }, + } as any, + runCommand: async (args, opts): Promise => { + const options = typeof opts === "number" ? { timeoutMs: opts } : opts as CommandOptions; + calls.push({ args, timeoutMs: options?.timeoutMs }); + return { + stdout: "", + stderr: "", + code: 0, + signal: null, + killed: false, + termination: "exit", + }; + }, + messageThreadId: 176, + }, + ); + + assert.equal(ok, true); + assert.equal(calls.length, 1); + assert.ok(calls[0]?.args.includes("--thread-id")); + assert.ok(calls[0]?.args.includes("176")); + + const auditLog = await readFile(join(tempDir, "devclaw", "log", "audit.log"), "utf-8"); + assert.match(auditLog, /"delivery":"cli-fallback"/); + assert.match(auditLog, /"messageThreadId":176/); + }); + + it("logs notify_error and returns false when no sender is available", async () => { + const ok = await notify( + { + type: "workerComplete", + project: "devclaw", + issueId: 7, + issueUrl: "https://example.com/issues/7", + role: "developer", + result: "done", + }, + { + workspaceDir: tempDir, + channelId: "-100123", + channel: "telegram", + runtime: { channel: {} } as any, + }, + ); + + assert.equal(ok, false); + + const auditLog = await readFile(join(tempDir, "devclaw", "log", "audit.log"), "utf-8"); + assert.match(auditLog, /"event":"notify_error"/); + assert.match(auditLog, /"delivery":"failed"/); + assert.match(auditLog, /No runtime sender available for channel telegram/); + }); +}); diff --git a/lib/dispatch/notify.ts b/lib/dispatch/notify.ts index e9abca46..14677a3d 100644 --- a/lib/dispatch/notify.ts +++ b/lib/dispatch/notify.ts @@ -262,59 +262,86 @@ async function sendMessage( runtime?: PluginRuntime, accountId?: string, runCommand?: RunCommand, + messageThreadId?: number, ): Promise { - try { - // Use runtime API when available (avoids CLI subprocess timeouts) - if (runtime) { + const runtimeChannel = runtime?.channel as Record | undefined; + const runtimeSender = + channel === "telegram" ? runtimeChannel?.telegram?.sendMessageTelegram : + channel === "whatsapp" ? runtimeChannel?.whatsapp?.sendMessageWhatsApp : + channel === "discord" ? runtimeChannel?.discord?.sendMessageDiscord : + channel === "slack" ? runtimeChannel?.slack?.sendMessageSlack : + channel === "signal" ? runtimeChannel?.signal?.sendMessageSignal : + undefined; + + if (runtimeSender) { + try { if (channel === "telegram") { - // Cast to any to bypass TypeScript type limitation; disableWebPagePreview is valid in Telegram API - await runtime.channel.telegram.sendMessageTelegram(target, message, { silent: true, disableWebPagePreview: true, accountId } as any); - return true; - } - if (channel === "whatsapp") { - await runtime.channel.whatsapp.sendMessageWhatsApp(target, message, { verbose: false, accountId }); - return true; - } - if (channel === "discord") { - await runtime.channel.discord.sendMessageDiscord(target, message, { accountId }); - return true; + const telegramOpts: Record = { silent: true, disableWebPagePreview: true, accountId }; + if (messageThreadId != null) telegramOpts.messageThreadId = messageThreadId; + await runtimeSender(target, message, telegramOpts as any); + } else if (channel === "whatsapp") { + await runtimeSender(target, message, { verbose: false, accountId }); + } else { + await runtimeSender(target, message, { accountId }); } - if (channel === "slack") { - await runtime.channel.slack.sendMessageSlack(target, message, { accountId }); - return true; - } - if (channel === "signal") { - await runtime.channel.signal.sendMessageSignal(target, message, { accountId }); - return true; - } - } - // Fallback: use CLI (for unsupported channels or when runtime isn't available) - if (!runCommand) throw new Error("runCommand is required when runtime is not available"); - const rc = runCommand; - // Note: openclaw message send CLI doesn't expose disable_web_page_preview flag. - // The runtime API path (above) handles it; CLI fallback won't suppress previews. - await rc( - [ - "openclaw", - "message", - "send", - "--channel", + await auditLog(workspaceDir, "notify_delivery", { + target, channel, - "--target", + delivery: "runtime", + }); + return true; + } catch (err) { + await auditLog(workspaceDir, "notify_runtime_error", { target, - "--message", - message, - "--json", - ], - { timeoutMs: 30_000 }, - ); + channel, + error: (err as Error).message, + }); + } + } + + if (!runCommand) { + await auditLog(workspaceDir, "notify_error", { + target, + channel, + delivery: "failed", + error: `No runtime sender available for channel ${channel} and runCommand is not available`, + }); + return false; + } + + try { + const rc = runCommand; + const argv = [ + "openclaw", + "message", + "send", + "--channel", + channel, + "--target", + target, + "--message", + message, + ]; + if (channel === "telegram" && messageThreadId != null) { + argv.push("--thread-id", String(messageThreadId)); + } + argv.push("--json"); + + await rc(argv, { timeoutMs: 30_000 }); + + await auditLog(workspaceDir, "notify_delivery", { + target, + channel, + delivery: "cli-fallback", + ...(channel === "telegram" && messageThreadId != null ? { messageThreadId } : {}), + }); return true; } catch (err) { - // Log but don't throw — notifications shouldn't break the main flow await auditLog(workspaceDir, "notify_error", { target, channel, + delivery: "failed", error: (err as Error).message, }); return false; @@ -341,6 +368,8 @@ export async function notify( accountId?: string; /** Injected runCommand for dependency injection. */ runCommand?: RunCommand; + /** Optional Telegram topic for forum routing. */ + messageThreadId?: number; }, ): Promise { if (opts.config?.[event.type] === false) return true; @@ -364,7 +393,7 @@ export async function notify( message, }); - return sendMessage(target, message, channel, opts.workspaceDir, opts.runtime, opts.accountId, opts.runCommand); + return sendMessage(target, message, channel, opts.workspaceDir, opts.runtime, opts.accountId, opts.runCommand, opts.messageThreadId); } /** diff --git a/lib/projects/types.ts b/lib/projects/types.ts index 26526235..2db7e7e6 100644 --- a/lib/projects/types.ts +++ b/lib/projects/types.ts @@ -33,6 +33,7 @@ export type Channel = { name: string; // e.g. "primary", "dev-chat" events: string[]; // e.g. ["*"] for all, ["workerComplete"] for filtered accountId?: string; // Optional account ID for multi-account setups + messageThreadId?: number; // Optional Telegram forum topic ID for scoped routing }; /** diff --git a/lib/services/pipeline.e2e.test.ts b/lib/services/pipeline.e2e.test.ts index 6482d2b0..fb91867b 100644 --- a/lib/services/pipeline.e2e.test.ts +++ b/lib/services/pipeline.e2e.test.ts @@ -341,6 +341,58 @@ describe("E2E pipeline", () => { assert.strictEqual(closeCalls.length, 1); assert.strictEqual(closeCalls[0].args.issueId, 30); }); + + it("should use CLI fallback for completion notifications and preserve Telegram topic routing", async () => { + h.project.channels[0]!.messageThreadId = 176; + + await executeCompletion({ + workspaceDir: h.workspaceDir, + projectSlug: h.project.slug, + channels: h.project.channels, + role: "tester", + result: "pass", + issueId: 30, + summary: "All tests pass", + provider: h.provider, + repoPath: "/tmp/test-repo", + projectName: "test-project", + runCommand: h.runCommand, + }); + + const notifyCalls = h.commands.commands.filter( + (c) => c.argv[0] === "openclaw" && c.argv[1] === "message" && c.argv[2] === "send", + ); + assert.strictEqual(notifyCalls.length, 1, "Expected workerComplete CLI fallback notification"); + assert.ok(notifyCalls[0]!.argv.includes("--thread-id") && notifyCalls[0]!.argv.includes("176")); + }); + + it("should await completion notification fallback delivery before returning", async () => { + h.project.channels[0]!.messageThreadId = 176; + const delayedRunCommand = async (argv: string[], opts: any) => { + if (argv[0] === "openclaw" && argv[1] === "message" && argv[2] === "send") { + await new Promise((resolve) => setTimeout(resolve, 25)); + } + return h.runCommand(argv, opts); + }; + + const startedAt = Date.now(); + await executeCompletion({ + workspaceDir: h.workspaceDir, + projectSlug: h.project.slug, + channels: h.project.channels, + role: "tester", + result: "pass", + issueId: 30, + summary: "All tests pass", + provider: h.provider, + repoPath: "/tmp/test-repo", + projectName: "test-project", + runCommand: delayedRunCommand, + }); + const elapsedMs = Date.now() - startedAt; + + assert.ok(elapsedMs >= 25, `Expected executeCompletion to await fallback send, got ${elapsedMs}ms`); + }); }); // ========================================================================= @@ -420,6 +472,59 @@ describe("E2E pipeline", () => { const issue = await h.provider.getIssue(50); assert.ok(issue.labels.includes("Refining")); }); + + + it("should use CLI fallback for Refining notifications and preserve Telegram topic routing", async () => { + h.project.channels[0]!.messageThreadId = 176; + + await executeCompletion({ + workspaceDir: h.workspaceDir, + projectSlug: h.project.slug, + channels: h.project.channels, + role: "developer", + result: "blocked", + issueId: 50, + summary: "Need design decision", + provider: h.provider, + repoPath: "/tmp/test-repo", + projectName: "test-project", + runCommand: h.runCommand, + }); + + const notifyCalls = h.commands.commands.filter( + (c) => c.argv[0] === "openclaw" && c.argv[1] === "message" && c.argv[2] === "send", + ); + assert.strictEqual(notifyCalls.length, 1, "Expected workerComplete CLI fallback notification"); + assert.ok(notifyCalls[0]!.argv.includes("--thread-id") && notifyCalls[0]!.argv.includes("176")); + }); + + it("should await Refining notification fallback delivery before returning", async () => { + h.project.channels[0]!.messageThreadId = 176; + const delayedRunCommand = async (argv: string[], opts: any) => { + if (argv[0] === "openclaw" && argv[1] === "message" && argv[2] === "send") { + await new Promise((resolve) => setTimeout(resolve, 25)); + } + return h.runCommand(argv, opts); + }; + + const startedAt = Date.now(); + await executeCompletion({ + workspaceDir: h.workspaceDir, + projectSlug: h.project.slug, + channels: h.project.channels, + role: "developer", + result: "blocked", + issueId: 50, + summary: "Need design decision", + provider: h.provider, + repoPath: "/tmp/test-repo", + projectName: "test-project", + runCommand: delayedRunCommand, + }); + const elapsedMs = Date.now() - startedAt; + + assert.ok(elapsedMs >= 25, `Expected executeCompletion to await Refining fallback send, got ${elapsedMs}ms`); + }); }); // ========================================================================= diff --git a/lib/services/pipeline.ts b/lib/services/pipeline.ts index 681ebb9d..ae0d0c4f 100644 --- a/lib/services/pipeline.ts +++ b/lib/services/pipeline.ts @@ -154,51 +154,66 @@ export async function executeCompletion(opts: { // Send notification early (before deactivation and label transition which can fail) const notifyConfig = getNotificationConfig(pluginConfig); - notify( - { - type: "workerComplete", - project: projectName, - issueId, - issueUrl: issue.web_url, - role, - level: opts.level, - name: workerName, - result: result as "done" | "pass" | "fail" | "refine" | "blocked", - summary, - nextState, - prUrl, - createdTasks, - }, - { - workspaceDir, - config: notifyConfig, - channelId: notifyTarget?.channelId, - channel: notifyTarget?.channel ?? "telegram", - runtime, - accountId: notifyTarget?.accountId, - }, - ).catch((err) => { - auditLog(workspaceDir, "pipeline_warning", { step: "notify", issue: issueId, role, error: (err as Error).message ?? String(err) }).catch(() => {}); - }); - - // Send merge notification when PR was merged during this completion - if (mergedPr) { - notify( + try { + await notify( { - type: "prMerged", + type: "workerComplete", project: projectName, issueId, issueUrl: issue.web_url, - issueTitle: issue.title, + role, + level: opts.level, + name: workerName, + result: result as "done" | "pass" | "fail" | "refine" | "blocked", + summary, + nextState, prUrl, - prTitle, - sourceBranch, - mergedBy: "pipeline", + createdTasks, }, - { workspaceDir, config: notifyConfig, channelId: notifyTarget?.channelId, channel: notifyTarget?.channel ?? "telegram", runtime, accountId: notifyTarget?.accountId }, - ).catch((err) => { + { + workspaceDir, + config: notifyConfig, + channelId: notifyTarget?.channelId, + channel: notifyTarget?.channel ?? "telegram", + runtime, + accountId: notifyTarget?.accountId, + runCommand: rc, + messageThreadId: notifyTarget?.messageThreadId, + }, + ); + } catch (err) { + auditLog(workspaceDir, "pipeline_warning", { step: "notify", issue: issueId, role, error: (err as Error).message ?? String(err) }).catch(() => {}); + } + + // Send merge notification when PR was merged during this completion + if (mergedPr) { + try { + await notify( + { + type: "prMerged", + project: projectName, + issueId, + issueUrl: issue.web_url, + issueTitle: issue.title, + prUrl, + prTitle, + sourceBranch, + mergedBy: "pipeline", + }, + { + workspaceDir, + config: notifyConfig, + channelId: notifyTarget?.channelId, + channel: notifyTarget?.channel ?? "telegram", + runtime, + accountId: notifyTarget?.accountId, + runCommand: rc, + messageThreadId: notifyTarget?.messageThreadId, + }, + ); + } catch (err) { auditLog(workspaceDir, "pipeline_warning", { step: "mergeNotify", issue: issueId, role, error: (err as Error).message ?? String(err) }).catch(() => {}); - }); + } } // Transition label first (critical — if this fails, issue still has correct state) @@ -228,27 +243,31 @@ export async function executeCompletion(opts: { const updated = await provider.getIssue(issueId); const routing = detectStepRouting(updated.labels, "review") as "human" | "agent" | null; if (routing === "human" || routing === "agent") { - notify( - { - type: "reviewNeeded", - project: projectName, - issueId, - issueUrl: updated.web_url, - issueTitle: updated.title, - routing, - prUrl, - }, - { - workspaceDir, - config: notifyConfig, - channelId: notifyTarget?.channelId, - channel: notifyTarget?.channel ?? "telegram", - runtime, - accountId: notifyTarget?.accountId, - }, - ).catch((err) => { + try { + await notify( + { + type: "reviewNeeded", + project: projectName, + issueId, + issueUrl: updated.web_url, + issueTitle: updated.title, + routing, + prUrl, + }, + { + workspaceDir, + config: notifyConfig, + channelId: notifyTarget?.channelId, + channel: notifyTarget?.channel ?? "telegram", + runtime, + accountId: notifyTarget?.accountId, + runCommand: rc, + messageThreadId: notifyTarget?.messageThreadId, + }, + ); + } catch (err) { auditLog(workspaceDir, "pipeline_warning", { step: "reviewNotify", issue: issueId, role, error: (err as Error).message ?? String(err) }).catch(() => {}); - }); + } } } diff --git a/lib/workflow/labels.ts b/lib/workflow/labels.ts index cb712693..773ed0b1 100644 --- a/lib/workflow/labels.ts +++ b/lib/workflow/labels.ts @@ -44,8 +44,8 @@ export function getNotifyLabel(channel: string, nameOrIndex: string): string { */ export function resolveNotifyChannel( issueLabels: string[], - channels: Array<{ channelId: string; channel: string; name?: string; accountId?: string }>, -): { channelId: string; channel: string; accountId?: string } | undefined { + channels: Array<{ channelId: string; channel: string; name?: string; accountId?: string; messageThreadId?: number }>, +): { channelId: string; channel: string; accountId?: string; messageThreadId?: number } | undefined { const notifyLabel = issueLabels.find((l) => l.startsWith(NOTIFY_LABEL_PREFIX)); if (notifyLabel) { const value = notifyLabel.slice(NOTIFY_LABEL_PREFIX.length); From 9a17b5a8c443a856415cd3cc9fd8bd3e03ce7423 Mon Sep 17 00:00:00 2001 From: fujiwaranosai850 Date: Sun, 3 May 2026 11:00:25 +0000 Subject: [PATCH 2/2] fix: align notification export with future upstream routing --- lib/dispatch/index.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/dispatch/index.ts b/lib/dispatch/index.ts index 9e418017..84db7193 100644 --- a/lib/dispatch/index.ts +++ b/lib/dispatch/index.ts @@ -132,7 +132,11 @@ export async function dispatchTask( // Compute session key deterministically (avoids waiting for gateway) // Slot name provides both collision prevention and human-readable identity const botName = slotName(project.name, role, level, slotIndex); - const sessionKey = `agent:${agentId ?? "unknown"}:subagent:${project.name}-${role}-${level}-${botName.toLowerCase()}`; + // Use project.slug (always lowercase) to build session key. + // project.name may have mixed case (e.g. "UpMoltWork"), which caused heartbeat + // mismatches when the gateway stores session keys in lowercase format. + const projectKey = (project.slug ?? project.name).toLowerCase(); + const sessionKey = `agent:${agentId ?? "unknown"}:subagent:${projectKey}-${role}-${level}-${botName.toLowerCase()}`; // Clear stale session key if it doesn't match the current deterministic key // (handles migration from old numeric format like ...-0 to name-based ...-Cordelia) @@ -295,6 +299,7 @@ export async function dispatchTask( dispatchTimeoutMs: timeouts.dispatchMs, extraSystemPrompt: roleInstructions.trim() || undefined, runCommand: rc, + notifyTarget, }); // Step 5: Update worker state