From 32bb408a258cbb9808edadc7040d84af2497da34 Mon Sep 17 00:00:00 2001 From: Kai Liu Date: Sat, 18 Apr 2026 13:31:20 +0800 Subject: [PATCH 1/4] Add one-time Task scheduler for agent-driven follow-ups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tasks are one-shot scheduled prompts: the scheduler fires each one exactly once at an absolute time, then posts the agent result back to the channel (or thread, if anchored). This lets agents schedule follow-ups and return instead of blocking on long waits (deploys, nightly builds, approvals). - Storage: new 'tasks' table in shared SQLite (~/.config/ode/inbox.db) with pending → running → success|failed|cancelled state machine. Cross-process idempotency via atomic UPDATE WHERE status='pending'. - Scheduler: polls every 10s, reuses an existing thread's session when --thread is set (preserving context) or posts as a new channel message under a synthetic task:{id} thread otherwise. - HTTP API at /api/tasks* mirrors the cron-jobs route surface. - CLI: ode task create/list/show/cancel/delete/run, talking to the local daemon API rather than the DB directly. - Agent awareness: system prompt gains a ONE-TIME SCHEDULED TASKS section so all providers learn when and how to schedule follow-ups. - Web UI: /tasks settings page + sidebar entry next to Cron Jobs. --- AGENTS.md | 11 + packages/agents/shared.ts | 8 + packages/config/local/inbox.ts | 2 +- packages/config/local/tasks.test.ts | 245 +++++++ packages/config/local/tasks.ts | 525 +++++++++++++++ packages/core/cli-handlers/task.ts | 294 +++++++++ packages/core/cli.ts | 9 + packages/core/index.ts | 3 + packages/core/tasks/scheduler.ts | 400 ++++++++++++ packages/core/web/app.ts | 2 + packages/core/web/routes/tasks.ts | 270 ++++++++ .../src/routes/(settings)/+layout.svelte | 17 +- .../src/routes/(settings)/tasks/+page.svelte | 612 ++++++++++++++++++ 13 files changed, 2393 insertions(+), 5 deletions(-) create mode 100644 packages/config/local/tasks.test.ts create mode 100644 packages/config/local/tasks.ts create mode 100644 packages/core/cli-handlers/task.ts create mode 100644 packages/core/tasks/scheduler.ts create mode 100644 packages/core/web/routes/tasks.ts create mode 100644 packages/web-ui/src/routes/(settings)/tasks/+page.svelte diff --git a/AGENTS.md b/AGENTS.md index 25616c3c..52bf1e94 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -25,6 +25,17 @@ Ode is a Slack bot that bridges messages to OpenCode for AI-assisted coding. - Prod: `./start.sh` - User: `@ode ` and `stop` +## One-time Tasks (`ode task`) +- A Task is a one-shot scheduled prompt: the scheduler fires it exactly once at an absolute time, then posts the agent result back to the channel (or thread, if anchored). +- CLI: + - `ode task create --time --channel --message "" [--thread ] [--title ] [--agent <agentId>] [--run-now]` + - `ode task list [--status pending|running|success|failed|cancelled] [--json]` + - `ode task show <id>` / `ode task cancel <id>` / `ode task delete <id>` / `ode task run <id>` +- When `--thread` is set, the scheduler reuses the existing thread's session so the agent wakes up with full context. When `--thread` is omitted, the task posts as a new channel message under a synthetic thread (`task:{id}`). +- Agents should prefer scheduling a Task instead of blocking on long waits (deploys, overnight builds, approvals): schedule the follow-up and return. +- Persistence: SQLite at `~/.config/ode/inbox.db` (table `tasks`); scheduler polls every 10s and uses `UPDATE ... WHERE status='pending'` for cross-process idempotency. +- HTTP API mirrors the CLI under `/api/tasks*`; the Web UI lives at Settings → Tasks. + ## Bun conventions - Use Bun instead of Node.js - Run: `bun run src/index.ts` diff --git a/packages/agents/shared.ts b/packages/agents/shared.ts index 750a0678..909b8406 100644 --- a/packages/agents/shared.ts +++ b/packages/agents/shared.ts @@ -91,6 +91,14 @@ export function buildSystemPrompt(slack?: SlackContext): string { lines.push("- When sharing tasks, put each item on its own line"); lines.push("- Use four states: * not started, ♻️ in progress, ✅ done, 🚫 cancelled"); lines.push("- If you include a task list, keep the tasks you have done at the top of the response"); + lines.push(""); + lines.push("ONE-TIME SCHEDULED TASKS:"); + lines.push("- Ode provides a one-shot task scheduler for follow-ups that need to fire at a specific time."); + lines.push("- Use it when you need to wait on something that may take minutes, hours, or days (deploys, nightly builds, external approvals). Schedule a task and return instead of blocking the conversation."); + lines.push("- Create via CLI: `ode task create --time <ISO8601> --channel <channelId> [--thread <threadId>] --message \"<prompt>\" [--agent <agentId>]`."); + lines.push("- `--time` accepts ISO 8601 (e.g. `2026-04-19T09:00:00+08:00`). `--thread` is optional; when set, the task reuses this thread's session to keep context; when omitted, the task posts as a new channel message."); + lines.push("- When scheduling a follow-up for the current conversation, pass the current channel and thread so the agent wakes up with the same session history."); + lines.push("- Manage tasks with `ode task list`, `ode task show <id>`, `ode task cancel <id>`, `ode task delete <id>`. Tasks persist across restarts."); const channelSystemMessage = slack.channelSystemMessage?.trim(); if (channelSystemMessage) { diff --git a/packages/config/local/inbox.ts b/packages/config/local/inbox.ts index 1315962c..c515977d 100644 --- a/packages/config/local/inbox.ts +++ b/packages/config/local/inbox.ts @@ -29,7 +29,7 @@ export type MessageDetailKind = export type MessageDetailStatus = "pending" | "completed" | "failed"; -export type MessageThreadSourceKind = "user" | "cron_job"; +export type MessageThreadSourceKind = "user" | "cron_job" | "task"; export interface MessageThreadSummary { id: string; diff --git a/packages/config/local/tasks.test.ts b/packages/config/local/tasks.test.ts new file mode 100644 index 00000000..61de2c8a --- /dev/null +++ b/packages/config/local/tasks.test.ts @@ -0,0 +1,245 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import * as fs from "fs"; +import * as os from "os"; +import * as path from "path"; +import { invalidateOdeConfigCache, ODE_CONFIG_FILE } from "./ode-store"; +import { + cancelTask, + clearTasksForTests, + closeTaskDatabaseForTests, + createTask, + deleteTask, + getTaskById, + listDueTasks, + listTasks, + markTaskCompleted, + markTaskFailed, + markTaskTriggered, + updateTask, +} from "./tasks"; + +// We reuse the real `~/.config/ode/ode.json` path (resolved at module load) +// but swap its contents for test fixtures and restore them on teardown. +// The inbox SQLite DB is redirected to a temp dir via ODE_INBOX_DB_FILE so +// test data never touches the user's real inbox.db. +let tempDir: string; +let originalConfigEnv: string | undefined; +let originalConfigContent: string | null; +let originalConfigExisted: boolean; + +function writeTestOdeConfig(): void { + const config = { + user: {}, + workspaces: [ + { + id: "ws-test", + name: "Test Workspace", + type: "slack", + channelDetails: [ + { id: "C_TEST", name: "general" }, + { id: "C_OTHER", name: "random" }, + ], + }, + ], + }; + fs.mkdirSync(path.dirname(ODE_CONFIG_FILE), { recursive: true }); + fs.writeFileSync(ODE_CONFIG_FILE, JSON.stringify(config)); + invalidateOdeConfigCache(); +} + +beforeEach(() => { + tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "ode-tasks-test-")); + originalConfigEnv = process.env.ODE_INBOX_DB_FILE; + process.env.ODE_INBOX_DB_FILE = path.join(tempDir, "inbox.db"); + + originalConfigExisted = fs.existsSync(ODE_CONFIG_FILE); + originalConfigContent = originalConfigExisted ? fs.readFileSync(ODE_CONFIG_FILE, "utf-8") : null; + writeTestOdeConfig(); + + closeTaskDatabaseForTests(); + clearTasksForTests(); +}); + +afterEach(() => { + closeTaskDatabaseForTests(); + if (originalConfigEnv === undefined) { + delete process.env.ODE_INBOX_DB_FILE; + } else { + process.env.ODE_INBOX_DB_FILE = originalConfigEnv; + } + + // Restore the real ode.json so the user's config isn't left in a test state. + try { + if (originalConfigExisted && originalConfigContent !== null) { + fs.writeFileSync(ODE_CONFIG_FILE, originalConfigContent); + } else { + fs.rmSync(ODE_CONFIG_FILE, { force: true }); + } + } catch { + // Best effort. + } + invalidateOdeConfigCache(); + + try { + fs.rmSync(tempDir, { recursive: true, force: true }); + } catch { + // Best effort. + } +}); + +describe("tasks storage", () => { + test("createTask persists fields and resolves channel snapshot", () => { + const scheduledAt = Date.now() + 60_000; + const task = createTask({ + title: "Check deploy", + scheduledAt, + channelId: "C_TEST", + threadId: "1234.5678", + messageText: "Check deployment status", + agent: "opencode", + }); + + expect(task.id).toBeTruthy(); + expect(task.title).toBe("Check deploy"); + expect(task.scheduledAt).toBe(scheduledAt); + expect(task.platform).toBe("slack"); + expect(task.workspaceId).toBe("ws-test"); + expect(task.workspaceName).toBe("Test Workspace"); + expect(task.channelId).toBe("C_TEST"); + expect(task.channelName).toBe("general"); + expect(task.threadId).toBe("1234.5678"); + expect(task.agent).toBe("opencode"); + expect(task.status).toBe("pending"); + expect(task.lastError).toBeNull(); + }); + + test("createTask rejects unknown channel", () => { + expect(() => + createTask({ + title: "x", + scheduledAt: Date.now() + 1000, + channelId: "C_UNKNOWN", + messageText: "hi", + }), + ).toThrow(/Channel not found/); + }); + + test("createTask normalizes seconds-valued scheduledAt to milliseconds", () => { + const seconds = Math.floor(Date.now() / 1000) + 60; + const task = createTask({ + title: "s", + scheduledAt: seconds, + channelId: "C_TEST", + messageText: "hi", + }); + expect(task.scheduledAt).toBe(seconds * 1000); + }); + + test("listDueTasks returns only pending tasks at or before now", () => { + const now = Date.now(); + const past = createTask({ + title: "past", + scheduledAt: now - 10_000, + channelId: "C_TEST", + messageText: "a", + }); + const future = createTask({ + title: "future", + scheduledAt: now + 60_000, + channelId: "C_TEST", + messageText: "b", + }); + + const due = listDueTasks(now); + expect(due.map((t) => t.id)).toContain(past.id); + expect(due.map((t) => t.id)).not.toContain(future.id); + }); + + test("markTaskTriggered is atomic: first caller wins, second caller is no-op", () => { + const task = createTask({ + title: "race", + scheduledAt: Date.now() - 1000, + channelId: "C_TEST", + messageText: "race me", + }); + + const first = markTaskTriggered(task.id); + const second = markTaskTriggered(task.id); + expect(first).toBe(true); + expect(second).toBe(false); + + const updated = getTaskById(task.id); + expect(updated?.status).toBe("running"); + expect(updated?.triggeredAt).not.toBeNull(); + }); + + test("markTaskCompleted and markTaskFailed set terminal status", () => { + const a = createTask({ title: "ok", scheduledAt: Date.now(), channelId: "C_TEST", messageText: "x" }); + const b = createTask({ title: "err", scheduledAt: Date.now(), channelId: "C_TEST", messageText: "y" }); + markTaskTriggered(a.id); + markTaskCompleted(a.id); + markTaskTriggered(b.id); + markTaskFailed(b.id, "boom"); + + expect(getTaskById(a.id)?.status).toBe("success"); + expect(getTaskById(b.id)?.status).toBe("failed"); + expect(getTaskById(b.id)?.lastError).toBe("boom"); + }); + + test("cancelTask only cancels pending tasks", () => { + const task = createTask({ title: "c", scheduledAt: Date.now() + 60_000, channelId: "C_TEST", messageText: "hi" }); + expect(cancelTask(task.id)).toBe(true); + expect(getTaskById(task.id)?.status).toBe("cancelled"); + // Second cancel is a no-op. + expect(cancelTask(task.id)).toBe(false); + + // Cannot cancel a running or completed task. + const running = createTask({ title: "r", scheduledAt: Date.now(), channelId: "C_TEST", messageText: "r" }); + markTaskTriggered(running.id); + expect(cancelTask(running.id)).toBe(false); + expect(getTaskById(running.id)?.status).toBe("running"); + }); + + test("updateTask rejects edits on non-pending tasks", () => { + const task = createTask({ title: "u", scheduledAt: Date.now() + 60_000, channelId: "C_TEST", messageText: "hi" }); + markTaskTriggered(task.id); + expect(() => updateTask(task.id, { title: "nope" })).toThrow(/pending/); + }); + + test("updateTask preserves unspecified fields", () => { + const task = createTask({ + title: "u2", + scheduledAt: Date.now() + 60_000, + channelId: "C_TEST", + threadId: "T1", + messageText: "original", + agent: "opencode", + }); + const updated = updateTask(task.id, { messageText: "new text" }); + expect(updated.messageText).toBe("new text"); + expect(updated.title).toBe(task.title); + expect(updated.channelId).toBe("C_TEST"); + expect(updated.threadId).toBe("T1"); + expect(updated.agent).toBe("opencode"); + expect(updated.scheduledAt).toBe(task.scheduledAt); + }); + + test("deleteTask removes the record", () => { + const task = createTask({ title: "d", scheduledAt: Date.now(), channelId: "C_TEST", messageText: "x" }); + deleteTask(task.id); + expect(getTaskById(task.id)).toBeNull(); + }); + + test("listTasks orders running first, then pending by scheduled time", () => { + const now = Date.now(); + const later = createTask({ title: "later", scheduledAt: now + 120_000, channelId: "C_TEST", messageText: "l" }); + const sooner = createTask({ title: "sooner", scheduledAt: now + 60_000, channelId: "C_TEST", messageText: "s" }); + const runningTask = createTask({ title: "running", scheduledAt: now, channelId: "C_TEST", messageText: "r" }); + markTaskTriggered(runningTask.id); + + const list = listTasks(); + expect(list[0]?.id).toBe(runningTask.id); + const pendingOrder = list.filter((t) => t.status === "pending").map((t) => t.id); + expect(pendingOrder).toEqual([sooner.id, later.id]); + }); +}); diff --git a/packages/config/local/tasks.ts b/packages/config/local/tasks.ts new file mode 100644 index 00000000..89ed6aa6 --- /dev/null +++ b/packages/config/local/tasks.ts @@ -0,0 +1,525 @@ +import { Database } from "bun:sqlite"; +import * as fs from "fs"; +import * as os from "os"; +import * as path from "path"; +import { loadOdeConfig } from "./ode-store"; + +// --------------------------------------------------------------------------- +// One-time scheduled "Task" storage. +// +// Conceptually a Task is a one-shot cron job: it carries a prompt to send to +// an agent at a specific absolute timestamp. Tasks are particularly useful +// for agents themselves — instead of blocking on a long wait, an agent can +// schedule a Task and return, letting the scheduler resume the conversation +// later. +// +// Storage mirrors `cron-jobs.ts` (shared SQLite DB at ~/.config/ode/inbox.db) +// so persistence, WAL, and test helpers stay consistent across the codebase. +// --------------------------------------------------------------------------- + +export type TaskPlatform = "slack" | "discord" | "lark"; +export type TaskStatus = "pending" | "running" | "success" | "failed" | "cancelled"; + +export type TaskRecord = { + id: string; + title: string; + scheduledAt: number; // unix ms, absolute + platform: TaskPlatform; + workspaceId: string | null; + workspaceName: string | null; + channelId: string; + channelName: string | null; + /** + * Optional thread anchor. When set, the scheduler reuses the existing + * session (if any) for (channelId, threadId) so the agent keeps context. + * When null, the task posts as a fresh channel message with its own + * synthetic thread id (`task:{id}`). + */ + threadId: string | null; + messageText: string; + /** + * Optional agent provider override (e.g. "opencode", "claudecode"). When + * null the scheduler uses the channel default resolved by the agent + * adapter. Stored verbatim for audit; the scheduler is responsible for + * validating the value. + */ + agent: string | null; + status: TaskStatus; + lastError: string | null; + triggeredAt: number | null; + completedAt: number | null; + createdAt: number; + updatedAt: number; +}; + +export type TaskChannelOption = { + value: string; + platform: TaskPlatform; + workspaceId: string; + workspaceName: string; + channelId: string; + channelName: string; + label: string; +}; + +export type CreateTaskParams = { + title: string; + scheduledAt: number; + channelId: string; + threadId?: string | null; + messageText: string; + agent?: string | null; +}; + +export type UpdateTaskParams = { + title?: string; + scheduledAt?: number; + channelId?: string; + threadId?: string | null; + messageText?: string; + agent?: string | null; +}; + +type TaskRow = { + id: string; + title: string; + scheduled_at: number; + platform: TaskPlatform; + workspace_id: string | null; + workspace_name: string | null; + channel_id: string; + channel_name: string | null; + thread_id: string | null; + message_text: string; + agent: string | null; + status: TaskStatus; + last_error: string | null; + triggered_at: number | null; + completed_at: number | null; + created_at: number; + updated_at: number; +}; + +const existsSync = fs.existsSync; +const mkdirSync = fs.mkdirSync; +const join = typeof path.join === "function" ? path.join : (...parts: string[]) => parts.join("/"); +const homedir = typeof os.homedir === "function" ? os.homedir : () => ""; + +const ODE_CONFIG_DIR = join(homedir(), ".config", "ode"); +const DEFAULT_DB_FILE = join(ODE_CONFIG_DIR, "inbox.db"); + +let cachedDatabase: { path: string; db: Database } | null = null; + +function resolveDbFile(): string { + const override = process.env.ODE_INBOX_DB_FILE?.trim(); + return override && override.length > 0 ? override : DEFAULT_DB_FILE; +} + +function ensureParentDir(filePath: string): void { + const dir = path.dirname(filePath); + if (!existsSync(dir)) { + mkdirSync(dir, { recursive: true }); + } +} + +function initializeDatabase(db: Database): void { + db.exec("PRAGMA journal_mode = WAL;"); + db.exec("PRAGMA busy_timeout = 5000;"); + db.exec(` + CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + scheduled_at INTEGER NOT NULL, + platform TEXT NOT NULL, + workspace_id TEXT, + workspace_name TEXT, + channel_id TEXT NOT NULL, + channel_name TEXT, + thread_id TEXT, + message_text TEXT NOT NULL, + agent TEXT, + status TEXT NOT NULL DEFAULT 'pending', + last_error TEXT, + triggered_at INTEGER, + completed_at INTEGER, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + `); + db.exec("CREATE INDEX IF NOT EXISTS idx_tasks_status_scheduled ON tasks(status, scheduled_at);"); + db.exec("CREATE INDEX IF NOT EXISTS idx_tasks_channel ON tasks(channel_id, scheduled_at DESC);"); +} + +function getDatabase(): Database { + const filePath = resolveDbFile(); + if (cachedDatabase?.path === filePath) { + return cachedDatabase.db; + } + + if (cachedDatabase) { + try { + cachedDatabase.db.close(); + } catch { + // Ignore close errors on path switch. + } + } + + ensureParentDir(filePath); + const db = new Database(filePath); + initializeDatabase(db); + cachedDatabase = { path: filePath, db }; + return db; +} + +function mapRow(row: TaskRow): TaskRecord { + return { + id: row.id, + title: row.title, + scheduledAt: row.scheduled_at, + platform: row.platform, + workspaceId: row.workspace_id, + workspaceName: row.workspace_name, + channelId: row.channel_id, + channelName: row.channel_name, + threadId: row.thread_id, + messageText: row.message_text, + agent: row.agent, + status: row.status, + lastError: row.last_error, + triggeredAt: row.triggered_at, + completedAt: row.completed_at, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; +} + +function resolveConfigChannelId(channelId: string): string { + const trimmed = channelId.trim(); + if (!trimmed) return trimmed; + const delimiter = "::"; + const index = trimmed.lastIndexOf(delimiter); + if (index < 0) return trimmed; + const raw = trimmed.slice(index + delimiter.length).trim(); + return raw || trimmed; +} + +function getChannelSnapshot(channelId: string): { + platform: TaskPlatform; + workspaceId: string; + workspaceName: string; + channelId: string; + channelName: string; +} { + const resolvedChannelId = resolveConfigChannelId(channelId); + const config = loadOdeConfig(); + for (const workspace of config.workspaces) { + const channel = workspace.channelDetails.find((item) => item.id === resolvedChannelId); + if (!channel) continue; + return { + platform: workspace.type, + workspaceId: workspace.id, + workspaceName: workspace.name || workspace.id, + channelId: channel.id, + channelName: channel.name || channel.id, + }; + } + throw new Error("Channel not found in configured workspaces"); +} + +function normalizeTitle(value: string): string { + const normalized = value.trim(); + if (!normalized) { + throw new Error("Task title is required"); + } + return normalized; +} + +function normalizeMessageText(value: string): string { + const normalized = value.trim(); + if (!normalized) { + throw new Error("Task message is required"); + } + return normalized; +} + +function normalizeScheduledAt(value: number): number { + if (!Number.isFinite(value)) { + throw new Error("Task scheduledAt must be a finite number"); + } + // Accept seconds *or* milliseconds. Anything below 10^12 is treated as + // seconds (timestamps before year 33658 in seconds < 10^12 ms cutoff). + const normalized = value < 1e12 ? Math.floor(value * 1000) : Math.floor(value); + if (normalized <= 0) { + throw new Error("Task scheduledAt must be a positive timestamp"); + } + return normalized; +} + +function normalizeThreadId(value: string | null | undefined): string | null { + if (value === null || value === undefined) return null; + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : null; +} + +function normalizeAgent(value: string | null | undefined): string | null { + if (value === null || value === undefined) return null; + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : null; +} + +export function listTaskChannelOptions(): TaskChannelOption[] { + const config = loadOdeConfig(); + return config.workspaces.flatMap((workspace) => + workspace.channelDetails.map((channel) => ({ + value: `${workspace.id}::${channel.id}`, + platform: workspace.type, + workspaceId: workspace.id, + workspaceName: workspace.name || workspace.id, + channelId: channel.id, + channelName: channel.name || channel.id, + label: `${workspace.name || workspace.id} / ${channel.name || channel.id}`, + })) + ); +} + +export function listTasks(): TaskRecord[] { + const db = getDatabase(); + const rows = db.query(` + SELECT * + FROM tasks + ORDER BY + CASE status + WHEN 'running' THEN 0 + WHEN 'pending' THEN 1 + ELSE 2 + END, + scheduled_at ASC, + created_at DESC + `).all() as TaskRow[]; + return rows.map(mapRow); +} + +/** + * Return tasks that are candidates for the scheduler tick: pending rows whose + * scheduled time is at or before `nowMs`. Callers still need to race the + * atomic `markTaskTriggered` before actually running the task. + */ +export function listDueTasks(nowMs: number = Date.now()): TaskRecord[] { + const db = getDatabase(); + const rows = db.query(` + SELECT * + FROM tasks + WHERE status = 'pending' + AND scheduled_at <= ? + ORDER BY scheduled_at ASC + `).all(nowMs) as TaskRow[]; + return rows.map(mapRow); +} + +export function getTaskById(id: string): TaskRecord | null { + const db = getDatabase(); + const row = db.query("SELECT * FROM tasks WHERE id = ?").get(id) as TaskRow | null; + return row ? mapRow(row) : null; +} + +export function createTask(params: CreateTaskParams): TaskRecord { + const db = getDatabase(); + const channel = getChannelSnapshot(params.channelId); + const now = Date.now(); + const id = crypto.randomUUID(); + const title = normalizeTitle(params.title); + const scheduledAt = normalizeScheduledAt(params.scheduledAt); + const messageText = normalizeMessageText(params.messageText); + const threadId = normalizeThreadId(params.threadId); + const agent = normalizeAgent(params.agent); + + db.query(` + INSERT INTO tasks ( + id, + title, + scheduled_at, + platform, + workspace_id, + workspace_name, + channel_id, + channel_name, + thread_id, + message_text, + agent, + status, + created_at, + updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?) + `).run( + id, + title, + scheduledAt, + channel.platform, + channel.workspaceId, + channel.workspaceName, + channel.channelId, + channel.channelName, + threadId, + messageText, + agent, + now, + now + ); + + return getTaskById(id)!; +} + +export function updateTask(id: string, params: UpdateTaskParams): TaskRecord { + const existing = getTaskById(id); + if (!existing) { + throw new Error("Task not found"); + } + if (existing.status !== "pending") { + throw new Error("Only pending tasks can be updated"); + } + + const db = getDatabase(); + const now = Date.now(); + + const title = params.title !== undefined ? normalizeTitle(params.title) : existing.title; + const scheduledAt = params.scheduledAt !== undefined + ? normalizeScheduledAt(params.scheduledAt) + : existing.scheduledAt; + const messageText = params.messageText !== undefined + ? normalizeMessageText(params.messageText) + : existing.messageText; + const threadId = params.threadId !== undefined + ? normalizeThreadId(params.threadId) + : existing.threadId; + const agent = params.agent !== undefined ? normalizeAgent(params.agent) : existing.agent; + + let channelSnapshot = { + platform: existing.platform, + workspaceId: existing.workspaceId ?? "", + workspaceName: existing.workspaceName ?? "", + channelId: existing.channelId, + channelName: existing.channelName ?? "", + }; + if (params.channelId !== undefined) { + channelSnapshot = getChannelSnapshot(params.channelId); + } + + db.query(` + UPDATE tasks + SET + title = ?, + scheduled_at = ?, + platform = ?, + workspace_id = ?, + workspace_name = ?, + channel_id = ?, + channel_name = ?, + thread_id = ?, + message_text = ?, + agent = ?, + updated_at = ? + WHERE id = ? + `).run( + title, + scheduledAt, + channelSnapshot.platform, + channelSnapshot.workspaceId || null, + channelSnapshot.workspaceName || null, + channelSnapshot.channelId, + channelSnapshot.channelName || null, + threadId, + messageText, + agent, + now, + id + ); + + return getTaskById(id)!; +} + +export function deleteTask(id: string): void { + const db = getDatabase(); + db.query("DELETE FROM tasks WHERE id = ?").run(id); +} + +/** + * Atomically claim a pending task for execution. Returns true if this caller + * won the race; false if another scheduler tick (or a manual trigger) got + * there first. This is the cross-process idempotency key. + */ +export function markTaskTriggered(id: string): boolean { + const db = getDatabase(); + const now = Date.now(); + const result = db.query(` + UPDATE tasks + SET + status = 'running', + triggered_at = ?, + last_error = NULL, + updated_at = ? + WHERE id = ? AND status = 'pending' + `).run(now, now, id); + return result.changes > 0; +} + +export function markTaskCompleted(id: string): void { + const db = getDatabase(); + const now = Date.now(); + db.query(` + UPDATE tasks + SET + status = 'success', + completed_at = ?, + last_error = NULL, + updated_at = ? + WHERE id = ? + `).run(now, now, id); +} + +export function markTaskFailed(id: string, errorMessage: string): void { + const db = getDatabase(); + const now = Date.now(); + db.query(` + UPDATE tasks + SET + status = 'failed', + last_error = ?, + completed_at = ?, + updated_at = ? + WHERE id = ? + `).run(errorMessage, now, now, id); +} + +/** + * Cancel a pending task. Returns true if the task was cancellable (was + * pending). Returns false if the task was already running / finished / + * cancelled / missing — callers can treat that as a no-op. + */ +export function cancelTask(id: string): boolean { + const db = getDatabase(); + const now = Date.now(); + const result = db.query(` + UPDATE tasks + SET + status = 'cancelled', + updated_at = ? + WHERE id = ? AND status = 'pending' + `).run(now, id); + return result.changes > 0; +} + +export function clearTasksForTests(): void { + const db = getDatabase(); + db.exec("DELETE FROM tasks;"); +} + +export function closeTaskDatabaseForTests(): void { + if (!cachedDatabase) return; + try { + cachedDatabase.db.close(); + } catch { + // Ignore close errors in tests. + } finally { + cachedDatabase = null; + } +} diff --git a/packages/core/cli-handlers/task.ts b/packages/core/cli-handlers/task.ts new file mode 100644 index 00000000..140fca72 --- /dev/null +++ b/packages/core/cli-handlers/task.ts @@ -0,0 +1,294 @@ +import { getWebHost, getWebPort } from "@/config"; +import type { TaskRecord } from "@/config/local/tasks"; + +type CliArgs = string[]; + +type FlagSpec = Record<string, boolean>; // name -> whether it takes a value + +function parseFlags(args: CliArgs, specs: FlagSpec): { flags: Record<string, string | boolean>; positional: string[] } { + const flags: Record<string, string | boolean> = {}; + const positional: string[] = []; + for (let i = 0; i < args.length; i += 1) { + const arg = args[i] ?? ""; + if (arg.startsWith("--")) { + const eqIdx = arg.indexOf("="); + let name: string; + let value: string | undefined; + if (eqIdx >= 0) { + name = arg.slice(2, eqIdx); + value = arg.slice(eqIdx + 1); + } else { + name = arg.slice(2); + } + const takesValue = specs[name]; + if (takesValue === undefined) { + throw new Error(`Unknown flag: --${name}`); + } + if (!takesValue) { + flags[name] = true; + continue; + } + if (value === undefined) { + const next = args[i + 1]; + if (next === undefined || next.startsWith("--")) { + throw new Error(`Flag --${name} requires a value`); + } + value = next; + i += 1; + } + flags[name] = value; + } else { + positional.push(arg); + } + } + return { flags, positional }; +} + +function apiBase(): string { + return `http://${getWebHost()}:${getWebPort()}`; +} + +type ApiResponse<T> = { ok?: boolean; error?: string; result?: T }; + +async function apiFetch<T>(path: string, init?: RequestInit): Promise<T> { + const url = `${apiBase()}${path}`; + let response: Response; + try { + response = await fetch(url, init); + } catch (error) { + throw new Error( + `Failed to reach Ode daemon at ${url}. Is the daemon running? (Try \`ode status\` / \`ode start\`.) ${String(error)}`, + ); + } + const payload = (await response.json().catch(() => ({}))) as ApiResponse<T>; + if (!response.ok || payload.ok === false) { + throw new Error(payload.error || `Request failed with status ${response.status}`); + } + if (payload.result === undefined) { + throw new Error("Empty response from Ode daemon"); + } + return payload.result; +} + +function parseIsoTime(value: string): number { + const trimmed = value.trim(); + if (!trimmed) throw new Error("--time is required"); + const ms = Date.parse(trimmed); + if (!Number.isFinite(ms)) { + throw new Error(`Invalid --time value: ${value} (expected ISO 8601, e.g. 2026-04-18T23:30:00+08:00)`); + } + return ms; +} + +function formatTimestamp(value: number | null | undefined): string { + if (!value || !Number.isFinite(value)) return "n/a"; + return new Date(value).toISOString(); +} + +function statusLabel(task: TaskRecord): string { + return task.status; +} + +function printTaskRow(task: TaskRecord): void { + const channel = task.channelName || task.channelId; + const workspace = task.workspaceName || task.workspaceId || "-"; + const thread = task.threadId ?? "(none)"; + console.log( + [ + task.id, + statusLabel(task).padEnd(9), + formatTimestamp(task.scheduledAt), + `${workspace}/${channel}`, + `thread=${thread}`, + task.title, + ].join(" "), + ); +} + +function printTaskDetail(task: TaskRecord): void { + console.log(`id: ${task.id}`); + console.log(`title: ${task.title}`); + console.log(`status: ${task.status}`); + console.log(`scheduledAt: ${formatTimestamp(task.scheduledAt)}`); + console.log(`platform: ${task.platform}`); + console.log(`workspace: ${task.workspaceName || task.workspaceId || "-"}`); + console.log(`channel: ${task.channelName || task.channelId} (${task.channelId})`); + console.log(`thread: ${task.threadId ?? "(none)"}`); + console.log(`agent: ${task.agent ?? "(default)"}`); + console.log(`triggeredAt: ${formatTimestamp(task.triggeredAt)}`); + console.log(`completedAt: ${formatTimestamp(task.completedAt)}`); + console.log(`createdAt: ${formatTimestamp(task.createdAt)}`); + console.log(`updatedAt: ${formatTimestamp(task.updatedAt)}`); + if (task.lastError) { + console.log(`lastError: ${task.lastError}`); + } + console.log("--- message ---"); + console.log(task.messageText); +} + +function printTaskHelp(): void { + console.log( + [ + "ode task - one-time scheduled tasks", + "", + "Usage:", + " ode task create --time <ISO8601> --channel <channelId> --message <text> [--thread <threadId>] [--title <title>] [--agent <agentId>] [--run-now]", + " ode task list [--status <status>] [--json]", + " ode task show <id> [--json]", + " ode task cancel <id>", + " ode task delete <id>", + " ode task run <id>", + "", + "Notes:", + " --time accepts ISO 8601, e.g. 2026-04-18T23:30:00+08:00", + " --thread is optional. When set, the task reuses the thread's session; when omitted, it posts as a fresh channel message.", + " --channel accepts either a raw channel id or a \"workspaceId::channelId\" value.", + ].join("\n"), + ); +} + +async function handleCreate(args: CliArgs): Promise<void> { + const { flags } = parseFlags(args, { + time: true, + channel: true, + thread: true, + message: true, + title: true, + agent: true, + "run-now": false, + }); + + const timeRaw = flags.time as string | undefined; + const channel = flags.channel as string | undefined; + const message = flags.message as string | undefined; + const threadId = (flags.thread as string | undefined) ?? null; + const agent = (flags.agent as string | undefined) ?? null; + const runNow = flags["run-now"] === true; + + if (!timeRaw) throw new Error("--time is required"); + if (!channel) throw new Error("--channel is required"); + if (!message) throw new Error("--message is required"); + + const scheduledAt = parseIsoTime(timeRaw); + const title = (flags.title as string | undefined)?.trim() + || message.split("\n")[0]!.slice(0, 80) + || "Scheduled task"; + + const result = await apiFetch<{ task: TaskRecord }>("/api/tasks", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + title, + scheduledAt, + channelId: channel, + threadId, + messageText: message, + agent, + runImmediately: runNow, + }), + }); + printTaskDetail(result.task); +} + +async function handleList(args: CliArgs): Promise<void> { + const { flags } = parseFlags(args, { status: true, json: false }); + const result = await apiFetch<{ tasks: TaskRecord[] }>("/api/tasks"); + let tasks = result.tasks; + const statusFilter = flags.status as string | undefined; + if (statusFilter) { + tasks = tasks.filter((task) => task.status === statusFilter); + } + if (flags.json) { + console.log(JSON.stringify(tasks, null, 2)); + return; + } + if (tasks.length === 0) { + console.log("No tasks."); + return; + } + console.log(`id status scheduled_at workspace/channel thread title`); + console.log("-".repeat(80)); + for (const task of tasks) { + printTaskRow(task); + } +} + +async function handleShow(args: CliArgs): Promise<void> { + const { flags, positional } = parseFlags(args, { json: false }); + const id = positional[0]; + if (!id) throw new Error("Task id is required: ode task show <id>"); + const result = await apiFetch<{ task: TaskRecord }>(`/api/tasks/${encodeURIComponent(id)}`); + if (flags.json) { + console.log(JSON.stringify(result.task, null, 2)); + return; + } + printTaskDetail(result.task); +} + +async function handleCancel(args: CliArgs): Promise<void> { + const { positional } = parseFlags(args, {}); + const id = positional[0]; + if (!id) throw new Error("Task id is required: ode task cancel <id>"); + const result = await apiFetch<{ task: TaskRecord }>(`/api/tasks/${encodeURIComponent(id)}/cancel`, { + method: "POST", + }); + console.log(`Task ${id} cancelled.`); + if (result.task) printTaskDetail(result.task); +} + +async function handleDelete(args: CliArgs): Promise<void> { + const { positional } = parseFlags(args, {}); + const id = positional[0]; + if (!id) throw new Error("Task id is required: ode task delete <id>"); + await apiFetch(`/api/tasks/${encodeURIComponent(id)}`, { method: "DELETE" }); + console.log(`Task ${id} deleted.`); +} + +async function handleRunNow(args: CliArgs): Promise<void> { + const { positional } = parseFlags(args, {}); + const id = positional[0]; + if (!id) throw new Error("Task id is required: ode task run <id>"); + await apiFetch(`/api/tasks/${encodeURIComponent(id)}/run`, { method: "POST" }); + console.log(`Task ${id} triggered.`); +} + +export async function handleTaskCommand(args: CliArgs): Promise<number> { + const sub = args[0]; + if (!sub || sub === "help" || sub === "--help" || sub === "-h") { + printTaskHelp(); + return 0; + } + try { + const rest = args.slice(1); + if (sub === "create") { + await handleCreate(rest); + return 0; + } + if (sub === "list" || sub === "ls") { + await handleList(rest); + return 0; + } + if (sub === "show" || sub === "get") { + await handleShow(rest); + return 0; + } + if (sub === "cancel") { + await handleCancel(rest); + return 0; + } + if (sub === "delete" || sub === "rm") { + await handleDelete(rest); + return 0; + } + if (sub === "run") { + await handleRunNow(rest); + return 0; + } + console.error(`Unknown task subcommand: ${sub}`); + printTaskHelp(); + return 1; + } catch (error) { + console.error(error instanceof Error ? error.message : String(error)); + return 1; + } +} diff --git a/packages/core/cli.ts b/packages/core/cli.ts index 4613325b..5b80dbdc 100644 --- a/packages/core/cli.ts +++ b/packages/core/cli.ts @@ -8,6 +8,7 @@ import { runDaemon } from "@/core/daemon/manager"; import { getDaemonLogPath } from "@/core/daemon/paths"; import { isProcessAlive, readDaemonState, type DaemonState } from "@/core/daemon/state"; import { runOnboarding } from "@/core/onboarding"; +import { handleTaskCommand } from "@/core/cli-handlers/task"; import { isInstalledBinary, performUpgrade } from "@/core/upgrade"; const rawArgs = process.argv.slice(2); @@ -45,6 +46,7 @@ function printHelp(): void { " ode onboard", " ode onboarding", " ode config", + " ode task <subcommand> # manage one-time scheduled tasks", " ode upgrade", " ode --version", "", @@ -56,6 +58,8 @@ function printHelp(): void { " ode restart", " ode stop", " ode onboard", + " ode task create --time 2026-04-19T09:00:00+08:00 --channel C123 --message \"check deploy\"", + " ode task list", " ode --foreground", " ODE_WEB_HOST=0.0.0.0 ode #run ode process and expose setting UI", ].join("\n"), @@ -488,6 +492,11 @@ if (command === "start") { process.exit(0); } +if (command === "task") { + const code = await handleTaskCommand(args.slice(1)); + process.exit(code); +} + if (foregroundRequested) { await import("./index"); await new Promise(() => {}); diff --git a/packages/core/index.ts b/packages/core/index.ts index 01cdada6..539ae9b1 100644 --- a/packages/core/index.ts +++ b/packages/core/index.ts @@ -33,6 +33,7 @@ import { markRuntimeReady, scheduleUpgradeRestart } from "@/core/daemon/control" import { checkForUpdate, isInstalledBinary, performUpgrade } from "./upgrade"; import { runOnboardingIfNeeded } from "./onboarding"; import { startCronJobScheduler, stopCronJobScheduler } from "@/core/cron/scheduler"; +import { startTaskScheduler, stopTaskScheduler } from "@/core/tasks/scheduler"; import { initSentry, shutdownSentry } from "@/core/observability/sentry"; import packageJson from "../../package.json" with { type: "json" }; @@ -283,6 +284,7 @@ async function main(): Promise<void> { await startDiscordRuntime("startup"); await startLarkRuntime("startup"); startCronJobScheduler(); + startTaskScheduler(); if (slackApps.length > 0) { log.debug("Slack app created"); @@ -302,6 +304,7 @@ async function main(): Promise<void> { try { stopCronJobScheduler(); + stopTaskScheduler(); stopOAuthServer(); await stopSlackRuntime("shutdown"); await stopDiscordRuntime("shutdown"); diff --git a/packages/core/tasks/scheduler.ts b/packages/core/tasks/scheduler.ts new file mode 100644 index 00000000..ad096618 --- /dev/null +++ b/packages/core/tasks/scheduler.ts @@ -0,0 +1,400 @@ +import { createAgentAdapter } from "@/agents/adapter"; +import type { OpenCodeMessageContext } from "@/agents"; +import { + getChannelBaseBranch, + getChannelModel, + getChannelSystemMessage, + getUserGeneralSettings, + resolveChannelCwd, +} from "@/config"; +import { + type TaskRecord, + getTaskById, + listDueTasks, + markTaskCompleted, + markTaskFailed, + markTaskTriggered, +} from "@/config/local/tasks"; +import { + buildThreadKey, + completeAgentResult, + ensureMessageThread, + failAgentResult, + recordUserPrompt, + startAgentResult, +} from "@/config/local/inbox"; +import { + loadSession, + saveSession, + type PersistedSession, +} from "@/config/local/sessions"; +import { buildMessageOptions } from "@/core/runtime/message-options"; +import { buildFinalResponseText, categorizeRuntimeError } from "@/core/runtime/helpers"; +import { buildSessionEnvironment, prepareSessionWorkspace } from "@/core/session"; +import { sendChannelMessage as sendDiscordChannelMessage } from "@/ims/discord/client"; +import { sendChannelMessage as sendLarkChannelMessage } from "@/ims/lark/client"; +import { + sendChannelMessage as sendSlackChannelMessage, + sendMessage as sendSlackThreadMessage, +} from "@/ims/slack/client"; +import { log } from "@/utils"; + +// --------------------------------------------------------------------------- +// One-time task scheduler. +// +// Closely mirrors `packages/core/cron/scheduler.ts`: a polling loop claims +// tasks atomically via SQL, fires the agent turn, and persists the result. +// The key differences vs. cron: +// - Tasks fire exactly once (status lifecycle: pending -> running -> +// success | failed | cancelled). +// - Tasks can anchor to an existing thread (reuse its session for +// continuity) or post as a fresh channel message with a synthetic +// threadId of `task:{id}`. +// --------------------------------------------------------------------------- + +const TASK_POLL_INTERVAL_MS = 10_000; + +let taskSchedulerTimer: ReturnType<typeof setInterval> | null = null; +const runningTaskIds = new Set<string>(); + +function getSyntheticThreadId(taskId: string): string { + return `task:${taskId}`; +} + +function getTaskUserId(taskId: string): string { + return `task:${taskId}`; +} + +function getTaskMessageId(task: TaskRecord): string { + // `triggeredAt` is set atomically in `markTaskTriggered` before the run + // begins; prefer it for a stable per-run id. Fall back to the creation + // time for manual early runs where `triggeredAt` may not yet be visible. + return String(task.triggeredAt ?? task.createdAt); +} + +function resolveTaskThreadId(task: TaskRecord): string { + return task.threadId ?? getSyntheticThreadId(task.id); +} + +function resolveInboxModelForTask( + task: TaskRecord, + options: ReturnType<typeof buildMessageOptions>, +): string | null { + const explicitModel = options?.model; + if (explicitModel?.providerID && explicitModel.modelID) { + return `${explicitModel.providerID}/${explicitModel.modelID}`; + } + const fallbackModel = getChannelModel(task.channelId)?.trim(); + return fallbackModel && fallbackModel.length > 0 ? fallbackModel : null; +} + +async function sendResultToChannel(task: TaskRecord, text: string): Promise<void> { + if (task.platform === "slack") { + // Slack is the only platform with a stable "reply in thread" helper; use + // it whenever the caller anchored the task to a real thread so the reply + // lands back in the conversation. Without a thread the task posts at + // the top of the channel. + if (task.threadId && task.threadId.trim().length > 0) { + await sendSlackThreadMessage(task.channelId, task.threadId, text); + return; + } + await sendSlackChannelMessage(task.channelId, text); + return; + } + if (task.platform === "discord") { + await sendDiscordChannelMessage(task.channelId, text); + return; + } + await sendLarkChannelMessage(task.channelId, text); +} + +function buildTaskAgentContext(task: TaskRecord): OpenCodeMessageContext { + const userId = getTaskUserId(task.id); + const threadId = resolveTaskThreadId(task); + return { + slack: { + platform: task.platform, + channelId: task.channelId, + threadId, + userId, + hasGitHubToken: false, + channelSystemMessage: getChannelSystemMessage(task.channelId) ?? undefined, + }, + }; +} + +async function prepareTaskSession(task: TaskRecord): Promise<{ + session: PersistedSession; + sessionId: string; + cwd: string; + created: boolean; + threadId: string; +}> { + const threadId = resolveTaskThreadId(task); + const userId = getTaskUserId(task.id); + const agent = createAgentAdapter(); + + let cwd = resolveChannelCwd(task.channelId).cwd; + let session = loadSession(task.channelId, threadId); + if (session?.workingDirectory) { + cwd = session.workingDirectory; + } + + const { env: sessionEnv, gitIdentity } = buildSessionEnvironment({ + threadOwnerUserId: session?.threadOwnerUserId ?? userId, + }); + + const { sessionId } = await agent.getOrCreateSession(task.channelId, threadId, cwd, sessionEnv); + const created = !session; + + // Only create a fresh worktree when we are starting a brand-new session. + // When a task reuses an existing thread's session, we inherit whatever + // worktree that thread already set up. + if (created && getUserGeneralSettings().gitStrategy === "worktree") { + const baseBranch = getChannelBaseBranch(task.channelId); + const prepared = await prepareSessionWorkspace({ + channelId: task.channelId, + threadId, + cwd, + worktreeId: `ode_task_${task.id.replace(/[^a-zA-Z0-9_-]/g, "_")}`, + baseBranch, + sessionEnv, + gitIdentity, + }); + cwd = prepared.cwd; + } + + if (!session) { + session = { + sessionId, + providerId: agent.getProviderForSession(sessionId), + platform: task.platform, + channelId: task.channelId, + threadId, + workingDirectory: cwd, + threadOwnerUserId: userId, + participantBotIds: ["task"], + createdAt: Date.now(), + lastActivityAt: Date.now(), + lastActivityBotId: "task", + }; + } else { + session.sessionId = sessionId; + session.providerId = agent.getProviderForSession(sessionId); + session.platform = task.platform; + session.workingDirectory = cwd; + session.lastActivityBotId = "task"; + // Preserve the original threadOwnerUserId when reusing a session so + // GitHub/git identity stay attached to the human owner rather than the + // synthetic task user. + } + + saveSession(session); + return { session, sessionId, cwd, created, threadId }; +} + +async function runTask(task: TaskRecord): Promise<void> { + const agent = createAgentAdapter(); + const taskMessageId = getTaskMessageId(task); + let agentResultDetailId: string | null = null; + let threadKey: string | null = null; + + try { + const { session, sessionId, cwd, threadId } = await prepareTaskSession(task); + threadKey = buildThreadKey(task.channelId, threadId); + const providerId = agent.getProviderForSession(sessionId); + const options = buildMessageOptions({ + text: task.messageText, + channelId: task.channelId, + providerId, + }); + const model = resolveInboxModelForTask(task, options); + + try { + ensureMessageThread({ + platform: task.platform, + channelId: task.channelId, + threadId, + replyThreadId: threadId, + sessionId, + providerId, + model, + workingDirectory: cwd, + threadOwnerUserId: session.threadOwnerUserId ?? getTaskUserId(task.id), + branchName: session.branchName, + sourceKind: "task", + context: { + sourceKind: "task", + taskId: task.id, + taskTitle: task.title, + }, + }); + recordUserPrompt({ + threadKey, + messageId: taskMessageId, + userId: getTaskUserId(task.id), + promptText: task.messageText, + context: { + taskId: task.id, + scheduledAt: task.scheduledAt, + triggeredAt: task.triggeredAt, + }, + }); + const detail = startAgentResult({ + threadKey, + requestMessageId: taskMessageId, + providerId, + model, + workingDirectory: cwd, + context: { + taskId: task.id, + scheduledAt: task.scheduledAt, + }, + }); + agentResultDetailId = detail.id; + } catch (error) { + log.warn("Failed to record task inbox message", { + taskId: task.id, + error: String(error), + }); + } + + const responses = await agent.sendMessage( + task.channelId, + sessionId, + task.messageText, + cwd, + options, + buildTaskAgentContext(task), + ); + const finalText = buildFinalResponseText(responses) ?? "_Done_"; + + await sendResultToChannel(task, finalText); + if (agentResultDetailId) { + try { + completeAgentResult({ + detailId: agentResultDetailId, + resultText: finalText, + providerId, + model, + workingDirectory: cwd, + }); + } catch (error) { + log.warn("Failed to complete task agent_result detail", { + detailId: agentResultDetailId, + error: String(error), + }); + } + } + markTaskCompleted(task.id); + } catch (error) { + const { message } = categorizeRuntimeError(error); + if (agentResultDetailId) { + try { + failAgentResult({ + detailId: agentResultDetailId, + errorText: message, + }); + } catch (failError) { + log.warn("Failed to mark task agent_result detail as failed", { + detailId: agentResultDetailId, + error: String(failError), + }); + } + } + markTaskFailed(task.id, message); + log.warn("Task execution failed", { + taskId: task.id, + title: task.title, + channelId: task.channelId, + error: String(error), + }); + } +} + +async function tickTasks(): Promise<void> { + const now = Date.now(); + const due = listDueTasks(now); + for (const task of due) { + if (runningTaskIds.has(task.id)) continue; + if (!markTaskTriggered(task.id)) continue; + runningTaskIds.add(task.id); + // Re-read after the atomic claim so `triggeredAt` is populated for the + // inbox message id. + const claimed = getTaskById(task.id) ?? task; + void runTask(claimed).finally(() => { + runningTaskIds.delete(task.id); + }); + } +} + +export function startTaskScheduler(): void { + if (taskSchedulerTimer) return; + void tickTasks(); + taskSchedulerTimer = setInterval(() => { + void tickTasks(); + }, TASK_POLL_INTERVAL_MS); + log.debug("Task scheduler started", { intervalMs: TASK_POLL_INTERVAL_MS }); +} + +export function stopTaskScheduler(): void { + if (!taskSchedulerTimer) return; + clearInterval(taskSchedulerTimer); + taskSchedulerTimer = null; + runningTaskIds.clear(); + log.debug("Task scheduler stopped"); +} + +export class TaskAlreadyRunningError extends Error { + constructor(taskId: string) { + super(`Task ${taskId} is already running`); + this.name = "TaskAlreadyRunningError"; + } +} + +export class TaskNotFoundError extends Error { + constructor(taskId: string) { + super(`Task ${taskId} not found`); + this.name = "TaskNotFoundError"; + } +} + +export class TaskNotPendingError extends Error { + constructor(taskId: string, status: string) { + super(`Task ${taskId} is not pending (status: ${status})`); + this.name = "TaskNotPendingError"; + } +} + +/** + * Trigger a task immediately, bypassing the scheduled time. The task must + * still be in `pending` status — finished, failed, and cancelled tasks are + * terminal. The in-process `runningTaskIds` guard prevents duplicate runs. + */ +export function beginTriggerTaskNow(taskId: string): Promise<void> { + const task = getTaskById(taskId); + if (!task) { + throw new TaskNotFoundError(taskId); + } + if (runningTaskIds.has(task.id)) { + throw new TaskAlreadyRunningError(task.id); + } + if (task.status !== "pending") { + throw new TaskNotPendingError(task.id, task.status); + } + if (!markTaskTriggered(task.id)) { + // Lost the race to the polling loop. Surface the same error shape as the + // in-process check so HTTP handlers can 409 uniformly. + throw new TaskAlreadyRunningError(task.id); + } + runningTaskIds.add(task.id); + const claimed = getTaskById(task.id) ?? task; + const runPromise = runTask(claimed).finally(() => { + runningTaskIds.delete(task.id); + }); + return runPromise; +} + +export async function triggerTaskNow(taskId: string): Promise<void> { + await beginTriggerTaskNow(taskId); +} diff --git a/packages/core/web/app.ts b/packages/core/web/app.ts index 02e8b815..890e422c 100644 --- a/packages/core/web/app.ts +++ b/packages/core/web/app.ts @@ -8,6 +8,7 @@ import { registerSessionRoutes } from "./routes/sessions"; import { registerActionRoutes } from "./routes/action"; import { registerInboxRoutes } from "./routes/inbox"; import { registerCronJobRoutes } from "./routes/cron-jobs"; +import { registerTaskRoutes } from "./routes/tasks"; export function createWebApp(): Elysia { const app = new Elysia(); @@ -34,6 +35,7 @@ export function createWebApp(): Elysia { registerActionRoutes(app); registerInboxRoutes(app); registerCronJobRoutes(app); + registerTaskRoutes(app); app.all("*", async ({ request }: { request: Request }) => { return serveStaticAsset(request); diff --git a/packages/core/web/routes/tasks.ts b/packages/core/web/routes/tasks.ts new file mode 100644 index 00000000..cb696561 --- /dev/null +++ b/packages/core/web/routes/tasks.ts @@ -0,0 +1,270 @@ +import type { Elysia } from "elysia"; +import { + createTask, + deleteTask, + cancelTask, + getTaskById, + listTaskChannelOptions, + listTasks, + updateTask, + type CreateTaskParams, + type UpdateTaskParams, +} from "@/config/local/tasks"; +import { + TaskAlreadyRunningError, + TaskNotFoundError, + TaskNotPendingError, + beginTriggerTaskNow, +} from "@/core/tasks/scheduler"; +import { log } from "@/utils"; +import { jsonResponse, readJsonBody, runRoute } from "../http"; + +function getString(payload: Record<string, unknown>, key: string): string { + const value = payload[key]; + return typeof value === "string" ? value : ""; +} + +function getOptionalString(payload: Record<string, unknown>, key: string): string | null | undefined { + if (!(key in payload)) return undefined; + const value = payload[key]; + if (value === null) return null; + if (typeof value === "string") return value; + return undefined; +} + +function getNumber(payload: Record<string, unknown>, key: string): number | undefined { + const value = payload[key]; + if (typeof value === "number" && Number.isFinite(value)) return value; + if (typeof value === "string") { + const parsed = Number(value); + if (Number.isFinite(parsed)) return parsed; + } + return undefined; +} + +function getBoolean(payload: Record<string, unknown>, key: string): boolean | undefined { + const value = payload[key]; + return typeof value === "boolean" ? value : undefined; +} + +function parseCreateTaskPayload(payload: Record<string, unknown>): CreateTaskParams { + const scheduledAt = getNumber(payload, "scheduledAt"); + if (scheduledAt === undefined) { + throw new Error("scheduledAt is required"); + } + const threadIdRaw = getOptionalString(payload, "threadId"); + const agentRaw = getOptionalString(payload, "agent"); + return { + title: getString(payload, "title"), + scheduledAt, + channelId: getString(payload, "channelId"), + threadId: threadIdRaw === undefined ? null : threadIdRaw, + messageText: getString(payload, "messageText"), + agent: agentRaw === undefined ? null : agentRaw, + }; +} + +function parseUpdateTaskPayload(payload: Record<string, unknown>): UpdateTaskParams { + const update: UpdateTaskParams = {}; + if ("title" in payload) update.title = getString(payload, "title"); + if ("scheduledAt" in payload) { + const scheduledAt = getNumber(payload, "scheduledAt"); + if (scheduledAt !== undefined) update.scheduledAt = scheduledAt; + } + if ("channelId" in payload) update.channelId = getString(payload, "channelId"); + if ("threadId" in payload) update.threadId = getOptionalString(payload, "threadId") ?? null; + if ("messageText" in payload) update.messageText = getString(payload, "messageText"); + if ("agent" in payload) update.agent = getOptionalString(payload, "agent") ?? null; + return update; +} + +export function registerTaskRoutes(app: Elysia): void { + app.get("/api/tasks", async () => { + return runRoute( + async () => ({ + tasks: listTasks(), + channels: listTaskChannelOptions(), + }), + (result) => jsonResponse(200, { ok: true, result }), + { fallbackMessage: "Internal server error", status: 500 }, + ); + }); + + app.get("/api/tasks/:id", async ({ params }: { params: { id?: string } }) => { + return runRoute( + async () => { + const id = params.id?.trim(); + if (!id) throw new Error("Missing task id"); + const task = getTaskById(id); + if (!task) throw new Error("Task not found"); + return { task }; + }, + (result) => jsonResponse(200, { ok: true, result }), + { + fallbackMessage: "Failed to load task", + resolveStatus: (message) => { + if (message === "Missing task id") return 400; + if (message === "Task not found") return 404; + return 500; + }, + }, + ); + }); + + app.post("/api/tasks", async ({ request }: { request: Request }) => { + return runRoute( + async () => { + const body = await readJsonBody(request); + const payload = parseCreateTaskPayload(body); + const runImmediately = getBoolean(body, "runImmediately") === true; + const task = createTask(payload); + + if (runImmediately) { + try { + const runPromise = beginTriggerTaskNow(task.id); + runPromise.catch((error) => { + log.warn("Immediate task run after create failed", { + taskId: task.id, + error: String(error), + }); + }); + } catch (error) { + log.warn("Unable to start immediate task run after create", { + taskId: task.id, + error: String(error), + }); + } + } + + return { + task, + tasks: listTasks(), + channels: listTaskChannelOptions(), + }; + }, + (result) => jsonResponse(200, { ok: true, result }), + { fallbackMessage: "Invalid task payload", status: 400 }, + ); + }); + + app.put("/api/tasks/:id", async ({ params, request }: { params: { id?: string }; request: Request }) => { + return runRoute( + async () => { + const id = params.id?.trim(); + if (!id) throw new Error("Missing task id"); + const payload = parseUpdateTaskPayload(await readJsonBody(request)); + const task = updateTask(id, payload); + return { + task, + tasks: listTasks(), + channels: listTaskChannelOptions(), + }; + }, + (result) => jsonResponse(200, { ok: true, result }), + { + fallbackMessage: "Invalid task payload", + resolveStatus: (message) => { + if (message === "Missing task id") return 400; + if (message === "Task not found") return 404; + if (message === "Only pending tasks can be updated") return 409; + return 400; + }, + }, + ); + }); + + app.delete("/api/tasks/:id", async ({ params }: { params: { id?: string } }) => { + return runRoute( + async () => { + const id = params.id?.trim(); + if (!id) throw new Error("Missing task id"); + deleteTask(id); + return { + tasks: listTasks(), + channels: listTaskChannelOptions(), + }; + }, + (result) => jsonResponse(200, { ok: true, result }), + { + fallbackMessage: "Failed to delete task", + resolveStatus: (message) => (message === "Missing task id" ? 400 : 400), + }, + ); + }); + + app.post("/api/tasks/:id/cancel", async ({ params }: { params: { id?: string } }) => { + return runRoute( + async () => { + const id = params.id?.trim(); + if (!id) throw new Error("Missing task id"); + const existing = getTaskById(id); + if (!existing) throw new Error("Task not found"); + const cancelled = cancelTask(id); + if (!cancelled) { + // Either already cancelled / completed / running. Surface the + // current status so the UI can show a sensible message. + throw new Error(`Task is not pending (status: ${existing.status})`); + } + return { + task: getTaskById(id), + tasks: listTasks(), + channels: listTaskChannelOptions(), + }; + }, + (result) => jsonResponse(200, { ok: true, result }), + { + fallbackMessage: "Failed to cancel task", + resolveStatus: (message) => { + if (message === "Missing task id") return 400; + if (message === "Task not found") return 404; + if (message.startsWith("Task is not pending")) return 409; + return 500; + }, + }, + ); + }); + + app.post("/api/tasks/:id/run", async ({ params }: { params: { id?: string } }) => { + return runRoute( + async () => { + const id = params.id?.trim(); + if (!id) throw new Error("Missing task id"); + try { + const runPromise = beginTriggerTaskNow(id); + runPromise.catch((error) => { + log.warn("Manually triggered task run failed", { + taskId: id, + error: String(error), + }); + }); + } catch (error) { + if (error instanceof TaskAlreadyRunningError) { + throw new Error("Task already running"); + } + if (error instanceof TaskNotFoundError) { + throw new Error("Task not found"); + } + if (error instanceof TaskNotPendingError) { + throw new Error(error.message); + } + throw error; + } + return { + tasks: listTasks(), + channels: listTaskChannelOptions(), + }; + }, + (result) => jsonResponse(200, { ok: true, result }), + { + fallbackMessage: "Failed to run task", + resolveStatus: (message) => { + if (message === "Missing task id") return 400; + if (message === "Task not found") return 404; + if (message === "Task already running") return 409; + if (message.startsWith("Task ") && message.includes("is not pending")) return 409; + return 500; + }, + }, + ); + }); +} diff --git a/packages/web-ui/src/routes/(settings)/+layout.svelte b/packages/web-ui/src/routes/(settings)/+layout.svelte index dccf7d77..fa4fc646 100644 --- a/packages/web-ui/src/routes/(settings)/+layout.svelte +++ b/packages/web-ui/src/routes/(settings)/+layout.svelte @@ -13,16 +13,18 @@ const pathname = $derived($page.url.pathname); const normalizedPathname = $derived(pathname.endsWith("/") && pathname.length > 1 ? pathname.slice(0, -1) : pathname); - const activeSection = $derived.by<"general" | "agents" | "inbox" | "cronJobs" | "workspace">(() => + const activeSection = $derived.by<"general" | "agents" | "inbox" | "cronJobs" | "tasks" | "workspace">(() => normalizedPathname === "/agents" ? "agents" : normalizedPathname === "/inbox" ? "inbox" : normalizedPathname === "/cron-jobs" ? "cronJobs" - : normalizedPathname.startsWith("/workspace") - ? "workspace" - : "general" + : normalizedPathname === "/tasks" + ? "tasks" + : normalizedPathname.startsWith("/workspace") + ? "workspace" + : "general" ); let pendingWorkspaceType = $state<"slack" | "discord" | "lark">("slack"); let pendingSlackAppToken = $state(""); @@ -215,6 +217,13 @@ > {t("Cron Jobs", "定时任务")} </Button> + <Button + variant={activeSection === "tasks" ? "default" : "secondary"} + className="w-full justify-start" + on:click={() => goto("/tasks")} + > + {t("Tasks", "一次性任务")} + </Button> </div> </Card> diff --git a/packages/web-ui/src/routes/(settings)/tasks/+page.svelte b/packages/web-ui/src/routes/(settings)/tasks/+page.svelte new file mode 100644 index 00000000..a7e8a5ee --- /dev/null +++ b/packages/web-ui/src/routes/(settings)/tasks/+page.svelte @@ -0,0 +1,612 @@ +<script lang="ts"> + import { onMount } from "svelte"; + import { Ban, CalendarClock, Pencil, Play, Plus, RefreshCw, Trash2 } from "lucide-svelte"; + import { Badge, Button, Card, Input, Label, Select, Textarea } from "$lib/components/ui"; + import { locale } from "$lib/i18n"; + + type TaskPlatform = "slack" | "discord" | "lark"; + type TaskStatus = "pending" | "running" | "success" | "failed" | "cancelled"; + + type TaskRecord = { + id: string; + title: string; + scheduledAt: number; + platform: TaskPlatform; + workspaceId: string | null; + workspaceName: string | null; + channelId: string; + channelName: string | null; + threadId: string | null; + messageText: string; + agent: string | null; + status: TaskStatus; + lastError: string | null; + triggeredAt: number | null; + completedAt: number | null; + createdAt: number; + updatedAt: number; + }; + + type TaskChannelOption = { + value: string; + platform: TaskPlatform; + workspaceId: string; + workspaceName: string; + channelId: string; + channelName: string; + label: string; + }; + + type TaskPayload = { + tasks: TaskRecord[]; + channels: TaskChannelOption[]; + }; + + let tasks = $state<TaskRecord[]>([]); + let channels = $state<TaskChannelOption[]>([]); + let isLoading = $state(false); + let isSaving = $state(false); + let message = $state(""); + + let editingTaskId = $state<string | null>(null); + let formTitle = $state(""); + let formChannelId = $state(""); + let formThreadId = $state(""); + let formMessageText = $state(""); + let formAgent = $state(""); + // datetime-local input value, local time (no timezone). + let formScheduledLocal = $state(""); + let formRunImmediately = $state(false); + let runningTaskIds = $state<Set<string>>(new Set()); + + function t(en: string, zh: string): string { + return $locale === "zh-CN" ? zh : en; + } + + function formatTimestamp(value: number | null | undefined): string { + if (!value || !Number.isFinite(value)) return t("n/a", "无"); + return new Date(value).toLocaleString($locale === "zh-CN" ? "zh-CN" : "en-US"); + } + + function toLocalDatetimeInput(epochMs: number): string { + const d = new Date(epochMs); + const pad = (n: number) => String(n).padStart(2, "0"); + return `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())}T${pad(d.getHours())}:${pad(d.getMinutes())}`; + } + + function fromLocalDatetimeInput(value: string): number | null { + if (!value) return null; + const parsed = Date.parse(value); + if (!Number.isFinite(parsed)) return null; + return parsed; + } + + function getStatusVariant(status: TaskStatus): "secondary" | "success" | "destructive" | "outline" { + if (status === "success") return "success"; + if (status === "failed") return "destructive"; + if (status === "cancelled") return "outline"; + return "secondary"; + } + + function getStatusLabel(status: TaskStatus): string { + if (status === "pending") return t("Pending", "待执行"); + if (status === "running") return t("Running", "运行中"); + if (status === "success") return t("Success", "成功"); + if (status === "failed") return t("Failed", "失败"); + return t("Cancelled", "已取消"); + } + + function getChannelLabel(task: TaskRecord): string { + const workspace = task.workspaceName || task.workspaceId || t("Unknown workspace", "未知工作区"); + const channel = task.channelName || task.channelId; + return `${workspace} / ${channel}`; + } + + function findChannelFormValue(task: TaskRecord): string { + const exactMatch = channels.find((channel) => + channel.channelId === task.channelId + && channel.workspaceId === (task.workspaceId || channel.workspaceId) + && channel.platform === task.platform + ); + if (exactMatch) return exactMatch.value; + const fallbackMatch = channels.find((channel) => channel.channelId === task.channelId); + return fallbackMatch?.value ?? task.channelId; + } + + function applyPayload(payload: TaskPayload): void { + tasks = payload.tasks; + channels = payload.channels; + + const hasSelectedChannel = channels.some((channel) => channel.value === formChannelId); + if (!hasSelectedChannel) { + formChannelId = channels[0]?.value ?? ""; + } + } + + function resetForm(): void { + editingTaskId = null; + formTitle = ""; + formChannelId = channels[0]?.value ?? ""; + formThreadId = ""; + formMessageText = ""; + formAgent = ""; + formRunImmediately = false; + // Default to 15 minutes from now. + formScheduledLocal = toLocalDatetimeInput(Date.now() + 15 * 60 * 1000); + } + + function startEdit(task: TaskRecord): void { + editingTaskId = task.id; + formTitle = task.title; + formChannelId = findChannelFormValue(task); + formThreadId = task.threadId ?? ""; + formMessageText = task.messageText; + formAgent = task.agent ?? ""; + formScheduledLocal = toLocalDatetimeInput(task.scheduledAt); + formRunImmediately = false; + message = ""; + if (typeof window !== "undefined") { + window.scrollTo({ top: 0, behavior: "smooth" }); + } + } + + async function loadTasks(): Promise<void> { + isLoading = true; + message = ""; + try { + const response = await fetch("/api/tasks"); + const payload = (await response.json()) as { + ok?: boolean; + error?: string; + result?: TaskPayload; + }; + if (!response.ok || !payload.ok || !payload.result) { + throw new Error(payload.error || "Failed to load tasks"); + } + applyPayload(payload.result); + if (!editingTaskId && !formChannelId && payload.result.channels.length > 0) { + formChannelId = payload.result.channels[0]!.value; + } + } catch (error) { + message = `Tasks load failed: ${error instanceof Error ? error.message : String(error)}`; + } finally { + isLoading = false; + } + } + + async function saveTask(): Promise<void> { + if (channels.length === 0) { + message = t("Add a workspace channel first before creating tasks.", "请先添加工作区和频道,再创建一次性任务。"); + return; + } + const scheduledAt = fromLocalDatetimeInput(formScheduledLocal); + if (scheduledAt === null) { + message = t("Please choose a valid scheduled time.", "请选择有效的执行时间。"); + return; + } + + isSaving = true; + message = ""; + try { + const isCreate = !editingTaskId; + const body: Record<string, unknown> = { + title: formTitle, + scheduledAt, + channelId: formChannelId, + threadId: formThreadId.trim() || null, + messageText: formMessageText, + agent: formAgent.trim() || null, + }; + if (isCreate && formRunImmediately) { + body.runImmediately = true; + } + const response = await fetch(editingTaskId ? `/api/tasks/${encodeURIComponent(editingTaskId)}` : "/api/tasks", { + method: editingTaskId ? "PUT" : "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(body), + }); + const payload = (await response.json()) as { + ok?: boolean; + error?: string; + result?: TaskPayload; + }; + if (!response.ok || !payload.ok || !payload.result) { + throw new Error(payload.error || "Failed to save task"); + } + applyPayload(payload.result); + message = isCreate + ? (formRunImmediately + ? t("Task created and triggered.", "任务已创建并立即执行。") + : t("Task created.", "任务已创建。")) + : t("Task updated.", "任务已更新。"); + resetForm(); + } catch (error) { + message = `Task save failed: ${error instanceof Error ? error.message : String(error)}`; + } finally { + isSaving = false; + } + } + + async function runTaskNow(task: TaskRecord): Promise<void> { + if (runningTaskIds.has(task.id)) return; + runningTaskIds = new Set(runningTaskIds).add(task.id); + message = ""; + try { + const response = await fetch(`/api/tasks/${encodeURIComponent(task.id)}/run`, { + method: "POST", + }); + const payload = (await response.json()) as { + ok?: boolean; + error?: string; + result?: TaskPayload; + }; + if (!response.ok || !payload.ok || !payload.result) { + if (response.status === 409) { + message = payload.error ?? t("This task is not runnable.", "此任务当前不可运行。"); + } else { + throw new Error(payload.error || "Failed to run task"); + } + } else { + applyPayload(payload.result); + message = t("Task triggered.", "任务已触发。"); + } + } catch (error) { + message = `Task run failed: ${error instanceof Error ? error.message : String(error)}`; + } finally { + const next = new Set(runningTaskIds); + next.delete(task.id); + runningTaskIds = next; + } + } + + async function cancelTaskById(task: TaskRecord): Promise<void> { + const confirmText = $locale === "zh-CN" + ? `确认取消任务「${task.title}」?` + : `Cancel task '${task.title}'?`; + if (!window.confirm(confirmText)) return; + + isSaving = true; + message = ""; + try { + const response = await fetch(`/api/tasks/${encodeURIComponent(task.id)}/cancel`, { + method: "POST", + }); + const payload = (await response.json()) as { + ok?: boolean; + error?: string; + result?: TaskPayload; + }; + if (!response.ok || !payload.ok || !payload.result) { + throw new Error(payload.error || "Failed to cancel task"); + } + applyPayload(payload.result); + message = t("Task cancelled.", "任务已取消。"); + } catch (error) { + message = `Task cancel failed: ${error instanceof Error ? error.message : String(error)}`; + } finally { + isSaving = false; + } + } + + async function removeTask(task: TaskRecord): Promise<void> { + const confirmText = $locale === "zh-CN" + ? `确认删除任务「${task.title}」?` + : `Delete task '${task.title}'?`; + if (!window.confirm(confirmText)) return; + + isSaving = true; + message = ""; + try { + const response = await fetch(`/api/tasks/${encodeURIComponent(task.id)}`, { + method: "DELETE", + }); + const payload = (await response.json()) as { + ok?: boolean; + error?: string; + result?: TaskPayload; + }; + if (!response.ok || !payload.ok || !payload.result) { + throw new Error(payload.error || "Failed to delete task"); + } + applyPayload(payload.result); + if (editingTaskId === task.id) { + resetForm(); + } + message = t("Task deleted.", "任务已删除。"); + } catch (error) { + message = `Task delete failed: ${error instanceof Error ? error.message : String(error)}`; + } finally { + isSaving = false; + } + } + + onMount(() => { + // Default scheduled time = 15 minutes from now, unless overridden. + if (!formScheduledLocal) { + formScheduledLocal = toLocalDatetimeInput(Date.now() + 15 * 60 * 1000); + } + void loadTasks(); + }); +</script> + +<Card className="p-5"> + <div class="mb-4 flex flex-wrap items-center justify-between gap-3"> + <div class="flex items-center gap-2"> + <CalendarClock class="h-4 w-4 text-[hsl(var(--muted-foreground))]" /> + <h2 class="text-lg font-semibold">{t("One-time Tasks", "一次性任务")}</h2> + </div> + + <div class="flex items-center gap-2"> + <Button + variant="outline" + on:click={resetForm} + disabled={isSaving} + > + <Plus class="h-4 w-4" /> + {t("New Task", "新建任务")} + </Button> + <Button + variant="outline" + on:click={() => void loadTasks()} + disabled={isLoading} + > + <RefreshCw class="h-4 w-4" /> + {isLoading ? t("Loading...", "加载中...") : t("Refresh", "刷新")} + </Button> + </div> + </div> + + <div class="space-y-6"> + <!-- Create / edit form --> + <div class="rounded-lg border p-4"> + <div class="mb-4"> + <p class="text-sm font-medium"> + {editingTaskId ? t("Edit Task", "编辑任务") : t("Create Task", "创建任务")} + </p> + <p class="mt-1 text-xs text-[hsl(var(--muted-foreground))]"> + {t( + "Tasks fire once at the scheduled time. When a thread is set, the task reuses that thread's session to keep context; otherwise it posts as a fresh channel message.", + "任务会在设定的时间一次性触发。如填写 Thread,则会复用该 Thread 的会话保留上下文;否则作为新的频道消息发送。", + )} + </p> + </div> + + <div class="space-y-4"> + <div class="space-y-2"> + <Label for="task-title">{t("Title", "标题")}</Label> + <Input + id="task-title" + value={formTitle} + placeholder={t("Check deploy status after 1 hour", "1 小时后检查部署状态")} + on:input={(event) => { + formTitle = (event.currentTarget as HTMLInputElement).value; + }} + /> + </div> + + <div class="space-y-2"> + <Label for="task-time">{t("Scheduled time", "执行时间")}</Label> + <Input + id="task-time" + type="datetime-local" + value={formScheduledLocal} + on:input={(event) => { + formScheduledLocal = (event.currentTarget as HTMLInputElement).value; + }} + /> + <p class="text-xs text-[hsl(var(--muted-foreground))]"> + {t("Uses your local timezone.", "使用你所在时区。")} + </p> + </div> + + <div class="space-y-2"> + <Label for="task-channel">{t("Channel", "频道")}</Label> + <Select id="task-channel" bind:value={formChannelId} disabled={channels.length === 0}> + {#if channels.length === 0} + <option value="">{t("No available channels", "暂无可用频道")}</option> + {:else} + {#each channels as channel} + <option value={channel.value}> + {channel.label} ({channel.platform}) + </option> + {/each} + {/if} + </Select> + </div> + + <div class="space-y-2"> + <Label for="task-thread">{t("Thread (optional)", "Thread(可选)")}</Label> + <Input + id="task-thread" + value={formThreadId} + placeholder={t("Leave empty to post as a new channel message", "留空则作为新频道消息发送")} + on:input={(event) => { + formThreadId = (event.currentTarget as HTMLInputElement).value; + }} + /> + <p class="text-xs text-[hsl(var(--muted-foreground))]"> + {t( + "When set, the task reuses the existing thread's session. Only Slack supports replying inside a thread; Discord and Lark will still post as a new channel message.", + "设置 Thread 会复用该 Thread 已有的会话。仅 Slack 支持 thread 内回复,Discord / Lark 仍会作为新消息发到频道。", + )} + </p> + </div> + + <div class="space-y-2"> + <Label for="task-agent">{t("Agent (optional)", "Agent(可选)")}</Label> + <Input + id="task-agent" + value={formAgent} + placeholder={t("e.g. opencode, claudecode, codex", "例如 opencode / claudecode / codex")} + on:input={(event) => { + formAgent = (event.currentTarget as HTMLInputElement).value; + }} + /> + <p class="text-xs text-[hsl(var(--muted-foreground))]"> + {t( + "Leave empty to use the channel's default agent.", + "留空则使用频道默认的 agent。", + )} + </p> + </div> + + <div class="space-y-2"> + <Label for="task-message">{t("Message", "消息")}</Label> + <Textarea + id="task-message" + className="min-h-[160px]" + value={formMessageText} + placeholder={t( + "Run `gh pr list --state open` and summarize blockers.", + "运行 `gh pr list --state open`,总结 blocker。", + )} + on:input={(event) => { + formMessageText = (event.currentTarget as HTMLTextAreaElement).value; + }} + /> + </div> + + {#if !editingTaskId} + <label class="flex items-center gap-2 text-sm"> + <input + type="checkbox" + checked={formRunImmediately} + onchange={(event) => { + formRunImmediately = (event.currentTarget as HTMLInputElement).checked; + }} + /> + <span>{t("Also run this task once right after creating", "创建后立即执行一次")}</span> + </label> + {/if} + + <div class="flex flex-wrap items-center gap-2"> + <Button + on:click={() => void saveTask()} + disabled={isSaving || channels.length === 0} + > + {isSaving + ? t("Saving...", "保存中...") + : editingTaskId + ? t("Update Task", "更新任务") + : t("Create Task", "创建任务")} + </Button> + {#if editingTaskId} + <Button + variant="outline" + on:click={resetForm} + disabled={isSaving} + > + {t("Cancel Edit", "取消编辑")} + </Button> + {/if} + </div> + </div> + </div> + + <!-- Task list --> + <div class="rounded-lg border p-4"> + <div class="mb-3 flex items-center justify-between gap-2"> + <p class="text-sm font-medium">{t("Scheduled Tasks", "任务列表")}</p> + <Badge variant="outline">{tasks.length} {t("tasks", "个任务")}</Badge> + </div> + + {#if isLoading && tasks.length === 0} + <p class="text-sm text-[hsl(var(--muted-foreground))]">{t("Loading tasks...", "正在加载任务...")}</p> + {:else if tasks.length === 0} + <p class="text-sm text-[hsl(var(--muted-foreground))]">{t("No tasks yet.", "还没有任务。")}</p> + {:else} + <div class="space-y-3"> + {#each tasks as task} + <div class="rounded-lg border p-4"> + <div class="mb-3 flex flex-wrap items-start justify-between gap-3"> + <div class="space-y-2"> + <div class="flex flex-wrap items-center gap-2"> + <Badge variant={getStatusVariant(task.status)}>{getStatusLabel(task.status)}</Badge> + <Badge variant="outline">{task.platform}</Badge> + {#if task.agent} + <Badge variant="outline">{task.agent}</Badge> + {/if} + </div> + <p class="text-sm font-medium">{task.title}</p> + <p class="text-xs text-[hsl(var(--muted-foreground))]"> + {t("Scheduled", "计划时间")}: {formatTimestamp(task.scheduledAt)} + </p> + <p class="text-xs text-[hsl(var(--muted-foreground))]">{getChannelLabel(task)}</p> + {#if task.threadId} + <p class="text-xs text-[hsl(var(--muted-foreground))]"> + {t("Thread", "Thread")}: <code>{task.threadId}</code> + </p> + {/if} + </div> + + <div class="flex flex-wrap items-center gap-2"> + {#if task.status === "pending"} + <Button + variant="outline" + size="sm" + on:click={() => void runTaskNow(task)} + disabled={isSaving || runningTaskIds.has(task.id)} + > + <Play class="h-3.5 w-3.5" /> + {runningTaskIds.has(task.id) + ? t("Running...", "运行中...") + : t("Run Now", "立即执行")} + </Button> + <Button + variant="outline" + size="sm" + on:click={() => startEdit(task)} + > + <Pencil class="h-3.5 w-3.5" /> + {t("Edit", "编辑")} + </Button> + <Button + variant="outline" + size="sm" + on:click={() => void cancelTaskById(task)} + disabled={isSaving} + > + <Ban class="h-3.5 w-3.5" /> + {t("Cancel", "取消")} + </Button> + {/if} + <Button + variant="outline" + size="sm" + on:click={() => void removeTask(task)} + disabled={isSaving} + > + <Trash2 class="h-3.5 w-3.5" /> + {t("Delete", "删除")} + </Button> + </div> + </div> + + <div class="rounded-md bg-[hsl(var(--muted)/0.4)] p-3"> + <p class="mb-1 text-xs font-medium uppercase tracking-wide text-[hsl(var(--muted-foreground))]">{t("Message", "消息内容")}</p> + <p class="text-sm leading-6 whitespace-pre-wrap">{task.messageText}</p> + </div> + + <div class="mt-3 grid gap-2 text-xs text-[hsl(var(--muted-foreground))] md:grid-cols-2"> + <p>{t("Triggered", "触发时间")}: {formatTimestamp(task.triggeredAt)}</p> + <p>{t("Completed", "完成时间")}: {formatTimestamp(task.completedAt)}</p> + <p>{t("Created", "创建于")}: {formatTimestamp(task.createdAt)}</p> + <p>{t("Updated", "更新于")}: {formatTimestamp(task.updatedAt)}</p> + </div> + + {#if task.lastError} + <div class="mt-3 rounded-md border border-[hsl(var(--destructive)/0.35)] bg-[hsl(var(--destructive)/0.08)] p-3 text-sm text-[hsl(var(--destructive))]"> + {task.lastError} + </div> + {/if} + </div> + {/each} + </div> + {/if} + </div> + </div> + + {#if message} + <p class="mt-4 text-sm text-[hsl(var(--muted-foreground))]">{message}</p> + {/if} +</Card> From b407ef19ef34d4baa10d1ccfeac17a58b4e9bee1 Mon Sep 17 00:00:00 2001 From: Kai Liu <kailiu@mmini.tail90d2d5.ts.net> Date: Sat, 18 Apr 2026 13:55:06 +0800 Subject: [PATCH 2/4] Validate task agent against known provider ids - tasks.ts: normalizeAgent now lower-cases and rejects values that are not in AGENT_PROVIDERS, mirroring how channel-level agent providers are resolved. The scheduler is still channel-default; this makes the column a usable override surface rather than free text, and fails fast on typos ("claude-code-beta", "gpt4", etc.) from either the CLI or HTTP API. - Web UI Tasks form: swap the free-text agent input for a Select populated from the enabled agent providers (same source the workspace channel picker uses). Includes a sticky extra option for an already-persisted-but-now-disabled agent so edits don't silently lose data. Task list badges now show the human label. - CLI help: clarify --agent accepts a provider id (opencode, codex, claudecode, ...), not an arbitrary agent name. - Tests: +4 cases covering unsupported agent rejection on create and update, case-insensitive normalization, and blank strings collapsing to channel default. 15 pass total in tasks.test.ts (was 11); full suite 247 pass / 0 fail / 1 skip. --- packages/config/local/tasks.test.ts | 46 ++++++++++++++++++ packages/config/local/tasks.ts | 11 ++++- packages/core/cli-handlers/task.ts | 4 +- .../src/routes/(settings)/tasks/+page.svelte | 47 ++++++++++++++----- 4 files changed, 94 insertions(+), 14 deletions(-) diff --git a/packages/config/local/tasks.test.ts b/packages/config/local/tasks.test.ts index 61de2c8a..e3ac3a40 100644 --- a/packages/config/local/tasks.test.ts +++ b/packages/config/local/tasks.test.ts @@ -242,4 +242,50 @@ describe("tasks storage", () => { const pendingOrder = list.filter((t) => t.status === "pending").map((t) => t.id); expect(pendingOrder).toEqual([sooner.id, later.id]); }); + + test("createTask rejects unsupported agent ids", () => { + expect(() => + createTask({ + title: "bad-agent", + scheduledAt: Date.now() + 60_000, + channelId: "C_TEST", + messageText: "hi", + agent: "not-a-real-agent", + }), + ).toThrow(/Unsupported agent/); + }); + + test("createTask normalizes agent casing and accepts known ids", () => { + const task = createTask({ + title: "case", + scheduledAt: Date.now() + 60_000, + channelId: "C_TEST", + messageText: "hi", + agent: "Codex", + }); + expect(task.agent).toBe("codex"); + }); + + test("createTask treats empty agent string as null (channel default)", () => { + const task = createTask({ + title: "default", + scheduledAt: Date.now() + 60_000, + channelId: "C_TEST", + messageText: "hi", + agent: " ", + }); + expect(task.agent).toBeNull(); + }); + + test("updateTask rejects unsupported agent ids", () => { + const task = createTask({ + title: "u-agent", + scheduledAt: Date.now() + 60_000, + channelId: "C_TEST", + messageText: "hi", + agent: "opencode", + }); + expect(() => updateTask(task.id, { agent: "claude-code-beta" })).toThrow(/Unsupported agent/); + expect(getTaskById(task.id)?.agent).toBe("opencode"); + }); }); diff --git a/packages/config/local/tasks.ts b/packages/config/local/tasks.ts index 89ed6aa6..827baba6 100644 --- a/packages/config/local/tasks.ts +++ b/packages/config/local/tasks.ts @@ -2,6 +2,7 @@ import { Database } from "bun:sqlite"; import * as fs from "fs"; import * as os from "os"; import * as path from "path"; +import { AGENT_PROVIDERS, isAgentProviderId } from "@/shared/agent-provider"; import { loadOdeConfig } from "./ode-store"; // --------------------------------------------------------------------------- @@ -263,8 +264,14 @@ function normalizeThreadId(value: string | null | undefined): string | null { function normalizeAgent(value: string | null | undefined): string | null { if (value === null || value === undefined) return null; - const trimmed = value.trim(); - return trimmed.length > 0 ? trimmed : null; + const trimmed = value.trim().toLowerCase(); + if (trimmed.length === 0) return null; + if (!isAgentProviderId(trimmed)) { + throw new Error( + `Unsupported agent "${value}". Expected one of: ${AGENT_PROVIDERS.join(", ")}`, + ); + } + return trimmed; } export function listTaskChannelOptions(): TaskChannelOption[] { diff --git a/packages/core/cli-handlers/task.ts b/packages/core/cli-handlers/task.ts index 140fca72..05db99b3 100644 --- a/packages/core/cli-handlers/task.ts +++ b/packages/core/cli-handlers/task.ts @@ -132,7 +132,7 @@ function printTaskHelp(): void { "ode task - one-time scheduled tasks", "", "Usage:", - " ode task create --time <ISO8601> --channel <channelId> --message <text> [--thread <threadId>] [--title <title>] [--agent <agentId>] [--run-now]", + " ode task create --time <ISO8601> --channel <channelId> --message <text> [--thread <threadId>] [--title <title>] [--agent <provider>] [--run-now]", " ode task list [--status <status>] [--json]", " ode task show <id> [--json]", " ode task cancel <id>", @@ -143,6 +143,8 @@ function printTaskHelp(): void { " --time accepts ISO 8601, e.g. 2026-04-18T23:30:00+08:00", " --thread is optional. When set, the task reuses the thread's session; when omitted, it posts as a fresh channel message.", " --channel accepts either a raw channel id or a \"workspaceId::channelId\" value.", + " --agent is optional; accepts a CLI provider id: opencode | claudecode | codex | kimi | kiro | kilo | qwen | goose | gemini.", + " When omitted, the task uses the channel's default agent.", ].join("\n"), ); } diff --git a/packages/web-ui/src/routes/(settings)/tasks/+page.svelte b/packages/web-ui/src/routes/(settings)/tasks/+page.svelte index a7e8a5ee..dfcdea46 100644 --- a/packages/web-ui/src/routes/(settings)/tasks/+page.svelte +++ b/packages/web-ui/src/routes/(settings)/tasks/+page.svelte @@ -3,6 +3,13 @@ import { Ban, CalendarClock, Pencil, Play, Plus, RefreshCw, Trash2 } from "lucide-svelte"; import { Badge, Button, Card, Input, Label, Select, Textarea } from "$lib/components/ui"; import { locale } from "$lib/i18n"; + import { + AGENT_PROVIDERS, + AGENT_PROVIDER_LABELS, + isAgentProviderId, + type AgentProviderId, + } from "@/shared/agent-provider"; + import { localSettingStore } from "$lib/local-setting/store"; type TaskPlatform = "slack" | "discord" | "lark"; type TaskStatus = "pending" | "running" | "success" | "failed" | "cancelled"; @@ -59,6 +66,15 @@ let formRunImmediately = $state(false); let runningTaskIds = $state<Set<string>>(new Set()); + function isProviderEnabled(provider: AgentProviderId): boolean { + const agents = $localSettingStore.config.agents as Record<string, { enabled?: boolean }>; + return agents[provider]?.enabled === true; + } + + const enabledAgentProviders = $derived( + AGENT_PROVIDERS.filter((provider) => isProviderEnabled(provider)), + ); + function t(en: string, zh: string): string { return $locale === "zh-CN" ? zh : en; } @@ -434,18 +450,23 @@ <div class="space-y-2"> <Label for="task-agent">{t("Agent (optional)", "Agent(可选)")}</Label> - <Input - id="task-agent" - value={formAgent} - placeholder={t("e.g. opencode, claudecode, codex", "例如 opencode / claudecode / codex")} - on:input={(event) => { - formAgent = (event.currentTarget as HTMLInputElement).value; - }} - /> + <Select id="task-agent" bind:value={formAgent}> + <option value="">{t("Channel default", "使用频道默认")}</option> + {#each enabledAgentProviders as provider} + <option value={provider}> + {AGENT_PROVIDER_LABELS[provider]} ({provider}) + </option> + {/each} + {#if formAgent && isAgentProviderId(formAgent) && !enabledAgentProviders.includes(formAgent as AgentProviderId)} + <option value={formAgent}> + {AGENT_PROVIDER_LABELS[formAgent as AgentProviderId]} ({formAgent}) — {t("not enabled", "未启用")} + </option> + {/if} + </Select> <p class="text-xs text-[hsl(var(--muted-foreground))]"> {t( - "Leave empty to use the channel's default agent.", - "留空则使用频道默认的 agent。", + "Leave empty to use the channel's default agent. Only enabled CLIs are listed.", + "留空则使用频道默认的 agent。仅列出已启用的 CLI。", )} </p> </div> @@ -524,7 +545,11 @@ <Badge variant={getStatusVariant(task.status)}>{getStatusLabel(task.status)}</Badge> <Badge variant="outline">{task.platform}</Badge> {#if task.agent} - <Badge variant="outline">{task.agent}</Badge> + <Badge variant="outline"> + {isAgentProviderId(task.agent) + ? AGENT_PROVIDER_LABELS[task.agent as AgentProviderId] + : task.agent} + </Badge> {/if} </div> <p class="text-sm font-medium">{task.title}</p> From 162a7e79cafa0e65cac45b0e9816d95d1c846bab Mon Sep 17 00:00:00 2001 From: Kai Liu <kailiu@mmini.tail90d2d5.ts.net> Date: Sat, 18 Apr 2026 14:03:58 +0800 Subject: [PATCH 3/4] Honor task.agent override when running one-time tasks The task scheduler previously ignored `task.agent` and always ran on the channel's configured provider, so the `agent` column was recorded but had no effect. This wires the field through the adapter so per-task overrides actually take effect. - createAgentAdapter accepts an optional `providerOverride`. When set, `getOrCreateSession` picks that provider instead of the channel's default. All other adapter calls remain keyed by sessionId (session -> provider map), so downstream behaviour is unchanged. - scheduler.resolveTaskAgentProvider implements the documented fallback chain: task.agent -> channel agent -> getChannelAgentProvider default (opencode). Unknown/legacy values on `task.agent` fall through to the channel default instead of crashing the tick. Creation/update already rejects bad values at the source, so this is purely defense-in-depth. - runTask + prepareTaskSession both construct the adapter with the resolved override so session creation and message dispatch agree. Tests: - New scheduler.test.ts: 4 cases covering the full fallback chain (override wins, channel fallback, legacy-agent fallback, global opencode fallback). - Full suite: 251 pass / 0 fail / 1 skip (was 247). --- packages/agents/adapter.ts | 20 ++++- packages/core/tasks/scheduler.test.ts | 110 ++++++++++++++++++++++++++ packages/core/tasks/scheduler.ts | 27 ++++++- 3 files changed, 153 insertions(+), 4 deletions(-) create mode 100644 packages/core/tasks/scheduler.test.ts diff --git a/packages/agents/adapter.ts b/packages/agents/adapter.ts index b38d51a8..a1d5666e 100644 --- a/packages/agents/adapter.ts +++ b/packages/agents/adapter.ts @@ -22,7 +22,23 @@ function rememberSessionProvider(sessionId: string, providerId: AgentProviderId) sessionProviders.set(sessionId, providerId); } -export function createAgentAdapter(): AgentAdapter { +export type AgentAdapterOptions = { + /** + * Optional per-adapter provider override. When set, `getOrCreateSession` + * uses this provider instead of the channel's configured agent. Downstream + * calls (sendMessage, abort, subscribe, ...) remain keyed by the session id + * as usual, since `getOrCreateSession` writes the chosen provider into the + * `sessionProviders` map. Intended for schedulers that carry a per-job + * agent override (e.g. one-time tasks). + */ + providerOverride?: AgentProviderId | null; +}; + +export function createAgentAdapter(options: AgentAdapterOptions = {}): AgentAdapter { + const { providerOverride } = options; + const resolveProviderForChannel = (channelId: string): AgentProviderId => + providerOverride ?? getProviderForChannel(channelId); + return { supportsEventStream: true, getProviderForSession(sessionId) { @@ -33,7 +49,7 @@ export function createAgentAdapter(): AgentAdapter { return getAgentProviderLabel(providerId); }, async getOrCreateSession(channelId, threadId, cwd, env) { - const providerId = getProviderForChannel(channelId); + const providerId = resolveProviderForChannel(channelId); const provider = getAgentProvider(providerId); const result = await provider.getOrCreateSession(channelId, threadId, cwd, env); rememberSessionProvider(result.sessionId, providerId); diff --git a/packages/core/tasks/scheduler.test.ts b/packages/core/tasks/scheduler.test.ts new file mode 100644 index 00000000..9b6da62b --- /dev/null +++ b/packages/core/tasks/scheduler.test.ts @@ -0,0 +1,110 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import * as fs from "fs"; +import * as path from "path"; +import { invalidateOdeConfigCache, ODE_CONFIG_FILE } from "@/config/local/ode-store"; +import type { TaskRecord } from "@/config/local/tasks"; +import { resolveTaskAgentProvider } from "./scheduler"; + +// Scheduler-level tests for pure helpers. We swap the user's real +// ~/.config/ode/ode.json for a deterministic fixture and restore it on +// teardown so these tests never touch production config. + +let originalConfigExisted: boolean; +let originalConfigContent: string | null; + +function writeTestConfig(agentProviderForC1: string | undefined): void { + const config = { + user: {}, + workspaces: [ + { + id: "ws-test", + name: "Test Workspace", + type: "slack", + channelDetails: [ + { + id: "C_CODEX_CHANNEL", + name: "codex-room", + ...(agentProviderForC1 !== undefined ? { agentProvider: agentProviderForC1 } : {}), + }, + // Channel with no agentProvider set — getChannelAgentProvider falls + // back to "opencode" as the global default. + { id: "C_DEFAULT_CHANNEL", name: "default-room" }, + ], + }, + ], + }; + fs.mkdirSync(path.dirname(ODE_CONFIG_FILE), { recursive: true }); + fs.writeFileSync(ODE_CONFIG_FILE, JSON.stringify(config)); + invalidateOdeConfigCache(); +} + +function makeTask(overrides: Partial<TaskRecord> = {}): TaskRecord { + return { + id: "t1", + title: "test", + scheduledAt: Date.now(), + platform: "slack", + workspaceId: "ws-test", + workspaceName: "Test Workspace", + channelId: "C_CODEX_CHANNEL", + channelName: "codex-room", + threadId: null, + messageText: "hi", + agent: null, + status: "pending", + lastError: null, + triggeredAt: null, + completedAt: null, + createdAt: Date.now(), + updatedAt: Date.now(), + ...overrides, + }; +} + +beforeEach(() => { + originalConfigExisted = fs.existsSync(ODE_CONFIG_FILE); + originalConfigContent = originalConfigExisted ? fs.readFileSync(ODE_CONFIG_FILE, "utf-8") : null; +}); + +afterEach(() => { + try { + if (originalConfigExisted && originalConfigContent !== null) { + fs.writeFileSync(ODE_CONFIG_FILE, originalConfigContent); + } else { + fs.rmSync(ODE_CONFIG_FILE, { force: true }); + } + } catch { + // Best effort. + } + invalidateOdeConfigCache(); +}); + +describe("resolveTaskAgentProvider", () => { + test("prefers task.agent when it is a known provider id", () => { + writeTestConfig("codex"); + const task = makeTask({ agent: "claudecode" }); + expect(resolveTaskAgentProvider(task)).toBe("claudecode"); + }); + + test("falls back to channel agent when task.agent is null", () => { + writeTestConfig("codex"); + const task = makeTask({ channelId: "C_CODEX_CHANNEL", agent: null }); + expect(resolveTaskAgentProvider(task)).toBe("codex"); + }); + + test("falls back to channel agent when task.agent is an unknown string", () => { + // Defense-in-depth: createTask rejects bad values at write time, but a + // legacy or manually-edited row should not break the scheduler tick. + writeTestConfig("codex"); + const task = makeTask({ channelId: "C_CODEX_CHANNEL", agent: "no-such-agent" }); + expect(resolveTaskAgentProvider(task)).toBe("codex"); + }); + + test("falls back to 'opencode' when neither task nor channel specify one", () => { + // C_DEFAULT_CHANNEL has no agentProvider field, so + // getChannelAgentProvider returns the global default. + writeTestConfig(undefined); + const task = makeTask({ channelId: "C_DEFAULT_CHANNEL", agent: null }); + expect(resolveTaskAgentProvider(task)).toBe("opencode"); + }); +}); diff --git a/packages/core/tasks/scheduler.ts b/packages/core/tasks/scheduler.ts index ad096618..237f3911 100644 --- a/packages/core/tasks/scheduler.ts +++ b/packages/core/tasks/scheduler.ts @@ -1,6 +1,7 @@ import { createAgentAdapter } from "@/agents/adapter"; import type { OpenCodeMessageContext } from "@/agents"; import { + getChannelAgentProvider, getChannelBaseBranch, getChannelModel, getChannelSystemMessage, @@ -37,6 +38,7 @@ import { sendChannelMessage as sendSlackChannelMessage, sendMessage as sendSlackThreadMessage, } from "@/ims/slack/client"; +import { type AgentProviderId, isAgentProviderId } from "@/shared/agent-provider"; import { log } from "@/utils"; // --------------------------------------------------------------------------- @@ -76,6 +78,27 @@ function resolveTaskThreadId(task: TaskRecord): string { return task.threadId ?? getSyntheticThreadId(task.id); } +/** + * Resolve the agent provider a task should run on, following the fallback + * chain: + * 1. `task.agent` (per-task override set by CLI / Web UI), + * 2. the channel's configured agent (`channelDetails.agentProvider`), + * 3. the global default baked into `getChannelAgentProvider` (`opencode`). + * + * Unknown string values on `task.agent` (e.g. a provider that used to be + * supported but was removed) fall through to the channel default rather than + * blowing up the scheduler tick; creation/update already rejects bad values + * at the source. + * + * Exported for unit tests that don't want to touch the real SQLite DB. + */ +export function resolveTaskAgentProvider(task: TaskRecord): AgentProviderId { + if (task.agent && isAgentProviderId(task.agent)) { + return task.agent; + } + return getChannelAgentProvider(task.channelId); +} + function resolveInboxModelForTask( task: TaskRecord, options: ReturnType<typeof buildMessageOptions>, @@ -132,7 +155,7 @@ async function prepareTaskSession(task: TaskRecord): Promise<{ }> { const threadId = resolveTaskThreadId(task); const userId = getTaskUserId(task.id); - const agent = createAgentAdapter(); + const agent = createAgentAdapter({ providerOverride: resolveTaskAgentProvider(task) }); let cwd = resolveChannelCwd(task.channelId).cwd; let session = loadSession(task.channelId, threadId); @@ -194,7 +217,7 @@ async function prepareTaskSession(task: TaskRecord): Promise<{ } async function runTask(task: TaskRecord): Promise<void> { - const agent = createAgentAdapter(); + const agent = createAgentAdapter({ providerOverride: resolveTaskAgentProvider(task) }); const taskMessageId = getTaskMessageId(task); let agentResultDetailId: string | null = null; let threadKey: string | null = null; From 9c61d79ce822d9eb43c8d8d2cbaed841f8c68497 Mon Sep 17 00:00:00 2001 From: Kai Liu <kailiu@mmini.tail90d2d5.ts.net> Date: Sat, 18 Apr 2026 14:09:52 +0800 Subject: [PATCH 4/4] Prefer anchored thread's agent over task.agent override MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a one-time task is anchored to an existing thread, resume on that thread's provider instead of honouring task.agent. Sessions are provider-scoped in storage — getThreadSessionId keys on (channelId, threadId, providerId) — so switching providers mid- thread would silently spin up a fresh session under the override and drop the conversation history the task was trying to continue. resolveTaskAgentProvider now follows this order: 1. anchored thread's persisted session provider (when present), 2. task.agent (per-task override), 3. channel agent (channelDetails.agentProvider), 4. global default from getChannelAgentProvider ("opencode"). If the task is anchored but no session was ever persisted (brand new thread), step 1 is skipped and the resolver falls through to the normal override/channel/default chain. Tests: +2 cases covering anchored-thread wins and anchored-but-no- session-yet falls through. 6 pass in scheduler.test.ts (was 4); full suite 253 pass / 0 fail / 1 skip (was 251). --- packages/core/tasks/scheduler.test.ts | 90 +++++++++++++++++++++++++-- packages/core/tasks/scheduler.ts | 25 +++++++- 2 files changed, 107 insertions(+), 8 deletions(-) diff --git a/packages/core/tasks/scheduler.test.ts b/packages/core/tasks/scheduler.test.ts index 9b6da62b..4b115aae 100644 --- a/packages/core/tasks/scheduler.test.ts +++ b/packages/core/tasks/scheduler.test.ts @@ -3,14 +3,21 @@ import * as fs from "fs"; import * as path from "path"; import { invalidateOdeConfigCache, ODE_CONFIG_FILE } from "@/config/local/ode-store"; import type { TaskRecord } from "@/config/local/tasks"; +import { deleteSession, saveSession } from "@/config/local/sessions"; import { resolveTaskAgentProvider } from "./scheduler"; // Scheduler-level tests for pure helpers. We swap the user's real // ~/.config/ode/ode.json for a deterministic fixture and restore it on // teardown so these tests never touch production config. +// +// Persisted sessions under ~/.config/ode/sessions are written via +// saveSession and cleaned up in afterEach by deleting every (channelId, +// threadId) pair we created. This mirrors what other scheduler/runtime +// tests in the repo do. let originalConfigExisted: boolean; let originalConfigContent: string | null; +const createdSessionKeys: Array<{ channelId: string; threadId: string }> = []; function writeTestConfig(agentProviderForC1: string | undefined): void { const config = { @@ -61,9 +68,35 @@ function makeTask(overrides: Partial<TaskRecord> = {}): TaskRecord { }; } +function seedSession( + channelId: string, + threadId: string, + providerId: "opencode" | "claudecode" | "codex" | "kimi" | "kiro" | "kilo" | "qwen" | "goose" | "gemini", +): void { + const now = Date.now(); + saveSession( + { + sessionId: `sess-${providerId}-${now}`, + providerId, + platform: "slack", + channelId, + threadId, + workingDirectory: "/tmp", + threadOwnerUserId: "U_TEST", + participantBotIds: [], + createdAt: now, + lastActivityAt: now, + lastActivityBotId: "test", + }, + { immediate: true }, + ); + createdSessionKeys.push({ channelId, threadId }); +} + beforeEach(() => { originalConfigExisted = fs.existsSync(ODE_CONFIG_FILE); originalConfigContent = originalConfigExisted ? fs.readFileSync(ODE_CONFIG_FILE, "utf-8") : null; + createdSessionKeys.length = 0; }); afterEach(() => { @@ -77,18 +110,28 @@ afterEach(() => { // Best effort. } invalidateOdeConfigCache(); + + // Clean up any session files we seeded. + for (const key of createdSessionKeys) { + try { + deleteSession(key.channelId, key.threadId); + } catch { + // Best effort. + } + } + createdSessionKeys.length = 0; }); describe("resolveTaskAgentProvider", () => { - test("prefers task.agent when it is a known provider id", () => { + test("prefers task.agent when it is a known provider id (no anchored thread)", () => { writeTestConfig("codex"); - const task = makeTask({ agent: "claudecode" }); + const task = makeTask({ threadId: null, agent: "claudecode" }); expect(resolveTaskAgentProvider(task)).toBe("claudecode"); }); test("falls back to channel agent when task.agent is null", () => { writeTestConfig("codex"); - const task = makeTask({ channelId: "C_CODEX_CHANNEL", agent: null }); + const task = makeTask({ channelId: "C_CODEX_CHANNEL", threadId: null, agent: null }); expect(resolveTaskAgentProvider(task)).toBe("codex"); }); @@ -96,7 +139,11 @@ describe("resolveTaskAgentProvider", () => { // Defense-in-depth: createTask rejects bad values at write time, but a // legacy or manually-edited row should not break the scheduler tick. writeTestConfig("codex"); - const task = makeTask({ channelId: "C_CODEX_CHANNEL", agent: "no-such-agent" }); + const task = makeTask({ + channelId: "C_CODEX_CHANNEL", + threadId: null, + agent: "no-such-agent", + }); expect(resolveTaskAgentProvider(task)).toBe("codex"); }); @@ -104,7 +151,40 @@ describe("resolveTaskAgentProvider", () => { // C_DEFAULT_CHANNEL has no agentProvider field, so // getChannelAgentProvider returns the global default. writeTestConfig(undefined); - const task = makeTask({ channelId: "C_DEFAULT_CHANNEL", agent: null }); + const task = makeTask({ + channelId: "C_DEFAULT_CHANNEL", + threadId: null, + agent: null, + }); expect(resolveTaskAgentProvider(task)).toBe("opencode"); }); + + test("anchored thread wins over task.agent override", () => { + // Thread has an existing claudecode session. Even though the task asks + // for codex and the channel defaults to kimi, we honour the thread to + // preserve its conversation context. + writeTestConfig("kimi"); + const channelId = "C_CODEX_CHANNEL"; + const threadId = "T_ANCHOR_THREAD"; + seedSession(channelId, threadId, "claudecode"); + const task = makeTask({ + channelId, + threadId, + agent: "codex", + }); + expect(resolveTaskAgentProvider(task)).toBe("claudecode"); + }); + + test("anchored thread without a persisted session falls back to override then channel", () => { + // threadId is set but we never seeded a session — resolver must not + // pretend the thread has one, and should continue down the normal + // fallback chain. + writeTestConfig("kimi"); + const task = makeTask({ + channelId: "C_CODEX_CHANNEL", + threadId: "T_NEW_THREAD_NO_SESSION", + agent: "goose", + }); + expect(resolveTaskAgentProvider(task)).toBe("goose"); + }); }); diff --git a/packages/core/tasks/scheduler.ts b/packages/core/tasks/scheduler.ts index 237f3911..6d189b04 100644 --- a/packages/core/tasks/scheduler.ts +++ b/packages/core/tasks/scheduler.ts @@ -81,9 +81,17 @@ function resolveTaskThreadId(task: TaskRecord): string { /** * Resolve the agent provider a task should run on, following the fallback * chain: - * 1. `task.agent` (per-task override set by CLI / Web UI), - * 2. the channel's configured agent (`channelDetails.agentProvider`), - * 3. the global default baked into `getChannelAgentProvider` (`opencode`). + * 1. If the task is anchored to an existing thread that already has a + * persisted session, use that thread's provider. Tasks anchored to a + * thread reuse its session for context continuity, and sessions are + * provider-scoped in storage (`getThreadSessionId(..., providerId)`), + * so honouring a different per-task override here would silently spin + * up a fresh session under a new provider and drop the thread's + * history. In that case we intentionally ignore `task.agent`. + * 2. Otherwise `task.agent` (per-task override set by CLI / Web UI), + * 3. Otherwise the channel's configured agent (`channelDetails.agentProvider`), + * 4. Otherwise the global default baked into `getChannelAgentProvider` + * (currently `opencode`). * * Unknown string values on `task.agent` (e.g. a provider that used to be * supported but was removed) fall through to the channel default rather than @@ -93,9 +101,20 @@ function resolveTaskThreadId(task: TaskRecord): string { * Exported for unit tests that don't want to touch the real SQLite DB. */ export function resolveTaskAgentProvider(task: TaskRecord): AgentProviderId { + // 1. Anchored thread wins over everything — keep the existing session's + // provider so we don't silently fork the conversation. + if (task.threadId && task.threadId.trim().length > 0) { + const existing = loadSession(task.channelId, task.threadId); + if (existing?.providerId && isAgentProviderId(existing.providerId)) { + return existing.providerId; + } + } + // 2. Per-task override. if (task.agent && isAgentProviderId(task.agent)) { return task.agent; } + // 3 + 4. Channel default -> global default (handled inside + // getChannelAgentProvider). return getChannelAgentProvider(task.channelId); }