From 5f94e0e6d30bd7201b6fa9cba10423186ef4991c Mon Sep 17 00:00:00 2001 From: Kai Liu Date: Sat, 18 Apr 2026 14:29:58 +0800 Subject: [PATCH 1/2] Let the first human replier claim task/cron-started threads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When Ode posts a top-level channel message from a one-time Task or Cron run, the resulting Slack/Discord/Lark thread used to belong to a "synthetic" owner (e.g. `task:{id}`) that never matches any real user. Any human who replied without an explicit @-mention was dropped as `not_mentioned_and_inactive`, and even with a mention the first human would fail the `isThreadOwner` gate in pending-question flows. Changes: - New shared helper `packages/ims/shared/synthetic-owner.ts` recognising the `task:`, `cron-job:`, and legacy `cron:` owner prefixes. - Slack/Discord/Lark routing now treat a synthetic owner as "claimable" — any real user replying is reported as `threadOwnerMessage: true`. - `prepareRuntimeSession` and `handlePendingQuestionReply` overwrite a synthetic `threadOwnerUserId` with the first real human user, so ownership is persisted against them from that reply onwards. - Task and Cron schedulers capture the real platform thread id returned by `sendChannelMessage` and seed a mirrored session under `(channelId, realThreadId)` pointing at the same agent sessionId. This makes the thread "active" for inbound routing so subsequent replies are processed without forcing the user to @-mention the bot. Tests: new unit tests for `isSyntheticOwner` and a pending-question case where a synthetic owner is claimed by a real user. All 267 tests pass (was 260). --- packages/core/cron/scheduler.ts | 66 +++++++++++++++-- packages/core/kernel/pending-question.ts | 9 ++- packages/core/kernel/session-bootstrap.ts | 10 ++- packages/core/tasks/scheduler.ts | 80 +++++++++++++++++++-- packages/core/test/pending-question.test.ts | 54 ++++++++++++++ packages/ims/discord/client.ts | 4 +- packages/ims/lark/client.ts | 4 +- packages/ims/shared/synthetic-owner.test.ts | 32 +++++++++ packages/ims/shared/synthetic-owner.ts | 19 +++++ packages/ims/slack/client.ts | 9 ++- 10 files changed, 268 insertions(+), 19 deletions(-) create mode 100644 packages/ims/shared/synthetic-owner.test.ts create mode 100644 packages/ims/shared/synthetic-owner.ts diff --git a/packages/core/cron/scheduler.ts b/packages/core/cron/scheduler.ts index 854bd3b..0f187a7 100644 --- a/packages/core/cron/scheduler.ts +++ b/packages/core/cron/scheduler.ts @@ -71,16 +71,58 @@ function resolveInboxModelForCron(job: CronJobRecord, options: ReturnType 0 ? fallbackModel : null; } -async function sendResultToChannel(job: CronJobRecord, text: string): Promise { +async function sendResultToChannel( + job: CronJobRecord, + text: string, +): Promise { if (job.platform === "slack") { - await sendSlackChannelMessage(job.channelId, text); - return; + return sendSlackChannelMessage(job.channelId, text); } if (job.platform === "discord") { - await sendDiscordChannelMessage(job.channelId, text); + return sendDiscordChannelMessage(job.channelId, text); + } + return sendLarkChannelMessage(job.channelId, text); +} + +/** + * After the cron run posts its result as a top-level channel message, + * mirror the synthetic thread's session onto the real platform-assigned + * thread id so humans replying in that thread are routed to this run's + * agent session and can claim ownership (see + * `packages/ims/shared/synthetic-owner.ts`). + */ +function seedCronChannelThreadSession(params: { + platform: "slack" | "discord" | "lark"; + channelId: string; + realThreadId: string; + sessionId: string; + providerId: PersistedSession["providerId"]; + workingDirectory: string; + syntheticOwnerId: string; + branchName?: string; +}): void { + const existing = loadSession(params.channelId, params.realThreadId); + if (existing) { + existing.lastActivityBotId = "cron-job"; + saveSession(existing); return; } - await sendLarkChannelMessage(job.channelId, text); + const now = Date.now(); + const session: PersistedSession = { + sessionId: params.sessionId, + providerId: params.providerId, + platform: params.platform, + channelId: params.channelId, + threadId: params.realThreadId, + workingDirectory: params.workingDirectory, + threadOwnerUserId: params.syntheticOwnerId, + participantBotIds: ["cron-job"], + createdAt: now, + lastActivityAt: now, + lastActivityBotId: "cron-job", + branchName: params.branchName, + }; + saveSession(session); } function buildCronAgentContext(job: CronJobRecord, runId: string): OpenCodeMessageContext { @@ -243,7 +285,19 @@ async function runCronJob(job: CronJobRecord, minuteStartMs: number): Promise 0 ? fallbackModel : null; } -async function sendResultToChannel(task: TaskRecord, text: string): Promise { +async function sendResultToChannel( + task: TaskRecord, + text: string, +): Promise<{ threadedReply: boolean; newThreadId: string | undefined }> { if (task.platform === "slack") { // Slack is the only platform with a stable "reply in thread" helper; use // it whenever the caller anchored the task to a real thread so the reply @@ -138,16 +141,62 @@ async function sendResultToChannel(task: TaskRecord, text: string): Promise 0) { await sendSlackThreadMessage(task.channelId, task.threadId, text); - return; + return { threadedReply: true, newThreadId: undefined }; } - await sendSlackChannelMessage(task.channelId, text); - return; + const newThreadId = await sendSlackChannelMessage(task.channelId, text); + return { threadedReply: false, newThreadId }; } if (task.platform === "discord") { - await sendDiscordChannelMessage(task.channelId, text); + const newThreadId = await sendDiscordChannelMessage(task.channelId, text); + return { threadedReply: false, newThreadId }; + } + const newThreadId = await sendLarkChannelMessage(task.channelId, text); + return { threadedReply: false, newThreadId }; +} + +/** + * After a Task (or similar bot-initiated flow) posts a top-level channel + * message that creates a fresh thread, mirror the synthetic thread's + * session onto the real platform-assigned thread id. This makes the thread + * "active" for inbound routing and marks the owner as synthetic so the + * first human replier can claim the thread via session-bootstrap. + */ +function seedChannelThreadSession(params: { + platform: "slack" | "discord" | "lark"; + channelId: string; + realThreadId: string; + sessionId: string; + providerId: PersistedSession["providerId"]; + workingDirectory: string; + syntheticOwnerId: string; + botParticipantId: string; + branchName?: string; +}): void { + const existing = loadSession(params.channelId, params.realThreadId); + if (existing) { + // Respect any pre-existing session (should be rare — the thread was + // just created), but keep `lastActivityBotId` fresh so isThreadActive + // returns true for subsequent replies. + existing.lastActivityBotId = params.botParticipantId; + saveSession(existing); return; } - await sendLarkChannelMessage(task.channelId, text); + const now = Date.now(); + const session: PersistedSession = { + sessionId: params.sessionId, + providerId: params.providerId, + platform: params.platform, + channelId: params.channelId, + threadId: params.realThreadId, + workingDirectory: params.workingDirectory, + threadOwnerUserId: params.syntheticOwnerId, + participantBotIds: [params.botParticipantId], + createdAt: now, + lastActivityAt: now, + lastActivityBotId: params.botParticipantId, + branchName: params.branchName, + }; + saveSession(session); } function buildTaskAgentContext(task: TaskRecord): OpenCodeMessageContext { @@ -311,7 +360,24 @@ async function runTask(task: TaskRecord): Promise { ); const finalText = buildFinalResponseText(responses) ?? "_Done_"; - await sendResultToChannel(task, finalText); + await sendResultToChannel(task, finalText).then((outcome) => { + if (!outcome.threadedReply && outcome.newThreadId) { + // The task opened a brand-new channel thread. Mirror the synthetic + // session to the real thread id so humans replying there are + // routed to the same agent session and can claim ownership. + seedChannelThreadSession({ + platform: task.platform, + channelId: task.channelId, + realThreadId: outcome.newThreadId, + sessionId, + providerId, + workingDirectory: cwd, + syntheticOwnerId: getTaskUserId(task.id), + botParticipantId: "task", + branchName: session.branchName, + }); + } + }); if (agentResultDetailId) { try { completeAgentResult({ diff --git a/packages/core/test/pending-question.test.ts b/packages/core/test/pending-question.test.ts index 05689f5..0d97b68 100644 --- a/packages/core/test/pending-question.test.ts +++ b/packages/core/test/pending-question.test.ts @@ -193,4 +193,58 @@ describe("handlePendingQuestionReply", () => { deleteSession(channelId, threadId); }); + + it("lets any real human claim a pending question when owner is synthetic (task:)", async () => { + const channelId = "CQ-PENDING-3"; + const threadId = "TQ-PENDING-3"; + const replies: Array>> = []; + const pending: PendingQuestion = { + requestId: "req-3", + sessionId: "ses-3", + askedAt: Date.now(), + questions: [{ question: "Q1" }], + collectedAnswers: [], + }; + + saveSession({ + sessionId: "ses-3", + channelId, + threadId, + workingDirectory: "/tmp", + // Synthetic owner — thread was started by a one-time Task before any + // human joined. The first real human replier should be allowed in. + threadOwnerUserId: "task:abc123", + createdAt: Date.now(), + lastActivityAt: Date.now(), + pendingQuestion: pending, + }); + setPendingQuestion(channelId, threadId, pending); + + const handled = await handlePendingQuestionReply({ + deps: { + agent: { + replyToQuestion: async ({ answers }: { answers: Array> }) => { + replies.push(answers); + }, + } as any, + im: { + sendMessage: async () => undefined, + } as any, + }, + pendingQuestion: pending, + context: { + channelId, + replyThreadId: threadId, + threadId, + userId: "U-FIRST-HUMAN", + messageId: `m-${Date.now()}-3`, + }, + text: "real answer", + }); + + expect(handled).toBe(true); + expect(replies).toEqual([[["real answer"]]]); + + deleteSession(channelId, threadId); + }); }); diff --git a/packages/ims/discord/client.ts b/packages/ims/discord/client.ts index 59f1d07..2d021ff 100644 --- a/packages/ims/discord/client.ts +++ b/packages/ims/discord/client.ts @@ -32,6 +32,7 @@ import { sendLauncherReplyForMessage, } from "@/ims/discord/settings"; import { createProcessorManager } from "@/ims/shared/processor-manager"; +import { isSyntheticOwner } from "@/ims/shared/synthetic-owner"; import { buildMeaningfulThreadName, cleanBotMention, @@ -512,7 +513,8 @@ async function startDiscordRuntimeInternal(reason: string): Promise { messageId: message.id, userId: message.author.id, selfMessage: false, - threadOwnerMessage: threadSession?.threadOwnerUserId === message.author.id, + threadOwnerMessage: isSyntheticOwner(threadSession?.threadOwnerUserId) + || threadSession?.threadOwnerUserId === message.author.id, isTopLevel: false, hasAnyMention, mentionedBot: mentioned, diff --git a/packages/ims/lark/client.ts b/packages/ims/lark/client.ts index c1d0c0a..6f0ce83 100644 --- a/packages/ims/lark/client.ts +++ b/packages/ims/lark/client.ts @@ -42,6 +42,7 @@ import { } from "./settings"; import { refreshSettingsProviderData } from "@/ims/shared/settings-provider-data"; import { createProcessorManager } from "@/ims/shared/processor-manager"; +import { isSyntheticOwner } from "@/ims/shared/synthetic-owner"; import { extractFormValues, firstNonEmptyString, @@ -1123,7 +1124,8 @@ async function processLarkIncomingEvent(event: LarkIncomingEvent, processorAppId messageId, userId: senderOpenId, selfMessage: isSelfMessage, - threadOwnerMessage: threadSession?.threadOwnerUserId === senderOpenId, + threadOwnerMessage: isSyntheticOwner(threadSession?.threadOwnerUserId) + || threadSession?.threadOwnerUserId === senderOpenId, isTopLevel: topLevelMessage, hasAnyMention, mentionedBot: isMentioned, diff --git a/packages/ims/shared/synthetic-owner.test.ts b/packages/ims/shared/synthetic-owner.test.ts new file mode 100644 index 0000000..0f8732d --- /dev/null +++ b/packages/ims/shared/synthetic-owner.test.ts @@ -0,0 +1,32 @@ +import { describe, expect, it } from "bun:test"; +import { isSyntheticOwner } from "./synthetic-owner"; + +describe("isSyntheticOwner", () => { + it("returns true for task: prefix", () => { + expect(isSyntheticOwner("task:abc123")).toBe(true); + }); + + it("returns true for cron-job: prefix (current cron id scheme)", () => { + expect(isSyntheticOwner("cron-job:daily-report")).toBe(true); + }); + + it("returns true for legacy cron: prefix", () => { + expect(isSyntheticOwner("cron:daily")).toBe(true); + }); + + it("returns false for real user ids", () => { + expect(isSyntheticOwner("U0AUCN52VJ4")).toBe(false); + expect(isSyntheticOwner("123456789")).toBe(false); + }); + + it("returns false for null/undefined/empty", () => { + expect(isSyntheticOwner(null)).toBe(false); + expect(isSyntheticOwner(undefined)).toBe(false); + expect(isSyntheticOwner("")).toBe(false); + }); + + it("only matches as prefix, not substring", () => { + expect(isSyntheticOwner("prefix-task:abc")).toBe(false); + expect(isSyntheticOwner("user-cron-job:x")).toBe(false); + }); +}); diff --git a/packages/ims/shared/synthetic-owner.ts b/packages/ims/shared/synthetic-owner.ts new file mode 100644 index 0000000..616b414 --- /dev/null +++ b/packages/ims/shared/synthetic-owner.ts @@ -0,0 +1,19 @@ +/** + * A "synthetic" thread owner is an internal marker for threads that Ode + * started on its own (one-time Tasks, scheduled Cron jobs, etc.), before + * any human joined the conversation. These user IDs use the form + * `task:{taskId}` or `cron-job:{jobId}` and never correspond to a real + * Slack / Discord / Lark user. + * + * Synthetic owners are _claimable_: the first real human to reply in such + * a thread becomes the persistent `threadOwnerUserId`. Downstream policy + * and session-bootstrap code treats a synthetic owner as "unset" so the + * owner gate does not reject real humans. + */ + +const SYNTHETIC_OWNER_PREFIXES = ["task:", "cron-job:", "cron:"] as const; + +export function isSyntheticOwner(userId: string | null | undefined): boolean { + if (!userId) return false; + return SYNTHETIC_OWNER_PREFIXES.some((prefix) => userId.startsWith(prefix)); +} diff --git a/packages/ims/slack/client.ts b/packages/ims/slack/client.ts index d2c87dd..292c758 100644 --- a/packages/ims/slack/client.ts +++ b/packages/ims/slack/client.ts @@ -30,6 +30,7 @@ import { createProcessorManager } from "@/ims/shared/processor-manager"; import { SlackAuthRegistry, type WorkspaceAuth } from "@/ims/slack/state/auth-registry"; import { SlackMessageUpdateManager } from "@/ims/slack/message-update-manager"; import { deliveryStats, isRateLimitError } from "@/ims/shared/delivery-stats"; +import { isSyntheticOwner } from "@/ims/shared/synthetic-owner"; export interface MessageContext { channelId: string; @@ -679,7 +680,13 @@ export function setupMessageHandlers(): void { }, isThreadOwner: (channelId, threadId, userId) => { const session = loadSession(channelId, threadId); - return session?.threadOwnerUserId === userId; + const owner = session?.threadOwnerUserId; + if (!owner) return false; + // Synthetic owners (task:/cron:) are placeholders for bot-started + // threads; treat any real user as the claimable owner so the first + // human replier can adopt the thread. + if (isSyntheticOwner(owner)) return true; + return owner === userId; }, isThreadActive, postGeneralSettingsLauncher: postSlackGeneralSettingsLauncher, From d909d3428c88e9e9ab5d39bcb052fe6e3649794d Mon Sep 17 00:00:00 2001 From: Kai Liu Date: Sat, 18 Apr 2026 14:30:57 +0800 Subject: [PATCH 2/2] Bump version to 0.1.28 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 61b4379..3ec5840 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ode", - "version": "0.1.27", + "version": "0.1.28", "description": "Coding anywhere with your coding agents connected", "module": "packages/core/index.ts", "type": "module",