diff --git a/src/octopal/channels/telegram/handlers.py b/src/octopal/channels/telegram/handlers.py index 9f13bd29..26db47f5 100644 --- a/src/octopal/channels/telegram/handlers.py +++ b/src/octopal/channels/telegram/handlers.py @@ -899,6 +899,17 @@ async def _flush_pending_turn( confidence=decision.confidence, ) return + source_context = { + "source_channel": "telegram", + "chat_kind": "group", + "addressing_action": decision.action, + "addressing_reason": decision.reason, + } + else: + source_context = { + "source_channel": "telegram", + "chat_kind": "private", + } # Immediate feedback if reply_to_message_id is not None: @@ -919,6 +930,7 @@ async def _flush_pending_turn( images=images, saved_file_paths=saved_file_paths, source_channel="telegram", + source_context=source_context, ) except Exception: logger.exception("Failed to handle aggregated message", chat_id=chat_id) diff --git a/src/octopal/channels/whatsapp/runtime.py b/src/octopal/channels/whatsapp/runtime.py index f88a7274..b1709c22 100644 --- a/src/octopal/channels/whatsapp/runtime.py +++ b/src/octopal/channels/whatsapp/runtime.py @@ -335,6 +335,17 @@ async def _flush_pending_turn( confidence=decision.confidence, ) return + source_context = { + "source_channel": "whatsapp", + "chat_kind": "group", + "addressing_action": decision.action, + "addressing_reason": decision.reason, + } + else: + source_context = { + "source_channel": "whatsapp", + "chat_kind": "private", + } # Immediate feedback if to and message_id: @@ -356,6 +367,7 @@ async def _flush_pending_turn( images=images, saved_file_paths=saved_file_paths, source_channel="whatsapp", + source_context=source_context, ) update_last_message(self.settings) immediate = getattr(reply, "immediate", "") diff --git a/src/octopal/runtime/memory/service.py b/src/octopal/runtime/memory/service.py index b97614d7..3f1acb18 100644 --- a/src/octopal/runtime/memory/service.py +++ b/src/octopal/runtime/memory/service.py @@ -174,11 +174,36 @@ async def get_context_by_facets( top = scored[: self.top_k] return [f"{entry.role}: {entry.content}" for _, entry in top] - async def get_recent_history(self, chat_id: int, limit: int = 6) -> list[tuple[str, str, str]]: + async def get_recent_history( + self, + chat_id: int, + limit: int = 6, + *, + conversation_scope: str | None = None, + ) -> list[tuple[str, str, str]]: fetch_limit = max(limit * 5, 50) - entries = await asyncio.to_thread( - self.store.list_memory_entries_by_chat, chat_id, limit=fetch_limit - ) + if conversation_scope: + owner_fetch_limit = max(fetch_limit * 4, 200) + owner_entries = await asyncio.to_thread( + self.store.list_memory_entries_for_owner, + self.owner_id, + owner_fetch_limit, + ) + entries = [ + entry + for entry in owner_entries + if _entry_matches_conversation_scope( + entry, + chat_id=chat_id, + conversation_scope=conversation_scope, + ) + ] + entries.sort(key=lambda entry: entry.created_at, reverse=True) + entries = entries[:fetch_limit] + else: + entries = await asyncio.to_thread( + self.store.list_memory_entries_by_chat, chat_id, limit=fetch_limit + ) entries = [entry for entry in entries if _is_conversational_history_entry(entry)][:limit] entries.reverse() return [(entry.role, entry.content, entry.created_at.isoformat()) for entry in entries] @@ -217,10 +242,32 @@ def _is_conversational_history_entry(entry: MemoryEntry) -> bool: "planner", "scheduler", "control_plane", + "heartbeat", ) return not any(bool(metadata.get(flag)) for flag in internal_flags) +def _entry_matches_conversation_scope( + entry: MemoryEntry, + *, + chat_id: int, + conversation_scope: str, +) -> bool: + metadata = entry.metadata or {} + if str(metadata.get("conversation_scope", "") or "") == conversation_scope: + return True + if metadata.get("chat_id") == chat_id: + return True + channel = str(metadata.get("channel", "") or "").strip().lower() + return metadata.get("conversation_scope") is None and channel in { + "telegram", + "whatsapp", + "desktop", + "chat", + "a2a", + } + + def _extract_assertion(value: str) -> _Assertion | None: match = _ASSERTION_RE.match(value or "") if not match: diff --git a/src/octopal/runtime/octo/message_runtime.py b/src/octopal/runtime/octo/message_runtime.py index 628a913d..6403bdcf 100644 --- a/src/octopal/runtime/octo/message_runtime.py +++ b/src/octopal/runtime/octo/message_runtime.py @@ -49,6 +49,7 @@ _discard_worker_followup_batch = _followup_pipeline._discard_worker_followup_batch _schedule_worker_followup_flush = _followup_pipeline._schedule_worker_followup_flush +_SHARED_CONVERSATION_SCOPE = "default" def _build_user_memory_content( @@ -92,6 +93,26 @@ def _core_callable(name: str, default: Callable[..., Any]) -> Callable[..., Any] return default +def _conversation_scope_for_turn(*, track_progress: bool) -> str | None: + return _SHARED_CONVERSATION_SCOPE if track_progress else None + + +def _memory_channel_context_metadata(source_context: dict[str, Any] | None) -> dict[str, Any]: + if not source_context: + return {} + allowed_keys = { + "source_channel", + "chat_kind", + "addressing_action", + "addressing_reason", + } + return { + key: value + for key, value in source_context.items() + if key in allowed_keys and value not in (None, "") + } + + class OctoMessageRuntimeMixin: async def handle_message( self, @@ -107,6 +128,7 @@ async def handle_message( include_wakeup: bool = True, background_delivery: bool = False, source_channel: str | None = None, + source_context: dict[str, Any] | None = None, ) -> OctoReply: correlation_token = None correlation_id = correlation_id_var.get() @@ -116,6 +138,9 @@ async def handle_message( trace_status = "ok" trace_output: dict[str, Any] | None = None channel = (source_channel or ("desktop" if is_ws else "chat")).strip() or "chat" + source_context = dict(source_context or {}) + source_context.setdefault("source_channel", channel) + conversation_scope = _conversation_scope_for_turn(track_progress=track_progress) trace_metadata: dict[str, Any] = { "channel": channel, "message_kind": "heartbeat" if not track_progress else "user", @@ -125,6 +150,8 @@ async def handle_message( "persist_to_memory": persist_to_memory, "track_progress": track_progress, "background_delivery": background_delivery, + "conversation_scope": conversation_scope, + **_memory_channel_context_metadata(source_context), } wants_followup = False finalized_visible_reply = False @@ -199,11 +226,13 @@ async def handle_message( { "chat_id": chat_id, "channel": channel, + "conversation_scope": conversation_scope, "has_images": bool(images), "has_files": bool(saved_file_paths), "saved_file_paths": list(saved_file_paths or []), "heartbeat": not track_progress, "fact_candidate": False, + **_memory_channel_context_metadata(source_context), }, ) bootstrap_context = None @@ -239,6 +268,8 @@ async def handle_message( "saved_file_paths": saved_file_paths, "include_wakeup": include_wakeup, "route_mode": route_request.mode, + "conversation_scope": conversation_scope, + "channel_context": source_context, } route_or_reply = _core_callable("route_or_reply", _default_route_or_reply) while True: @@ -332,8 +363,10 @@ async def handle_message( { "chat_id": chat_id, "channel": channel, + "conversation_scope": conversation_scope, "background_delivery": True, "heartbeat": not track_progress, + **_memory_channel_context_metadata(source_context), }, ) elif persist_to_memory and delivery.user_visible: @@ -343,7 +376,9 @@ async def handle_message( { "chat_id": chat_id, "channel": channel, + "conversation_scope": conversation_scope, "heartbeat": not track_progress, + **_memory_channel_context_metadata(source_context), }, ) if delivery.user_visible and track_progress: diff --git a/src/octopal/runtime/octo/prompt_builder.py b/src/octopal/runtime/octo/prompt_builder.py index 06e74e18..47c0135a 100644 --- a/src/octopal/runtime/octo/prompt_builder.py +++ b/src/octopal/runtime/octo/prompt_builder.py @@ -306,6 +306,7 @@ async def _build_memory_context_bundle( user_text: str, chat_id: int, facts: FactsService | None = None, + conversation_scope: str | None = None, ) -> MemoryContextBundle: canon_context = await asyncio.to_thread(canon.get_tier1_context) @@ -332,7 +333,14 @@ async def _build_memory_context_bundle( else: memory_context = await memory.get_context(user_text, exclude_chat_id=chat_id) - raw_recent_history = await memory.get_recent_history(chat_id, limit=20) + try: + raw_recent_history = await memory.get_recent_history( + chat_id, + limit=20, + conversation_scope=conversation_scope, + ) + except TypeError: + raw_recent_history = await memory.get_recent_history(chat_id, limit=20) recent_history = [_normalize_recent_history_item(item) for item in raw_recent_history] if recent_history and recent_history[-1][0] == "user" and recent_history[-1][1] == user_text: recent_history = recent_history[:-1] @@ -369,6 +377,8 @@ async def build_octo_prompt( tool_policy_summary: str = "", facts: FactsService | None = None, reflection: ReflectionService | None = None, + conversation_scope: str | None = None, + channel_context: dict[str, object] | None = None, ) -> list[Message]: """Assembles all the pieces into the final message list for the LLM.""" @@ -380,7 +390,14 @@ async def build_octo_prompt( datetime_prompt = _current_datetime_prompt() - memory_bundle = await _build_memory_context_bundle(memory, canon, user_text, chat_id, facts) + memory_bundle = await _build_memory_context_bundle( + memory, + canon, + user_text, + chat_id, + facts, + conversation_scope=conversation_scope, + ) messages: list[Message] = [Message(role="system", content=system_prompt)] if persona_prompt_lines: @@ -416,6 +433,9 @@ async def build_octo_prompt( content=tool_policy_summary.strip(), ) ) + channel_context_prompt = _build_channel_context_prompt(channel_context, conversation_scope) + if channel_context_prompt: + messages.append(Message(role="system", content=channel_context_prompt)) if memory_bundle.canon_context: messages.append(Message(role="system", content=memory_bundle.canon_context)) @@ -505,6 +525,35 @@ async def build_octo_prompt( return messages +def _build_channel_context_prompt( + channel_context: dict[str, object] | None, + conversation_scope: str | None, +) -> str: + if not channel_context and not conversation_scope: + return "" + context = dict(channel_context or {}) + source_channel = str(context.get("source_channel", "") or "").strip() + chat_kind = str(context.get("chat_kind", "") or "").strip() + addressing_action = str(context.get("addressing_action", "") or "").strip() + lines = ["Current turn transport context:"] + if source_channel: + lines.append(f"- source_channel={source_channel}") + if chat_kind: + lines.append(f"- chat_kind={chat_kind}") + if addressing_action: + lines.append(f"- group_addressing_action={addressing_action}") + if conversation_scope: + lines.append(f"- conversation_scope={conversation_scope}") + if chat_kind == "group": + lines.append( + "This is a valid group-chat turn for this agent because the ingress addressing gate already allowed it." + ) + lines.append( + "Treat addressed group-chat messages as part of this agent's normal conversation context; do not claim you cannot see the group chat." + ) + return "\n".join(lines) + + async def build_control_plane_prompt( *, user_text: str, diff --git a/src/octopal/runtime/octo/router.py b/src/octopal/runtime/octo/router.py index 5499cd8c..4e474719 100644 --- a/src/octopal/runtime/octo/router.py +++ b/src/octopal/runtime/octo/router.py @@ -375,6 +375,8 @@ async def route_or_reply( saved_file_paths: list[str] | None = None, include_wakeup: bool = True, route_mode: str | RouteMode = RouteMode.CONVERSATION, + conversation_scope: str | None = None, + channel_context: dict[str, object] | None = None, ) -> str: """Core routing logic: decide whether to use tools or reply to user.""" # Internal chat_id (<= 0) should not trigger typing indicators. @@ -461,6 +463,8 @@ async def route_or_reply( tool_policy_summary=tool_policy_summary, facts=getattr(octo, "facts", None), reflection=getattr(octo, "reflection", None), + conversation_scope=conversation_scope, + channel_context=channel_context, ) runtime_plan_context = _build_runtime_plan_context(octo, chat_id) messages.append(Message(role="system", content=_build_runtime_plan_guidance())) diff --git a/tests/test_memory_system.py b/tests/test_memory_system.py index 36e51d51..d8ea55a5 100644 --- a/tests/test_memory_system.py +++ b/tests/test_memory_system.py @@ -8,6 +8,7 @@ from octopal.infrastructure.store.models import MemoryEntry from octopal.infrastructure.store.sqlite import SQLiteStore from octopal.runtime.memory.canon import CanonService +from octopal.runtime.memory.service import MemoryService from octopal.utils import utc_now @@ -95,6 +96,131 @@ def test_memory_chat_history_orders_by_created_at_not_uuid(tmp_path: Path) -> No assert [row.content for row in rows] == ["newest", "middle", "oldest"] +def test_recent_history_can_share_a_conversation_scope_across_chats(tmp_path: Path) -> None: + store = SQLiteStore(_StoreSettings(tmp_path / "data", tmp_path / "workspace")) + service = MemoryService(store=store, embeddings=None, owner_id="default") + + async def scenario() -> list[tuple[str, str, str]]: + await service.add_message( + "user", + "private setup detail", + { + "chat_id": 1, + "channel": "telegram", + "conversation_scope": "default", + }, + ) + await service.add_message( + "assistant", + "private setup acknowledged", + { + "chat_id": 1, + "channel": "telegram", + "conversation_scope": "default", + }, + ) + await service.add_message( + "system", + "internal worker result", + { + "chat_id": 2, + "channel": "telegram", + "conversation_scope": "default", + "worker_result": True, + }, + ) + await service.add_message( + "assistant", + "background heartbeat delivery", + { + "chat_id": 2, + "channel": "telegram", + "conversation_scope": "default", + "heartbeat": True, + }, + ) + await service.add_message( + "user", + "addressed group follow-up", + { + "chat_id": 2, + "channel": "telegram", + "conversation_scope": "default", + "chat_kind": "group", + "addressing_action": "respond_self", + }, + ) + return await service.get_recent_history(2, limit=10, conversation_scope="default") + + history = asyncio.run(scenario()) + + assert [(role, content) for role, content, _created_at in history] == [ + ("user", "private setup detail"), + ("assistant", "private setup acknowledged"), + ("user", "addressed group follow-up"), + ] + + +def test_scoped_recent_history_orders_by_created_at_not_insert_order(tmp_path: Path) -> None: + store = SQLiteStore(_StoreSettings(tmp_path / "data", tmp_path / "workspace")) + service = MemoryService(store=store, embeddings=None, owner_id="default") + base = datetime(2026, 6, 5, 10, 0, tzinfo=UTC) + entries = [ + MemoryEntry( + id=str(uuid.uuid4()), + role="user", + content="newest", + embedding=None, + created_at=base + timedelta(minutes=2), + metadata={ + "owner_id": "default", + "chat_id": 2, + "channel": "telegram", + "conversation_scope": "default", + }, + ), + MemoryEntry( + id=str(uuid.uuid4()), + role="assistant", + content="oldest backfill", + embedding=None, + created_at=base, + metadata={ + "owner_id": "default", + "chat_id": 1, + "channel": "telegram", + "conversation_scope": "default", + }, + ), + MemoryEntry( + id=str(uuid.uuid4()), + role="user", + content="middle backfill", + embedding=None, + created_at=base + timedelta(minutes=1), + metadata={ + "owner_id": "default", + "chat_id": 1, + "channel": "telegram", + "conversation_scope": "default", + }, + ), + ] + for entry in entries: + store.add_memory_entry(entry) + + async def scenario() -> list[tuple[str, str, str]]: + return await service.get_recent_history(2, limit=3, conversation_scope="default") + + history = asyncio.run(scenario()) + + assert [(role, content) for role, content, _created_at in history] == [ + ("assistant", "oldest backfill"), + ("user", "middle backfill"), + ("user", "newest"), + ] + + def test_canon_event_log_and_compaction(tmp_path: Path) -> None: canon = CanonService( workspace_dir=tmp_path / "workspace", diff --git a/tests/test_octo_prompt_history_timestamps.py b/tests/test_octo_prompt_history_timestamps.py index ecc9f819..68c54006 100644 --- a/tests/test_octo_prompt_history_timestamps.py +++ b/tests/test_octo_prompt_history_timestamps.py @@ -64,3 +64,51 @@ async def scenario() -> None: assert "Sent at: 2026-04-28T10:02:00+00:00\n\ncheck status" not in contents asyncio.run(scenario()) + + +def test_build_octo_prompt_passes_conversation_scope_and_group_context() -> None: + class DummyMemory: + def __init__(self) -> None: + self.seen_scope: str | None = None + + async def get_context(self, user_text: str, exclude_chat_id: int | None = None): + return [] + + async def get_recent_history( + self, + chat_id: int, + limit: int = 20, + *, + conversation_scope: str | None = None, + ): + self.seen_scope = conversation_scope + return [] + + memory = DummyMemory() + + async def scenario() -> None: + messages = await build_octo_prompt( + store=object(), + memory=memory, + canon=DummyCanon(), + user_text="can you see this group message?", + chat_id=-100, + bootstrap_context="", + conversation_scope="default", + channel_context={ + "source_channel": "telegram", + "chat_kind": "group", + "addressing_action": "respond_self", + }, + ) + + contents = [message.content for message in messages if isinstance(message.content, str)] + joined = "\n".join(contents) + assert memory.seen_scope == "default" + assert "source_channel=telegram" in joined + assert "chat_kind=group" in joined + assert "group_addressing_action=respond_self" in joined + assert "valid group-chat turn" in joined + assert "normal conversation context" in joined + + asyncio.run(scenario())