# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""SemanticQueue: Semantic extraction queue."""

import hashlib
import json
import threading
import time
from dataclasses import dataclass
from typing import Dict, Optional

from openviking_cli.utils.config import get_openviking_config
from openviking_cli.utils.logger import get_logger

from .named_queue import NamedQueue
# NOTE(review): SemanticMsg is imported above the visible hunk in the original
# file; module path assumed from the sibling test file — confirm.
from .semantic_msg import SemanticMsg

logger = get_logger(__name__)


@dataclass
class _TrackedSemanticRequest:
    """Bookkeeping for one semantic request that is queued or processing.

    Attributes:
        msg: Snapshot of the message handed to the backing queue.
        queue_msg_id: Id returned by the backing NamedQueue for that message.
        follow_up_msg: Coalesced message to re-enqueue once the active one acks.
    """

    msg: SemanticMsg
    queue_msg_id: str
    follow_up_msg: Optional[SemanticMsg] = None


class SemanticQueue(NamedQueue):
    """Semantic extraction queue for async generation of .abstract.md and .overview.md.

    On top of the plain NamedQueue this layer adds, guarded by ``_semantic_lock``:

    * dedup/coalescing of enqueues that share the same logical key while a
      request for that key is still queued or processing,
    * a completion cooldown that suppresses immediate re-enqueues of an
      identical (key + changes) request after it was acked,
    * release of a coalesced follow-up message when the active request acks.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Protects the three tracking maps below; enqueue/ack may run from
        # different threads even though the public API is async.
        self._semantic_lock = threading.Lock()
        # logical key -> in-flight request bookkeeping
        self._tracked_by_key: Dict[str, _TrackedSemanticRequest] = {}
        # backing-queue msg id -> logical key (reverse index used by ack)
        self._queue_id_to_key: Dict[str, str] = {}
        # request fingerprint -> monotonic completion time (cooldown basis)
        # NOTE(review): grows without bound over process lifetime; consider
        # pruning entries older than the cooldown window.
        self._completed_request_at: Dict[str, float] = {}

    @staticmethod
    def _normalize_changes(changes: Optional[Dict[str, list]]) -> Dict[str, list]:
        """Return a canonical changes dict: fixed keys, deduped sorted strings."""
        normalized: Dict[str, list] = {"added": [], "modified": [], "deleted": []}
        if not changes:
            return normalized

        for key in normalized:
            values = changes.get(key) or []
            normalized[key] = sorted({str(value) for value in values})
        return normalized

    @classmethod
    def _logical_key(cls, msg: SemanticMsg) -> str:
        """Stable identity of a request, ignoring its change set.

        Two messages with the same logical key describe the same target and
        processing options, so only one may be in flight at a time.
        """
        payload = {
            "uri": msg.uri,
            "context_type": msg.context_type,
            "recursive": msg.recursive,
            "account_id": msg.account_id,
            "user_id": msg.user_id,
            "agent_id": msg.agent_id,
            "role": msg.role,
            "skip_vectorization": msg.skip_vectorization,
            "telemetry_id": msg.telemetry_id or "",
            "target_uri": msg.target_uri or "",
            "lifecycle_lock_handle_id": msg.lifecycle_lock_handle_id or "",
            "is_code_repo": msg.is_code_repo,
        }
        return json.dumps(payload, sort_keys=True, separators=(",", ":"))

    @classmethod
    def _request_fingerprint(cls, msg: SemanticMsg) -> str:
        """Digest of logical key + normalized changes; keys the cooldown map."""
        payload = {
            "logical_key": cls._logical_key(msg),
            "changes": cls._normalize_changes(msg.changes),
        }
        digest = hashlib.sha256(
            json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
        )
        return digest.hexdigest()

    @classmethod
    def _merge_msgs(cls, base: SemanticMsg, incoming: SemanticMsg) -> SemanticMsg:
        """Return a copy of *base* whose change set is the union with *incoming*.

        An all-empty change set is represented as ``None`` on the merged
        message (meaning "no explicit changes"), matching enqueue callers.
        """
        merged = SemanticMsg.from_dict(base.to_dict())
        merged.changes = cls._normalize_changes(base.changes)
        incoming_changes = cls._normalize_changes(incoming.changes)

        if merged.changes == {"added": [], "modified": [], "deleted": []}:
            merged.changes = None

        if incoming_changes == {"added": [], "modified": [], "deleted": []}:
            return merged

        if merged.changes is None:
            merged.changes = incoming_changes
            return merged

        for key in ("added", "modified", "deleted"):
            merged_values = set(merged.changes.get(key, []))
            merged_values.update(incoming_changes.get(key, []))
            merged.changes[key] = sorted(merged_values)
        return merged

    def _cooldown_seconds(self) -> int:
        """Configured completion cooldown; best-effort 0 if config is unavailable."""
        try:
            return max(0, int(get_openviking_config().semantic.summary_enqueue_cooldown_seconds))
        except Exception:
            # Deliberate broad catch: missing/partial config must not break enqueue.
            return 0

    async def enqueue(self, msg: SemanticMsg, bypass_cooldown: bool = False) -> str:
        """Serialize SemanticMsg object and store in queue.

        Dedup rules, in order:
        1. If an identical request completed within the cooldown window (and
           ``bypass_cooldown`` is false), skip the enqueue entirely.
        2. If a request with the same logical key is active, coalesce this
           message into its follow-up instead of enqueuing a duplicate; a
           re-enqueue of the *same* message id is preserved as a retry.
        3. Otherwise enqueue normally and start tracking the request.

        Returns the backing-queue message id of the (possibly pre-existing)
        tracked request; on a cooldown skip with no tracked request, ``msg.id``.
        """
        now = time.monotonic()
        logical_key = self._logical_key(msg)
        request_fingerprint = self._request_fingerprint(msg)
        cooldown_seconds = self._cooldown_seconds()

        with self._semantic_lock:
            last_completed_at = self._completed_request_at.get(request_fingerprint)
            if (
                not bypass_cooldown
                and cooldown_seconds > 0
                and last_completed_at is not None
                and now - last_completed_at < cooldown_seconds
            ):
                logger.info(
                    "Skipped semantic enqueue within cooldown: uri=%s context=%s",
                    msg.uri,
                    msg.context_type,
                )
                tracked = self._tracked_by_key.get(logical_key)
                return tracked.queue_msg_id if tracked else msg.id

            tracked = self._tracked_by_key.get(logical_key)
            if tracked:
                # Merge against the pending follow-up if one exists, else the
                # active message, so successive coalesces accumulate changes.
                baseline_msg = tracked.follow_up_msg or tracked.msg
                merged_msg = self._merge_msgs(baseline_msg, msg)
                baseline_fingerprint = self._request_fingerprint(baseline_msg)
                merged_fingerprint = self._request_fingerprint(merged_msg)
                should_preserve_retry = msg.id == tracked.msg.id
                if should_preserve_retry:
                    # Same message id re-enqueued while active: keep a retry
                    # follow-up rather than deduping it away.
                    if tracked.follow_up_msg is None:
                        tracked.follow_up_msg = SemanticMsg.from_dict(msg.to_dict())
                        logger.info(
                            "Preserved semantic re-enqueue for active request: uri=%s context=%s",
                            msg.uri,
                            msg.context_type,
                        )
                    else:
                        logger.info(
                            "Kept existing coalesced follow-up for active retry: uri=%s context=%s",
                            msg.uri,
                            msg.context_type,
                        )
                elif baseline_fingerprint != merged_fingerprint:
                    tracked.follow_up_msg = merged_msg
                    logger.info(
                        "Coalesced semantic enqueue while request is active: uri=%s context=%s",
                        msg.uri,
                        msg.context_type,
                    )
                else:
                    logger.debug(
                        "Deduped identical semantic enqueue while request is active: uri=%s context=%s",
                        msg.uri,
                        msg.context_type,
                    )
                return tracked.queue_msg_id

        # Not tracked and not in cooldown: enqueue outside the lock (await),
        # then register tracking state.
        queue_msg_id = await super().enqueue(msg.to_dict())

        with self._semantic_lock:
            self._tracked_by_key[logical_key] = _TrackedSemanticRequest(
                msg=SemanticMsg.from_dict(msg.to_dict()),
                queue_msg_id=queue_msg_id,
            )
            self._queue_id_to_key[queue_msg_id] = logical_key

        return queue_msg_id

    async def ack(self, msg_id: str) -> None:
        """Acknowledge successful processing and release tracked semantic state.

        On ack-write failure the tracking state is left intact (the request is
        still considered active), so a later retry/ack can release it.

        Fix vs. previous revision: the pre-write lock section that read
        ``logical_key``/``tracked`` has been removed — those values were
        recomputed after the write anyway, and a stale truthy ``tracked``
        could leak into the post-write section under a concurrent ack and
        cause a duplicate follow-up enqueue. State is now read exactly once,
        after the ack write succeeds.
        """
        if not msg_id:
            return

        ack_file = f"{self.path}/ack"
        try:
            self._agfs.write(ack_file, msg_id.encode("utf-8"))
        except Exception as e:
            logger.warning(f"[SemanticQueue] Ack failed for {self.name} msg_id={msg_id}: {e}")
            return

        follow_up_msg: Optional[SemanticMsg] = None
        with self._semantic_lock:
            logical_key = self._queue_id_to_key.pop(msg_id, None)
            if logical_key:
                tracked = self._tracked_by_key.pop(logical_key, None)
                if tracked:
                    # Record completion time for the cooldown check in enqueue().
                    self._completed_request_at[self._request_fingerprint(tracked.msg)] = (
                        time.monotonic()
                    )
                    follow_up_msg = tracked.follow_up_msg

        if follow_up_msg is not None:
            # Release the coalesced/retry follow-up; bypass the cooldown since
            # this is deliberate continuation work, not a duplicate request.
            await self.enqueue(follow_up_msg, bypass_cooldown=True)

    # NOTE(review): dequeue() and peek() are unchanged by this revision and
    # only partially visible in this hunk; their bodies are intentionally not
    # reproduced here. A companion change adds
    # SemanticConfig.summary_enqueue_cooldown_seconds (default 300) in
    # openviking_cli/utils/config/parser_config.py.

    async def clear(self) -> bool:
        """Clear queue data and reset semantic tracking state."""
        cleared = await super().clear()
        if cleared:
            with self._semantic_lock:
                self._tracked_by_key.clear()
                self._queue_id_to_key.clear()
                self._completed_request_at.clear()
        return cleared
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""Tests for SemanticQueue dedup, coalescing, and completion-cooldown behavior."""

import json
from types import SimpleNamespace

from openviking.storage.queuefs.semantic_msg import SemanticMsg
from openviking.storage.queuefs.semantic_queue import SemanticQueue


class FakeQueueAGFS:
    """In-memory stand-in for the AGFS backend that NamedQueue drives.

    Supports the write endpoints /enqueue, /ack, /clear and the read
    endpoints /size, /peek, /dequeue. ``fail_ack=True`` makes /ack raise,
    simulating a backend failure.
    """

    def __init__(self, fail_ack: bool = False):
        self._pending = []
        self._processing = {}
        self._dirs = set()
        self._next_id = 0
        self._fail_ack = fail_ack

    def mkdir(self, path):
        self._dirs.add(path)

    def write(self, path, content):
        if path.endswith("/enqueue"):
            self._next_id += 1
            msg_id = f"msg-{self._next_id}"
            payload = content.decode("utf-8") if isinstance(content, bytes) else str(content)
            self._pending.append({"id": msg_id, "data": payload})
            return msg_id
        if path.endswith("/ack"):
            if self._fail_ack:
                raise RuntimeError("ack failed")
            msg_id = content.decode("utf-8") if isinstance(content, bytes) else str(content)
            self._processing.pop(msg_id, None)
            return msg_id
        if path.endswith("/clear"):
            self._pending.clear()
            self._processing.clear()
            return "cleared"
        raise NotImplementedError(path)

    def read(self, path):
        if path.endswith("/size"):
            return str(len(self._pending)).encode("utf-8")
        if path.endswith("/peek"):
            if not self._pending:
                return b"{}"
            return json.dumps(self._pending[0]).encode("utf-8")
        if path.endswith("/dequeue"):
            if not self._pending:
                return b"{}"
            item = self._pending.pop(0)
            self._processing[item["id"]] = item
            return json.dumps(item).encode("utf-8")
        raise NotImplementedError(path)


def _semantic_config(cooldown_seconds: int):
    """Minimal config object exposing semantic.summary_enqueue_cooldown_seconds."""
    return SimpleNamespace(
        semantic=SimpleNamespace(summary_enqueue_cooldown_seconds=cooldown_seconds)
    )


def _patch_cooldown(monkeypatch, seconds: int) -> None:
    """Point the queue module's config lookup at a fake config with *seconds* cooldown."""
    monkeypatch.setattr(
        "openviking.storage.queuefs.semantic_queue.get_openviking_config",
        lambda: _semantic_config(seconds),
    )


def _make_queue(fail_ack: bool = False) -> SemanticQueue:
    """Build a SemanticQueue over a fresh in-memory fake backend."""
    return SemanticQueue(FakeQueueAGFS(fail_ack=fail_ack), "/queue", "Semantic")


async def _process_one(queue: SemanticQueue):
    """Dequeue the head message, mark it processing, ack it, and return the raw item."""
    raw = await queue.dequeue_raw()
    assert raw is not None
    queue._on_dequeue_start()
    await queue.ack(raw["id"])
    return raw


async def test_semantic_queue_dedupes_same_request_while_active(monkeypatch):
    _patch_cooldown(monkeypatch, 300)
    queue = _make_queue()

    first_msg = SemanticMsg(uri="viking://user/default/memories/entities", context_type="memory")
    second_msg = SemanticMsg(uri="viking://user/default/memories/entities", context_type="memory")

    first_id = await queue.enqueue(first_msg)
    second_id = await queue.enqueue(second_msg)

    # Identical logical request while active: same queue id, single entry.
    assert first_id == second_id
    assert await queue.size() == 1

    await _process_one(queue)

    # Identical duplicate was deduped, so no follow-up is released.
    assert await queue.size() == 0


async def test_semantic_queue_coalesces_changed_memory_updates(monkeypatch):
    _patch_cooldown(monkeypatch, 300)
    queue = _make_queue()

    first = SemanticMsg(
        uri="viking://user/default/memories/entities",
        context_type="memory",
        changes={"added": ["a.md"], "modified": [], "deleted": []},
    )
    second = SemanticMsg(
        uri="viking://user/default/memories/entities",
        context_type="memory",
        changes={"added": [], "modified": ["b.md"], "deleted": []},
    )

    first_id = await queue.enqueue(first)
    second_id = await queue.enqueue(second)
    third_id = await queue.enqueue(second)
    fourth_id = await queue.enqueue(first)

    assert first_id == second_id == third_id == fourth_id
    assert await queue.size() == 1

    await _process_one(queue)

    # Ack released a coalesced follow-up carrying the merged change set.
    assert await queue.size() == 1
    follow_up = await queue.dequeue_raw()
    assert follow_up is not None
    payload = SemanticMsg.from_json(follow_up["data"])
    assert payload.changes == {
        "added": ["a.md"],
        "modified": ["b.md"],
        "deleted": [],
    }


async def test_semantic_queue_applies_completion_cooldown(monkeypatch):
    clock = {"now": 100.0}
    _patch_cooldown(monkeypatch, 60)
    monkeypatch.setattr(
        "openviking.storage.queuefs.semantic_queue.time.monotonic",
        lambda: clock["now"],
    )

    queue = _make_queue()
    msg = SemanticMsg(uri="viking://user/default/memories/entities", context_type="memory")

    await queue.enqueue(msg)
    await _process_one(queue)

    # Within the cooldown window the enqueue is skipped entirely.
    second_id = await queue.enqueue(msg)
    assert await queue.size() == 0

    clock["now"] += 61
    third_id = await queue.enqueue(msg)

    # After the window a real enqueue happens with a fresh queue id.
    assert third_id != second_id
    assert await queue.size() == 1


async def test_semantic_queue_preserves_same_message_retry_while_active(monkeypatch):
    _patch_cooldown(monkeypatch, 300)
    queue = _make_queue()

    msg = SemanticMsg(uri="viking://user/default/memories/entities", context_type="memory")

    first_id = await queue.enqueue(msg)
    second_id = await queue.enqueue(msg)

    assert first_id == second_id
    assert await queue.size() == 1

    await _process_one(queue)

    # Re-enqueue of the *same* message id is preserved as a retry follow-up.
    assert await queue.size() == 1
    follow_up = await queue.dequeue_raw()
    assert follow_up is not None
    payload = SemanticMsg.from_json(follow_up["data"])
    assert payload.id == msg.id


async def test_semantic_queue_retry_does_not_clobber_coalesced_follow_up(monkeypatch):
    _patch_cooldown(monkeypatch, 300)
    queue = _make_queue()

    original = SemanticMsg(
        uri="viking://user/default/memories/entities",
        context_type="memory",
        changes={"added": ["a.md"], "modified": [], "deleted": []},
    )
    incremental = SemanticMsg(
        uri="viking://user/default/memories/entities",
        context_type="memory",
        changes={"added": [], "modified": ["b.md"], "deleted": []},
    )

    await queue.enqueue(original)
    await queue.enqueue(incremental)
    # Retry of the original must not overwrite the already-coalesced follow-up.
    await queue.enqueue(original)

    await _process_one(queue)

    follow_up = await queue.dequeue_raw()
    assert follow_up is not None
    payload = SemanticMsg.from_json(follow_up["data"])
    assert payload.changes == {
        "added": ["a.md"],
        "modified": ["b.md"],
        "deleted": [],
    }


async def test_semantic_queue_does_not_release_follow_up_when_ack_fails(monkeypatch):
    _patch_cooldown(monkeypatch, 300)
    queue = _make_queue(fail_ack=True)

    first = SemanticMsg(
        uri="viking://user/default/memories/entities",
        context_type="memory",
        changes={"added": ["a.md"], "modified": [], "deleted": []},
    )
    second = SemanticMsg(
        uri="viking://user/default/memories/entities",
        context_type="memory",
        changes={"added": [], "modified": ["b.md"], "deleted": []},
    )

    await queue.enqueue(first)
    await queue.enqueue(second)

    raw = await _process_one(queue)

    # Ack failed: no follow-up released and tracking state stays intact.
    assert await queue.size() == 0
    assert raw["id"] in queue._queue_id_to_key
    assert queue._tracked_by_key


async def test_semantic_queue_keeps_distinct_targets_separate(monkeypatch):
    _patch_cooldown(monkeypatch, 300)
    queue = _make_queue()

    session_msg = SemanticMsg(
        uri="viking://session/default/abc",
        context_type="session",
        target_uri="viking://user/default/memories/entities",
    )
    memory_msg = SemanticMsg(
        uri="viking://session/default/abc",
        context_type="session",
        target_uri="viking://user/default/memories/patterns",
    )

    first_id = await queue.enqueue(session_msg)
    second_id = await queue.enqueue(memory_msg)

    # Different target_uri values yield different logical keys: no dedup.
    assert first_id != second_id
    assert await queue.size() == 2


async def test_semantic_queue_keeps_distinct_telemetry_and_lock_keys_separate(monkeypatch):
    _patch_cooldown(monkeypatch, 300)
    queue = _make_queue()

    first = SemanticMsg(
        uri="viking://session/default/abc",
        context_type="session",
        telemetry_id="telemetry-a",
        lifecycle_lock_handle_id="lock-a",
    )
    second = SemanticMsg(
        uri="viking://session/default/abc",
        context_type="session",
        telemetry_id="telemetry-b",
        lifecycle_lock_handle_id="lock-b",
    )

    first_id = await queue.enqueue(first)
    second_id = await queue.enqueue(second)

    # Telemetry and lifecycle-lock ids are part of the logical key: no dedup.
    assert first_id != second_id
    assert await queue.size() == 2