Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion API.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ await server.register({
headers: { 'X-Custom': 'value' }, // extra headers on every SSE response
backpressure: { maxBytes: 65536, strategy: 'drop' }, // optional
hooks: { ... }, // optional, see Hooks section
completion: { // optional, see Stream completion section
cache: 'my-redis-cache', // named cache engine (default: hapi default)
segment: 'completed-sessions', // segment name
expiresIn: 5 * 60 * 1000, // TTL for completion tokens (default: 5 min)
},
},
});
```
Expand All @@ -54,6 +59,7 @@ server.sse.subscription('/chat/{room}', {
auth: 'jwt',
retry: 5000,
keepAlive: { interval: 10_000 },
refuse: (request) => server.app.shuttingDown || circuitBreaker.isOpen(),
filter: async (path, message, { credentials, params, internal }) => {
if (params.room !== internal.targetRoom) {
return false; // don't deliver
Expand All @@ -74,6 +80,7 @@ server.sse.subscription('/chat/{room}', {
| `auth` | `RouteOptions['auth']` | hapi auth config for the route |
| `retry` | `number \| null` | Override plugin-level retry |
| `keepAlive` | `{ interval: number } \| false` | Override plugin-level keep-alive |
| `refuse` | `(request) => boolean \| Promise<boolean>` | Server-state predicate. Runs before the session is created. Returning `true` responds with `204 No Content`, telling the EventSource not to reconnect. |
| `filter` | `(path, message, opts) => boolean \| { override } \| Promise<...>` | Per-session delivery filter |
| `onSubscribe` | `(session, path, params) => void \| Promise<void>` | Fires before SSE headers are sent. Throwing a Boom error returns that HTTP error to the client. |
| `onUnsubscribe` | `(session, path, params) => void` | Fires on client disconnect |
Expand Down Expand Up @@ -189,13 +196,63 @@ The `Session` object represents a single SSE connection.
```typescript
session.push(data, event?, id?) // Send an event. Returns boolean (false if dropped/closed).
session.comment(text?) // Send a comment (invisible to EventSource)
session.close() // End the connection
session.close() // End the connection (client will reconnect)
session.complete() // Mark the stream as done; next reconnect gets HTTP 204
session.id // UUID generated at construction; used as the completion token
session.isOpen // true if connection is still active
session.connectedAt // Unix timestamp (ms) when the session was created
session.lastEventId // Value of Last-Event-ID header (empty string if absent)
session.request // The original hapi Request object
```

**Stream completion** — when the stream has done its work and the client should not reconnect, call `session.complete()` instead of `session.close()`:

```typescript
server.route({
method: 'GET',
path: '/jobs/{id}/progress',
handler: {
sse: {
stream: async (request, session) => {
for await (const update of jobProgress(request.params.id)) {
session.push(update, 'progress', update.eventId);
}
await session.complete(); // emits a final event with a token id, closes the stream
},
},
},
});
```

`complete()` writes a final SSE event of type `complete` with `session.id` as its `id` field. The browser saves that UUID as its `Last-Event-ID`. If the EventSource reconnects, the plugin sees the token in the `Last-Event-ID` header and responds `204 No Content`. Per the [WHATWG SSE spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events-intro), a 204 response makes the EventSource stop reconnecting.

Clients can react to completion via `eventSource.addEventListener('complete', handler)`.

**Completion cache** — tokens live in a hapi server cache (default: in-process catbox-memory, 5 minute TTL). Each token is consumed on first redemption. Use the `completion` plugin option to customize:

```typescript
const server = Hapi.server({
port: 3000,
cache: [
{
name: 'redis-cache',
provider: { constructor: require('@hapi/catbox-redis'), options: { host: '127.0.0.1' } },
},
],
});

await server.register({
plugin: SsePlugin,
options: {
completion: {
cache: 'redis-cache', // share completion state across processes
segment: 'sse-completions',
expiresIn: 10 * 60 * 1000, // 10 minutes
},
},
});
```

**Metadata** — attach arbitrary key-value data to a session:

```typescript
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"@hapi/hapi": "^21.4.7",
"@types/node": "^22.19.15",
"@vitest/coverage-v8": "^4.1.0",
"eventsource": "^4.1.0",
"tsdown": "^0.21.4",
"typescript": "^5.9.3",
"typescript-eslint": "^8.57.1",
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export type {
SubscriptionConfig,
SubscriptionInfo,
FilterOptions,
CompletionCacheOptions,
CompletionStore,
} from './sse.js';
export type { Replayer, ReplayEntry } from './replayer.js';
export { FiniteReplayer, ValidReplayer } from './replayer.js';
42 changes: 34 additions & 8 deletions src/session.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Request } from '@hapi/hapi';
import type { ServerResponse } from 'node:http';
import { randomUUID } from 'node:crypto';

import { EventBuffer } from './event-buffer.js';

Expand All @@ -17,7 +18,14 @@ export interface SessionOptions {
maxDuration?: number;
}

export const readLastEventId = (request: Request): string => {
const raw = request.headers['last-event-id'];

return ((Array.isArray(raw) ? raw[0] : raw) ?? '').replace(/[\x00-\x1f]/g, '');
};

export class Session {
readonly id: string = randomUUID();
readonly request: Request;
readonly lastEventId: string;
readonly connectedAt: number;
Expand All @@ -43,14 +51,13 @@ export class Session {
#maxDurationTimer: ReturnType<typeof setTimeout> | null = null;
/** @internal */
#closed = false;
/** @internal */
#initialized = false;

constructor(options: SessionOptions) {
this.request = options.request;
this.connectedAt = Date.now();

const rawId = options.request.headers['last-event-id'];

this.lastEventId = ((Array.isArray(rawId) ? rawId[0] : rawId) ?? '').replace(/[\x00-\x1f]/g, '');
this.lastEventId = readLastEventId(options.request);
this.#res = options.request.raw.res;
this.#buffer = new EventBuffer();
this.#retry = options.retry;
Expand Down Expand Up @@ -81,6 +88,12 @@ export class Session {
}

initialize(): void {
if (this.#closed || this.#initialized) {
return;
}

this.#initialized = true;

this.#res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Expand Down Expand Up @@ -119,10 +132,6 @@ export class Session {

/** @internal */
#onKeepAlive(): void {
if (this.#closed) {
return;
}

this.#buffer.comment();
this.#buffer.dispatch();
this.#flush();
Expand Down Expand Up @@ -165,6 +174,23 @@ export class Session {
this.#flush();
}

async complete(): Promise<void> {
if (this.#closed || !this.#initialized) {
return;
}

this.#buffer.push({ complete: true }, 'complete', this.id);
this.#flush();

const store = this.request.route.realm.plugins['@hapi/sse']?.completionStore;

if (store) {
await store.set(this.id, true, 0);
}

this.close();
}

close(): void {
if (this.#closed) {
return;
Expand Down
60 changes: 58 additions & 2 deletions src/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as Hoek from '@hapi/hoek';
import Joi from 'joi';
import { createRequire } from 'node:module';

import { Session } from './session.js';
import { Session, readLastEventId } from './session.js';
import type { BackpressureOptions } from './session.js';
import { SubscriptionRegistry } from './subscription.js';

Expand All @@ -19,12 +19,23 @@ export interface SseHooks {
onPublish?: (path: string, data: unknown, deliveryCount: number) => void;
}

export interface CompletionCacheOptions {
cache?: string;
segment?: string;
expiresIn?: number;
}

export interface CompletionStore {
set(id: string, value: boolean, ttl: number): Promise<void>;
}

export interface SsePluginOptions {
keepAlive?: { interval: number } | false;
retry?: number | null;
headers?: Record<string, string>;
hooks?: SseHooks;
backpressure?: BackpressureOptions;
completion?: CompletionCacheOptions;
}

export interface SseHandlerOptions {
Expand Down Expand Up @@ -68,6 +79,12 @@ declare module '@hapi/hapi' {
interface HandlerDecorations {
sse?: SseHandlerOptions;
}

interface PluginsStates {
'@hapi/sse'?: {
completionStore: CompletionStore;
};
}
}

const RETRY_FLOOR = 1000;
Expand All @@ -76,7 +93,12 @@ const clampRetry = (value: number | null): number | null => {
return value === null ? null : Math.max(value, RETRY_FLOOR);
};

const defaults: Required<Omit<SsePluginOptions, 'hooks' | 'backpressure'>> = {
const COMPLETION_DEFAULTS: CompletionCacheOptions = {
segment: 'completed-sse-sessions',
expiresIn: 5 * 60 * 1000,
};

const defaults: Required<Omit<SsePluginOptions, 'hooks' | 'backpressure' | 'completion'>> = {
keepAlive: { interval: 15_000 },
retry: 2000,
headers: {},
Expand All @@ -102,12 +124,19 @@ const hooksSchema = Joi.object({
onPublish: Joi.function(),
});

const completionSchema = Joi.object({
cache: Joi.string(),
segment: Joi.string(),
expiresIn: Joi.number().integer().positive(),
});

const pluginOptionsSchema = Joi.object({
keepAlive: keepAliveSchema,
retry: retrySchema,
headers: headersSchema,
hooks: hooksSchema,
backpressure: backpressureSchema,
completion: completionSchema,
}).label('SsePluginOptions');

const replayerSchema = Joi.object({
Expand All @@ -121,6 +150,7 @@ const replayerSchema = Joi.object({
const subscriptionConfigSchema = Joi.object({
auth: Joi.any(),
filter: Joi.function(),
refuse: Joi.function(),
onSubscribe: Joi.function(),
onUnsubscribe: Joi.function(),
onReconnect: Joi.function(),
Expand Down Expand Up @@ -150,6 +180,11 @@ export const SsePlugin: NamedPlugin<SsePluginOptions> = {
const registry = new SubscriptionRegistry();
const hooks = options.hooks;

const completion = { ...COMPLETION_DEFAULTS, ...options.completion };
const completionStore = server.cache<boolean>(completion);

server.realm.plugins['@hapi/sse'] = { completionStore };

let totalConnections = 0;
let totalDisconnections = 0;
let totalPublishes = 0;
Expand Down Expand Up @@ -188,6 +223,23 @@ export const SsePlugin: NamedPlugin<SsePluginOptions> = {
handler: async (request: Request, h: ResponseToolkit) => {
const matched = registry.matchPath(request.path)!;

if (subConfig.refuse && (await subConfig.refuse(request))) {
request.raw.res.writeHead(204);
request.raw.res.end();

return h.abandon;
}

const incomingId = readLastEventId(request);

if (incomingId && (await completionStore.get(incomingId))) {
await completionStore.drop(incomingId);
request.raw.res.writeHead(204);
request.raw.res.end();

return h.abandon;
}

const maxSessions = subConfig.maxSessions;

if (maxSessions && registry.subscriptionSessionCount(matched.pattern) >= maxSessions) {
Expand All @@ -207,6 +259,10 @@ export const SsePlugin: NamedPlugin<SsePluginOptions> = {
await subConfig.onSubscribe(session, request.path, matched.params);
}

if (!session.isOpen) {
return h.abandon;
}

session.initialize();
registry.addSession(matched.pattern, session, matched.params, request.path);
totalConnections++;
Expand Down
3 changes: 2 additions & 1 deletion src/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { RouteOptions } from '@hapi/hapi';
import type { Request, RouteOptions } from '@hapi/hapi';

import { Session } from './session.js';
import type { Replayer } from './replayer.js';
Expand All @@ -16,6 +16,7 @@ export interface SubscriptionConfig<T = unknown> {
message: T,
options: FilterOptions,
) => boolean | { override: unknown } | Promise<boolean | { override: unknown }>;
refuse?: (request: Request) => boolean | Promise<boolean>;
onSubscribe?: (session: Session, path: string, params: Record<string, string>) => void | Promise<void>;
onUnsubscribe?: (session: Session, path: string, params: Record<string, string>) => void;
onReconnect?: (session: Session, path: string, params: Record<string, string>) => void | Promise<void>;
Expand Down
Loading