Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ode",
"version": "0.1.27",
"version": "0.1.28",
"description": "Coding anywhere with your coding agents connected",
"module": "packages/core/index.ts",
"type": "module",
Expand Down
66 changes: 60 additions & 6 deletions packages/core/cron/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,58 @@ function resolveInboxModelForCron(job: CronJobRecord, options: ReturnType<typeof
return fallbackModel && fallbackModel.length > 0 ? fallbackModel : null;
}

async function sendResultToChannel(job: CronJobRecord, text: string): Promise<void> {
async function sendResultToChannel(
job: CronJobRecord,
text: string,
): Promise<string | undefined> {
if (job.platform === "slack") {
await sendSlackChannelMessage(job.channelId, text);
return;
return sendSlackChannelMessage(job.channelId, text);
}
if (job.platform === "discord") {
await sendDiscordChannelMessage(job.channelId, text);
return sendDiscordChannelMessage(job.channelId, text);
}
return sendLarkChannelMessage(job.channelId, text);
}

/**
* After the cron run posts its result as a top-level channel message,
* mirror the synthetic thread's session onto the real platform-assigned
* thread id so humans replying in that thread are routed to this run's
* agent session and can claim ownership (see
* `packages/ims/shared/synthetic-owner.ts`).
*/
function seedCronChannelThreadSession(params: {
platform: "slack" | "discord" | "lark";
channelId: string;
realThreadId: string;
sessionId: string;
providerId: PersistedSession["providerId"];
workingDirectory: string;
syntheticOwnerId: string;
branchName?: string;
}): void {
const existing = loadSession(params.channelId, params.realThreadId);
if (existing) {
existing.lastActivityBotId = "cron-job";
saveSession(existing);
return;
}
await sendLarkChannelMessage(job.channelId, text);
const now = Date.now();
const session: PersistedSession = {
sessionId: params.sessionId,
providerId: params.providerId,
platform: params.platform,
channelId: params.channelId,
threadId: params.realThreadId,
workingDirectory: params.workingDirectory,
threadOwnerUserId: params.syntheticOwnerId,
participantBotIds: ["cron-job"],
createdAt: now,
lastActivityAt: now,
lastActivityBotId: "cron-job",
branchName: params.branchName,
};
saveSession(session);
}

function buildCronAgentContext(job: CronJobRecord, runId: string): OpenCodeMessageContext {
Expand Down Expand Up @@ -243,7 +285,19 @@ async function runCronJob(job: CronJobRecord, minuteStartMs: number): Promise<vo
);
const finalText = buildFinalResponseText(responses) ?? "_Done_";

await sendResultToChannel(job, finalText);
const realThreadId = await sendResultToChannel(job, finalText);
if (realThreadId) {
seedCronChannelThreadSession({
platform: job.platform,
channelId: job.channelId,
realThreadId,
sessionId,
providerId,
workingDirectory: cwd,
syntheticOwnerId: getCronUserId(job.id),
branchName: session.branchName,
});
}
if (agentResultDetailId) {
try {
completeAgentResult({
Expand Down
9 changes: 8 additions & 1 deletion packages/core/kernel/pending-question.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import { buildQuestionAnswers, formatSingleQuestionPrompt } from "@/core/runtime/helpers";
import type { AgentAdapter, IMAdapter } from "@/core/types";
import type { RuntimeRequestContext } from "@/core/kernel/request-context";
import { isSyntheticOwner } from "@/ims/shared/synthetic-owner";
import { log } from "@/utils";

export async function handlePendingQuestionReply(params: {
Expand All @@ -33,7 +34,13 @@ export async function handlePendingQuestionReply(params: {

const session = loadSession(context.channelId, context.threadId);
const threadOwnerUserId = session?.threadOwnerUserId;
if (threadOwnerUserId && threadOwnerUserId !== context.userId) {
// Synthetic owners (task:/cron:) are claimable by any real human, so
// don't gate pending-question replies on them.
if (
threadOwnerUserId
&& !isSyntheticOwner(threadOwnerUserId)
&& threadOwnerUserId !== context.userId
) {
return false;
}

Expand Down
10 changes: 8 additions & 2 deletions packages/core/kernel/session-bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { buildSessionEnvironment, prepareSessionWorkspace } from "@/core/session
import { categorizeRuntimeError } from "@/core/runtime/helpers";
import type { AgentAdapter, IMAdapter } from "@/core/types";
import type { RuntimeRequestContext } from "@/core/kernel/request-context";
import { isSyntheticOwner } from "@/ims/shared/synthetic-owner";
import { log } from "@/utils";
import { createHash } from "crypto";

Expand Down Expand Up @@ -58,7 +59,12 @@ export async function prepareRuntimeSession(params: {
if (session?.workingDirectory) {
cwd = session.workingDirectory;
}
const threadOwnerUserId = session?.threadOwnerUserId ?? context.userId;
// A synthetic owner (task:/cron:) is a placeholder stored for a thread
// Ode started on its own; the first real human to reply claims the
// thread. Treat synthetic owners as "unset" so the current user wins.
const existingOwner = session?.threadOwnerUserId;
const hasRealOwner = !!existingOwner && !isSyntheticOwner(existingOwner);
const threadOwnerUserId = hasRealOwner ? existingOwner : context.userId;
const { env: sessionEnv, gitIdentity } = buildSessionEnvironment({
threadOwnerUserId,
});
Expand Down Expand Up @@ -141,7 +147,7 @@ export async function prepareRuntimeSession(params: {
session.workingDirectory = cwd;
}

if (!session.threadOwnerUserId) {
if (!session.threadOwnerUserId || isSyntheticOwner(session.threadOwnerUserId)) {
session.threadOwnerUserId = threadOwnerUserId;
}
const participantBotIds = session.participantBotIds ?? [];
Expand Down
80 changes: 73 additions & 7 deletions packages/core/tasks/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,24 +130,73 @@ function resolveInboxModelForTask(
return fallbackModel && fallbackModel.length > 0 ? fallbackModel : null;
}

async function sendResultToChannel(task: TaskRecord, text: string): Promise<void> {
async function sendResultToChannel(
task: TaskRecord,
text: string,
): Promise<{ threadedReply: boolean; newThreadId: string | undefined }> {
if (task.platform === "slack") {
// Slack is the only platform with a stable "reply in thread" helper; use
// it whenever the caller anchored the task to a real thread so the reply
// lands back in the conversation. Without a thread the task posts at
// the top of the channel.
if (task.threadId && task.threadId.trim().length > 0) {
await sendSlackThreadMessage(task.channelId, task.threadId, text);
return;
return { threadedReply: true, newThreadId: undefined };
}
await sendSlackChannelMessage(task.channelId, text);
return;
const newThreadId = await sendSlackChannelMessage(task.channelId, text);
return { threadedReply: false, newThreadId };
}
if (task.platform === "discord") {
await sendDiscordChannelMessage(task.channelId, text);
const newThreadId = await sendDiscordChannelMessage(task.channelId, text);
return { threadedReply: false, newThreadId };
}
const newThreadId = await sendLarkChannelMessage(task.channelId, text);
return { threadedReply: false, newThreadId };
}

/**
* After a Task (or similar bot-initiated flow) posts a top-level channel
* message that creates a fresh thread, mirror the synthetic thread's
* session onto the real platform-assigned thread id. This makes the thread
* "active" for inbound routing and marks the owner as synthetic so the
* first human replier can claim the thread via session-bootstrap.
*/
function seedChannelThreadSession(params: {
platform: "slack" | "discord" | "lark";
channelId: string;
realThreadId: string;
sessionId: string;
providerId: PersistedSession["providerId"];
workingDirectory: string;
syntheticOwnerId: string;
botParticipantId: string;
branchName?: string;
}): void {
const existing = loadSession(params.channelId, params.realThreadId);
if (existing) {
// Respect any pre-existing session (should be rare — the thread was
// just created), but keep `lastActivityBotId` fresh so isThreadActive
// returns true for subsequent replies.
existing.lastActivityBotId = params.botParticipantId;
saveSession(existing);
return;
}
await sendLarkChannelMessage(task.channelId, text);
const now = Date.now();
const session: PersistedSession = {
sessionId: params.sessionId,
providerId: params.providerId,
platform: params.platform,
channelId: params.channelId,
threadId: params.realThreadId,
workingDirectory: params.workingDirectory,
threadOwnerUserId: params.syntheticOwnerId,
participantBotIds: [params.botParticipantId],
createdAt: now,
lastActivityAt: now,
lastActivityBotId: params.botParticipantId,
branchName: params.branchName,
};
saveSession(session);
}

function buildTaskAgentContext(task: TaskRecord): OpenCodeMessageContext {
Expand Down Expand Up @@ -311,7 +360,24 @@ async function runTask(task: TaskRecord): Promise<void> {
);
const finalText = buildFinalResponseText(responses) ?? "_Done_";

await sendResultToChannel(task, finalText);
await sendResultToChannel(task, finalText).then((outcome) => {
if (!outcome.threadedReply && outcome.newThreadId) {
// The task opened a brand-new channel thread. Mirror the synthetic
// session to the real thread id so humans replying there are
// routed to the same agent session and can claim ownership.
seedChannelThreadSession({
platform: task.platform,
channelId: task.channelId,
realThreadId: outcome.newThreadId,
sessionId,
providerId,
workingDirectory: cwd,
syntheticOwnerId: getTaskUserId(task.id),
botParticipantId: "task",
branchName: session.branchName,
});
}
});
if (agentResultDetailId) {
try {
completeAgentResult({
Expand Down
54 changes: 54 additions & 0 deletions packages/core/test/pending-question.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,58 @@ describe("handlePendingQuestionReply", () => {

deleteSession(channelId, threadId);
});

it("lets any real human claim a pending question when owner is synthetic (task:)", async () => {
const channelId = "CQ-PENDING-3";
const threadId = "TQ-PENDING-3";
const replies: Array<Array<Array<string>>> = [];
const pending: PendingQuestion = {
requestId: "req-3",
sessionId: "ses-3",
askedAt: Date.now(),
questions: [{ question: "Q1" }],
collectedAnswers: [],
};

saveSession({
sessionId: "ses-3",
channelId,
threadId,
workingDirectory: "/tmp",
// Synthetic owner — thread was started by a one-time Task before any
// human joined. The first real human replier should be allowed in.
threadOwnerUserId: "task:abc123",
createdAt: Date.now(),
lastActivityAt: Date.now(),
pendingQuestion: pending,
});
setPendingQuestion(channelId, threadId, pending);

const handled = await handlePendingQuestionReply({
deps: {
agent: {
replyToQuestion: async ({ answers }: { answers: Array<Array<string>> }) => {
replies.push(answers);
},
} as any,
im: {
sendMessage: async () => undefined,
} as any,
},
pendingQuestion: pending,
context: {
channelId,
replyThreadId: threadId,
threadId,
userId: "U-FIRST-HUMAN",
messageId: `m-${Date.now()}-3`,
},
text: "real answer",
});

expect(handled).toBe(true);
expect(replies).toEqual([[["real answer"]]]);

deleteSession(channelId, threadId);
});
});
4 changes: 3 additions & 1 deletion packages/ims/discord/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
sendLauncherReplyForMessage,
} from "@/ims/discord/settings";
import { createProcessorManager } from "@/ims/shared/processor-manager";
import { isSyntheticOwner } from "@/ims/shared/synthetic-owner";
import {
buildMeaningfulThreadName,
cleanBotMention,
Expand Down Expand Up @@ -512,7 +513,8 @@ async function startDiscordRuntimeInternal(reason: string): Promise<boolean> {
messageId: message.id,
userId: message.author.id,
selfMessage: false,
threadOwnerMessage: threadSession?.threadOwnerUserId === message.author.id,
threadOwnerMessage: isSyntheticOwner(threadSession?.threadOwnerUserId)
|| threadSession?.threadOwnerUserId === message.author.id,
isTopLevel: false,
hasAnyMention,
mentionedBot: mentioned,
Expand Down
4 changes: 3 additions & 1 deletion packages/ims/lark/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
} from "./settings";
import { refreshSettingsProviderData } from "@/ims/shared/settings-provider-data";
import { createProcessorManager } from "@/ims/shared/processor-manager";
import { isSyntheticOwner } from "@/ims/shared/synthetic-owner";
import {
extractFormValues,
firstNonEmptyString,
Expand Down Expand Up @@ -1123,7 +1124,8 @@ async function processLarkIncomingEvent(event: LarkIncomingEvent, processorAppId
messageId,
userId: senderOpenId,
selfMessage: isSelfMessage,
threadOwnerMessage: threadSession?.threadOwnerUserId === senderOpenId,
threadOwnerMessage: isSyntheticOwner(threadSession?.threadOwnerUserId)
|| threadSession?.threadOwnerUserId === senderOpenId,
isTopLevel: topLevelMessage,
hasAnyMention,
mentionedBot: isMentioned,
Expand Down
32 changes: 32 additions & 0 deletions packages/ims/shared/synthetic-owner.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { describe, expect, it } from "bun:test";
import { isSyntheticOwner } from "./synthetic-owner";

describe("isSyntheticOwner", () => {
it("returns true for task: prefix", () => {
expect(isSyntheticOwner("task:abc123")).toBe(true);
});

it("returns true for cron-job: prefix (current cron id scheme)", () => {
expect(isSyntheticOwner("cron-job:daily-report")).toBe(true);
});

it("returns true for legacy cron: prefix", () => {
expect(isSyntheticOwner("cron:daily")).toBe(true);
});

it("returns false for real user ids", () => {
expect(isSyntheticOwner("U0AUCN52VJ4")).toBe(false);
expect(isSyntheticOwner("123456789")).toBe(false);
});

it("returns false for null/undefined/empty", () => {
expect(isSyntheticOwner(null)).toBe(false);
expect(isSyntheticOwner(undefined)).toBe(false);
expect(isSyntheticOwner("")).toBe(false);
});

it("only matches as prefix, not substring", () => {
expect(isSyntheticOwner("prefix-task:abc")).toBe(false);
expect(isSyntheticOwner("user-cron-job:x")).toBe(false);
});
});
Loading
Loading