diff --git a/pages/dashboard/app.js b/pages/dashboard/app.js index b2a089b9..16a4d441 100644 --- a/pages/dashboard/app.js +++ b/pages/dashboard/app.js @@ -408,6 +408,34 @@ else modal.removeAttribute("open"); } + function showConfirm(title, message, confirmText) { + return new Promise((resolve) => { + const modal = $("detail-modal"); + if (!modal) { resolve(window.confirm(message)); return; } + setText("modal-title", title); + setHtml("modal-body", ` +

${escapeHtml(message)}

+
+ + +
+ `); + const done = (result) => { + if (typeof modal.close === "function") modal.close(); + else modal.removeAttribute("open"); + resolve(result); + }; + $("confirm-ok").addEventListener("click", () => done(true), { once: true }); + $("confirm-cancel").addEventListener("click", () => done(false), { once: true }); + $("modal-close").addEventListener("click", () => done(false), { once: true }); + modal.addEventListener("cancel", (e) => { e.preventDefault(); done(false); }, { once: true }); + if (typeof modal.showModal === "function") { + try { modal.showModal(); return; } catch (_) {} + } + modal.setAttribute("open", ""); + }); + } + function resolvePageFromHash() { const raw = window.location.hash.replace(/^#\/?/, ""); return PAGE_META[raw] ? raw : "home"; @@ -1095,11 +1123,11 @@ const typeText = { persona: t("reviews.personaUpdates", "人格更新"), style: t("reviews.expressionReviews", "表达审查"), jargon: t("reviews.jargonCandidates", "黑话候选") }[kind] || t("reviews.items", "审查项"); const actionText = action === "approve" ? t("actions.pass", "通过") : action === "reject" ? t("actions.reject", "拒绝") : t("actions.delete", "删除"); const scopeText = selectedReviewIds(kind).length ? t("selection.selected", "选中") : t("selection.currentPage", "当前页"); - if (!window.confirm(t("reviews.confirmBatch", "确定批量{action}{scope} {count} 条{type}?") + if (!await showConfirm(t("reviews.batchConfirmTitle", "批量操作确认"), t("reviews.confirmBatch", "确定批量{action}{scope} {count} 条{type}?") .replace("{action}", actionText) .replace("{scope}", scopeText) .replace("{count}", fmt(ids.length, 0)) - .replace("{type}", typeText))) return; + .replace("{type}", typeText), actionText)) return; const payload = { action: action === "delete" @@ -1305,7 +1333,7 @@ return; } const actionText = action === "approve" ? t("actions.confirm", "确认") : action === "reject" ? t("actions.rejectBack", "驳回") : t("actions.delete", "删除"); - if (!window.confirm(t("jargon.confirmBatch", "确定批量{action}选中的 {count} 条黑话?").replace("{action}", actionText).replace("{count}", fmt(ids.length, 0)))) return; + if (!await showConfirm(t("jargon.batchConfirmTitle", "批量操作确认"), t("jargon.confirmBatch", "确定批量{action}选中的 {count} 条黑话?").replace("{action}", actionText).replace("{count}", fmt(ids.length, 0)), actionText)) return; const result = await apiPost("jargon/action", { action: action === "delete" ? "batch_delete" : "batch_review", @@ -1367,7 +1395,7 @@ return; } const actionText = action === "approve" ? t("actions.approve", "批准") : action === "reject" ? t("actions.reject", "拒绝") : t("actions.delete", "删除"); - if (!window.confirm(t("style.confirmBatch", "确定批量{action}选中的 {count} 条表达审查?").replace("{action}", actionText).replace("{count}", fmt(ids.length, 0)))) return; + if (!await showConfirm(t("style.batchConfirmTitle", "批量操作确认"), t("style.confirmBatch", "确定批量{action}选中的 {count} 条表达审查?").replace("{action}", actionText).replace("{count}", fmt(ids.length, 0)), actionText)) return; const result = await apiPost("style/action", { action: action === "delete" ? "batch_delete" : "batch_review", diff --git a/services/core_learning/v2_learning_integration.py b/services/core_learning/v2_learning_integration.py index 5f0a1c85..4f90397c 100644 --- a/services/core_learning/v2_learning_integration.py +++ b/services/core_learning/v2_learning_integration.py @@ -1,936 +1,951 @@ -""" -V2 learning integration layer. - -Wires together the v2-architecture modules and provides a unified -interface for the ``MaiBotEnhancedLearningManager`` to delegate to. -When v2 features are enabled in ``PluginConfig`` the learning manager -instantiates this class and calls its ``process_message`` and -``get_enhanced_context`` methods alongside (or instead of) the legacy -code paths. - -Modules orchestrated: - * ``TieredLearningTrigger`` — per-message / batch operation scheduling - * ``LightRAGKnowledgeManager`` — knowledge graph (replaces legacy) - * ``Mem0MemoryManager`` — memory management (replaces legacy) - * ``ExemplarLibrary`` — few-shot style exemplar retrieval - * ``SocialGraphAnalyzer`` — community detection / influence ranking - * ``JargonStatisticalFilter`` — statistical jargon pre-filter - * ``IRerankProvider`` — cross-source context reranking - -Design notes: - - All module construction is guarded by the relevant config flags so - that unused modules are never instantiated. - - ``start()`` / ``stop()`` manage the full lifecycle of every active - v2 module. - - Each module that can fail during construction logs a warning and - falls back gracefully (the integration layer keeps working with - the remaining modules). - - Thread-safe for single-event-loop asyncio usage. -""" - -import asyncio -import hashlib -import time -from collections import defaultdict -from typing import Any, Dict, List, Optional, Tuple - -from astrbot.api import logger - -from ...config import PluginConfig -from ...core.interfaces import MessageData -from ...utils.cache_manager import get_cache_manager -from ..monitoring.instrumentation import monitored -from ..quality import ( - BatchTriggerPolicy, - TieredLearningTrigger, - TriggerResult, -) - -# Minimum message length to consider for LLM-heavy ingestion operations. -_MIN_INGESTION_LENGTH = 15 - -# Maximum buffered messages per group before force-flushing. -_INGESTION_BUFFER_MAX = 10 - - -class V2LearningIntegration: - """Facade that initialises, wires, and exposes v2 learning modules. - - Usage:: - - v2 = V2LearningIntegration(config, llm_adapter, db_manager, context) - await v2.start() - result = await v2.process_message(message, group_id) - context = await v2.get_enhanced_context("query", group_id) - await v2.stop() - """ - - def __init__( - self, - config: PluginConfig, - llm_adapter: Optional[Any] = None, - db_manager: Optional[Any] = None, - context: Optional[Any] = None, - feature_delegation: Optional[Any] = None, - ) -> None: - self._config = config - self._llm = llm_adapter - self._db = db_manager - self._context = context - self._feature_delegation = feature_delegation - self._started = False - self._provider_retry_lock = asyncio.Lock() - self._last_provider_retry: float = 0.0 - self._provider_retry_interval: float = max( - 0.1, - float(getattr(config, "provider_retry_interval_seconds", 10.0) or 10.0), - ) - self._knowledge_manager_retryable = True - self._memory_manager_retryable = True - - # --- Resolve framework providers via factories --------------- - self._embedding_provider = self._create_embedding_provider() - self._rerank_provider = self._create_rerank_provider() - - # --- Instantiate v2 modules ---------------------------------- - self._knowledge_manager = self._create_knowledge_manager() - self._memory_manager = self._create_memory_manager() - self._exemplar_library = self._create_exemplar_library() - self._social_analyzer = self._create_social_analyzer() - self._jargon_filter = self._create_jargon_filter() - - # --- Query result cache via CacheManager ---------------------- - self._cache = get_cache_manager() - - # --- Message buffer for batch ingestion ----------------------- - # Knowledge and memory ingestion are LLM-heavy operations and - # must not run per-message. Instead, messages are buffered here - # and flushed as a batch in a Tier 2 operation. - self._ingestion_buffer: Dict[str, List[MessageData]] = defaultdict(list) - - # --- Tiered trigger ------------------------------------------ - self._trigger = TieredLearningTrigger() - self._register_trigger_operations() - - logger.info( - "[V2Integration] Initialised — " - f"knowledge={self._config.knowledge_engine}, " - f"memory={self._config.memory_engine}, " - f"memory_delegated={self._memory_delegated()}, " - f"embedding={'yes' if self._embedding_provider else 'no'}, " - f"reranker={'yes' if self._rerank_provider else 'no'}" - ) - - # Lifecycle - - async def start(self) -> None: - """Start all active v2 modules that expose a ``start`` method.""" - await self.refresh_provider_bindings(force=True) - - await asyncio.gather(*( - self._start_one(name, module) - for name, module in self._active_modules() - if module and hasattr(module, "start") - )) - self._started = True - logger.info("[V2Integration] All modules started") - - async def refresh_provider_bindings(self, *, force: bool = False) -> bool: - """Retry framework provider binding and create dependent modules. - - AstrBot can load plugins before provider registries are populated. This - lets startup, warmup, and first-use paths bind providers later without a - manual plugin reload. - """ - if not self._needs_provider_or_module_retry(): - return False - - if not force and not self._provider_retry_due(): - return False - - async with self._provider_retry_lock: - if not self._needs_provider_or_module_retry(): - return False - - if not force and not self._provider_retry_due(): - return False - self._last_provider_retry = time.monotonic() - - changed = False - modules_to_start: List[Tuple[str, Any]] = [] - - if not self._embedding_provider and self._embedding_provider_configured(): - provider = self._create_embedding_provider() - if provider: - self._embedding_provider = provider - changed = True - - if not self._rerank_provider and self._rerank_provider_configured(): - provider = self._create_rerank_provider() - if provider: - self._rerank_provider = provider - changed = True - - if self._embedding_provider: - if self._knowledge_manager is None: - self._knowledge_manager = self._create_knowledge_manager() - if self._knowledge_manager: - changed = True - modules_to_start.append(( - "knowledge_manager", - self._knowledge_manager, - )) - - if self._memory_manager is None: - self._memory_manager = self._create_memory_manager() - if self._memory_manager: - changed = True - modules_to_start.append(( - "memory_manager", - self._memory_manager, - )) - - if self._exemplar_library_needs_embedding_refresh(): - self._exemplar_library = self._create_exemplar_library() - changed = True - - if changed: - self._register_trigger_operations() - if self._started and modules_to_start: - await asyncio.gather(*( - self._start_one(name, module) - for name, module in modules_to_start - if module and hasattr(module, "start") - )) - logger.info( - "[V2Integration] Provider bindings refreshed — " - f"embedding={'yes' if self._embedding_provider else 'no'}, " - f"reranker={'yes' if self._rerank_provider else 'no'}" - ) - return changed - - def _provider_retry_due(self) -> bool: - return ( - time.monotonic() - self._last_provider_retry - >= self._provider_retry_interval - ) - - async def _start_one(self, name: str, module: Any) -> None: - try: - await module.start() - except Exception as exc: - logger.warning( - f"[V2Integration] {name} start failed: {exc}" - ) - - def _active_modules(self) -> List[Tuple[str, Any]]: - return [ - ("knowledge_manager", self._knowledge_manager), - ("memory_manager", self._memory_manager), - ("exemplar_library", self._exemplar_library), - ("social_analyzer", self._social_analyzer), - ("jargon_filter", self._jargon_filter), - ] - - def _needs_provider_or_module_retry(self) -> bool: - if self._embedding_provider_configured() and not self._embedding_provider: - return True - if self._rerank_provider_configured() and not self._rerank_provider: - return True - if self._embedding_provider and self._knowledge_manager is None: - return ( - self._config.knowledge_engine == "lightrag" - and self._knowledge_manager_retryable - ) - if self._embedding_provider and self._memory_manager is None: - return ( - self._config.memory_engine == "mem0" - and not self._memory_delegated() - and self._memory_manager_retryable - ) - return self._exemplar_library_needs_embedding_refresh() - - def _embedding_provider_configured(self) -> bool: - return bool( - str(getattr(self._config, "embedding_provider_id", "") or "").strip() - ) - - def _rerank_provider_configured(self) -> bool: - return bool( - str(getattr(self._config, "rerank_provider_id", "") or "").strip() - ) - - def _exemplar_library_needs_embedding_refresh(self) -> bool: - if not (self._db and self._embedding_provider and self._exemplar_library): - return False - return getattr(self._exemplar_library, "_embedding", None) is None - - async def warmup(self, group_ids: List[str]) -> None: - """Pre-warm heavyweight module instances for *group_ids*. - - Should be called shortly after ``start()`` once active group IDs - are known. Currently only LightRAG benefits from pre-warming - (each cold-start avoids a 12-15s initialisation penalty on the - first user query). - """ - await self.refresh_provider_bindings() - if ( - self._knowledge_manager - and hasattr(self._knowledge_manager, "warmup_instances") - ): - try: - await self._knowledge_manager.warmup_instances(group_ids) - except Exception as exc: - logger.debug( - f"[V2Integration] Knowledge warmup failed: {exc}" - ) - - async def stop(self) -> None: - """Stop all active v2 modules and release resources. - - Attempts to flush remaining buffered messages with a per-group - timeout. Timed-out buffers are discarded to avoid blocking - the shutdown sequence. - """ - _flush_timeout = self._config.task_cancel_timeout - - for group_id in list(self._ingestion_buffer.keys()): - try: - await asyncio.wait_for( - self._flush_ingestion_buffer(group_id), - timeout=_flush_timeout, - ) - except asyncio.TimeoutError: - dropped = len(self._ingestion_buffer.pop(group_id, [])) - logger.warning( - f"[V2Integration] Buffer flush timeout for group " - f"{group_id}, dropped {dropped} messages" - ) - except Exception as exc: - logger.warning( - f"[V2Integration] Buffer flush failed on stop " - f"for group {group_id}: {exc}" - ) - - modules: List[Tuple[str, Any]] = [ - ("knowledge_manager", self._knowledge_manager), - ("memory_manager", self._memory_manager), - ("exemplar_library", self._exemplar_library), - ("social_analyzer", self._social_analyzer), - ("jargon_filter", self._jargon_filter), - ] - - async def _stop_one(name: str, module: Any) -> None: - try: - await module.stop() - except Exception as exc: - logger.warning( - f"[V2Integration] {name} stop failed: {exc}" - ) - - async def _close_reranker() -> None: - try: - await self._rerank_provider.close() - except Exception as exc: - logger.warning(f"[V2Integration] Reranker close failed: {exc}") - - tasks = [ - _stop_one(name, module) - for name, module in modules - if module and hasattr(module, "stop") - ] - if self._rerank_provider and hasattr(self._rerank_provider, "close"): - tasks.append(_close_reranker()) - - await asyncio.gather(*tasks) - logger.info("[V2Integration] All modules stopped") - - # Public API - - @monitored - async def process_message( - self, message: MessageData, group_id: str - ) -> TriggerResult: - """Process an incoming message through the tiered trigger. - - Tier 1 operations run concurrently on every message. Tier 2 - operations fire when their policies are satisfied. - """ - await self.refresh_provider_bindings() - return await self._trigger.process_message(message, group_id) - - @monitored - async def get_enhanced_context( - self, - query: str, - group_id: str, - top_k: int = 5, - ) -> Dict[str, Any]: - """Retrieve v2 enhanced context for response generation. - - Returns a dict with optional keys: - * ``knowledge_context`` (str): Retrieved knowledge graph context. - * ``related_memories`` (List[str]): Semantically related memories. - * ``few_shot_examples`` (List[str]): Style exemplar texts - (not reranked; returned as-is). - * ``graph_stats`` (dict): Social graph summary statistics. - - When a reranker is available, knowledge and memory candidates are - reranked by relevance and only the top-k are returned. Few-shot - exemplars and graph stats are returned unmodified. - - Results are cached per (group_id, query_hash) with a configurable - TTL to avoid redundant retrieval on repeated or similar queries. - - All retrieval tasks run concurrently via ``asyncio.gather`` to - minimise total latency. - """ - await self.refresh_provider_bindings() - - # --- Check query result cache --- - cache_key = self._make_cache_key(query, group_id) - cached_result = self._cache.get("context", cache_key) - if cached_result is not None: - logger.debug( - f"[V2Integration] Context cache hit (group={group_id})" - ) - return cached_result - - context: Dict[str, Any] = {} - - # --- Build concurrent retrieval tasks --- - - async def _fetch_knowledge() -> None: - if not self._knowledge_manager: - return - try: - if hasattr(self._knowledge_manager, "query_knowledge"): - ctx = await self._knowledge_manager.query_knowledge( - query, group_id, - mode=self._config.lightrag_query_mode, - ) - elif hasattr( - self._knowledge_manager, - "answer_question_with_knowledge_graph", - ): - ctx = ( - await self._knowledge_manager - .answer_question_with_knowledge_graph(query, group_id) - ) - else: - ctx = "" - if ctx: - context["knowledge_context"] = ctx - except Exception as exc: - logger.debug( - f"[V2Integration] Knowledge retrieval failed: {exc}" - ) - - async def _fetch_memories() -> None: - if self._memory_delegated(): - logger.debug("[V2Integration] Memory retrieval delegated to LivingMemory") - return - if not self._memory_manager: - return - try: - memories = await self._memory_manager.get_related_memories( - query, group_id - ) - if memories: - context["related_memories"] = memories - except Exception as exc: - logger.debug( - f"[V2Integration] Memory retrieval failed: {exc}" - ) - - async def _fetch_exemplars() -> None: - if not self._exemplar_library: - return - try: - examples = await self._exemplar_library.get_few_shot_examples( - query, group_id, k=top_k - ) - if examples: - context["few_shot_examples"] = examples - except Exception as exc: - logger.debug( - f"[V2Integration] Exemplar retrieval failed: {exc}" - ) - - async def _fetch_graph_stats() -> None: - if not self._social_analyzer: - return - try: - stats = await self._social_analyzer.get_graph_statistics( - group_id - ) - if stats and stats.get("node_count", 0) > 0: - context["graph_stats"] = stats - except Exception as exc: - logger.debug( - f"[V2Integration] Social graph stats failed: {exc}" - ) - - # --- Run all retrievals concurrently --- - await asyncio.gather( - _fetch_knowledge(), - _fetch_memories(), - _fetch_exemplars(), - _fetch_graph_stats(), - ) - - # --- Conditional reranking --- - # Only invoke the reranker when there are enough candidates to - # justify the additional API round-trip latency. - rerank_candidates = len(context.get("related_memories", [])) - if "knowledge_context" in context: - rerank_candidates += 1 - min_candidates = getattr( - self._config, "rerank_min_candidates", 3 - ) - if self._rerank_provider and rerank_candidates >= min_candidates: - context = await self._rerank_context(query, context, top_k) - - # --- Store result in cache --- - self._cache.set("context", cache_key, context) - - return context - - # Cache helpers - - @staticmethod - def _make_cache_key(query: str, group_id: str) -> str: - """Generate a compact cache key from query text and group ID.""" - query_hash = hashlib.md5(query.encode("utf-8")).hexdigest()[:12] - return f"{group_id}:{query_hash}" - - def get_trigger_stats(self, group_id: str) -> Dict[str, Any]: - """Return tiered trigger statistics for a group.""" - return self._trigger.get_group_stats(group_id) - - # Module factories - - def _create_embedding_provider(self) -> Optional[Any]: - """Resolve embedding provider from the framework.""" - try: - from ..embedding.factory import EmbeddingProviderFactory - return EmbeddingProviderFactory.create(self._config, self._context) - except Exception as exc: - logger.debug( - f"[V2Integration] Embedding provider unavailable: {exc}" - ) - return None - - def _create_rerank_provider(self) -> Optional[Any]: - """Resolve reranker provider from the framework.""" - try: - from ..reranker.factory import RerankProviderFactory - return RerankProviderFactory.create(self._config, self._context) - except Exception as exc: - logger.debug(f"[V2Integration] Reranker unavailable: {exc}") - return None - - def _create_knowledge_manager(self) -> Optional[Any]: - """Create knowledge manager based on configured engine.""" - if self._config.knowledge_engine == "lightrag": - if not self._embedding_provider: - if self._embedding_provider_configured(): - logger.info( - "[V2Integration] LightRAG is waiting for the " - "embedding provider registry to become ready" - ) - else: - logger.warning( - "[V2Integration] LightRAG requires an embedding " - "provider; configure embedding_provider_id or use " - "the legacy knowledge engine" - ) - return None - try: - from ..integration import LightRAGKnowledgeManager - return LightRAGKnowledgeManager( - self._config, self._llm, self._embedding_provider - ) - except ImportError: - self._knowledge_manager_retryable = False - logger.warning( - "[V2Integration] lightrag-hku not installed, " - "falling back to legacy knowledge engine" - ) - except Exception as exc: - logger.warning( - f"[V2Integration] LightRAG init failed: {exc}" - ) - logger.debug( - "[V2Integration] LightRAG traceback:", exc_info=True - ) - return None - - def _create_memory_manager(self) -> Optional[Any]: - """Create memory manager based on configured engine.""" - if self._memory_delegated(): - logger.info("[V2Integration] Memory engine skipped: delegated to LivingMemory") - return None - if self._config.memory_engine == "mem0": - if not self._embedding_provider: - if self._embedding_provider_configured(): - logger.info( - "[V2Integration] Mem0 is waiting for the embedding " - "provider registry to become ready" - ) - else: - logger.warning( - "[V2Integration] Mem0 requires an embedding provider; " - "configure embedding_provider_id or use the legacy " - "memory engine" - ) - return None - try: - from ..integration import Mem0MemoryManager - return Mem0MemoryManager( - self._config, self._llm, self._embedding_provider - ) - except ImportError: - self._memory_manager_retryable = False - logger.warning( - "[V2Integration] mem0ai not installed, " - "falling back to legacy memory engine" - ) - except Exception as exc: - logger.warning( - f"[V2Integration] Mem0 init failed: {exc}" - ) - logger.debug( - "[V2Integration] Mem0 traceback:", exc_info=True - ) - return None - - def _create_exemplar_library(self) -> Optional[Any]: - """Create exemplar library if DB and embedding are available.""" - if not self._db: - return None - try: - from ..integration import ExemplarLibrary - return ExemplarLibrary(self._db, self._embedding_provider) - except Exception as exc: - logger.debug( - f"[V2Integration] ExemplarLibrary init failed: {exc}" - ) - return None - - def _create_social_analyzer(self) -> Optional[Any]: - """Create social graph analyzer.""" - try: - from ..social import SocialGraphAnalyzer - return SocialGraphAnalyzer(self._llm, self._db) - except Exception as exc: - logger.debug( - f"[V2Integration] SocialGraphAnalyzer init failed: {exc}" - ) - return None - - def _create_jargon_filter(self) -> Optional[Any]: - """Create jargon statistical filter.""" - try: - from ..jargon import JargonStatisticalFilter - return JargonStatisticalFilter() - except Exception as exc: - logger.debug( - f"[V2Integration] JargonStatisticalFilter init failed: {exc}" - ) - return None - - # Trigger wiring - - def _register_trigger_operations(self) -> None: - """Register all available modules with the tiered trigger. - - Architecture: - Tier 1 (per-message, sub-millisecond): - - jargon_stats: in-memory statistical counters - - ingestion_buffer: append message to buffer (no I/O) - - exemplar: embedding + DB insert (< 1s) - - Tier 2 (batch, LLM-gated, cooldown-protected): - - ingestion_flush: batch-process buffered messages through - LightRAG and Mem0, amortising LLM overhead across - multiple messages - - jargon: LLM-based jargon meaning inference - - social: community detection and influence ranking - - Knowledge graph ingestion (LightRAG) and memory ingestion (Mem0) - are intentionally registered as Tier 2 batch operations rather - than Tier 1 per-message callbacks because they each invoke one - or more LLM round-trips (entity extraction, fact extraction) - that take 3-10 seconds per message. Running them per-message - would dominate the event loop and block subsequent processing. - """ - - # ---- Tier 1: per-message lightweight operations ---- - - if self._jargon_filter: - jf = self._jargon_filter - - async def _jargon_update( - message: MessageData, group_id: str - ) -> None: - jf.update_from_message(message.message, group_id, message.sender_id) - - self._trigger.register_tier1("jargon_stats", _jargon_update) - - # Buffer messages for batch ingestion (knowledge + memory). - # This replaces the previous per-message LightRAG/Mem0 callbacks - # with a sub-millisecond append operation. - if self._knowledge_manager or self._memory_manager: - buf = self._ingestion_buffer - - async def _buffer_message( - message: MessageData, group_id: str - ) -> None: - if ( - message.message - and len(message.message.strip()) >= _MIN_INGESTION_LENGTH - ): - buf[group_id].append(message) - - self._trigger.register_tier1("ingestion_buffer", _buffer_message) - - if self._exemplar_library: - lib = self._exemplar_library - - async def _exemplar_add( - message: MessageData, group_id: str - ) -> None: - await lib.add_exemplar( - message.message, group_id, message.sender_id - ) - - self._trigger.register_tier1("exemplar", _exemplar_add) - - # ---- Tier 2: batch operations (LLM-heavy) ---- - - # Batch ingestion: flush buffered messages through LightRAG - # and Mem0. Fires every 5 messages or 60 seconds, whichever - # comes first. This amortises the per-message LLM overhead - # and reduces total API calls. - if self._knowledge_manager or self._memory_manager: - self._trigger.register_tier2( - "ingestion_flush", - self._flush_ingestion_buffer, - BatchTriggerPolicy( - message_threshold=5, cooldown_seconds=60 - ), - ) - - if self._jargon_filter: - jf2 = self._jargon_filter - llm = self._llm - db = self._db - - async def _jargon_batch(group_id: str) -> None: - candidates = jf2.get_jargon_candidates(group_id, top_k=20) - if not candidates or not llm: - return - for candidate in candidates[:10]: - try: - meaning = await llm.generate_response( - f"Explain the slang/jargon term " - f"'{candidate['term']}' in the context of an " - f"online chat group. Return a concise definition.", - model_type="filter", - ) - if ( - meaning - and db - and hasattr(db, "save_or_update_jargon") - ): - await db.save_or_update_jargon( - group_id, - candidate["term"], - { - "meaning": meaning, - "raw_content": "[]", - "is_jargon": True, - "count": 1, - "is_complete": True, - }, - ) - except Exception as exc: - logger.debug( - f"[V2Integration] Jargon inference failed " - f"for '{candidate['term']}': {exc}" - ) - - self._trigger.register_tier2( - "jargon", - _jargon_batch, - BatchTriggerPolicy( - message_threshold=20, cooldown_seconds=180 - ), - ) - - if self._social_analyzer: - sa = self._social_analyzer - - async def _social_batch(group_id: str) -> None: - # Execute independently so one failure does not skip the other. - try: - await sa.detect_communities(group_id) - except Exception as exc: - logger.debug( - f"[V2Integration] detect_communities failed: {exc}" - ) - try: - await sa.get_influence_ranking(group_id) - except Exception as exc: - logger.debug( - f"[V2Integration] get_influence_ranking failed: {exc}" - ) - - self._trigger.register_tier2( - "social", - _social_batch, - BatchTriggerPolicy( - message_threshold=50, cooldown_seconds=600 - ), - ) - - # Batch ingestion - - async def _flush_ingestion_buffer(self, group_id: str) -> None: - """Flush buffered messages for a group through knowledge and memory. - - Processes all buffered messages concurrently through LightRAG and - Mem0 in a single batch operation, then clears the buffer. Messages - within each engine are processed sequentially to avoid overwhelming - the underlying LLM providers with concurrent requests. - """ - messages = self._ingestion_buffer.pop(group_id, []) - if not messages: - return - - logger.debug( - f"[V2Integration] Flushing ingestion buffer: " - f"group={group_id}, count={len(messages)}" - ) - - async def _ingest_knowledge() -> None: - if not self._knowledge_manager: - return - method = None - if hasattr( - self._knowledge_manager, - "process_message_for_knowledge_graph", - ): - method = self._knowledge_manager.process_message_for_knowledge_graph - elif hasattr( - self._knowledge_manager, "process_message_for_knowledge" - ): - method = self._knowledge_manager.process_message_for_knowledge - if not method: - return - for msg in messages: - try: - await method(msg, group_id) - except Exception as exc: - logger.debug( - f"[V2Integration] Knowledge ingestion failed: {exc}" - ) - - async def _ingest_memory() -> None: - if self._memory_delegated(): - logger.debug("[V2Integration] Memory ingestion delegated to LivingMemory") - return - if not self._memory_manager: - return - for msg in messages: - try: - await self._memory_manager.add_memory_from_message( - msg, group_id - ) - except Exception as exc: - logger.debug( - f"[V2Integration] Memory ingestion failed: {exc}" - ) - - # Run knowledge and memory ingestion concurrently across engines, - # but sequentially within each engine to avoid provider overload. - await asyncio.gather( - _ingest_knowledge(), - _ingest_memory(), - ) - - def _memory_delegated(self) -> bool: - delegation = self._feature_delegation - if not delegation or not hasattr(delegation, "should_delegate_memory"): - return False - try: - return bool(delegation.should_delegate_memory()) - except Exception: - return False - - # Reranking - - @monitored - async def _rerank_context( - self, - query: str, - context: Dict[str, Any], - top_k: int, - ) -> Dict[str, Any]: - """Rerank knowledge and memory candidates by relevance. - - Few-shot exemplars and graph stats are returned unmodified. - """ - try: - documents: List[str] = [] - sources: List[str] = [] - - if "knowledge_context" in context: - documents.append(context["knowledge_context"]) - sources.append("knowledge") - - for mem in context.get("related_memories", []): - documents.append(mem) - sources.append("memory") - - if not documents: - return context - - results = await self._rerank_provider.rerank( - query, documents, top_n=top_k - ) - - # Rebuild context with reranked order. - reranked_memories: List[str] = [] - reranked_knowledge = "" - for r in results: - if r.index >= len(documents): - logger.debug( - f"[V2Integration] Reranker returned out-of-range " - f"index {r.index} (len={len(documents)}); skipping" - ) - continue - src = sources[r.index] - doc = documents[r.index] - if src == "knowledge": - reranked_knowledge = doc - elif src == "memory": - reranked_memories.append(doc) - - if reranked_knowledge: - context["knowledge_context"] = reranked_knowledge - elif "knowledge_context" in context: - del context["knowledge_context"] - - if reranked_memories: - context["related_memories"] = reranked_memories - elif "related_memories" in context: - del context["related_memories"] - - except Exception as exc: - logger.debug( - f"[V2Integration] Reranking failed, using unranked: {exc}" - ) - - return context +""" +V2 learning integration layer. + +Wires together the v2-architecture modules and provides a unified +interface for the ``MaiBotEnhancedLearningManager`` to delegate to. +When v2 features are enabled in ``PluginConfig`` the learning manager +instantiates this class and calls its ``process_message`` and +``get_enhanced_context`` methods alongside (or instead of) the legacy +code paths. + +Modules orchestrated: + * ``TieredLearningTrigger`` — per-message / batch operation scheduling + * ``LightRAGKnowledgeManager`` — knowledge graph (replaces legacy) + * ``Mem0MemoryManager`` — memory management (replaces legacy) + * ``ExemplarLibrary`` — few-shot style exemplar retrieval + * ``SocialGraphAnalyzer`` — community detection / influence ranking + * ``JargonStatisticalFilter`` — statistical jargon pre-filter + * ``IRerankProvider`` — cross-source context reranking + +Design notes: + - All module construction is guarded by the relevant config flags so + that unused modules are never instantiated. + - ``start()`` / ``stop()`` manage the full lifecycle of every active + v2 module. + - Each module that can fail during construction logs a warning and + falls back gracefully (the integration layer keeps working with + the remaining modules). + - Thread-safe for single-event-loop asyncio usage. +""" + +import asyncio +import hashlib +import time +from collections import defaultdict +from typing import Any, Dict, List, Optional, Tuple + +from astrbot.api import logger + +from ...config import PluginConfig +from ...core.interfaces import MessageData +from ...utils.cache_manager import get_cache_manager +from ..monitoring.instrumentation import monitored +from ..quality import ( + BatchTriggerPolicy, + TieredLearningTrigger, + TriggerResult, +) + +# Minimum message length to consider for LLM-heavy ingestion operations. +_MIN_INGESTION_LENGTH = 15 + +# Maximum buffered messages per group before force-flushing. +_INGESTION_BUFFER_MAX = 10 + + +class V2LearningIntegration: + """Facade that initialises, wires, and exposes v2 learning modules. + + Usage:: + + v2 = V2LearningIntegration(config, llm_adapter, db_manager, context) + await v2.start() + result = await v2.process_message(message, group_id) + context = await v2.get_enhanced_context("query", group_id) + await v2.stop() + """ + + def __init__( + self, + config: PluginConfig, + llm_adapter: Optional[Any] = None, + db_manager: Optional[Any] = None, + context: Optional[Any] = None, + feature_delegation: Optional[Any] = None, + ) -> None: + self._config = config + self._llm = llm_adapter + self._db = db_manager + self._context = context + self._feature_delegation = feature_delegation + self._started = False + self._provider_retry_lock = asyncio.Lock() + self._last_provider_retry: float = 0.0 + self._provider_retry_interval: float = max( + 0.1, + float(getattr(config, "provider_retry_interval_seconds", 10.0) or 10.0), + ) + self._knowledge_manager_retryable = True + self._memory_manager_retryable = True + + # --- Resolve framework providers via factories --------------- + self._embedding_provider = self._create_embedding_provider() + self._rerank_provider = self._create_rerank_provider() + + # --- Instantiate v2 modules ---------------------------------- + self._knowledge_manager = self._create_knowledge_manager() + self._memory_manager = self._create_memory_manager() + self._exemplar_library = self._create_exemplar_library() + self._social_analyzer = self._create_social_analyzer() + self._jargon_filter = self._create_jargon_filter() + + # --- Query result cache via CacheManager ---------------------- + self._cache = get_cache_manager() + + # --- Message buffer for batch ingestion ----------------------- + # Knowledge and memory ingestion are LLM-heavy operations and + # must not run per-message. Instead, messages are buffered here + # and flushed as a batch in a Tier 2 operation. + self._ingestion_buffer: Dict[str, List[MessageData]] = defaultdict(list) + + # --- Tiered trigger ------------------------------------------ + self._trigger = TieredLearningTrigger() + self._register_trigger_operations() + + logger.info( + "[V2Integration] Initialised — " + f"knowledge={self._config.knowledge_engine}, " + f"memory={self._config.memory_engine}, " + f"memory_delegated={self._memory_delegated()}, " + f"embedding={'yes' if self._embedding_provider else 'no'}, " + f"reranker={'yes' if self._rerank_provider else 'no'}" + ) + + # Lifecycle + + async def start(self) -> None: + """Start all active v2 modules that expose a ``start`` method.""" + await self.refresh_provider_bindings(force=True) + + await asyncio.gather(*( + self._start_one(name, module) + for name, module in self._active_modules() + if module and hasattr(module, "start") + )) + self._started = True + logger.info("[V2Integration] All modules started") + + async def refresh_provider_bindings(self, *, force: bool = False) -> bool: + """Retry framework provider binding and create dependent modules. + + AstrBot can load plugins before provider registries are populated. This + lets startup, warmup, and first-use paths bind providers later without a + manual plugin reload. + """ + if not self._needs_provider_or_module_retry(): + return False + + if not force and not self._provider_retry_due(): + return False + + async with self._provider_retry_lock: + if not self._needs_provider_or_module_retry(): + return False + + if not force and not self._provider_retry_due(): + return False + self._last_provider_retry = time.monotonic() + + changed = False + modules_to_start: List[Tuple[str, Any]] = [] + + if not self._embedding_provider and self._embedding_provider_configured(): + provider = self._create_embedding_provider() + if provider: + self._embedding_provider = provider + changed = True + + if not self._rerank_provider and self._rerank_provider_configured(): + provider = self._create_rerank_provider() + if provider: + self._rerank_provider = provider + changed = True + + if self._embedding_provider: + if self._knowledge_manager is None: + self._knowledge_manager = self._create_knowledge_manager() + if self._knowledge_manager: + changed = True + modules_to_start.append(( + "knowledge_manager", + self._knowledge_manager, + )) + + if self._memory_manager is None: + self._memory_manager = self._create_memory_manager() + if self._memory_manager: + changed = True + modules_to_start.append(( + "memory_manager", + self._memory_manager, + )) + + if self._exemplar_library_needs_embedding_refresh(): + self._exemplar_library = self._create_exemplar_library() + changed = True + + if changed: + self._register_trigger_operations() + if self._started and modules_to_start: + await asyncio.gather(*( + self._start_one(name, module) + for name, module in modules_to_start + if module and hasattr(module, "start") + )) + logger.info( + "[V2Integration] Provider bindings refreshed — " + f"embedding={'yes' if self._embedding_provider else 'no'}, " + f"reranker={'yes' if self._rerank_provider else 'no'}" + ) + return changed + + def _provider_retry_due(self) -> bool: + return ( + time.monotonic() - self._last_provider_retry + >= self._provider_retry_interval + ) + + async def _start_one(self, name: str, module: Any) -> None: + try: + await module.start() + except Exception as exc: + logger.warning( + f"[V2Integration] {name} start failed: {exc}" + ) + + def _active_modules(self) -> List[Tuple[str, Any]]: + return [ + ("knowledge_manager", self._knowledge_manager), + ("memory_manager", self._memory_manager), + ("exemplar_library", self._exemplar_library), + ("social_analyzer", self._social_analyzer), + ("jargon_filter", self._jargon_filter), + ] + + def _needs_provider_or_module_retry(self) -> bool: + if self._embedding_provider_configured() and not self._embedding_provider: + return True + if self._rerank_provider_configured() and not self._rerank_provider: + return True + if self._embedding_provider and self._knowledge_manager is None: + return ( + self._config.knowledge_engine == "lightrag" + and self._knowledge_manager_retryable + ) + if self._embedding_provider and self._memory_manager is None: + return ( + self._config.memory_engine == "mem0" + and not self._memory_delegated() + and self._memory_manager_retryable + ) + return self._exemplar_library_needs_embedding_refresh() + + def _embedding_provider_configured(self) -> bool: + return bool( + str(getattr(self._config, "embedding_provider_id", "") or "").strip() + ) + + def _rerank_provider_configured(self) -> bool: + return bool( + str(getattr(self._config, "rerank_provider_id", "") or "").strip() + ) + + def _exemplar_library_needs_embedding_refresh(self) -> bool: + if not (self._db and self._embedding_provider and self._exemplar_library): + return False + return getattr(self._exemplar_library, "_embedding", None) is None + + async def warmup(self, group_ids: List[str]) -> None: + """Pre-warm heavyweight module instances for *group_ids*. + + Should be called shortly after ``start()`` once active group IDs + are known. Currently only LightRAG benefits from pre-warming + (each cold-start avoids a 12-15s initialisation penalty on the + first user query). + """ + await self.refresh_provider_bindings() + if ( + self._knowledge_manager + and hasattr(self._knowledge_manager, "warmup_instances") + ): + try: + await self._knowledge_manager.warmup_instances(group_ids) + except Exception as exc: + logger.debug( + f"[V2Integration] Knowledge warmup failed: {exc}" + ) + + async def stop(self) -> None: + """Stop all active v2 modules and release resources. + + Attempts to flush remaining buffered messages with a per-group + timeout. Timed-out buffers are discarded to avoid blocking + the shutdown sequence. + """ + _flush_timeout = self._config.task_cancel_timeout + + for group_id in list(self._ingestion_buffer.keys()): + try: + await asyncio.wait_for( + self._flush_ingestion_buffer(group_id), + timeout=_flush_timeout, + ) + except asyncio.TimeoutError: + dropped = len(self._ingestion_buffer.pop(group_id, [])) + logger.warning( + f"[V2Integration] Buffer flush timeout for group " + f"{group_id}, dropped {dropped} messages" + ) + except Exception as exc: + logger.warning( + f"[V2Integration] Buffer flush failed on stop " + f"for group {group_id}: {exc}" + ) + + modules: List[Tuple[str, Any]] = [ + ("knowledge_manager", self._knowledge_manager), + ("memory_manager", self._memory_manager), + ("exemplar_library", self._exemplar_library), + ("social_analyzer", self._social_analyzer), + ("jargon_filter", self._jargon_filter), + ] + + async def _stop_one(name: str, module: Any) -> None: + try: + await module.stop() + except Exception as exc: + logger.warning( + f"[V2Integration] {name} stop failed: {exc}" + ) + + async def _close_reranker() -> None: + try: + await self._rerank_provider.close() + except Exception as exc: + logger.warning(f"[V2Integration] Reranker close failed: {exc}") + + tasks = [ + _stop_one(name, module) + for name, module in modules + if module and hasattr(module, "stop") + ] + if self._rerank_provider and hasattr(self._rerank_provider, "close"): + tasks.append(_close_reranker()) + + await asyncio.gather(*tasks) + logger.info("[V2Integration] All modules stopped") + + # Public API + + @monitored + async def process_message( + self, message: MessageData, group_id: str + ) -> TriggerResult: + """Process an incoming message through the tiered trigger. + + Tier 1 operations run concurrently on every message. Tier 2 + operations fire when their policies are satisfied. + """ + await self.refresh_provider_bindings() + return await self._trigger.process_message(message, group_id) + + @monitored + async def get_enhanced_context( + self, + query: str, + group_id: str, + top_k: int = 5, + ) -> Dict[str, Any]: + """Retrieve v2 enhanced context for response generation. + + Returns a dict with optional keys: + * ``knowledge_context`` (str): Retrieved knowledge graph context. + * ``related_memories`` (List[str]): Semantically related memories. + * ``few_shot_examples`` (List[str]): Style exemplar texts + (not reranked; returned as-is). + * ``graph_stats`` (dict): Social graph summary statistics. + + When a reranker is available, knowledge and memory candidates are + reranked by relevance and only the top-k are returned. Few-shot + exemplars and graph stats are returned unmodified. + + Results are cached per (group_id, query_hash) with a configurable + TTL to avoid redundant retrieval on repeated or similar queries. + + All retrieval tasks run concurrently via ``asyncio.gather`` to + minimise total latency. + """ + await self.refresh_provider_bindings() + + # --- Check query result cache --- + cache_key = self._make_cache_key(query, group_id) + cached_result = self._cache.get("context", cache_key) + if cached_result is not None: + logger.debug( + f"[V2Integration] Context cache hit (group={group_id})" + ) + return cached_result + + context: Dict[str, Any] = {} + + # --- Build concurrent retrieval tasks --- + + async def _fetch_knowledge() -> None: + if not self._knowledge_manager: + return + try: + if hasattr(self._knowledge_manager, "query_knowledge"): + ctx = await self._knowledge_manager.query_knowledge( + query, group_id, + mode=self._config.lightrag_query_mode, + ) + elif hasattr( + self._knowledge_manager, + "answer_question_with_knowledge_graph", + ): + ctx = ( + await self._knowledge_manager + .answer_question_with_knowledge_graph(query, group_id) + ) + else: + ctx = "" + if ctx: + context["knowledge_context"] = ctx + except Exception as exc: + logger.debug( + f"[V2Integration] Knowledge retrieval failed: {exc}" + ) + + async def _fetch_memories() -> None: + if self._memory_delegated(): + logger.debug("[V2Integration] Memory retrieval delegated to LivingMemory") + return + if not self._memory_manager: + return + try: + memories = await self._memory_manager.get_related_memories( + query, group_id + ) + if memories: + context["related_memories"] = memories + except Exception as exc: + logger.debug( + f"[V2Integration] Memory retrieval failed: {exc}" + ) + + async def _fetch_exemplars() -> None: + if not self._exemplar_library: + return + try: + examples = await self._exemplar_library.get_few_shot_examples( + query, group_id, k=top_k + ) + if examples: + context["few_shot_examples"] = examples + except Exception as exc: + logger.debug( + f"[V2Integration] Exemplar retrieval failed: {exc}" + ) + + async def _fetch_graph_stats() -> None: + if not self._social_analyzer: + return + try: + stats = await self._social_analyzer.get_graph_statistics( + group_id + ) + if stats and stats.get("node_count", 0) > 0: + context["graph_stats"] = stats + except Exception as exc: + logger.debug( + f"[V2Integration] Social graph stats failed: {exc}" + ) + + # --- Run all retrievals concurrently --- + await asyncio.gather( + _fetch_knowledge(), + _fetch_memories(), + _fetch_exemplars(), + _fetch_graph_stats(), + ) + + # --- Conditional reranking --- + # Only invoke the reranker when there are enough candidates to + # justify the additional API round-trip latency. + rerank_candidates = len(context.get("related_memories", [])) + if "knowledge_context" in context: + rerank_candidates += 1 + min_candidates = getattr( + self._config, "rerank_min_candidates", 3 + ) + if self._rerank_provider and rerank_candidates >= min_candidates: + context = await self._rerank_context(query, context, top_k) + + # --- Store result in cache --- + self._cache.set("context", cache_key, context) + + return context + + # Cache helpers + + @staticmethod + def _make_cache_key(query: str, group_id: str) -> str: + """Generate a compact cache key from query text and group ID.""" + query_hash = hashlib.md5(query.encode("utf-8")).hexdigest()[:12] + return f"{group_id}:{query_hash}" + + def get_trigger_stats(self, group_id: str) -> Dict[str, Any]: + """Return tiered trigger statistics for a group.""" + return self._trigger.get_group_stats(group_id) + + # Module factories + + def _create_embedding_provider(self) -> Optional[Any]: + """Resolve embedding provider from the framework.""" + try: + from ..embedding.factory import EmbeddingProviderFactory + return EmbeddingProviderFactory.create(self._config, self._context) + except Exception as exc: + logger.debug( + f"[V2Integration] Embedding provider unavailable: {exc}" + ) + return None + + def _create_rerank_provider(self) -> Optional[Any]: + """Resolve reranker provider from the framework.""" + try: + from ..reranker.factory import RerankProviderFactory + return RerankProviderFactory.create(self._config, self._context) + except Exception as exc: + logger.debug(f"[V2Integration] Reranker unavailable: {exc}") + return None + + def _create_knowledge_manager(self) -> Optional[Any]: + """Create knowledge manager based on configured engine.""" + if self._config.knowledge_engine == "lightrag": + if not self._embedding_provider: + if self._embedding_provider_configured(): + logger.info( + "[V2Integration] LightRAG is waiting for the " + "embedding provider registry to become ready" + ) + else: + logger.warning( + "[V2Integration] LightRAG requires an embedding " + "provider; configure embedding_provider_id or use " + "the legacy knowledge engine" + ) + return None + try: + from ..integration import LightRAGKnowledgeManager + return LightRAGKnowledgeManager( + self._config, self._llm, self._embedding_provider + ) + except ImportError: + self._knowledge_manager_retryable = False + logger.warning( + "[V2Integration] lightrag-hku not installed, " + "falling back to legacy knowledge engine" + ) + except Exception as exc: + logger.warning( + f"[V2Integration] LightRAG init failed: {exc}" + ) + logger.debug( + "[V2Integration] LightRAG traceback:", exc_info=True + ) + return None + + def _create_memory_manager(self) -> Optional[Any]: + """Create memory manager based on configured engine.""" + if self._memory_delegated(): + logger.info("[V2Integration] Memory engine skipped: delegated to LivingMemory") + return None + if self._config.memory_engine == "mem0": + if not self._embedding_provider: + if self._embedding_provider_configured(): + logger.info( + "[V2Integration] Mem0 is waiting for the embedding " + "provider registry to become ready" + ) + else: + logger.warning( + "[V2Integration] Mem0 requires an embedding provider; " + "configure embedding_provider_id or use the legacy " + "memory engine" + ) + return None + try: + from ..integration import Mem0MemoryManager + return Mem0MemoryManager( + self._config, self._llm, self._embedding_provider + ) + except ImportError: + self._memory_manager_retryable = False + logger.warning( + "[V2Integration] mem0ai not installed, " + "falling back to legacy memory engine" + ) + except Exception as exc: + logger.warning( + f"[V2Integration] Mem0 init failed: {exc}" + ) + logger.debug( + "[V2Integration] Mem0 traceback:", exc_info=True + ) + return None + + def _create_exemplar_library(self) -> Optional[Any]: + """Create exemplar library if DB and embedding are available.""" + if not self._db: + return None + try: + from ..integration import ExemplarLibrary + return ExemplarLibrary(self._db, self._embedding_provider) + except Exception as exc: + logger.debug( + f"[V2Integration] ExemplarLibrary init failed: {exc}" + ) + return None + + def _create_social_analyzer(self) -> Optional[Any]: + """Create social graph analyzer.""" + try: + from ..social import SocialGraphAnalyzer + return SocialGraphAnalyzer(self._llm, self._db) + except Exception as exc: + logger.debug( + f"[V2Integration] SocialGraphAnalyzer init failed: {exc}" + ) + return None + + def _create_jargon_filter(self) -> Optional[Any]: + """Create jargon statistical filter.""" + try: + from ..jargon import JargonStatisticalFilter + return JargonStatisticalFilter() + except Exception as exc: + logger.debug( + f"[V2Integration] JargonStatisticalFilter init failed: {exc}" + ) + return None + + # Trigger wiring + + def _register_trigger_operations(self) -> None: + """Register all available modules with the tiered trigger. + + Architecture: + Tier 1 (per-message, sub-millisecond): + - jargon_stats: in-memory statistical counters + - ingestion_buffer: append message to buffer (no I/O) + - exemplar: embedding + DB insert (< 1s) + + Tier 2 (batch, LLM-gated, cooldown-protected): + - ingestion_flush: batch-process buffered messages through + LightRAG and Mem0, amortising LLM overhead across + multiple messages + - jargon: LLM-based jargon meaning inference + - social: community detection and influence ranking + + Knowledge graph ingestion (LightRAG) and memory ingestion (Mem0) + are intentionally registered as Tier 2 batch operations rather + than Tier 1 per-message callbacks because they each invoke one + or more LLM round-trips (entity extraction, fact extraction) + that take 3-10 seconds per message. Running them per-message + would dominate the event loop and block subsequent processing. + """ + + # ---- Tier 1: per-message lightweight operations ---- + + if self._jargon_filter: + jf = self._jargon_filter + + async def _jargon_update( + message: MessageData, group_id: str + ) -> None: + jf.update_from_message(message.message, group_id, message.sender_id) + + self._trigger.register_tier1("jargon_stats", _jargon_update) + + # Buffer messages for batch ingestion (knowledge + memory). + # This replaces the previous per-message LightRAG/Mem0 callbacks + # with a sub-millisecond append operation. + if self._knowledge_manager or self._memory_manager: + buf = self._ingestion_buffer + + async def _buffer_message( + message: MessageData, group_id: str + ) -> None: + if ( + message.message + and len(message.message.strip()) >= _MIN_INGESTION_LENGTH + ): + buf[group_id].append(message) + + self._trigger.register_tier1("ingestion_buffer", _buffer_message) + + if self._exemplar_library: + lib = self._exemplar_library + + async def _exemplar_add( + message: MessageData, group_id: str + ) -> None: + await lib.add_exemplar( + message.message, group_id, message.sender_id + ) + + self._trigger.register_tier1("exemplar", _exemplar_add) + + # ---- Tier 2: batch operations (LLM-heavy) ---- + + # Batch ingestion: flush buffered messages through LightRAG + # and Mem0. Fires every 5 messages or 60 seconds, whichever + # comes first. This amortises the per-message LLM overhead + # and reduces total API calls. + if self._knowledge_manager or self._memory_manager: + self._trigger.register_tier2( + "ingestion_flush", + self._flush_ingestion_buffer, + BatchTriggerPolicy( + message_threshold=5, cooldown_seconds=60 + ), + ) + + if self._jargon_filter: + jf2 = self._jargon_filter + llm = self._llm + db = self._db + + async def _jargon_batch(group_id: str) -> None: + candidates = jf2.get_jargon_candidates(group_id, top_k=20) + if not candidates or not llm: + return + for candidate in candidates[:10]: + try: + # 跳过已被手动编辑或已完成推断的黑话,避免覆盖用户修改 + if db and hasattr(db, "get_jargon"): + existing = await db.get_jargon(group_id, candidate["term"]) + if existing and existing.get("is_complete"): + logger.debug( + f"[V2Integration] 跳过已完成的黑话: " + f"'{candidate['term']}'" + ) + continue + + meaning = await llm.generate_response( + f"Explain the slang/jargon term " + f"'{candidate['term']}' in the context of an " + f"online chat group. Return a concise definition.", + model_type="filter", + ) + if ( + meaning + and db + and hasattr(db, "save_or_update_jargon") + ): + jargon_data = { + "meaning": meaning, + "raw_content": "[]", + "is_jargon": True, + "is_complete": True, + } + # 已有记录时保留原始计数,不重置为1 + if existing: + jargon_data["count"] = existing.get("count", 1) + else: + jargon_data["count"] = 1 + await db.save_or_update_jargon( + group_id, + candidate["term"], + jargon_data, + ) + except Exception as exc: + logger.debug( + f"[V2Integration] Jargon inference failed " + f"for '{candidate['term']}': {exc}" + ) + + self._trigger.register_tier2( + "jargon", + _jargon_batch, + BatchTriggerPolicy( + message_threshold=20, cooldown_seconds=180 + ), + ) + + if self._social_analyzer: + sa = self._social_analyzer + + async def _social_batch(group_id: str) -> None: + # Execute independently so one failure does not skip the other. + try: + await sa.detect_communities(group_id) + except Exception as exc: + logger.debug( + f"[V2Integration] detect_communities failed: {exc}" + ) + try: + await sa.get_influence_ranking(group_id) + except Exception as exc: + logger.debug( + f"[V2Integration] get_influence_ranking failed: {exc}" + ) + + self._trigger.register_tier2( + "social", + _social_batch, + BatchTriggerPolicy( + message_threshold=50, cooldown_seconds=600 + ), + ) + + # Batch ingestion + + async def _flush_ingestion_buffer(self, group_id: str) -> None: + """Flush buffered messages for a group through knowledge and memory. + + Processes all buffered messages concurrently through LightRAG and + Mem0 in a single batch operation, then clears the buffer. Messages + within each engine are processed sequentially to avoid overwhelming + the underlying LLM providers with concurrent requests. + """ + messages = self._ingestion_buffer.pop(group_id, []) + if not messages: + return + + logger.debug( + f"[V2Integration] Flushing ingestion buffer: " + f"group={group_id}, count={len(messages)}" + ) + + async def _ingest_knowledge() -> None: + if not self._knowledge_manager: + return + method = None + if hasattr( + self._knowledge_manager, + "process_message_for_knowledge_graph", + ): + method = self._knowledge_manager.process_message_for_knowledge_graph + elif hasattr( + self._knowledge_manager, "process_message_for_knowledge" + ): + method = self._knowledge_manager.process_message_for_knowledge + if not method: + return + for msg in messages: + try: + await method(msg, group_id) + except Exception as exc: + logger.debug( + f"[V2Integration] Knowledge ingestion failed: {exc}" + ) + + async def _ingest_memory() -> None: + if self._memory_delegated(): + logger.debug("[V2Integration] Memory ingestion delegated to LivingMemory") + return + if not self._memory_manager: + return + for msg in messages: + try: + await self._memory_manager.add_memory_from_message( + msg, group_id + ) + except Exception as exc: + logger.debug( + f"[V2Integration] Memory ingestion failed: {exc}" + ) + + # Run knowledge and memory ingestion concurrently across engines, + # but sequentially within each engine to avoid provider overload. + await asyncio.gather( + _ingest_knowledge(), + _ingest_memory(), + ) + + def _memory_delegated(self) -> bool: + delegation = self._feature_delegation + if not delegation or not hasattr(delegation, "should_delegate_memory"): + return False + try: + return bool(delegation.should_delegate_memory()) + except Exception: + return False + + # Reranking + + @monitored + async def _rerank_context( + self, + query: str, + context: Dict[str, Any], + top_k: int, + ) -> Dict[str, Any]: + """Rerank knowledge and memory candidates by relevance. + + Few-shot exemplars and graph stats are returned unmodified. + """ + try: + documents: List[str] = [] + sources: List[str] = [] + + if "knowledge_context" in context: + documents.append(context["knowledge_context"]) + sources.append("knowledge") + + for mem in context.get("related_memories", []): + documents.append(mem) + sources.append("memory") + + if not documents: + return context + + results = await self._rerank_provider.rerank( + query, documents, top_n=top_k + ) + + # Rebuild context with reranked order. + reranked_memories: List[str] = [] + reranked_knowledge = "" + for r in results: + if r.index >= len(documents): + logger.debug( + f"[V2Integration] Reranker returned out-of-range " + f"index {r.index} (len={len(documents)}); skipping" + ) + continue + src = sources[r.index] + doc = documents[r.index] + if src == "knowledge": + reranked_knowledge = doc + elif src == "memory": + reranked_memories.append(doc) + + if reranked_knowledge: + context["knowledge_context"] = reranked_knowledge + elif "knowledge_context" in context: + del context["knowledge_context"] + + if reranked_memories: + context["related_memories"] = reranked_memories + elif "related_memories" in context: + del context["related_memories"] + + except Exception as exc: + logger.debug( + f"[V2Integration] Reranking failed, using unranked: {exc}" + ) + + return context