From 08cfcccacd3dcf8a01b771dfcf0805414427424e Mon Sep 17 00:00:00 2001 From: Danel Date: Sun, 28 Jun 2026 22:29:03 -0500 Subject: [PATCH] feat(res): add res.sse() for Server-Sent Events Add a res.sse() helper that opens a text/event-stream response and returns a stream with send(), stream(), comment() and close(). send() serializes data like the rest of the framework (strings as-is, Buffer/TypedArray as UTF-8, objects as JSON) and supports event/id/retry. stream() emits one event per chunk of any web ReadableStream, Node Readable, Blob or async iterable, unifying them into a Node Readable (web streams and Blobs via Readable.fromWeb(), Node streams and async iterables directly). A keep-alive heartbeat is started by default and cleared automatically on disconnect via on-finished. No new dependencies are added. --- examples/sse/index.js | 40 ++++++ examples/sse/public/index.html | 38 +++++ lib/response.js | 28 +++- lib/sse.js | 254 +++++++++++++++++++++++++++++++++ test/res.sse.js | 229 +++++++++++++++++++++++++++++ 5 files changed, 588 insertions(+), 1 deletion(-) create mode 100644 examples/sse/index.js create mode 100644 examples/sse/public/index.html create mode 100644 lib/sse.js create mode 100644 test/res.sse.js diff --git a/examples/sse/index.js b/examples/sse/index.js new file mode 100644 index 00000000000..dfe6426d454 --- /dev/null +++ b/examples/sse/index.js @@ -0,0 +1,40 @@ +'use strict' + +var express = require('../../'); +var path = require('node:path'); + +var app = module.exports = express(); + +// serve the client page +app.get('/', function (req, res) { + res.sendFile(path.join(__dirname, 'public', 'index.html')); +}); + +/* -server-sent events endpoint + -send unnamend event + -The event sends ticks to the client every second. + -stops sending them when the client disconnects +*/ +app.get('/events', function (req, res) { + var sse = res.sse(); + + sse.send({ message: 'connected', at: new Date().toISOString() }); + + var n = 0; + var timer = setInterval(function () { + sse.send({ now: new Date().toISOString() }, { event: 'tick', id: ++n }); + }, 1000); + + req.on('close', function () { + clearInterval(timer); + sse.close(); + }); +}); + + +if (!module.parent) { + app.listen(3000); + console.log('Express sse-server started on port 3000'); +} + + diff --git a/examples/sse/public/index.html b/examples/sse/public/index.html new file mode 100644 index 00000000000..430a2df1494 --- /dev/null +++ b/examples/sse/public/index.html @@ -0,0 +1,38 @@ + + + + + Express SSE example + + + +

Server-Sent Events

+

Streaming live from /events via the standard EventSource API.

+ + + + + diff --git a/lib/response.js b/lib/response.js index b4755a5c060..9b062921dfb 100644 --- a/lib/response.js +++ b/lib/response.js @@ -29,6 +29,7 @@ var normalizeTypes = require('./utils').normalizeTypes; var setCharset = require('./utils').setCharset; var cookie = require('cookie'); var send = require('send'); +var SSE = require('./sse'); var extname = path.extname; var resolve = path.resolve; var basename = path.basename; @@ -247,6 +248,31 @@ res.json = function json(obj) { return this.send(body); }; +/** + * Open a Server-Sent Events stream on this response. + * + * Writes the `text/event-stream` headers, starts a keep-alive heartbeat and + * returns an `SSE` instance to push events with: + * + * app.get('/events', function (req, res) { + * var sse = res.sse(); + * sse.send({ hello: 'world' }); + * sse.send('tick', { event: 'ping', id: 1 }); + * }); + * + * Options: + * + * - `heartbeat` keep-alive interval in ms (default `15000`, `false` to disable) + * + * @param {object} [options] + * @return {SSE} + * @public + */ + +res.sse = function sse(options) { + return new SSE(this, options); +}; + /** * Send JSON response with JSONP callback support. * @@ -847,7 +873,7 @@ res.redirect = function redirect(url) { html: function(){ var u = escapeHtml(address); body = '' + statuses.message[status] + '' - + '

' + statuses.message[status] + '. Redirecting to ' + u + '

' + + '

' + statuses.message[status] + '. Redirecting to ' + u + '

' }, default: function(){ diff --git a/lib/sse.js b/lib/sse.js new file mode 100644 index 00000000000..a901dc39b15 --- /dev/null +++ b/lib/sse.js @@ -0,0 +1,254 @@ +'use strict'; + +/** + * Module dependencies. + * @private + */ + +var debug = require('debug')('express:sse'); +var onFinished = require('on-finished'); +var Readable = require('node:stream').Readable; +const { Buffer } = require('node:buffer'); + +/** + * Module variables. + * @private + */ + +var DEFAULT_HEARTBEAT = 15000; + +/** + * Module exports. + * @public + */ + +module.exports = SSE; + +/** + * Initialize a new Server-Sent Events stream on the given response. + * + * Opens the stream by writing the `text/event-stream` headers and starts a + * heartbeat that keeps the connection (and any intermediary proxies) alive. + * The heartbeat is cleared automatically once the connection ends. + * + * Options: + * + * - `heartbeat` interval in ms for keep-alive comments (default `15000`, + * `false` to disable) + * + * @param {http.ServerResponse} res + * @param {object} [options] + * @public + */ + +function SSE(res, options) { + var opts = options || {}; + var self = this; + + this.res = res; + this.finished = false; + this._heartbeat = null; + + debug('open stream'); + + // open the event stream + res.statusCode = 200; + res.set({ + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-transform', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no' + }); + + // flush the headers so the client establishes the connection immediately + res.flushHeaders(); + + // keep the connection alive with periodic comments + var heartbeat = opts.heartbeat === undefined ? DEFAULT_HEARTBEAT : opts.heartbeat; + + if (heartbeat) { + this._heartbeat = setInterval(function onHeartbeat() { + self.comment(''); + }, heartbeat); + + // do not let the heartbeat keep the process alive on its own + this._heartbeat.unref(); + } + + // stop the heartbeat once the underlying connection is gone + onFinished(res, function onClose() { + debug('close stream'); + self.finished = true; + self._clearHeartbeat(); + }); +} + +/** + * Send an event with the given `data`. + * + * `data` is serialized the same way the rest of the framework serializes a + * response body: strings are sent as-is, `Buffer`/`TypedArray` are decoded as + * UTF-8 text, objects are `JSON.stringify`'d and everything else is coerced + * with `String()`. Multi-line payloads are split into multiple `data:` lines + * as required by the protocol. + * + * Options: + * + * - `event` the event name (`event:` field) + * - `id` the event id (`id:` field) + * - `retry` the client reconnection time in ms (`retry:` field) + * + * @param {*} data + * @param {object} [options] + * @return {SSE} for chaining + * @public + */ + +SSE.prototype.send = function send(data, options) { + var opts = options || {}; + var frame = ''; + + if (opts.id != null) frame += 'id: ' + opts.id + '\n'; + if (opts.event != null) frame += 'event: ' + opts.event + '\n'; + if (opts.retry != null) frame += 'retry: ' + opts.retry + '\n'; + + // a payload may span multiple lines, each needs its own `data:` field + var lines = format(data).split('\n'); + for (var i = 0; i < lines.length; i++) { + frame += 'data: ' + lines[i] + '\n'; + } + + return this._write(frame + '\n'); +}; + +/** + * Stream a source, emitting one event per chunk, and resolve when it ends. + * + * Any stream-like source is accepted by bridging it into a Node `Readable`: + * a web `ReadableStream` or `Blob` via `Readable.fromWeb()`, and Node streams + * or async iterables directly. Each chunk is passed through `send()`, so the + * same serialization and `options` apply. + * + * @param {ReadableStream|Readable|Blob|AsyncIterable|Iterable} source + * @param {object} [options] + * @return {Promise} resolves with the stream when the source ends + * @public + */ + +SSE.prototype.stream = async function stream(source, options) { + var readable = toReadable(source); + + for await (var chunk of readable) { + if (this.finished) break; + this.send(chunk, options); + } + + return this; +}; + +/** + * Write a comment line, used as a keep-alive that does not trigger `onmessage`. + * + * @param {string} [text] + * @return {SSE} for chaining + * @public + */ + +SSE.prototype.comment = function comment(text) { + return this._write(': ' + (text == null ? '' : text) + '\n\n'); +}; + +/** + * Close the stream and stop the heartbeat. + * + * @return {SSE} for chaining + * @public + */ + +SSE.prototype.close = function close() { + this._clearHeartbeat(); + + if (!this.finished) { + this.finished = true; + this.res.end(); + } + + return this; +}; + +/** + * Write a raw chunk if the connection is still open. + * + * @param {string} chunk + * @return {SSE} for chaining + * @private + */ + +SSE.prototype._write = function _write(chunk) { + if (!this.finished) { + this.res.write(chunk); + } + + return this; +}; + +/** + * Clear the heartbeat interval if one is running. + * + * @private + */ + +SSE.prototype._clearHeartbeat = function _clearHeartbeat() { + if (this._heartbeat) { + clearInterval(this._heartbeat); + this._heartbeat = null; + } +}; + +/** + * Serialize a value to the text payload of a `data:` field. + * + * @param {*} data + * @return {string} + * @private + */ + +function format(data) { + if (data == null) { + return ''; + } + + if (ArrayBuffer.isView(data)) { + return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString('utf8'); + } + + if (typeof data === 'object') { + return JSON.stringify(data); + } + + return String(data); +} + +/** + * Bridge any stream-like source into a Node `Readable`. + * + * @param {ReadableStream|Readable|Blob|AsyncIterable|Iterable} source + * @return {Readable} + * @private + */ + +function toReadable(source) { + if (source instanceof Readable) { + return source; + } + + if (typeof ReadableStream !== 'undefined' && source instanceof ReadableStream) { + return Readable.fromWeb(source); + } + + if (typeof Blob !== 'undefined' && source instanceof Blob) { + return Readable.fromWeb(source.stream()); + } + + return Readable.from(source); +} diff --git a/test/res.sse.js b/test/res.sse.js new file mode 100644 index 00000000000..8af30586195 --- /dev/null +++ b/test/res.sse.js @@ -0,0 +1,229 @@ +'use strict' + +var express = require('../') + , request = require('supertest') + , assert = require('node:assert') + , Readable = require('node:stream').Readable; + +const { Buffer } = require('node:buffer'); + +describe('res', function () { + describe('.sse()', function () { + it('should set the event-stream headers', function (done) { + var app = express(); + + app.get('/', function (req, res) { + res.sse({ heartbeat: false }).close(); + }); + + request(app) + .get('/') + .expect('Content-Type', 'text/event-stream; charset=utf-8') + .expect('Cache-Control', 'no-cache, no-transform') + .expect('Connection', 'keep-alive') + .expect(200, done); + }) + + it('should send a string as a single data field', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + sse.send('hello'); + sse.close(); + }); + + request(app) + .get('/') + .expect(200, 'data: hello\n\n', done); + }) + + it('should serialize objects as JSON', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + sse.send({ hello: 'world' }); + sse.close(); + }); + + request(app) + .get('/') + .expect(200, 'data: {"hello":"world"}\n\n', done); + }) + + it('should decode Buffers as utf-8 text', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + sse.send(Buffer.from('café')); + sse.close(); + }); + + request(app) + .get('/') + .expect(200, 'data: café\n\n', done); + }) + + it('should write id, event and retry fields', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + sse.send('payload', { event: 'update', id: 42, retry: 3000 }); + sse.close(); + }); + + request(app) + .get('/') + .expect(200, 'id: 42\nevent: update\nretry: 3000\ndata: payload\n\n', done); + }) + + it('should split multi-line data into multiple data fields', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + sse.send('line one\nline two'); + sse.close(); + }); + + request(app) + .get('/') + .expect(200, 'data: line one\ndata: line two\n\n', done); + }) + + it('should write a comment line', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + sse.comment('keep-alive'); + sse.close(); + }); + + request(app) + .get('/') + .expect(200, ': keep-alive\n\n', done); + }) + + it('should be chainable', function (done) { + var app = express(); + + app.get('/', function (req, res) { + res.sse({ heartbeat: false }) + .send('a') + .send('b') + .close(); + }); + + request(app) + .get('/') + .expect(200, 'data: a\n\ndata: b\n\n', done); + }) + + describe('.stream()', function () { + it('should emit one event per chunk of a Node Readable', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + sse.stream(Readable.from(['one', 'two'])).then(function () { + sse.close(); + }); + }); + + request(app) + .get('/') + .expect(200, 'data: one\n\ndata: two\n\n', done); + }) + + it('should emit one event per chunk of a web ReadableStream', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + var source = new ReadableStream({ + start: function (controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.close(); + } + }); + sse.stream(source, { event: 'chunk' }).then(function () { + sse.close(); + }); + }); + + request(app) + .get('/') + .expect(200, 'event: chunk\ndata: a\n\nevent: chunk\ndata: b\n\n', done); + }) + + it('should resolve with the stream instance', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + sse.stream(Readable.from(['x'])).then(function (returned) { + assert.strictEqual(returned, sse); + sse.close(); + }); + }); + + request(app) + .get('/') + .expect(200, 'data: x\n\n', done); + }) + + it('should emit one event per chunk of an async iterable', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + async function * gen () { + yield 'a'; + yield 'b'; + } + sse.stream(gen()).then(function () { + sse.close(); + }); + }); + + request(app) + .get('/') + .expect(200, 'data: a\n\ndata: b\n\n', done); + }) + + it('should stream the contents of a Blob', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + sse.stream(new Blob(['hello world'])).then(function () { + sse.close(); + }); + }); + + request(app) + .get('/') + .expect(200, 'data: hello world\n\n', done); + }) + }) + + it('should not throw when writing after close', function (done) { + var app = express(); + + app.get('/', function (req, res) { + var sse = res.sse({ heartbeat: false }); + sse.close(); + sse.send('ignored'); + }); + + request(app) + .get('/') + .expect(200, '', done); + }) + }) +})