112 changes: 78 additions & 34 deletions agent_loop.py

@@ -21,6 +21,7 @@
 from modality import CognitiveEvent, CognitiveIntent, ModalityType
 from pipeline_state import PipelineState
 from providers import AGENT_TOOLS, InferenceProvider
+from schemas.bargein import BargeinContext

 if TYPE_CHECKING:
     from channels import BrowserChannel
@@ -149,6 +150,9 @@ def __init__(
         self.draft_queue = DraftQueue()
         self._speculative_context: list[dict[str, str]] = []  # Context for speculative inference
         self._human_speaking = False  # Whether human is currently speaking
+        # A2: typed barge-in context prepared before the next turn, consumed by A3
+        # for prompt injection. Set by _prepare_bargein_context() on the WS path.
+        self._pending_bargein: BargeinContext | None = None

     async def handle_event(self, event: CognitiveEvent) -> None:
         """Called when a CognitiveEvent arrives from the channel."""
@@ -175,12 +179,46 @@ async def handle_event(self, event: CognitiveEvent) -> None:

     async def _process(self, event: CognitiveEvent) -> None:
         """Core: event → provider → tool dispatch."""
-        # Context stitching: inject interrupt context from dashboard path
-        # This closes the barge-in loop — the agent knows what was spoken,
-        # what was unsaid, and what the user interrupted with.
-        interrupt_context = self._build_interrupt_context(event.content)
-        if interrupt_context:
-            self.conversation.append({"role": "system", "content": interrupt_context})
+        # A2: build typed BargeinContext from pipeline_state.last_interrupt (if any)
+        # and stash on self._pending_bargein. A3 will consume it for prompt injection.
+        self._prepare_bargein_context(user_text=event.content)
+
+        # MOD3_USE_COGOS_AGENT fork: forward user turn to kernel bus instead of
+        # calling local provider. Response arrives asynchronously via the
+        # cogos_agent_bridge → BrowserChannel.broadcast_response_text path.
+        from cogos_agent_bridge import is_enabled as _cogos_agent_enabled
+        from cogos_agent_bridge import post_user_message as _post_user_message
+
+        if _cogos_agent_enabled():
+            session_id = f"mod3:{self.channel_id or 'unknown'}"
+            # Fold any pending barge-in context into the forwarded text so the
+            # kernel cycle sees it. A full structured payload will come in a
+            # later iteration; for v1 we prepend the terse prompt renderer.
+            forwarded_text = event.content
+            pending = self._pending_bargein
+            if pending is not None:
+                self._pending_bargein = None
+                forwarded_text = (
+                    "[interrupted earlier] "
+                    + pending.format_for_prompt()
+                    + "\n"
+                    + forwarded_text
+                )
+            ok = await _post_user_message(forwarded_text, session_id=session_id)
+            if not ok and self._channel_ref:
+                try:
+                    await self._channel_ref.send_response_text(
+                        "[cogos-agent unreachable — check kernel]"
+                    )
+                    await self._channel_ref.send_response_complete(
+                        metrics={"provider": "cogos-agent", "error": "unreachable"}
+                    )
+                except Exception:
+                    pass
+            # Track the user turn in history so subsequent turns carry it.
+            self.conversation.append({"role": "user", "content": event.content})
+            self._trim_history()
+            return

         self.conversation.append({"role": "user", "content": event.content})
         self._trim_history()
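
Concretely, with a renderer like the sketch above, a barge-in turn forwarded on the kernel path would look roughly like this (all values invented):

```text
[interrupted earlier] [Barge-in context — your previous response was interrupted]
spoken (user heard this): "There are three options. The first one"
unspoken (user did NOT hear this): "is to retry with exponential backoff."
interrupted_at: 38%
user_said: "skip to the second option"
Acknowledge what was interrupted and respond to the user's new input.
skip to the second option
```

One observation: `user_said` is set from `event.content`, and `forwarded_text` also ends with `event.content`, so if the renderer keeps a `user_said` line the utterance appears twice on this path. Harmless for v1, but worth revisiting when the structured payload lands.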
@@ -190,6 +228,7 @@ async def _process(self, event: CognitiveEvent) -> None:
         # Assemble system prompt with kernel context (afferent path)
         kernel_ctx = _fetch_kernel_context()
         system_prompt = _BASE_SYSTEM_PROMPT + kernel_ctx
+        system_prompt = self._inject_pending_bargein(system_prompt)

         response = await self.provider.chat(
             messages=self.conversation,
@@ -510,46 +549,51 @@ async def background_validate_drafts(self, latest_user_text: str) -> None:

         await self._push_draft_queue_state()

-    def _build_interrupt_context(self, user_text: str) -> str | None:
-        """Build context stitch from pipeline_state.last_interrupt.
-
-        When the user barged in during TTS playback, captures what was
-        spoken vs unspoken and injects it as structured context for the
-        next inference call. Consumes the interrupt (clears it).
-
-        Returns a context string, or None if no interrupt occurred.
+    def _prepare_bargein_context(self, user_text: str | None) -> None:
+        """Read pipeline_state.last_interrupt and stash a typed BargeinContext.
+
+        Called at the top of each WS turn. If the previous assistant reply was
+        interrupted (and the interrupt is still fresh, < 30s), build a
+        BargeinContext via the A1 schema and store it on ``self._pending_bargein``
+        for A3 to pick up during prompt construction. Clears last_interrupt so
+        the next turn does not re-consume a stale record.
         """
         info = self.pipeline_state.last_interrupt
         if info is None:
-            return None
+            self._pending_bargein = None
+            return

         # Only use recent interrupts (within last 30 seconds)
         if time.time() - info.timestamp > 30:
-            return None
+            # Stale — clear and skip.
+            with self.pipeline_state._lock:
+                self.pipeline_state._last_interrupt = None
+            self._pending_bargein = None
+            return

-        # Clear the interrupt so we don't re-inject it
+        # Consume the interrupt so we don't re-inject it on subsequent turns.
+        # pipeline_state has no public consume helper yet; clear the private
+        # slot under its lock (matches the pre-existing pattern on this path).
         with self.pipeline_state._lock:
             self.pipeline_state._last_interrupt = None

-        # Compute unspoken remainder
-        unspoken = ""
-        if info.full_text and info.delivered_text:
-            if info.full_text.startswith(info.delivered_text):
-                unspoken = info.full_text[len(info.delivered_text) :].strip()
-            else:
-                # Fallback: everything after the delivered percentage
-                unspoken = info.full_text[len(info.delivered_text) :].strip()
-
-        parts = []
-        parts.append("[Barge-in context — your previous response was interrupted]")
-        parts.append(f'spoken (user heard this): "{info.delivered_text}"')
-        if unspoken:
-            parts.append(f'unspoken (user did NOT hear this): "{unspoken}"')
-        parts.append(f"interrupted_at: {info.spoken_pct * 100:.0f}%")
-        parts.append(f'user_said: "{user_text}"')
-        parts.append("Acknowledge what was interrupted and respond to the user's new input.")
-
-        return "\n".join(parts)
+        self._pending_bargein = BargeinContext.from_interrupt_info(
+            info,
+            source="browser_vad",
+            user_said=user_text or None,
+        )
+
+    def _inject_pending_bargein(self, system_prompt: str) -> str:
+        """Append the pending BargeinContext (if any) to the system prompt.
+
+        Consumes ``self._pending_bargein`` so it does not leak into subsequent
+        turns. Returns the prompt unchanged if no barge-in is pending.
+        """
+        pending = self._pending_bargein
+        if pending is None:
+            return system_prompt
+        self._pending_bargein = None
+        return system_prompt + "\n\n" + pending.format_for_prompt()

     def _trim_history(self) -> None:
         """Keep conversation within MAX_HISTORY messages."""