From 833304afd612fa763f76ac2f733d2cf6d9598230 Mon Sep 17 00:00:00 2001 From: YumemiDream Date: Tue, 16 Jun 2026 19:16:42 +0800 Subject: [PATCH 1/2] fix: replace window.confirm with custom modal in embedded page The embedded Plugin Page runs inside a sandboxed iframe without 'allow-modals', so window.confirm() calls are silently blocked by the browser. This caused all batch operations (approve/reject/delete) on the reviews, jargon, and expression-learning pages to appear unresponsive. Replace all window.confirm() calls with a custom showConfirm() function that renders a confirmation dialog using the existing element, which works within sandbox restrictions. --- pages/dashboard/app.js | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/pages/dashboard/app.js b/pages/dashboard/app.js index b2a089b9..16a4d441 100644 --- a/pages/dashboard/app.js +++ b/pages/dashboard/app.js @@ -408,6 +408,34 @@ else modal.removeAttribute("open"); } + function showConfirm(title, message, confirmText) { + return new Promise((resolve) => { + const modal = $("detail-modal"); + if (!modal) { resolve(window.confirm(message)); return; } + setText("modal-title", title); + setHtml("modal-body", ` +

${escapeHtml(message)}

+
+ + +
+ `); + const done = (result) => { + if (typeof modal.close === "function") modal.close(); + else modal.removeAttribute("open"); + resolve(result); + }; + $("confirm-ok").addEventListener("click", () => done(true), { once: true }); + $("confirm-cancel").addEventListener("click", () => done(false), { once: true }); + $("modal-close").addEventListener("click", () => done(false), { once: true }); + modal.addEventListener("cancel", (e) => { e.preventDefault(); done(false); }, { once: true }); + if (typeof modal.showModal === "function") { + try { modal.showModal(); return; } catch (_) {} + } + modal.setAttribute("open", ""); + }); + } + function resolvePageFromHash() { const raw = window.location.hash.replace(/^#\/?/, ""); return PAGE_META[raw] ? raw : "home"; @@ -1095,11 +1123,11 @@ const typeText = { persona: t("reviews.personaUpdates", "人格更新"), style: t("reviews.expressionReviews", "表达审查"), jargon: t("reviews.jargonCandidates", "黑话候选") }[kind] || t("reviews.items", "审查项"); const actionText = action === "approve" ? t("actions.pass", "通过") : action === "reject" ? t("actions.reject", "拒绝") : t("actions.delete", "删除"); const scopeText = selectedReviewIds(kind).length ? t("selection.selected", "选中") : t("selection.currentPage", "当前页"); - if (!window.confirm(t("reviews.confirmBatch", "确定批量{action}{scope} {count} 条{type}?") + if (!await showConfirm(t("reviews.batchConfirmTitle", "批量操作确认"), t("reviews.confirmBatch", "确定批量{action}{scope} {count} 条{type}?") .replace("{action}", actionText) .replace("{scope}", scopeText) .replace("{count}", fmt(ids.length, 0)) - .replace("{type}", typeText))) return; + .replace("{type}", typeText), actionText)) return; const payload = { action: action === "delete" @@ -1305,7 +1333,7 @@ return; } const actionText = action === "approve" ? t("actions.confirm", "确认") : action === "reject" ? t("actions.rejectBack", "驳回") : t("actions.delete", "删除"); - if (!window.confirm(t("jargon.confirmBatch", "确定批量{action}选中的 {count} 条黑话?").replace("{action}", actionText).replace("{count}", fmt(ids.length, 0)))) return; + if (!await showConfirm(t("jargon.batchConfirmTitle", "批量操作确认"), t("jargon.confirmBatch", "确定批量{action}选中的 {count} 条黑话?").replace("{action}", actionText).replace("{count}", fmt(ids.length, 0)), actionText)) return; const result = await apiPost("jargon/action", { action: action === "delete" ? "batch_delete" : "batch_review", @@ -1367,7 +1395,7 @@ return; } const actionText = action === "approve" ? t("actions.approve", "批准") : action === "reject" ? t("actions.reject", "拒绝") : t("actions.delete", "删除"); - if (!window.confirm(t("style.confirmBatch", "确定批量{action}选中的 {count} 条表达审查?").replace("{action}", actionText).replace("{count}", fmt(ids.length, 0)))) return; + if (!await showConfirm(t("style.batchConfirmTitle", "批量操作确认"), t("style.confirmBatch", "确定批量{action}选中的 {count} 条表达审查?").replace("{action}", actionText).replace("{count}", fmt(ids.length, 0)), actionText)) return; const result = await apiPost("style/action", { action: action === "delete" ? "batch_delete" : "batch_review", From 824ac4a01ccd1490685158ab8bf17c7c891c4532 Mon Sep 17 00:00:00 2001 From: YumemiAI <71859504+YumemiDream@users.noreply.github.com> Date: Tue, 16 Jun 2026 19:41:24 +0800 Subject: [PATCH 2/2] fix: skip re-learning jargon that has been manually edited or fully inferred When the v2 learning system encounters a jargon term that already exists and is_complete=True (manually edited or fully inferred), skip the save to prevent overwriting the user's edits. Also preserve the existing count when updating an existing record, instead of resetting it to 1. Fixes: manually edited jargon meanings get overwritten by re-learning. --- .../core_learning/v2_learning_integration.py | 1887 +++++++++-------- 1 file changed, 951 insertions(+), 936 deletions(-) diff --git a/services/core_learning/v2_learning_integration.py b/services/core_learning/v2_learning_integration.py index 5f0a1c85..4f90397c 100644 --- a/services/core_learning/v2_learning_integration.py +++ b/services/core_learning/v2_learning_integration.py @@ -1,936 +1,951 @@ -""" -V2 learning integration layer. - -Wires together the v2-architecture modules and provides a unified -interface for the ``MaiBotEnhancedLearningManager`` to delegate to. -When v2 features are enabled in ``PluginConfig`` the learning manager -instantiates this class and calls its ``process_message`` and -``get_enhanced_context`` methods alongside (or instead of) the legacy -code paths. - -Modules orchestrated: - * ``TieredLearningTrigger`` — per-message / batch operation scheduling - * ``LightRAGKnowledgeManager`` — knowledge graph (replaces legacy) - * ``Mem0MemoryManager`` — memory management (replaces legacy) - * ``ExemplarLibrary`` — few-shot style exemplar retrieval - * ``SocialGraphAnalyzer`` — community detection / influence ranking - * ``JargonStatisticalFilter`` — statistical jargon pre-filter - * ``IRerankProvider`` — cross-source context reranking - -Design notes: - - All module construction is guarded by the relevant config flags so - that unused modules are never instantiated. - - ``start()`` / ``stop()`` manage the full lifecycle of every active - v2 module. - - Each module that can fail during construction logs a warning and - falls back gracefully (the integration layer keeps working with - the remaining modules). - - Thread-safe for single-event-loop asyncio usage. -""" - -import asyncio -import hashlib -import time -from collections import defaultdict -from typing import Any, Dict, List, Optional, Tuple - -from astrbot.api import logger - -from ...config import PluginConfig -from ...core.interfaces import MessageData -from ...utils.cache_manager import get_cache_manager -from ..monitoring.instrumentation import monitored -from ..quality import ( - BatchTriggerPolicy, - TieredLearningTrigger, - TriggerResult, -) - -# Minimum message length to consider for LLM-heavy ingestion operations. -_MIN_INGESTION_LENGTH = 15 - -# Maximum buffered messages per group before force-flushing. -_INGESTION_BUFFER_MAX = 10 - - -class V2LearningIntegration: - """Facade that initialises, wires, and exposes v2 learning modules. - - Usage:: - - v2 = V2LearningIntegration(config, llm_adapter, db_manager, context) - await v2.start() - result = await v2.process_message(message, group_id) - context = await v2.get_enhanced_context("query", group_id) - await v2.stop() - """ - - def __init__( - self, - config: PluginConfig, - llm_adapter: Optional[Any] = None, - db_manager: Optional[Any] = None, - context: Optional[Any] = None, - feature_delegation: Optional[Any] = None, - ) -> None: - self._config = config - self._llm = llm_adapter - self._db = db_manager - self._context = context - self._feature_delegation = feature_delegation - self._started = False - self._provider_retry_lock = asyncio.Lock() - self._last_provider_retry: float = 0.0 - self._provider_retry_interval: float = max( - 0.1, - float(getattr(config, "provider_retry_interval_seconds", 10.0) or 10.0), - ) - self._knowledge_manager_retryable = True - self._memory_manager_retryable = True - - # --- Resolve framework providers via factories --------------- - self._embedding_provider = self._create_embedding_provider() - self._rerank_provider = self._create_rerank_provider() - - # --- Instantiate v2 modules ---------------------------------- - self._knowledge_manager = self._create_knowledge_manager() - self._memory_manager = self._create_memory_manager() - self._exemplar_library = self._create_exemplar_library() - self._social_analyzer = self._create_social_analyzer() - self._jargon_filter = self._create_jargon_filter() - - # --- Query result cache via CacheManager ---------------------- - self._cache = get_cache_manager() - - # --- Message buffer for batch ingestion ----------------------- - # Knowledge and memory ingestion are LLM-heavy operations and - # must not run per-message. Instead, messages are buffered here - # and flushed as a batch in a Tier 2 operation. - self._ingestion_buffer: Dict[str, List[MessageData]] = defaultdict(list) - - # --- Tiered trigger ------------------------------------------ - self._trigger = TieredLearningTrigger() - self._register_trigger_operations() - - logger.info( - "[V2Integration] Initialised — " - f"knowledge={self._config.knowledge_engine}, " - f"memory={self._config.memory_engine}, " - f"memory_delegated={self._memory_delegated()}, " - f"embedding={'yes' if self._embedding_provider else 'no'}, " - f"reranker={'yes' if self._rerank_provider else 'no'}" - ) - - # Lifecycle - - async def start(self) -> None: - """Start all active v2 modules that expose a ``start`` method.""" - await self.refresh_provider_bindings(force=True) - - await asyncio.gather(*( - self._start_one(name, module) - for name, module in self._active_modules() - if module and hasattr(module, "start") - )) - self._started = True - logger.info("[V2Integration] All modules started") - - async def refresh_provider_bindings(self, *, force: bool = False) -> bool: - """Retry framework provider binding and create dependent modules. - - AstrBot can load plugins before provider registries are populated. This - lets startup, warmup, and first-use paths bind providers later without a - manual plugin reload. - """ - if not self._needs_provider_or_module_retry(): - return False - - if not force and not self._provider_retry_due(): - return False - - async with self._provider_retry_lock: - if not self._needs_provider_or_module_retry(): - return False - - if not force and not self._provider_retry_due(): - return False - self._last_provider_retry = time.monotonic() - - changed = False - modules_to_start: List[Tuple[str, Any]] = [] - - if not self._embedding_provider and self._embedding_provider_configured(): - provider = self._create_embedding_provider() - if provider: - self._embedding_provider = provider - changed = True - - if not self._rerank_provider and self._rerank_provider_configured(): - provider = self._create_rerank_provider() - if provider: - self._rerank_provider = provider - changed = True - - if self._embedding_provider: - if self._knowledge_manager is None: - self._knowledge_manager = self._create_knowledge_manager() - if self._knowledge_manager: - changed = True - modules_to_start.append(( - "knowledge_manager", - self._knowledge_manager, - )) - - if self._memory_manager is None: - self._memory_manager = self._create_memory_manager() - if self._memory_manager: - changed = True - modules_to_start.append(( - "memory_manager", - self._memory_manager, - )) - - if self._exemplar_library_needs_embedding_refresh(): - self._exemplar_library = self._create_exemplar_library() - changed = True - - if changed: - self._register_trigger_operations() - if self._started and modules_to_start: - await asyncio.gather(*( - self._start_one(name, module) - for name, module in modules_to_start - if module and hasattr(module, "start") - )) - logger.info( - "[V2Integration] Provider bindings refreshed — " - f"embedding={'yes' if self._embedding_provider else 'no'}, " - f"reranker={'yes' if self._rerank_provider else 'no'}" - ) - return changed - - def _provider_retry_due(self) -> bool: - return ( - time.monotonic() - self._last_provider_retry - >= self._provider_retry_interval - ) - - async def _start_one(self, name: str, module: Any) -> None: - try: - await module.start() - except Exception as exc: - logger.warning( - f"[V2Integration] {name} start failed: {exc}" - ) - - def _active_modules(self) -> List[Tuple[str, Any]]: - return [ - ("knowledge_manager", self._knowledge_manager), - ("memory_manager", self._memory_manager), - ("exemplar_library", self._exemplar_library), - ("social_analyzer", self._social_analyzer), - ("jargon_filter", self._jargon_filter), - ] - - def _needs_provider_or_module_retry(self) -> bool: - if self._embedding_provider_configured() and not self._embedding_provider: - return True - if self._rerank_provider_configured() and not self._rerank_provider: - return True - if self._embedding_provider and self._knowledge_manager is None: - return ( - self._config.knowledge_engine == "lightrag" - and self._knowledge_manager_retryable - ) - if self._embedding_provider and self._memory_manager is None: - return ( - self._config.memory_engine == "mem0" - and not self._memory_delegated() - and self._memory_manager_retryable - ) - return self._exemplar_library_needs_embedding_refresh() - - def _embedding_provider_configured(self) -> bool: - return bool( - str(getattr(self._config, "embedding_provider_id", "") or "").strip() - ) - - def _rerank_provider_configured(self) -> bool: - return bool( - str(getattr(self._config, "rerank_provider_id", "") or "").strip() - ) - - def _exemplar_library_needs_embedding_refresh(self) -> bool: - if not (self._db and self._embedding_provider and self._exemplar_library): - return False - return getattr(self._exemplar_library, "_embedding", None) is None - - async def warmup(self, group_ids: List[str]) -> None: - """Pre-warm heavyweight module instances for *group_ids*. - - Should be called shortly after ``start()`` once active group IDs - are known. Currently only LightRAG benefits from pre-warming - (each cold-start avoids a 12-15s initialisation penalty on the - first user query). - """ - await self.refresh_provider_bindings() - if ( - self._knowledge_manager - and hasattr(self._knowledge_manager, "warmup_instances") - ): - try: - await self._knowledge_manager.warmup_instances(group_ids) - except Exception as exc: - logger.debug( - f"[V2Integration] Knowledge warmup failed: {exc}" - ) - - async def stop(self) -> None: - """Stop all active v2 modules and release resources. - - Attempts to flush remaining buffered messages with a per-group - timeout. Timed-out buffers are discarded to avoid blocking - the shutdown sequence. - """ - _flush_timeout = self._config.task_cancel_timeout - - for group_id in list(self._ingestion_buffer.keys()): - try: - await asyncio.wait_for( - self._flush_ingestion_buffer(group_id), - timeout=_flush_timeout, - ) - except asyncio.TimeoutError: - dropped = len(self._ingestion_buffer.pop(group_id, [])) - logger.warning( - f"[V2Integration] Buffer flush timeout for group " - f"{group_id}, dropped {dropped} messages" - ) - except Exception as exc: - logger.warning( - f"[V2Integration] Buffer flush failed on stop " - f"for group {group_id}: {exc}" - ) - - modules: List[Tuple[str, Any]] = [ - ("knowledge_manager", self._knowledge_manager), - ("memory_manager", self._memory_manager), - ("exemplar_library", self._exemplar_library), - ("social_analyzer", self._social_analyzer), - ("jargon_filter", self._jargon_filter), - ] - - async def _stop_one(name: str, module: Any) -> None: - try: - await module.stop() - except Exception as exc: - logger.warning( - f"[V2Integration] {name} stop failed: {exc}" - ) - - async def _close_reranker() -> None: - try: - await self._rerank_provider.close() - except Exception as exc: - logger.warning(f"[V2Integration] Reranker close failed: {exc}") - - tasks = [ - _stop_one(name, module) - for name, module in modules - if module and hasattr(module, "stop") - ] - if self._rerank_provider and hasattr(self._rerank_provider, "close"): - tasks.append(_close_reranker()) - - await asyncio.gather(*tasks) - logger.info("[V2Integration] All modules stopped") - - # Public API - - @monitored - async def process_message( - self, message: MessageData, group_id: str - ) -> TriggerResult: - """Process an incoming message through the tiered trigger. - - Tier 1 operations run concurrently on every message. Tier 2 - operations fire when their policies are satisfied. - """ - await self.refresh_provider_bindings() - return await self._trigger.process_message(message, group_id) - - @monitored - async def get_enhanced_context( - self, - query: str, - group_id: str, - top_k: int = 5, - ) -> Dict[str, Any]: - """Retrieve v2 enhanced context for response generation. - - Returns a dict with optional keys: - * ``knowledge_context`` (str): Retrieved knowledge graph context. - * ``related_memories`` (List[str]): Semantically related memories. - * ``few_shot_examples`` (List[str]): Style exemplar texts - (not reranked; returned as-is). - * ``graph_stats`` (dict): Social graph summary statistics. - - When a reranker is available, knowledge and memory candidates are - reranked by relevance and only the top-k are returned. Few-shot - exemplars and graph stats are returned unmodified. - - Results are cached per (group_id, query_hash) with a configurable - TTL to avoid redundant retrieval on repeated or similar queries. - - All retrieval tasks run concurrently via ``asyncio.gather`` to - minimise total latency. - """ - await self.refresh_provider_bindings() - - # --- Check query result cache --- - cache_key = self._make_cache_key(query, group_id) - cached_result = self._cache.get("context", cache_key) - if cached_result is not None: - logger.debug( - f"[V2Integration] Context cache hit (group={group_id})" - ) - return cached_result - - context: Dict[str, Any] = {} - - # --- Build concurrent retrieval tasks --- - - async def _fetch_knowledge() -> None: - if not self._knowledge_manager: - return - try: - if hasattr(self._knowledge_manager, "query_knowledge"): - ctx = await self._knowledge_manager.query_knowledge( - query, group_id, - mode=self._config.lightrag_query_mode, - ) - elif hasattr( - self._knowledge_manager, - "answer_question_with_knowledge_graph", - ): - ctx = ( - await self._knowledge_manager - .answer_question_with_knowledge_graph(query, group_id) - ) - else: - ctx = "" - if ctx: - context["knowledge_context"] = ctx - except Exception as exc: - logger.debug( - f"[V2Integration] Knowledge retrieval failed: {exc}" - ) - - async def _fetch_memories() -> None: - if self._memory_delegated(): - logger.debug("[V2Integration] Memory retrieval delegated to LivingMemory") - return - if not self._memory_manager: - return - try: - memories = await self._memory_manager.get_related_memories( - query, group_id - ) - if memories: - context["related_memories"] = memories - except Exception as exc: - logger.debug( - f"[V2Integration] Memory retrieval failed: {exc}" - ) - - async def _fetch_exemplars() -> None: - if not self._exemplar_library: - return - try: - examples = await self._exemplar_library.get_few_shot_examples( - query, group_id, k=top_k - ) - if examples: - context["few_shot_examples"] = examples - except Exception as exc: - logger.debug( - f"[V2Integration] Exemplar retrieval failed: {exc}" - ) - - async def _fetch_graph_stats() -> None: - if not self._social_analyzer: - return - try: - stats = await self._social_analyzer.get_graph_statistics( - group_id - ) - if stats and stats.get("node_count", 0) > 0: - context["graph_stats"] = stats - except Exception as exc: - logger.debug( - f"[V2Integration] Social graph stats failed: {exc}" - ) - - # --- Run all retrievals concurrently --- - await asyncio.gather( - _fetch_knowledge(), - _fetch_memories(), - _fetch_exemplars(), - _fetch_graph_stats(), - ) - - # --- Conditional reranking --- - # Only invoke the reranker when there are enough candidates to - # justify the additional API round-trip latency. - rerank_candidates = len(context.get("related_memories", [])) - if "knowledge_context" in context: - rerank_candidates += 1 - min_candidates = getattr( - self._config, "rerank_min_candidates", 3 - ) - if self._rerank_provider and rerank_candidates >= min_candidates: - context = await self._rerank_context(query, context, top_k) - - # --- Store result in cache --- - self._cache.set("context", cache_key, context) - - return context - - # Cache helpers - - @staticmethod - def _make_cache_key(query: str, group_id: str) -> str: - """Generate a compact cache key from query text and group ID.""" - query_hash = hashlib.md5(query.encode("utf-8")).hexdigest()[:12] - return f"{group_id}:{query_hash}" - - def get_trigger_stats(self, group_id: str) -> Dict[str, Any]: - """Return tiered trigger statistics for a group.""" - return self._trigger.get_group_stats(group_id) - - # Module factories - - def _create_embedding_provider(self) -> Optional[Any]: - """Resolve embedding provider from the framework.""" - try: - from ..embedding.factory import EmbeddingProviderFactory - return EmbeddingProviderFactory.create(self._config, self._context) - except Exception as exc: - logger.debug( - f"[V2Integration] Embedding provider unavailable: {exc}" - ) - return None - - def _create_rerank_provider(self) -> Optional[Any]: - """Resolve reranker provider from the framework.""" - try: - from ..reranker.factory import RerankProviderFactory - return RerankProviderFactory.create(self._config, self._context) - except Exception as exc: - logger.debug(f"[V2Integration] Reranker unavailable: {exc}") - return None - - def _create_knowledge_manager(self) -> Optional[Any]: - """Create knowledge manager based on configured engine.""" - if self._config.knowledge_engine == "lightrag": - if not self._embedding_provider: - if self._embedding_provider_configured(): - logger.info( - "[V2Integration] LightRAG is waiting for the " - "embedding provider registry to become ready" - ) - else: - logger.warning( - "[V2Integration] LightRAG requires an embedding " - "provider; configure embedding_provider_id or use " - "the legacy knowledge engine" - ) - return None - try: - from ..integration import LightRAGKnowledgeManager - return LightRAGKnowledgeManager( - self._config, self._llm, self._embedding_provider - ) - except ImportError: - self._knowledge_manager_retryable = False - logger.warning( - "[V2Integration] lightrag-hku not installed, " - "falling back to legacy knowledge engine" - ) - except Exception as exc: - logger.warning( - f"[V2Integration] LightRAG init failed: {exc}" - ) - logger.debug( - "[V2Integration] LightRAG traceback:", exc_info=True - ) - return None - - def _create_memory_manager(self) -> Optional[Any]: - """Create memory manager based on configured engine.""" - if self._memory_delegated(): - logger.info("[V2Integration] Memory engine skipped: delegated to LivingMemory") - return None - if self._config.memory_engine == "mem0": - if not self._embedding_provider: - if self._embedding_provider_configured(): - logger.info( - "[V2Integration] Mem0 is waiting for the embedding " - "provider registry to become ready" - ) - else: - logger.warning( - "[V2Integration] Mem0 requires an embedding provider; " - "configure embedding_provider_id or use the legacy " - "memory engine" - ) - return None - try: - from ..integration import Mem0MemoryManager - return Mem0MemoryManager( - self._config, self._llm, self._embedding_provider - ) - except ImportError: - self._memory_manager_retryable = False - logger.warning( - "[V2Integration] mem0ai not installed, " - "falling back to legacy memory engine" - ) - except Exception as exc: - logger.warning( - f"[V2Integration] Mem0 init failed: {exc}" - ) - logger.debug( - "[V2Integration] Mem0 traceback:", exc_info=True - ) - return None - - def _create_exemplar_library(self) -> Optional[Any]: - """Create exemplar library if DB and embedding are available.""" - if not self._db: - return None - try: - from ..integration import ExemplarLibrary - return ExemplarLibrary(self._db, self._embedding_provider) - except Exception as exc: - logger.debug( - f"[V2Integration] ExemplarLibrary init failed: {exc}" - ) - return None - - def _create_social_analyzer(self) -> Optional[Any]: - """Create social graph analyzer.""" - try: - from ..social import SocialGraphAnalyzer - return SocialGraphAnalyzer(self._llm, self._db) - except Exception as exc: - logger.debug( - f"[V2Integration] SocialGraphAnalyzer init failed: {exc}" - ) - return None - - def _create_jargon_filter(self) -> Optional[Any]: - """Create jargon statistical filter.""" - try: - from ..jargon import JargonStatisticalFilter - return JargonStatisticalFilter() - except Exception as exc: - logger.debug( - f"[V2Integration] JargonStatisticalFilter init failed: {exc}" - ) - return None - - # Trigger wiring - - def _register_trigger_operations(self) -> None: - """Register all available modules with the tiered trigger. - - Architecture: - Tier 1 (per-message, sub-millisecond): - - jargon_stats: in-memory statistical counters - - ingestion_buffer: append message to buffer (no I/O) - - exemplar: embedding + DB insert (< 1s) - - Tier 2 (batch, LLM-gated, cooldown-protected): - - ingestion_flush: batch-process buffered messages through - LightRAG and Mem0, amortising LLM overhead across - multiple messages - - jargon: LLM-based jargon meaning inference - - social: community detection and influence ranking - - Knowledge graph ingestion (LightRAG) and memory ingestion (Mem0) - are intentionally registered as Tier 2 batch operations rather - than Tier 1 per-message callbacks because they each invoke one - or more LLM round-trips (entity extraction, fact extraction) - that take 3-10 seconds per message. Running them per-message - would dominate the event loop and block subsequent processing. - """ - - # ---- Tier 1: per-message lightweight operations ---- - - if self._jargon_filter: - jf = self._jargon_filter - - async def _jargon_update( - message: MessageData, group_id: str - ) -> None: - jf.update_from_message(message.message, group_id, message.sender_id) - - self._trigger.register_tier1("jargon_stats", _jargon_update) - - # Buffer messages for batch ingestion (knowledge + memory). - # This replaces the previous per-message LightRAG/Mem0 callbacks - # with a sub-millisecond append operation. - if self._knowledge_manager or self._memory_manager: - buf = self._ingestion_buffer - - async def _buffer_message( - message: MessageData, group_id: str - ) -> None: - if ( - message.message - and len(message.message.strip()) >= _MIN_INGESTION_LENGTH - ): - buf[group_id].append(message) - - self._trigger.register_tier1("ingestion_buffer", _buffer_message) - - if self._exemplar_library: - lib = self._exemplar_library - - async def _exemplar_add( - message: MessageData, group_id: str - ) -> None: - await lib.add_exemplar( - message.message, group_id, message.sender_id - ) - - self._trigger.register_tier1("exemplar", _exemplar_add) - - # ---- Tier 2: batch operations (LLM-heavy) ---- - - # Batch ingestion: flush buffered messages through LightRAG - # and Mem0. Fires every 5 messages or 60 seconds, whichever - # comes first. This amortises the per-message LLM overhead - # and reduces total API calls. - if self._knowledge_manager or self._memory_manager: - self._trigger.register_tier2( - "ingestion_flush", - self._flush_ingestion_buffer, - BatchTriggerPolicy( - message_threshold=5, cooldown_seconds=60 - ), - ) - - if self._jargon_filter: - jf2 = self._jargon_filter - llm = self._llm - db = self._db - - async def _jargon_batch(group_id: str) -> None: - candidates = jf2.get_jargon_candidates(group_id, top_k=20) - if not candidates or not llm: - return - for candidate in candidates[:10]: - try: - meaning = await llm.generate_response( - f"Explain the slang/jargon term " - f"'{candidate['term']}' in the context of an " - f"online chat group. Return a concise definition.", - model_type="filter", - ) - if ( - meaning - and db - and hasattr(db, "save_or_update_jargon") - ): - await db.save_or_update_jargon( - group_id, - candidate["term"], - { - "meaning": meaning, - "raw_content": "[]", - "is_jargon": True, - "count": 1, - "is_complete": True, - }, - ) - except Exception as exc: - logger.debug( - f"[V2Integration] Jargon inference failed " - f"for '{candidate['term']}': {exc}" - ) - - self._trigger.register_tier2( - "jargon", - _jargon_batch, - BatchTriggerPolicy( - message_threshold=20, cooldown_seconds=180 - ), - ) - - if self._social_analyzer: - sa = self._social_analyzer - - async def _social_batch(group_id: str) -> None: - # Execute independently so one failure does not skip the other. - try: - await sa.detect_communities(group_id) - except Exception as exc: - logger.debug( - f"[V2Integration] detect_communities failed: {exc}" - ) - try: - await sa.get_influence_ranking(group_id) - except Exception as exc: - logger.debug( - f"[V2Integration] get_influence_ranking failed: {exc}" - ) - - self._trigger.register_tier2( - "social", - _social_batch, - BatchTriggerPolicy( - message_threshold=50, cooldown_seconds=600 - ), - ) - - # Batch ingestion - - async def _flush_ingestion_buffer(self, group_id: str) -> None: - """Flush buffered messages for a group through knowledge and memory. - - Processes all buffered messages concurrently through LightRAG and - Mem0 in a single batch operation, then clears the buffer. Messages - within each engine are processed sequentially to avoid overwhelming - the underlying LLM providers with concurrent requests. - """ - messages = self._ingestion_buffer.pop(group_id, []) - if not messages: - return - - logger.debug( - f"[V2Integration] Flushing ingestion buffer: " - f"group={group_id}, count={len(messages)}" - ) - - async def _ingest_knowledge() -> None: - if not self._knowledge_manager: - return - method = None - if hasattr( - self._knowledge_manager, - "process_message_for_knowledge_graph", - ): - method = self._knowledge_manager.process_message_for_knowledge_graph - elif hasattr( - self._knowledge_manager, "process_message_for_knowledge" - ): - method = self._knowledge_manager.process_message_for_knowledge - if not method: - return - for msg in messages: - try: - await method(msg, group_id) - except Exception as exc: - logger.debug( - f"[V2Integration] Knowledge ingestion failed: {exc}" - ) - - async def _ingest_memory() -> None: - if self._memory_delegated(): - logger.debug("[V2Integration] Memory ingestion delegated to LivingMemory") - return - if not self._memory_manager: - return - for msg in messages: - try: - await self._memory_manager.add_memory_from_message( - msg, group_id - ) - except Exception as exc: - logger.debug( - f"[V2Integration] Memory ingestion failed: {exc}" - ) - - # Run knowledge and memory ingestion concurrently across engines, - # but sequentially within each engine to avoid provider overload. - await asyncio.gather( - _ingest_knowledge(), - _ingest_memory(), - ) - - def _memory_delegated(self) -> bool: - delegation = self._feature_delegation - if not delegation or not hasattr(delegation, "should_delegate_memory"): - return False - try: - return bool(delegation.should_delegate_memory()) - except Exception: - return False - - # Reranking - - @monitored - async def _rerank_context( - self, - query: str, - context: Dict[str, Any], - top_k: int, - ) -> Dict[str, Any]: - """Rerank knowledge and memory candidates by relevance. - - Few-shot exemplars and graph stats are returned unmodified. - """ - try: - documents: List[str] = [] - sources: List[str] = [] - - if "knowledge_context" in context: - documents.append(context["knowledge_context"]) - sources.append("knowledge") - - for mem in context.get("related_memories", []): - documents.append(mem) - sources.append("memory") - - if not documents: - return context - - results = await self._rerank_provider.rerank( - query, documents, top_n=top_k - ) - - # Rebuild context with reranked order. - reranked_memories: List[str] = [] - reranked_knowledge = "" - for r in results: - if r.index >= len(documents): - logger.debug( - f"[V2Integration] Reranker returned out-of-range " - f"index {r.index} (len={len(documents)}); skipping" - ) - continue - src = sources[r.index] - doc = documents[r.index] - if src == "knowledge": - reranked_knowledge = doc - elif src == "memory": - reranked_memories.append(doc) - - if reranked_knowledge: - context["knowledge_context"] = reranked_knowledge - elif "knowledge_context" in context: - del context["knowledge_context"] - - if reranked_memories: - context["related_memories"] = reranked_memories - elif "related_memories" in context: - del context["related_memories"] - - except Exception as exc: - logger.debug( - f"[V2Integration] Reranking failed, using unranked: {exc}" - ) - - return context +""" +V2 learning integration layer. + +Wires together the v2-architecture modules and provides a unified +interface for the ``MaiBotEnhancedLearningManager`` to delegate to. +When v2 features are enabled in ``PluginConfig`` the learning manager +instantiates this class and calls its ``process_message`` and +``get_enhanced_context`` methods alongside (or instead of) the legacy +code paths. + +Modules orchestrated: + * ``TieredLearningTrigger`` — per-message / batch operation scheduling + * ``LightRAGKnowledgeManager`` — knowledge graph (replaces legacy) + * ``Mem0MemoryManager`` — memory management (replaces legacy) + * ``ExemplarLibrary`` — few-shot style exemplar retrieval + * ``SocialGraphAnalyzer`` — community detection / influence ranking + * ``JargonStatisticalFilter`` — statistical jargon pre-filter + * ``IRerankProvider`` — cross-source context reranking + +Design notes: + - All module construction is guarded by the relevant config flags so + that unused modules are never instantiated. + - ``start()`` / ``stop()`` manage the full lifecycle of every active + v2 module. + - Each module that can fail during construction logs a warning and + falls back gracefully (the integration layer keeps working with + the remaining modules). + - Thread-safe for single-event-loop asyncio usage. +""" + +import asyncio +import hashlib +import time +from collections import defaultdict +from typing import Any, Dict, List, Optional, Tuple + +from astrbot.api import logger + +from ...config import PluginConfig +from ...core.interfaces import MessageData +from ...utils.cache_manager import get_cache_manager +from ..monitoring.instrumentation import monitored +from ..quality import ( + BatchTriggerPolicy, + TieredLearningTrigger, + TriggerResult, +) + +# Minimum message length to consider for LLM-heavy ingestion operations. +_MIN_INGESTION_LENGTH = 15 + +# Maximum buffered messages per group before force-flushing. +_INGESTION_BUFFER_MAX = 10 + + +class V2LearningIntegration: + """Facade that initialises, wires, and exposes v2 learning modules. + + Usage:: + + v2 = V2LearningIntegration(config, llm_adapter, db_manager, context) + await v2.start() + result = await v2.process_message(message, group_id) + context = await v2.get_enhanced_context("query", group_id) + await v2.stop() + """ + + def __init__( + self, + config: PluginConfig, + llm_adapter: Optional[Any] = None, + db_manager: Optional[Any] = None, + context: Optional[Any] = None, + feature_delegation: Optional[Any] = None, + ) -> None: + self._config = config + self._llm = llm_adapter + self._db = db_manager + self._context = context + self._feature_delegation = feature_delegation + self._started = False + self._provider_retry_lock = asyncio.Lock() + self._last_provider_retry: float = 0.0 + self._provider_retry_interval: float = max( + 0.1, + float(getattr(config, "provider_retry_interval_seconds", 10.0) or 10.0), + ) + self._knowledge_manager_retryable = True + self._memory_manager_retryable = True + + # --- Resolve framework providers via factories --------------- + self._embedding_provider = self._create_embedding_provider() + self._rerank_provider = self._create_rerank_provider() + + # --- Instantiate v2 modules ---------------------------------- + self._knowledge_manager = self._create_knowledge_manager() + self._memory_manager = self._create_memory_manager() + self._exemplar_library = self._create_exemplar_library() + self._social_analyzer = self._create_social_analyzer() + self._jargon_filter = self._create_jargon_filter() + + # --- Query result cache via CacheManager ---------------------- + self._cache = get_cache_manager() + + # --- Message buffer for batch ingestion ----------------------- + # Knowledge and memory ingestion are LLM-heavy operations and + # must not run per-message. Instead, messages are buffered here + # and flushed as a batch in a Tier 2 operation. + self._ingestion_buffer: Dict[str, List[MessageData]] = defaultdict(list) + + # --- Tiered trigger ------------------------------------------ + self._trigger = TieredLearningTrigger() + self._register_trigger_operations() + + logger.info( + "[V2Integration] Initialised — " + f"knowledge={self._config.knowledge_engine}, " + f"memory={self._config.memory_engine}, " + f"memory_delegated={self._memory_delegated()}, " + f"embedding={'yes' if self._embedding_provider else 'no'}, " + f"reranker={'yes' if self._rerank_provider else 'no'}" + ) + + # Lifecycle + + async def start(self) -> None: + """Start all active v2 modules that expose a ``start`` method.""" + await self.refresh_provider_bindings(force=True) + + await asyncio.gather(*( + self._start_one(name, module) + for name, module in self._active_modules() + if module and hasattr(module, "start") + )) + self._started = True + logger.info("[V2Integration] All modules started") + + async def refresh_provider_bindings(self, *, force: bool = False) -> bool: + """Retry framework provider binding and create dependent modules. + + AstrBot can load plugins before provider registries are populated. This + lets startup, warmup, and first-use paths bind providers later without a + manual plugin reload. + """ + if not self._needs_provider_or_module_retry(): + return False + + if not force and not self._provider_retry_due(): + return False + + async with self._provider_retry_lock: + if not self._needs_provider_or_module_retry(): + return False + + if not force and not self._provider_retry_due(): + return False + self._last_provider_retry = time.monotonic() + + changed = False + modules_to_start: List[Tuple[str, Any]] = [] + + if not self._embedding_provider and self._embedding_provider_configured(): + provider = self._create_embedding_provider() + if provider: + self._embedding_provider = provider + changed = True + + if not self._rerank_provider and self._rerank_provider_configured(): + provider = self._create_rerank_provider() + if provider: + self._rerank_provider = provider + changed = True + + if self._embedding_provider: + if self._knowledge_manager is None: + self._knowledge_manager = self._create_knowledge_manager() + if self._knowledge_manager: + changed = True + modules_to_start.append(( + "knowledge_manager", + self._knowledge_manager, + )) + + if self._memory_manager is None: + self._memory_manager = self._create_memory_manager() + if self._memory_manager: + changed = True + modules_to_start.append(( + "memory_manager", + self._memory_manager, + )) + + if self._exemplar_library_needs_embedding_refresh(): + self._exemplar_library = self._create_exemplar_library() + changed = True + + if changed: + self._register_trigger_operations() + if self._started and modules_to_start: + await asyncio.gather(*( + self._start_one(name, module) + for name, module in modules_to_start + if module and hasattr(module, "start") + )) + logger.info( + "[V2Integration] Provider bindings refreshed — " + f"embedding={'yes' if self._embedding_provider else 'no'}, " + f"reranker={'yes' if self._rerank_provider else 'no'}" + ) + return changed + + def _provider_retry_due(self) -> bool: + return ( + time.monotonic() - self._last_provider_retry + >= self._provider_retry_interval + ) + + async def _start_one(self, name: str, module: Any) -> None: + try: + await module.start() + except Exception as exc: + logger.warning( + f"[V2Integration] {name} start failed: {exc}" + ) + + def _active_modules(self) -> List[Tuple[str, Any]]: + return [ + ("knowledge_manager", self._knowledge_manager), + ("memory_manager", self._memory_manager), + ("exemplar_library", self._exemplar_library), + ("social_analyzer", self._social_analyzer), + ("jargon_filter", self._jargon_filter), + ] + + def _needs_provider_or_module_retry(self) -> bool: + if self._embedding_provider_configured() and not self._embedding_provider: + return True + if self._rerank_provider_configured() and not self._rerank_provider: + return True + if self._embedding_provider and self._knowledge_manager is None: + return ( + self._config.knowledge_engine == "lightrag" + and self._knowledge_manager_retryable + ) + if self._embedding_provider and self._memory_manager is None: + return ( + self._config.memory_engine == "mem0" + and not self._memory_delegated() + and self._memory_manager_retryable + ) + return self._exemplar_library_needs_embedding_refresh() + + def _embedding_provider_configured(self) -> bool: + return bool( + str(getattr(self._config, "embedding_provider_id", "") or "").strip() + ) + + def _rerank_provider_configured(self) -> bool: + return bool( + str(getattr(self._config, "rerank_provider_id", "") or "").strip() + ) + + def _exemplar_library_needs_embedding_refresh(self) -> bool: + if not (self._db and self._embedding_provider and self._exemplar_library): + return False + return getattr(self._exemplar_library, "_embedding", None) is None + + async def warmup(self, group_ids: List[str]) -> None: + """Pre-warm heavyweight module instances for *group_ids*. + + Should be called shortly after ``start()`` once active group IDs + are known. Currently only LightRAG benefits from pre-warming + (each cold-start avoids a 12-15s initialisation penalty on the + first user query). + """ + await self.refresh_provider_bindings() + if ( + self._knowledge_manager + and hasattr(self._knowledge_manager, "warmup_instances") + ): + try: + await self._knowledge_manager.warmup_instances(group_ids) + except Exception as exc: + logger.debug( + f"[V2Integration] Knowledge warmup failed: {exc}" + ) + + async def stop(self) -> None: + """Stop all active v2 modules and release resources. + + Attempts to flush remaining buffered messages with a per-group + timeout. Timed-out buffers are discarded to avoid blocking + the shutdown sequence. + """ + _flush_timeout = self._config.task_cancel_timeout + + for group_id in list(self._ingestion_buffer.keys()): + try: + await asyncio.wait_for( + self._flush_ingestion_buffer(group_id), + timeout=_flush_timeout, + ) + except asyncio.TimeoutError: + dropped = len(self._ingestion_buffer.pop(group_id, [])) + logger.warning( + f"[V2Integration] Buffer flush timeout for group " + f"{group_id}, dropped {dropped} messages" + ) + except Exception as exc: + logger.warning( + f"[V2Integration] Buffer flush failed on stop " + f"for group {group_id}: {exc}" + ) + + modules: List[Tuple[str, Any]] = [ + ("knowledge_manager", self._knowledge_manager), + ("memory_manager", self._memory_manager), + ("exemplar_library", self._exemplar_library), + ("social_analyzer", self._social_analyzer), + ("jargon_filter", self._jargon_filter), + ] + + async def _stop_one(name: str, module: Any) -> None: + try: + await module.stop() + except Exception as exc: + logger.warning( + f"[V2Integration] {name} stop failed: {exc}" + ) + + async def _close_reranker() -> None: + try: + await self._rerank_provider.close() + except Exception as exc: + logger.warning(f"[V2Integration] Reranker close failed: {exc}") + + tasks = [ + _stop_one(name, module) + for name, module in modules + if module and hasattr(module, "stop") + ] + if self._rerank_provider and hasattr(self._rerank_provider, "close"): + tasks.append(_close_reranker()) + + await asyncio.gather(*tasks) + logger.info("[V2Integration] All modules stopped") + + # Public API + + @monitored + async def process_message( + self, message: MessageData, group_id: str + ) -> TriggerResult: + """Process an incoming message through the tiered trigger. + + Tier 1 operations run concurrently on every message. Tier 2 + operations fire when their policies are satisfied. + """ + await self.refresh_provider_bindings() + return await self._trigger.process_message(message, group_id) + + @monitored + async def get_enhanced_context( + self, + query: str, + group_id: str, + top_k: int = 5, + ) -> Dict[str, Any]: + """Retrieve v2 enhanced context for response generation. + + Returns a dict with optional keys: + * ``knowledge_context`` (str): Retrieved knowledge graph context. + * ``related_memories`` (List[str]): Semantically related memories. + * ``few_shot_examples`` (List[str]): Style exemplar texts + (not reranked; returned as-is). + * ``graph_stats`` (dict): Social graph summary statistics. + + When a reranker is available, knowledge and memory candidates are + reranked by relevance and only the top-k are returned. Few-shot + exemplars and graph stats are returned unmodified. + + Results are cached per (group_id, query_hash) with a configurable + TTL to avoid redundant retrieval on repeated or similar queries. + + All retrieval tasks run concurrently via ``asyncio.gather`` to + minimise total latency. + """ + await self.refresh_provider_bindings() + + # --- Check query result cache --- + cache_key = self._make_cache_key(query, group_id) + cached_result = self._cache.get("context", cache_key) + if cached_result is not None: + logger.debug( + f"[V2Integration] Context cache hit (group={group_id})" + ) + return cached_result + + context: Dict[str, Any] = {} + + # --- Build concurrent retrieval tasks --- + + async def _fetch_knowledge() -> None: + if not self._knowledge_manager: + return + try: + if hasattr(self._knowledge_manager, "query_knowledge"): + ctx = await self._knowledge_manager.query_knowledge( + query, group_id, + mode=self._config.lightrag_query_mode, + ) + elif hasattr( + self._knowledge_manager, + "answer_question_with_knowledge_graph", + ): + ctx = ( + await self._knowledge_manager + .answer_question_with_knowledge_graph(query, group_id) + ) + else: + ctx = "" + if ctx: + context["knowledge_context"] = ctx + except Exception as exc: + logger.debug( + f"[V2Integration] Knowledge retrieval failed: {exc}" + ) + + async def _fetch_memories() -> None: + if self._memory_delegated(): + logger.debug("[V2Integration] Memory retrieval delegated to LivingMemory") + return + if not self._memory_manager: + return + try: + memories = await self._memory_manager.get_related_memories( + query, group_id + ) + if memories: + context["related_memories"] = memories + except Exception as exc: + logger.debug( + f"[V2Integration] Memory retrieval failed: {exc}" + ) + + async def _fetch_exemplars() -> None: + if not self._exemplar_library: + return + try: + examples = await self._exemplar_library.get_few_shot_examples( + query, group_id, k=top_k + ) + if examples: + context["few_shot_examples"] = examples + except Exception as exc: + logger.debug( + f"[V2Integration] Exemplar retrieval failed: {exc}" + ) + + async def _fetch_graph_stats() -> None: + if not self._social_analyzer: + return + try: + stats = await self._social_analyzer.get_graph_statistics( + group_id + ) + if stats and stats.get("node_count", 0) > 0: + context["graph_stats"] = stats + except Exception as exc: + logger.debug( + f"[V2Integration] Social graph stats failed: {exc}" + ) + + # --- Run all retrievals concurrently --- + await asyncio.gather( + _fetch_knowledge(), + _fetch_memories(), + _fetch_exemplars(), + _fetch_graph_stats(), + ) + + # --- Conditional reranking --- + # Only invoke the reranker when there are enough candidates to + # justify the additional API round-trip latency. + rerank_candidates = len(context.get("related_memories", [])) + if "knowledge_context" in context: + rerank_candidates += 1 + min_candidates = getattr( + self._config, "rerank_min_candidates", 3 + ) + if self._rerank_provider and rerank_candidates >= min_candidates: + context = await self._rerank_context(query, context, top_k) + + # --- Store result in cache --- + self._cache.set("context", cache_key, context) + + return context + + # Cache helpers + + @staticmethod + def _make_cache_key(query: str, group_id: str) -> str: + """Generate a compact cache key from query text and group ID.""" + query_hash = hashlib.md5(query.encode("utf-8")).hexdigest()[:12] + return f"{group_id}:{query_hash}" + + def get_trigger_stats(self, group_id: str) -> Dict[str, Any]: + """Return tiered trigger statistics for a group.""" + return self._trigger.get_group_stats(group_id) + + # Module factories + + def _create_embedding_provider(self) -> Optional[Any]: + """Resolve embedding provider from the framework.""" + try: + from ..embedding.factory import EmbeddingProviderFactory + return EmbeddingProviderFactory.create(self._config, self._context) + except Exception as exc: + logger.debug( + f"[V2Integration] Embedding provider unavailable: {exc}" + ) + return None + + def _create_rerank_provider(self) -> Optional[Any]: + """Resolve reranker provider from the framework.""" + try: + from ..reranker.factory import RerankProviderFactory + return RerankProviderFactory.create(self._config, self._context) + except Exception as exc: + logger.debug(f"[V2Integration] Reranker unavailable: {exc}") + return None + + def _create_knowledge_manager(self) -> Optional[Any]: + """Create knowledge manager based on configured engine.""" + if self._config.knowledge_engine == "lightrag": + if not self._embedding_provider: + if self._embedding_provider_configured(): + logger.info( + "[V2Integration] LightRAG is waiting for the " + "embedding provider registry to become ready" + ) + else: + logger.warning( + "[V2Integration] LightRAG requires an embedding " + "provider; configure embedding_provider_id or use " + "the legacy knowledge engine" + ) + return None + try: + from ..integration import LightRAGKnowledgeManager + return LightRAGKnowledgeManager( + self._config, self._llm, self._embedding_provider + ) + except ImportError: + self._knowledge_manager_retryable = False + logger.warning( + "[V2Integration] lightrag-hku not installed, " + "falling back to legacy knowledge engine" + ) + except Exception as exc: + logger.warning( + f"[V2Integration] LightRAG init failed: {exc}" + ) + logger.debug( + "[V2Integration] LightRAG traceback:", exc_info=True + ) + return None + + def _create_memory_manager(self) -> Optional[Any]: + """Create memory manager based on configured engine.""" + if self._memory_delegated(): + logger.info("[V2Integration] Memory engine skipped: delegated to LivingMemory") + return None + if self._config.memory_engine == "mem0": + if not self._embedding_provider: + if self._embedding_provider_configured(): + logger.info( + "[V2Integration] Mem0 is waiting for the embedding " + "provider registry to become ready" + ) + else: + logger.warning( + "[V2Integration] Mem0 requires an embedding provider; " + "configure embedding_provider_id or use the legacy " + "memory engine" + ) + return None + try: + from ..integration import Mem0MemoryManager + return Mem0MemoryManager( + self._config, self._llm, self._embedding_provider + ) + except ImportError: + self._memory_manager_retryable = False + logger.warning( + "[V2Integration] mem0ai not installed, " + "falling back to legacy memory engine" + ) + except Exception as exc: + logger.warning( + f"[V2Integration] Mem0 init failed: {exc}" + ) + logger.debug( + "[V2Integration] Mem0 traceback:", exc_info=True + ) + return None + + def _create_exemplar_library(self) -> Optional[Any]: + """Create exemplar library if DB and embedding are available.""" + if not self._db: + return None + try: + from ..integration import ExemplarLibrary + return ExemplarLibrary(self._db, self._embedding_provider) + except Exception as exc: + logger.debug( + f"[V2Integration] ExemplarLibrary init failed: {exc}" + ) + return None + + def _create_social_analyzer(self) -> Optional[Any]: + """Create social graph analyzer.""" + try: + from ..social import SocialGraphAnalyzer + return SocialGraphAnalyzer(self._llm, self._db) + except Exception as exc: + logger.debug( + f"[V2Integration] SocialGraphAnalyzer init failed: {exc}" + ) + return None + + def _create_jargon_filter(self) -> Optional[Any]: + """Create jargon statistical filter.""" + try: + from ..jargon import JargonStatisticalFilter + return JargonStatisticalFilter() + except Exception as exc: + logger.debug( + f"[V2Integration] JargonStatisticalFilter init failed: {exc}" + ) + return None + + # Trigger wiring + + def _register_trigger_operations(self) -> None: + """Register all available modules with the tiered trigger. + + Architecture: + Tier 1 (per-message, sub-millisecond): + - jargon_stats: in-memory statistical counters + - ingestion_buffer: append message to buffer (no I/O) + - exemplar: embedding + DB insert (< 1s) + + Tier 2 (batch, LLM-gated, cooldown-protected): + - ingestion_flush: batch-process buffered messages through + LightRAG and Mem0, amortising LLM overhead across + multiple messages + - jargon: LLM-based jargon meaning inference + - social: community detection and influence ranking + + Knowledge graph ingestion (LightRAG) and memory ingestion (Mem0) + are intentionally registered as Tier 2 batch operations rather + than Tier 1 per-message callbacks because they each invoke one + or more LLM round-trips (entity extraction, fact extraction) + that take 3-10 seconds per message. Running them per-message + would dominate the event loop and block subsequent processing. + """ + + # ---- Tier 1: per-message lightweight operations ---- + + if self._jargon_filter: + jf = self._jargon_filter + + async def _jargon_update( + message: MessageData, group_id: str + ) -> None: + jf.update_from_message(message.message, group_id, message.sender_id) + + self._trigger.register_tier1("jargon_stats", _jargon_update) + + # Buffer messages for batch ingestion (knowledge + memory). + # This replaces the previous per-message LightRAG/Mem0 callbacks + # with a sub-millisecond append operation. + if self._knowledge_manager or self._memory_manager: + buf = self._ingestion_buffer + + async def _buffer_message( + message: MessageData, group_id: str + ) -> None: + if ( + message.message + and len(message.message.strip()) >= _MIN_INGESTION_LENGTH + ): + buf[group_id].append(message) + + self._trigger.register_tier1("ingestion_buffer", _buffer_message) + + if self._exemplar_library: + lib = self._exemplar_library + + async def _exemplar_add( + message: MessageData, group_id: str + ) -> None: + await lib.add_exemplar( + message.message, group_id, message.sender_id + ) + + self._trigger.register_tier1("exemplar", _exemplar_add) + + # ---- Tier 2: batch operations (LLM-heavy) ---- + + # Batch ingestion: flush buffered messages through LightRAG + # and Mem0. Fires every 5 messages or 60 seconds, whichever + # comes first. This amortises the per-message LLM overhead + # and reduces total API calls. + if self._knowledge_manager or self._memory_manager: + self._trigger.register_tier2( + "ingestion_flush", + self._flush_ingestion_buffer, + BatchTriggerPolicy( + message_threshold=5, cooldown_seconds=60 + ), + ) + + if self._jargon_filter: + jf2 = self._jargon_filter + llm = self._llm + db = self._db + + async def _jargon_batch(group_id: str) -> None: + candidates = jf2.get_jargon_candidates(group_id, top_k=20) + if not candidates or not llm: + return + for candidate in candidates[:10]: + try: + # 跳过已被手动编辑或已完成推断的黑话,避免覆盖用户修改 + if db and hasattr(db, "get_jargon"): + existing = await db.get_jargon(group_id, candidate["term"]) + if existing and existing.get("is_complete"): + logger.debug( + f"[V2Integration] 跳过已完成的黑话: " + f"'{candidate['term']}'" + ) + continue + + meaning = await llm.generate_response( + f"Explain the slang/jargon term " + f"'{candidate['term']}' in the context of an " + f"online chat group. Return a concise definition.", + model_type="filter", + ) + if ( + meaning + and db + and hasattr(db, "save_or_update_jargon") + ): + jargon_data = { + "meaning": meaning, + "raw_content": "[]", + "is_jargon": True, + "is_complete": True, + } + # 已有记录时保留原始计数,不重置为1 + if existing: + jargon_data["count"] = existing.get("count", 1) + else: + jargon_data["count"] = 1 + await db.save_or_update_jargon( + group_id, + candidate["term"], + jargon_data, + ) + except Exception as exc: + logger.debug( + f"[V2Integration] Jargon inference failed " + f"for '{candidate['term']}': {exc}" + ) + + self._trigger.register_tier2( + "jargon", + _jargon_batch, + BatchTriggerPolicy( + message_threshold=20, cooldown_seconds=180 + ), + ) + + if self._social_analyzer: + sa = self._social_analyzer + + async def _social_batch(group_id: str) -> None: + # Execute independently so one failure does not skip the other. + try: + await sa.detect_communities(group_id) + except Exception as exc: + logger.debug( + f"[V2Integration] detect_communities failed: {exc}" + ) + try: + await sa.get_influence_ranking(group_id) + except Exception as exc: + logger.debug( + f"[V2Integration] get_influence_ranking failed: {exc}" + ) + + self._trigger.register_tier2( + "social", + _social_batch, + BatchTriggerPolicy( + message_threshold=50, cooldown_seconds=600 + ), + ) + + # Batch ingestion + + async def _flush_ingestion_buffer(self, group_id: str) -> None: + """Flush buffered messages for a group through knowledge and memory. + + Processes all buffered messages concurrently through LightRAG and + Mem0 in a single batch operation, then clears the buffer. Messages + within each engine are processed sequentially to avoid overwhelming + the underlying LLM providers with concurrent requests. + """ + messages = self._ingestion_buffer.pop(group_id, []) + if not messages: + return + + logger.debug( + f"[V2Integration] Flushing ingestion buffer: " + f"group={group_id}, count={len(messages)}" + ) + + async def _ingest_knowledge() -> None: + if not self._knowledge_manager: + return + method = None + if hasattr( + self._knowledge_manager, + "process_message_for_knowledge_graph", + ): + method = self._knowledge_manager.process_message_for_knowledge_graph + elif hasattr( + self._knowledge_manager, "process_message_for_knowledge" + ): + method = self._knowledge_manager.process_message_for_knowledge + if not method: + return + for msg in messages: + try: + await method(msg, group_id) + except Exception as exc: + logger.debug( + f"[V2Integration] Knowledge ingestion failed: {exc}" + ) + + async def _ingest_memory() -> None: + if self._memory_delegated(): + logger.debug("[V2Integration] Memory ingestion delegated to LivingMemory") + return + if not self._memory_manager: + return + for msg in messages: + try: + await self._memory_manager.add_memory_from_message( + msg, group_id + ) + except Exception as exc: + logger.debug( + f"[V2Integration] Memory ingestion failed: {exc}" + ) + + # Run knowledge and memory ingestion concurrently across engines, + # but sequentially within each engine to avoid provider overload. + await asyncio.gather( + _ingest_knowledge(), + _ingest_memory(), + ) + + def _memory_delegated(self) -> bool: + delegation = self._feature_delegation + if not delegation or not hasattr(delegation, "should_delegate_memory"): + return False + try: + return bool(delegation.should_delegate_memory()) + except Exception: + return False + + # Reranking + + @monitored + async def _rerank_context( + self, + query: str, + context: Dict[str, Any], + top_k: int, + ) -> Dict[str, Any]: + """Rerank knowledge and memory candidates by relevance. + + Few-shot exemplars and graph stats are returned unmodified. + """ + try: + documents: List[str] = [] + sources: List[str] = [] + + if "knowledge_context" in context: + documents.append(context["knowledge_context"]) + sources.append("knowledge") + + for mem in context.get("related_memories", []): + documents.append(mem) + sources.append("memory") + + if not documents: + return context + + results = await self._rerank_provider.rerank( + query, documents, top_n=top_k + ) + + # Rebuild context with reranked order. + reranked_memories: List[str] = [] + reranked_knowledge = "" + for r in results: + if r.index >= len(documents): + logger.debug( + f"[V2Integration] Reranker returned out-of-range " + f"index {r.index} (len={len(documents)}); skipping" + ) + continue + src = sources[r.index] + doc = documents[r.index] + if src == "knowledge": + reranked_knowledge = doc + elif src == "memory": + reranked_memories.append(doc) + + if reranked_knowledge: + context["knowledge_context"] = reranked_knowledge + elif "knowledge_context" in context: + del context["knowledge_context"] + + if reranked_memories: + context["related_memories"] = reranked_memories + elif "related_memories" in context: + del context["related_memories"] + + except Exception as exc: + logger.debug( + f"[V2Integration] Reranking failed, using unranked: {exc}" + ) + + return context