Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci-module.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [20, 22]
node-version: [22, 24, latest]
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
- uses: actions/checkout@v6
- uses: actions/setup-node@v6
with:
node-version: ${{ matrix.node-version }}
- run: npm install
Expand Down
93 changes: 58 additions & 35 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ const server = Hapi.server({ port: 3000 });

await server.register({ plugin: SsePlugin });

server.sse.subscription('/events');
server.sse.subscription('/chat/{room}');

await server.start();

// Publish from anywhere
await server.sse.publish('/events', { msg: 'hello' }, { event: 'chat' });
await server.sse.publish('/chat/general', { text: 'hello', user: 'alice' }, { event: 'message' });
```

## Plugin Options
Expand All @@ -50,12 +50,12 @@ await server.register({
Registers a subscription route. Clients connect via `GET <path>`.

```typescript
server.sse.subscription('/events/{channel}', {
server.sse.subscription('/chat/{room}', {
auth: 'jwt',
retry: 5000,
keepAlive: { interval: 10_000 },
filter: async (path, message, { credentials, params, internal }) => {
if (params.channel !== internal.targetChannel) {
if (params.room !== internal.targetRoom) {
return false; // don't deliver
}
return { override: { ...message, filtered: true } }; // or transform
Expand All @@ -79,19 +79,21 @@ server.sse.subscription('/events/{channel}', {
| `onUnsubscribe` | `(session, path, params) => void` | Fires on client disconnect |
| `onReconnect` | `(session, path, params) => void \| Promise<void>` | Fires when `Last-Event-ID` is present (after replay). Errors close the session gracefully. |
| `replay` | `Replayer` | Replay provider for automatic reconnection replay |
| `maxSessions` | `number` | Maximum concurrent sessions for this subscription. Excess connections receive a 503 response. |
| `maxDuration` | `number` | Maximum connection lifetime in ms. Sessions are closed after this duration (with ±10% jitter to prevent thundering herd reconnections). A `: session expired` comment is sent before closing. |

### `server.sse.publish(path, data, opts?)`

Publishes an event to all matching subscribers. Returns the number of sessions that received the event.

```typescript
const delivered = await server.sse.publish(
'/events/news',
{ headline: '...' },
'/chat/general',
{ text: 'hello everyone', user: 'alice' },
{
event: 'breaking',
id: 'evt-42',
internal: { targetChannel: 'news' }, // passed to filter
event: 'message',
id: 'msg-42',
internal: { targetRoom: 'general' }, // passed to filter
matchMode: 'literal', // 'pattern' (default) or 'literal'
},
);
Expand All @@ -101,8 +103,8 @@ console.log(`Delivered to ${delivered} sessions`);

**`matchMode`:**

- `'pattern'` (default) — delivers to all sessions on a matching subscription pattern (e.g. `/events/{channel}`)
- `'literal'` — only delivers to sessions whose actual connected path equals `path` exactly. Useful for parameterized subscriptions where you want to target `/events/news` but not `/events/sport`.
- `'pattern'` (default) — delivers to all sessions on a matching subscription pattern (e.g. `/chat/{room}`)
- `'literal'` — only delivers to sessions whose actual connected path equals `path` exactly. Useful for parameterized subscriptions where you want to target `/chat/general` but not `/chat/random`.

**Note:** Only events published with an explicit `id` are recorded by the replayer. Events without an `id` are delivered but not stored for replay.

Expand All @@ -111,7 +113,10 @@ console.log(`Delivered to ${delivered} sessions`);
Sends an event to every connected session across all subscriptions. Returns the delivery count.

```typescript
const count = await server.sse.broadcast({ type: 'maintenance' }, { event: 'system' });
const count = await server.sse.broadcast(
{ text: 'Server restarting in 5 minutes', user: 'system' },
{ event: 'system' },
);
```

### `server.sse.eachSession(fn, opts?)`
Expand All @@ -121,9 +126,9 @@ Iterates over connected sessions. Optionally filter by subscription pattern.
```typescript
await server.sse.eachSession(
async (session) => {
session.push({ ping: true });
session.push({ text: 'ping', user: 'system' });
},
{ subscription: '/events' },
{ subscription: '/chat/{room}' },
);
```

Expand All @@ -133,16 +138,15 @@ Returns a snapshot of all registered subscriptions with active session counts.

```typescript
const subs = server.sse.subscriptions();
// [{ pattern: '/events', activeSessions: 3 }, { pattern: '/chat/{room}', activeSessions: 12 }]
// [{ pattern: '/chat/{room}', activeSessions: 12 }]
```

### `server.sse.closeSessions(pattern)`

Closes all sessions for a specific subscription pattern.

```typescript
server.sse.closeSessions('/events'); // close all /events sessions
server.sse.closeSessions('/chat/{room}'); // close all chat sessions
server.sse.closeSessions('/chat/{room}');
```

### `server.sse.sessionCount`
Expand Down Expand Up @@ -195,8 +199,8 @@ session.request // The original hapi Request object
**Metadata** — attach arbitrary key-value data to a session:

```typescript
session.set('userId', 42);
session.get('userId'); // 42
session.set('userId', 'alice');
session.get('userId'); // 'alice'
session.has('userId'); // true
session.delete('userId'); // true
```
Expand All @@ -205,23 +209,23 @@ Metadata persists for the lifetime of the session. Useful for tagging sessions i

## Custom Handler Mode

For full control over the stream (e.g. AI token streaming), use the handler decorator instead of subscriptions:
For full control over the stream (e.g. AI-assisted chat responses), use the handler decorator instead of subscriptions:

```typescript
server.route({
method: 'GET',
path: '/stream',
path: '/chat/{room}/ai',
handler: {
sse: {
stream: async (request, session) => {
for (const token of tokens) {
session.push({ token }, 'token');
session.push({ token, user: 'assistant' }, 'token');
}
session.close();
},
retry: 3000, // override plugin-level retry
keepAlive: { interval: 10_000 }, // override plugin-level keep-alive
headers: { 'X-Stream': 'true' }, // override plugin-level headers
headers: { 'X-Chat-Bot': 'true' }, // override plugin-level headers
backpressure: { maxBytes: 32768, strategy: 'close' },
},
},
Expand All @@ -237,40 +241,41 @@ server.route({
| `keepAlive` | `{ interval: number } \| false` | Override plugin-level keep-alive (default: inherits from plugin) |
| `headers` | `Record<string, string>` | Override plugin-level headers (default: inherits from plugin) |
| `backpressure` | `BackpressureOptions` | Override plugin-level backpressure (default: inherits from plugin) |
| `maxDuration` | `number` | Maximum connection lifetime in ms (with ±10% jitter). Sends a comment before closing. |

## Event Replay

Automatic replay of missed events on client reconnection. When a client sends `Last-Event-ID`, the replayer pushes missed events before `onReconnect` fires.
SSE clients automatically send a `Last-Event-ID` header when reconnecting after a dropped connection. When a replayer is configured, the plugin uses that ID to find where the client left off and pushes any events published after it — so the client catches up on what it missed while disconnected.

Only events published with an explicit `id` are recorded. Events without an `id` are delivered but not stored for replay — this prevents the buffer from filling with unaddressable entries.

Two built-in replayers:

### FiniteReplayer

Fixed-size ring buffer. O(1) append, linear scan for replay.
Keeps the last N events in a fixed-size ring buffer. When full, the oldest entry is dropped to make room. Memory usage is predictable — bounded by `size`.

```typescript
import { FiniteReplayer } from '@hapi/sse';

const replayer = new FiniteReplayer({ size: 100, autoId: true });

server.sse.subscription('/events', { replay: replayer });
server.sse.subscription('/chat/{room}', { replay: replayer });
```

### ValidReplayer

Time-based expiry with periodic garbage collection.
Keeps events for a fixed duration. A periodic cleanup timer removes expired entries, so memory usage varies with publish rate but replayed events are never older than `ttl`.

```typescript
import { ValidReplayer } from '@hapi/sse';

const replayer = new ValidReplayer({ ttl: 60_000, autoId: true });

server.sse.subscription('/events', { replay: replayer });
server.sse.subscription('/chat/{room}', { replay: replayer });
```

Call `replayer.stop()` to clear the GC timer (handled automatically on server stop).
Call `replayer.stop()` to clear the cleanup timer (handled automatically on server stop).

**Options:**

Expand Down Expand Up @@ -314,7 +319,7 @@ await server.register({
// Handler level — overrides plugin level
server.route({
method: 'GET',
path: '/stream',
path: '/chat/{room}/ai',
handler: {
sse: {
stream: async (req, session) => { ... },
Expand Down Expand Up @@ -343,13 +348,13 @@ await server.register({
options: {
hooks: {
onSession: (session, path, params) => {
console.log(`New connection: ${path}`);
console.log(`Joined: ${path}`);
},
onSessionClose: (session, path, params) => {
console.log(`Disconnected: ${path}`);
console.log(`Left: ${path}`);
},
onPublish: (path, data, deliveryCount) => {
console.log(`Published to ${path}: ${deliveryCount} recipients`);
console.log(`Message in ${path}: ${deliveryCount} recipients`);
},
},
},
Expand All @@ -368,16 +373,34 @@ interface ChatMessage {
user: string;
}

server.sse.subscription<ChatMessage>('/chat', {
server.sse.subscription<ChatMessage>('/chat/{room}', {
filter: (path, message) => {
// message is typed as ChatMessage
return message.user !== 'blocked';
},
});

await server.sse.publish<ChatMessage>('/chat', { text: 'hi', user: 'alice' });
await server.sse.publish<ChatMessage>('/chat/general', { text: 'hello', user: 'alice' });
```

## Security

The plugin includes several built-in defenses against known SSE attack vectors:

**Retry floor** — The `retry` value is silently clamped to a minimum of 1000ms. This prevents reconnection storm attacks where a malicious or misconfigured `retry: 0` causes clients to reconnect thousands of times per second. Setting `retry: null` disables the retry field entirely (no clamping).

**Last-Event-ID sanitization** — Control characters (`\x00`–`\x1f`) are stripped from the incoming `Last-Event-ID` header. This prevents null byte injection and CRLF attacks via the reconnection header.

**Connection limiting** — Use `maxSessions` on subscriptions to cap concurrent connections. Excess connections receive an HTTP 503 response before SSE headers are sent, preventing connection exhaustion.

**Connection TTL** — Use `maxDuration` to enforce a maximum connection lifetime. A ±10% jitter is applied to prevent thundering herd reconnections when many clients connect at the same time. Clients automatically reconnect via the standard SSE reconnection mechanism.

**CRLF injection protection** — The `EventBuffer` serializer strips or splits newlines in `event` and `id` fields, and splits `data` fields on line terminators. This prevents SSE event injection attacks (CVE-2026-33128, CVE-2026-22735, CVE-2026-29085 pattern).

**Backpressure** — Slow consumers are handled via configurable backpressure strategies (`drop` or `close`), preventing unbounded memory growth from write buffer accumulation.

**Not in scope** — Origin header validation, CSRF protection, and authentication are handled by hapi's auth system and middleware (`onPreAuth` extensions or reverse proxy configuration), not by the SSE plugin.

## Exports

```typescript
Expand Down
31 changes: 17 additions & 14 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
"name": "@hapi/sse",
"version": "0.0.1",
"description": "",
"main": "./dist/index.js",
"types": "./dist/index-BZ6I-Suu.d.ts",
"module": "./dist/index.js",
"types": "./dist/index.d.mts",
"exports": {
".": "./dist/index.js",
".": "./dist/index.mjs",
"./package.json": "./package.json"
},
"type": "module",
Expand All @@ -16,9 +14,10 @@
"LICENSE.md"
],
"scripts": {
"test": "vitest run --coverage",
"lint": "tsc --noEmit && eslint .",
"build": "tsdown"
"test": "vitest run",
"lint": "eslint .",
"build": "tsdown",
"prepublishOnly": "node --run build"
},
"keywords": [],
"author": "Danilo Alonso",
Expand All @@ -29,12 +28,16 @@
},
"devDependencies": {
"@hapi/boom": "^10.0.1",
"@hapi/hapi": "^21.4.6",
"@types/node": "^22",
"@vitest/coverage-v8": "^3.2.4",
"tsdown": "^0.13.3",
"typescript": "^5.8.2",
"typescript-eslint": "^8.11.0",
"vitest": "^3.2.4"
"@hapi/hapi": "^21.4.7",
"@types/node": "^22.19.15",
"@vitest/coverage-v8": "^4.1.0",
"tsdown": "^0.21.4",
"typescript": "^5.9.3",
"typescript-eslint": "^8.57.1",
"vitest": "^4.1.0"
},
"dependencies": {
"@hapi/hoek": "^11.0.7",
"joi": "^18.1.2"
}
}
1 change: 1 addition & 0 deletions src/event-buffer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export class EventBuffer {
/** @internal */
#buffer = '';

data(value: unknown): this {
Expand Down
14 changes: 7 additions & 7 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export { EventBuffer } from './event-buffer.ts';
export { Session } from './session.ts';
export type { BackpressureOptions } from './session.ts';
export { SsePlugin } from './sse.ts';
export { EventBuffer } from './event-buffer.js';
export { Session } from './session.js';
export type { BackpressureOptions } from './session.js';
export { SsePlugin } from './sse.js';
export type {
SsePluginOptions,
SseApi,
Expand All @@ -11,6 +11,6 @@ export type {
SubscriptionConfig,
SubscriptionInfo,
FilterOptions,
} from './sse.ts';
export type { Replayer, ReplayEntry } from './replayer.ts';
export { FiniteReplayer, ValidReplayer } from './replayer.ts';
} from './sse.js';
export type { Replayer, ReplayEntry } from './replayer.js';
export { FiniteReplayer, ValidReplayer } from './replayer.js';
Loading