diff --git a/audio_subscribers.py b/audio_subscribers.py
new file mode 100644
index 0000000..54ef3ea
--- /dev/null
+++ b/audio_subscribers.py
@@ -0,0 +1,235 @@
+"""Per-session audio subscriber registry (Wave 4.3).
+
+A separate, tiny module so the routing surface stays independent of the
+session registry (which is already load-bearing for ADR-082 Phase 1). The
+dashboard WebSocket lands here; ``mcp_shim._play_wav_bytes`` queries this
+registry before falling back to sounddevice; the kernel queries the
+``/v1/sessions/{id}/subscribers`` HTTP endpoint before spawning afplay.
+
+Thread-safety: registration happens in FastAPI's event loop thread (WS
+handler), lookup and emit happen from the MCP shim playback thread. A
+single RLock around the dict is sufficient — the set-per-session is small
+(usually 1 dashboard) and contention is effectively zero.
+
+Delivery semantics: the emit paths are best-effort. A WebSocket that has
+already died just drops the frame — the kernel fallback ``afplay``
+never happened because the check went through before we spawned it, but
+the session will miss this turn's audio. That's acceptable: the dashboard
+polls and reconnects; the user sees silence for one turn instead of
+nothing at all.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import threading
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:  # pragma: no cover — type-check-only import
+    from fastapi import WebSocket
+
+logger = logging.getLogger("mod3.audio_subscribers")
+
+
+@dataclass
+class _Subscriber:
+    """A single WebSocket subscription for a session."""
+
+    ws: "WebSocket"
+    # The event loop the WebSocket was accepted on. Emit calls from other
+    # threads need to run_coroutine_threadsafe onto this loop.
+    loop: asyncio.AbstractEventLoop
+    # Monotonic sequence for logging / frame ordering. Opaque to callers.
+    seq: int = 0
+
+
+@dataclass
+class _SessionBucket:
+    """Subscribers currently attached to a session_id."""
+
+    subscribers: list[_Subscriber] = field(default_factory=list)
+
+
+class AudioSubscriberRegistry:
+    """Thread-safe session_id → active WebSocket subscribers mapping.
+
+    Callers never reach into the bucket lists directly — they go through
+    register / unregister / has_subscribers / emit_wav.
+    """
+
+    def __init__(self) -> None:
+        self._lock = threading.RLock()
+        self._buckets: dict[str, _SessionBucket] = {}
+        # Registry-wide counter; emit_wav stamps each header with the next value.
+        self._frame_seq = 0
+
+    # ------------------------------------------------------------------
+    # Registration
+    # ------------------------------------------------------------------
+
+    def register(self, session_id: str, ws: "WebSocket", loop: asyncio.AbstractEventLoop) -> _Subscriber:
+        """Attach ``ws`` to ``session_id``. Caller must have already
+        ``accept()``-ed the WebSocket.
+        """
+        sub = _Subscriber(ws=ws, loop=loop)
+        with self._lock:
+            bucket = self._buckets.setdefault(session_id, _SessionBucket())
+            bucket.subscribers.append(sub)
+            count = len(bucket.subscribers)
+        logger.info("audio subscriber attached: session=%s total=%d", session_id, count)
+        return sub
+
+    def unregister(self, session_id: str, sub: _Subscriber) -> None:
+        """Detach. Idempotent — double-unregister is a no-op."""
+        with self._lock:
+            bucket = self._buckets.get(session_id)
+            if bucket is None:
+                return
+            try:
+                bucket.subscribers.remove(sub)
+            except ValueError:
+                return
+            remaining = len(bucket.subscribers)
+            if not bucket.subscribers:
+                # Drop empty buckets so the subscribed-check stays fast.
+                self._buckets.pop(session_id, None)
+        logger.info("audio subscriber detached: session=%s remaining=%d", session_id, remaining)
+
+    # ------------------------------------------------------------------
+    # Inspection
+    # ------------------------------------------------------------------
+
+    def has_subscribers(self, session_id: str) -> bool:
+        """Return True when ``session_id`` has at least one attached subscriber."""
+        with self._lock:
+            bucket = self._buckets.get(session_id)
+            return bool(bucket and bucket.subscribers)
+
+    def count(self, session_id: str) -> int:
+        """Return the number of subscribers attached to ``session_id`` (0 if none)."""
+        with self._lock:
+            bucket = self._buckets.get(session_id)
+            return len(bucket.subscribers) if bucket else 0
+
+    def snapshot(self) -> dict[str, int]:
+        """session_id → subscriber count. For diagnostics only."""
+        with self._lock:
+            return {sid: len(b.subscribers) for sid, b in self._buckets.items()}
+
+    # ------------------------------------------------------------------
+    # Emit
+    # ------------------------------------------------------------------
+
+    def emit_wav(
+        self,
+        session_id: str,
+        wav_bytes: bytes,
+        *,
+        job_id: str | None = None,
+        duration_sec: float | None = None,
+        sample_rate: int | None = None,
+    ) -> int:
+        """Push a whole WAV blob to every subscriber of ``session_id``.
+
+        Returns the number of subscribers the frame was enqueued for. Each
+        send is fire-and-forget via ``run_coroutine_threadsafe`` — the caller
+        doesn't block on socket I/O, which matches the existing
+        ``BrowserChannel.broadcast_trace_event`` pattern.
+
+        The count reflects sends that were *scheduled*, not sends confirmed
+        on the wire: a failure inside the send coroutine is only logged at
+        debug level. A subscriber whose loop rejects the scheduling call is
+        skipped and not counted.
+
+        The wire format is a single binary WebSocket frame containing the
+        raw WAV (RIFF / WAVE) bytes. Browsers can decode this directly via
+        AudioContext.decodeAudioData. A preceding small JSON control frame
+        announces the incoming audio with session_id + job_id + duration
+        so the dashboard can correlate the blob to a synthesize call.
+        """
+        with self._lock:
+            bucket = self._buckets.get(session_id)
+            if not bucket or not bucket.subscribers:
+                return 0
+            targets = list(bucket.subscribers)
+            self._frame_seq += 1
+            seq = self._frame_seq
+
+        header = {
+            "type": "audio_header",
+            "session_id": session_id,
+            "job_id": job_id,
+            "duration_sec": duration_sec,
+            "sample_rate": sample_rate,
+            "bytes": len(wav_bytes),
+            "format": "wav",
+            "seq": seq,
+        }
+
+        delivered = 0
+        for sub in targets:
+            # Build the coroutine first so it can be closed if scheduling fails.
+            coro = _send_audio_frame(sub.ws, header, wav_bytes)
+            try:
+                asyncio.run_coroutine_threadsafe(coro, sub.loop)
+            except Exception as exc:  # noqa: BLE001 — disconnected subscribers are expected
+                # E.g. the subscriber's loop is already closed. Close the coroutine
+                # explicitly, otherwise Python warns that it was never awaited.
+                coro.close()
+                logger.debug("emit_wav: scheduling failed for %s: %s", session_id, exc)
+            else:
+                delivered += 1
+        logger.info(
+            "emit_wav: session=%s bytes=%d delivered_to=%d seq=%d",
+            session_id,
+            len(wav_bytes),
+            delivered,
+            seq,
+        )
+        return delivered
+
+
+async def _send_audio_frame(ws: "WebSocket", header: dict, wav_bytes: bytes) -> None:
+    """Send header JSON + binary WAV over a single WebSocket.
+
+    Split into a module-level coroutine so ``run_coroutine_threadsafe``
+    returns a Future the caller can ignore — the pair is sent in order on
+    the socket's own coroutine context.
+    """
+    try:
+        await ws.send_json(header)
+        await ws.send_bytes(wav_bytes)
+    except Exception as exc:  # noqa: BLE001 — disconnect mid-send is expected
+        logger.debug("audio frame send failed: %s", exc)
+
+
+# ---------------------------------------------------------------------------
+# Process-global default registry — shared between http_api and mcp_shim.
+# --------------------------------------------------------------------------- + +_default_registry: AudioSubscriberRegistry | None = None +_default_registry_lock = threading.Lock() + + +def get_default_audio_subscribers() -> AudioSubscriberRegistry: + global _default_registry + with _default_registry_lock: + if _default_registry is None: + _default_registry = AudioSubscriberRegistry() + return _default_registry + + +def reset_default_audio_subscribers() -> None: + """For tests — drop the module-level registry.""" + global _default_registry + with _default_registry_lock: + _default_registry = None + + +__all__ = [ + "AudioSubscriberRegistry", + "get_default_audio_subscribers", + "reset_default_audio_subscribers", +] diff --git a/channels.py b/channels.py index 9f6f592..94dfa67 100644 --- a/channels.py +++ b/channels.py @@ -14,6 +14,11 @@ partial_transcript, transcript, trace_event — kernel cycle-trace events (ADR-083), fanned out via BrowserChannel.broadcast_trace_event(). + +The MOD3_USE_COGOS_AGENT kernel-bridged path emits response_text AND +response_complete via BrowserChannel.broadcast_response_{text,complete} +so the dashboard UI's turn-done signal fires on every turn, matching the +local-inference path's behavior. """ from __future__ import annotations @@ -493,6 +498,40 @@ def broadcast_response_text(cls, text: str, session_id: str | None = None) -> No except Exception as exc: # noqa: BLE001 — disconnected clients are expected logger.debug("response_text send failed for %s: %s", ch.channel_id, exc) + @classmethod + def broadcast_response_complete( + cls, + metrics: dict | None = None, + session_id: str | None = None, + ) -> None: + """Push a `response_complete` frame to dashboard WebSocket clients. 
+ + Companion to :meth:`broadcast_response_text`: the MOD3_USE_COGOS_AGENT + response bridge emits exactly one complete-frame per kernel + `agent_response` event so the dashboard UI's per-turn `isResponding` + state gets cleared (otherwise the chat panel spinner hangs forever). + + Routing and threading match `broadcast_response_text` 1:1 — pass the + same `session_id` so the completion frame lands on the same channel + that received the text frames for this turn. `metrics` follows the + local-path convention from `agent_loop._process` (`{"llm_ms": ..., + "provider": ...}`); the kernel path populates it with + `{"provider": "cogos-agent", ...}`. + """ + frame = {"type": "response_complete", "metrics": metrics or {}} + expected_channel = None + if session_id and session_id.startswith("mod3:"): + expected_channel = session_id[len("mod3:") :] + for ch in list(cls._active_channels): + if not ch._active: + continue + if expected_channel and ch.channel_id != expected_channel: + continue + try: + asyncio.run_coroutine_threadsafe(ch.ws.send_json(frame), ch._loop) + except Exception as exc: # noqa: BLE001 — disconnected clients are expected + logger.debug("response_complete send failed for %s: %s", ch.channel_id, exc) + # ------------------------------------------------------------------ # Cleanup # ------------------------------------------------------------------ diff --git a/cogos_agent_bridge.py b/cogos_agent_bridge.py index c8616f2..05ba8ab 100644 --- a/cogos_agent_bridge.py +++ b/cogos_agent_bridge.py @@ -194,9 +194,24 @@ def _extract_response_text(payload: dict) -> Optional[str]: async def run_response_bridge(subscriber: KernelBusSubscriber) -> None: """Consume `subscriber` and broadcast agent replies to dashboard clients. - `BrowserChannel.broadcast_response_text()` is thread-safe via - `run_coroutine_threadsafe`, matching the existing trace-event pattern. - Malformed events (no recoverable text) are logged at debug and skipped. 
+ Each kernel `agent_response` event on `bus_dashboard_response` is a + complete per-turn reply (see `apps/cogos/agent_tools_respond.go` — the + `respond` tool is documented as "call at most once per user turn" and + the auto-fallback publishes once if the model skipped the tool call). + We therefore emit two dashboard frames per kernel event: + + * ``broadcast_response_text`` — the reply body (chat panel render) + * ``broadcast_response_complete`` — the turn-done signal so the UI's + per-turn spinner clears. Without this, the dashboard hangs + awaiting completion because the kernel path never reaches the + ``send_response_complete`` call that the local-inference branch + emits at ``agent_loop._process`` ~L300. + + ``BrowserChannel.broadcast_response_{text,complete}()`` are thread-safe + via ``run_coroutine_threadsafe``, matching the existing trace-event + pattern. Malformed events (no recoverable text) are logged at debug + and skipped — we do NOT emit a completion frame for skipped events + (keeps the 1:1 pairing with what the UI actually rendered). """ first_event_logged = False forwarded = 0 @@ -221,6 +236,16 @@ async def run_response_bridge(subscriber: KernelBusSubscriber) -> None: session_id = _extract_session_id(env.payload) try: BrowserChannel.broadcast_response_text(text, session_id=session_id) + # Pair the text frame with a completion frame so the dashboard's + # per-turn "awaiting response" state clears. Kernel emits exactly + # one agent_response per user turn, so one complete per event is + # the correct cardinality. 
+ metrics: dict = {"provider": "cogos-agent"} + if env.event_id: + metrics["event_id"] = env.event_id + if env.ts: + metrics["kernel_ts"] = env.ts + BrowserChannel.broadcast_response_complete(metrics, session_id=session_id) forwarded += 1 logger.debug( "cogos-agent: forwarded response event_id=%s session=%s (total=%d)", diff --git a/dashboard/index.html b/dashboard/index.html index ae166c5..c66e0ef 100644 --- a/dashboard/index.html +++ b/dashboard/index.html @@ -214,11 +214,86 @@ /* Leave room at the bottom so the drawer doesn't cover the input */ body { padding-bottom: 32px; } + /* Participant panel (ADR-082 / Wave 4) — live session roster. Renders as a + collapsible drawer on the right side of the main pane. */ + .participants-pill { + display: flex; align-items: center; gap: 6px; + font-size: 0.75rem; color: var(--muted); + padding: 4px 10px; border-radius: 12px; + background: var(--bg); border: 1px solid var(--border); + cursor: pointer; user-select: none; + } + .participants-pill:hover { border-color: var(--accent); } + .participants-pill .count { color: var(--accent); font-variant-numeric: tabular-nums; } + .participants-pill .count.me { color: var(--green); } + #participants-panel { + position: fixed; top: 48px; right: 16px; + background: var(--surface); border: 1px solid var(--border); border-radius: 8px; + width: 320px; max-height: 60vh; overflow-y: auto; + box-shadow: 0 8px 24px rgba(0,0,0,0.4); + z-index: 30; display: none; padding: 8px 0; + } + #participants-panel.open { display: block; } + #participants-panel .pp-header { + padding: 8px 14px; border-bottom: 1px solid var(--border); + font-size: 0.7rem; color: var(--muted); text-transform: uppercase; + letter-spacing: 0.5px; display: flex; align-items: center; gap: 8px; + } + #participants-panel .pp-header .session-id { + font-family: ui-monospace, SFMono-Regular, Menlo, monospace; + color: var(--text); text-transform: none; letter-spacing: 0; + margin-left: auto; font-size: 0.7rem; + } + 
#participants-list { list-style: none; padding: 0; margin: 0; } + #participants-list .pp-empty { + padding: 16px 14px; font-size: 0.8rem; color: var(--muted); text-align: center; + } + .pp-row { + display: flex; align-items: center; gap: 10px; + padding: 8px 14px; border-bottom: 1px solid rgba(48,54,61,0.4); + font-size: 0.8rem; + } + .pp-row:last-child { border-bottom: none; } + .pp-row.me { + background: rgba(63,185,80,0.07); + border-left: 2px solid var(--green); + padding-left: 12px; + } + .pp-row .pp-info { flex: 1; min-width: 0; } + .pp-row .pp-name { color: var(--text); font-weight: 500; } + .pp-row .pp-name .you-marker { + display: inline-block; margin-left: 4px; padding: 0 6px; + background: var(--green); color: #000; border-radius: 8px; + font-size: 0.6rem; font-weight: 600; text-transform: uppercase; + letter-spacing: 0.5px; vertical-align: middle; + } + .pp-row .pp-meta { + font-size: 0.68rem; color: var(--muted); + font-family: ui-monospace, SFMono-Regular, Menlo, monospace; + display: flex; gap: 6px; flex-wrap: wrap; margin-top: 2px; + } + .pp-row .pp-meta .pp-voice { color: var(--accent); } + .pp-row .pp-meta .pp-session { opacity: 0.7; } + .pp-row .pp-badge { + font-size: 0.6rem; padding: 1px 6px; border-radius: 8px; + background: var(--bg); border: 1px solid var(--border); color: var(--muted); + text-transform: uppercase; letter-spacing: 0.5px; + } + .pp-row .pp-badge.user { color: var(--accent); border-color: var(--accent); } + .pp-row .pp-badge.agent { color: var(--orange); border-color: var(--orange); } + .pp-row .pp-conflict { color: var(--orange); font-size: 0.65rem; margin-left: 4px; } + .pp-row .pp-audio-dot { + width: 6px; height: 6px; border-radius: 50%; + background: var(--muted); flex-shrink: 0; + } + .pp-row .pp-audio-dot.ws { background: var(--green); } + /* Responsive */ @media (max-width: 700px) { .main { padding: 12px 16px; } .voice-controls { flex-wrap: wrap; } .msg { max-width: 90%; } + #participants-panel { right: 8px; width: 
calc(100vw - 16px); } } @@ -235,6 +310,22 @@

Mod³

Mic +
+ 👥 + 0 + sessions +
+ + + +
+
+ Participants + unregistered +
+
@@ -391,25 +482,39 @@

Mod³

const audioOutputSelect = document.getElementById('audio-output'); // _playback declared below with _ws and _transport +const OUTPUT_DEVICE_STORAGE_KEY = 'mod3-output-device'; + async function populateOutputDevices() { try { const devices = await navigator.mediaDevices.enumerateDevices(); const outputs = devices.filter(d => d.kind === 'audiooutput'); if (outputs.length === 0) return; audioOutputSelect.innerHTML = ''; + const saved = localStorage.getItem(OUTPUT_DEVICE_STORAGE_KEY); let selectedIdx = 0; + let sawSaved = false; outputs.forEach((d, i) => { const opt = document.createElement('option'); opt.value = d.deviceId; opt.textContent = d.label || `Speaker ${i + 1}`; audioOutputSelect.appendChild(opt); + if (saved && d.deviceId === saved) { + selectedIdx = i; + sawSaved = true; + } const label = (d.label || '').toLowerCase(); - // Prefer device starting with "Default -" - if (label.startsWith('default -') || label.startsWith('default —')) { + // Prefer device starting with "Default -" only when no saved pick exists + if (!sawSaved && (label.startsWith('default -') || label.startsWith('default —'))) { selectedIdx = i; } }); audioOutputSelect.selectedIndex = selectedIdx; + // Push the resolved sink into the Wave 4 audio context now that enumeration + // is complete — otherwise first-playback races populateOutputDevices and + // pins the sink to whatever the dropdown transiently held. + if (window.__mod3AudioSink) { + await window.__mod3AudioSink(audioOutputSelect.value); + } } catch (err) { console.warn('Could not enumerate output devices:', err); } @@ -463,10 +568,21 @@

Mod³

}); audioOutputSelect.addEventListener('change', async () => { + const deviceId = audioOutputSelect.value; + const label = audioOutputSelect.options[audioOutputSelect.selectedIndex].text; + // Persist the user's choice so reloads honor it (fixes timing race where + // ensureAudioCtx runs before populateOutputDevices and pins the sink to + // whatever was transiently first in the select). + try { localStorage.setItem(OUTPUT_DEVICE_STORAGE_KEY, deviceId); } catch { /* private mode */ } + // Legacy chat-path playback if (_playback) { - await _playback.setOutputDevice(audioOutputSelect.value); - console.log('[Output] Switched to:', audioOutputSelect.options[audioOutputSelect.selectedIndex].text); + await _playback.setOutputDevice(deviceId); + } + // Wave 4 WebSocket-path AudioContext (new) + if (window.__mod3AudioSink) { + await window.__mod3AudioSink(deviceId); } + console.log('[Output] Switched to:', label); }); // --- WebSocket transport reference (shared between voice and text) --- @@ -666,6 +782,416 @@

Mod³

chatInput.style.height = 'auto'; chatInput.style.height = Math.min(chatInput.scrollHeight, 120) + 'px'; }); + +/** + * Wave 4 — Session registration + participant panel. + * + * On page load: + * 1. Try to reuse a session_id from sessionStorage (so refreshes reuse the + * same bus identity). + * 2. Otherwise, POST to the kernel's /v1/channel-sessions/register (the + * kernel owns minting authority — ADR-082 Wave 3.5). Fall back to mod3's + * direct /v1/sessions/register when the kernel is unreachable so the + * dashboard still works when only mod3 is running. + * 3. Poll GET /v1/sessions every 4s to render the live roster. + * 4. On beforeunload, best-effort deregister so the voice goes back to the + * pool quickly. + * + * Exposes window.__mod3Session for the audio-WebSocket code in Wave 4.3 to + * consume — that code keys its /ws/audio/{session_id} subscription off this + * value. + */ +(function setupSessionRegistration() { + const KERNEL_URL = 'http://localhost:6931'; // Wave 3.5 kernel-owned authority + const SESSION_KEY = 'mod3.sessionId'; + const POLL_INTERVAL_MS = 4000; + + // Short UUID helper (8 hex chars — matches the kernel's cs- short-id + // style for merged displays). + function shortUuid() { + const bytes = new Uint8Array(4); + crypto.getRandomValues(bytes); + return Array.from(bytes, b => b.toString(16).padStart(2, '0')).join(''); + } + + // Session state published on window so other scripts can see it. + window.__mod3Session = { + session_id: null, + participant_id: null, + assigned_voice: null, + registered: false, + }; + + async function registerSession() { + // Reuse cached session if present — refreshes stay on the same identity. + const cached = sessionStorage.getItem(SESSION_KEY); + const reuseId = cached ? JSON.parse(cached) : null; + + const participantId = (reuseId && reuseId.participant_id) || + ('dashboard-' + shortUuid()); + const sessionId = reuseId ? 
reuseId.session_id : undefined; + + const payload = { + participant_id: participantId, + participant_type: 'user', + preferred_voice: null, + preferred_output_device: 'system-default', + }; + if (sessionId) payload.session_id = sessionId; + + // 1. Try kernel-owned endpoint first. + let data = null; + try { + const r = await fetch(`${KERNEL_URL}/v1/channel-sessions/register`, { + method: 'POST', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({...payload, kinds: ['audio']}), + }); + if (r.ok) { + data = await r.json(); + // Kernel returns {kernel: {...}, mod3: {...}} — normalize to mod3 shape + if (data.mod3) { + data = {...data.mod3, session_id: data.kernel.session_id || data.mod3.session_id}; + } + console.log('[Session] Registered via kernel:', data.session_id); + } + } catch (e) { + console.log('[Session] Kernel unreachable, falling back to mod3 direct:', e.message); + } + + // 2. Fallback: mod3 direct registration. + if (!data) { + try { + // Mod3's /v1/sessions/register requires a session_id in the payload — + // mint one on the client when we don't have a cached one. This is + // fallback-path only; the kernel path auto-mints on empty. 
+ const directPayload = {...payload}; + if (!directPayload.session_id) { + directPayload.session_id = 'dashboard-' + shortUuid() + '-' + Date.now(); + } + const r = await fetch('/v1/sessions/register', { + method: 'POST', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify(directPayload), + }); + if (r.ok) { + data = await r.json(); + console.log('[Session] Registered via mod3 direct:', data.session_id); + } else { + console.error('[Session] mod3 register failed:', r.status, await r.text()); + return; + } + } catch (e) { + console.error('[Session] mod3 register threw:', e); + return; + } + } + + if (!data || !data.session_id) return; + + window.__mod3Session = { + session_id: data.session_id, + participant_id: data.participant_id || participantId, + assigned_voice: data.assigned_voice || null, + registered: true, + }; + + sessionStorage.setItem(SESSION_KEY, JSON.stringify({ + session_id: data.session_id, + participant_id: data.participant_id || participantId, + })); + + const selfLabel = document.getElementById('participants-self-id'); + if (selfLabel) { + selfLabel.textContent = data.session_id; + selfLabel.style.color = 'var(--green)'; + } + + // Dispatch event so the audio-WS bootstrap (Wave 4.3) can subscribe. 
+ window.dispatchEvent(new CustomEvent('mod3-session-registered', { + detail: window.__mod3Session, + })); + } + + async function pollSessions() { + try { + const r = await fetch('/v1/sessions'); + if (!r.ok) return; + const body = await r.json(); + renderParticipantPanel(body.sessions || []); + } catch (e) { + // silent — the /health pill already tracks connectivity + } + } + + function renderParticipantPanel(sessions) { + const listEl = document.getElementById('participants-list'); + const countEl = document.getElementById('participants-count'); + const pluralEl = document.getElementById('participants-plural'); + if (!listEl || !countEl) return; + + const selfId = window.__mod3Session && window.__mod3Session.session_id; + + countEl.textContent = String(sessions.length); + if (pluralEl) pluralEl.textContent = sessions.length === 1 ? '' : 's'; + if (countEl) { + const hasSelf = selfId && sessions.some(s => s.session_id === selfId); + countEl.className = hasSelf ? 'count me' : 'count'; + } + + if (sessions.length === 0) { + listEl.innerHTML = '
  • No active sessions
  • '; + return; + } + + // Sort: self first, then agents, then users, each by registered_at desc + const sorted = [...sessions].sort((a, b) => { + if (a.session_id === selfId) return -1; + if (b.session_id === selfId) return 1; + return (b.registered_at || 0) - (a.registered_at || 0); + }); + + listEl.innerHTML = sorted.map(s => { + const isSelf = s.session_id === selfId; + const kind = (s.participant_type || 'agent').toLowerCase(); + const voice = s.assigned_voice || '—'; + const conflict = s.voice_conflict + ? '' : ''; + const pid = escapeHtml(s.participant_id || ''); + const sid = escapeHtml(s.session_id || ''); + const ageSec = s.registered_at ? Math.round(Date.now()/1000 - s.registered_at) : 0; + const ageStr = ageSec < 60 ? ageSec + 's' : + ageSec < 3600 ? Math.round(ageSec/60) + 'm' : + Math.round(ageSec/3600) + 'h'; + const youMarker = isSelf ? ' you' : ''; + return ( + '
  • ' + + '' + + '
    ' + + '
    ' + pid + youMarker + conflict + '
    ' + + '
    ' + + '' + escapeHtml(voice) + '' + + '' + sid.slice(0,16) + + (sid.length > 16 ? '…' : '') + '' + + '' + ageStr + '' + + '
    ' + + '
    ' + + '' + escapeHtml(kind) + '' + + '
  • ' + ); + }).join(''); + } + + // Toggle panel open/close + const pillEl = document.getElementById('participants-toggle'); + const panelEl = document.getElementById('participants-panel'); + if (pillEl && panelEl) { + pillEl.addEventListener('click', (e) => { + e.stopPropagation(); + panelEl.classList.toggle('open'); + }); + // Click-outside-to-close + document.addEventListener('click', (e) => { + if (!panelEl.contains(e.target) && !pillEl.contains(e.target)) { + panelEl.classList.remove('open'); + } + }); + } + + async function deregisterOnUnload() { + const s = window.__mod3Session; + if (!s || !s.session_id) return; + // Use navigator.sendBeacon when available — it survives the unload event. + const url = `/v1/sessions/${encodeURIComponent(s.session_id)}/deregister`; + try { + if (navigator.sendBeacon) { + navigator.sendBeacon(url, new Blob(['{}'], {type: 'application/json'})); + } else { + // Blocking fallback for browsers without sendBeacon. + fetch(url, {method: 'POST', keepalive: true, body: '{}', + headers: {'Content-Type': 'application/json'}}); + } + } catch { + // Best-effort; server sweep will clean up eventually. + } + } + + window.addEventListener('beforeunload', deregisterOnUnload); + + // Kick off: register, then start polling. Register THEN poll so the + // first roster render can mark self correctly. + registerSession().finally(() => { + pollSessions(); + setInterval(pollSessions, POLL_INTERVAL_MS); + }); +})(); + +/** + * Wave 4.3 — Per-session audio WebSocket. + * + * After the session registers, open ``/ws/audio/{session_id}`` and play + * inbound WAV blobs through a Web Audio context. The contract from the + * server is: + * + * 1. JSON text frame: {type: "audio_header", session_id, job_id, + * duration_sec, sample_rate, bytes, format: "wav", seq} + * 2. Binary frame: raw WAV bytes. + * + * We pair them up, decode via AudioContext.decodeAudioData, and play. 
+ * Chunking is explicitly whole-blob for v1 — decodeAudioData wants a + * complete WAV. A future revision can stream PCM through an AudioWorklet + * for lower latency; the header envelope is the forward-compatibility + * seam. + * + * Reconnect: on close, reconnect with exponential backoff up to 30s. The + * kernel's subscriber-check runs on every synthesize so a transient gap + * just means one turn plays through afplay — the next turn lands back in + * the browser. + */ +(function setupAudioSubscription() { + let ws = null; + let audioCtx = null; + let reconnectDelay = 1000; + let currentSessionId = null; + let pendingHeader = null; // last JSON header waiting for its binary pair + let pendingSinkId = null; // selected output device id, applied once audioCtx exists + + async function applySinkId(ctx, sinkId) { + if (!ctx || typeof ctx.setSinkId !== 'function') return false; + // setSinkId accepts "" for default, or a deviceId. Chrome's enumerateDevices + // uses "" for the system default; we pass that through. + const id = (sinkId == null) ? '' : sinkId; + try { + await ctx.setSinkId(id); + console.log('[AudioWS] sink bound to', id || '(default)'); + return true; + } catch (err) { + console.warn('[AudioWS] setSinkId failed:', err); + return false; + } + } + + // Exposed so the outer audio-output