Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/octopal/channels/telegram/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,17 @@ async def _flush_pending_turn(
confidence=decision.confidence,
)
return
source_context = {
"source_channel": "telegram",
"chat_kind": "group",
"addressing_action": decision.action,
"addressing_reason": decision.reason,
}
else:
source_context = {
"source_channel": "telegram",
"chat_kind": "private",
}

# Immediate feedback
if reply_to_message_id is not None:
Expand All @@ -919,6 +930,7 @@ async def _flush_pending_turn(
images=images,
saved_file_paths=saved_file_paths,
source_channel="telegram",
source_context=source_context,
)
except Exception:
logger.exception("Failed to handle aggregated message", chat_id=chat_id)
Expand Down
12 changes: 12 additions & 0 deletions src/octopal/channels/whatsapp/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,17 @@ async def _flush_pending_turn(
confidence=decision.confidence,
)
return
source_context = {
"source_channel": "whatsapp",
"chat_kind": "group",
"addressing_action": decision.action,
"addressing_reason": decision.reason,
}
else:
source_context = {
"source_channel": "whatsapp",
"chat_kind": "private",
}

# Immediate feedback
if to and message_id:
Expand All @@ -356,6 +367,7 @@ async def _flush_pending_turn(
images=images,
saved_file_paths=saved_file_paths,
source_channel="whatsapp",
source_context=source_context,
)
update_last_message(self.settings)
immediate = getattr(reply, "immediate", "")
Expand Down
55 changes: 51 additions & 4 deletions src/octopal/runtime/memory/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,36 @@ async def get_context_by_facets(
top = scored[: self.top_k]
return [f"{entry.role}: {entry.content}" for _, entry in top]

async def get_recent_history(self, chat_id: int, limit: int = 6) -> list[tuple[str, str, str]]:
async def get_recent_history(
self,
chat_id: int,
limit: int = 6,
*,
conversation_scope: str | None = None,
) -> list[tuple[str, str, str]]:
fetch_limit = max(limit * 5, 50)
entries = await asyncio.to_thread(
self.store.list_memory_entries_by_chat, chat_id, limit=fetch_limit
)
if conversation_scope:
owner_fetch_limit = max(fetch_limit * 4, 200)
owner_entries = await asyncio.to_thread(
self.store.list_memory_entries_for_owner,
self.owner_id,
owner_fetch_limit,
)
entries = [
entry
for entry in owner_entries
if _entry_matches_conversation_scope(
entry,
chat_id=chat_id,
conversation_scope=conversation_scope,
)
]
entries.sort(key=lambda entry: entry.created_at, reverse=True)
entries = entries[:fetch_limit]
else:
entries = await asyncio.to_thread(
self.store.list_memory_entries_by_chat, chat_id, limit=fetch_limit
)
entries = [entry for entry in entries if _is_conversational_history_entry(entry)][:limit]
entries.reverse()
return [(entry.role, entry.content, entry.created_at.isoformat()) for entry in entries]
Expand Down Expand Up @@ -217,10 +242,32 @@ def _is_conversational_history_entry(entry: MemoryEntry) -> bool:
"planner",
"scheduler",
"control_plane",
"heartbeat",
)
return not any(bool(metadata.get(flag)) for flag in internal_flags)


def _entry_matches_conversation_scope(
entry: MemoryEntry,
*,
chat_id: int,
conversation_scope: str,
) -> bool:
metadata = entry.metadata or {}
if str(metadata.get("conversation_scope", "") or "") == conversation_scope:
return True
if metadata.get("chat_id") == chat_id:
return True
channel = str(metadata.get("channel", "") or "").strip().lower()
return metadata.get("conversation_scope") is None and channel in {
"telegram",
"whatsapp",
"desktop",
"chat",
"a2a",
}


def _extract_assertion(value: str) -> _Assertion | None:
match = _ASSERTION_RE.match(value or "")
if not match:
Expand Down
35 changes: 35 additions & 0 deletions src/octopal/runtime/octo/message_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

_discard_worker_followup_batch = _followup_pipeline._discard_worker_followup_batch
_schedule_worker_followup_flush = _followup_pipeline._schedule_worker_followup_flush
_SHARED_CONVERSATION_SCOPE = "default"


def _build_user_memory_content(
Expand Down Expand Up @@ -92,6 +93,26 @@ def _core_callable(name: str, default: Callable[..., Any]) -> Callable[..., Any]
return default


def _conversation_scope_for_turn(*, track_progress: bool) -> str | None:
return _SHARED_CONVERSATION_SCOPE if track_progress else None


def _memory_channel_context_metadata(source_context: dict[str, Any] | None) -> dict[str, Any]:
if not source_context:
return {}
allowed_keys = {
"source_channel",
"chat_kind",
"addressing_action",
"addressing_reason",
}
return {
key: value
for key, value in source_context.items()
if key in allowed_keys and value not in (None, "")
}


class OctoMessageRuntimeMixin:
async def handle_message(
self,
Expand All @@ -107,6 +128,7 @@ async def handle_message(
include_wakeup: bool = True,
background_delivery: bool = False,
source_channel: str | None = None,
source_context: dict[str, Any] | None = None,
) -> OctoReply:
correlation_token = None
correlation_id = correlation_id_var.get()
Expand All @@ -116,6 +138,9 @@ async def handle_message(
trace_status = "ok"
trace_output: dict[str, Any] | None = None
channel = (source_channel or ("desktop" if is_ws else "chat")).strip() or "chat"
source_context = dict(source_context or {})
source_context.setdefault("source_channel", channel)
conversation_scope = _conversation_scope_for_turn(track_progress=track_progress)
trace_metadata: dict[str, Any] = {
"channel": channel,
"message_kind": "heartbeat" if not track_progress else "user",
Expand All @@ -125,6 +150,8 @@ async def handle_message(
"persist_to_memory": persist_to_memory,
"track_progress": track_progress,
"background_delivery": background_delivery,
"conversation_scope": conversation_scope,
**_memory_channel_context_metadata(source_context),
}
wants_followup = False
finalized_visible_reply = False
Expand Down Expand Up @@ -199,11 +226,13 @@ async def handle_message(
{
"chat_id": chat_id,
"channel": channel,
"conversation_scope": conversation_scope,
"has_images": bool(images),
"has_files": bool(saved_file_paths),
"saved_file_paths": list(saved_file_paths or []),
"heartbeat": not track_progress,
"fact_candidate": False,
**_memory_channel_context_metadata(source_context),
},
)
bootstrap_context = None
Expand Down Expand Up @@ -239,6 +268,8 @@ async def handle_message(
"saved_file_paths": saved_file_paths,
"include_wakeup": include_wakeup,
"route_mode": route_request.mode,
"conversation_scope": conversation_scope,
"channel_context": source_context,
}
route_or_reply = _core_callable("route_or_reply", _default_route_or_reply)
while True:
Expand Down Expand Up @@ -332,8 +363,10 @@ async def handle_message(
{
"chat_id": chat_id,
"channel": channel,
"conversation_scope": conversation_scope,
"background_delivery": True,
"heartbeat": not track_progress,
**_memory_channel_context_metadata(source_context),
},
)
elif persist_to_memory and delivery.user_visible:
Expand All @@ -343,7 +376,9 @@ async def handle_message(
{
"chat_id": chat_id,
"channel": channel,
"conversation_scope": conversation_scope,
"heartbeat": not track_progress,
**_memory_channel_context_metadata(source_context),
},
)
if delivery.user_visible and track_progress:
Expand Down
53 changes: 51 additions & 2 deletions src/octopal/runtime/octo/prompt_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ async def _build_memory_context_bundle(
user_text: str,
chat_id: int,
facts: FactsService | None = None,
conversation_scope: str | None = None,
) -> MemoryContextBundle:
canon_context = await asyncio.to_thread(canon.get_tier1_context)

Expand All @@ -332,7 +333,14 @@ async def _build_memory_context_bundle(
else:
memory_context = await memory.get_context(user_text, exclude_chat_id=chat_id)

raw_recent_history = await memory.get_recent_history(chat_id, limit=20)
try:
raw_recent_history = await memory.get_recent_history(
chat_id,
limit=20,
conversation_scope=conversation_scope,
)
except TypeError:
raw_recent_history = await memory.get_recent_history(chat_id, limit=20)
recent_history = [_normalize_recent_history_item(item) for item in raw_recent_history]
if recent_history and recent_history[-1][0] == "user" and recent_history[-1][1] == user_text:
recent_history = recent_history[:-1]
Expand Down Expand Up @@ -369,6 +377,8 @@ async def build_octo_prompt(
tool_policy_summary: str = "",
facts: FactsService | None = None,
reflection: ReflectionService | None = None,
conversation_scope: str | None = None,
channel_context: dict[str, object] | None = None,
) -> list[Message]:
"""Assembles all the pieces into the final message list for the LLM."""

Expand All @@ -380,7 +390,14 @@ async def build_octo_prompt(

datetime_prompt = _current_datetime_prompt()

memory_bundle = await _build_memory_context_bundle(memory, canon, user_text, chat_id, facts)
memory_bundle = await _build_memory_context_bundle(
memory,
canon,
user_text,
chat_id,
facts,
conversation_scope=conversation_scope,
)

messages: list[Message] = [Message(role="system", content=system_prompt)]
if persona_prompt_lines:
Expand Down Expand Up @@ -416,6 +433,9 @@ async def build_octo_prompt(
content=tool_policy_summary.strip(),
)
)
channel_context_prompt = _build_channel_context_prompt(channel_context, conversation_scope)
if channel_context_prompt:
messages.append(Message(role="system", content=channel_context_prompt))

if memory_bundle.canon_context:
messages.append(Message(role="system", content=memory_bundle.canon_context))
Expand Down Expand Up @@ -505,6 +525,35 @@ async def build_octo_prompt(
return messages


def _build_channel_context_prompt(
channel_context: dict[str, object] | None,
conversation_scope: str | None,
) -> str:
if not channel_context and not conversation_scope:
return ""
context = dict(channel_context or {})
source_channel = str(context.get("source_channel", "") or "").strip()
chat_kind = str(context.get("chat_kind", "") or "").strip()
addressing_action = str(context.get("addressing_action", "") or "").strip()
lines = ["Current turn transport context:"]
if source_channel:
lines.append(f"- source_channel={source_channel}")
if chat_kind:
lines.append(f"- chat_kind={chat_kind}")
if addressing_action:
lines.append(f"- group_addressing_action={addressing_action}")
if conversation_scope:
lines.append(f"- conversation_scope={conversation_scope}")
if chat_kind == "group":
lines.append(
"This is a valid group-chat turn for this agent because the ingress addressing gate already allowed it."
)
lines.append(
"Treat addressed group-chat messages as part of this agent's normal conversation context; do not claim you cannot see the group chat."
)
return "\n".join(lines)


async def build_control_plane_prompt(
*,
user_text: str,
Expand Down
4 changes: 4 additions & 0 deletions src/octopal/runtime/octo/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ async def route_or_reply(
saved_file_paths: list[str] | None = None,
include_wakeup: bool = True,
route_mode: str | RouteMode = RouteMode.CONVERSATION,
conversation_scope: str | None = None,
channel_context: dict[str, object] | None = None,
) -> str:
"""Core routing logic: decide whether to use tools or reply to user."""
# Internal chat_id (<= 0) should not trigger typing indicators.
Expand Down Expand Up @@ -461,6 +463,8 @@ async def route_or_reply(
tool_policy_summary=tool_policy_summary,
facts=getattr(octo, "facts", None),
reflection=getattr(octo, "reflection", None),
conversation_scope=conversation_scope,
channel_context=channel_context,
)
runtime_plan_context = _build_runtime_plan_context(octo, chat_id)
messages.append(Message(role="system", content=_build_runtime_plan_guidance()))
Expand Down
Loading