From b1d2ded048f12a630e2ad83cacf72bd9de9691be Mon Sep 17 00:00:00 2001 From: Danilo Alonso Date: Sat, 25 Apr 2026 10:16:11 -0400 Subject: [PATCH 1/2] feat: add refuse calculator and session.complete() (#4) Adds two primitives for spec-compliant 204 responses. refuse: subscription config predicate that runs before session creation. Returning true responds with 204. session.complete(): mid-stream method that writes a final event with session.id, records the id in a hapi server cache, then closes. The next reconnect with that id in Last-Event-ID gets 204. Completion cache is configurable via the completion plugin option (cache, segment, expiresIn). Default: hapi's in-process cache, 5 minute TTL. --- API.md | 59 +++++++++- package.json | 1 + src/index.ts | 2 + src/session.ts | 27 +++++ src/sse.ts | 59 +++++++++- src/subscription.ts | 3 +- test/session.test.ts | 95 ++++++++++++++++ test/sse.test.ts | 258 +++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 501 insertions(+), 3 deletions(-) diff --git a/API.md b/API.md index dcee662..095d050 100644 --- a/API.md +++ b/API.md @@ -39,6 +39,11 @@ await server.register({ headers: { 'X-Custom': 'value' }, // extra headers on every SSE response backpressure: { maxBytes: 65536, strategy: 'drop' }, // optional hooks: { ... }, // optional, see Hooks section + completion: { // optional, see Stream completion section + cache: 'my-redis-cache', // named cache engine (default: hapi default) + segment: 'completed-sessions', // segment name + expiresIn: 5 * 60 * 1000, // TTL for completion tokens (default: 5 min) + }, }, }); ``` @@ -54,6 +59,7 @@ server.sse.subscription('/chat/{room}', { auth: 'jwt', retry: 5000, keepAlive: { interval: 10_000 }, + refuse: (request) => server.app.shuttingDown || circuitBreaker.isOpen(), filter: async (path, message, { credentials, params, internal }) => { if (params.room !== internal.targetRoom) { return false; // don't deliver @@ -74,6 +80,7 @@ server.sse.subscription('/chat/{room}', { | `auth` | `RouteOptions['auth']` | hapi auth config for the route | | `retry` | `number \| null` | Override plugin-level retry | | `keepAlive` | `{ interval: number } \| false` | Override plugin-level keep-alive | +| `refuse` | `(request) => boolean \| Promise` | Server-state predicate. Runs before the session is created. Returning `true` responds with `204 No Content`, telling the EventSource not to reconnect. | | `filter` | `(path, message, opts) => boolean \| { override } \| Promise<...>` | Per-session delivery filter | | `onSubscribe` | `(session, path, params) => void \| Promise` | Fires before SSE headers are sent. Throwing a Boom error returns that HTTP error to the client. | | `onUnsubscribe` | `(session, path, params) => void` | Fires on client disconnect | @@ -189,13 +196,63 @@ The `Session` object represents a single SSE connection. ```typescript session.push(data, event?, id?) // Send an event. Returns boolean (false if dropped/closed). session.comment(text?) // Send a comment (invisible to EventSource) -session.close() // End the connection +session.close() // End the connection (client will reconnect) +session.complete() // Mark the stream as done; next reconnect gets HTTP 204 +session.id // UUID generated at construction; used as the completion token session.isOpen // true if connection is still active session.connectedAt // Unix timestamp (ms) when the session was created session.lastEventId // Value of Last-Event-ID header (empty string if absent) session.request // The original hapi Request object ``` +**Stream completion** — when the stream has done its work and the client should not reconnect, call `session.complete()` instead of `session.close()`: + +```typescript +server.route({ + method: 'GET', + path: '/jobs/{id}/progress', + handler: { + sse: { + stream: async (request, session) => { + for await (const update of jobProgress(request.params.id)) { + session.push(update, 'progress', update.eventId); + } + await session.complete(); // emits a final event with a token id, closes the stream + }, + }, + }, +}); +``` + +`complete()` writes a final SSE event of type `complete` with `session.id` as its `id` field. The browser saves that UUID as its `Last-Event-ID`. If the EventSource reconnects, the plugin sees the token in the `Last-Event-ID` header and responds `204 No Content`. Per the [WHATWG SSE spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events-intro), a 204 response makes the EventSource stop reconnecting. + +Clients can react to completion via `eventSource.addEventListener('complete', handler)`. + +**Completion cache** — tokens live in a hapi server cache (default: in-process catbox-memory, 5 minute TTL). Each token is consumed on first redemption. Use the `completion` plugin option to customize: + +```typescript +const server = Hapi.server({ + port: 3000, + cache: [ + { + name: 'redis-cache', + provider: { constructor: require('@hapi/catbox-redis'), options: { host: '127.0.0.1' } }, + }, + ], +}); + +await server.register({ + plugin: SsePlugin, + options: { + completion: { + cache: 'redis-cache', // share completion state across processes + segment: 'sse-completions', + expiresIn: 10 * 60 * 1000, // 10 minutes + }, + }, +}); +``` + **Metadata** — attach arbitrary key-value data to a session: ```typescript diff --git a/package.json b/package.json index 36f1672..41a21b7 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "@hapi/hapi": "^21.4.7", "@types/node": "^22.19.15", "@vitest/coverage-v8": "^4.1.0", + "eventsource": "^4.1.0", "tsdown": "^0.21.4", "typescript": "^5.9.3", "typescript-eslint": "^8.57.1", diff --git a/src/index.ts b/src/index.ts index 37d2c05..b662263 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,6 +11,8 @@ export type { SubscriptionConfig, SubscriptionInfo, FilterOptions, + CompletionCacheOptions, + CompletionStore, } from './sse.js'; export type { Replayer, ReplayEntry } from './replayer.js'; export { FiniteReplayer, ValidReplayer } from './replayer.js'; diff --git a/src/session.ts b/src/session.ts index 6f92bf6..10dc7be 100644 --- a/src/session.ts +++ b/src/session.ts @@ -1,5 +1,6 @@ import type { Request } from '@hapi/hapi'; import type { ServerResponse } from 'node:http'; +import { randomUUID } from 'node:crypto'; import { EventBuffer } from './event-buffer.js'; @@ -18,6 +19,7 @@ export interface SessionOptions { } export class Session { + readonly id: string = randomUUID(); readonly request: Request; readonly lastEventId: string; readonly connectedAt: number; @@ -43,6 +45,8 @@ export class Session { #maxDurationTimer: ReturnType | null = null; /** @internal */ #closed = false; + /** @internal */ + #initialized = false; constructor(options: SessionOptions) { this.request = options.request; @@ -81,6 +85,12 @@ export class Session { } initialize(): void { + if (this.#closed || this.#initialized) { + return; + } + + this.#initialized = true; + this.#res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', @@ -165,6 +175,23 @@ export class Session { this.#flush(); } + async complete(): Promise { + if (this.#closed || !this.#initialized) { + return; + } + + this.#buffer.push({ complete: true }, 'complete', this.id); + this.#flush(); + + const store = this.request.route.realm.plugins['@hapi/sse']?.completionStore; + + if (store) { + await store.set(this.id, true, 0); + } + + this.close(); + } + close(): void { if (this.#closed) { return; diff --git a/src/sse.ts b/src/sse.ts index 1f20d34..83213d0 100644 --- a/src/sse.ts +++ b/src/sse.ts @@ -19,12 +19,23 @@ export interface SseHooks { onPublish?: (path: string, data: unknown, deliveryCount: number) => void; } +export interface CompletionCacheOptions { + cache?: string; + segment?: string; + expiresIn?: number; +} + +export interface CompletionStore { + set(id: string, value: boolean, ttl: number): Promise; +} + export interface SsePluginOptions { keepAlive?: { interval: number } | false; retry?: number | null; headers?: Record; hooks?: SseHooks; backpressure?: BackpressureOptions; + completion?: CompletionCacheOptions; } export interface SseHandlerOptions { @@ -68,6 +79,12 @@ declare module '@hapi/hapi' { interface HandlerDecorations { sse?: SseHandlerOptions; } + + interface PluginsStates { + '@hapi/sse'?: { + completionStore: CompletionStore; + }; + } } const RETRY_FLOOR = 1000; @@ -76,7 +93,12 @@ const clampRetry = (value: number | null): number | null => { return value === null ? null : Math.max(value, RETRY_FLOOR); }; -const defaults: Required> = { +const COMPLETION_DEFAULTS: CompletionCacheOptions = { + segment: 'completed-sse-sessions', + expiresIn: 5 * 60 * 1000, +}; + +const defaults: Required> = { keepAlive: { interval: 15_000 }, retry: 2000, headers: {}, @@ -102,12 +124,19 @@ const hooksSchema = Joi.object({ onPublish: Joi.function(), }); +const completionSchema = Joi.object({ + cache: Joi.string(), + segment: Joi.string(), + expiresIn: Joi.number().integer().positive(), +}); + const pluginOptionsSchema = Joi.object({ keepAlive: keepAliveSchema, retry: retrySchema, headers: headersSchema, hooks: hooksSchema, backpressure: backpressureSchema, + completion: completionSchema, }).label('SsePluginOptions'); const replayerSchema = Joi.object({ @@ -121,6 +150,7 @@ const replayerSchema = Joi.object({ const subscriptionConfigSchema = Joi.object({ auth: Joi.any(), filter: Joi.function(), + refuse: Joi.function(), onSubscribe: Joi.function(), onUnsubscribe: Joi.function(), onReconnect: Joi.function(), @@ -150,6 +180,11 @@ export const SsePlugin: NamedPlugin = { const registry = new SubscriptionRegistry(); const hooks = options.hooks; + const completion = { ...COMPLETION_DEFAULTS, ...options.completion }; + const completionStore = server.cache(completion); + + server.realm.plugins['@hapi/sse'] = { completionStore }; + let totalConnections = 0; let totalDisconnections = 0; let totalPublishes = 0; @@ -188,6 +223,24 @@ export const SsePlugin: NamedPlugin = { handler: async (request: Request, h: ResponseToolkit) => { const matched = registry.matchPath(request.path)!; + if (subConfig.refuse && (await subConfig.refuse(request))) { + request.raw.res.writeHead(204); + request.raw.res.end(); + + return h.abandon; + } + + const lastEventId = request.headers['last-event-id']; + const incomingId = Array.isArray(lastEventId) ? lastEventId[0] : lastEventId; + + if (incomingId && (await completionStore.get(incomingId))) { + await completionStore.drop(incomingId); + request.raw.res.writeHead(204); + request.raw.res.end(); + + return h.abandon; + } + const maxSessions = subConfig.maxSessions; if (maxSessions && registry.subscriptionSessionCount(matched.pattern) >= maxSessions) { @@ -207,6 +260,10 @@ export const SsePlugin: NamedPlugin = { await subConfig.onSubscribe(session, request.path, matched.params); } + if (!session.isOpen) { + return h.abandon; + } + session.initialize(); registry.addSession(matched.pattern, session, matched.params, request.path); totalConnections++; diff --git a/src/subscription.ts b/src/subscription.ts index 98466d8..7e633b3 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -1,4 +1,4 @@ -import type { RouteOptions } from '@hapi/hapi'; +import type { Request, RouteOptions } from '@hapi/hapi'; import { Session } from './session.js'; import type { Replayer } from './replayer.js'; @@ -16,6 +16,7 @@ export interface SubscriptionConfig { message: T, options: FilterOptions, ) => boolean | { override: unknown } | Promise; + refuse?: (request: Request) => boolean | Promise; onSubscribe?: (session: Session, path: string, params: Record) => void | Promise; onUnsubscribe?: (session: Session, path: string, params: Record) => void; onReconnect?: (session: Session, path: string, params: Record) => void | Promise; diff --git a/test/session.test.ts b/test/session.test.ts index 1c8e79a..2f7ef48 100644 --- a/test/session.test.ts +++ b/test/session.test.ts @@ -87,6 +87,101 @@ describe.concurrent('Session', () => { expect(session.isOpen).toBe(false); }); + it('complete() sends a final event, writes the session id to the realm completion store, and closes', async () => { + const writes: string[] = []; + const mockRes = { + writeHead: () => {}, + write: (chunk: string) => { + writes.push(chunk); + + return true; + }, + end: () => {}, + } as any; + + const stored: string[] = []; + const completionStore = { + set: async (id: string) => { + stored.push(id); + }, + }; + + const session = new Session({ + request: { + headers: {}, + raw: { req: { socket: {} }, res: mockRes }, + route: { realm: { plugins: { '@hapi/sse': { completionStore } } } }, + } as any, + retry: null, + keepAlive: false, + headers: {}, + }); + + expect(session.id).toMatch(/^[0-9a-f-]{36}$/); + + session.initialize(); + await session.complete(); + + const final = writes.at(-1)!; + + expect(final).toContain('event: complete'); + expect(final).toContain(`id: ${session.id}`); + expect(final).toContain('data: {"complete":true}'); + expect(stored).toEqual([session.id]); + expect(session.isOpen).toBe(false); + }); + + it('complete() is a no-op when the session was never initialized', async () => { + const stored: string[] = []; + const completionStore = { + set: async (id: string) => { + stored.push(id); + }, + }; + + const session = new Session({ + request: { + headers: {}, + raw: { req: { socket: {} }, res: { writeHead: () => {}, write: () => true, end: () => {} } as any }, + route: { realm: { plugins: { '@hapi/sse': { completionStore } } } }, + } as any, + retry: null, + keepAlive: false, + headers: {}, + }); + + await session.complete(); + + expect(stored).toEqual([]); + expect(session.isOpen).toBe(true); + }); + + it('complete() is a no-op when the session is already closed', async () => { + const stored: string[] = []; + const completionStore = { + set: async (id: string) => { + stored.push(id); + }, + }; + + const session = new Session({ + request: { + headers: {}, + raw: { req: { socket: {} }, res: { writeHead: () => {}, write: () => true, end: () => {} } as any }, + route: { realm: { plugins: { '@hapi/sse': { completionStore } } } }, + } as any, + retry: null, + keepAlive: false, + headers: {}, + }); + + session.initialize(); + session.close(); + await session.complete(); + + expect(stored).toEqual([]); + }); + it('uses the first event ID when the header is an array of IDs', async () => { const mockRequest = { headers: { 'last-event-id': ['id1', 'id2'] }, diff --git a/test/sse.test.ts b/test/sse.test.ts index dda285f..6591560 100644 --- a/test/sse.test.ts +++ b/test/sse.test.ts @@ -2,6 +2,7 @@ import * as timers from 'node:timers/promises'; import { expect, describe, it } from 'vitest'; import http from 'node:http'; import net from 'node:net'; +import { EventSource } from 'eventsource'; import Hapi from '@hapi/hapi'; import Boom from '@hapi/boom'; @@ -150,6 +151,263 @@ describe.concurrent('SSE Plugin', () => { expect(result.headers['content-type']).toContain('application/json'); }); + it('refuse calculator returning true responds 204 before any session is created', async ({ onTestFinished }) => { + const server = Hapi.server({ port: 0 }); + onTestFinished(() => server.stop()); + await server.register({ plugin: SsePlugin }); + + let onSubscribeCalled = false; + + server.sse.subscription('/events', { + refuse: () => true, + onSubscribe: () => { + onSubscribeCalled = true; + }, + }); + + await server.start(); + + const result = await collectSse(`http://localhost:${server.info.port}/events`, { timeout: 500 }); + + expect(result.status).toBe(204); + expect(result.events.length).toBe(0); + expect(result.headers['content-type']).toBeUndefined(); + expect(onSubscribeCalled).toBe(false); + expect(server.sse.sessionCount).toBe(0); + expect(server.sse.stats().totalConnections).toBe(0); + }); + + it('refuse calculator returning false allows the connection to proceed', async ({ onTestFinished }) => { + const server = Hapi.server({ port: 0 }); + onTestFinished(() => server.stop()); + await server.register({ plugin: SsePlugin, options: { retry: null, keepAlive: false } }); + + server.sse.subscription('/events', { + refuse: () => false, + }); + + await server.start(); + const port = server.info.port; + + const promise = collectSse(`http://localhost:${port}/events`, { maxEvents: 1, timeout: 500 }); + await timers.setTimeout(50); + await server.sse.publish('/events', { ok: true }, { event: 'msg' }); + const { status, events } = await promise; + + expect(status).toBe(200); + expect(events.length).toBe(1); + }); + + it('refuse calculator receives the request and can be async', async ({ onTestFinished }) => { + const server = Hapi.server({ port: 0 }); + onTestFinished(() => server.stop()); + await server.register({ plugin: SsePlugin }); + + server.sse.subscription('/events', { + refuse: async (request) => { + await timers.setTimeout(10); + + return request.headers['x-shutdown'] === 'true'; + }, + }); + + await server.start(); + const port = server.info.port; + + const blocked = await collectSse(`http://localhost:${port}/events`, { + timeout: 500, + headers: { 'x-shutdown': 'true' }, + }); + expect(blocked.status).toBe(204); + }); + + it('completion cache can be overridden with a named server cache', async ({ onTestFinished }) => { + const CatboxMemory = (await import('@hapi/catbox-memory')).Engine; + + const server = Hapi.server({ + port: 0, + cache: [ + { + name: 'my-named-cache', + provider: { constructor: CatboxMemory, options: { maxByteSize: 1024 * 1024 } }, + }, + ], + }); + onTestFinished(() => server.stop()); + + await server.register({ + plugin: SsePlugin, + options: { + retry: null, + keepAlive: false, + completion: { + cache: 'my-named-cache', + segment: 'custom-segment', + expiresIn: 60_000, + }, + }, + }); + + server.sse.subscription('/events'); + + await server.start(); + const port = server.info.port; + + // Verify completion still works end-to-end via the named cache + const tokenPromise = new Promise((resolve) => { + const req = http.get(`http://localhost:${port}/events`, (res) => { + let data = ''; + res.setEncoding('utf8'); + res.on('data', (chunk: string) => { + data += chunk; + }); + res.on('end', () => { + req.destroy(); + const idMatch = data.match(/^id:\s*(.+)$/m); + + resolve(idMatch ? idMatch[1].trim() : ''); + }); + }); + }); + + await timers.setTimeout(50); + await server.sse.eachSession((session) => session.complete()); + + const token = await tokenPromise; + expect(token).toMatch(/[0-9a-f-]{36}/); + + const reconnect = await collectSse(`http://localhost:${port}/events`, { + timeout: 500, + headers: { 'Last-Event-ID': token }, + }); + expect(reconnect.status).toBe(204); + }); + + it('session.complete() sends a 204 to the next reconnect via Last-Event-ID', async ({ onTestFinished }) => { + const server = Hapi.server({ port: 0 }); + onTestFinished(() => server.stop()); + await server.register({ plugin: SsePlugin, options: { retry: null, keepAlive: false } }); + + server.sse.subscription('/events'); + + await server.start(); + const port = server.info.port; + + const captured: string[] = []; + + const firstConnect = new Promise((resolve) => { + const req = http.get(`http://localhost:${port}/events`, (res) => { + let data = ''; + res.setEncoding('utf8'); + res.on('data', (chunk: string) => { + data += chunk; + }); + res.on('end', () => { + captured.push(data); + req.destroy(); + + const idMatch = data.match(/^id:\s*(.+)$/m); + + resolve(idMatch ? idMatch[1].trim() : ''); + }); + }); + }); + + await timers.setTimeout(50); + await server.sse.eachSession((session) => session.complete()); + + const completionToken = await firstConnect; + expect(completionToken).toMatch(/[0-9a-f-]{36}/); + + const reconnect = await collectSse(`http://localhost:${port}/events`, { + timeout: 500, + headers: { 'Last-Event-ID': completionToken }, + }); + + expect(reconnect.status).toBe(204); + + // Token is consumed — a second reconnect with the same token streams normally + const secondReconnectPromise = collectSse(`http://localhost:${port}/events`, { + maxEvents: 1, + timeout: 500, + headers: { 'Last-Event-ID': completionToken }, + }); + await timers.setTimeout(50); + await server.sse.publish('/events', { ok: true }, { event: 'msg' }); + const secondReconnect = await secondReconnectPromise; + expect(secondReconnect.status).toBe(200); + }); + + it('real EventSource client reconnects after server closes the stream', async ({ onTestFinished }) => { + const server = Hapi.server({ port: 0 }); + onTestFinished(() => server.stop()); + await server.register({ plugin: SsePlugin, options: { retry: 100, keepAlive: false } }); + + let onSubscribeCount = 0; + + server.sse.subscription('/events', { + onSubscribe: () => { + onSubscribeCount++; + }, + }); + + await server.start(); + + const es = new EventSource(`http://localhost:${server.info.port}/events`); + onTestFinished(() => es.close()); + + await new Promise((resolve) => es.addEventListener('open', () => resolve(), { once: true })); + expect(onSubscribeCount).toBe(1); + + await server.sse.eachSession((session) => session.close()); + + await new Promise((resolve) => es.addEventListener('open', () => resolve(), { once: true })); + + expect(onSubscribeCount).toBe(2); + expect(es.readyState).toBe(EventSource.OPEN); + }); + + it('real EventSource client stops reconnecting after session.complete() runs', async ({ onTestFinished }) => { + const server = Hapi.server({ port: 0 }); + onTestFinished(() => server.stop()); + await server.register({ plugin: SsePlugin, options: { retry: 100, keepAlive: false } }); + + let onSubscribeCount = 0; + + server.sse.subscription('/events', { + onSubscribe: () => { + onSubscribeCount++; + }, + }); + + await server.start(); + + const es = new EventSource(`http://localhost:${server.info.port}/events`); + onTestFinished(() => es.close()); + + await new Promise((resolve) => es.addEventListener('open', () => resolve(), { once: true })); + + await server.sse.eachSession((session) => session.complete()); + + await new Promise((resolve) => { + const check = () => { + if (es.readyState === EventSource.CLOSED) { + resolve(); + } else { + setTimeout(check, 25); + } + }; + + check(); + }); + + const closedAt = onSubscribeCount; + await timers.setTimeout(300); + // Reconnect was 204'd at the completion check, before reaching onSubscribe + expect(onSubscribeCount).toBe(closedAt); + expect(es.readyState).toBe(EventSource.CLOSED); + }); + it('publish delivers to subscribers', async ({ onTestFinished }) => { const server = Hapi.server({ port: 0 }); onTestFinished(() => server.stop()); From 642ef61e66590ce545e9ecc1676660c283f4f4b9 Mon Sep 17 00:00:00 2001 From: Danilo Alonso Date: Sat, 25 Apr 2026 16:38:19 -0400 Subject: [PATCH 2/2] test: cover all branches to satisfy 100% coverage gate - Drop unreachable defensive check in #onKeepAlive: the keep-alive timer is cleared in close() before #closed flips, so the callback cannot run after close. - Extract readLastEventId helper and reuse it in the subscription handler so the existing array-header unit test covers the parsing branch. - Add a test for session.close() called from onSubscribe (the remaining path that hits the !session.isOpen short-circuit before initialize). --- src/session.ts | 15 +++++++-------- src/sse.ts | 5 ++--- test/sse.test.ts | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 11 deletions(-) diff --git a/src/session.ts b/src/session.ts index 10dc7be..6b92673 100644 --- a/src/session.ts +++ b/src/session.ts @@ -18,6 +18,12 @@ export interface SessionOptions { maxDuration?: number; } +export const readLastEventId = (request: Request): string => { + const raw = request.headers['last-event-id']; + + return ((Array.isArray(raw) ? raw[0] : raw) ?? '').replace(/[\x00-\x1f]/g, ''); +}; + export class Session { readonly id: string = randomUUID(); readonly request: Request; @@ -51,10 +57,7 @@ export class Session { constructor(options: SessionOptions) { this.request = options.request; this.connectedAt = Date.now(); - - const rawId = options.request.headers['last-event-id']; - - this.lastEventId = ((Array.isArray(rawId) ? rawId[0] : rawId) ?? '').replace(/[\x00-\x1f]/g, ''); + this.lastEventId = readLastEventId(options.request); this.#res = options.request.raw.res; this.#buffer = new EventBuffer(); this.#retry = options.retry; @@ -129,10 +132,6 @@ export class Session { /** @internal */ #onKeepAlive(): void { - if (this.#closed) { - return; - } - this.#buffer.comment(); this.#buffer.dispatch(); this.#flush(); diff --git a/src/sse.ts b/src/sse.ts index 83213d0..e448371 100644 --- a/src/sse.ts +++ b/src/sse.ts @@ -4,7 +4,7 @@ import * as Hoek from '@hapi/hoek'; import Joi from 'joi'; import { createRequire } from 'node:module'; -import { Session } from './session.js'; +import { Session, readLastEventId } from './session.js'; import type { BackpressureOptions } from './session.js'; import { SubscriptionRegistry } from './subscription.js'; @@ -230,8 +230,7 @@ export const SsePlugin: NamedPlugin = { return h.abandon; } - const lastEventId = request.headers['last-event-id']; - const incomingId = Array.isArray(lastEventId) ? lastEventId[0] : lastEventId; + const incomingId = readLastEventId(request); if (incomingId && (await completionStore.get(incomingId))) { await completionStore.drop(incomingId); diff --git a/test/sse.test.ts b/test/sse.test.ts index 6591560..1103a26 100644 --- a/test/sse.test.ts +++ b/test/sse.test.ts @@ -221,6 +221,45 @@ describe.concurrent('SSE Plugin', () => { expect(blocked.status).toBe(204); }); + it('onSubscribe calling session.close() abandons the request without setup', async ({ onTestFinished }) => { + let onSessionFired = false; + let onUnsubscribeFired = false; + + const server = Hapi.server({ port: 0 }); + onTestFinished(() => server.stop()); + await server.register({ + plugin: SsePlugin, + options: { + retry: null, + keepAlive: false, + hooks: { + onSession: () => { + onSessionFired = true; + }, + }, + }, + }); + + server.sse.subscription('/events', { + onSubscribe: (session) => { + session.close(); + }, + onUnsubscribe: () => { + onUnsubscribeFired = true; + }, + }); + + await server.start(); + + const result = await collectSse(`http://localhost:${server.info.port}/events`, { timeout: 500 }); + + expect(result.events.length).toBe(0); + expect(server.sse.sessionCount).toBe(0); + expect(server.sse.stats().totalConnections).toBe(0); + expect(onSessionFired).toBe(false); + expect(onUnsubscribeFired).toBe(false); + }); + it('completion cache can be overridden with a named server cache', async ({ onTestFinished }) => { const CatboxMemory = (await import('@hapi/catbox-memory')).Engine;