Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion src/web/public/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class CodemanApp {
this.totalTokens = 0;
this.globalStats = null; // Global token/cost stats across all sessions
this.eventSource = null;
// Stable per-page client ID — lets the server target this connection
// for live filter updates (POST /api/events/subscribe) without forcing
// an SSE reconnect on session switches.
this._clientId = (typeof crypto !== 'undefined' && crypto.randomUUID)
? crypto.randomUUID()
: 'c-' + Math.random().toString(36).slice(2) + Date.now().toString(36);
this.terminal = null;
this.fitAddon = null;
this.activeSessionId = null;
Expand Down Expand Up @@ -696,6 +702,28 @@ class CodemanApp {
// SSE Connection
// ═══════════════════════════════════════════════════════════════

/**
* POST a live subscription update so the server filters terminal events
* to the given session(s) for this client. Fire-and-forget — failures
* are non-fatal because we'll still get every event we don't want
* (just at higher cost), and the next reconnect carries the filter via
* the SSE query string.
*/
_updateSseSubscription(sessionId) {
try {
const body = JSON.stringify({
clientId: this._clientId,
sessions: sessionId ? [sessionId] : null,
});
fetch('/api/events/subscribe', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,
keepalive: true,
}).catch(() => { /* non-fatal */ });
} catch { /* non-fatal */ }
}

connectSSE() {
// Check if browser is offline
if (!navigator.onLine) {
Expand Down Expand Up @@ -725,7 +753,13 @@ class CodemanApp {
this.setConnectionStatus('reconnecting');
}

this.eventSource = new EventSource('/api/events');
// Build URL with stable client ID and (if known) the active-session
// filter so the server only streams session:terminal events for the
// session we're rendering. Lifecycle/metadata events are sent globally
// regardless of filter (server side).
const _sseParams = new URLSearchParams({ clientId: this._clientId });
if (this.activeSessionId) _sseParams.set('sessions', this.activeSessionId);
this.eventSource = new EventSource(`/api/events?${_sseParams.toString()}`);

// Store all event listeners for cleanup on reconnect
const listeners = [];
Expand Down Expand Up @@ -2424,6 +2458,12 @@ class CodemanApp {
this._cleanupPreviousSession(sessionId);
this.activeSessionId = sessionId;
try { localStorage.setItem('codeman-active-session', sessionId); } catch {}
// Narrow SSE filter to the active session — server stops streaming
// session:terminal events for other sessions to this client. Cuts
// SSE traffic ~Nx for N concurrent sessions. Fire-and-forget; on the
// rare race where server doesn't know our clientId yet, the next
// selectSession or reconnect catches up.
this._updateSseSubscription(sessionId);
this.hideWelcome();
// Clear idle hooks on view, but keep action hooks until user interacts
this.clearPendingHooks(sessionId, 'idle_prompt');
Expand Down
25 changes: 21 additions & 4 deletions src/web/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -579,9 +579,11 @@ export class WebServer extends EventEmitter {
}

// Parse optional session subscription filter from query parameter.
// /api/events?sessions=id1,id2 — client only receives events for those sessions.
// /api/events (no param) — client receives all events (backwards-compatible).
const query = req.query as { sessions?: string };
// /api/events?sessions=id1,id2 — client only receives session:terminal
// events for those sessions (other events broadcast to all clients).
// /api/events?clientId=<uuid> — enables live filter updates via
// POST /api/events/subscribe without reconnecting.
const query = req.query as { sessions?: string; clientId?: string };
let sessionFilter: Set<string> | null = null;
if (query.sessions) {
const ids = query.sessions
Expand All @@ -592,6 +594,7 @@ export class WebServer extends EventEmitter {
sessionFilter = new Set(ids);
}
}
const clientId = typeof query.clientId === 'string' && query.clientId ? query.clientId : undefined;

reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
Expand All @@ -603,7 +606,7 @@ export class WebServer extends EventEmitter {
// Track tunnel clients — cloudflared proxies locally so req.ip is always
// 127.0.0.1; detect tunnel traffic via Cf-Connecting-Ip header instead.
const isRemote = !!req.headers['cf-connecting-ip'];
this.sse.addClient(reply, sessionFilter, isRemote);
this.sse.addClient(reply, sessionFilter, isRemote, clientId);

// Send initial state
// Use light state for SSE init to avoid sending 2MB+ terminal buffers
Expand All @@ -618,6 +621,20 @@ export class WebServer extends EventEmitter {
});
});

// Live subscription update — change a connected client's session filter
// without forcing an SSE reconnect. Body: { clientId, sessions: string[] | null }
// Empty/null sessions array = remove filter (receive all session:terminal events).
this.app.post('/api/events/subscribe', (req, reply) => {
const body = (req.body || {}) as { clientId?: string; sessions?: string[] | null };
if (!body.clientId || typeof body.clientId !== 'string') {
reply.code(400).send({ error: 'clientId required' });
return;
}
const sessions = Array.isArray(body.sessions) ? body.sessions.filter((s) => typeof s === 'string') : null;
const updated = this.sse.updateClientFilter(body.clientId, sessions);
reply.code(updated ? 204 : 404).send();
});

// Global error handler for structured errors thrown by findSessionOrFail
this.app.setErrorHandler((error, _req, reply) => {
const statusCode = (error as { statusCode?: number }).statusCode ?? 500;
Expand Down
63 changes: 37 additions & 26 deletions src/web/sse-stream-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export class SseStreamManager {
* or `null` meaning "receive all events" (backwards-compatible default).
*/
private sseClients: Map<FastifyReply, Set<string> | null> = new Map();
/** Optional client-supplied IDs → reply, for live filter updates without reconnecting */
private sseClientsById: Map<string, FastifyReply> = new Map();
/** SSE clients connecting from non-localhost (i.e. through tunnel) */
private remoteSseClients: Set<FastifyReply> = new Set();
/** Clients with backpressure — skip writes until 'drain' fires */
Expand Down Expand Up @@ -103,17 +105,43 @@ export class SseStreamManager {
this._isTunnelActive = active;
}

addClient(reply: FastifyReply, sessionFilter: Set<string> | null, isRemote: boolean): void {
addClient(reply: FastifyReply, sessionFilter: Set<string> | null, isRemote: boolean, clientId?: string): void {
this.sseClients.set(reply, sessionFilter);
if (isRemote) {
this.remoteSseClients.add(reply);
}
if (clientId) {
// If a previous reply registered the same id (reconnect), drop the old one.
const prev = this.sseClientsById.get(clientId);
if (prev && prev !== reply) {
this.sseClients.delete(prev);
this.remoteSseClients.delete(prev);
this.backpressuredClients.delete(prev);
}
this.sseClientsById.set(clientId, reply);
}
}

removeClient(reply: FastifyReply): void {
this.sseClients.delete(reply);
this.remoteSseClients.delete(reply);
this.backpressuredClients.delete(reply);
// Clear any clientId mappings pointing at this reply
for (const [id, r] of this.sseClientsById) {
if (r === reply) this.sseClientsById.delete(id);
}
}

/**
* Update an existing client's session subscription filter without forcing
* an SSE reconnect. Returns true if the client was found and updated.
*/
updateClientFilter(clientId: string, sessions: string[] | null): boolean {
const reply = this.sseClientsById.get(clientId);
if (!reply || !this.sseClients.has(reply)) return false;
const filter = sessions && sessions.length > 0 ? new Set(sessions) : null;
this.sseClients.set(reply, filter);
return true;
}

/** Send a single SSE event to a specific client. */
Expand Down Expand Up @@ -188,35 +216,18 @@ export class SseStreamManager {
console.error(`[Server] Failed to serialize SSE event "${event}":`, err);
return;
}
// Extract sessionId from event data for subscription filtering.
const eventSessionId = this.extractSessionId(event, data);

for (const [client, filter] of this.sseClients) {
// No filter (null) = receive everything. Otherwise, skip if event is
// session-scoped and the session isn't in the client's subscription set.
if (filter && eventSessionId && !filter.has(eventSessionId)) continue;
// Subscription filtering is intentionally NOT applied here. The
// `?sessions=` filter is intended to suppress only the high-volume
// terminal stream — lifecycle/metadata events (session:created,
// session:updated, ralph:*, hook:*, etc.) are needed for correct UI
// state across all sessions even when the client subscribes to a single
// active session's terminal output. Terminal events bypass this method
// entirely (see flushSessionTerminalBatch — it applies the filter).
for (const [client] of this.sseClients) {
this.sendSSEPreformatted(client, message);
}
}

/**
* Extract the session ID from an event's data payload for subscription filtering.
* Returns the sessionId string if the event is session-scoped, or null for global events.
*/
private extractSessionId(event: string, data: unknown): string | null {
if (data == null || typeof data !== 'object') return null;
const record = data as Record<string, unknown>;

// Most session-scoped events use `sessionId`
if (typeof record.sessionId === 'string') return record.sessionId;

// Session lifecycle events (session:*) use `id` from the session state object
if (typeof record.id === 'string' && event.startsWith('session:')) return record.id;

// No session ID found — treat as global event (sent to all clients)
return null;
}

// ========== Terminal Data Batching ==========

// Batch terminal data for better performance (60fps)
Expand Down