Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/bot-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { BackendType } from './adapters/backend/types.js';
import type { CliId } from './adapters/cli/types.js';
import { logger } from './utils/logger.js';
import { isLocale, setBotLookup, type Locale } from './i18n/index.js';
import { readWorker, type WorkerConfig } from './global-config.js';
import type { VoiceConfig } from './services/voice/types.js';
import { type Brand, sdkDomain, normalizeBrand } from './im/lark/lark-hosts.js';

Expand Down Expand Up @@ -252,6 +253,16 @@ export interface BotConfig {
* cards render the "🔊 语音总结" button. See services/voice/types.ts.
*/
voice?: VoiceConfig;
/**
* Per-bot live-worker budget override. Fields set here win over the global
* `worker` block in ~/.botmux/config.json for THIS bot's idle sweeper, and
* are NOT subject to the per-daemon auto split — a bot that configures
* `maxLiveWorkers` gets exactly that many, regardless of how many bots share
* the box. Unset fields fall through to the global config, then to the
* auto-derived (machine-budget ÷ bot-count) baseline. Use it to give a
* heavily-used bot a bigger slice without raising every other bot's cap.
*/
worker?: WorkerConfig;
}

export interface BotState {
Expand Down Expand Up @@ -454,6 +465,46 @@ export function resolveBrandLabel(larkAppId: string): string | undefined {
}
}

// Last configured-bot count read from disk, remembered together with the
// config file's mtime so an unchanged file is not re-read and re-parsed.
let botCountCache: { mtimeMs: number; count: number } | null = null;

/**
 * Number of bots configured in the shared bots.json, used as the number of
 * daemons sharing this machine when splitting the auto-derived worker budget
 * (multi-daemon deployments run one bot per process, so the in-memory
 * registry only sees this daemon's own bot).
 *
 * The config path is `loadedConfigPath`, or `botsConfigDiskPath()` when that
 * is unset. The parsed count is cached against the file's mtime.
 *
 * @returns The configured bot count, never less than 1. A readable file whose
 *   top-level value is not an array counts as 1. When there is no path, or
 *   the file cannot be stat'ed, read, or parsed, the in-memory registry size
 *   (also floored at 1) is returned instead — which covers single-process and
 *   test setups.
 */
export function countConfiguredBots(): number {
  const registryFallback = (): number => Math.max(1, bots.size);

  const configPath = loadedConfigPath ?? botsConfigDiskPath();
  if (!configPath) return registryFallback();

  try {
    const { mtimeMs } = statSync(configPath);
    // Cache hit: the file has not been modified since the last successful read.
    if (botCountCache?.mtimeMs === mtimeMs) return botCountCache.count;

    const parsed: unknown = JSON.parse(readFileSync(configPath, 'utf-8'));
    const count = Array.isArray(parsed) ? Math.max(1, parsed.length) : 1;
    botCountCache = { mtimeMs, count };
    return count;
  } catch {
    // Best effort: any stat/read/parse failure falls back to the registry size.
    return registryFallback();
  }
}

/**
 * Per-bot worker override for the bot hosted by this daemon, if one is set.
 *
 * Walks the in-memory registry and returns the `worker` block of the first
 * registered bot that has one. The registry is expected to hold a single
 * entry, because a daemon process registers exactly one bot (see daemon.ts —
 * registerBot is called once per BOTMUX_BOT_INDEX).
 *
 * @returns The bot's `worker` config, or `undefined` when no registered bot
 *   configures one (the caller then falls back to global + auto budget).
 */
export function ownBotWorkerConfig(): WorkerConfig | undefined {
  for (const { config } of bots.values()) {
    const override = config.worker;
    if (override) return override;
  }
  return undefined;
}

/**
* Load bot configurations from one of (in priority order):
* 1. BOTS_CONFIG env var — path to a JSON file
Expand Down Expand Up @@ -653,6 +704,10 @@ export function parseBotConfigsFromText(jsonText: string): BotConfig[] {
}
}

// worker: per-bot live-worker budget override. Reuses global-config.readWorker
// so validation matches the global block (maxLiveWorkers/idleSuspendMs accept
// only positive integers; invalid or missing values → undefined).
const worker = readWorker(entry.worker);

configs.push({
larkAppId: entry.larkAppId,
larkAppSecret: entry.larkAppSecret,
Expand Down Expand Up @@ -720,6 +775,7 @@ export function parseBotConfigsFromText(jsonText: string): BotConfig[] {
? entry.regularGroupMentionMode
: undefined,
voice,
worker,
});
}

Expand Down
9 changes: 5 additions & 4 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3071,7 +3071,7 @@ function argValues(args: string[], ...flags: string[]): string[] {
import { buildMentionedPendingResponseCard } from './im/lark/card-builder.js';
import { buildCardBodyElements, brandFooterSegment } from './im/lark/md-card.js';
import { COMPLETED_REACTION_EMOJI_TYPE, claimPendingResponseCard, isPendingResponseCardOpen, markPendingResponseCardPatchedIfCurrent, mergePendingResponseState, shouldMarkPendingAsMentionedSend, shouldPatchPendingOnExplicitSend } from './core/pending-response.js';
import { resolveBrandLabel } from './bot-registry.js';
import { countConfiguredBots, resolveBrandLabel } from './bot-registry.js';
import { config } from './config.js';
import { resolveQuoteTarget, validateMentionDecision, parseAttentionFlag, attentionUsageError } from './services/send-policy.js';

Expand Down Expand Up @@ -4911,11 +4911,12 @@ function cmdWorkerBudget(args: string[]): void {
if (sub === 'status') {
const cfg = readGlobalConfig();
const resources = detectWorkerResources();
const budget = resolveWorkerBudget(cfg.worker, resources);
const daemonCount = countConfiguredBots();
const budget = resolveWorkerBudget(cfg.worker, resources, daemonCount);
console.log('Worker budget');
console.log(` maxLiveWorkers: ${budget.maxLiveWorkers} (${budget.maxLiveWorkersSource})`);
console.log(` idleSuspendMs: ${budget.idleSuspendMs} (${budget.idleSuspendMsSource})`);
console.log(` auto baseline: ${budget.autoMaxLiveWorkers} from cpu=${resources.cpuCount}, memory=${formatGib(resources.memoryBytes)}`);
console.log(` auto baseline: ${budget.autoMaxLiveWorkers} from cpu=${resources.cpuCount}, memory=${formatGib(resources.memoryBytes)}, daemons=${daemonCount} (per-daemon share)`);
console.log(` Config file: ${globalConfigPath()}`);
console.log('');
console.log('Agent-safe edit commands:');
Expand Down Expand Up @@ -4945,7 +4946,7 @@ function cmdWorkerBudget(args: string[]): void {
if (idleMinutes !== undefined) next.idleSuspendMs = parsePositiveInt(idleMinutes, '--idle-minutes') * 60_000;

mergeGlobalConfig({ worker: next });
const budget = resolveWorkerBudget(next);
const budget = resolveWorkerBudget(next, undefined, countConfiguredBots());
console.log('✅ Updated worker budget.');
console.log(` maxLiveWorkers: ${budget.maxLiveWorkers} (${budget.maxLiveWorkersSource})`);
console.log(` idleSuspendMs: ${budget.idleSuspendMs} (${budget.idleSuspendMsSource})`);
Expand Down
8 changes: 7 additions & 1 deletion src/core/idle-worker-sweeper.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { DaemonSession } from './types.js';
import { countConfiguredBots, ownBotWorkerConfig } from '../bot-registry.js';
import { readGlobalConfig } from '../global-config.js';
import { DEFAULT_IDLE_SUSPEND_MS, resolveWorkerBudget, type ResolvedWorkerBudget } from './worker-budget.js';
import { suspendWorker } from './worker-pool.js';
Expand All @@ -25,7 +26,12 @@ export function sweepIdleWorkers(
opts: IdleWorkerSweepOptions = {},
): IdleWorkerSweepResult[] {
const now = opts.now ?? Date.now();
const budget = opts.workerBudget ?? resolveWorkerBudget(readGlobalConfig().worker);
// Per-bot worker override (if this bot configured one) wins field-by-field
// over the machine-wide global block; unset fields fall through to the
// per-daemon auto split inside resolveWorkerBudget. An explicit per-bot
// maxLiveWorkers therefore bypasses the split entirely (config source).
const workerConfig = { ...readGlobalConfig().worker, ...ownBotWorkerConfig() };
const budget = opts.workerBudget ?? resolveWorkerBudget(workerConfig, undefined, countConfiguredBots());
const maxLiveWorkers = budget.maxLiveWorkers;
const idleMs = budget.idleSuspendMs;
const running = liveWorkers(activeSessions);
Expand Down
18 changes: 15 additions & 3 deletions src/core/worker-budget.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,29 @@ export function detectWorkerResources(): WorkerResources {
};
}

/**
 * Auto-derived per-daemon live-worker budget.
 *
 * `daemonCount` is the number of daemons sharing this machine (= bots in the
 * shared bots.json; multi-daemon deployments run one bot per process). The
 * machine-level budget min(cpu×2, memGiB) is split evenly across daemons —
 * without this, N daemons each claim the whole box and the effective
 * machine-wide cap silently becomes N × budget.
 *
 * @param resources CPU count and memory size to derive the budget from;
 *   defaults to the detected machine resources.
 * @param daemonCount Number of daemons sharing the machine. Values below 1
 *   and NaN are treated as a single daemon.
 * @returns The per-daemon share, clamped to
 *   [MIN_AUTO_MAX_LIVE_WORKERS, MAX_AUTO_MAX_LIVE_WORKERS].
 */
export function autoMaxLiveWorkers(resources: WorkerResources = detectWorkerResources(), daemonCount = 1): number {
  const cpuBudget = Math.max(1, resources.cpuCount) * 2;
  const memoryBudget = Math.max(1, Math.round(resources.memoryBytes / 1024 ** 3));
  const machineBudget = Math.min(cpuBudget, memoryBudget);
  // Math.max propagates NaN, which would otherwise turn the whole budget into
  // NaN; treat an unparseable count as "one daemon" like any other bogus value.
  const daemons = Number.isNaN(daemonCount) ? 1 : Math.max(1, daemonCount);
  const perDaemon = Math.floor(machineBudget / daemons);
  return clamp(perDaemon, MIN_AUTO_MAX_LIVE_WORKERS, MAX_AUTO_MAX_LIVE_WORKERS);
}

export function resolveWorkerBudget(
workerConfig?: WorkerConfig,
resources: WorkerResources = detectWorkerResources(),
daemonCount = 1,
): ResolvedWorkerBudget {
const auto = autoMaxLiveWorkers(resources);
const auto = autoMaxLiveWorkers(resources, daemonCount);
return {
maxLiveWorkers: workerConfig?.maxLiveWorkers ?? auto,
idleSuspendMs: workerConfig?.idleSuspendMs ?? DEFAULT_IDLE_SUSPEND_MS,
Expand Down
2 changes: 1 addition & 1 deletion src/global-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ function readPositiveInteger(raw: unknown): number | undefined {
return raw;
}

function readWorker(raw: unknown): WorkerConfig | undefined {
export function readWorker(raw: unknown): WorkerConfig | undefined {
if (!raw || typeof raw !== 'object') return undefined;
const v = raw as Record<string, unknown>;
const worker: WorkerConfig = {};
Expand Down
54 changes: 54 additions & 0 deletions test/bot-registry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,60 @@ describe('parseBotConfigsFromText — brand', () => {
});
});

// ─── per-bot worker parsing ─────────────────────────────────────────────────

describe('parseBotConfigsFromText — per-bot worker', () => {
  let mod: Awaited<ReturnType<typeof freshImport>>;

  beforeEach(async () => {
    mod = await freshImport();
  });

  // Parses a one-bot config. An undefined `worker` is dropped by
  // JSON.stringify, so omitting the argument yields an entry with no
  // `worker` key at all.
  const parseWithWorker = (worker?: Record<string, number>) => {
    const json = JSON.stringify([{ larkAppId: 'a', larkAppSecret: 's', worker }]);
    const [parsed] = mod.parseBotConfigsFromText(json);
    return parsed;
  };

  it('parses a per-bot worker override of positive integers', () => {
    const override = { maxLiveWorkers: 30, idleSuspendMs: 600000 };
    expect(parseWithWorker(override).worker).toEqual({ maxLiveWorkers: 30, idleSuspendMs: 600000 });
  });

  it('leaves worker undefined when unset', () => {
    expect(parseWithWorker().worker).toBeUndefined();
  });

  it('drops non-positive / non-integer worker fields', () => {
    const bogus = { maxLiveWorkers: 0, idleSuspendMs: -5 };
    expect(parseWithWorker(bogus).worker).toBeUndefined();
  });

  it('keeps only the valid field when one is bogus', () => {
    const halfValid = { maxLiveWorkers: 12, idleSuspendMs: 1.5 };
    expect(parseWithWorker(halfValid).worker).toEqual({ maxLiveWorkers: 12 });
  });

  it('exposes this daemon bot worker override via ownBotWorkerConfig', () => {
    mod.registerBot(parseWithWorker({ maxLiveWorkers: 24 }));
    expect(mod.ownBotWorkerConfig()).toEqual({ maxLiveWorkers: 24 });
  });

  it('ownBotWorkerConfig is undefined when the bot has no worker block', () => {
    mod.registerBot(parseWithWorker());
    expect(mod.ownBotWorkerConfig()).toBeUndefined();
  });
});

// ─── getBot / getBotClient ────────────────────────────────────────────────

describe('getBot / getBotClient', () => {
Expand Down
63 changes: 62 additions & 1 deletion test/idle-worker-sweeper.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, expect, it, vi } from 'vitest';
import { beforeEach, describe, expect, it, vi } from 'vitest';

vi.mock('../src/services/session-store.js', () => ({
updateSessionPid: vi.fn(),
Expand All @@ -16,6 +16,16 @@ vi.mock('../src/utils/logger.js', () => ({
},
}));

// Controllable stand-ins for the two config sources the sweeper consults.
// Individual tests set their return values to simulate the global `worker`
// block and this bot's per-bot override.
const mockReadGlobalConfig = vi.fn();
const mockOwnBotWorkerConfig = vi.fn();
// Global config: an unset mock returns undefined, which is replaced by `{}`
// so callers can read `.worker` without a null check.
vi.mock('../src/global-config.js', () => ({
readGlobalConfig: () => mockReadGlobalConfig() ?? {},
}));
// Bot registry: the configured-bot count is pinned to 6 for every test in this
// file; the per-bot override is delegated to the mock above.
vi.mock('../src/bot-registry.js', () => ({
countConfiguredBots: () => 6,
ownBotWorkerConfig: () => mockOwnBotWorkerConfig(),
}));

import { sweepIdleWorkers } from '../src/core/idle-worker-sweeper.js';

function ds(sessionId: string, backendType: string, lastMessageAt: number, worker = {}) {
Expand Down Expand Up @@ -216,3 +226,54 @@ describe('sweepIdleWorkers', () => {
expect(activeSessions.get('a').worker).not.toBe(null);
});
});

describe('sweepIdleWorkers per-bot worker override', () => {
  const NOW = 1_000_000;
  const MINUTE_MS = 60_000;

  beforeEach(() => {
    mockReadGlobalConfig.mockReset();
    mockOwnBotWorkerConfig.mockReset();
  });

  // Three live sessions, idle for 90, 80 and 70 minutes respectively.
  const overBudget = (now: number) => new Map<string, any>([
    ['a', ds('a', 'tmux', now - 90 * MINUTE_MS)],
    ['b', ds('b', 'herdr', now - 80 * MINUTE_MS)],
    ['c', ds('c', 'zellij', now - 70 * MINUTE_MS)],
  ]);

  // Installs the global and per-bot worker blocks, then sweeps the three
  // over-budget sessions without passing an explicit `workerBudget`.
  const sweepWith = (globalWorker: object, botWorker: object) => {
    mockReadGlobalConfig.mockReturnValue({ worker: globalWorker });
    mockOwnBotWorkerConfig.mockReturnValue(botWorker);
    return sweepIdleWorkers(overBudget(NOW), { now: NOW });
  };

  it("uses this bot's per-bot maxLiveWorkers over the global block (not auto-split)", () => {
    // The global block allows 1 live worker but the bot sets 2, so with three
    // live sessions only the oldest idle one is over the bot's budget.
    const result = sweepWith(
      { maxLiveWorkers: 1, idleSuspendMs: 30 * MINUTE_MS },
      { maxLiveWorkers: 2 },
    );

    expect(result.map(entry => entry.sessionId)).toEqual(['a']);
  });

  it('falls through to the global block for fields the bot did not set', () => {
    // The bot overrides only idleSuspendMs, so maxLiveWorkers stays at the
    // global value of 1.
    const result = sweepWith({ maxLiveWorkers: 1 }, { idleSuspendMs: 30 * MINUTE_MS });

    expect(result.map(entry => entry.sessionId)).toEqual(['a', 'b']);
  });

  it('keeps an idle worker live when its per-bot idleSuspendMs has not elapsed', () => {
    // With a 2h idle threshold, sessions idle for 90/80 minutes are still too
    // recent to suspend, even though the budget of 1 is exceeded.
    const result = sweepWith({ maxLiveWorkers: 1 }, { idleSuspendMs: 120 * MINUTE_MS });

    expect(result).toEqual([]);
  });
});
23 changes: 22 additions & 1 deletion test/worker-budget.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, expect, it } from 'vitest';

import { resolveWorkerBudget } from '../src/core/worker-budget.js';
import { autoMaxLiveWorkers, resolveWorkerBudget } from '../src/core/worker-budget.js';

const gib = (n: number) => n * 1024 ** 3;

Expand All @@ -11,6 +11,27 @@ describe('resolveWorkerBudget', () => {
expect(resolveWorkerBudget(undefined, { cpuCount: 64, memoryBytes: gib(128) }).maxLiveWorkers).toBe(32);
});

it('splits the machine budget across daemons sharing the box', () => {
  const box = { cpuCount: 56, memoryBytes: gib(110) };

  // Omitting the daemon count behaves like a single daemon (legacy behavior).
  expect(autoMaxLiveWorkers(box)).toBe(32);

  // [daemonCount, expected per-daemon budget]
  const expectations: Array<[number, number]> = [
    [1, 32],   // single daemon: clamp(min(112, 110)) = 32
    [6, 18],   // six daemons, one bot each: floor(110 / 6) = 18
    [100, 4],  // the per-daemon share never drops below the MIN floor
    [0, 32],   // a bogus count is treated as a single daemon
  ];
  for (const [daemons, expected] of expectations) {
    expect(autoMaxLiveWorkers(box, daemons)).toBe(expected);
  }

  // resolveWorkerBudget forwards the daemon count to the auto baseline.
  expect(resolveWorkerBudget(undefined, box, 6).maxLiveWorkers).toBe(18);
});

it('keeps an explicit maxLiveWorkers override per-daemon (not split)', () => {
  const box = { cpuCount: 56, memoryBytes: gib(110) };
  const {
    maxLiveWorkers,
    maxLiveWorkersSource,
    autoMaxLiveWorkers: autoBaseline,
  } = resolveWorkerBudget({ maxLiveWorkers: 12 }, box, 6);

  // The configured value is used as-is and attributed to config…
  expect(maxLiveWorkers).toBe(12);
  expect(maxLiveWorkersSource).toBe('config');
  // …while the reported auto baseline still reflects the six-way split.
  expect(autoBaseline).toBe(18);
});

it('lets global config override max live workers and idle threshold independently', () => {
const resolved = resolveWorkerBudget(
{ maxLiveWorkers: 12, idleSuspendMs: 45 * 60_000 },
Expand Down