diff --git a/packages/event-service/src/__tests__/client.test.ts b/packages/event-service/src/__tests__/client.test.ts index 9527dec426..01fe89f415 100644 --- a/packages/event-service/src/__tests__/client.test.ts +++ b/packages/event-service/src/__tests__/client.test.ts @@ -387,4 +387,102 @@ describe('EventServiceClient', () => { expect(err).toBeInstanceOf(Error); expect(err.name).toBe('HandshakeTimeoutError'); }); + + describe('subscribe/unsubscribe refcounting', () => { + function sentMessages() { + return lastMockWs.sent.map(s => JSON.parse(s) as unknown); + } + + it('only sends one wire context.subscribe when two consumers subscribe to the same context', async () => { + const client = makeClient(); + await client.connect(); + lastMockWs.sent = []; + + client.subscribe(['room:1']); + client.subscribe(['room:1']); + + expect(sentMessages()).toEqual([{ type: 'context.subscribe', contexts: ['room:1'] }]); + }); + + it('keeps the subscription alive when one of two consumers unsubscribes', async () => { + const client = makeClient(); + await client.connect(); + lastMockWs.sent = []; + + client.subscribe(['room:1']); + client.subscribe(['room:1']); + client.unsubscribe(['room:1']); + + // Only the initial 0→1 subscribe should have been sent. No unsubscribe yet + // because the second consumer is still holding a ref. + expect(sentMessages()).toEqual([{ type: 'context.subscribe', contexts: ['room:1'] }]); + }); + + it('sends context.unsubscribe only when the last consumer drops the ref (1→0)', async () => { + const client = makeClient(); + await client.connect(); + lastMockWs.sent = []; + + client.subscribe(['room:1']); + client.subscribe(['room:1']); + client.unsubscribe(['room:1']); + client.unsubscribe(['room:1']); + + expect(sentMessages()).toEqual([ + { type: 'context.subscribe', contexts: ['room:1'] }, + { type: 'context.unsubscribe', contexts: ['room:1'] }, + ]); + }); + + it('handles a mixed batch: only newly-active contexts get sent', async () => { + const client = makeClient(); + await client.connect(); + client.subscribe(['room:1']); + lastMockWs.sent = []; + + // room:1 already at refcount 1, room:2 is new. Only room:2 should hit the wire. + client.subscribe(['room:1', 'room:2']); + + expect(sentMessages()).toEqual([{ type: 'context.subscribe', contexts: ['room:2'] }]); + }); + + it('extra unsubscribes for an unknown context are no-ops', async () => { + const client = makeClient(); + await client.connect(); + lastMockWs.sent = []; + + // Never subscribed — must not crash and must not emit a wire message. + client.unsubscribe(['ghost']); + + expect(sentMessages()).toEqual([]); + }); + + it('resubscribe-on-reconnect deduplicates by context (one entry per active context)', async () => { + vi.useFakeTimers(); + const client = makeClient(); + await client.connect(); + + // Two consumers hold the same context. + client.subscribe(['room:1']); + client.subscribe(['room:1']); + + // Drop the connection — auto-reconnect kicks in. + lastMockWs.triggerClose(); + await vi.advanceTimersByTimeAsync(2000); + expect(allMockWs.length).toBe(2); + // The second mock socket also auto-triggers open via the global stub. + await vi.advanceTimersByTimeAsync(0); + + const resubMessages = allMockWs[1].sent + .map(s => JSON.parse(s) as { type: string; contexts?: string[] }) + .filter(m => m.type === 'context.subscribe'); + + // Exactly one resubscribe message containing the context exactly once, + // regardless of how many consumers hold the ref. + expect(resubMessages).toHaveLength(1); + expect(resubMessages[0]?.contexts).toEqual(['room:1']); + + vi.useRealTimers(); + }); + }); }); diff --git a/packages/event-service/src/client.ts b/packages/event-service/src/client.ts index 1bda87f200..5a9490bf2d 100644 --- a/packages/event-service/src/client.ts +++ b/packages/event-service/src/client.ts @@ -48,7 +48,11 @@ export class EventServiceClient { private ws: WebSocket | null = null; private connected = false; private eventHandlers = new Map void>>(); - private activeContexts = new Set(); + // Refcounted so multiple consumers can independently subscribe to and + // unsubscribe from the same context without trampling each other. The wire + // `context.subscribe`/`context.unsubscribe` messages are only sent on the + // 0↔1 transitions; intermediate refcount churn stays client-side. + private activeContexts = new Map(); private reconnectTimer: ReturnType | null = null; private destroyed = false; private reconnectAttempts = 0; @@ -219,20 +223,31 @@ export class EventServiceClient { } subscribe(contexts: string[]): void { + const newlyActive: string[] = []; for (const ctx of contexts) { - this.activeContexts.add(ctx); + const next = (this.activeContexts.get(ctx) ?? 0) + 1; + this.activeContexts.set(ctx, next); + if (next === 1) newlyActive.push(ctx); } - if (this.isConnected()) { - this.send({ type: 'context.subscribe', contexts }); + if (newlyActive.length > 0 && this.isConnected()) { + this.send({ type: 'context.subscribe', contexts: newlyActive }); } } unsubscribe(contexts: string[]): void { + const released: string[] = []; for (const ctx of contexts) { - this.activeContexts.delete(ctx); + const current = this.activeContexts.get(ctx); + if (current === undefined) continue; + if (current <= 1) { + this.activeContexts.delete(ctx); + released.push(ctx); + } else { + this.activeContexts.set(ctx, current - 1); + } } - if (this.isConnected()) { - this.send({ type: 'context.unsubscribe', contexts }); + if (released.length > 0 && this.isConnected()) { + this.send({ type: 'context.unsubscribe', contexts: released }); } } @@ -315,7 +330,7 @@ export class EventServiceClient { if (this.activeContexts.size > 0) { this.send({ type: 'context.subscribe', - contexts: Array.from(this.activeContexts), + contexts: Array.from(this.activeContexts.keys()), }); } }