From fc9346d79edd577a0194fbee07c3904f57650250 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 25 Apr 2026 11:41:17 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix:=20session=5Fend=E9=98=B2=E6=8A=96=20+?= =?UTF-8?q?=20=E5=A2=9E=E9=87=8F=E7=A4=BE=E5=8C=BA=E6=91=98=E8=A6=81=20+?= =?UTF-8?q?=20maintenance=E5=8F=AF=E8=A7=82=E6=B5=8B=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题: - session_end对所有session(含短命subagent)都执行finalize+maintenance,645/1928个短session白白消耗大量LLM调用 - 社区摘要每次maintenance全量刷新所有社区,LLM调用成本高 - 缺少maintenance触发次数等可观测数据 修复: 1. index.ts: 两道防线(消息<3条跳过 + 无节点跳过maintenance) 2. community.ts: 增量摘要模式(只处理新增+成员变化社区),24h自动全量兜底 3. maintenance.ts: gm_meta触发计数器 + 结果记录 4. db.ts: 新增m7_meta迁移(gm_meta表) 5. store.ts: getMeta/setMeta/getIncrementalCommunities辅助函数 --- index.ts | 27 ++++++++++++++++--- src/graph/community.ts | 57 +++++++++++++++++++++++++++++++++++++--- src/graph/maintenance.ts | 47 +++++++++++++++++++++++++++++++-- src/store/db.ts | 13 ++++++++- src/store/store.ts | 45 +++++++++++++++++++++++++++++++ 5 files changed, 178 insertions(+), 11 deletions(-) diff --git a/index.ts b/index.ts index 1c0cbb4..74be147 100755 --- a/index.ts +++ b/index.ts @@ -481,9 +481,9 @@ const graphMemoryPlugin = { try { const { summarizeCommunities } = await import("./src/graph/community.ts"); const embedFn = (recaller as any).embed ?? undefined; - const summaries = await summarizeCommunities(db, comm.communities, llm, embedFn); + const summaries = await summarizeCommunities(db, comm.communities, llm, embedFn, "incremental"); api.logger.info( - `[graph-memory] community summaries refreshed: ${summaries} summaries`, + `[graph-memory] community summaries refreshed: ${summaries} summaries (incremental)`, ); } catch (e) { api.logger.error(`[graph-memory] community summary failed: ${e}`); @@ -533,6 +533,17 @@ const graphMemoryPlugin = { if (!sid) return; try { + // 【防线1】短 session(<3条消息)直接跳过 finalize + maintenance + const msgCount = db.prepare( + "SELECT COUNT(*) as cnt FROM gm_messages WHERE session_id = ?" + ).get(sid) as any; + if (msgCount.cnt < 3) { + api.logger.info( + `[graph-memory] session_end skipped: short session (${msgCount.cnt} msgs)` + ); + return; + } + const nodes = getBySession(db, sid); if (nodes.length) { const summary = ( @@ -569,8 +580,16 @@ const graphMemoryPlugin = { for (const id of fin.invalidations) deprecate(db, id); } + // 【防线2】无节点变更则跳过 maintenance + if (nodes.length === 0) { + api.logger.info( + `[graph-memory] session_end skipped maintenance: no nodes for session` + ); + return; + } + const embedFn = (recaller as any).embed ?? undefined; - const result = await runMaintenance(db, cfg, llm, embedFn); + const result = await runMaintenance(db, cfg, llm, embedFn, "incremental"); api.logger.info( `[graph-memory] maintenance: ${result.durationMs}ms, ` + `dedup=${result.dedup.merged}, ` + @@ -711,7 +730,7 @@ const graphMemoryPlugin = { parameters: Type.Object({}), async execute(_toolCallId: string, _params: any) { const embedFn = (recaller as any).embed ?? undefined; - const result = await runMaintenance(db, cfg, llm, embedFn); + const result = await runMaintenance(db, cfg, llm, embedFn, "full"); const text = [ `图维护完成(${result.durationMs}ms)`, `去重:发现 ${result.dedup.pairs.length} 对相似节点,合并 ${result.dedup.merged} 对`, diff --git a/src/graph/community.ts b/src/graph/community.ts index 4382fec..c9127d5 100755 --- a/src/graph/community.ts +++ b/src/graph/community.ts @@ -168,7 +168,7 @@ export function getCommunityPeers(db: DatabaseSyncInstance, nodeId: string, limi import type { CompleteFn } from "../engine/llm.ts"; import type { EmbedFn } from "../engine/embed.ts"; -import { upsertCommunitySummary, pruneCommunitySummaries } from "../store/store.ts"; +import { upsertCommunitySummary, pruneCommunitySummaries, getIncrementalCommunities, getMeta, setMeta } from "../store/store.ts"; const COMMUNITY_SUMMARY_SYS = `你是知识图谱摘要引擎。根据节点列表,用简短的描述概括这组节点的主题领域。 要求: @@ -176,21 +176,65 @@ const COMMUNITY_SUMMARY_SYS = `你是知识图谱摘要引擎。根据节点列 - 描述涵盖的工具/技术/任务领域 - 不要使用"社区"这个词`; +export type SummaryMode = "incremental" | "full"; + +/** 全量摘要的最小间隔(毫秒),默认 24 小时 */ +const FULL_SUMMARY_INTERVAL_MS = 24 * 60 * 60 * 1000; + /** - * 为所有社区生成 LLM 摘要描述 + embedding 向量 + * 为社区生成 LLM 摘要描述 + embedding 向量 + * + * @param mode - "incremental": 只处理新增+成员变化的社区 + * "full": 处理所有社区(用于每日全量刷新) + * 默认 "incremental" * - * 调用时机:runMaintenance → detectCommunities 之后 + * 增量逻辑: + * - 新增社区(gm_communities 无记录)→ 摘要 + * - 成员数变化(node_count 不匹配)→ 摘要 + * - 无变化 → 跳过 + * + * 全量逻辑: + * - 处理所有社区,并记录完成时间到 gm_meta */ export async function summarizeCommunities( db: DatabaseSyncInstance, communities: Map, llm: CompleteFn, embedFn?: EmbedFn, + mode: SummaryMode = "incremental", ): Promise { pruneCommunitySummaries(db); + + // 决定要处理的社区子集 + let targetCommunities: Map; + + if (mode === "full") { + targetCommunities = communities; + } else { + // incremental: 只处理新增 + 成员变化的社区 + targetCommunities = getIncrementalCommunities(db, communities); + + // 自动全量兜底:距上次全量 >24h 则自动切换为 full + const lastFullStr = getMeta(db, "last_full_summary_at"); + if (lastFullStr) { + const elapsed = Date.now() - parseInt(lastFullStr, 10); + if (elapsed > FULL_SUMMARY_INTERVAL_MS) { + if (process.env.GM_DEBUG) { + console.log(` [DEBUG] auto full summary: ${Math.round(elapsed / 3600000)}h since last full`); + } + targetCommunities = communities; + mode = "full"; + } + } else { + // 从未做过全量摘要,强制做一次 + targetCommunities = communities; + mode = "full"; + } + } + let generated = 0; - for (const [communityId, memberIds] of communities) { + for (const [communityId, memberIds] of targetCommunities) { if (memberIds.length === 0) continue; const placeholders = memberIds.map(() => "?").join(","); @@ -245,5 +289,10 @@ export async function summarizeCommunities( } } + // 全量模式完成后记录时间戳 + if (mode === "full" && generated > 0) { + setMeta(db, "last_full_summary_at", String(Date.now())); + } + return generated; } \ No newline at end of file diff --git a/src/graph/maintenance.ts b/src/graph/maintenance.ts index 1e6629f..f5c4726 100755 --- a/src/graph/maintenance.ts +++ b/src/graph/maintenance.ts @@ -33,9 +33,27 @@ export interface MaintenanceResult { export async function runMaintenance( db: DatabaseSyncInstance, cfg: GmConfig, llm?: CompleteFn, embedFn?: EmbedFn, + summaryMode: "incremental" | "full" = "incremental", ): Promise { const start = Date.now(); + // ─── maintenance 触发计数器 ────────────────────────────── + try { + const counterKey = "maintenance_trigger_count"; + const lastTriggerKey = "maintenance_last_trigger_at"; + const row = db.prepare("SELECT value FROM gm_meta WHERE key = ?").get(counterKey) as any; + const count = row ? parseInt(row.value, 10) + 1 : 1; + db.prepare("INSERT OR REPLACE INTO gm_meta (key, value) VALUES (?, ?)").run(counterKey, String(count)); + db.prepare("INSERT OR REPLACE INTO gm_meta (key, value) VALUES (?, ?)").run(lastTriggerKey, String(Date.now())); + if (process.env.GM_DEBUG) { + console.log(` [DEBUG] maintenance: trigger #${count} at ${new Date().toISOString()}`); + } + } catch (err) { + if (process.env.GM_DEBUG) { + console.log(` [DEBUG] maintenance: counter write failed: ${err}`); + } + } + // 去重/新增节点后清除图结构缓存 invalidateGraphCache(); @@ -55,7 +73,9 @@ export async function runMaintenance( let communitySummaries = 0; if (llm && communityResult.communities.size > 0) { try { - communitySummaries = await summarizeCommunities(db, communityResult.communities, llm, embedFn); + communitySummaries = await summarizeCommunities( + db, communityResult.communities, llm, embedFn, summaryMode, + ); if (process.env.GM_DEBUG) { console.log(` [DEBUG] maintenance: generated ${communitySummaries} community summaries`); } @@ -66,11 +86,34 @@ export async function runMaintenance( } } + const durationMs = Date.now() - start; + + // ─── 记录本次 maintenance 结果 ────────────────────────── + try { + const resultKey = "maintenance_last_result"; + const result = { + at: new Date().toISOString(), + durationMs, + dedupMerged: dedupResult.merged, + communityCount: communityResult.communities.size, + communitySummaries, + summaryMode, + }; + db.prepare("INSERT OR REPLACE INTO gm_meta (key, value) VALUES (?, ?)").run(resultKey, JSON.stringify(result)); + if (process.env.GM_DEBUG) { + console.log(` [DEBUG] maintenance: completed in ${durationMs}ms, ${communitySummaries} summaries`); + } + } catch (err) { + if (process.env.GM_DEBUG) { + console.log(` [DEBUG] maintenance: result write failed: ${err}`); + } + } + return { dedup: dedupResult, pagerank: pagerankResult, community: communityResult, communitySummaries, - durationMs: Date.now() - start, + durationMs, }; } \ No newline at end of file diff --git a/src/store/db.ts b/src/store/db.ts index 60b24cd..1a98e63 100755 --- a/src/store/db.ts +++ b/src/store/db.ts @@ -51,7 +51,7 @@ export function closeDb(): void { function migrate(db: DatabaseSyncInstance): void { db.exec(`CREATE TABLE IF NOT EXISTS _migrations (v INTEGER PRIMARY KEY, at INTEGER NOT NULL)`); const cur = (db.prepare("SELECT MAX(v) as v FROM _migrations").get() as any)?.v ?? 0; - const steps = [m1_core, m2_messages, m3_signals, m4_fts5, m5_vectors, m6_communities]; + const steps = [m1_core, m2_messages, m3_signals, m4_fts5, m5_vectors, m6_communities, m7_meta]; for (let i = cur; i < steps.length; i++) { steps[i](db); db.prepare("INSERT INTO _migrations (v,at) VALUES (?,?)").run(i + 1, Date.now()); @@ -189,3 +189,14 @@ function m6_communities(db: DatabaseSyncInstance): void { ); `); } + +// ─── 元数据表 ────────────────────────────────────────────── + +function m7_meta(db: DatabaseSyncInstance): void { + db.exec(` + CREATE TABLE IF NOT EXISTS gm_meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); + `); +} diff --git a/src/store/store.ts b/src/store/store.ts index ec472a9..e348671 100755 --- a/src/store/store.ts +++ b/src/store/store.ts @@ -640,4 +640,49 @@ export function pruneCommunitySummaries(db: DatabaseSyncInstance): number { ) `).run(); return result.changes; +} + +// ─── 元数据读写 ────────────────────────────────────────────── + +export function getMeta(db: DatabaseSyncInstance, key: string): string | null { + const row = db.prepare("SELECT value FROM gm_meta WHERE key=?").get(key) as any; + return row?.value ?? null; +} + +export function setMeta(db: DatabaseSyncInstance, key: string, value: string): void { + db.prepare("INSERT OR REPLACE INTO gm_meta (key, value) VALUES (?, ?)").run(key, value); +} + +/** + * 获取需要增量摘要的社区列表(新增 + 成员数变化的社区) + * + * 判断逻辑: + * - gm_communities 中无记录 → 新增社区 + * - gm_communities.node_count != 实际活跃节点数 → 成员变化 + * - 其余跳过 + */ +export function getIncrementalCommunities( + db: DatabaseSyncInstance, + communities: Map, +): Map { + const result = new Map(); + + for (const [communityId, memberIds] of communities) { + if (memberIds.length === 0) continue; + + const existing = db.prepare( + "SELECT node_count FROM gm_communities WHERE id=?" + ).get(communityId) as any; + + if (!existing) { + // 新增社区:无摘要记录 + result.set(communityId, memberIds); + } else if (existing.node_count !== memberIds.length) { + // 成员数变化 + result.set(communityId, memberIds); + } + // else: 无变化,跳过 + } + + return result; } \ No newline at end of file From fe036334e96ce41dfa06961693b09941eea2062d Mon Sep 17 00:00:00 2001 From: "zhuyan.karl" Date: Sat, 25 Apr 2026 15:01:32 +0800 Subject: [PATCH 2/2] Optimize graph memory LLM call control --- README.md | 23 +++++ README_CN.md | 23 +++++ index.ts | 180 +++++++++++++++++++++++++++++---- openclaw.plugin.json | 61 ++++++++++- src/engine/budget.ts | 211 +++++++++++++++++++++++++++++++++++++++ src/engine/llm.ts | 46 ++++++--- src/extractor/extract.ts | 18 +++- src/extractor/gate.ts | 128 ++++++++++++++++++++++++ src/graph/community.ts | 8 +- src/graph/maintenance.ts | 4 +- src/types.ts | 48 +++++++++ test/budget.test.ts | 75 ++++++++++++++ test/gate.test.ts | 106 ++++++++++++++++++++ test/helpers.ts | 10 +- 14 files changed, 902 insertions(+), 39 deletions(-) create mode 100644 src/engine/budget.ts create mode 100644 src/extractor/gate.ts create mode 100644 test/budget.test.ts create mode 100644 test/gate.test.ts diff --git a/README.md b/README.md index de42975..e1e6756 100644 --- a/README.md +++ b/README.md @@ -251,6 +251,10 @@ Add your API credentials inside `plugins.entries.graph-memory.config`: "baseURL": "https://api.openai.com/v1", "model": "gpt-4o-mini" }, + "llmMonthlyCallBudget": 90000, + "llmMonthlyCommunitySummaryBudget": 3000, + "llmMonthlyFinalizeBudget": 3000, + "llmBudgetTimeZone": "Asia/Shanghai", "embedding": { "apiKey": "your-embedding-api-key", "baseURL": "https://api.openai.com/v1", @@ -266,6 +270,14 @@ Add your API credentials inside `plugins.entries.graph-memory.config`: **LLM** (`config.llm`) — Required. Used for knowledge extraction and community summaries. Any OpenAI-compatible endpoint works. Use a cheap/fast model. +**Virtual monthly LLM plan** (`llmMonthlyCallBudget`) — Optional but recommended for call-count-limited plans such as CodePlan. graph-memory tracks its own LLM calls in SQLite by month (`YYYY-MM`) and computes each day's dynamic allowance as: + +``` +remaining monthly calls / remaining days in the month, including today +``` + +Quiet days automatically increase the allowance for later days. If you start using this mid-month, set `llmMonthlyCallBudget` to your remaining calls for the current month; next month you can set it back to your full plan size. + **Embedding** (`config.embedding`) — Optional but recommended. Enables semantic vector search, community-level recall, and vector dedup. Without it, falls back to FTS5 full-text search (still works, just keyword-based). > **⚠️ Important**: `pnpm openclaw plugins install` may reset your config. Always verify `config.llm` and `config.embedding` are present after reinstalling. @@ -346,6 +358,17 @@ All parameters have defaults. Only set what you want to override. | `compactTurnCount` | `7` | Turns between maintenance cycles (PageRank + community + summaries) | | `recallMaxNodes` | `6` | Max nodes injected per recall | | `recallMaxDepth` | `2` | Graph traversal hops from seed nodes | +| `llmMonthlyCallBudget` | `90000` | Virtual monthly LLM call plan. Daily allowance is computed from remaining monthly calls / remaining days | +| `llmMonthlyCommunitySummaryBudget` | `3000` | Monthly sub-budget for community summary LLM calls | +| `llmMonthlyFinalizeBudget` | `3000` | Monthly sub-budget for session-end finalize LLM calls | +| `llmBudgetTimeZone` | `Asia/Shanghai` | Time zone used to roll monthly/daily budget counters | +| `extractBatchMinMessages` | `6` | Pending meaningful messages before normal extraction runs | +| `extractBatchMinChars` | `1600` | Pending meaningful characters before normal extraction runs | +| `extractTrivialMaxChars` | `40` | Low-signal short confirmations at or below this length are skipped | +| `extractMaxMessageChars` | `600` | Max text kept per message in extraction prompts | +| `extractMaxBatchMessages` | `30` | Max unextracted messages sent to one extraction call | +| `extractDebounceMs` | `45000` | Quiet period before normal pending messages are flushed | +| `extractFlushIntervalMs` | `120000` | Periodic fallback flush interval for pending messages | | `dedupThreshold` | `0.90` | Cosine similarity threshold for node dedup | | `pagerankDamping` | `0.85` | PPR damping factor | | `pagerankIterations` | `20` | PPR iteration count | diff --git a/README_CN.md b/README_CN.md index 1d1f335..0838e00 100644 --- a/README_CN.md +++ b/README_CN.md @@ -253,6 +253,10 @@ pnpm openclaw plugins install . "baseURL": "https://api.openai.com/v1", "model": "gpt-4o-mini" }, + "llmMonthlyCallBudget": 90000, + "llmMonthlyCommunitySummaryBudget": 3000, + "llmMonthlyFinalizeBudget": 3000, + "llmBudgetTimeZone": "Asia/Shanghai", "embedding": { "apiKey": "你的Embedding-API密钥", "baseURL": "https://api.openai.com/v1", @@ -268,6 +272,14 @@ pnpm openclaw plugins install . **LLM**(`config.llm`)— 必填。用于知识提取和社区摘要生成。支持任何 OpenAI 兼容端点。建议用便宜/快速的模型。 +**虚拟月调用计划**(`llmMonthlyCallBudget`)— 可选但推荐,适合 CodePlan 这类按调用次数限制的套餐。graph-memory 会在 SQLite 中按月份(`YYYY-MM`)记录自己的 LLM 调用次数,并按下面的公式动态计算当天额度: + +``` +本月剩余调用次数 / 含今天在内的本月剩余天数 +``` + +某几天少用,后面每天的可用额度会自动变高。若你在月中启用,建议先把 `llmMonthlyCallBudget` 设置成本月剩余调用次数;下个月再改回完整套餐额度。 + **Embedding**(`config.embedding`)— 可选但推荐。启用语义向量搜索、社区级召回和向量去重。不配则降级为 FTS5 全文搜索(仍然可用,只是基于关键词匹配)。 > **⚠️ 注意**:`pnpm openclaw plugins install` 可能会重置你的配置。每次重装插件后请检查 `config.llm` 和 `config.embedding` 是否还在。 @@ -348,6 +360,17 @@ sqlite3 ~/.openclaw/graph-memory.db "SELECT id, summary FROM gm_communities;" | `compactTurnCount` | `7` | 维护周期(每隔多少轮触发 PageRank + 社区检测 + 摘要) | | `recallMaxNodes` | `6` | 每次召回最多注入的节点数 | | `recallMaxDepth` | `2` | 图遍历跳数 | +| `llmMonthlyCallBudget` | `90000` | 虚拟月调用计划。每日额度按本月剩余调用数 / 本月剩余天数动态计算 | +| `llmMonthlyCommunitySummaryBudget` | `3000` | 社区摘要 LLM 调用的月度子预算 | +| `llmMonthlyFinalizeBudget` | `3000` | 会话结束整理 LLM 调用的月度子预算 | +| `llmBudgetTimeZone` | `Asia/Shanghai` | 月度/每日预算计数切换使用的时区 | +| `extractBatchMinMessages` | `6` | 普通内容累计多少条消息后触发知识提取 | +| `extractBatchMinChars` | `1600` | 普通内容累计多少字符后触发知识提取 | +| `extractTrivialMaxChars` | `40` | 低价值短确认/寒暄消息在该长度内会跳过 LLM 提取 | +| `extractMaxMessageChars` | `600` | 知识提取时每条消息最多保留的字符数 | +| `extractMaxBatchMessages` | `30` | 单次知识提取最多处理的未提取消息数 | +| `extractDebounceMs` | `45000` | 普通消息安静多久后合批抽取 | +| `extractFlushIntervalMs` | `120000` | 定时兜底 flush 间隔,避免普通消息长期不抽取 | | `dedupThreshold` | `0.90` | 向量去重的余弦相似度阈值 | | `pagerankDamping` | `0.85` | PPR 阻尼系数 | | `pagerankIterations` | `20` | PPR 迭代次数 | diff --git a/index.ts b/index.ts index 74be147..e08e53b 100755 --- a/index.ts +++ b/index.ts @@ -19,10 +19,12 @@ import { getBySession, edgesFrom, edgesTo, deprecate, getStats, } from "./src/store/store.ts"; -import { createCompleteFn } from "./src/engine/llm.ts"; +import { createCompleteFn, type CompleteFn } from "./src/engine/llm.ts"; +import { reserveLlmCall } from "./src/engine/budget.ts"; import { createEmbedFn } from "./src/engine/embed.ts"; import { Recaller } from "./src/recaller/recall.ts"; import { Extractor } from "./src/extractor/extract.ts"; +import { classifyExtractionBatch, isImmediateExtraction } from "./src/extractor/gate.ts"; import { assembleContext } from "./src/format/assemble.ts"; import { sanitizeToolUseResultPairing } from "./src/format/transcript-repair.ts"; import { runMaintenance } from "./src/graph/maintenance.ts"; @@ -136,7 +138,21 @@ const graphMemoryPlugin = { const anthropicApiKey = cfg.llm?.apiKey && !cfg.llm?.baseURL ? cfg.llm.apiKey // If apiKey set but no baseURL, assume Anthropic direct : undefined; - const llm = createCompleteFn(provider, model, cfg.llm, anthropicApiKey); + const rawLlm = createCompleteFn(provider, model, cfg.llm, anthropicApiKey); + const llm: CompleteFn = async (system, user, options = {}) => { + const kind = options.kind ?? "other"; + const reservation = reserveLlmCall(db, cfg, kind); + if (!reservation.allowed) { + api.logger.warn( + `[graph-memory] LLM budget skipped ${kind}: ${reservation.reason} ` + + `(day=${reservation.day}, today=${reservation.todayUsed}/${reservation.todayLimit || "∞"}, ` + + `month=${reservation.monthUsed}/${reservation.monthlyLimit || "∞"}, ` + + `${kind}_today=${reservation.todayKindUsed}/${reservation.todayKindLimit || "∞"})`, + ); + throw new Error(`[graph-memory] ${reservation.reason}`); + } + return rawLlm(system, user, options); + }; const recaller = new Recaller(db, cfg); const extractor = new Extractor(cfg, llm); @@ -158,9 +174,12 @@ const graphMemoryPlugin = { const msgSeq = new Map(); const recalled = new Map(); const turnCounter = new Map(); // 社区维护计数器 + const debounceTimers = new Map>(); + const flushTimers = new Map>(); // ── 提取串行化(同 session Promise chain,不同 session 并行)──── - const extractChain = new Map>(); + type ExtractRunResult = "empty" | "skipped" | "deferred" | "extracted" | "failed"; + const extractChain = new Map>(); /** 存一条消息到 gm_messages(同步,零 LLM) */ function ingestMessage(sessionId: string, message: any): void { @@ -177,18 +196,77 @@ const graphMemoryPlugin = { saveMessage(db, sessionId, seq, message.role ?? "unknown", message); } - /** 每轮结束后直接提取当前轮的消息(同 session 串行,不丢消息) */ - async function runTurnExtract(sessionId: string, newMessages: any[]): Promise { - if (!newMessages.length) return; + function clearDebounce(sessionId: string): void { + const timer = debounceTimers.get(sessionId); + if (timer) clearTimeout(timer); + debounceTimers.delete(sessionId); + } + + function clearFlushTimer(sessionId: string): void { + const timer = flushTimers.get(sessionId); + if (timer) clearInterval(timer); + flushTimers.delete(sessionId); + } + + function ensureFlushTimer(sessionId: string): void { + if (cfg.extractFlushIntervalMs <= 0 || flushTimers.has(sessionId)) return; + const timer = setInterval(() => { + forceFlushSession(sessionId, "interval").catch((err) => { + api.logger.error(`[graph-memory] interval flush failed: ${err}`); + }); + }, cfg.extractFlushIntervalMs); + flushTimers.set(sessionId, timer); + } + + function scheduleDebounce(sessionId: string): void { + if (cfg.extractDebounceMs <= 0) { + forceFlushSession(sessionId, "debounce-disabled").catch((err) => { + api.logger.error(`[graph-memory] debounce flush failed: ${err}`); + }); + return; + } + clearDebounce(sessionId); + const timer = setTimeout(() => { + debounceTimers.delete(sessionId); + forceFlushSession(sessionId, "debounce").catch((err) => { + api.logger.error(`[graph-memory] debounce flush failed: ${err}`); + }); + }, cfg.extractDebounceMs); + debounceTimers.set(sessionId, timer); + } + + /** 按门控策略抽取一批消息(同 session 串行,不丢消息) */ + async function runTurnExtract( + sessionId: string, + newMessages: any[], + force = false, + ): Promise { + if (!newMessages.length && !force) return "empty"; // Promise chain:上一次提取完了才跑下一次,不会跳过 - const prev = extractChain.get(sessionId) ?? Promise.resolve(); - const next = prev.then(async () => { + const prev = extractChain.get(sessionId) ?? Promise.resolve("empty" as ExtractRunResult); + const next = prev.then(async (): Promise => { try { - const msgs = getUnextracted(db, sessionId, 50); - if (!msgs.length) return; + const msgs = getUnextracted(db, sessionId, cfg.extractMaxBatchMessages); + if (!msgs.length) return "empty"; - const existing = getBySession(db, sessionId).map((n) => n.name); + const decision = classifyExtractionBatch(msgs, cfg, force); + const maxTurn = Math.max(...msgs.map((m: any) => m.turn_index)); + if (decision.action === "skip") { + markExtracted(db, sessionId, maxTurn); + api.logger.info(`[graph-memory] extraction skipped: ${decision.reason}, msgs=${msgs.length}`); + return "skipped"; + } + if (decision.action === "defer") { + if (process.env.GM_DEBUG) { + api.logger.info(`[graph-memory] extraction deferred: ${decision.reason}, msgs=${msgs.length}`); + } + return "deferred"; + } + + const existing = getBySession(db, sessionId) + .map((n) => n.name) + .slice(-cfg.extractExistingNamesLimit); const result = await extractor.extract({ messages: msgs, existingNames: existing, @@ -215,7 +293,6 @@ const graphMemoryPlugin = { } } - const maxTurn = Math.max(...msgs.map((m: any) => m.turn_index)); markExtracted(db, sessionId, maxTurn); if (result.nodes.length || result.edges.length) { @@ -226,15 +303,65 @@ const graphMemoryPlugin = { `[graph-memory] extracted ${result.nodes.length} nodes [${nodeDetails}], ${result.edges.length} edges [${edgeDetails}]`, ); } + return "extracted"; } catch (err) { api.logger.error(`[graph-memory] turn extract failed: ${err}`); // 不 throw — 失败不阻塞 chain 中下一次提取 + return "failed"; } }); extractChain.set(sessionId, next); return next; } + async function forceFlushSession(sessionId: string, reason: string): Promise { + clearDebounce(sessionId); + for (let i = 0; i < 100; i++) { + const pending = getUnextracted(db, sessionId, cfg.extractMaxBatchMessages); + if (!pending.length) return; + + const result = await runTurnExtract(sessionId, pending, true); + if (result === "empty" || result === "deferred" || result === "failed") { + api.logger.warn(`[graph-memory] ${reason} flush stopped: ${result}, pending=${pending.length}`); + return; + } + } + api.logger.warn(`[graph-memory] ${reason} flush stopped after safety limit`); + } + + function scheduleTurnExtract(sessionId: string, newMessages: any[]): void { + if (!newMessages.length) return; + ensureFlushTimer(sessionId); + + const pending = getUnextracted(db, sessionId, cfg.extractMaxBatchMessages); + if (!pending.length) return; + + const decision = classifyExtractionBatch(pending, cfg, false); + if (decision.action === "skip") { + runTurnExtract(sessionId, newMessages).catch((err) => { + api.logger.error(`[graph-memory] skip extraction failed: ${err}`); + }); + return; + } + + if (isImmediateExtraction(decision)) { + clearDebounce(sessionId); + runTurnExtract(sessionId, newMessages).catch((err) => { + api.logger.error(`[graph-memory] immediate extraction failed: ${err}`); + }); + return; + } + + if (pending.length >= cfg.extractMaxBatchMessages) { + forceFlushSession(sessionId, "max-batch").catch((err) => { + api.logger.error(`[graph-memory] max-batch flush failed: ${err}`); + }); + return; + } + + scheduleDebounce(sessionId); + } + // ── before_prompt_build:召回 ──────────────────────────── api.on("before_prompt_build", async (event: any, ctx: any) => { @@ -454,10 +581,12 @@ const graphMemoryPlugin = { `[graph-memory] afterTurn sid=${sessionId.slice(0, 8)} newMsgs=${newMessages.length} totalMsgs=${totalMsgs}`, ); - // ★ 每轮直接提取 - runTurnExtract(sessionId, newMessages).catch((err) => { - api.logger.error(`[graph-memory] turn extract failed: ${err}`); - }); + // ★ 调度策略: + // - error / 用户纠正 / 明确完成:立即抽取 + // - 普通消息:debounce 后合批抽取 + // - 到 extractMaxBatchMessages:强制抽取 + // - extractFlushIntervalMs:定时兜底 flush + scheduleTurnExtract(sessionId, newMessages); // ★ 社区维护:每 N 轮触发一次(纯计算,<5ms) const turns = (turnCounter.get(sessionId) ?? 0) + 1; @@ -481,7 +610,9 @@ const graphMemoryPlugin = { try { const { summarizeCommunities } = await import("./src/graph/community.ts"); const embedFn = (recaller as any).embed ?? undefined; - const summaries = await summarizeCommunities(db, comm.communities, llm, embedFn, "incremental"); + const summaries = await summarizeCommunities( + db, comm.communities, llm, embedFn, "incremental", cfg.communitySummaryMaxTokens, + ); api.logger.info( `[graph-memory] community summaries refreshed: ${summaries} summaries (incremental)`, ); @@ -511,12 +642,18 @@ const graphMemoryPlugin = { async onSubagentEnded({ childSessionKey }: { childSessionKey: string }) { recalled.delete(childSessionKey); msgSeq.delete(childSessionKey); + clearDebounce(childSessionKey); + clearFlushTimer(childSessionKey); }, async dispose() { extractChain.clear(); msgSeq.clear(); recalled.clear(); + for (const timer of debounceTimers.values()) clearTimeout(timer); + for (const timer of flushTimers.values()) clearInterval(timer); + debounceTimers.clear(); + flushTimers.clear(); }, }; @@ -533,7 +670,10 @@ const graphMemoryPlugin = { if (!sid) return; try { - // 【防线1】短 session(<3条消息)直接跳过 finalize + maintenance + // session_end 强制 flush,确保 debounce/interval 里积累的普通消息落入图谱。 + await forceFlushSession(sid, "session_end"); + + // 【防线1】短 session(<3条消息)只跳过 finalize + maintenance const msgCount = db.prepare( "SELECT COUNT(*) as cnt FROM gm_messages WHERE session_id = ?" ).get(sid) as any; @@ -600,6 +740,8 @@ const graphMemoryPlugin = { } catch (err) { api.logger.error(`[graph-memory] session_end error: ${err}`); } finally { + clearDebounce(sid); + clearFlushTimer(sid); extractChain.delete(sid); msgSeq.delete(sid); recalled.delete(sid); @@ -894,4 +1036,4 @@ function sliceLastTurn( return { messages: kept, tokens, dropped }; } -export default graphMemoryPlugin; \ No newline at end of file +export default graphMemoryPlugin; diff --git a/openclaw.plugin.json b/openclaw.plugin.json index 95d04a6..ff3b655 100755 --- a/openclaw.plugin.json +++ b/openclaw.plugin.json @@ -31,6 +31,61 @@ "default": 10, "description": "(deprecated) 已改为按用户轮次切分,此参数不再使用" }, + "llmMonthlyCallBudget": { + "type": "integer", + "default": 90000, + "description": "虚拟月调用计划:graph-memory 每月最多使用多少次 LLM 调用。0 表示不限制" + }, + "llmMonthlyCommunitySummaryBudget": { + "type": "integer", + "default": 3000, + "description": "社区摘要每月最多使用多少次 LLM 调用。0 表示不单独限制" + }, + "llmMonthlyFinalizeBudget": { + "type": "integer", + "default": 3000, + "description": "会话结束整理每月最多使用多少次 LLM 调用。0 表示不单独限制" + }, + "llmBudgetTimeZone": { + "type": "string", + "default": "Asia/Shanghai", + "description": "月度调用计划按哪个时区切换月份" + }, + "extractBatchMinMessages": { + "type": "integer", + "default": 6, + "description": "普通消息累计多少条后触发一次知识提取" + }, + "extractBatchMinChars": { + "type": "integer", + "default": 1600, + "description": "普通消息累计多少字符后触发一次知识提取" + }, + "extractTrivialMaxChars": { + "type": "integer", + "default": 40, + "description": "短于该字符数且低价值的确认/寒暄消息会跳过 LLM 提取" + }, + "extractMaxMessageChars": { + "type": "integer", + "default": 600, + "description": "知识提取时每条消息最多保留多少字符" + }, + "extractMaxBatchMessages": { + "type": "integer", + "default": 30, + "description": "单次知识提取最多处理多少条未提取消息" + }, + "extractDebounceMs": { + "type": "integer", + "default": 45000, + "description": "普通消息安静多久后合批抽取。0 表示不等待" + }, + "extractFlushIntervalMs": { + "type": "integer", + "default": 120000, + "description": "定时兜底 flush 间隔,避免普通消息长期不抽取。0 表示关闭" + }, "dedupThreshold": { "type": "number", "default": 0.90, @@ -62,9 +117,11 @@ "properties": { "apiKey": { "type": "string" }, "baseURL": { "type": "string" }, - "model": { "type": "string" } + "model": { "type": "string" }, + "maxTokens": { "type": "integer", "description": "可选:显式限制单次 LLM 输出 token。默认不限制" }, + "jsonMode": { "type": "boolean", "default": false, "description": "可选:启用 OpenAI 兼容 JSON mode。模型不支持时请保持 false" } } } } } -} \ No newline at end of file +} diff --git a/src/engine/budget.ts b/src/engine/budget.ts new file mode 100644 index 0000000..bc1fc0e --- /dev/null +++ b/src/engine/budget.ts @@ -0,0 +1,211 @@ +/** + * Monthly LLM call budgeting with dynamic daily allowance. + * + * CodePlan-style plans are call-count limited. We track monthly usage and + * compute today's allowance as: + * + * remaining monthly calls / days left in the month, including today + * + * This keeps usage close to the monthly limit while allowing quiet days to + * increase later daily capacity. + */ + +import type { DatabaseSyncInstance } from "@photostructure/sqlite"; +import type { GmConfig } from "../types.ts"; +import { getMeta, setMeta } from "../store/store.ts"; + +export type LlmCallKind = "extract" | "finalize" | "community_summary" | "other"; + +export interface BudgetReservation { + allowed: boolean; + reason?: string; + day: string; + month: string; + todayUsed: number; + todayKindUsed: number; + monthUsed: number; + monthKindUsed: number; + todayLimit: number; + todayKindLimit: number; + monthlyLimit: number; + monthlyKindLimit: number; +} + +interface DateParts { + day: string; + month: string; + dayOfMonth: number; + daysInMonth: number; +} + +function dateParts(timeZone: string): DateParts { + try { + const parts = new Intl.DateTimeFormat("en-CA", { + timeZone, + year: "numeric", + month: "2-digit", + day: "2-digit", + }).formatToParts(new Date()); + const year = Number(parts.find(p => p.type === "year")?.value); + const month = Number(parts.find(p => p.type === "month")?.value); + const day = Number(parts.find(p => p.type === "day")?.value); + const daysInMonth = new Date(year, month, 0).getDate(); + const yyyy = String(year).padStart(4, "0"); + const mm = String(month).padStart(2, "0"); + const dd = String(day).padStart(2, "0"); + return { + day: `${yyyy}-${mm}-${dd}`, + month: `${yyyy}-${mm}`, + dayOfMonth: day, + daysInMonth, + }; + } catch { + const now = new Date(); + const year = now.getUTCFullYear(); + const month = now.getUTCMonth() + 1; + const day = now.getUTCDate(); + const daysInMonth = new Date(Date.UTC(year, month, 0)).getUTCDate(); + const yyyy = String(year).padStart(4, "0"); + const mm = String(month).padStart(2, "0"); + const dd = String(day).padStart(2, "0"); + return { + day: `${yyyy}-${mm}-${dd}`, + month: `${yyyy}-${mm}`, + dayOfMonth: day, + daysInMonth, + }; + } +} + +function metaKey(scope: "day" | "month", key: string, kind?: LlmCallKind): string { + return kind ? `llm_calls:${scope}:${key}:${kind}` : `llm_calls:${scope}:${key}:total`; +} + +function readCount(db: DatabaseSyncInstance, key: string): number { + const raw = getMeta(db, key); + if (!raw) return 0; + const n = Number.parseInt(raw, 10); + return Number.isFinite(n) && n > 0 ? n : 0; +} + +function monthlyKindLimit(cfg: GmConfig, kind: LlmCallKind): number { + if (kind === "community_summary") return cfg.llmMonthlyCommunitySummaryBudget; + if (kind === "finalize") return cfg.llmMonthlyFinalizeBudget; + return 0; +} + +function dynamicDailyLimit(monthlyLimit: number, monthUsed: number, parts: DateParts): number { + if (monthlyLimit <= 0) return 0; + const remaining = Math.max(0, monthlyLimit - monthUsed); + const daysLeft = Math.max(1, parts.daysInMonth - parts.dayOfMonth + 1); + return Math.ceil(remaining / daysLeft); +} + +export function reserveLlmCall( + db: DatabaseSyncInstance, + cfg: GmConfig, + kind: LlmCallKind, +): BudgetReservation { + const parts = dateParts(cfg.llmBudgetTimeZone || "Asia/Shanghai"); + const monthlyLimit = cfg.llmMonthlyCallBudget; + const perKindMonthlyLimit = monthlyKindLimit(cfg, kind); + + const dayTotalKey = metaKey("day", parts.day); + const dayKindKey = metaKey("day", parts.day, kind); + const monthTotalKey = metaKey("month", parts.month); + const monthKindKey = metaKey("month", parts.month, kind); + + const todayUsed = readCount(db, dayTotalKey); + const todayKindUsed = readCount(db, dayKindKey); + const monthUsed = readCount(db, monthTotalKey); + const monthKindUsed = readCount(db, monthKindKey); + + const todayLimit = dynamicDailyLimit(monthlyLimit, monthUsed, parts); + const todayKindLimit = dynamicDailyLimit(perKindMonthlyLimit, monthKindUsed, parts); + + if (monthlyLimit > 0 && monthUsed >= monthlyLimit) { + return { + allowed: false, + reason: "monthly LLM call budget exhausted", + day: parts.day, + month: parts.month, + todayUsed, + todayKindUsed, + monthUsed, + monthKindUsed, + todayLimit, + todayKindLimit, + monthlyLimit, + monthlyKindLimit: perKindMonthlyLimit, + }; + } + + if (monthlyLimit > 0 && todayUsed >= todayLimit) { + return { + allowed: false, + reason: "dynamic daily LLM call allowance exhausted", + day: parts.day, + month: parts.month, + todayUsed, + todayKindUsed, + monthUsed, + monthKindUsed, + todayLimit, + todayKindLimit, + monthlyLimit, + monthlyKindLimit: perKindMonthlyLimit, + }; + } + + if (perKindMonthlyLimit > 0 && monthKindUsed >= perKindMonthlyLimit) { + return { + allowed: false, + reason: `monthly ${kind} call budget exhausted`, + day: parts.day, + month: parts.month, + todayUsed, + todayKindUsed, + monthUsed, + monthKindUsed, + todayLimit, + todayKindLimit, + monthlyLimit, + monthlyKindLimit: perKindMonthlyLimit, + }; + } + + if (perKindMonthlyLimit > 0 && todayKindUsed >= todayKindLimit) { + return { + allowed: false, + reason: `dynamic daily ${kind} call allowance exhausted`, + day: parts.day, + month: parts.month, + todayUsed, + todayKindUsed, + monthUsed, + monthKindUsed, + todayLimit, + todayKindLimit, + monthlyLimit, + monthlyKindLimit: perKindMonthlyLimit, + }; + } + + setMeta(db, dayTotalKey, String(todayUsed + 1)); + setMeta(db, dayKindKey, String(todayKindUsed + 1)); + setMeta(db, monthTotalKey, String(monthUsed + 1)); + setMeta(db, monthKindKey, String(monthKindUsed + 1)); + return { + allowed: true, + day: parts.day, + month: parts.month, + todayUsed: todayUsed + 1, + todayKindUsed: todayKindUsed + 1, + monthUsed: monthUsed + 1, + monthKindUsed: monthKindUsed + 1, + todayLimit, + todayKindLimit, + monthlyLimit, + monthlyKindLimit: perKindMonthlyLimit, + }; +} diff --git a/src/engine/llm.ts b/src/engine/llm.ts index de7fd48..6e2f91e 100755 --- a/src/engine/llm.ts +++ b/src/engine/llm.ts @@ -18,9 +18,18 @@ export interface LlmConfig { apiKey?: string; baseURL?: string; model?: string; + maxTokens?: number; + jsonMode?: boolean; } -export type CompleteFn = (system: string, user: string) => Promise; +export interface CompleteOptions { + maxTokens?: number; + json?: boolean; + temperature?: number; + kind?: "extract" | "finalize" | "community_summary" | "other"; +} + +export type CompleteFn = (system: string, user: string, options?: CompleteOptions) => Promise; // ─── 带重试+超时的 fetch ───────────────────────────────────── @@ -52,25 +61,33 @@ export function createCompleteFn( llmConfig?: LlmConfig, anthropicApiKey?: string, ): CompleteFn { - return async (system, user) => { + return async (system, user, options = {}) => { // ── 路径 A(优先):pluginConfig.llm 直接调 OpenAI 兼容 API ── if (llmConfig?.apiKey && llmConfig?.baseURL) { const baseURL = llmConfig.baseURL.replace(/\/+$/, ""); const llmModel = llmConfig.model ?? model; + const body: Record = { + model: llmModel, + messages: [ + ...(system.trim() ? [{ role: "system", content: system.trim() }] : []), + { role: "user", content: user }, + ], + temperature: options.temperature ?? 0.1, + }; + + const maxTokens = options.maxTokens ?? llmConfig.maxTokens; + if (maxTokens && maxTokens > 0) body.max_tokens = maxTokens; + if (options.json && llmConfig.jsonMode) { + body.response_format = { type: "json_object" }; + } + const res = await fetchRetry(`${baseURL}/chat/completions`, { method: "POST", headers: { "Content-Type": "application/json", "Authorization": `Bearer ${llmConfig.apiKey}`, }, - body: JSON.stringify({ - model: llmModel, - messages: [ - ...(system.trim() ? [{ role: "system", content: system.trim() }] : []), - { role: "user", content: user }, - ], - temperature: 0.1, - }), + body: JSON.stringify(body), }); if (!res.ok) { const errText = await res.text().catch(() => ""); @@ -91,7 +108,12 @@ export function createCompleteFn( const res = await fetchRetry("https://api.anthropic.com/v1/messages", { method: "POST", headers: { "Content-Type": "application/json", "x-api-key": anthropicApiKey, "anthropic-version": "2023-06-01" }, - body: JSON.stringify({ model: llmConfig?.model ?? model, max_tokens: 4096, system, messages: [{ role: "user", content: user }] }), + body: JSON.stringify({ + model: llmConfig?.model ?? model, + max_tokens: options.maxTokens ?? llmConfig?.maxTokens ?? 4096, + system, + messages: [{ role: "user", content: user }], + }), }); if (!res.ok) throw new Error(`[graph-memory] Anthropic API ${res.status}`); const data = await res.json() as any; @@ -99,4 +121,4 @@ export function createCompleteFn( if (text) return text; throw new Error("[graph-memory] Anthropic API returned empty content"); }; -} \ No newline at end of file +} diff --git a/src/extractor/extract.ts b/src/extractor/extract.ts index 4bda0ea..ac6a9d5 100755 --- a/src/extractor/extract.ts +++ b/src/extractor/extract.ts @@ -7,6 +7,7 @@ import type { GmConfig, ExtractionResult, FinalizeResult } from "../types.ts"; import type { CompleteFn } from "../engine/llm.ts"; +import { messageTextForExtraction } from "./gate.ts"; // ─── 节点/边合法值 ────────────────────────────────────────────── @@ -233,12 +234,17 @@ export class Extractor { }): Promise { const msgs = params.messages .map(m => `[${(m.role ?? "?").toUpperCase()} t=${m.turn_index ?? 0}]\n${ - String(typeof m.content === "string" ? m.content : JSON.stringify(m.content)).slice(0, 800) + messageTextForExtraction(m).slice(0, this._cfg.extractMaxMessageChars) }`).join("\n\n---\n\n"); const raw = await this.llm( EXTRACT_SYS, EXTRACT_USER(msgs, params.existingNames.join(", ")), + { + json: true, + kind: "extract", + maxTokens: this._cfg.extractOutputMaxTokens || undefined, + }, ); if (process.env.GM_DEBUG) { @@ -250,7 +256,15 @@ export class Extractor { } async finalize(params: { sessionNodes: any[]; graphSummary: string }): Promise { - const raw = await this.llm(FINALIZE_SYS, FINALIZE_USER(params.sessionNodes, params.graphSummary)); + const raw = await this.llm( + FINALIZE_SYS, + FINALIZE_USER(params.sessionNodes, params.graphSummary), + { + json: true, + kind: "finalize", + maxTokens: this._cfg.finalizeOutputMaxTokens || undefined, + }, + ); return this.parseFinalize(raw, params.sessionNodes); } diff --git a/src/extractor/gate.ts b/src/extractor/gate.ts new file mode 100644 index 0000000..c6633bd --- /dev/null +++ b/src/extractor/gate.ts @@ -0,0 +1,128 @@ +/** + * Cheap extraction gating for high-frequency context engines. + * + * The goal is to keep all messages persisted, but avoid spending LLM calls on + * acknowledgements, heartbeats, and low-signal fragments until there is enough + * pending material to batch. + */ + +import type { GmConfig } from "../types.ts"; + +export type ExtractionDecision = + | { action: "extract"; reason: string } + | { action: "defer"; reason: string } + | { action: "skip"; reason: string }; + +export function isImmediateExtraction(decision: ExtractionDecision): boolean { + return decision.action === "extract" && decision.reason === "immediate signal"; +} + +const ACK_RE = /^(好|好的|嗯|嗯嗯|OK|ok|收到|明白|继续|谢谢|谢了|辛苦了|可以|是的|对|没事|不用|先这样|hello|hi|你好|再见)[。!!,.,\s]*$/i; +const ERROR_RE = /(error|failed|failure|exception|traceback|stack trace|exit code:\s*[1-9]|npm err|pnpm err|timeout|timed out|denied|not found|报错|错误|异常|失败|超时|拒绝访问|找不到|崩溃)/i; +const CORRECTION_RE = /(不对|不是|错了|纠正|应该是|改成|别|不要|撤销|回退|重新来|漏了)/i; +const COMPLETION_RE = /(已完成|完成了|修复了|新增了|更新了|已修改|已创建|已删除|测试通过|验证通过|run tests|tests passed|修复|实现|部署成功)/i; + +function safeJsonParse(value: string): any | null { + try { + return JSON.parse(value); + } catch { + return null; + } +} + +function textFromContent(content: unknown): string { + if (typeof content === "string") return content; + if (!Array.isArray(content)) return ""; + + return content + .map((block: any) => { + if (!block || typeof block !== "object") return ""; + if (typeof block.text === "string") return block.text; + if (typeof block.content === "string") return block.content; + return ""; + }) + .filter(Boolean) + .join("\n") + .trim(); +} + +export function messageTextForExtraction(message: any): string { + if (!message || typeof message !== "object") return ""; + + let payload: any = message; + if (typeof message.content === "string") { + const parsed = safeJsonParse(message.content); + if (parsed && typeof parsed === "object") payload = parsed; + } + + const text = textFromContent(payload.content ?? message.content); + return text + .replace(/Sender \(untrusted metadata\):\s*```json[\s\S]*?```\s*/g, "") + .replace(/^\/\w+\s+/, "") + .trim(); +} + +function roleOf(message: any): string { + if (!message || typeof message !== "object") return ""; + if (typeof message.role === "string") return message.role; + if (typeof message.content === "string") { + const parsed = safeJsonParse(message.content); + if (parsed && typeof parsed.role === "string") return parsed.role; + } + return ""; +} + +function hasImmediateSignal(message: any, text: string): boolean { + const role = roleOf(message); + if (ERROR_RE.test(text)) return true; + if (CORRECTION_RE.test(text)) return true; + if (role === "assistant" && COMPLETION_RE.test(text)) return true; + if ((role === "tool" || role === "toolResult") && ERROR_RE.test(text)) return true; + return false; +} + +function isTrivial(message: any, cfg: GmConfig): boolean { + const role = roleOf(message); + if (role !== "user" && role !== "assistant") return false; + + const text = messageTextForExtraction(message); + if (!text) return true; + if (hasImmediateSignal(message, text)) return false; + + const maxChars = cfg.extractTrivialMaxChars; + return text.length <= maxChars && (ACK_RE.test(text) || text.length <= 8); +} + +export function classifyExtractionBatch( + messages: any[], + cfg: GmConfig, + force = false, +): ExtractionDecision { + if (!messages.length) return { action: "defer", reason: "no pending messages" }; + + const texts = messages.map(messageTextForExtraction); + const totalChars = texts.reduce((sum, text) => sum + text.length, 0); + const meaningful = messages.filter((m) => !isTrivial(m, cfg)); + + if (!meaningful.length) { + return { action: "skip", reason: "only trivial messages" }; + } + + if (messages.some((m, i) => hasImmediateSignal(m, texts[i] ?? ""))) { + return { action: "extract", reason: "immediate signal" }; + } + + if (force) { + return { action: "extract", reason: "forced flush" }; + } + + if (meaningful.length >= cfg.extractBatchMinMessages) { + return { action: "extract", reason: "message batch threshold" }; + } + + if (totalChars >= cfg.extractBatchMinChars) { + return { action: "extract", reason: "character batch threshold" }; + } + + return { action: "defer", reason: "waiting for batch" }; +} diff --git a/src/graph/community.ts b/src/graph/community.ts index c9127d5..a6ebb37 100755 --- a/src/graph/community.ts +++ b/src/graph/community.ts @@ -202,6 +202,7 @@ export async function summarizeCommunities( llm: CompleteFn, embedFn?: EmbedFn, mode: SummaryMode = "incremental", + maxTokens = 0, ): Promise { pruneCommunitySummaries(db); @@ -256,6 +257,11 @@ export async function summarizeCommunities( const summary = await llm( COMMUNITY_SUMMARY_SYS, `社区成员:\n${memberText}`, + { + kind: "community_summary", + maxTokens: maxTokens || undefined, + temperature: 0, + }, ); const cleaned = summary.trim() @@ -295,4 +301,4 @@ export async function summarizeCommunities( } return generated; -} \ No newline at end of file +} diff --git a/src/graph/maintenance.ts b/src/graph/maintenance.ts index f5c4726..653422e 100755 --- a/src/graph/maintenance.ts +++ b/src/graph/maintenance.ts @@ -74,7 +74,7 @@ export async function runMaintenance( if (llm && communityResult.communities.size > 0) { try { communitySummaries = await summarizeCommunities( - db, communityResult.communities, llm, embedFn, summaryMode, + db, communityResult.communities, llm, embedFn, summaryMode, cfg.communitySummaryMaxTokens, ); if (process.env.GM_DEBUG) { console.log(` [DEBUG] maintenance: generated ${communitySummaries} community summaries`); @@ -116,4 +116,4 @@ export async function runMaintenance( communitySummaries, durationMs, }; -} \ No newline at end of file +} diff --git a/src/types.ts b/src/types.ts index 082c196..2500203 100755 --- a/src/types.ts +++ b/src/types.ts @@ -132,7 +132,40 @@ export interface GmConfig { apiKey?: string; baseURL?: string; model?: string; + maxTokens?: number; + /** Enable OpenAI-compatible JSON mode for extraction/finalize calls. */ + jsonMode?: boolean; }; + /** Monthly LLM call budget across all graph-memory LLM tasks. 0 means unlimited. */ + llmMonthlyCallBudget: number; + /** Monthly cap for community summary LLM calls. 0 means unlimited within the global budget. */ + llmMonthlyCommunitySummaryBudget: number; + /** Monthly cap for session finalize LLM calls. 0 means unlimited within the global budget. */ + llmMonthlyFinalizeBudget: number; + /** Time zone used to roll monthly/daily LLM counters. */ + llmBudgetTimeZone: string; + /** Minimum pending messages before a normal extraction batch runs. */ + extractBatchMinMessages: number; + /** Minimum pending text characters before a normal extraction batch runs. */ + extractBatchMinChars: number; + /** Short low-signal user/assistant messages at or below this length are skipped. */ + extractTrivialMaxChars: number; + /** Max characters retained from each message in extraction prompts. */ + extractMaxMessageChars: number; + /** Max unextracted messages sent to one extraction call. */ + extractMaxBatchMessages: number; + /** Quiet period before normal pending messages are flushed. 0 disables debounce. */ + extractDebounceMs: number; + /** Periodic fallback flush for pending messages. 0 disables interval flush. */ + extractFlushIntervalMs: number; + /** Max existing node names sent to the extractor for de-duplication hints. */ + extractExistingNamesLimit: number; + /** Output cap for JSON extraction calls. */ + extractOutputMaxTokens: number; + /** Output cap for session finalize calls. */ + finalizeOutputMaxTokens: number; + /** Output cap for short community summary calls. */ + communitySummaryMaxTokens: number; /** 向量去重阈值,余弦相似度超过此值视为重复 (0-1) */ dedupThreshold: number; /** PageRank 阻尼系数 */ @@ -147,6 +180,21 @@ export const DEFAULT_CONFIG: GmConfig = { recallMaxNodes: 6, recallMaxDepth: 2, freshTailCount: 10, + llmMonthlyCallBudget: 90_000, + llmMonthlyCommunitySummaryBudget: 3_000, + llmMonthlyFinalizeBudget: 3_000, + llmBudgetTimeZone: "Asia/Shanghai", + extractBatchMinMessages: 6, + extractBatchMinChars: 1600, + extractTrivialMaxChars: 40, + extractMaxMessageChars: 600, + extractMaxBatchMessages: 30, + extractDebounceMs: 45_000, + extractFlushIntervalMs: 120_000, + extractExistingNamesLimit: 80, + extractOutputMaxTokens: 0, + finalizeOutputMaxTokens: 0, + communitySummaryMaxTokens: 0, dedupThreshold: 0.90, pagerankDamping: 0.85, pagerankIterations: 20, diff --git a/test/budget.test.ts b/test/budget.test.ts new file mode 100644 index 0000000..58ee40f --- /dev/null +++ b/test/budget.test.ts @@ -0,0 +1,75 @@ +/** + * graph-memory — LLM call budget tests + */ + +import { describe, it, expect } from "vitest"; +import { createTestDb } from "./helpers.ts"; +import { DEFAULT_CONFIG } from "../src/types.ts"; +import { reserveLlmCall } from "../src/engine/budget.ts"; + +describe("monthly dynamic LLM call budget", () => { + it("allows calls until today's dynamic allowance is exhausted", () => { + const db = createTestDb(); + const cfg = { + ...DEFAULT_CONFIG, + llmMonthlyCallBudget: 1, + llmMonthlyCommunitySummaryBudget: 0, + llmMonthlyFinalizeBudget: 0, + }; + + expect(reserveLlmCall(db, cfg, "extract").allowed).toBe(true); + + const second = reserveLlmCall(db, cfg, "extract"); + expect(second.allowed).toBe(false); + expect(second.reason).toContain("budget exhausted"); + }); + + it("enforces community summary dynamic allowance without consuming skipped calls", () => { + const db = createTestDb(); + const cfg = { + ...DEFAULT_CONFIG, + llmMonthlyCallBudget: 10_000, + llmMonthlyCommunitySummaryBudget: 1, + llmMonthlyFinalizeBudget: 0, + }; + + expect(reserveLlmCall(db, cfg, "community_summary").allowed).toBe(true); + + const skipped = reserveLlmCall(db, cfg, "community_summary"); + expect(skipped.allowed).toBe(false); + expect(skipped.reason).toContain("community_summary"); + + const extract = reserveLlmCall(db, cfg, "extract"); + expect(extract.allowed).toBe(true); + expect(extract.monthUsed).toBe(2); + }); + + it("zero budgets mean unlimited for that dimension", () => { + const db = createTestDb(); + const cfg = { + ...DEFAULT_CONFIG, + llmMonthlyCallBudget: 0, + llmMonthlyCommunitySummaryBudget: 0, + llmMonthlyFinalizeBudget: 0, + }; + + for (let i = 0; i < 5; i++) { + expect(reserveLlmCall(db, cfg, "community_summary").allowed).toBe(true); + } + }); + + it("unused monthly calls raise the dynamic daily allowance later in the month", () => { + const db = createTestDb(); + const cfg = { + ...DEFAULT_CONFIG, + llmMonthlyCallBudget: 90_000, + llmMonthlyCommunitySummaryBudget: 0, + llmMonthlyFinalizeBudget: 0, + }; + + const reservation = reserveLlmCall(db, cfg, "extract"); + expect(reservation.allowed).toBe(true); + expect(reservation.todayLimit).toBeGreaterThan(0); + expect(reservation.todayLimit).toBeLessThanOrEqual(90_000); + }); +}); diff --git a/test/gate.test.ts b/test/gate.test.ts new file mode 100644 index 0000000..c9b8738 --- /dev/null +++ b/test/gate.test.ts @@ -0,0 +1,106 @@ +/** + * graph-memory — extraction gating tests + */ + +import { describe, it, expect } from "vitest"; +import { DEFAULT_CONFIG } from "../src/types.ts"; +import { classifyExtractionBatch, isImmediateExtraction, messageTextForExtraction } from "../src/extractor/gate.ts"; +import { Extractor } from "../src/extractor/extract.ts"; + +function row(role: string, text: string, turn = 1): any { + return { + role, + turn_index: turn, + content: JSON.stringify({ role, content: [{ type: "text", text }] }), + }; +} + +describe("extraction gating", () => { + it("skips only trivial acknowledgements", () => { + const decision = classifyExtractionBatch( + [row("user", "好的"), row("assistant", "收到")], + DEFAULT_CONFIG, + ); + + expect(decision.action).toBe("skip"); + }); + + it("defers low-signal messages below batch thresholds", () => { + const decision = classifyExtractionBatch( + [ + row("user", "我们后面再看一下这个插件的成本问题"), + row("assistant", "可以,我先记住这个方向。", 2), + ], + DEFAULT_CONFIG, + ); + + expect(decision.action).toBe("defer"); + }); + + it("extracts once enough meaningful messages are pending", () => { + const messages = Array.from({ length: DEFAULT_CONFIG.extractBatchMinMessages }, (_, i) => + row(i % 2 === 0 ? "user" : "assistant", `关于插件优化的第 ${i} 条有效上下文`, i + 1), + ); + + const decision = classifyExtractionBatch(messages, DEFAULT_CONFIG); + expect(decision.action).toBe("extract"); + expect(isImmediateExtraction(decision)).toBe(false); + }); + + it("extracts immediately on errors and corrections", () => { + const errorDecision = classifyExtractionBatch( + [row("toolResult", "Exit code: 1\nError: failed to connect")], + DEFAULT_CONFIG, + ); + expect(errorDecision.action).toBe("extract"); + expect(isImmediateExtraction(errorDecision)).toBe(true); + + const correctionDecision = classifyExtractionBatch( + [row("user", "不对,这里应该改成批量抽取")], + DEFAULT_CONFIG, + ); + expect(correctionDecision.action).toBe("extract"); + expect(isImmediateExtraction(correctionDecision)).toBe(true); + }); + + it("force flush extracts meaningful pending messages", () => { + const decision = classifyExtractionBatch( + [row("user", "这个会话里有一个需要保留的实现思路")], + DEFAULT_CONFIG, + true, + ); + + expect(decision.action).toBe("extract"); + }); + + it("normalizes stored OpenClaw message JSON to plain text", () => { + const message = row("user", "Sender (untrusted metadata):\n```json\n{\"x\":1}\n```\n真正内容"); + expect(messageTextForExtraction(message)).toBe("真正内容"); + }); +}); + +describe("Extractor prompt controls", () => { + it("caps per-message prompt text and requests bounded JSON output", async () => { + let seenUser = ""; + let seenOptions: any; + const cfg = { + ...DEFAULT_CONFIG, + extractMaxMessageChars: 10, + extractOutputMaxTokens: 321, + }; + const ext = new Extractor(cfg, async (_system, user, options) => { + seenUser = user; + seenOptions = options; + return '{"nodes":[],"edges":[]}'; + }); + + await ext.extract({ + messages: [row("user", "abcdefghijklmnopqrstuvwxyz")], + existingNames: [], + }); + + expect(seenUser).toContain("abcdefghij"); + expect(seenUser).not.toContain("klmnop"); + expect(seenOptions).toMatchObject({ json: true, maxTokens: 321 }); + }); +}); diff --git a/test/helpers.ts b/test/helpers.ts index aa3eace..6432260 100755 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -113,6 +113,14 @@ export function createTestDb(): DatabaseSyncInstance { ); `); + // m7: 元数据 + db.exec(` + CREATE TABLE IF NOT EXISTS gm_meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); + `); + return db; } @@ -176,4 +184,4 @@ export function insertEdge( "test-session", Date.now(), ); -} \ No newline at end of file +}