diff --git a/src/web/public/app.js b/src/web/public/app.js index d26e68ae..d2368f3f 100644 --- a/src/web/public/app.js +++ b/src/web/public/app.js @@ -286,6 +286,12 @@ class CodemanApp { this.totalTokens = 0; this.globalStats = null; // Global token/cost stats across all sessions this.eventSource = null; + // Stable per-page client ID — lets the server target this connection + // for live filter updates (POST /api/events/subscribe) without forcing + // an SSE reconnect on session switches. + this._clientId = (typeof crypto !== 'undefined' && crypto.randomUUID) + ? crypto.randomUUID() + : 'c-' + Math.random().toString(36).slice(2) + Date.now().toString(36); this.terminal = null; this.fitAddon = null; this.activeSessionId = null; @@ -696,6 +702,28 @@ class CodemanApp { // SSE Connection // ═══════════════════════════════════════════════════════════════ + /** + * POST a live subscription update so the server filters terminal events + * to the given session(s) for this client. Fire-and-forget — failures + * are non-fatal because we'll still get every event we don't want + * (just at higher cost), and the next reconnect carries the filter via + * the SSE query string. + */ + _updateSseSubscription(sessionId) { + try { + const body = JSON.stringify({ + clientId: this._clientId, + sessions: sessionId ? [sessionId] : null, + }); + fetch('/api/events/subscribe', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body, + keepalive: true, + }).catch(() => { /* non-fatal */ }); + } catch { /* non-fatal */ } + } + connectSSE() { // Check if browser is offline if (!navigator.onLine) { @@ -725,7 +753,13 @@ class CodemanApp { this.setConnectionStatus('reconnecting'); } - this.eventSource = new EventSource('/api/events'); + // Build URL with stable client ID and (if known) the active-session + // filter so the server only streams session:terminal events for the + // session we're rendering. Lifecycle/metadata events are sent globally + // regardless of filter (server side). + const _sseParams = new URLSearchParams({ clientId: this._clientId }); + if (this.activeSessionId) _sseParams.set('sessions', this.activeSessionId); + this.eventSource = new EventSource(`/api/events?${_sseParams.toString()}`); // Store all event listeners for cleanup on reconnect const listeners = []; @@ -2424,6 +2458,12 @@ class CodemanApp { this._cleanupPreviousSession(sessionId); this.activeSessionId = sessionId; try { localStorage.setItem('codeman-active-session', sessionId); } catch {} + // Narrow SSE filter to the active session — server stops streaming + // session:terminal events for other sessions to this client. Cuts + // SSE traffic ~Nx for N concurrent sessions. Fire-and-forget; on the + // rare race where server doesn't know our clientId yet, the next + // selectSession or reconnect catches up. + this._updateSseSubscription(sessionId); this.hideWelcome(); // Clear idle hooks on view, but keep action hooks until user interacts this.clearPendingHooks(sessionId, 'idle_prompt'); diff --git a/src/web/server.ts b/src/web/server.ts index 57c6a55b..1daaf52c 100644 --- a/src/web/server.ts +++ b/src/web/server.ts @@ -579,9 +579,11 @@ export class WebServer extends EventEmitter { } // Parse optional session subscription filter from query parameter. - // /api/events?sessions=id1,id2 — client only receives events for those sessions. - // /api/events (no param) — client receives all events (backwards-compatible). - const query = req.query as { sessions?: string }; + // /api/events?sessions=id1,id2 — client only receives session:terminal + // events for those sessions (other events broadcast to all clients). + // /api/events?clientId= — enables live filter updates via + // POST /api/events/subscribe without reconnecting. + const query = req.query as { sessions?: string; clientId?: string }; let sessionFilter: Set | null = null; if (query.sessions) { const ids = query.sessions @@ -592,6 +594,7 @@ export class WebServer extends EventEmitter { sessionFilter = new Set(ids); } } + const clientId = typeof query.clientId === 'string' && query.clientId ? query.clientId : undefined; reply.raw.writeHead(200, { 'Content-Type': 'text/event-stream', @@ -603,7 +606,7 @@ export class WebServer extends EventEmitter { // Track tunnel clients — cloudflared proxies locally so req.ip is always // 127.0.0.1; detect tunnel traffic via Cf-Connecting-Ip header instead. const isRemote = !!req.headers['cf-connecting-ip']; - this.sse.addClient(reply, sessionFilter, isRemote); + this.sse.addClient(reply, sessionFilter, isRemote, clientId); // Send initial state // Use light state for SSE init to avoid sending 2MB+ terminal buffers @@ -618,6 +621,20 @@ export class WebServer extends EventEmitter { }); }); + // Live subscription update — change a connected client's session filter + // without forcing an SSE reconnect. Body: { clientId, sessions: string[] | null } + // Empty/null sessions array = remove filter (receive all session:terminal events). + this.app.post('/api/events/subscribe', (req, reply) => { + const body = (req.body || {}) as { clientId?: string; sessions?: string[] | null }; + if (!body.clientId || typeof body.clientId !== 'string') { + reply.code(400).send({ error: 'clientId required' }); + return; + } + const sessions = Array.isArray(body.sessions) ? body.sessions.filter((s) => typeof s === 'string') : null; + const updated = this.sse.updateClientFilter(body.clientId, sessions); + reply.code(updated ? 204 : 404).send(); + }); + // Global error handler for structured errors thrown by findSessionOrFail this.app.setErrorHandler((error, _req, reply) => { const statusCode = (error as { statusCode?: number }).statusCode ?? 500; diff --git a/src/web/sse-stream-manager.ts b/src/web/sse-stream-manager.ts index 8a2aecba..539846ef 100644 --- a/src/web/sse-stream-manager.ts +++ b/src/web/sse-stream-manager.ts @@ -48,6 +48,8 @@ export class SseStreamManager { * or `null` meaning "receive all events" (backwards-compatible default). */ private sseClients: Map | null> = new Map(); + /** Optional client-supplied IDs → reply, for live filter updates without reconnecting */ + private sseClientsById: Map = new Map(); /** SSE clients connecting from non-localhost (i.e. through tunnel) */ private remoteSseClients: Set = new Set(); /** Clients with backpressure — skip writes until 'drain' fires */ @@ -103,17 +105,43 @@ export class SseStreamManager { this._isTunnelActive = active; } - addClient(reply: FastifyReply, sessionFilter: Set | null, isRemote: boolean): void { + addClient(reply: FastifyReply, sessionFilter: Set | null, isRemote: boolean, clientId?: string): void { this.sseClients.set(reply, sessionFilter); if (isRemote) { this.remoteSseClients.add(reply); } + if (clientId) { + // If a previous reply registered the same id (reconnect), drop the old one. + const prev = this.sseClientsById.get(clientId); + if (prev && prev !== reply) { + this.sseClients.delete(prev); + this.remoteSseClients.delete(prev); + this.backpressuredClients.delete(prev); + } + this.sseClientsById.set(clientId, reply); + } } removeClient(reply: FastifyReply): void { this.sseClients.delete(reply); this.remoteSseClients.delete(reply); this.backpressuredClients.delete(reply); + // Clear any clientId mappings pointing at this reply + for (const [id, r] of this.sseClientsById) { + if (r === reply) this.sseClientsById.delete(id); + } + } + + /** + * Update an existing client's session subscription filter without forcing + * an SSE reconnect. Returns true if the client was found and updated. + */ + updateClientFilter(clientId: string, sessions: string[] | null): boolean { + const reply = this.sseClientsById.get(clientId); + if (!reply || !this.sseClients.has(reply)) return false; + const filter = sessions && sessions.length > 0 ? new Set(sessions) : null; + this.sseClients.set(reply, filter); + return true; } /** Send a single SSE event to a specific client. */ @@ -188,35 +216,18 @@ export class SseStreamManager { console.error(`[Server] Failed to serialize SSE event "${event}":`, err); return; } - // Extract sessionId from event data for subscription filtering. - const eventSessionId = this.extractSessionId(event, data); - - for (const [client, filter] of this.sseClients) { - // No filter (null) = receive everything. Otherwise, skip if event is - // session-scoped and the session isn't in the client's subscription set. - if (filter && eventSessionId && !filter.has(eventSessionId)) continue; + // Subscription filtering is intentionally NOT applied here. The + // `?sessions=` filter is intended to suppress only the high-volume + // terminal stream — lifecycle/metadata events (session:created, + // session:updated, ralph:*, hook:*, etc.) are needed for correct UI + // state across all sessions even when the client subscribes to a single + // active session's terminal output. Terminal events bypass this method + // entirely (see flushSessionTerminalBatch — it applies the filter). + for (const [client] of this.sseClients) { this.sendSSEPreformatted(client, message); } } - /** - * Extract the session ID from an event's data payload for subscription filtering. - * Returns the sessionId string if the event is session-scoped, or null for global events. - */ - private extractSessionId(event: string, data: unknown): string | null { - if (data == null || typeof data !== 'object') return null; - const record = data as Record; - - // Most session-scoped events use `sessionId` - if (typeof record.sessionId === 'string') return record.sessionId; - - // Session lifecycle events (session:*) use `id` from the session state object - if (typeof record.id === 'string' && event.startsWith('session:')) return record.id; - - // No session ID found — treat as global event (sent to all clients) - return null; - } - // ========== Terminal Data Batching ========== // Batch terminal data for better performance (60fps)